简述
利用Redis的Setnx命令,来实现一个分布式的加锁方案。利用注解,在拥有该注解的方法上,进行切面处理,在方法执行前,进行加锁,执行结束后,根据是否自动释放锁,进行解锁。
将该注解用在定时任务的方法上,即可实现分布式定时任务,即获取到锁的方法,才会执行。
1 redis命令
1.1 setnx命令
- Redis setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。(该命令无法设置过期时间)
Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。redis的SETNX命令可以方便的实现分布式锁。
当某一个客户端将key的值设置成功后,其他的客户端再进行设置,将返回失败,保证同一时间,只有一个客户端能够设置成功。 - Redis事务
watch key1 key2 ... : 监视一或多个key,如果在事务执行之前,被监视的key被其他命令改动,则事务被打断 ( 类似乐观锁 )
multi : 标记一个事务块的开始( queued )
exec : 执行所有事务块的命令 ( 一旦执行exec后,之前加的监控锁都会被取消掉 )
discard : 取消事务,放弃事务块中的所有命令
unwatch : 取消watch对所有key的监控
# 事务正常使用
127.0.0.1:6379> multi
127.0.0.1:6379> set name jack
127.0.0.1:6379> exec
# 取消事务
127.0.0.1:6379> multi
127.0.0.1:6379> set name jack
127.0.0.1:6379> discard
# watch使用
# number初始为10
127.0.0.1:6379> watch number
127.0.0.1:6379> multi
127.0.0.1:6379> set number 11
127.0.0.1:6379> exec
# 如果在执行exec时,number没有被其他客户端修改,还是10,则事务执行成功;
# 如果被其他客户端修改了,number不是10了,则事务执行失败,这时候就需求程序自行处理,进行再次提交或者其他操作
- 在spring boot 中,我们用StringRedisTemplate来操作Redis,它的方法:stringRedisTemplate.opsForValue().setIfAbsent()方法即对应setnx命令,这个方法有两个重载的方法:
1、Boolean setIfAbsent(K key, V value); 设置key value,返回成功/失败
2、Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit); 设置key value,返回成功/失败,同时设置过期时间,redisTemplate 会调用 EXPIRE进行过期时间的设定,同时在设置值和过期时间时,会开启事务,保存全部成功。
// org.springframework.data.redis.core 中实现的方法
@Override
public Boolean setIfAbsent(K key, V value) {
byte[] rawKey = rawKey(key);
byte[] rawValue = rawValue(value);
return execute(connection -> connection.setNX(rawKey, rawValue), true);
}
@Override
public Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit) {
byte[] rawKey = rawKey(key);
byte[] rawValue = rawValue(value);
Expiration expiration = Expiration.from(timeout, unit);
return execute(connection -> connection.set(rawKey, rawValue, expiration, SetOption.ifAbsent()), true);
}
1.2 DEL命令、lua脚本
在加锁之后,解锁时,需要判断锁,是否是当前线程所拥有的,如果是当前线程拥有的,则删除该key,删除key,用del命令。
- del key_name
我们会先取出key对应的值,然后判断是否和当前线程的定义的值一致。如果一致,则说明是该线程拥有的key。如果我们在代码中取出key的值,然后判断通过后,调用redis del 删除key,这就不是一个原子操作了。如果在我们取出key的值后,然后在删除前,其他线程获取了锁,当前线程删除的动作,就会导致删除其他线程拥有的锁。所以释放锁,需要利用lua脚本进行,将判断和删除,这两个动作,合为一个原子性的操作。
所以我们会利用代码去执行下面的lua脚本,保证判断和删除的原子性。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
一般教程中,利用RedisTemplate来执行lua脚本时,会将lua脚本放到静态资源目录中。而在下面的代码中,利用ByteArrayResource直接从String字符串中读取了lua脚本内容:
/*
* 保存lua脚本
*/
private DefaultRedisScript<List> getRedisScript;
@PostConstruct
public void init(){
// 定义lua脚本资源
// 也可以放到文件中,加载进来: new ResourceScriptSource(new ClassPathResource("redis/demo.lua"))
String luaStr = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
ByteArrayResource resource = new ByteArrayResource(luaStr.getBytes());
getRedisScript = new DefaultRedisScript<>();
getRedisScript.setResultType(List.class);
getRedisScript.setScriptSource(new ResourceScriptSource(resource));
}
2 分布式锁实现
下面是实现的核心类:
- RedisLock: reids分布式锁工具类
- EmLock: 分布式锁注解
- LockRangeEnum: 分布式锁的范围枚举
- EmLockAspect: 分布式锁切面
2.1 RedisLock,reids分布式锁工具类
代码如下:
package com.emdata.lowvis.common.redislock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.concurrent.TimeUnit;
/**
* reids分布式锁工具类
*
* @version 1.0
* @date 2020/12/8 14:37
*/
@Slf4j
@Component
public class RedisLock {
private static final String SPLIT = "_";
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 加锁解锁工具类
* @param lockKey 加锁的key
* @param uuid 线程的标志
* @param timeout 超时时间
* @param timeUnit 超时时间粒度
* @return true:获取成功
*/
public boolean lock(String lockKey, String uuid, long timeout, TimeUnit timeUnit) {
// 根据key获取值
String currentLock = stringRedisTemplate.opsForValue().get(lockKey);
// 值为:uuid_时间
String value = uuid + SPLIT + (timeUnit.toMillis(timeout) + System.currentTimeMillis());
// 如果为空,则设置值
if (StringUtils.isEmpty(currentLock)) {
if (stringRedisTemplate.opsForValue().setIfAbsent(lockKey, value, timeout, timeUnit)) {
// 对应setnx命令,可以成功设置,也就是key不存在,获得锁成功
return true;
} else {
return false;
}
} else {
// 可重入锁,如果是这个uuid持有的锁,则更新时间
if (currentLock.startsWith(uuid)) {
stringRedisTemplate.opsForValue().set(lockKey, value, timeout, timeUnit);
return true;
} else {
return false;
}
}
}
/*
* 保存lua脚本
*/
private DefaultRedisScript<List> getRedisScript;
@PostConstruct
public void init(){
// 定义lua脚本资源
// 也可以放到文件中,加载进来: new ResourceScriptSource(new ClassPathResource("redis/demo.lua"))
String luaStr = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
ByteArrayResource resource = new ByteArrayResource(luaStr.getBytes());
getRedisScript = new DefaultRedisScript<>();
getRedisScript.setResultType(List.class);
getRedisScript.setScriptSource(new ResourceScriptSource(resource));
}
/**
* 释放锁
*
* @param lockKey 加锁的key
* @param uuid 线程的标志
*/
public void release(String lockKey, String uuid) {
try {
List<Integer> execute = stringRedisTemplate.execute(getRedisScript, Collections.singletonList(lockKey), uuid);
log.debug("解锁结果: {}", execute.get(0) == 0);
} catch (Exception e) {
log.error("解锁异常, key: {}, uuid: {}", lockKey, uuid);
log.error("", e);
}
}
}
2.2 EmLock,分布式锁注解
package com.emdata.lowvis.common.redislock;
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
/**
* 分布式锁注解
*
* @version 1.0
* @date 2020/12/8 17:59
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface EmLock {
/**
* 锁的范围,默认应用级别
* @return 锁的范围
*/
LockRangeEnum lockRange() default LockRangeEnum.APPLICATION;
/**
* 锁对应的key
* @return key
*/
String key();
/**
* 锁超时时间
* @return 时间
*/
int timeout() default 5;
/**
* 锁超时时间粒度
* @return 粒度
*/
TimeUnit timeUnit() default TimeUnit.SECONDS;
/**
* 是否自动释放锁
* @return true: 方法完成后,自动释放
*/
boolean autoRelease() default true;
}
2.3 LockRangeEnum, 分布式锁的范围枚举
package com.emdata.lowvis.common.redislock;
/**
* 分布式锁的范围枚举
*
* @author pupengfei
* @version 1.0
* @date 2020/12/10 13:46
*/
public enum LockRangeEnum {
/**
* 应用级别,锁的级别在整个应用容器内
*/
APPLICATION,
/**
* 线程级别,锁的级别在每个线程
*/
THREAD
}
2.4 EmLockAspect,分布式锁切面
package com.emdata.lowvis.common.redislock;
import com.emdata.lowvis.common.utils.UUIDUtils;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
/**
* 分布式锁切面
*
* @version 1.0
* @date 2020/12/8 17:59
*/
@Slf4j
@Component
@Aspect
@Configuration
public class EmLockAspect {
@Autowired
private RedisLock redisLock;
/**
* 应用级别的容器的id
*/
private final String appUUID = UUIDUtils.get();
/**
* 线程级别的线程的id
*/
private final ThreadLocal<String> threadUUID = ThreadLocal.withInitial(UUIDUtils::get);
/**
* 定义切点
*/
@Pointcut("@annotation(com.emdata.lowvis.common.redislock.EmLock)")
public void lockAop() {
}
@Around("lockAop()")
public Object doAround(ProceedingJoinPoint point) throws Throwable {
// 获取方法
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
// 看有没有日志注解
EmLock emLock = method.getAnnotation(EmLock.class);
if (emLock == null) {
return point.proceed();
}
// 获取锁的级别
LockRangeEnum lockRangeEnum = emLock.lockRange();
String uuid = lockRangeEnum == LockRangeEnum.APPLICATION ? appUUID : threadUUID.get();
// 获取锁的key和超时时间
String key = emLock.key();
int timeout = emLock.timeout();
TimeUnit timeUnit = emLock.timeUnit();
// 加锁
boolean lock = redisLock.lock(key, uuid, timeout, timeUnit);
Object proceed = null;
try {
if (lock) {
log.info("获取到锁,继续执行...");
// 继续执行
proceed = point.proceed();
}
} finally {
// 自动释放,则释放锁
if (emLock.autoRelease()) {
redisLock.release(key, uuid);
}
}
return proceed;
}
}
3 使用示例
3.1 使用RedisLock
@Autowired
private RedisLock redisLock;
public void useLock() {
// 定义锁的key
String lockKey = "camera_update_key";
String uuid = UUIDUtils.get();
// 定义超时时间
long timeout = 5;
TimeUnit timeUnit = TimeUnit.SECONDS;
// 加锁
boolean lock = redisLock.lock(lockKey, uuid, timeout, timeUnit);
try {
if (lock) {
log.info("执行...");
} else {
throw new IllegalStateException("未获取到锁,放弃执行");
}
} finally {
// 在finally里面进行解锁
redisLock.release(lockKey, uuid);
}
}
3.2 使用EmLock
@Component
@Slf4j
public class ScheduleTask {
/**
* 用在定时任务方法上,锁的key为test_lock,指定了超时时间为2秒钟
* 锁的级别为默认的应用级别(LockRangeEnum.APPLICATION),在这个如果应用启动了多个容器运行,在只会有一个容器获取到锁,
* 自动释放锁为false,即方法执行完成后,也不会自动释放锁,只有到超时时间了,锁才会释放
*/
@Scheduled(cron = "0 0/1 * * * ? ")
@EmLock(key = "test_lock", timeout = 2, timeUnit = TimeUnit.SECONDS, autoRelease = false)
public void recordUpdateTask() {
log.info("执行任务.......");
}
/**
* 用在普通的方法上,锁的key为method_Lock,指定了超时时间为1分钟,
* 锁的级别为默认的线程级别,在该应用内多个线程执行该方法,则只会有一个线程获取到锁
* 如果启动了多个应用容器,同样多个容器内的所有线程,也只会有一个线程获取到锁
*/
@EmLock(key = "method_Lock", timeout = 1, timeUnit = TimeUnit.MINUTES, lockRange = LockRangeEnum.THREAD)
public void recordUpdate() {
log.info("执行任务2.......");
}
}
4 使用注意
- 使用Redis作为分布式锁的实现,依赖于Redis服务,如果Redis服务无法正常访问,则会导致整个方法无法执行。
- 如果EmLock注解用在定时任务上时,如果应用运行在不同的服务器上,或者不同的docker容器里面时,必须保证运行环境的时间一致。
- 如果设置了定时任务上面的锁,不是自动释放的,则运行环境的时间,相差不大于锁超时时间的时候,也可以保证定时任务,唯一执行。因为在超时时间范围内,某个应用容器持有该锁,其他应用来获取锁时,同样获取不到,方法不会执行。