本专题所写所感所得,来自转转首席架构师和字节架构团队,此致,敬礼。。
一、幂等设计
1.1 定义
幂等需要考虑请求层面和业务层面的幂等。
- 请求层面
保证请求重复执行和执行一次结果相同;即f...f(f(x)) = f(x)
- 业务层面
如同一用户不重复下单;商品不超买
1.2 目标
- 请求重试不出问题
- 避免结果灾难性(重复转账、多交易等)
1.3 幂等范围
幂等范围主要是针对请求对数据造成改变。以下从两个维度判断幂等范围。
- 读/写请求层面:写请求
-
架构层层面:数据访问层
二、分布式锁设计
业务层面的幂等存在并发消费的可能性,需要转化为串行消费。本质上就是分布式锁的问题。
2.1 定义与目的
分布式锁是在分布式环境下,锁定全局唯一资源,使得请求处理串行化,实现类似于互斥锁的效果。
分布式锁的目的是:
- 防止重复下单,解决业务层幂等问题
- 解决MQ消息消费幂等性问题,如发送消息重复、消息消费端去重等
- 状态的修改行为需要串行处理,避免出现数据错乱
2.2 高可用分布式锁设计
2.2.1 目标
- 强一致性
- 服务高可用、系统稳健
- 锁自动续约及其自动释放
- 代码高度抽象业务接入极简
- 可视化管理后台,监控及管理
2.2.2 特点
- 互斥性:和我们本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥。
- 可重入性:同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁。
- 锁超时:和本地锁一样支持锁超时,防止死锁。
- 高效,高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级。
- 支持阻塞和非阻塞:和ReentrantLock一样支持lock和trylock以及tryLock(long timeOut)。
- 支持公平锁和非公平锁(可选):公平锁的意思是按照请求加锁的顺序获得锁,非公平锁就相反是无序的。这个一般来说实现的比较少。
2.2.3 方案对比
mysql | redis | zookeeper | etcd | |
---|---|---|---|---|
一致性算法 | 无 | 无 | paxos | raft |
CAP | cp | ap | cp | cp |
高可用 | 主从 | 主从 | N+1可用(奇数) | N+1可用(奇数) |
接口类型 | sql | 客户端 | 客户端 | http/grpc |
实现 | select * from update | setnx + lua | crateephemeral | restful api |
- redis是ap模型,无法保证数据一致性
- zk对锁实现使用创建临时节点和watch机制,执行效率、拓展能力、社区活跃度都不如etcd
2.3 Mysql分布式锁
首先来说一下Mysql分布式锁的实现原理,相对来说这个比较容易理解,毕竟数据库和我们开发人员在平时的开发中息息相关。对于分布式锁我们可以创建一个锁表:
前面我们所说的lock(),trylock(long timeout),trylock()这几个方法可以用下面的伪代码实现。
2.3.1 lock()
lock一般是阻塞式的获取锁,意思就是不获取到锁誓不罢休,那么我们可以写一个死循环来执行其操作:
mysqlLock.lcok内部是一个sql,为了达到可重入锁的效果那么我们应该先进行查询,如果有值,那么需要比较node_info是否一致,这里的node_info可以用机器IP和线程名字来表示,如果一致那么就加可重入锁count的值,如果不一致那么就返回false。如果没有值那么直接插入一条数据。伪代码如下:
需要注意的是这一段代码需要加事务,必须要保证这一系列操作的原子性。
2.3.2 tryLock()和tryLock(long timeout)
tryLock()是非阻塞获取锁,如果获取不到那么就会马上返回,代码可以如下:
tryLock(long timeout)实现如下:
mysqlLock.lock和上面一样,但是要注意的是select ... for update这个是阻塞的获取行锁,如果同一个资源并发量较大还是有可能会退化成阻塞的获取锁。
2.3.3 unlock()
unlock的话如果这里的count为1那么可以删除,如果大于1那么需要减去1。
2.3.4 锁超时
我们有可能会遇到我们的机器节点挂了,那么这个锁就不会得到释放,我们可以启动一个定时任务,通过计算一般我们处理任务的一般的时间,比如是5ms,那么我们可以稍微扩大一点,当这个锁超过20ms没有被释放我们就可以认定是节点挂了然后将其直接释放。
2.3.5 Mysql小结
- 适用场景: Mysql分布式锁一般适用于资源不存在数据库,如果数据库存在比如订单,那么可以直接对这条数据加行锁,不需要我们上面多的繁琐的步骤,比如一个订单,那么我们可以用select * from order_table where id = 'xxx' for update进行加行锁,那么其他的事务就不能对其进行修改。
- 优点:理解起来简单,不需要维护额外的第三方中间件(比如Redis,Zk)。
- 缺点:虽然容易理解但是实现起来较为繁琐,需要自己考虑锁超时,加事务等等。性能局限于数据库,一般对比缓存来说性能较低。对于高并发的场景并不是很适合。
2.3.6 乐观锁
前面我们介绍的都是悲观锁,这里想额外提一下乐观锁,在我们实际项目中也是经常实现乐观锁,因为我们加行锁的性能消耗比较大,通常我们会对于一些竞争不是那么激烈,但是其又需要保证我们并发的顺序执行使用乐观锁进行处理,我们可以对我们的表加一个版本号字段,那么我们查询出来一个版本号之后,update或者delete的时候需要依赖我们查询出来的版本号,判断当前数据库和查询出来的版本号是否相等,如果相等那么就可以执行,如果不等那么就不能执行。这样的一个策略很像我们的CAS(Compare And Swap),比较并交换是一个原子操作。这样我们就能避免加select * for update行锁的开销。
2.4 基于redis分布式锁
redis是单线程的,所以能保证线程串行处理,但因为redis分布式锁是ap模型,不是cp模型,无法实现强一致性。但因实现简单,接入成本低,如果对数据一致性要求不那么高,可以选择此方式。
2.4.1 Redis分布式锁简单实现
熟悉Redis的同学那么肯定对setNx(set if not exist)方法不陌生,如果不存在则更新,其可以很好的用来实现我们的分布式锁。对于某个资源加锁我们只需要
setNx resourceName value
这里有个问题,加锁了之后如果机器宕机那么这个锁就不会得到释放所以会加入过期时间,加入过期时间需要和setNx同一个原子操作,在Redis2.8之前我们需要使用Lua脚本达到我们的目的,但是redis2.8之后redis支持nx和ex操作是同一原子操作。
set resourceName value ex 5 nx
2.4.2 Redission
Javaer都知道Jedis,Jedis是Redis的Java实现的客户端,其API提供了比较全面的Redis命令的支持。Redission也是Redis的客户端,相比于Jedis功能简单。Jedis简单使用阻塞的I/O和redis交互,Redission通过Netty支持非阻塞I/O。Jedis最新版本2.9.0是2016年的快3年了没有更新,而Redission最新版本是2018.10月更新。
Redission封装了锁的实现,其继承了java.util.concurrent.locks.Lock的接口,让我们像操作我们的本地Lock一样去操作Redission的Lock,下面介绍一下其如何实现分布式锁。
Redission不仅提供了Java自带的一些方法(lock,tryLock),还提供了异步加锁,对于异步编程更加方便。 由于内部源码较多,就不贴源码了,这里用文字叙述来分析他是如何加锁的,这里分析一下tryLock方法:
-
尝试加锁:首先会尝试进行加锁,由于保证操作是原子性,那么就只能使用lua脚本,相关的lua脚本如下:
可以看见他并没有使用我们的sexNx来进行操作,而是使用的hash结构,我们的每一个需要锁定的资源都可以看做是一个HashMap,锁定资源的节点信息是Key,锁定次数是value。通过这种方式可以很好的实现可重入的效果,只需要对value进行加1操作,就能进行可重入锁。当然这里也可以用之前我们说的本地计数进行优化。
- 如果尝试加锁失败,判断是否超时,如果超时则返回false。
- 如果加锁失败之后,没有超时,那么需要在名字为redisson_lock__channel+lockName的channel上进行订阅,用于订阅解锁消息,然后一直阻塞直到超时,或者有解锁消息。
- 重试步骤以上三步,直到最后获取到锁,或者某一步获取锁超时。
对于我们的unlock方法比较简单也是通过lua脚本进行解锁,如果是可重入锁,只是减1。如果是非加锁线程解锁,那么解锁失败。
Redission还有公平锁的实现,对于公平锁其利用了list结构和hashset结构分别用来保存我们排队的节点,和我们节点的过期时间,用这两个数据结构帮助我们实现公平锁,这里就不展开介绍了,有兴趣可以参考源码。
2.4.3 RedLock
我们想象一个这样的场景当机器A申请到一把锁之后,如果Redis主宕机了,这个时候从机并没有同步到这一把锁,那么机器B再次申请的时候就会再次申请到这把锁,为了解决这个问题Redis作者提出了RedLock红锁的算法,在Redission中也对RedLock进行了实现。
通过上面的代码,我们需要实现多个Redis集群,然后进行红锁的加锁,解锁。具体的步骤如下:
- 首先生成多个Redis集群的Rlock,并将其构造成RedLock。
- 依次循环对三个集群进行加锁,加锁的过程和5.2里面一致。
- 如果循环加锁的过程中加锁失败,那么需要判断加锁失败的次数是否超出了最大值,这里的最大值是根据集群的个数,比如三个那么只允许失败一个,五个的话只允许失败两个,要保证多数成功。
- 加锁的过程中需要判断是否加锁超时,有可能我们设置加锁只能用3ms,第一个集群加锁已经消耗了3ms了。那么也算加锁失败。
- 3,4步里面加锁失败的话,那么就会进行解锁操作,解锁会对所有的集群在请求一次解锁。
可以看见RedLock基本原理是利用多个Redis集群,用多数的集群加锁成功,减少Redis某个集群出故障,造成分布式锁出现问题的概率。
2.4.4 Redis小结
- 优点:对于Redis实现简单,性能对比ZK和Mysql较好。如果不需要特别复杂的要求,那么自己就可以利用setNx进行实现,如果自己需要复杂的需求的话那么可以利用或者借鉴Redission。对于一些要求比较严格的场景来说的话可以使用RedLock。
- 缺点:需要维护Redis集群,如果要实现RedLock那么需要维护更多的集群。
2.5 基于ZK分布式锁
ZooKeeper也是我们常见的实现分布式锁方法,相比于数据库如果没了解过ZooKeeper可能上手比较难一些。ZooKeeper是以Paxos算法为基础分布式应用程序协调服务。Zk的数据节点和文件目录类似,所以我们可以用此特性实现分布式锁。我们以某个资源为目录,然后这个目录下面的节点就是我们需要获取锁的客户端,未获取到锁的客户端注册需要注册Watcher到上一个客户端,可以用下图表示。
/lock是我们用于加锁的目录,/resource_name是我们锁定的资源,其下面的节点按照我们加锁的顺序排列。
2.5.1 Curator
Curator封装了Zookeeper底层的Api,使我们更加容易方便的对Zookeeper进行操作,并且它封装了分布式锁的功能,这样我们就不需要再自己实现了。
Curator实现了可重入锁(InterProcessMutex),也实现了不可重入锁(InterProcessSemaphoreMutex)。在可重入锁中还实现了读写锁。
2.5.2 InterProcessMutex
InterProcessMutex是Curator实现的可重入锁,我们可以通过下面的一段代码实现我们的可重入锁:
我们利用acuire进行加锁,release进行解锁。
加锁的流程具体如下:
- 首先进行可重入的判定:这里的可重入锁记录在ConcurrentMap<Thread, LockData> threadData这个Map里面,如果threadData.get(currentThread)是有值的那么就证明是可重入锁,然后记录就会加1。我们之前的Mysql其实也可以通过这种方法去优化,可以不需要count字段的值,将这个维护在本地可以提高性能。
- 然后在我们的资源目录下创建一个节点:比如这里创建一个/0000000002这个节点,这个节点需要设置为EPHEMERAL_SEQUENTIAL也就是临时节点并且有序。
- 获取当前目录下所有子节点,判断自己的节点是否位于子节点第一个。
- 如果是第一个,则获取到锁,那么可以返回。
- 如果不是第一个,则证明前面已经有人获取到锁了,那么需要获取自己节点的前一个节点。/0000000002的前一个节点是/0000000001,我们获取到这个节点之后,再上面注册Watcher(这里的watcher其实调用的是object.notifyAll(),用来解除阻塞)。
- object.wait(timeout)或object.wait():进行阻塞等待这里和我们第5步的watcher相对应。
解锁的具体流程:
- 首先进行可重入锁的判定:如果有可重入锁只需要次数减1即可,减1之后加锁次数为0的话继续下面步骤,不为0直接返回。
- 删除当前节点。
- 删除threadDataMap里面的可重入锁的数据。
2.5.3 读写锁
Curator提供了读写锁,其实现类是InterProcessReadWriteLock,这里的每个节点都会加上前缀:
private static final String READ_LOCK_NAME = "__READ__";
private static final String WRITE_LOCK_NAME = "__WRIT__";
根据不同的前缀区分是读锁还是写锁,对于读锁,如果发现前面有写锁,那么需要将watcher注册到和自己最近的写锁。写锁的逻辑和我们之前2.5.2分析的依然保持不变。
2.5.4锁超时
Zookeeper不需要配置锁超时,由于我们设置节点是临时节点,我们的每个机器维护着一个ZK的session,通过这个session,ZK可以判断机器是否宕机。如果我们的机器挂掉的话,那么这个临时节点对应的就会被删除,所以我们不需要关心锁超时。
2.5.5 ZK小结
- 优点:ZK可以不需要关心锁超时时间,实现起来有现成的第三方包,比较方便,并且支持读写锁,ZK获取锁会按照加锁的顺序,所以其是公平锁。对于高可用利用ZK集群进行保证。
- 缺点:ZK需要额外维护,增加维护成本,性能和Mysql相差不大,依然比较差。并且需要开发人员了解ZK是什么。
2.6 基于etcd分布式锁
2.6.1 机制
etcd 支持以下功能,正是依赖这些功能来实现分布式锁的:
- Lease 机制:即租约机制(TTL,Time To Live),Etcd 可以为存储的 KV 对设置租约,当租约到期,KV 将失效删除;同时也支持续约,即 KeepAlive。
- Revision 机制:每个 key 带有一个 Revision 属性值,etcd 每进行一次事务对应的全局 Revision 值都会加一,因此每个 key 对应的 Revision 属性值都是全局唯一的。通过比较 Revision 的大小就可以知道进行写操作的顺序。
- 在实现分布式锁时,多个程序同时抢锁,根据 Revision 值大小依次获得锁,可以避免 “羊群效应” (也称 “惊群效应”),实现公平锁。
- Prefix 机制:即前缀机制,也称目录机制。可以根据前缀(目录)获取该目录下所有的 key 及对应的属性(包括 key, value 以及 revision 等)。
- Watch 机制:即监听机制,Watch 机制支持 Watch 某个固定的 key,也支持 Watch 一个目录(前缀机制),当被 Watch 的 key 或目录发生变化,客户端将收到通知。
2.6.2 过程
实现过程:
- 步骤 1: 准备
客户端连接 Etcd,以 /lock/mylock 为前缀创建全局唯一的 key,假设第一个客户端对应的 key="/lock/mylock/UUID1",第二个为 key="/lock/mylock/UUID2";客户端分别为自己的 key 创建租约 - Lease,租约的长度根据业务耗时确定,假设为 15s;
- 步骤 2: 创建定时任务作为租约的“心跳”
当一个客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务作为“心跳”进行续约。此外,如果持有锁期间客户端崩溃,心跳停止,key 将因租约到期而被删除,从而锁释放,避免死锁。
- 步骤 3: 客户端将自己全局唯一的 key 写入 Etcd
进行 put 操作,将步骤 1 中创建的 key 绑定租约写入 Etcd,根据 Etcd 的 Revision 机制,假设两个客户端 put 操作返回的 Revision 分别为 1、2,客户端需记录 Revision 用以接下来判断自己是否获得锁。
- 步骤 4: 客户端判断是否获得锁
客户端以前缀 /lock/mylock 读取 keyValue 列表(keyValue 中带有 key 对应的 Revision),判断自己 key 的 Revision 是否为当前列表中最小的,如果是则认为获得锁;否则监听列表中前一个 Revision 比自己小的 key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。
- 步骤 5: 执行业务
获得锁后,操作共享资源,执行业务代码。
- 步骤 6: 释放锁
完成业务流程后,删除对应的key释放锁。
2.6.3 实现
自带的 etcdctl 可以模拟锁的使用:
// 第一个终端
$ ./etcdctl lock mutex1
mutex1/326963a02758b52d
// 第二终端
$ ./etcdctl lock mutex1
// 当第一个终端结束了,第二个终端会显示
mutex1/326963a02758b531
在etcd的clientv3包中,实现了分布式锁。使用起来和mutex是类似的,为了了解其中的工作机制,这里简要的做一下总结。
etcd分布式锁的实现在go.etcd.io/etcd/clientv3/concurrency包中,主要提供了以下几个方法:
* func NewMutex(s *Session, pfx string) *Mutex, 用来新建一个mutex
* func (m *Mutex) Lock(ctx context.Context) error,它会阻塞直到拿到了锁,并且支持通过context来取消获取锁。
* func (m *Mutex) Unlock(ctx context.Context) error,解锁
因此在使用etcd提供的分布式锁式非常简单,通常就是实例化一个mutex,然后尝试抢占锁,之后进行业务处理,最后解锁即可。
demo:
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"log"
"os"
"os/signal"
"time"
)
func main() {
c := make(chan os.Signal)
signal.Notify(c)
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
lockKey := "/lock"
go func () {
session, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
m := concurrency.NewMutex(session, lockKey)
if err := m.Lock(context.TODO()); err != nil {
log.Fatal("go1 get mutex failed " + err.Error())
}
fmt.Printf("go1 get mutex sucess\n")
fmt.Println(m)
time.Sleep(time.Duration(10) * time.Second)
m.Unlock(context.TODO())
fmt.Printf("go1 release lock\n")
}()
go func() {
time.Sleep(time.Duration(2) * time.Second)
session, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
m := concurrency.NewMutex(session, lockKey)
if err := m.Lock(context.TODO()); err != nil {
log.Fatal("go2 get mutex failed " + err.Error())
}
fmt.Printf("go2 get mutex sucess\n")
fmt.Println(m)
time.Sleep(time.Duration(2) * time.Second)
m.Unlock(context.TODO())
fmt.Printf("go2 release lock\n")
}()
<-c
}
2.6.4 原理
Lock()函数的实现很简单:
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
client := m.s.Client()
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// fetch current holder to complete uncontended path with only one RPC
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
return werr
}
首先通过一个事务来尝试加锁,这个事务主要包含了4个操作: cmp、put、get、getOwner。需要注意的是,key是由pfx和Lease()组成的。
- cmp: 比较加锁的key的修订版本是否是0。如果是0就代表这个锁不存在。
- put: 向加锁的key中存储一个空值,这个操作就是一个加锁的操作,但是这把锁是有超时时间的,超时的时间是session的默认时长。超时是为了防止锁没有被正常释放导致死锁。
- get: get就是通过key来查询
- getOwner: 注意这里是用m.pfx来查询的,并且带了查询参数WithFirstCreate()。使用pfx来查询是因为其他的session也会用同样的pfx来尝试加锁,并且因为每个LeaseID都不同,所以第一次肯定会put成功。但是只有最早使用这个pfx的session才是持有锁的,所以这个getOwner的含义就是这样的。
接下来才是通过判断来检查是否持有锁
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
m.myRev是当前的版本号,resp.Succeeded是cmp为true时值为true,否则是false。这里的判断表明当同一个session非第一次尝试加锁,当前的版本号应该取这个key的最新的版本号。
下面是取得锁的持有者的key。如果当前没有人持有这把锁,那么默认当前会话获得了锁。或者锁持有者的版本号和当前的版本号一致, 那么当前的会话就是锁的持有者。
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
上面这段代码就很好理解了,因为走到这里说明没有获取到锁,那么这里等待锁的删除。
waitDeletes方法的实现也很简单,但是需要注意的是,这里的getOpts只会获取比当前会话版本号更低的key,然后去监控最新的key的删除。等这个key删除了,自己也就拿到锁了。
这种分布式锁的实现和我一开始的预想是不同的。它不存在锁的竞争,不存在重复的尝试加锁的操作。而是通过使用统一的前缀pfx来put,然后根据各自的版本号来排队获取锁。效率非常的高。避免了惊群效应
如图所示,共有4个session来加锁,那么根据revision来排队,获取锁的顺序为session2 -> session3 -> session1 -> session4。
这里面需要注意一个惊群效应,每一个client在锁住/lock这个path的时候,实际都已经插入了自己的数据,类似/lock/LEASE_ID,并且返回了各自的index(就是raft算法里面的日志索引),而只有最小的才算是拿到了锁,其他的client需要watch等待。例如client1拿到了锁,client2和client3在等待,而client2拿到的index比client3的更小,那么对于client1删除锁之后,client3其实并不关心,并不需要去watch。所以综上,等待的节点只需要watch比自己index小并且差距最小的节点删除事件即可。
2.6.5 基于 ETCD的选主
2.6.5.1 机制
etcd有多种使用场景,Master选举是其中一种。说起Master选举,过去常常使用zookeeper,通过创建EPHEMERAL_SEQUENTIAL节点(临时有序节点),我们选择序号最小的节点作为Master,逻辑直观,实现简单是其优势,但是要实现一个高健壮性的选举并不简单,同时zookeeper繁杂的扩缩容机制也是沉重的负担。
master 选举根本上也是抢锁,与zookeeper直观选举逻辑相比,etcd的选举则需要在我们熟悉它的一系列基本概念后,调动我们充分的想象力:
MVCC,key存在版本属性,没被创建时版本号为0;
CAS操作,结合MVCC,可以实现竞选逻辑,if(version == 0) set(key,value),通过原子操作,确保只有一台机器能set成功;
Lease租约,可以对key绑定一个租约,租约到期时没预约,这个key就会被回收;
Watch监听,监听key的变化事件,如果key被删除,则重新发起竞选。
至此,etcd选举的逻辑大体清晰了,但这一系列操作与zookeeper相比复杂很多,有没有已经封装好的库可以直接拿来用?etcd clientv3 concurrency中有对选举及分布式锁的封装。后面进一步发现,etcdctl v3里已经有master选举的实现了,下面针对这部分代码进行简单注释,在最后参考这部分代码实现自己的选举逻辑。
2.6.5.2 etcd选主的实现
官方示例:https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/example_election_test.go
如crontab 示例:
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"log"
"time"
)
const prefix = "/election-demo"
const prop = "local"
func main() {
endpoints := []string{"szth-cce-devops00.szth.baidu.com:8379"}
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
campaign(cli, prefix, prop)
}
func campaign(c *clientv3.Client, election string, prop string) {
for {
// 租约到期时间:5s
s, err := concurrency.NewSession(c, concurrency.WithTTL(5))
if err != nil {
fmt.Println(err)
continue
}
e := concurrency.NewElection(s, election)
ctx := context.TODO()
log.Println("开始竞选")
err = e.Campaign(ctx, prop)
if err != nil {
log.Println("竞选 leader失败,继续")
switch {
case err == context.Canceled:
return
default:
continue
}
}
log.Println("获得leader")
if err := doCrontab(); err != nil {
log.Println("调用主方法失败,辞去leader,重新竞选")
_ = e.Resign(ctx)
continue
}
return
}
}
func doCrontab() error {
for {
fmt.Println("doCrontab")
time.Sleep(time.Second * 4)
//return fmt.Errorf("sss")
}
}
2.6.5.3 etcd选主的原理
/*
* 发起竞选
* 未当选leader前,会一直阻塞在Campaign调用
* 当选leader后,等待SIGINT、SIGTERM或session过期而退出
* https://github.com/etcd-io/etcd/blob/master/etcdctl/ctlv3/command/elect_command.go
*/
func campaign(c *clientv3.Client, election string, prop string) error {
//NewSession函数中创建了一个lease,默认是60s TTL,并会调用KeepAlive,永久为这个lease自动续约(2/3生命周期的时候执行续约操作)
s, err := concurrency.NewSession(c)
if err != nil {
return err
}
e := concurrency.NewElection(s, election)
ctx, cancel := context.WithCancel(context.TODO())
donec := make(chan struct{})
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigc
cancel()
close(donec)
}()
//竞选逻辑,将展开分析
if err = e.Campaign(ctx, prop); err != nil {
return err
}
// print key since elected
resp, err := c.Get(ctx, e.Key())
if err != nil {
return err
}
display.Get(*resp)
select {
case <-donec:
case <-s.Done():
return errors.New("elect: session expired")
}
return e.Resign(context.TODO())
}
/*
* 类似于zookeeper的临时有序节点,etcd的选举也是在相应的prefix path下面创建key,该key绑定了lease并根据lease id进行命名,
* key创建后就有revision号,这样使得在prefix path下的key也都是按revision有序
* https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/election.go
*/
func (e *Election) Campaign(ctx context.Context, val string) error {
s := e.session
client := e.session.Client()
//真正创建的key名为:prefix + lease id
k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
//Txn:transaction,依靠Txn进行创建key的CAS操作,当key不存在时才会成功创建
txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
txn = txn.Else(v3.OpGet(k))
resp, err := txn.Commit()
if err != nil {
return err
}
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
//如果key已存在,则创建失败;
//当key的value与当前value不等时,如果自己为leader,则不用重新执行选举直接设置value;
//否则报错。
if !resp.Succeeded {
kv := resp.Responses[0].GetResponseRange().Kvs[0]
e.leaderRev = kv.CreateRevision
if string(kv.Value) != val {
if err = e.Proclaim(ctx, val); err != nil {
e.Resign(ctx)
return err
}
}
}
//一直阻塞,直到确认自己的create revision为当前path中最小,从而确认自己当选为leader
_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
if err != nil {
// clean up in case of context cancel
select {
case <-ctx.Done():
e.Resign(client.Ctx())
default:
e.leaderSession = nil
}
return err
}
e.hdr = resp.Header
return nil
}
2.7 分布式锁的安全问题
下面我们来讨论一下分布式锁的安全问题:
-
长时间的GC pause:熟悉Java的同学肯定对GC不陌生,在GC的时候会发生STW(stop-the-world),例如CMS垃圾回收器,他会有两个阶段进行STW防止引用继续进行变化。那么有可能会出现下面图(引用至Martin反驳Redlock的文章)中这个情况:
client1获取了锁并且设置了锁的超时时间,但是client1之后出现了STW,这个STW时间比较长,导致分布式锁进行了释放,client2获取到了锁,这个时候client1恢复了锁,那么就会出现client1,2同时获取到锁,这个时候分布式锁不安全问题就出现了。这个其实不仅仅局限于RedLock,对于我们的ZK,Mysql一样的有同样的问题。
- 时钟发生跳跃:对于Redis服务器如果其时间发生了向跳跃,那么肯定会影响我们锁的过期时间,那么我们的锁过期时间就不是我们预期的了,也会出现client1和client2获取到同一把锁,那么也会出现不安全,这个对于Mysql也会出现。但是ZK由于没有设置过期时间,那么发生跳跃也不会受影响。
- 长时间的网络I/O:这个问题和我们的GC的STW很像,也就是我们这个获取了锁之后我们进行网络调用,其调用时间由可能比我们锁的过期时间都还长,那么也会出现不安全的问题,这个Mysql也会有,ZK也不会出现这个问题。
对于这三个问题,在网上包括Redis作者在内发起了很多讨论。
2.7.1 GC的STW
对于这个问题可以看见基本所有的都会出现问题,对于ZK这种他会生成一个自增的序列,那么我们真正进行对资源操作的时候,需要判断当前序列是否是最新,有点类似于我们乐观锁。当然这个解法Redis作者进行了反驳,你既然都能生成一个自增的序列了那么你完全不需要加锁了,也就是可以按照类似于Mysql乐观锁的解法去做。
我自己认为这种解法增加了复杂性,当我们对资源操作的时候需要增加判断序列号是否是最新,无论用什么判断方法都会增加复杂度。
2.7.2 时钟发生跳跃
RedLock不安全很大的原因也是因为时钟的跳跃,因为锁过期强依赖于时间,但是ZK不需要依赖时间,依赖每个节点的Session。Redis作者也给出了解答:对于时间跳跃分为人为调整和NTP自动调整。
- 人为调整:人为调整影响的那么完全可以人为不调整,这个是处于可控的。
- NTP自动调整:这个可以通过一定的优化,把跳跃时间控制的可控范围内,虽然会跳跃,但是是完全可以接受的。
2.7.3长时间的网络I/O
对于这个问题的优化可以控制网络调用的超时时间,把所有网络调用的超时时间相加,那么我们锁过期时间其实应该大于这个时间,当然也可以通过优化网络调用比如串行改成并行,异步化等。可以参考文章: 并行化-你的高并发大杀器,异步化-你的高并发大杀器