场景 & 原理
分布式场景中,经常需要保证在多个jvm进程中,在同一时刻只有一个线程对某一条数据进行操作。
例如:资金问题,甲用户同时支付了两笔订单,第一笔订单请求进入了A节点,第二笔订单请求进入了B节点。
在这个情况下,我们通常不允许同一时刻有个线程对资金进行操作,毕竟这太危险了。因此需要分布式锁,锁住其他线程的操作(类似单JVM下的synchronized)。
分布式锁 可见性如下图:
分布式原理如下图:
代码
上代码。。。
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.log4j.Log4j2;
import java.util.concurrent.CountDownLatch;
/**
* 甲用户发起多笔支付订单,分布式保证不并发扣款。
*
* @author eblly
* @since 2019/7/28
*/
@Log4j2
public class RedisLockTest {
static String redisKey = "balance:jia";
public static void main(String[] args) throws InterruptedException {
String host = "127.0.0.1";
int port = 6379;
RedisClient redisUtil = RedisClient.builder()
.setHost(host)
.setPort(port)
.build();
int size = 10;
CountDownLatch countDownLatch = new CountDownLatch(size);
Balance balance = new Balance();
balance.setUserName("甲用户");
balance.setBalance(100);
for (int i = 1; i <= size; i++) {
String hodler = "running" + i;
BalanceRunnable runnable = new BalanceRunnable(balance, redisUtil, hodler, i, countDownLatch);
Thread thread = new Thread(runnable);
thread.start();
}
countDownLatch.await();
log.info("最后剩余金额, balance: {}", balance);
}
/**
*
*/
static class BalanceRunnable implements Runnable {
private String holder;
private RedisClient redisUtil;
private Balance balance;
private int fee;
private CountDownLatch countDownLatch;
public BalanceRunnable(Balance balance, RedisClient redisUtil, String threadName, int fee,
CountDownLatch countDownLatch) {
this.balance = balance;
this.redisUtil = redisUtil;
this.holder = threadName;
this.fee = fee;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
while (true) {
String value = redisUtil.get(redisKey);
boolean isPay = false;
if (value != null) {
log.info("用户资金被锁住");
} else {
if (!isPay && lock(redisKey, holder)) {
// 扣款成功
balance.setBalance(balance.getBalance() - fee);
log.info("剩余balance: {} , fee: {}", balance.getBalance(), fee);
isPay = true;
}
}
// 已扣款, 尝试释放锁
if (isPay && unlock(redisKey, holder)) {
countDownLatch.countDown();
// 释放锁并退出
break;
}
// 等待锁
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 锁资源
*
* @param key
* @param value
* @return
*/
public boolean lock(String key, String value) {
String valueRedis = redisUtil.get(key);
// 判断是否获取 可重入锁
if (valueRedis != null) {
if (value.equals(valueRedis)) {
return true;
} else {
return false;
}
}
if (redisUtil.setnx(key, value) > 0) {
return true;
}
return false;
}
/**
* 释放锁
*
* @param key
* @param value
* @return
*/
public boolean unlock(String key, String value) {
String valueRedis = redisUtil.get(key);
if (valueRedis == null) {
return true;
}
// 值不相等,不是锁的持有者
if (!valueRedis.equals(value)) {
return false;
}
redisUtil.del(key);
return true;
}
}
}
@Setter
@Getter
@ToString
class Balance {
private String userName;
private Integer balance;
}
运行结果:
==> 剩余balance: 97 , fee: 3
==> 剩余balance: 90 , fee: 7
==> 用户资金被锁住
==> 剩余balance: 80 , fee: 10
==> 剩余balance: 79 , fee: 1
==> 剩余balance: 70 , fee: 9
==> 剩余balance: 64 , fee: 6
==> 剩余balance: 56 , fee: 8
==> 剩余balance: 54 , fee: 2
==> 剩余balance: 49 , fee: 5
==> 用户资金被锁住
==> 剩余balance: 45 , fee: 4
==> 最后剩余金额, balance: Balance(userName=甲用户, balance=45)
版本2
RedisUtil源码 https://gist.github.com/eblly/2b11f05d027ecb678d6d410bf7138daa
上面的版本存在锁超时没有释放,会导致其他节点的操作一直阻塞。
因此,需要改进:
- 加时间超时
- 使用lua作为原子操作
import lombok.extern.log4j.Log4j2;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
/**
* 甲用户发起多笔支付订单,分布式保证不并发扣款。
*
* @author eblly
* @since 2019/7/28
*/
@Log4j2
public class RedisLockTest_v2 {
private static String redisKey = "balance:jia";
private static Queue<Integer> feeQueue = new ConcurrentLinkedQueue<>();
public static void main(String[] args) throws InterruptedException {
String host = "127.0.0.1";
int port = 6379;
RedisClient redisClient = RedisClient.builder()
.setHost(host)
.setPort(port)
.build();
int size = 100;
int balanceSize = 100000;
CountDownLatch countDownLatch = new CountDownLatch(size);
Balance balance = new Balance();
balance.setUserName("甲用户");
balance.setBalance(balanceSize);
// TODO 加入cycri barr , 保证所有线程一起开始
for (int i = 1; i <= size; i++) {
String hodler = "running" + i;
BalanceRunnable runnable = new BalanceRunnable(balance, redisClient, hodler, i, countDownLatch);
Thread thread = new Thread(runnable);
thread.start();
}
countDownLatch.await();
// log.info("sizeQueue: {}", feeQueue);
int feeTotal = 0;
for (Integer fee : feeQueue) {
feeTotal += fee;
}
int residue = balanceSize - feeTotal;
log.info("==========> 线程数: {}", size);
log.info("最后剩余金额, balance: {} , residue: {}", balance, residue);
if (balance.getBalance() == residue) {
log.info("===========> 正常");
} else {
log.error("===========> 不正常");
}
}
/**
*
*/
static class BalanceRunnable implements Runnable {
private String holder;
private RedisClient redisClient;
private Balance balance;
private int fee;
private CountDownLatch countDownLatch;
public BalanceRunnable(Balance balance, RedisClient redisClient, String threadName, int fee,
CountDownLatch countDownLatch) {
this.balance = balance;
this.redisClient = redisClient;
this.holder = threadName;
this.fee = fee;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
while (true) {
String value = redisClient.getLock(redisKey);
if (value != null && !holder.equals(value)) {
log.info("用户资金被锁住, 锁: {}", value);
} else {
if (redisClient.lock(redisKey, holder)) {
// 扣款成功
balance.setBalance(balance.getBalance() - fee);
feeQueue.add(fee);
log.info("剩余balance: {} , fee: {}", balance.getBalance(), fee);
// 已扣款, 尝试释放锁
if (redisClient.unlock(redisKey, holder)) {
countDownLatch.countDown();
// 释放锁并退出
log.info("释放锁并退出");
break;
}
}
}
// 等待锁释放
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
锁操作代码
/**
* 获取锁
* @param key
* @param <T>
* @return
*/
public <T> T getLock(Object key) {
Jedis jedis = getJedis();
try {
return (T) jedis.get(String.valueOf(key));
} finally {close(jedis);}
}
/**
* 锁资源
*
* @param key
* @param value
* @return
*/
public boolean lock(String key, String value) {
String expire = "30";
String script = " if redis.call('get',KEYS[1]) then " +
" return redis.call('get',KEYS[1]) " +
" else " +
" return redis.call('setex',KEYS[1],ARGV[2],ARGV[1]) " +
" end";
Object eval = eval(script, Arrays.asList(key), Arrays.asList(value, expire));
log.debug("eval: {}", eval);
if ("OK".equals(eval)) {
log.debug("===========> OK");
return true;
}
return false;
}
/**
* 释放锁
*
* @param key
* @param value
* @return
*/
public boolean unlock(String key, String value) {
String script = " if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else " +
" return 0 end ";
String eval = String.valueOf(eval(script, Arrays.asList(key), Arrays.asList(value)));
log.debug("eval: {}", eval);
if ("1".equals(eval)) {
log.debug("成功释放锁");
return true;
}
return false;
}
RedisUtil代码在 https://www.jianshu.com/p/6605f9f34ed0