消息队列使用与改造点

序言

文档背景

消息队列改造是双创框架升级工作的一部分。

文档主题

文档主要讲述消息队列代码更新后,新的使用方式和如何使用原有的消费者模式完成业务逻辑。

文档结构图

[站外图片上传中...(image-e177db-1522380525219)]

文档变更历史

作者 日期 版本 变更点
李清泉 2018-3-29 0.5 创建文档

配置文件的写法

配置数据源

配置数据源有多种方式,我使用过有效的有两种:

  1. 单独配置一个源
  2. 配置到绑定

单独配置一个源

rabbitmq:
  addresses: amqp://192.168.1.241:5672
  username: mqadmin
  password: mqadmin

单独配置的方式是使用rabbitmq作为顶层配置,然后在其他配置中引用,如绑定时使用:

binders:
    rabbit1:
        type: rabbit
        environment:
           spring:
              rabbitmq:
                addresses: ${rabbitmq.addresses}
                username: ${rabbitmq.username}
                password: ${rabbitmq.password}
                virtual-host: test1

在上述代码中引用了单独配置。这种好处是可以配置一次多处引用,避免重复写。

配置到绑定

binders:
    rabbit1:
        type: rabbit
        environment:
           spring:
              rabbitmq:
                addresses: amqp://192.168.1.241:5672
                username: mqadmin
                password: mqadmin
                virtual-host: test1

这种方式是直接写到绑定上。如果只有一个配置,可以这么写,但如果有多个绑定并且用同一个数据源,就变成了冗余。

绑定数据源

绑定数据源是将数据源配置到一个变量之中,方便配置接收或者发送时使用。上述已经说过配置方法了。在下面的配置代码中,是将一个数据源配置到变量 rabbit1之中。

binders:
    rabbit1:
        type: rabbit
        environment:
           spring:
              rabbitmq:
                addresses: amqp://192.168.1.241:5672
                username: mqadmin
                password: mqadmin
                virtual-host: test1

可以绑定多个的,如果下面的配置:

binders:
    rabbit1:
        type: rabbit
        environment:
           spring:
              rabbitmq:
                addresses: amqp://192.168.1.241:5672
                username: mqadmin
                password: mqadmin
                virtual-host: test1
    rabbit2:
        type: rabbit
        environment:
           spring:
              rabbitmq:
                addresses: amqp://192.168.1.245:5672
                username: mqadmin
                password: mqadmin
                virtual-host: test2

上面的代码配置了两个变量,rabbit1和rabbit2. 在接收和发送配置中可以引用这两个不同的变量代表不同的源。

配置接收

spring:
  cloud:
    stream:
      bindings:
        input1:
          binder: rabbit1
          contentType: text/plain
          destination: testquene

上述代码中,定义一个叫做 input1的接收,内容格式为文本,目标(队列名)为testquene ,绑定到rabbit1源变量(上述的绑定配置)。

配置发送

spring:
  cloud:
    stream:
      bindings:
        output1:
          binder: rabbit1
          destination: testquene2
          contentType: text/plain

上述代码中,配置了一个叫做output1的消息发送,目标(队列名)为testquene2,模式是文本。

完整配置示例

server:
  port: 9087

rabbitmq:
  addresses: amqp://192.168.1.241:5672
  username: mqadmin
  password: mqadmin

spring:
  cloud:
    stream:
      bindings:
        input1:
          binder: rabbit1
          #group: test.qqqq
          contentType: text/plain
          destination: mqTestDefault.test.qqqq
        input2:
          binder: rabbit1
          #group: test.qqqq
          contentType: text/plain
          destination: quene1
        output1:
          binder: rabbit1
          destination: mqTestDefault.test.qqqq
          contentType: text/plain
        output2:
          binder: rabbit1
          destination: quene1
          contentType: text/plain
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: ${rabbitmq.addresses}
                username: ${rabbitmq.username}
                password: ${rabbitmq.password}
                virtual-host: test1
  #              exchange:
      defaultBinder:

消息队列使用方式

消息队列涉及到两种操作:

  • 消费处理信息
  • 生产发送信息

下面分别说明这两个操作的使用方式

消息队列的接收处理

消息队列的接收处理有两种方式:

  1. 直接使用监听
  2. 使用适配原有的消费模式

将配置文件中的接收与发送定义到代码中

无论是接收还是发送消息。都要先在一个接口类中定义信道。接收时,定义为SubscribableChannel ;发送时,定义为MessageChannel 。示例:

public interface Sink {
    String INPUT1 = "input1";

    String INPUT2="input2";

    String OUTPUT1="output1";
    String OUTPUT2="output2";

    @Input(INPUT1)
    SubscribableChannel input1();

    @Input(INPUT2)
    SubscribableChannel input12();

    @Output(OUTPUT1)
    MessageChannel output1();
    @Output(OUTPUT2)
    MessageChannel output2();
}

上述代码中,分别定义了两个接收信道和两个发送信息。请务必注意名称一定要对应对配置文件中。比如上述的input1 input2 output1 output2 ,必须在配置文件中存在的。
上述的定义是注册到spring之中的,使用时,只需要使用相应的名称的bean即可以。如果使用input1的bean名称,即为input1的接收信道。

使用监听的方式

定义好接收与发送的spring bean后,可以在监听中使用接收了。

@Service
@EnableBinding(Sink.class)
public class StreamMessageQueneManager extends AbstractMessageQuene implements MessageQueueManager{
    // 监听 binding 为 Sink.INPUT 的消息
    @StreamListener(Sink.INPUT1)
    public void input1(Message<String> message) {
        System.out.println("第一个队列:" + message.getPayload());
        doSomething(message.getPayload());
    }
}

上述代码中,StreamMessageQueneManager 这个类为spring bean,使用了@EnableBinding(Sink.class)注解。代表将已经定义好的Sink接口中的定义好的接收在这个类中开启监听。在public void input1(Message<String> message)方法头上,加上了@StreamListener(Sink.INPUT1)注解,目的是将Sink中的INPUT1代表的接收在这个方法上开启监听。当监听收到消息时,将自动调用public void input1(Message<String> message)方法,传入message对象,我们就可以使用这个对象执行任何逻辑。

使用适配原有消费模式

原来的双创是使用生产-消费模式处理消息的,我们原来是使用MessageQueueManager接口接收和发送信息的,这个接口代码如下:

public interface MessageQueueManager {
    /**
     * 发送消息
     *  @param queueName 队列名称
     *  @param message 放入队列的内容
     */
     void sendMessage(String queueName, Object message);

    /**
     * 获取通道消息
     *  @param queueName 队列名称
     *  @return message 队列的内容
     */
     Object getMessage(String queueName);

  
}

Object getMessage(String queueName);方法中,我们通过传入队列名的方法,主动获取消息的。因此改造后为了减少代码变动,这种方式保持不变。原有的代码不需要变动即可正常执行。原理是使用了适配的方法:

@Service
@EnableBinding(Sink.class)
public class StreamMessageQueneManager  implements MessageQueueManager{
    private Map<String,Queue<String>> queueMap=new HashMap<>();
    // 监听 binding 为 Sink.INPUT 的消息
    @StreamListener(Sink.INPUT1)
    public void input1(Message<String> message) {
        System.out.println("第一个队列:" + message.getPayload());
        Queue queue = getQueue(Sink.INPUT1);
        queue.offer(message.getPayload());
    }
    private Queue getQueue(String queneName) {
        Queue queue=queueMap.get(queneName);
        if(queue==null){
            queue=new ConcurrentLinkedQueue();//非阻塞
            queueMap.put(queneName,queue);
        }
        return queue;
    }
     @Override
    public Object getMessage(String queueName) {
        Queue queue = getQueue(queueName);
        return queue.poll();
    }
    
    @Override
    public void sendMessage(String queueName, Object message) {
        MessageChannel channel=channelMap.get(queueName);
        if(channel==null){
            throw new RuntimeException(queueName+"对应的信通不存在!");
        }
        if(message!=null) {
            channel.send(MessageBuilder.withPayload(message).build());
        }
    }
}

在上述接口的实现类中,监听将input1收到的消息放入了临时非阻塞且线程安全的ConcurrentLinkedQueue中。业务逻辑通过定时主动调用public Object getMessage(String queueName),获取到目标队列并取出消息执行逻辑。值得注意的是由于ConcurrentLinkedQueue没有限制容量,如果不能及时消费掉里面存储的消息,可能会造成内存占用过多甚至溢出,因此需要考虑消费的速度和调用的间隔。

发送消息

消息的发送也有两种方式:

  • 使用 messageChannel
  • 使用原有生产消费模式接口

使用 messageChannel

相对于使用监听,发送也可以使用新的messageChannel方式。

@Service
public class TestSender  {
    @Autowired
    @Qualifier("output1")
    MessageChannel output1;
    
    public void send(){
        output1.send(MessageBuilder.withPayload("您好,这是一个测试消息").build());
    }
}

上述代码中,output1为spring bean的名称,代表了发送的信道代理。直接使用它就可以将消息发送到对应信道。

使用原有的适配

请参考上述的MessageQueueManager接口,调用public void sendMessage(String queueName, Object message) 指定队列名即可。原理是,适配代码中已将messagechannel封装起来:

@Service
@EnableBinding(Sink.class)
public class StreamMessageQueneManager  implements MessageQueueManager{
    @Autowired
    @Qualifier("output1")
    MessageChannel output1;

    @Autowired
    @Qualifier("output2")
    MessageChannel output2;
    
    private Map<String,MessageChannel> channelMap;
    
     /**
     * bean初始化后执行这个方法
     */
    @PostConstruct
    public void postConstruct(){
        if (channelMap == null) {
            channelMap = new HashMap<>();
            channelMap.put(Sink.OUTPUT1, output1);
            channelMap.put(Sink.OUTPUT2, output2);
        }
    }
    
    @Override
    public void sendMessage(String queueName, Object message) {
        MessageChannel channel=channelMap.get(queueName);
        if(channel==null){
            throw new RuntimeException(queueName+"对应的信通不存在!");
        }
        if(message!=null) {
            channel.send(MessageBuilder.withPayload(message).build());
        }
    }
    
    @Override
    public Object getMessage(String queueName) {
        Queue queue = getQueue(queueName);
        return queue.poll();
    }

}

上述代码中,channel被放入Map中,通过队列名可以取出,然后发送消息。原理本质上是适配。

改造点

原有的双创中,只有solr和微博模块有用到消息队列的功能。
我认为,至少下面的模块可以用消息队列:

  • solr写入消息
  • 微博发布
  • 站内信
  • 日志
  • 积分写入
  • 定时任务协调
  • 其他附合要求的模块
    什么是“附合要求”呢?

消息队列应用点

消息队列主要应用到以下情境:

  • 不要求该分逻辑与主逻辑实时性
  • 执行慢的逻辑
  • 任务协调 分布式集群中,同一个实例可以通过消息队列接收的唯一实例的特性进行任务协调。

“附合要求”的其他模块是可以使用消息队列的,这需要我们在后续优化。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,599评论 18 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,869评论 2 11
  • “ 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列...
    落羽成霜丶阅读 3,974评论 1 41
  • 中庭明月花间照, 秋桂幽香廊下飘。 快意举杯诗在酒, 乐承素女此招邀。 中宵满月照藤丫, 秋露无声湿桂花。 快意壶...
    A_master阅读 241评论 13 14
  • 一 老宋头一大早就来了,因为工资的事。每年年初他一定会来,先找劳资,再到财务拿工资条核对。我说要等等呢。他就坐在我...
    冠世墨玉yanzi阅读 181评论 0 1