[转]分布式锁-RedisLockRegistry源码分析

前言

官网的英文介绍大概如下:

Starting with version 4.0, the RedisLockRegistry is available. Certain components (for example aggregator and resequencer) use a lock obtained from a LockRegistry instance to ensure that only one thread is manipulating a group at a time. The DefaultLockRegistry performs this function within a single component; you can now configure an external lock registry on these components. When used with a shared MessageGroupStore, the RedisLockRegistry can be use to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
When a lock is released by a local thread, another local thread will generally be able to acquire the lock immediately. If a lock is released by a thread using a different registry instance, it can take up to 100ms to acquire the lock.
To avoid "hung" locks (when a server fails), the locks in this registry are expired after a default 60 seconds, but this can be configured on the registry. Locks are normally held for a much smaller time.

上述大概意思是RedisLockRegistry可以确保在分布式环境中,只有一个thread在执行,也就是实现了分布式锁,当一个本地线程释放了锁,其他本地现场会立即去抢占锁,如果锁被占用了,那么会进行重试机制,100毫秒进行重试一次。同时也避免了"hung" locks 当服务器fails的时候。同时也给锁设置了默认60秒的过期时间

如何获取锁

锁的获取过程

详细流程如上图所示,这里主要核心业务是这样,首先Lock是java.util.concurrent.locks中的锁,也就是本地锁。然后自己用RedisLock实现了Lock接口而已,但是实际上RedisLock也使用了本地锁。主要是通过redis锁+本地锁双重锁的方式实现的一个比较好的锁。针对redis锁来说只要能获取到锁,那么就算是成功的。如果获取不到锁就等待100毫秒继续重试,如果获取到锁那么就采用本地锁锁住本地的线程。通过两种方式很好的去实现了一个完善的分布式锁机制。
下面代码主要是获取锁的一个流程,先从本地锁里面获取,如果获取到了那么和redis里面存放的RedisLock锁做对比,判断是否是同一个对象,如果不是那么就删除本地锁然后重新创建一个锁返回

@Override
public Lock obtain(Object lockKey) {
    Assert.isInstanceOf(String.class, lockKey);

    //try to find the lock within hard references
    //从本地强引用里面获取锁,
    RedisLock lock = findLock(this.hardThreadLocks.get(), lockKey);

    /*
     * If the lock is locked, check that it matches what's in the store.
     * If it doesn't, the lock must have expired.
     */
    //这里主要判断了这个锁是否是锁住的,如果不是的那么该锁已经过期了
    //如果强引用里面有这个锁,并且lock.thread!=null,说明这个锁没有被占用
    if (lock != null && lock.thread != null) {
        //从redis获取锁,若如果redis锁为空或者跟当前强引用的锁不一致,可以确定两个问题
        //1.redis里面的锁和本地的锁不是一个了
        //2.redis里面没有锁
        RedisLock lockInStore = this.redisTemplate.boundValueOps(this.registryKey + ":" + lockKey).get();
        if (lockInStore == null || !lock.equals(lockInStore)) {
            //删除强引用里面锁
            getHardThreadLocks().remove(lock);
            lock = null;
        }
    }
    //如果锁==null
    if (lock == null) {
        //try to find the lock within weak references
        //尝试线从弱引用里面去找锁
        lock = findLock(this.weakThreadLocks.get(), lockKey);
        //如果弱引用锁==null 那么新建一个锁
        if (lock == null) {
            lock = new RedisLock((String) lockKey);
            //判断是否用弱引用,如果用那么就加入到弱引用里面
            if (this.useWeakReferences) {
                getWeakThreadLocks().add(lock);
            }
        }
    }

    return lock;
}

上面获取到的是RedisLock,RedisLock是实现java原生Lock接口,并重写了lock()方法。首先从localRegistry中获取到锁,这里的锁是java开发包里面的ReentrantLock。首先把本地先锁住,然后再去远程obtainLock。每次sleep() 100毫秒直到获取到远程锁为止,代码如下所示:

@Override
public void lock() {
    //这里采用java开发包里面的ReentrantLock 进行多线程的加锁,单机多线程的情况下解决并发的问题
    Lock localLock = RedisLockRegistry.this.localRegistry.obtain(this.lockKey);
    localLock.lock();
    while (true) {
        try {
            while (!this.obtainLock()) {
                Thread.sleep(100); //NOSONAR
            }
            break;
        }
        catch (InterruptedException e) {
                /*
                 * This method must be uninterruptible so catch and ignore
                 * interrupts and only break out of the while loop when
                 * we get the lock.
                 */
        }
        catch (Exception e) {
            localLock.unlock();
            rethrowAsLockException(e);
        }
    }
}

核心远程锁还是在RedisLock中,这里采用了redis事务+watch的方式,watch和事务都是redis里面自带的。使用watch时候如果key的值发生了任何变化。那么exec()将不会执行,那么如下代码返回的success就是false。从而来实现redis锁的功能

private boolean obtainLock() {
    //判断创建这个类的线程和当前是否是一个,如果是就直接获取锁
    Thread currentThread = Thread.currentThread();
    if (currentThread.equals(this.thread)) {
        this.reLock++;
        return true;
    }
    //把当前锁存到集合种
    toHardThreadStorage(this);

    /*
     * Set these now so they will be persisted if successful.
     */
    this.lockedAt = System.currentTimeMillis();
    this.threadName = currentThread.getName();

    Boolean success = false;
    try {
        success = RedisLockRegistry.this.redisTemplate.execute(new SessionCallback<Boolean>() {

            @SuppressWarnings({"unchecked", "rawtypes"})
            @Override
            public Boolean execute(RedisOperations ops) throws DataAccessException {
                String key = constructLockKey();
                //监控key如果该key被改变了 那么该事务是不能被实现的会进行回滚
                ops.watch(key); //monitor key
                //如果key存在了就停止监控,如果key已经存在了 那么肯定是被别人占用了
                if (ops.opsForValue().get(key) != null) {
                    ops.unwatch(); //key already exists, stop monitoring
                    return false;
                }

                ops.multi(); //transaction start
                //设置一个值并加上过期时间 m默认是一分钟左右的时间
                //set the value and expire
                //把锁放入到redis中
                ops.opsForValue()
                        .set(key, RedisLock.this, RedisLockRegistry.this.expireAfter, TimeUnit.MILLISECONDS);

                //exec will contain all operations result or null - if execution has been aborted due to 'watch'
                return ops.exec() != null;
            }

        });

    }
    finally {
      //如果不成功那么把当前过期时间和锁的名字设置成null
        if (!success) {
            this.lockedAt = 0;
            this.threadName = null;
            toWeakThreadStorage(this);
        }
        else {
        //如果成功把当前锁的thread名称设置成currentThread
            this.thread = currentThread;
            if (logger.isDebugEnabled()) {
                logger.debug("New lock; " + this.toString());
            }
        }

    }

    return success;
}

上面是整个加锁的流程,基本流程比较简单,看完加锁应该自己都能解锁,无非就是去除redis锁和本地的锁而已。

@Override
public void unlock() {
    //判断当前运行的线程和锁的线程做对比,如果两个线程不一样那么抛出异常
    if (!Thread.currentThread().equals(this.thread)) {
        if (this.thread == null) {
            throw new IllegalStateException("Lock is not locked; " + this.toString());
        }
        throw new IllegalStateException("Lock is owned by " + this.thread.getName() + "; " + this.toString());
    }

    try {
       //如果reLock--小于=0的话就删除redis里面的锁
        if (this.reLock-- <= 0) {
            try {
                this.assertLockInRedisIsUnchanged();
                RedisLockRegistry.this.redisTemplate.delete(constructLockKey());
                if (logger.isDebugEnabled()) {
                    logger.debug("Released lock; " + this.toString());
                }
            }
            finally {
                this.thread = null;
                this.reLock = 0;
                toWeakThreadStorage(this);
            }
        }
    }
    finally {
    //拿到本地锁,进行解锁
        Lock localLock = RedisLockRegistry.this.localRegistry.obtain(this.lockKey);
        localLock.unlock();
    }
}

tryLock在原有的加锁上面增加了一个超时机制,主要是先通过本地的超时机制

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    //拿到本地锁
    Lock localLock = RedisLockRegistry.this.localRegistry.obtain(this.lockKey);
    //先本地锁进行tryLock
    if (!localLock.tryLock(time, unit)) {
        return false;
    }
    try {
        long expire = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(time, unit);
        boolean acquired;
        //这里添加了超时机制,跟之前的无限等待做了一个区分
        while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { //NOSONAR
            Thread.sleep(100); //NOSONAR
        }
         //超时后没有获取到锁,那么就把本地锁进行解锁
        if (!acquired) {
            localLock.unlock();
        }
        return acquired;
    }
    catch (Exception e) {
        localLock.unlock();
        rethrowAsLockException(e);
    }
    return false;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容