Spring-data-redis + redis 分布式锁(二)

分布式锁的解决方式

  1. 基于数据库表做乐观锁,用于分布式锁。(适用于小并发)
  2. 使用memcached的add()方法,用于分布式锁。
  3. 使用memcached的cas()方法,用于分布式锁。(不常用)
  4. 使用redis的setnx()、expire()方法,用于分布式锁。
  5. 使用redis的setnx()、get()、getset()方法,用于分布式锁。
  6. 使用redis的watch、multi、exec命令,用于分布式锁。(不常用)
  7. 使用zookeeper,用于分布式锁。(不常用)

这里主要介绍第四种和第五种:

前文提供的两种方式其实都有些问题,要么是死锁,要么是依赖服务器时间同步。从Redis 2.6.12 版本开始, SET 命令可以通过参数来实现和 SETNX 、 SETEX 和 PSETEX 三个命令的效果。这样我们的可以将加锁操作用一个set命令来实现,直接是原子性操作,既没有死锁的风险,也不依赖服务器时间同步,可以完美解决这两个问题。
在redis文档上有详细说明:
http://doc.redisfans.com/string/set.html

使用redis的SET resource-name anystring NX EX max-lock-time 方式,用于分布式锁

原理

命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。

客户端执行以上的命令:

  • 如果服务器返回 OK ,那么这个客户端获得锁。
  • 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。
  • 设置的过期时间到达之后,锁将自动释放。

可以通过以下修改,让这个锁实现更健壮:

  • 不使用固定的字符串作为键的值,而是设置一个不可猜测(non-guessable)的长随机字符串,作为口令串(token)。
  • 不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。
    这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。

以下是一个简单的解锁脚本示例:

if redis.call("get",KEYS[1]) == ARGV[1]
then
    return redis.call("del",KEYS[1])
else
    return 0
end

可能存在的问题

要保证redis支持eval命令

具体实现

锁具体实现RedisLock:


import io.lettuce.core.RedisFuture;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs;
import io.lettuce.core.api.async.RedisScriptingAsyncCommands;
import io.lettuce.core.api.async.RedisStringAsyncCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisCommands;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Redis分布式锁
 * 使用 SET resource-name anystring NX EX max-lock-time 实现
 * <p>
 * 该方案在 Redis 官方 SET 命令页有详细介绍。
 * http://doc.redisfans.com/string/set.html
 * <p>
 * 在介绍该分布式锁设计之前,我们先来看一下在从 Redis 2.6.12 开始 SET 提供的新特性,
 * 命令 SET key value [EX seconds] [PX milliseconds] [NX|XX],其中:
 * <p>
 * EX seconds — 以秒为单位设置 key 的过期时间;
 * PX milliseconds — 以毫秒为单位设置 key 的过期时间;
 * NX — 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。
 * XX — 将key 的值设为value ,当且仅当key 存在,等效于 SETEX。
 * <p>
 * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。
 * <p>
 * 客户端执行以上的命令:
 * <p>
 * 如果服务器返回 OK ,那么这个客户端获得锁。
 * 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。
 *
 * @author yuhao.wangwang
 */
public class RedisLock {

    private static Logger logger = LoggerFactory.getLogger(RedisLock.class);

    private RedisTemplate redisTemplate;

    /**
     * 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。
     */
    public static final String NX = "NX";

    /**
     * seconds — 以秒为单位设置 key 的过期时间,等效于EXPIRE key seconds
     */
    public static final String EX = "EX";

    /**
     * 调用set后的返回值
     */
    public static final String OK = "OK";

    /**
     * 默认请求锁的超时时间(ms 毫秒)
     */
    private static final long TIME_OUT = 100;

    /**
     * 默认锁的有效时间(s)
     */
    public static final int EXPIRE = 60;

    /**
     * 解锁的lua脚本
     */
    public static final String UNLOCK_LUA;

    static {
        StringBuilder sb = new StringBuilder();
        sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
        sb.append("then ");
        sb.append("    return redis.call(\"del\",KEYS[1]) ");
        sb.append("else ");
        sb.append("    return 0 ");
        sb.append("end ");
        UNLOCK_LUA = sb.toString();
    }

    /**
     * 锁标志对应的key
     */
    private String lockKey;

    /**
     * 记录到日志的锁标志对应的key
     */
    private String lockKeyLog = "";

    /**
     * 锁对应的值
     */
    private String lockValue;

    /**
     * 锁的有效时间(s)
     */
    private int expireTime = EXPIRE;

    /**
     * 请求锁的超时时间(ms)
     */
    private long timeOut = TIME_OUT;

    /**
     * 锁标记
     */
    private volatile boolean locked = false;

    final Random random = new Random();

    /**
     * 使用默认的锁过期时间和请求锁的超时时间
     *
     * @param redisTemplate
     * @param lockKey       锁的key(Redis的Key)
     */
    public RedisLock(RedisTemplate redisTemplate, String lockKey) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }

    /**
     * 使用默认的请求锁的超时时间,指定锁的过期时间
     *
     * @param redisTemplate
     * @param lockKey       锁的key(Redis的Key)
     * @param expireTime    锁的过期时间(单位:秒)
     */
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int expireTime) {
        this(redisTemplate, lockKey);
        this.expireTime = expireTime;
    }

    /**
     * 使用默认的锁的过期时间,指定请求锁的超时时间
     *
     * @param redisTemplate
     * @param lockKey       锁的key(Redis的Key)
     * @param timeOut       请求锁的超时时间(单位:毫秒)
     */
    public RedisLock(StringRedisTemplate redisTemplate, String lockKey, long timeOut) {
        this(redisTemplate, lockKey);
        this.timeOut = timeOut;
    }

    /**
     * 锁的过期时间和请求锁的超时时间都是用指定的值
     *
     * @param redisTemplate
     * @param lockKey       锁的key(Redis的Key)
     * @param expireTime    锁的过期时间(单位:秒)
     * @param timeOut       请求锁的超时时间(单位:毫秒)
     */
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int expireTime, long timeOut) {
        this(redisTemplate, lockKey, expireTime);
        this.timeOut = timeOut;
    }

    /**
     * 尝试获取锁 超时返回
     *
     * @return
     */
    public boolean tryLock() {
        // 生成随机key
        lockValue = UUID.randomUUID().toString();
        // 请求锁超时时间,纳秒
        long timeout = timeOut * 1000000;
        // 系统当前时间,纳秒
        long nowTime = System.nanoTime();
        while ((System.nanoTime() - nowTime) < timeout) {
            if (OK.equalsIgnoreCase(this.set(lockKey, lockValue, expireTime))) {
                locked = true;
                // 上锁成功结束请求
                return true;
            }

            // 每次请求等待一段时间
            seleep(10, 50000);
        }
        return locked;
    }

    /**
     * 尝试获取锁 立即返回
     *
     * @return 是否成功获得锁
     */
    public boolean lock() {
        lockValue = UUID.randomUUID().toString();
        //不存在则添加 且设置过期时间(单位ms)
        String result = set(lockKey, lockValue, expireTime);
        locked = OK.equalsIgnoreCase(result);
        return locked;
    }

    /**
     * 以阻塞方式的获取锁
     *
     * @return 是否成功获得锁
     */
    public boolean lockBlock() {
        lockValue = UUID.randomUUID().toString();
        while (true) {
            //不存在则添加 且设置过期时间(单位ms)
            String result = set(lockKey, lockValue, expireTime);
            if (OK.equalsIgnoreCase(result)) {
                locked = true;
                return locked;
            }

            // 每次请求等待一段时间
            seleep(10, 50000);
        }
    }

    /**
     * 解锁
     * <p>
     * 可以通过以下修改,让这个锁实现更健壮:
     * <p>
     * 不使用固定的字符串作为键的值,而是设置一个不可猜测(non-guessable)的长随机字符串,作为口令串(token)。
     * 不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。
     * 这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。
     */
    public Object unlock() {
        // 只有加锁成功并且锁还有效才去释放锁
        // 只有加锁成功并且锁还有效才去释放锁
        if (locked) {
            try {
                return redisTemplate.execute((RedisConnection connection) -> {
                    Object nativeConnection = connection.getNativeConnection();
                    Long result = 0L;

                    List<String> keys = new ArrayList<>();
                    keys.add(lockKey);
                    List<String> values = new ArrayList<>();
                    values.add(lockValue);

                    // jedis集群模式
                    if (nativeConnection instanceof JedisCluster) {
                        result = (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values);
                    } else if (nativeConnection instanceof Jedis) {
                        // jedis单机模式
                        result = (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values);
                    } else if (nativeConnection instanceof RedisScriptingAsyncCommands) {
                        // lettuce
                        try {
                            List<byte[]> bkeys = keys.stream().map(key -> redisTemplate.getKeySerializer().serialize(lockKey)).collect(Collectors.toList());
                            List<byte[]> bargs = values.stream().map(arg -> redisTemplate.getValueSerializer().serialize(lockValue)).collect(Collectors.toList());
                            RedisFuture<Long> future = ((RedisScriptingAsyncCommands) nativeConnection).eval(UNLOCK_LUA, ScriptOutputType.INTEGER, bkeys.toArray(new byte[0][0]), bargs.toArray(new byte[0][0]));
                            result = future.get(1, TimeUnit.SECONDS);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    } else {
                        return redisTemplate.delete(lockKey);
                    }

                    if (result == 0 && !StringUtils.isEmpty(lockKeyLog)) {
                        logger.debug("Redis分布式锁,解锁{}失败!解锁时间:{}", lockKeyLog, System.currentTimeMillis());
                    }

                    locked = result == 0;
                    return result == 1;
                });
            } catch (Throwable e) {
                logger.warn("Redis不支持EVAL命令,使用降级方式解锁:{}", e.getMessage());
                String value = this.get(lockKey, String.class);
                if (lockValue.equals(value)) {
                    redisTemplate.delete(lockKey);
                    return true;
                }
                return false;
            }
        }

        return true;
    }

    /**
     * 获取锁状态
     *
     * @return
     * @Title: isLock
     * @author yuhao.wang
     */
    public boolean isLock() {

        return locked;
    }

    /**
     * 重写redisTemplate的set方法
     * <p>
     * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。
     * <p>
     * 客户端执行以上的命令:
     * <p>
     * 如果服务器返回 OK ,那么这个客户端获得锁。
     * 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。
     *
     * @param key     锁的Key
     * @param value   锁里面的值
     * @param seconds 过去时间(秒)
     * @return
     */
    private String set(final String key, final String value, final long seconds) {
        Assert.isTrue(!StringUtils.isEmpty(key), "key不能为空");

        return (String) redisTemplate.execute(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                String result = null;
                // jedis
                if (nativeConnection instanceof JedisCommands) {
                    result = ((JedisCommands) nativeConnection).set(key, value, NX, EX, seconds);
                } else if (nativeConnection instanceof RedisStringAsyncCommands) {
                    // lettuce
                    try {
                        byte[] serializeKey = redisTemplate.getKeySerializer().serialize(key);
                        byte[] serializeValue = redisTemplate.getValueSerializer().serialize(value);
                        RedisFuture<String> future = ((RedisStringAsyncCommands) nativeConnection).set(serializeKey, serializeValue, SetArgs.Builder.nx().ex(seconds));
                        result = future.get(1, TimeUnit.SECONDS);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                } else {
                    //  默认方式
                    boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, seconds, TimeUnit.SECONDS);
                    if (flag) {
                        redisTemplate.expire(key, seconds, TimeUnit.SECONDS);
                        result = OK;
                    }
                }

                if (!StringUtils.isEmpty(lockKeyLog) && !StringUtils.isEmpty(result)) {
                    logger.info("获取锁{}的时间:{}", lockKeyLog, System.currentTimeMillis());
                }

                return result;
            }
        });
    }

    /**
     * 获取redis里面的值
     *
     * @param key    key
     * @param aClass class
     * @return T
     */
    private <T> T get(final String key, Class<T> aClass) {
        Assert.isTrue(!StringUtils.isEmpty(key), "key不能为空");
        return (T) redisTemplate.execute((RedisConnection connection) -> {
            Object nativeConnection = connection.getNativeConnection();
            Object result = null;
            if (nativeConnection instanceof JedisCommands) {
                result = ((JedisCommands) nativeConnection).get(key);
            }
            return (T) result;
        });
    }


    /**
     * @param millis 毫秒
     * @param nanos  纳秒
     * @Title: seleep
     * @Description: 线程等待时间
     * @author yuhao.wang
     */
    private void seleep(long millis, int nanos) {
        try {
            Thread.sleep(millis, random.nextInt(nanos));
        } catch (InterruptedException e) {
            logger.info("获取分布式锁休眠被中断:", e);
        }
    }

    public String getLockKeyLog() {
        return lockKeyLog;
    }

    public void setLockKeyLog(String lockKeyLog) {
        this.lockKeyLog = lockKeyLog;
    }

    public int getExpireTime() {
        return expireTime;
    }

    public void setExpireTime(int expireTime) {
        this.expireTime = expireTime;
    }

    public long getTimeOut() {
        return timeOut;
    }

    public void setTimeOut(long timeOut) {
        this.timeOut = timeOut;
    }
}

调用方式:

public void redisLock3(int i) {
    RedisLock redisLock3 = new RedisLock(redisTemplate, "redisLock:" + i % 10, 5 * 60, 500);
    try {
        long now = System.currentTimeMillis();
        if (redisLock3.tryLock()) {
            logger.info("=" + (System.currentTimeMillis() - now));
            // TODO 获取到锁要执行的代码块
            logger.info("j:" + j++);
        } else {
            logger.info("k:" + k++);
        }
    } catch (Exception e) {
        logger.info(e.getMessage(), e);
    } finally {
        redisLock2.unlock();
    }
}

对于这种种redis实现分布式锁的方案还是有一个问题:就是你获取锁后执行业务逻辑的代码只能在redis锁的有效时间之内,因为,redis的key到期后会自动清除,这个锁就算释放了。所以这个锁的有效时间一定要结合业务做好评估。

源码: https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-data-redis-distributed-lock 工程

参考:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容