Redis分布式锁的Python实现[python-redis-lock]

转自https://readthefuckingsource.codes/2018/08/06/%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81python-redis-lock/

关于分布式锁有很多种实现方式,可以用数据库锁或者ZooKeeper这类的专业的分布式开源项目。本文讲的是用Redis实现的一个分布式锁库python-redis-lock. Redis官方有推荐一个分布式锁的算法Redlock(这个库实现的并不是这个算法), 该算法自动释放锁没有考虑到客户端长期持有的情况,因此也有人对这个算法提出了质疑

那回到我们今天要讲的这个库python-redis-lock。作者: Ionel Cristian Mărieș, 这个库整体的思路作者也用很直观的图展现出来了,如下:

大致思路

从图上看出作者和其它大多数用Redis实现分布式锁的思路类似(SET NX),但是他对每个锁多用了一个list类型键来做信号控制,如果客户端第一次尝试获取锁失败,可以选择在signal列表上阻塞一个timeout时间用来接收锁被释放的通知,Redis列表的这个特性保证了每次只有一个客户端接收到了锁释放的通知。而获取到锁的客户端在使用完后会在对应的信号列表上推送一个通知。另外,作者对锁超时还增加了一个刷新的功能来延长(Extend)对锁的占用,可以保证在持有锁的客户端上完成所有操作后才释放锁。个人认为这种设计的优点和需要注意的点如下:

优点

  • 一方面避免客户端反复请求锁,另一方面通过list signal来让客户端决定是否要block自己;
  • 如果有设置超时,则等待超时后客户端仍然会再尝试获取一次锁而不是直接失败;
  • 这个算法不依赖客户端时间戳,也就没有time drift问题;
  • 结合Lua脚本做原子操作,如果再加上细粒度锁,个人认为基本可以满足各种高需求场景的分布式锁要求。

⚠️Warning

  • 自动刷新可能会造成饥饿问题,如果持有锁的客户端因为某种未知原因阻塞,并且开启了自动刷新锁,那其它客户端就跪了,所以需要使用者慎用刷新机制;
  • 如果没有设置超时,且持有锁的客户端无响应的情况下就会造成死锁;

源码分析

了解过大体思路后,我们来一步步分解作者的实现。首先这个库源码只有两个脚本(不含测试和示例代码), 结构很简单。

src/redis_lock
├── init.py
└── django_cache.py

核心代码在__init__.py中,django_cache.py则是结合django-redis做的缓存后端,来避免缓存失效时遇上所谓的“狗屎效应(dogpile effect, Google翻译)”,这里不对它进行解析。

载入Lua脚本

# Check if the id match. If not, return an error code.
UNLOCK_SCRIPT = b"""
    if redis.call("get", KEYS[1]) ~= ARGV[1] then
        return 1
    else
        redis.call("del", KEYS[2])
        redis.call("lpush", KEYS[2], 1)
        redis.call("del", KEYS[1])
        return 0
    end
"""
UNLOCK_SCRIPT_HASH = sha1(UNLOCK_SCRIPT).hexdigest()

作者用上面这种方式定义了UNLOCK, EXTEND, RESET… 等5个原子操作的Lua脚本,每个脚本也定义了对应的哈希值。关于Redis的Lua脚本支持可以看这篇文章。比较有意思的是下面这段代码,可以说是很Pythonic了:

((UNLOCK, _, _,   # noqa
  EXTEND, _, _,
  RESET, _, _,
  RESET_ALL, _, _,
  DELETE_ALL_SIGNAL_KEYS, _, _),
 SCRIPTS) = zip(*enumerate([
    UNLOCK_SCRIPT_HASH, UNLOCK_SCRIPT, 'UNLOCK_SCRIPT',
    EXTEND_SCRIPT_HASH, EXTEND_SCRIPT, 'EXTEND_SCRIPT',
    RESET_SCRIPT_HASH, RESET_SCRIPT, 'RESET_SCRIPT',
    RESET_ALL_SCRIPT_HASH, RESET_ALL_SCRIPT, 'RESET_ALL_SCRIPT',
    DELETE_ALL_SIGNAL_KEYS_SCRIPT_HASH, DELETE_ALL_SIGNAL_KEYS_SCRIPT,
    'DELETE_ALL_SIGNAL_KEYS_SCRIPT'
]))

为了把使用时要指定的脚本ID与其脚本、哈希值关联起来,作者用了enumerate来自动生成索引ID, 然后又用*来拍扁整个列表,最后再用zip把索引提取出来,把哈希值对应的索引ID用变量名存起来,没用的索引用_忽略,其余内容依然在SCRIPTS元组中。这样一来用下面这个函数执行Redis的Lua脚本就很舒服了:

def _eval_script(redis, script_id, *keys, **kwargs):
    """Tries to call ``EVALSHA`` with the `hash` and then, if it fails, calls
    regular ``EVAL`` with the `script`.
    """
    # Lua脚本的 KEYS 参数放在 *keys 中, ARGV 参数则在 kwargs 的 args 命名参数中提取出来, 和 keys 拼接起来传给 evalsha 或者 eval 函数
    args = kwargs.pop('args', ())
    if kwargs:
        raise TypeError("Unexpected keyword arguments %s" % kwargs.keys())
    try:
        # 首先尝试调用evalsha
        return redis.evalsha(SCRIPTS[script_id], len(keys), *keys + args)
    except NoScriptError:
        # 如果脚本不存在则调用eval载入并执行脚本
        logger.warn("%s not cached.", SCRIPTS[script_id + 2])
        return redis.eval(SCRIPTS[script_id + 1], len(keys), *keys + args)

创建锁

主要代码都在Lock类中,创建锁对象时大部分都是常规操作,保存实例的一些设定。

class Lock(object):
    """
    A Lock context manager implemented via redis SETNX/BLPOP.
    """

    def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True):
        """
        :param redis_client:
            An instance of :class:`~StrictRedis`.
        :param name:
            The name (redis key) the lock should have.
        :param expire:
            The lock expiry time in seconds. If left at the default (None)
            the lock will not expire.
        :param id:
            The ID (redis value) the lock should have. A random value is
            generated when left at the default.

            Note that if you specify this then the lock is marked as "held". Acquires
            won't be possible.
        :param auto_renewal:
            If set to ``True``, Lock will automatically renew the lock so that it
            doesn't expire for as long as the lock is held (acquire() called
            or running in a context manager).

            Implementation note: Renewal will happen using a daemon thread with
            an interval of ``expire*2/3``. If wishing to use a different renewal
            time, subclass Lock, call ``super().__init__()`` then set
            ``self._lock_renewal_interval`` to your desired interval.
        :param strict:
            If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
        """
        # ... 此处省略参数校验代码
        self._client = redis_client
        self._expire = expire if expire is None else int(expire)
        if id is None:
            self._id = urandom(16)
        elif isinstance(id, bytes):
            self._id = id
        else:
            raise TypeError("Incorrect type for `id`. Must be bytes not %s." % type(id))
        self._name = 'lock:'+name
        self._signal = 'lock-signal:'+name
        self._lock_renewal_interval = (float(expire)*2/3
                                       if auto_renewal
                                       else None)
        self._lock_renewal_thread = None

需要注意的几个点:

  • 可以用id来申明对锁所有权识别,例如客户端的主机名称或者进程号什么的,默认是16个随机字节。
  • 如果指定了锁自动刷新,那刷新间隔会设定在超时的2/3时间。

获取锁

获取锁的相关代码如下,还是选择在代码注释中解析代码会比较直观点。这里和上一段代码一样,省略了参数校验,这是很重要的一步,并且是一个良好的编程习惯,但是限于篇幅这里不做介绍。

def acquire(self, blocking=True, timeout=None):
    """
    :param blocking:
        Boolean value specifying whether lock should be blocking or not.
    :param timeout:
        An integer value specifying the maximum number of seconds to block.
    """
    logger.debug("Getting %r ...", self._name)

    if self._held:  # 锁不可重入
        raise AlreadyAcquired("Already acquired from this Lock instance.")

    # ... 此处省略参数校验代码,如timeout不能大于锁的_expire等各种条件

    busy = True
    blpop_timeout = timeout or self._expire or 0
    timed_out = False
    while busy:
        # 如果set失败则代表锁被占用,返回False
        busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
        if busy:
            if timed_out:
                return False
            elif blocking:  # 如果阻塞则在signal列表上监听
                # 如果blpop在blpop_timeout时间内获取到信号通知的话,timeout会被设置为False
                timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
            else:
                logger.debug("Failed to get %r.", self._name)
                return False

    logger.debug("Got lock for %r.", self._name)
    if self._lock_renewal_interval is not None: 
        # 如果需要自动刷新锁,则开启刷新线程
        self._start_lock_renewer()
    return True

这里有一个问题,如果在blpop成功获取到信号,并不代表下一次while循环尝试获取就一定成功,如果在此间隙中被其它客户端获得了锁,那该客户端仍然会获取失败,并去阻塞一个timeout时间。也就是说假设这个客户端的网络质量很差,而又恰恰是一个高频请求的锁,那就可能造成它虽然设置了超时,但最终结果可能等待了不止一个timeout时间才拿到结果,而且还可能会一直获取不到锁。

获取锁的开头用一个_held内部属性来判断当前实例是否已经拥有了锁,这里就是上一步中的id属性的用处,来判断锁的拥有者。代码如下:

def get_owner_id(self):
   return self._client.get(self._name)

@property
def _held(self):
   return self.id == self.get_owner_id()

刷新锁

刷新锁比较繁琐,作者用了一个线程在后台定时刷新,不过我们先来看刷新锁的实际操作:extend方法, 这个函数没加下划线前缀也就是允许锁的拥有者自己手动刷新。

def extend(self, expire=None):
    """Extends expiration time of the lock.

    :param expire:
        New expiration time. If ``None`` - `expire` provided during
        lock initialization will be taken.
    """
    # ... 此处省略参数校验代码
    # 这里调用第一步提到的Lua脚本,用索引EXTEND来指定脚本,并将超时时间`expire`和自身识别 id 传入脚本。
    error = _eval_script(self._client, EXTEND, self._name, args=(expire, self._id))
    if error == 1:
        raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
    elif error == 2:
        raise NotExpirable("Lock %s has no assigned expiration time" %
                           self._name)
    elif error:
        raise RuntimeError("Unsupported error code %s from EXTEND script" % error)

这个EXTEND操作的 Lua 脚本如下:

# Covers both cases when key doesn't exist and doesn't equal to lock's id
# 刷新前判断锁是否属于该拥有者,只允许拥有者延长锁的TTL
EXTEND_SCRIPT = b"""
    if redis.call("get", KEYS[1]) ~= ARGV[2] then
        return 1
    elseif redis.call("ttl", KEYS[1]) < 0 then
        return 2
    else
        redis.call("expire", KEYS[1], ARGV[1])
        return 0
    end
"""
EXTEND_SCRIPT_HASH = sha1(EXTEND_SCRIPT).hexdigest()

刷新锁的线程相关代码如下:

def _start_lock_renewer(self):
    """
    Starts the lock refresher thread.
    """
    if self._lock_renewal_thread is not None:
        raise AlreadyStarted("Lock refresh thread already started")

    # 线程事件用来监听刷新是否结束
    self._lock_renewal_stop = threading.Event()
    self._lock_renewal_thread = threading.Thread(
        group=None,
        target=self._lock_renewer,
        kwargs={'lockref': weakref.ref(self),   # 对锁实例做了一个弱引用
                'interval': self._lock_renewal_interval,
                'stop': self._lock_renewal_stop}
    )
    self._lock_renewal_thread.setDaemon(True)
    self._lock_renewal_thread.start()

@staticmethod
def _lock_renewer(lockref, interval, stop):
    """
    Renew the lock key in redis every `interval` seconds for as long
    as `self._lock_renewal_thread.should_exit` is False.
    """
    log = getLogger("%s.lock_refresher" % __name__)
    # 等待终止事件到来,否则在指定超时后返回False
    while not stop.wait(timeout=interval):
        log.debug("Refreshing lock")
        lock = lockref()    # 调用这个弱引用来获取当前锁实例
        if lock is None:    # 如果这个锁已经在其它线程被销毁则对应刷新线程也应该关闭
            log.debug("The lock no longer exists, "
                      "stopping lock refreshing")
            break
        lock.extend(expire=lock._expire)
        del lock    # 删除弱引用
    log.debug("Exit requested, stopping lock refreshing")

def _stop_lock_renewer(self):
    """
    Stop the lock renewer.

    This signals the renewal thread and waits for its exit.
    """
    if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
        return
    logger.debug("Signalling the lock refresher to stop")
    self._lock_renewal_stop.set()   # 事件通知子线程退出
    self._lock_renewal_thread.join()
    self._lock_renewal_thread = None
    logger.debug("Lock refresher has stopped")

这里主要是一个弱引用问题。传入了Daemon守护进程的变量要格外小心,很容易造成即使主线程已经不再引用这个变量,而守护进程不依赖该变量,却一直引用着,就导致内存无法释放。

释放锁

释放锁的实际操作和刷新锁一样,因为都涉及多个 Redis 命令,所以他们都放在了 Lua 脚本中。

def release(self):
    """Releases the lock, that was acquired with the same object.

    .. note::

        If you want to release a lock that you acquired in a different place you have two choices:

        * Use ``Lock("name", id=id_from_other_place).release()``
        * Use ``Lock("name").reset()``
    """
    if self._lock_renewal_thread is not None:
        # 如果有刷新线程则停止它
        self._stop_lock_renewer()
    logger.debug("Releasing %r.", self._name)
    error = _eval_script(self._client, UNLOCK, self._name, self._signal, args=(self._id,))
    if error == 1:
        raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
    elif error:
        raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
    else:
        self._delete_signal()

def _delete_signal(self):
    self._client.delete(self._signal)

至于UNLOCK脚本操作在第一步里已经展示出来,如果持有该锁则先删除signal列表,再push一个通知到signal列表,最后删除锁。这些步骤都会在一个Lua函数中执行保证原子性。

上下文支持

作者也重载了__enter____exit__两个函数来支持with的上下文调用,也很简单:

def __enter__(self):
    acquired = self.acquire(blocking=True)
    assert acquired, "Lock wasn't acquired, but blocking=True"
    return self

def __exit__(self, exc_type=None, exc_value=None, traceback=None):
    self.release()

示例代码

这里我就直接贴上作者的示例代码:

conn = StrictRedis()
with redis_lock.Lock(conn, "name-of-the-lock"):
    print("Got the lock. Doing some work ...")
    time.sleep(5)
# Eg:
lock = redis_lock.Lock(conn, "name-of-the-lock")
if lock.acquire(blocking=False):
    print("Got the lock.")
else:
    print("Someone else has the lock.")

总结

这个库提供的分布式锁很灵活,是否需要超时?是否需要自动刷新?是否要阻塞?都是可选的。没有最好的算法,只有最合适的算法,用户应该根据自己是场景谨慎选择。喜欢的朋友可以去这个项目的GitHub页面 点个🌟。

另外,Redis这个东西感觉可以做很多事,而且可以做很多高性能的事。尤其在分布式环境下,重点是还支持各种有意思的特性。用它来实现分布式锁就显得再合适不过了。

分享就到这里,谢谢大家!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容