上一篇 分布式缓存重建并发冲突问题以及zookeeper分布式锁解决方案, 主要讲解了分布式缓存重建冲突原因及利用zookeeper分布式锁解决缓存重建冲突问题,本篇接着上篇,实现上篇思路,带你利用zookeeper代码实现分布式锁解决重建缓存冲突问题。
从上图我们可以看出:
-
缓存主动更新
我们监听kafka中的缓存操作消息队列,当接收到一个商品变更消息后,我们会立即根据源数据服务获取商品最新信息,然后更新到ehcache 和 redis cluster 中,这种情况笔者将之称为缓存主动更新 -
缓存被动重建
当nginx 请求获取商品信息时,发现redis cluster 和 ehcache 中都没有获取到相关商品信息,这时候就需要到源数据服务中拉取商品信息,这时候我们需要同步更新到redis cluster 和 ehcache 中,然后返回nginx 并进行nginx 本地缓存,这种情况笔者将之称为缓存被动重建
那么前面一章中,笔者分析了缓存重建情况
这里笔者在着重讲下,当缓存数据由于个方面因素(如LRU等算法)清理了,这时候缓存主动更新 和 缓存在高并发或者特殊情况下,同时进行时,缓存重建冲突就悲剧的发生了(注:上篇说多个缓存服务实例时,出现分布式缓存重建冲突没错,但是就算不是多缓存实例服务,单个也会发生,只要两者同时发生即可,这里着重补充一下)
上篇讲了可以通过分布式锁方案解决
zookeeper分布式锁的解决逻辑思路
- 变更缓存重建或者空缓存请求重建,更新redis之前,先获取对应商品id的分布式锁
- 拿到分布式锁后,做时间版本比较,如果自己的版本新于redis中的版本,那么就更新,否则就不更新
- 如果拿不到分布式锁,那么就等待,不断轮询等待,直到自己获取到分布式的锁
那下面我们就进入coding 环节,来吧!(注:以下代码层面只针对分布式锁,其他不做介绍,具体其他设计实现会单独剥离讲解)
首先,我们来写个zookeeper 工具类,提供 获取锁 acquireDistributedLock
、释放锁releaseDistributedLock
方法
- 添加zookeeper client 依赖(注:如果你添加了kafka 依赖,这里就不需要单独依赖了)
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
- 创建 zookeeperSession 工具类,代码如下,对应代码都有注释说明,故不做过多解释了
/**
*
* zookeeper 分布式锁工具类
* @author bill
* @date 2017年10月3日 上午11:39:08
*/
public class ZookeeperSession {
private ZooKeeper zookeeper;
//计数器(同步锁),连接信号量,用于控制并发请求时,确保 zookeeper client 与 server 已连接
private static CountDownLatch connectSemaphore = new CountDownLatch(1);
private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperSession.class);
public ZookeeperSession(){
try {
// 连接 zookeeper server
this.zookeeper = new ZooKeeper("192.168.0.16:2181,192.168.0.17:2181,192.168.0.18:2181", 50000, new ZookeeperWatcher());
// 等待,保证 client、server连接
connectSemaphore.await();
LOGGER.debug(" zookeeper session established ...");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取分布式锁
* @param productId 商品id
*/
public void acquireDistributedLock(Long productId){
String path = "/product-lock-" + productId;
try {
// 尝试获取分布式锁
zookeeper.create(path, "".getBytes(), Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL);
LOGGER.debug("success to acquire lock for productId [{}]", productId);
} catch (Exception e) {
// 如果报 nodeExitsException,说明已经有请求获取了锁,所有当前重复尝试获取锁,知道获取到锁为止
int count = 0;
while(true){
try {
// 睡眠一下下,为了测试效果,生产环境,可以20ms
Thread.sleep(1000);
// 再次尝试获取分布式锁
zookeeper.create(path, "".getBytes(), Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e2) {
// 如果报 nodeExitsException,说明已经有请求获取了锁,所有当前重复尝试获取锁,知道获取到锁为止
LOGGER.debug("repeat to acquire lock for productId:[{}] - count:[{}] ...", productId, count);
count ++;
continue;
}
LOGGER.debug("success to acquire lock for productId:[{}] after count:[{}] repeat 。。。", productId, count);
break;
}
}
}
/**
* 释放分布式锁
* @param productId 商品id
*/
public void releaseDistributedLock(Long productId){
String path = "/product-lock-" + productId;
try {
// 删除node,释放锁
zookeeper.delete(path, -1);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 创建 zookeeper session watcher
* @author bill
* @date 2017年10月3日 下午12:05:21
*/
private class ZookeeperWatcher implements Watcher{
@Override
public void process(WatchedEvent evt) {
LOGGER.debug("receive zookeeper watched event: {}", evt.getState());
if(KeeperState.SyncConnected == evt.getState()){
// client、server 已连接 是否等待信号量锁
connectSemaphore.countDown();
}
}
}
/**
* 单例有很多种方式去实现,这里采取绝对线程安全的一种方式
* 静态内部类的方式,去初始化单例
*/
private static class Singleton {
private static ZookeeperSession instance;
static{
instance = new ZookeeperSession();
}
public static ZookeeperSession getInstance(){
return instance;
}
}
/**
* jvm 的机制去保证多线程并发安全
* 内部类的初始化,一定只会发生一次,不管多少个线程并发去初始化
*/
public static ZookeeperSession getIntance(){
return Singleton.getInstance();
}
/**
* 初始化单例 zookeeperSession
*/
public static void init(){
getIntance();
}
}
好,工具类写完了呢,我们不要忘了,对它进行初始化,笔者这里就直接在 InitListener
监听器中初始化了,如下图:
那,接下来要干什么呢,自然就是缓存主动更新 或者 缓存被动重建代码中,加入分布式锁啦,让多个请求串行执行,即可
下面笔者先讲 缓存主动更新 中怎么做,既然我们是通过接收kafka 商品变更消息去更新缓存,那么对应的就是在消费kafka 消息的时候先获取分布式锁,得到锁后,对比时间版本,决定是否更新缓存
缓存主动更新
我们找呀找,终于找到了消费kafka 商品变更消息线程 KafkaMessageProcessor
在更新之前先获取锁,得到锁后,先获取redis 中 数据跟 当前商品数据时间版本对比,当前数据比缓存数据更靠后(更新),则更新,相关代码如下:
// 在数据写入redis 缓存之前,先获取 zookeeper 分布式锁,确保缓存重建冲突
ZookeeperSession zkSession = ZookeeperSession.getIntance();
zkSession.acquireDistributedLock(productId);
// 获取到了锁
// 先从redis 中获取当前最新数据
ProductInfo redisLastProductInfo = cacheService.getProductInfoFromRedisCache(productId);
if(null != redisLastProductInfo){
//比较更新时间,redis中的时间 与现有数据比较.redis product info 比现有数据小则更新,否则不更新redis
try {
Date date = sdf.parse(productInfo.getModifiedTime());
Date redisLastProductInfoDate = sdf.parse(redisLastProductInfo.getModifiedTime());
if(date.before(redisLastProductInfoDate)){
LOGGER.debug("无需更新 > 现有数据 date:[{}] - before redis 最新版本 date:[{}]", date, redisLastProductInfoDate);
// 无需更新,直接返回
return;
}
} catch (ParseException e) {
e.printStackTrace();
}
LOGGER.debug("current product info date is after redis product info date , to update redis");
}else{
LOGGER.debug("product Info is null, to update redis");
}
/** 此休眠为了延迟,更好的查看打印效果 -----生产环境去掉即可----- start*/
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
/** 此休眠为了延迟,更好的查看打印效果 -----生产环境去掉即可----- end*/
// 更新本地 ehcache 缓存
cacheService.saveProductInfo2LocalCache(productInfo);
LOGGER.debug("获取刚保存到本地缓存的商品信息:[{}]", cacheService.getProductInfoFromLocalCache(productId));
// 更新redis 缓存
cacheService.saveProductInfo2RedisCache(productInfo);
// 释放 zookeeper 分布式锁
zkSession.releaseDistributedLock(productId);
好了,缓存主动更新就完了,其实就这么简单,你懂了没有?
接下来继续,缓存被动重建那就是从http 入手了,请求进来后,先到redis cluster中获取商品数据,发现没有,然后又到本地ehcache 中获取,发现也没有,这时候就到源数据服务中拉取mysql 商品数据,这时候是不是要更新到redis cluster 以及 ehcache 中呢,那是必须的,所以这里就有发生缓存重建冲突的可能。
缓存被动重建
注:这里的核心实现和缓存主动更新差不多,但是处理流程稍微有点不一样
- 创建一个缓存重建队列,提供加入队列、获取队列数据方法
- 创建一个缓存重建队列消费线程,设置商品数据缓存,同时做缓存重建冲突处理
- 请求进来,如果缓存中都没有商品数据,到源数据服务拉取商品数据,然后将商品数据加入缓存重建队列,同时响应http 商品数据
下面直接 coding 吧!
- 创建一个缓存重建队列
RebuildCacheQueue
,这是一个单例类,代码如下:
/**
*
* 重建缓存的内存队列
* @author bill
* @date 2017年10月3日 上午11:39:48
*/
public class RebuildCacheQueue {
/**
* 内存队列
*/
private ArrayBlockingQueue<ProductInfo> queue = new ArrayBlockingQueue<ProductInfo>(1000);
/**
* 将商品信息对象加入队列
* @param productInfo 商品信息对象
*/
public void putProductInfo(ProductInfo productInfo){
try {
queue.put(productInfo);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 从队列中获取商品信息对象
* @return 商品信息对象
*/
public ProductInfo takeProductInfo(){
try {
return queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
/**
* 单例有很多种方式去实现,这里采取绝对线程安全的一种方式
* 静态内部类的方式,去初始化单例
*/
private static class Singleton {
private static RebuildCacheQueue instance;
static {
instance = new RebuildCacheQueue();
}
private static RebuildCacheQueue getInstance(){
return instance;
}
}
/**
* jvm 的机制去保证多线程并发安全
* 内部类的初始化,一定只会发生一次,不管多少个线程并发去初始化
*/
public static RebuildCacheQueue getInstance(){
return Singleton.getInstance();
}
}
- 创建一个缓存重建队列消费线程
RebuilCacheThread
,进行重建缓存队列消费,没什么好说的了,上面流程已经讲了很清楚了,直接看代码吧:
/**
*
* 重建缓存队列消费线程
* @author bill
* @date 2017年10月3日 上午11:52:40
*/
public class RebuilCacheThread implements Runnable{
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageProcessor.class);
private static final SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
@Override
public void run() {
// 获取重建缓存队列实例
RebuildCacheQueue rebuildCacheQueue = RebuildCacheQueue.getInstance();
// 获取zookeeperSession 实例
ZookeeperSession zkSession = ZookeeperSession.getIntance();
CacheService cacheService = SpringContext.applicationContext.getBean(CacheService.class);
while(true){
ProductInfo productInfo = rebuildCacheQueue.takeProductInfo();
// 获取zookeeper分布式锁
zkSession.acquireDistributedLock(productInfo.getId());
// 获取到了锁
// 先从redis 中获取当前最新数据
ProductInfo redisLastProductInfo = cacheService.getProductInfoFromRedisCache(productInfo.getId());
if(null != redisLastProductInfo){
//比较更新时间,redis中的时间 与现有数据比较.redis product info 比现有数据小则更新,否则不更新redis
try {
Date date = sdf.parse(productInfo.getModifiedTime());
Date redisLastProductInfoDate = sdf.parse(redisLastProductInfo.getModifiedTime());
if(date.before(redisLastProductInfoDate)){
LOGGER.debug("无需更新 > 现有数据 date:[{}] - before redis 最新版本 date:[{}]", date, redisLastProductInfoDate);
// 无需更新,直接返回
continue;
}
} catch (ParseException e) {
e.printStackTrace();
}
LOGGER.debug("current product info date is after redis product info date , to update redis");
}else{
LOGGER.debug("product Info is null, to update redis");
}
// 更新本地 ehcache 缓存
cacheService.saveProductInfo2LocalCache(productInfo);
// redis 缓存
cacheService.saveProductInfo2RedisCache(productInfo);
// 释放 zookeeper 分布式锁
zkSession.releaseDistributedLock(productInfo.getId());
}
}
}
- http 请求时,无缓存数据,重建缓存,加入重建缓存队列
/**
* 获取商品信息
* @param productId 商品id
* @return 商品信息
*/
@GetMapping("/getProductInfo")
public ProductInfo getProductInfo(Long productId){
ProductInfo productInfo = null;
try {
productInfo = cacheService.getProductInfoFromRedisCache(productId);
LOGGER.debug("从redis中获取缓存,商品信息: {}", productInfo);
if(productInfo == null){
productInfo = cacheService.getProductInfoFromLocalCache(productId);
LOGGER.debug("从ehcache中获取缓存,商品信息: {}", productInfo);
}
if(productInfo == null){
//走 数据源重新拉数据并重建缓存,注意这里笔者就直接写死数据了
String productInfoJSON = "{\"id\": 10, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-10-3 12:30:01\"}";
productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
// 将数据推送到一个内存队列中消费(重建缓存的内存队列)
RebuildCacheQueue.getInstance().putProductInfo(productInfo);
}
} catch (Exception e) {}
return productInfo;
}
好了,缓存主动更新 和 缓存被动重建三部曲就完了,可以很清楚的看到,代码量不多,主要集中在更新缓存前获取分布式锁即可,结合缓存重建分析图
接下来要干啥呢,有个很重要的环节没有做,那就是测试,不到黄河不死心,看不到效果,我也不信,那我们就拿出来遛遛吧
代码测试
测试数据
缓存主动更新
kafka 生产数据:
{"serviceId":"productInfoService","productId":10}
从数据库中拉取数据为(注意modifiedTime时间:2017-10-3 12:30:00):
String productInfoJSON = "{\"id\": 10, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-10-3 12:30:00\"}";
缓存被动重建
http 请求:
http://localhost:81/getProductInfo?productId=10
从数据库中拉取数据为(注意modifiedTime时间:2017-10-3 12:30:01):
String productInfoJSON = "{\"id\": 10, \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-10-3 12:30:01\"}";
准备环境:
启动 redis cluster
启动 zookeeper 集群
启动 kafka 集群
启动 缓存服务(缓存项目服务)
注:由于笔者使用Windows 系统,可能避免windows 中 centos ip与主机名映射不了,所以作以下配置,推荐使用 SwitchHosts 的工具,很方便
C:\Windows\System32\drivers\etc\hosts
* ################################### 配置本地hosts #################### 很重要 ######################
* # 缓存架构方案
* 192.168.0.16 my-cache1
* 192.168.0.17 my-cache2
* 192.168.0.18 my-cache3
* ################################### 配置本地hosts #################### 很重要 ######################
利用kafka-console-producer.sh 生产一条商品变更消息,并回车(由于代码中更新时为了演示效果,休眠了10s,把握时间)
cd /usr/local/kafka && bin/kafka-console-producer.sh --broker-list my-cache1:9092,my-cache2:9092,my-cache3:9092 --topic cache-message
商品消息如下:
{"serviceId":"productInfoService","productId":10}
浏览器 http 请求
测试效果
期望效果,先看到控制台打印kafka 的消费日志,等kafka 消费线程释放分布式锁后,才能看到缓存被动重建 获取分布式锁,并更新redis,并且返回modifiedTime:2017-10-3 12:30:01 的商品信息,同时redis cluster 中的数据也必须是 modifiedTime:2017-10-3 12:30:01 的商品信息,表示测试通过,这里都说得我有点迫不及待了,来吧,试试
好了,是不是很激动呀,今天的讲解就到这里哈,赶紧去试试吧!
注:这里只是以商品信息为例来讲解利用分布式锁解决缓存重建冲突,其他如商铺信息等也是同理,这里希望告诉你的是方法,就不一一演示了。
以上就是本章内容,如有不对的地方,请多多指教,谢谢!
为了方便有需要的人,本系列全部软件都在 https://pan.baidu.com/s/1qYsJZfY
下章预告:主要讲解 简单看 storm
代码地址附上:https://github.com/bill5/cache-project/tree/master/cache-cache
作者:逐暗者 (转载请注明出处)