分布式锁

有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。单机的可以使用ReentrantLock或者synchronized代码块来实现,但是这些API在分布式场景中就无能为力了。
针对分布式锁的实现,目前比较常用的有以下几种方案:基于数据库, 基于缓存(redis,memcached),基于Zookeeper。

应用的场景例子:
管理后台的部署架构(多台tomcat服务器+redis【多台tomcat服务器访问一台redis】+mysql【多台tomcat服务器访问一台服务器上的mysql】)就满足使用分布式锁的条件。多台服务器要访问redis全局缓存的资源,如果不使用分布式锁就会出现问题。 看如下伪代码:

    long N=0L;
    //N从redis获取值
    if(N<5){
    N++;
    //N写回redis
    }

从redis获取值N,对数值N进行边界检查,自加1,然后N写回redis中。 这种应用场景很常见,像秒杀,全局递增ID、IP访问限制等。

基于redis:

使用 SETNX key value 命令。
设置成功,返回1,加锁。该客户端最后可以通过DEL key来释放该锁。
设置失败,返回0,获取锁失败。这时我们可以先返回或进行重试等对方完成或等待锁超时。

存在的问题:

  • 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直存在,其他线程无法再获得到锁。
  • 这把锁只能是非阻塞的,无论成功还是失败都直接返回。
  • 这把锁是非重入的,一个线程获得锁之后,在释放锁之前,无法再次获得该锁,因为使用到的key在tair中已经存在。无法再执行put操作。

解决方式:

  • 用put方法支持传入失效时间,到达时间之后数据会自动删除。
  • while重复执行。
  • 在一个线程获取到锁之后,把当前主机信息和线程信息保存起来,下次再获取之前先检查自己是不是当前锁的拥有者。

仍然存在的问题:

  • 为什么不直接使用expire设置超时时间,而将时间的毫秒数其作为value放在redis中。因为假如在setnx后,redis崩溃了,expire就没有执行(set和expire设置失效时间只能分两步执行),结果就是死锁了。锁永远不会超时。
  • 对于失效时间,如果如何设置的失效时间太短,方法没等执行完,锁就自动释放了,那么就会产生并发问题。如果设置的时间太长,其他获取锁的线程就可能要平白的多等一段时间。这个问题使用数据库实现分布式锁同样存在。

具体实现:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

public class RedisLock {

    private static Logger logger = LoggerFactory.getLogger(RedisLock.class);
    private RedisTemplate redisTemplate;
    private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;

    private String lockKey;

    //锁超时时间,防止线程在入锁以后,无限的执行等待    
    private int expireMsecs = 60 * 1000;
    //获取锁失败后,锁等待时间。防止无限期等待
    private int timeoutMsecs = 10 * 1000;
    //是否加锁标志位
    private volatile boolean locked = false;

    //重载构造方法
    public RedisLock(RedisTemplate redisTemplate, String lockKey) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs) {
        this(redisTemplate, lockKey);
        this.timeoutMsecs = timeoutMsecs;
    }
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs, int expireMsecs) {
        this(redisTemplate, lockKey, timeoutMsecs);
        this.expireMsecs = expireMsecs;
    }


    public String getLockKey() {
        return lockKey;
    }

    //redis在存储数据时,都把数据转化成了byte[]数组的形式,那么在存取数据时,需要用到Serializer将数据格式进行转化。
    //对redis进行get操作
    private String get(final String key) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    byte[] data = connection.get(serializer.serialize(key));
                    connection.close();
                    if (data == null) {
                        return null;
                    }
                    return serializer.deserialize(data);
                }
            });
        } catch (Exception e) {
            logger.error("get redis error, key : {}", key);
        }
        return obj != null ? obj.toString() : null;
    }

    //对redis进行setNX操作
    private boolean setNX(final String key, final String value) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    Boolean success = connection.setNX(serializer.serialize(key), serializer.serialize(value));
                    connection.close();
                    return success;
                }
            });
        } catch (Exception e) {
            logger.error("setNX redis error, key : {}", key);
        }
        return obj != null ? (Boolean) obj : false;
    }

    //对redis进行getset操作
    private String getSet(final String key, final String value) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    byte[] ret = connection.getSet(serializer.serialize(key), serializer.serialize(value));
                    connection.close();
                    return serializer.deserialize(ret);
                }
            });
        } catch (Exception e) {
            logger.error("setNX redis error, key : {}", key);
        }
        return obj != null ? (String) obj : null;
    }

    /**
     * 使用setnx命令,缓存了锁。, value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
     * 执行过程:
     * 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁。
     * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值,成功获得锁。
     */
    public synchronized boolean lock() throws InterruptedException {
        int timeout = timeoutMsecs;
        while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); //锁到期时间
            
            //加锁。如果不存在加锁成功,返回true。
            if (this.setNX(lockKey, expiresStr)) {
                //得到锁
                locked = true;
                return true;
            }

            //lockKey已存在,获取lockKey里的时间。
            String currentValueStr = this.get(lockKey);
            //判断是否为空,是否过期。
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //过期,设置现在的锁到期时间。只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的。
                String oldValueStr = this.getSet(lockKey, expiresStr);

                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受
                    //[分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁

                    //得到锁
                    locked = true;
                    return true;
                }
            }
            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);
        }
        //已存在lockKey,且未过期,则获取锁失败
        return false;
    }


    //解锁。直接删除lockKey即可
    public synchronized void unlock() {
        if (locked) {
            redisTemplate.delete(lockKey);
            locked = false;
        }
    }

    public static void main(String[] args) throws Exception {
        RedisLock lock = new RedisLock(redisTemplate, key, 10000, 20000);
        try{
            if(lock.lock()) {
                //需要加锁的代码
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //为了让分布式锁的算法更稳键些,持有锁的客户端在解锁之前应该再检查一次自己的锁是否已经超时,再去做DEL操作,因为可能客户端因为某个耗时的操作而超时,此时锁已经被别人获得,这时就不必解锁了。
            lock.unlock();
        }
    }

}

基于zookeeper:

  • 加锁:每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的节点下生成一个唯一的临时(挥发性)有序(自增长)节点。
  • 获取锁:调用getChildren(“locker”)来获取locker下面的所有子节点。客户端获取到所有的子节点path之后,如果发现自己在之前创建的子节点序号最小(自增长,先到先得),那么就认为该客户端获取到了锁。
  • 阻塞:如果创建的节点并非locker所有子节点中最小的,则没有获取到锁,此时找到比自己小的那个节点,然后对其调用exist()方法,并注册监听器。如果这个被关注的节点删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是locker子节点中序号最小的,如果是则获取到了锁,否则重复以上步骤。
  • 释放锁:只需将这个临时节点删除即可。

左边的整个区域表示一个Zookeeper集群。locker是Zookeeper的一个持久节点。node_x是locker这个持久节点下面的临时顺序节点。client_x表示多个客户端。Service表示需要互斥访问的共享资源。

Zookeeper如何解决前面提到的问题:

  • 锁无法释放?客户端可以在Zookeeper中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。
  • 非阻塞锁? 客户端可以通过在ZK中创建顺序(自增长)节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁,便可以执行业务逻辑了。
  • 不可重入? 客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中。下次想要获取锁的时候和当前最小的节点中的数据比对,如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。

仍然存在的问题:

  • 性能上没有缓存高。每次都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。
  • 如果由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁。可以设置重试策略。多次重试之后还不行的话才会删除临时节点。

curator实现zookeeper的分布式锁:

    package bjsxt.curator.lock;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.CountDownLatch;
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;


    public class Lock {

        /** zookeeper地址 */
        static final String CONNECT_ADDR = "192.168.0.4:2181,192.168.0.9:2181,192.168.0.6:2181";
        /** session超时时间 */
        static final int SESSION_OUTTIME = 5000;
        
        static int count = 10;
        public static void genarNo(){
            try {
                count--;
                System.out.println(Thread.currentThread().getName()+" : "+count);
            } finally {
            
            }
        }
        
        public static void main(String[] args) throws Exception {
            
            //1 重试策略:初试时间为1s 重试10次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
            //2 通过工厂创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                                                         .connectString(CONNECT_ADDR)
                                                         .sessionTimeoutMs(SESSION_OUTTIME)
                                                         .retryPolicy(retryPolicy)
                                                         //.namespace("super")
                                                         .build();
            //3 开启连接
            cf.start();
            
            //4 分布式锁
            final InterProcessMutex lock = new InterProcessMutex(cf, "/lockpath");
            final CountDownLatch countdown = new CountDownLatch(1);
            
            for(int i = 0; i < 10; i++){

                //循环开始10个线程,模仿多个客户端同时操作
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            countdown.await();
                            //加锁
                            lock.acquire();
                            //-------------业务处理开始
                            genarNo();
                            //-------------业务处理结束
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                //释放
                                lock.release();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }, "t"+i ).start();
            }
            Thread.sleep(100);
            countdown.countDown();
        }
    }

运行结果为依次打印出9876543210。
不加锁的话,一种可能的结果为:7564271703。


参考:
http://www.jianshu.com/p/c77a5257303a
http://blog.csdn.net/pengshuai128/article/details/70593995
http://www.cnblogs.com/520playboy/p/6441651.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容