前言
Redisson的功能非常强大,下面摘抄官网上的描述:
Redisson采用了基于NIO的Netty框架,不仅能作为Redis底层驱动客户端,具备提供对Redis各种组态形式的连接功能,对Redis命令能以同步发送、异步形式发送、异步流形式发送或管道形式发送的功能,LUA脚本执行处理,以及处理返回结果的功能,还在此基础上融入了更高级的应用方案,不但将原生的Redis Hash,List,Set,String,Geo,HyperLogLog等数据结构封装为Java里大家最熟悉的映射(Map),列表(List),集(Set),通用对象桶(Object Bucket),地理空间对象桶(Geospatial Bucket),基数估计算法(HyperLogLog)等结构,在这基础上还提供了分布式的多值映射(Multimap),本地缓存映射(LocalCachedMap),有序集(SortedSet),计分排序集(ScoredSortedSet),字典排序集(LexSortedSet),列队(Queue),阻塞队列(Blocking Queue),有界阻塞列队(Bounded Blocking Queue),双端队列(Deque),阻塞双端列队(Blocking Deque),阻塞公平列队(Blocking Fair Queue),延迟列队(Delayed Queue),布隆过滤器(Bloom Filter),原子整长形(AtomicLong),原子双精度浮点数(AtomicDouble),BitSet等Redis原本没有的分布式数据结构。不仅如此,Redisson还实现了Redis文档中提到像分布式锁Lock这样的更高阶应用场景。事实上Redisson并没有不止步于此,在分布式锁的基础上还提供了联锁(MultiLock),读写锁(ReadWriteLock),公平锁(Fair Lock),红锁(RedLock),信号量(Semaphore),可过期性信号量(PermitExpirableSemaphore)和闭锁(CountDownLatch)这些实际当中对多线程高并发应用至关重要的基本部件。正是通过实现基于Redis的高阶应用方案,使Redisson成为构建分布式系统的重要工具。
在提供这些工具的过程当中,Redisson广泛的使用了承载于Redis订阅发布功能之上的分布式话题(Topic)功能。使得即便是在复杂的分布式环境下,Redisson的各个实例仍然具有能够保持相互沟通的能力。在以这为前提下,结合了自身独有的功能完善的分布式工具,Redisson进而提供了像分布式远程服务(Remote Service),分布式执行服务(Executor Service)和分布式调度任务服务(Scheduler Service)这样适用于不同场景的分布式服务。使得Redisson成为了一个基于Redis的Java中间件(Middleware)。
先入门学习一下redisson的使用。
Redisson的配置
redisson提供了文件方式配置和程序方式配置,支持redis单点,主从,哨兵,集群模式,以redis的cluster模式为例,使用基于文件方式配置,首先在resource目录下定义了redis.yml配置文件:
---
clusterServersConfig:
idleConnectionTimeout: 10000
pingTimeout: 1000
connectTimeout: 5000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
password: 123456
subscriptionsPerConnection: 5 #单个连接最大订阅数量
slaveSubscriptionConnectionMinimumIdleSize: 1
slaveSubscriptionConnectionPoolSize: 50
slaveConnectionMinimumIdleSize: 32
slaveConnectionPoolSize: 64
masterConnectionMinimumIdleSize: 32
masterConnectionPoolSize: 64
readMode: "SLAVE"
nodeAddresses:
- "redis://10.110.27.139:6379"
- "redis://10.110.27.139:6380"
- "redis://10.110.27.139:6381"
- "redis://10.110.27.138:6379"
- "redis://10.110.27.138:6380"
- "redis://10.110.27.138:6381"
scanInterval: 1000
transportMode: NIO
编写配置类
@Configuration
public class InitializingRedis{
@Bean
public RedissonClient getRedissonClient() throws IOException {
ResourceLoader loader = new DefaultResourceLoader();
Resource resource = loader.getResource("redis.yml");
Config config = Config.fromYAML(resource.getInputStream());
config.useClusterServers();
return Redisson.create(config);
}
}
依赖注入RedisClient即可以使用了。值得一提的是Redisson集成了Spring Session会话管理,那么需要将@Configuration 换成 @EnableRedissonHttpSession
同时定义一个会话初始化器即可:
public class SessionInitializer extends AbstractHttpSessionApplicationInitializer {
public SessionInitializer() {
super(InitializingRedis.class);
}
}
Redisson的使用
RedissonClient是线程安全的,由于其内部是通过Netty通信,所以除了同步执行方式,也支持异步执行。同步我们使用RedissonClient,异步使用RedissonReactiveClient.
分布式对象
- 通用对象桶
我们可以使用RBucket来存放任意类型的对象:
RedissonClient client = Redisson.create(config);
RBucket<Object> bucket = client.getBucket("city");
bucket.set("nanjing");
Object o = bucket.get();
System.out.println(o.getClass());
System.out.println(o);
代码输出
class java.lang.String
nanjing
我们登陆redis查看结果如下:
10.150.27.139:6380> get city
"\xfc\ananjing"
10.150.27.139:6380> type city
string
10.150.27.139:6380> ttl city
(integer) -1
发现get city 多了 \xfc\a,这是因为redisson默认使用的Jackson JSON做的数据序列化,我们可以使用StringCodec作为编码:
RedissonClient client = Redisson.create(config);
RBucket<Object> bucket = client.getBucket("city", new StringCodec("utf-8"));
bucket.set("nanjing");
再在服务器上看就是get city > "nanjing" 了。Redisson提供了非常丰富的编码,比如SerializationCodec(JDK序列化编码),FstCodec(10倍于JDK序列化性能而且100%兼容的编码),LongCodec(纯整长型数字编码),ByteArrayCodec(字节数组编码),AvroJacksonCodec(二进制的JSON编码)。
//java对象
RBucket<Object> bucket = client.getBucket("city");
City city = new City(); //对象必须实现序列化接口
city.name = "hangzhou";
city.province = "zhejiang";
bucket.set(city);
City c1 = (City)bucket.get();
System.out.println(c1.province);
查看服务器上的数据类型
10.150.27.139:6380> get city
"\x00\x01\x04City\xfc\bhangzhou\xfc\bzhejiang\x00"
10.150.27.139:6380> type city
string
发现使用通用对象桶都是以String的方式存入到redis中的。
Redisson还提供了地理位置桶RGeo和位向量RBitSet用于位置空间的计算。
- 原子长整型与双精度浮点
我们有时候需要一个全局的计数器,那么就可以使用原子长整型。
RedissonClient client = Redisson.create(config);
RAtomicLong count = client.getAtomicLong("count");
long l = count.incrementAndGet();
System.out.println(l);
RAtomicLong的用法和juc下的AtomicLong是一样的。在jdk8中,增加了LongAdder,该类在高并发的环境下性能更优于RAtomicLong,Redisson同样也有该类的实现RLongAdder count = client.getLongAdder("count");
在java中并没有提供AtomicDouble,Redisson为我们提供了:
RAtomicDouble d = client.getAtomicDouble("double");
我们就可以使用该类存储或计算浮点数据。
- 话题(订阅分发)
发布内容代码:
RedissonClient client = Redisson.create(config);
RTopic topic = client.getTopic("anyTopic");
DemoMessage message = new DemoMessage();
message.setTitle("震惊,一女子深夜竟然做出这种事情!");
message.setArticle("阿巴阿巴阿巴");
topic.publish(message);
订阅的代码
RedissonClient client = Redisson.create(config);
RTopic topic = client.getTopic("anyTopic");
topic.addListenerAsync(DemoMessage.class, new MessageListener<DemoMessage>() {
@Override
public void onMessage(CharSequence channel, DemoMessage msg) {
System.out.println(msg.getTitle());
}
});
除却上面的对象,Redisson还提供了布隆过滤器,基数估计算法及限流器,有兴趣的可以深入了解。
分布式集合
- 映射(Map)
Redisson使用map来存取redis中hash的数据结构:
RedissonClient client = Redisson.create(config);
RMap<Object, Object> cities = client.getMap("cities");
City c1 = new City("南京", "江苏");
City c2 = new City("杭州", "浙江");
cities.put(1,c1);
cities.put(2,c2);
City c = (City)cities.get(2);
System.out.println(c.name +"-"+ c.province);
登录服务器查看:
10.150.27.139:6381> type cities
hash
10.150.27.139:6381> hgetall cities
1) "\xf7\x01"
2) "\x00\x01\x04City\xfc\x02\xffWS\xff\xacN\xfc\x02\xff_l\xff\xcf\x82\x00"
3) "\xf7\x02"
4) "\x00\x01\x04City\xfc\x02\xffmg\xff\xde]\xfc\x02\xffYm\xff_l\x00"
对于高度频繁读写的缓存,Redisson提供了本地缓存的机制,以减少网络通信带来的时间等待。
RLocalCachedMap<Object, Object> cities = client.getLocalCachedMap("cities", LocalCachedMapOptions.defaults());
City c1 = new City("武汉", "湖北");
cities.put(1,c1);
City c = (City)cities.get(1);
System.out.println(c.name+"-"+c.province);
redisson pro中支持数据分片,类似分库的原理,可以将一个map中的数据分散映射到多个节点中,这样大大的提高了redis单一hash的容量。
- Redisson中的元素淘汰机制
元素淘汰功能(Eviction)
我们使用Redis作为缓存时,就需要考虑缓存的淘汰机制。可以通过client.getKey() 来设定key的存活时间,另外可以使用RMapCache控制每一条数据的过期时间。
RedissonClient client = Redisson.create(config);
RMapCache<Object, Object> cities = client.getMapCache("cities", new StringCodec("utf-8"));
cities.put(1,new City("成都","四川"),60,TimeUnit.SECONDS);
cities.put(2,new City("深圳","广东"),30, TimeUnit.SECONDS);
while (true){
}
每隔30s登录服务器查看数据如下:
10.150.27.139:6381> hgetall cities
1) "1"
2) "\x00\x00\x00\x00\x00\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00City@669d2b1b"
3) "2"
4) "\x00\x00\x00\x00\x00\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00City@50b1f030"
10.150.27.139:6381> hgetall cities
1) "1"
2) "\x00\x00\x00\x00\x00\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00City@669d2b1b"
10.150.27.139:6381> hgetall cities
(empty list or set)
redis并没有实现对hash元素过期时间的设置。Redisson通过在初始化RedissonMapCache时,设置了一个EvictionScheduler,这个类通过netty的EventloopGroup线程池周期地向以redisson_map_cache_expired前缀名的频道发布消息。RedissonMapCache会订阅这个频道来处理消息。它一次可移除 100 条过期项。
任务的调度时间会根据上次任务中删除的过期项数量自动调整,时间在 1 秒到 2 个小时内。因此若清理任务每次删除了100项数据,它将每秒钟执行一次(最小的执行延迟)。但如果当前过期项数量比前一次少,则执行延迟将扩大为 1.5 倍。
本地缓存功能(Local Cache)
在上面的代码已经介绍了本地缓存机制,其中有一个参数LocalCachedMapOptions,这个参数可以自定义缓存的淘汰机制。EvictionPolicy可以选择使用LRU,LFU或者通过GC过程清除元素,SyncStrategy实现了本地缓存的同步机制。
- 列表与队列
RedissonClient client = Redisson.create(config);
RList<String> list = client.getList("list",new StringCodec("utf-8"));
list.add("北京");
list.add("济南");
RedissonClient client = Redisson.create(config);
RQueue<String> qq = client.getQueue("qq");
qq.add("12");
qq.offer("34");
上面代码都对应了redis的list数据结构
分布式锁与同步器
Redisson的强大之处在于完美的实现了分布式锁和同步器,不需要我们再考虑怎么设计分布式锁的可重入?怎么保证分布式锁的公平性?如何实现一个分布式读写锁?怎么实现分布式的信号量和闭锁?这些在Redisson中都已经帮我们实现好了。先看一下最常用的lock的使用:
@RestController
public class TestController {
@Autowired
private RedissonClient client;
@RequestMapping("/test")
public String test(){
RLock anyLock = client.getLock("anyLock");
anyLock.lock();
return "success";
}
}
上面的demo获取到一个lock不去释放。我们打开一个浏览器请求这个controller返回success后,再打开一个窗口重新请求,发现一直等待无法返回结果。查看redis:
10.150.27.139:6380> hgetall anyLock
1) "c5745dc6-3105-4d60-9d5d-e39258714c31:38"
2) "1"
删除了这个key后就可以成功执行了。在设计分布式锁我们一般都要考虑锁的释放。因为如果获取到锁而线程出现异常或者系统故障,会导致这个锁无法释放。自己实现redis的锁的话会给这个key一个过期时间以避免死锁的发生。Redisson默认的锁的过期时间为30s。如果这个期间任务并没有执行完,而锁已经过期了不就出问题了吗?Redisson这里有一个watch dog,看一下lock()方法的代码:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
看一下tryAcquireAsync方法
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
如果lock指定过期时间,那么直接执行tryLockInnerAsync,tryLockInnerAsync方法是一段lua脚本,如下:
eval "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" 1 anyLock 30000 4a23dfaa-9d98-4f4c-9c6a-8966b28e1a95:31
先判断anyLock 这个key是否存在,不存在则执行hset anyLock 4a23dfaa-9d98-4f4c-9c6a-8966b28e1a95:31 1结束。否则判断anyLock这个hash中4a23dfaa-9d98-4f4c-9c6a-8966b28e1a95:31元素是否存在,如果存在则说明是重入锁,累加重入次数,重置key的失效时间为30s,结束。否则说明anyLock已经被其他线程获取,这里直接返回anyLock的失效时间。该方法是一个基于Future的异步方法。这里类似于JS通过Promise来实现异步操作的模式。在onComplete中执行了一个BiConsumer,这个函数会启动失效检查:
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
上面代码会将该线程放入到一个concurrentmap中,并执行renewExpiration方法。
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
上面的方法会生成一个timertask来检查concurrentmap中的key是否存在,如果存在说明该线程还没有释放掉锁,则会更新锁的过期时间,该方法以一种异步递归的方式循环执行。
返回到lock方法,如果返回的ttl>0,则会进入while循环中一直尝试获取,达到了阻塞的目的。
Redisson还有许多的功能,比如分布式任务调度,Redisson事务,spring cache整合等,有空再说了。