一、场景
高并发下的商品秒杀,转账安全等等。
在集群的条件下,可能有存在这种业务需求:
1、查看当前商品A的库存
2、如果商品A库存>0,商品A的库存-1
即需要同时读和写。如果服务器只有一台那么直接使用synchronized或者Lock就行,但是如果是集群就只能靠分布式锁或者数据库自带的排他锁来解决了。
二、方案
1、基于数据库建立一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。
2、利用数据库的排他锁 select for update
3、基于Redis的实现方式:SETNX
SET命令
Redis分布式锁的正确实现方式
4、基于ZooKeeper的实现方式:临时顺序节点
(1)创建一个目录mylock;
(2)线程A想获取锁就在mylock目录下创建临时顺序节点;
(3)获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
(4)线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;
(5)线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。
三、性能分析
数据库的性能最差
Redis分布式锁,必须使用者自己间隔时间轮询去尝试加锁,当锁被释放后,存在多线程去争抢锁,并且可能每次间隔时间去尝试锁的时候,都不成功,对性能浪费很大。
Zookeeper分布锁,首先创建临时顺序节点,如果需要不是最小的节点则添加监听后等待通知或者超时,当有锁释放,无须争抢,按照节点顺序,依次通知使用者。
四、redis分布式锁工具类
public class RedisLockUtil {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";//[EX seconds] [PX milliseconds]
private static final Long RELEASE_SUCCESS = 1L;
/**
* 尝试获取分布式锁
*
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
/**
* 释放分布式锁
*
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
}