Java教程:如何深入理解Redis分布式锁?

相信很多同学都听说过分布式锁,但也仅仅停留在概念的理解上,这篇文章会从分布式锁的应用场景讲起,从实现的角度上深度剖析redis如何实现分布式锁。

一、超卖问题

我们先来看超卖的概念: 当宝贝库存接近0时,如果多个买家同时付款购买此宝贝,或者店铺后台在架数量大于仓库实际数量,将会出现超卖现象。超卖现象本质上就是买到了比仓库中数量更多的宝贝。

本文主要解决超卖问题的第一种,同时多人购买宝贝时,造成超卖。

测试代码

那么超卖问题是如何产生的呢?我们准备一段代码进行测试:

@AutowiredprivateStringRedisTemplate stringRedisTemplate;/**

    * 第一种实现,进程内就存在线程安全问题

    * 可以只启动一个进程测试

    */@RequestMapping("/deduct_stock1")publicvoiddeductStock1(){        String stock = stringRedisTemplate.opsForValue().get("stock");intstockNum = Integer.parseInt(stock);if(stockNum >0){//设置库存减1intrealStock = stockNum -1;            stringRedisTemplate.opsForValue().set("stock",realStock +"");            System.out.println("设置库存"+ realStock);        }else{            System.out.println("库存不足");        }    }

这段代码中,使用redis先获取库存数量(当然实际场景中不会只保存一个全局库存数,应该根据每一个商品单元(sku)保存一份库存数)。

String stock = stringRedisTemplate.opsForValue().get("stock");intstockNum = Integer.parseInt(stock);

接下来,判断库存数是否大于0:

如果大于0,将库存数减一,通过set命令,写回redis

这里没有使用redis的decrement命令,因为此命令在redis单线程模型下是线程安全的,而为了可以模拟线程不安全的情况将其拆成三步操作

//设置库存减1intrealStock = stockNum -1;  stringRedisTemplate.opsForValue().set("stock",realStock +"");  System.out.println("设置库存"+ realStock);

如果小于等于0,提示库存不足

JMeter测试

通过JMeter进行并发测试,看下会不会出现超卖的问题:

1.启动tomcat

这种情况下,只需要启动一个tomcat就会出现超卖。我们先启动一个tomcat在8080端口上。


2.下载JMeter

Apache JMeter是Apache组织开发的基于Java的压力测试工具。 从官网上下载即可:

https://jmeter.apache.org/download_jmeter.cgi 下载完之后解压,运行bin目录下的jmeter.bat,显示如下界面:


如果嫌字体太小,可以选择放大:


3.配置JMeter

在Test Plan上点击右键,创建线程组(Thread Group)


配置一下具体参数:


Number of Threads 同时并发线程数

Ramp-Up Period(in-seconds) 代表隔多长时间执行,0代表同时并发。假设线程数为100, 估计的点击率为每秒10次, 那么估计的理想ramp-up period 就是 100/10 = 10 秒

Loop Count 循环次数

这里给出500是为了直接测试并发500抢,看看能不能正好把500个货物抢完。

添加Http请求:


添加请求URL:


添加聚合结果,用来显示整体的运行情况:


到此为止JMeter的配置结束。

4.设置库存量

启动redis-server,使用redis-client连接:


把库存数设置为500。

5.开始测试

点击运行按钮,启动测试:


首先我们看到聚合报告里输出的结果:


错误率0%,样本数500,证明500个请求都已经执行,但是发现控制台输出如下:


很显然,一份商品都被卖了多次,这显然是不合理的。

原因分析

现在我们只启动了一个tomcat,在单jvm进程的情况下,tomcat会使用线程池接收请求:


而由于每个线程可能同时获取到库存量,所以库存量在两个线程中显示的都是500,然后两个线程就继续进行扣减库存操作,得出499写回redis中,在这个过程中,显然存在线程安全的问题。同一个商品被卖出了2份,超卖问题就出现了。

二、加锁优化

synchronized锁

要保证单jvm中线程安全,最简单直接的方式就是添加synchronized关键字,那么这样行不行呢,我们来做一个测试:

/**

    * 第二种实现,使用synchronized加锁

    * 可以只启动一个进程测试

    */@RequestMapping("/deduct_stock2")publicvoiddeductStock2(){        synchronized (this){            String stock = stringRedisTemplate.opsForValue().get("stock");intstockNum = Integer.parseInt(stock);if(stockNum >0){//设置库存减1intrealStock = stockNum -1;                stringRedisTemplate.opsForValue().set("stock",realStock +"");                System.out.println("设置库存"+ realStock);            }else{                System.out.println("库存不足");            }        }    }

在进行扣减库存前,先通过synchronized关键字,对资源加锁,这样就只有一个线程能进入到扣减库存的代码块中。来测试一下:

重置库存

setstock 500

修改接口地址


测试


可以看到,库存被扣减为0,并且没有出现超卖的情况(设置了500库存,并且500个人抢,正好抢完)。 但是这种方案显然是不行的,在生产环境上如果部署多个tomcat实例,那么就会出现如下情况:


多个进程无法共享jvm内存中的锁,所以会出现多把锁,这种情况下也会出现超卖问题。

三、分布式锁的实现

多Tomcat实例下的超卖演示

接下来我们演示一下如何在多个Tomcat情况下,演示超卖的问题:

1.启动两个tomcat服务

在IDEA中配置两个spring boot的启动项,使用vm参数指定不同的端口号

-Dserver.port=8080


2.配置nginx

编写

~/nginx_redis/conf/nginx.conf如下:

usernginx;worker_processes1;error_log/var/log/nginx/error.logwarn;pid/var/run/nginx.pid;events{worker_connections1024;}http{include/etc/nginx/mime.types;default_typeapplication/octet-stream;log_formatmain'$remote_addr-$remote_user[$time_local] "$request" ''$status$body_bytes_sent"$http_referer" ''"$http_user_agent" "$http_x_forwarded_for"';upstreamredislock{server192.168.226.1:8080weight=1;server192.168.226.1:8081weight=1;}server{listen80;server_namelocalhost;location/{roothtml;proxy_passhttp://redislock;    }}access_log/var/log/nginx/access.log  main;sendfileon;#tcp_nopush    on;keepalive_timeout65;#gzip  on;include/etc/nginx/conf.d/*.conf;}

192.168.226.1这是我宿主机的IP

准备一个虚拟机(也可以使用windows下的nginx),使用docker启动nginx:

docker pull nginxdocker run -di -p10085:80--name nginx-redis-hc  -v ~/nginx_redis/html:/usr/share/nginx/html-v ~/nginx_redis/conf/nginx.conf:/etc/nginx/nginx.conf  -v ~/nginx_redis/logs:/var/log/nginxnginx

在宿主机下使用虚拟机的IP地址:10085访问nginx,如果出现如下页面就代表成功:


3.测试

修改接口地址为nginx:


运行查看两个tomcat的控制台:

tomcat1


tomcat2


没有将库存清空,证明存在超卖问题。

手动实现分布式锁

使用redis手动实现分布式锁,需要用到命令setnx。先来介绍一下setnx:

SETNX key value[]

可用版本: >= 1.0.0

时间复杂度: O(1)

只在键 key 不存在的情况下, 将键 key 的值设置为 value 。

若键 key 已经存在, 则 SETNX 命令不做任何动作。

SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

返回值

命令在设置成功时返回 1 , 设置失败时返回 0 。

代码示例

redis>EXISTS job# job 不存在# job 不存在(integer) 0redis>SETNX job"programmer"# job 设置成功(integer) 1redis>SETNX job"code-farmer"# 尝试覆盖 job ,失败(integer) 0redis>GET job# 没有被覆盖

使用redis构建分布式锁流程如下:


image.png

线程1申请锁(setnx),拿到了锁。

线程2申请锁,由于线程1已经拥有了锁,setnx返回0失败,这一步用户操作会失败。

线程1执行扣减库存操作并释放锁。

线程2再次申请锁,获取到锁并执行扣减库存,然后释放锁。

注意这里线程没有拿到锁,如果不尝试while(true)重新获取锁,这个操作就直接失败了。

代码实现

/**

    * 第三种实现,使用redis中的setIfAbsent(setnx命令)实现分布式锁

    */@RequestMapping("/deduct_stock3")publicvoiddeductStock3(){//在获取到锁的时候,给锁分配一个idString opId = UUID.randomUUID().toString();        Boolean stockLock = stringRedisTemplate                .opsForValue().setIfAbsent("stockLock", opId, Duration.ofSeconds(30);if(stockLock){try{                String stock = stringRedisTemplate.opsForValue().get("stock");intstockNum = Integer.parseInt(stock);if(stockNum >0){//设置库存减1intrealStock = stockNum -1;                    stringRedisTemplate.opsForValue().set("stock",realStock +"");                    System.out.println("设置库存"+ realStock);                }else{                    System.out.println("库存不足");                }            }catch(Exception e){                e.printStackTrace();            }finally{if(opId.equals(stringRedisTemplate                        .opsForValue().get("stockLock"))){                    stringRedisTemplate.delete("stockLock");                }            }        }    }

测试略过,这里有几个知识点需要说明

setIfAbsent设置超时

如果setIfAbsent不设置超时时间,假设线程执行业务代码时间时死锁或者其他原因导致长时间不释放,那么会影响其他线程获取到锁,这个时候整体业务就会出现不可用。

BooleanstockLock = stringRedisTemplate                .opsForValue().setIfAbsent("stockLock", opId, Duration.ofSeconds(30);

设置超时时间为30秒,该时间一般大于业务执行的最大时间。

每次获取到锁,设置唯一ID

考虑这样的场景


线程1获取锁扣减库存,但是由于操作不当,长时间卡住,这样会触发超时时间锁被释放。

线程2获取到锁,扣减库存。

线程1的代码抛出异常,执行finally释放锁,但是释放的是进程B的锁。 解决方案就是在加锁前生成UUID,释放的时候校验UUID是否正确,如果不正确,说明加锁线程不是当前线程。

使用Redisson实现分布式锁

setnx虽好,但是实现起来毕竟太过麻烦,一不小心就可能陷入并发编程的陷阱中,那么有没有更加简单的实现方式呢?答案就是redisson。

Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。【Redis官方推荐】 Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Java实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。

总而言之,redisson提供了一系列较为完善的工具类,其中就包含了分布式锁。用redisson实现分布式锁的流程极为简单。

引入依赖

org.redissonredisson3.14.0

创建Redisson实例

@BeanpublicRedissonClientredisson(){// 1. Create config objectConfig config =newConfig();        config.useSingleServer().setAddress("redis://127.0.0.1:6379");//        config.useClusterServers()//                // use "rediss://" for SSL connection//                .addNodeAddress("redis://127.0.0.1:7181");returnRedisson.create(config);    }

编写分布式锁代码

@AutowiredprivateRedissonClient redissonClient;/**

    * 第四种实现,使用redisson实现

    */@RequestMapping("/deduct_stock4")publicvoiddeductStock4(){        RLocklock= redissonClient.getLock("redisson:stockLock");try{//加锁lock.lock();            String stock = stringRedisTemplate.opsForValue().get("stock");intstockNum = Integer.parseInt(stock);if(stockNum >0){//设置库存减1intrealStock = stockNum -1;                stringRedisTemplate.opsForValue().set("stock",realStock +"");                System.out.println("设置库存"+ realStock);            }else{                System.out.println("库存不足");            }        }catch(Exception e){            e.printStackTrace();        }finally{lock.unlock();        }    }

其中加锁代码基本与进程内加锁一致,就不再详细解读,读者自行实践即可。

Redisson分布式锁原理

Redisson分布式锁的主要原理非常简单,利用了lua脚本的原子性。 在分布式环境下产生并发问题的主要原因是三个操作并不是原子操作:

获取库存

扣减库存

写入库存 那么如果我们把三个操作合并为一个操作,在默认单线程的Redis中运行,是不会产生并发问题的。源码如下:

RFuturetryLockInnerAsync(longwaitTime,longleaseTime, TimeUnit unit,longthreadId, RedisStrictCommand command){        internalLockLeaseTime = unit.toMillis(leaseTime);returnevalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then "+"redis.call('hincrby', KEYS[1], ARGV[2], 1); "+"redis.call('pexpire', KEYS[1], ARGV[1]); "+"return nil; "+"end; "+"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "+"redis.call('hincrby', KEYS[1], ARGV[2], 1); "+"redis.call('pexpire', KEYS[1], ARGV[1]); "+"return nil; "+"end; "+"return redis.call('pttl', KEYS[1]);",                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));    }

这一段源码中,redisson利用了lua脚本的原子性,校验key是否存在,如果不存在就创建key并利用incrby加一操作(这步操作主要是为了实现可重入性)。redisson实现的分布式锁具备如下特性:

锁失效

锁续租

执行时间长的锁快要到期时会自动续租

可重入

操作原子性

锁续租原理

使用如下代码进行测试锁续租的情况

@Testvoidtest() throws InterruptedException{    RLock testlock1111 = redissonClient.getLock("testlock");    testlock1111.lock();try{        Thread thread =newThread(() -> {while(true){                Long testlock = redisTemplate.getExpire("testlock");                System.out.println(LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME) +" ttl:"+ testlock);try{                    Thread.sleep(1000L);                }catch(InterruptedException e) {                    e.printStackTrace();                }            }        });        thread.start();        thread.join();    }finally{if(testlock1111.isHeldByCurrentThread()){            testlock1111.unlock();        }    }}

我们会发现,每隔10秒会自动续租一次,保证锁不被释放。


那么这种续租的行为是如何实现的呢?考虑这种情况:如果线程加锁之后,进程宕机,线程无法执行解锁代码,那么这个锁就无法得到释放(注意,不是加锁线程不允许乱解锁),为了避免这种情况的发生,锁都会设置一个过期时间。比如使用lock无参命令会默认设置30秒的过期时间。那么30秒之后呢?如果线程还在工作,自动释放依然会产生线程安全的问题。所以Redisson使用了watch dog看门狗机制来实现自动续租。

核心代码及注释:

privateRFuturetryAcquireAsync(longwaitTime,longleaseTime, TimeUnit unit,longthreadId){    RFuture ttlRemainingFuture;//lock()无参方法leaseTime为-1,所以进else分值if(leaseTime >0) {        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);    }else{//通过lua脚本加锁ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);    }    CompletionStage f = ttlRemainingFuture.thenApply(ttlRemaining -> {// 异步方法,等到加锁成功会回调,第一次加锁ttlRemaining为空,leaseTime为-1if(ttlRemaining ==null) {if(leaseTime >0) {                internalLockLeaseTime = unit.toMillis(leaseTime);            }else{//设置延时任务scheduleExpirationRenewal(threadId);            }        }returnttlRemaining;    });returnnewCompletableFutureWrapper<>(f);}

接下来分析scheduleExpirationRenewal的过程:

privatevoid renewExpiration() {    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if(ee ==null) {return;    }//创建一个延迟任务Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublicvoid run(Timeout timeout) throws Exception {            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if(ent ==null) {return;            }LongthreadId = ent.getFirstThreadId();if(threadId ==null) {return;            }//执行lua脚本进行续租CompletionStage future = renewExpirationAsync(threadId);            future.whenComplete((res, e) -> {if(e !=null) {                    log.error("Can't update lock "+ getRawName() +" expiration", e);                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;                }//执行lua续租,锁还在就续租,锁不在返回false就取消续租的行为if(res) {// reschedule itselfrenewExpiration();                }else{                    cancelExpirationRenewal(null);                }            });        }    }, internalLockLeaseTime /3, TimeUnit.MILLISECONDS);//internalLockLeaseTime默认值30,所以每10秒会续租一次,续租到30秒ee.setTimeout(task);}

其中,renewExpirationAsync执行的lua脚本如下:

protectedCompletionStagerenewExpirationAsync(longthreadId){returnevalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "+"redis.call('pexpire', KEYS[1], ARGV[1]); "+"return 1; "+"end; "+"return 0;",            Collections.singletonList(getRawName()),            internalLockLeaseTime, getLockName(threadId));}

判断hash中是否存在锁,如果存在就设置过期时间为30秒,返回1。如果不存在就返回0。

总结

本文介绍了超卖问题产生的原因:操作不具备原子性,同时提出了集中解决思路。

synchronized锁,无法保证多实例下的线程安全

setnx手动实现,坑很多、代码较为复杂

redisson实现,能够保证多实例下线程安全,代码简单可靠

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容