Java中使用消息中间件redis已经是一个月前的事情了,后面被经理发配到设计了,现在有空闲就梳理总结一下,我学习并使用redis过程中遇到的一些问题。
第一阶段 学习阶段
注:其中的switch("字符串")需要jdk1.7版本以上的,不然会报错滴。
)。其中,RedisMessage中放的是key和value,当然,我这里的key是指消息队列的名字了,具体怎么用,还要看你自己咯!
什么?你想直接拷过去用?其实应用不一样,使用方式不一样的,我只能共享一个共用的使用jedispool操作redis的三个文件:jedispool.properties连接池参数配置文件,jedisutil.java操作redis相关方法及jedispool初始化,objectutil.java处理序列化与反序列化。
传送门:https://pan.baidu.com/s/1ge6MB4Z
当然,jedis相关包自己去下咯,另注明一点,jedispool初始化的时候,需要加同步(synchronize)。因为,公司其他的项目(投票,只持续1个月左右,提升交互速度也是不错的选择啊,ps:防止意外,还是有存入阿里云滴)中,是把redis直接放在主程序中的,初始化的时候,防止阻塞。
好了,就这么多吧,也没做集群(主从),还是需要大神带的,还有就是redis的配置和jedispool的配置信息,只能希望你看仔细一点了,收。
[Java使用消息中间件redis的问题总结](http://upload-images.jianshu.io/upload_images/15665725-01e3257aaeefd995.gif?imageMogr2/auto-orient/strip Java使用消息中间件redis的问题总结")
)来解决这个需求,不过,我也比较好奇是个什么东西,后来看了一下(其实github有开源社区),具体需求:请求虽多,但是从微信返回的成功的信息只能处理一遍,而且要保证客户端的友好型和响应速度。好了,看了redis之后,有列表(list)类型和集合(set)两种数据类型有待考虑,最终还是考虑用list列表来玩了,主要是考虑list可用于生产者-消费者模式和发布者-订阅者模式,比如:可以改进推送,邮件发送等功能。
<wbr> <wbr> 上图就是描述生产者-消费者模式,可以一对多,多对一,多对多,都是可以滴,当然List消息队列也是可以有多个的,其中的brpop是线程阻塞的,不浪费资源,毕竟我用的是一个死循环的线程。
<wbr> <wbr> 在我所在项目中,有名为payMoney,coreWechat两个消息队列,payMoney是用于支付成功,生成销售出库单、销售收款单,coreWeChat是用于微信关注过程中,部分复杂的逻辑处理同步返回结果,异步处理,总而言之,调节系统在某些功能中,由于高并发引起的响应速度慢,数据原子性处理等问题。
<wbr> <wbr> <wbr>注:图中FIFO是表示List消息队列先进先出规则。
<wbr> <wbr> 发布者-订阅者模式是生产者-消费者模式的一个特例,我在这里就不多做说明了,简而言之,就是发布者发布了一条信息之后,所有订阅者就会收到这条信息,就相当于上图的一个生产者,多个消费者的模式(即一对多)。
<wbr> <wbr> 至于redis相关的数据结构,参数配置的问题,官网很详细(虽然是英文的),我这有个redis入门指南传送门:https://pan.baidu.com/s/1bo0jgzd
第二阶段 使用阶段
要完成这个需求,修改的文件不多,添加的文件倒是挺多的,首先,安装redis并配置config,;其次,选择另一项目还是线程来的List消息队列进行监听,当然,我是选择线程咯,并且随项目启动而死循环;最后,由于项目是Struts2-Spring-hibernate,所以也不能坏了规矩,比如我,就有XXXService.,当然,别忘了添加配置文件咯;
废话也不多说,直接上我的监听文件代码了,当然合理的设计List消息队列(不管是个数还是功能)是很重要的,这会影响你创建的线程个数(这些线程就是异步处理的主体了,有什么问题,都可以去找他们的麻烦)。
import java.util.List;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import com.shujian.ptms.domain.base.wechat.message.RedisMessage;
import com.shujian.ptms.pay.OrderDeal;
import com.shujian.ptms.pay.WechatDeal;
import com.shujian.ptms.web.action.base.JedisUtil;
import com.shujian.ptms.web.action.base.ObjectUtil;
public class OrderDealListener implements ServletContextListener {
private MyThread paymoneyUpdCust;
private MyThread paymoneyAddMyCust;
private MyThread paymoneyUpdPe;
private MyThread paymoneyUpdMarket;
private MyThread wechatUpdCust;
public void contextDestroyed(ServletContextEvent e) {
if (paymoneyUpdCust != null && paymoneyUpdCust.isInterrupted()) {
paymoneyUpdCust.interrupt();
}
if (paymoneyAddMyCust != null && paymoneyAddMyCust.isInterrupted()) {
paymoneyAddMyCust.interrupt();
}
if (paymoneyUpdPe != null && paymoneyUpdPe.isInterrupted()) {
paymoneyUpdPe.interrupt();
}
if (paymoneyUpdMarket != null && paymoneyUpdMarket.isInterrupted()) {
paymoneyUpdMarket.interrupt();
}
if (wechatUpdCust != null && wechatUpdCust.isInterrupted()) {
wechatUpdCust.interrupt();
}
}
public void contextInitialized(ServletContextEvent e) { <wbr>
paymoneyUpdCust = new MyThread("paymoneyUpdCust");//创建处理销售出库单、销售收款单的线程
paymoneyAddMyCust=new MyThread("paymoneyAddMyCust");
paymoneyUpdPe=new MyThread("paymoneyUpdPe");
paymoneyUpdMarket=new MyThread("paymoneyUpdMarket");
wechatUpdCust=new MyThread("wechatUpdCust");//创建处理WeChat关注的线程
paymoneyUpdCust.start(); // servlet 上下文初始化时启动 socket <wbr>
paymoneyAddMyCust.start();
paymoneyUpdPe.start();
paymoneyUpdMarket.start();
wechatUpdCust.start();
}
}
class MyThread extends Thread {
private String name;//备注每个线程的名字
public MyThread(String name) {
this.name=name;
}
@SuppressWarnings("static-access")
@Override
public void run() {
while (!this.isInterrupted()) {// 线程未中断执行循环 <wbr>
List bytes=null;
switch(this.getName()){//判断是哪一个消息队列的监听
case "payMoney":
bytes=JedisUtil.brpop("payMoney".getBytes());
case "coreWechat":
bytes=JedisUtil.brpop("coreWechat".getBytes());
}
try {
this.sleep(20000);//相隔20秒钟运行一次
} catch (InterruptedException e1) {
e1.printStackTrace();
} //每隔2000ms执行一次
if(bytes==null){//如果bytes是NULL的话,我们就直接跳过本次循环操作
continue;
}else{
try {
RedisMessage redisMessage=(RedisMessage)ObjectUtil.bytesToObject(bytes.get(0));//将传递过来的二进制数据反序列化成对象类型
if(redisMessage.equals(null)){//如果得到的redis信息为NULL的话,就不用执行了
switch(this.getName()){
case "paymoney":
OrderDeal.orderDeal(redisMessage);
case "coreWechat":
WechatDeal.wechatDeal(redisMessage);
}
}else
> System.out.println("点这么快,我真的是服了!!!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
注:其中的switch("字符串")需要jdk1.7版本以上的,不然会报错滴。
说明一下,上面的程序,只是将List消息队列分类了而已,可以解决高并发的时候,微信相应慢导致客户端卡死的情况,但是,并不能解决重复请求导致数据库中存在重复数据的情况,所以,我们得另外建一个List列表或者集合来存放近期处理过的微信响应信息(当然,近期是多久合适呢?我也是不晓得,一直存着,太消耗内存不建议,我弄的是6小时的,没办法,其实内存也不怎么值钱了![Java使用消息中间件redis的问题总结](http://upload-images.jianshu.io/upload_images/15665725-05cfffebe87f6b5a.gif?imageMogr2/auto-orient/strip "Java使用消息中间件redis的问题总结")
)。其中,RedisMessage中放的是key和value,当然,我这里的key是指消息队列的名字了,具体怎么用,还要看你自己咯!
<wbr> <wbr> 什么?你想直接拷过去用?其实应用不一样,使用方式不一样的,我只能共享一个共用的使用jedispool操作redis的三个文件:jedispool.properties连接池参数配置文件,jedisutil.java操作redis相关方法及jedispool初始化,objectutil.java处理序列化与反序列化。
传送门:https://pan.baidu.com/s/1ge6MB4Z
<wbr> <wbr> 当然,jedis相关包自己去下咯,另注明一点,jedispool初始化的时候,需要加同步(synchronize)。因为,公司其他的项目(投票,只持续1个月左右,提升交互速度也是不错的选择啊,ps:防止意外,还是有存入阿里云滴)中,是把redis直接放在主程序中的,初始化的时候,防止阻塞。
<wbr> <wbr> 好了,就这么多吧,也没做集群(主从),还是需要大神带的,还有就是redis的配置和jedispool的配置信息,只能希望你看仔细一点了![Java使用消息中间件redis的问题总结](http://upload-images.jianshu.io/upload_images/15665725-b6c8d433de456e16.gif?imageMogr2/auto-orient/strip "Java使用消息中间件redis的问题总结")
,收。