项目业务需求用到Redis订阅发布,于是去百度订阅发布,搜索内容是springBoot 的Redis订阅发布
然而全网络都是一篇写法,我就去官方去看,原来,全网出自官方。
贴出官方:https://spring.io/guides/gs/messaging-redis/
官方给的例子固然是能用的,但是问题来了,我现在照搬此方法去用是不行的,我的需求是在线程池主线程和内部线程加订阅。于是开启一波代码分析:
首先,我们找一些一些关键代码:
先找到发布的地方,在main方法里的
template.convertAndSend("chat", "Hello from Redis!");
再找订阅的地方,在container的这一行:
container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
再找回调方法:
Receiver类的receiveMessage方法。
声明回调方法的地方:
return new MessageListenerAdapter(receiver, "receiveMessage");
然后我们看下官方的解释:
这Receiver是一个简单的POJO,它定义了接收消息的方法。正如您在注册Receiver为消息侦听器时所看到的,您可以根据需要命名消息处理方法。
注册侦听器并发送消息
Spring Data Redis提供了使用Redis发送和接收消息所需的所有组件。具体来说,你需要配置:
连接工厂
消息侦听器容器
Redis模板
您将使用Redis模板发送消息,并且您将Receiver使用消息侦听器容器进行注册,以便它可以接收消息。连接工厂驱动模板和消息侦听器容器,使它们能够连接到Redis服务器。
本示例使用Spring Boot的默认设置RedisConnectionFactory,其实例JedisConnectionFactory基于jedis Redis库。连接工厂被注入到消息监听器容器和Redis模板中。
在该listenerAdapter方法中定义的bean在消息侦听器容器中注册为消息侦听器container,并将侦听“chat”主题上的消息。由于Receiver该类是POJO,因此需要将其包装在实现所需MessageListener接口的消息侦听器适配器中addMessageListener()。消息侦听器适配器还配置为在消息到达时调用该receiveMessage()方法Receiver。
连接工厂和消息监听器容器bean都是您需要侦听消息的。要发送消息,您还需要一个Redis模板。在这里,它是一个配置为a的bean,它的StringRedisTemplate一个实现RedisTemplate集中在Redis的常见用法上,其中键和值都是`String`s。
该main()方法通过创建Spring应用程序上下文来解决所有问题。应用程序上下文然后启动消息监听器容器,并且消息监听器容器bean开始监听消息。main()然后该方法StringRedisTemplate从应用程序上下文中检索bean,并使用它发送“来自Redis的Hello!” 消息在“聊天”主题上。最后,它关闭Spring应用程序上下文并结束应用程序。
接下来就是我的悟性了:
整个订阅的步骤应该是这样的,先有RedisConnectionFactory,这个可以由springboot配置
加载。再是有RedisMessageListenerContainer,这是个redis消息监听容器(直接翻译)。
给容器设置连接工厂。这时候去创建MessageListenerAdapter,由它去声明回调方法。
那么机智的我发现CountDownLatch是什么鬼,然后去搜索了,然后又是本阶段自悟:因为监听是堵塞的,这个东西是专门开启一个线程去放监听的。暂且骗过自己。
再下来就是把这套东西挪进自己代码了:
经过艰难的一步步尝试,慢慢在失败中思索,排除。得出:
先在启动类里注册这个bean(监听容器),并且把连接工厂这时候就给设置了,毕竟springboot只要配置的正确,这个连接工厂就能自动加载。
接下来就是代码了:
先获取到上面的bean。
RedisMessageListenerContainer redisMessageListenerContainer = SpringUtil.getBean(RedisMessageListenerContainer.class);
自己声明自己的监听回调方法
参数一个是你回调方法所在类的一个对象,另一个参数是一个字符串,是回调方法名。
经过一层层查找,方法名默认是handleMessage。(是的这个可以不配置)
MessageListenerAdapter messageListenerAdapter =new MessageListenerAdapter(自己的类的一个对象,"methodName");
按照本来的套路,这时候该这样了:
redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic("lc"));
但是,但是,关键来了,这是网上至今没搜到的,这句前面加这个:
messageListenerAdapter.afterPropertiesSet();
所以正确的代码是:
来看下加的那行代码的作用:
给这个invoker赋值
invoker干啥的:
如果没有上面对它的赋值,那么这里是null。
剩下的不再说了,自悟,以上内容足够做redis的订阅发布了。
当天下午就去解决了取消订阅,逆向思维
还是先获取监听容器,remove 掉MessageListenerAdapter
前提是得是之前add进去的那个对象。
RedisMessageListenerContainer redisMessageListenerContainer = SpringUtil.getBean(RedisMessageListenerContainer.class);
redisMessageListenerContainer.remove(messageListenerAdapter);
2018/4/2
解决一下当时看到的坑:
private CountDownLatch latch;
这个东西出现在官方的例子中,在自己抽空学习后,了解到了作用,这是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。代码中的作用主要是,在调用了监听的回调方法后,主线程先阻塞,让监听方法在执行完成后,执行countDown()方法。
本来的数量就是1在执行完后-1就是0.当为0时,主线程的await方法相当于收到通知,不用再等了,继续执行下面的方法。