[原创]JMS-ActiveMQ基础与SpringBoot整合

ActiveMQ实现了JMS规范。

# ActiveMQ中相关概念术语

  1. Destination目的地
    消息将要发送的地方,包括:QueueTopic,它们都对Destination接口进行了实现
    1. PTP模式 - Queue
    2. 发布订阅模式 - Topic
      MessageProvider需要指定Destination才能发送消息,MessageConsumer需要指定Destination才能接收和消费消息。
  2. Producer消息生产者
    消息生产者,负责将消息发送到目的地Destination
  3. Consumer消息消费者
    消息消费者,负责从目的地Destination消费消息。
  4. Message消息本体
  5. ConnectionFactory连接工厂
    用于创建连接的工厂
  6. Connection连接
    用户访问ActiveMQ
  7. Session会话
    一次持久有效有状态的访问,由Connection创建,是具体操作消息的基础支撑。
    JMS中定义了两种消息模型:点对点(point to point, queue)发布/订阅(publish/subscribe,topic)。主要区别就是是能否重复消费

# JMS中Queue模式与Topic模式对比

Topic Queue
概要 Publish Subscribe messaging 发布订阅消息 Point-to-Point 点对点
有无状态 topic数据默认不落地,是无状态的。 Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。
完整性保障 并不保证publisher发布的每条数据,Subscriber都能接受到。 Queue保证每条数据都能被receiver接收。
消息是否会丢失 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。
消息发布接收策略 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

1. PTP Queue不可重复消费

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
消息被消费以后(消费者ack应答确认/事务模式),queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。
当消费者不存在时,消息会一直保存,直到有消费消费

img

2. 发布订阅模式 Topic 可以重复消费

消息生产者(发布)将消息发布到Topic中,同时有多个消息消费者(订阅该Topic)消费该消息。
和点对点方式不同,发布到topic的消息会被所有订阅者消费。
当生产者发布消息,不管是否有消费者。都不会保存消息。如果生产者向队列发送消息时,没有消费者订阅该队列,则消息全部丢失。否则向所有订阅了该Topic的消费者发送同样的消息(即:消费者必须在线)

img

# 在SpringBoot中使用ActiveMQ

ActiveMQ管理地址: http://localhost:8161/admin/

  1. PTP模式

    • 依赖

          //jms-active
          compile 'org.springframework.boot:spring-boot-starter-activemq'
          //active连接池-1.5.13依赖
          compile 'org.apache.activemq:activemq-pool'
      
    • 配置信息

      spring:
        # activemq
        activemq:
          broker-url: failover:(tcp://localhost:61616,tcp://localhost:666)?randomize=false      # tcp://localhost:61616/故障转移,默认情况下如果某个链接失效了,则从列表中随机获取一个,如果设置了randomize=false则是严格按照列表的先后顺序的
          user: admin           # 用户名
          password: admin       # 密码
          in-memory: false      # 基于内存的activemq
          close-timeout: 15s     # 在考虑结束之前等待的时间
          pool:
            enabled: true                               # 启动连接池(是否用Pooledconnectionfactory代替普通的ConnectionFactory)
            max-connections: 10                         # 最大链接数量
            idle-timeout: 60s                           # 空闲连接存活时间
            block-if-full: true                         # 当连接请求和池满时是否阻塞。设置false会抛“JMSException异常”
            block-if-full-timeout: -1                   # 如果池仍然满,则在抛出异常之前阻塞时间
            create-connection-on-startup: true          # 是否在启动时创建连接。可以在启动时用于加热池
            maximum-active-session-per-connection: 500  # 每个连接的有效会话的最大数目。
            reconnect-on-exception: true                # 当发生"JMSException"时尝试重新连接
        jms:
          pub-sub-domain: false                  # 默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
      
    • 定义PTP模式下的Destination-Queue

      /**
       * @author futao
       * Created on 2019-06-04.
       */
      @AllArgsConstructor
      @Getter
      public enum ActiveMqQueueEnum {
          /**
           * springboot-test-queue=测试Queue
           */
          TEST_QUEUE("springboot-test-queue", "测试Queue");
        
          private String queueName;
          private String desc;
      
          public static final String testQueue = "springboot-test-queue";
      }
      
      /**
       * @author futao
       * Created on 2019-06-04.
       */
      @Configuration
      public class ActiveMqConfig {
      
         /**
           * The ActiveMQConnectionFactory creates ActiveMQ Connections.
           * The PooledConnectionFactory pools Connections.
           * If you only need to create one Connection and keep it around for a long time you don't need to pool.
           * If you tend to create many Connection instances over time then Pooling is better as connecting is a heavy operation and can be a performance bottleneck.
           * <p>
           * 可以在这里统一设置JmsTemplate的一些配置,也可以在具体使用到JmsTemplate的时候单独设置
           * JmsMessageTemplate是对JmsTemplate的进一步封装
           * TODO 目前看起来不起作用
           *
           * @param factory
           * @return
           */
          //    @Primary
      //    @Bean
          public JmsTemplate jmsTemplate(PooledConnectionFactory factory) {
              JmsTemplate jmsTemplate = new JmsTemplate();
              //关闭事物
              jmsTemplate.setSessionTransacted(false);
              //TODO 在此设置无效
      //        jmsTemplate.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
              jmsTemplate.setConnectionFactory(factory);
              return jmsTemplate;
          }
        
          @Bean(name = ActiveMqQueueEnum.testQueue)
          public ActiveMQQueue activeTestQueue() {
              return new ActiveMQQueue(ActiveMqQueueEnum.TEST_QUEUE.getQueueName());
          }
        /**
           * 定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
           *
           * @param pooledConnectionFactory
           * @return
           */
          @Bean(name = "jmsQueueListener")
          public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(PooledConnectionFactory pooledConnectionFactory) {
              DefaultJmsListenerContainerFactory factory =
                      new DefaultJmsListenerContainerFactory();
              factory.setConnectionFactory(pooledConnectionFactory);
              factory.setSessionTransacted(false);
              factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
              return factory;
          }
      }  
      
    • 定义PTP模式下的生产者

      package com.futao.springbootdemo.foundation.mq.active.ptp;
      
      import lombok.extern.slf4j.Slf4j;
      import org.apache.activemq.command.ActiveMQQueue;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.jms.core.JmsMessagingTemplate;
      import org.springframework.stereotype.Component;
      
      import javax.jms.JMSException;
      
      /**
       * PTP模式生产者
       *
       * @author futao
       * Created on 2019-06-06.
       */
      @Slf4j
      @Component
      public class PtpProducer {
        
          @Autowired
          private JmsMessagingTemplate jmsMessagingTemplate;
      
          /**
           * 目的地
           */
          @Qualifier("springboot-test-queue")
          @Autowired
          private ActiveMQQueue springBootTestQueue;
      
          public void send(String msg) {
              jmsMessagingTemplate.convertAndSend(springBootTestQueue, msg);
              try {
                  log.info("send to ActiveMQ-Queue[{}] success ,msg:[{}]", springBootTestQueue.getQueueName(), msg);
              } catch (JMSException e) {
                  e.printStackTrace();
              }
          }
      }
      
      /**
       * @author futao
       * Created on 2019-06-04.
       */
      @RequestMapping("/activemq")
      @RestController
      public class ActiveController {
        @Resource
          private PtpProducer ptpProducer;
      
          @PostMapping("/ptp/sender")
          public void ptpSender(@RequestParam String msg) {
              ptpProducer.send(msg);
          }
      }
      
    • 定义PTP模式下的消费者

      package com.futao.springbootdemo.foundation.mq.active.ptp;
      
      import com.futao.springbootdemo.foundation.mq.active.ActiveMqQueueEnum;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.activemq.ActiveMQConnectionFactory;
      import org.apache.activemq.command.ActiveMQMessage;
      import org.apache.activemq.command.ActiveMQQueue;
      import org.junit.Test;
      import org.springframework.jms.annotation.JmsListener;
      import org.springframework.stereotype.Service;
      
      import javax.jms.*;
      
      /**
       * @author futao
       * Created on 2019-06-06.
       */
      @Slf4j
      @Service
      public class PtpConsumer {
      
          @JmsListener(destination = ActiveMqQueueEnum.testQueue, containerFactory = "jmsQueueListener")
          public void ptpConsumer(ActiveMQMessage message) throws JMSException {
              String text = ((TextMessage) message).getText();
              if ("节日快乐666".equalsIgnoreCase(text)) {
                  message.acknowledge();    //ack手动确认
              }
              log.info("receive message from activeMQ :[{}]", text);
          }
        /**
         * 手动创建ActiveMQConnectionFactory消费消息,生产消息也类似
         */
          @Test
          public void test() throws Exception {
              ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616");
              Connection connection = connectionFactory.createConnection();
              Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//开启ack手动确认
              MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(ActiveMqQueueEnum.TEST_QUEUE.getQueueName()));
              connection.start();
              consumer.setMessageListener(message -> {
                  try {
                      String text = ((TextMessage) message).getText();
                      System.out.println(("收到消息:{}" + text));
                      if ("节日快乐666".equalsIgnoreCase(text)) {
                          message.acknowledge();    //ack手动确认
                      }
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              });
              Thread.sleep(999999999);
          }
      }
      
      
    image.png
    • 特点
      • 一条消息只会发送给其中某一个单独的消费者


        image.png
      • 未被确认的消息将再次发送给其他消费


        image.png
  2. 发布订阅模式

    • 发布订阅模式需要将spring.jms.pub-sub-domain=true,其他配置不需要修改

    • 定义发布订阅模式下的Destination - Topic

      /**
       * @author futao
       * Created on 2019-06-04.
       */
      @Configuration
      public class ActiveMqConfig {
        /**
           * ActiveMQ topic的定义
           */
          public static class TopicDefinition {
              public static final String activeTestTopic = "active-test-topic";
              public static final String activeProdTopic = "active-prod-topic";
          }
      
          /**
           * 定义一个名为BeanName为activeTestTopic的Topic:active-test-topic
           *
           * @return
           */
          @Bean(name = "activeTestTopic")
          public ActiveMQTopic activeMQTestTopic() {
              return new ActiveMQTopic(TopicDefinition.activeTestTopic);
          }
      
          /**
           * 定义一个名为BeanName为activeProdTopic的Topic:active-prod-topic
           *
           * @return
           */
          @Bean(name = "activeProdTopic")
          public ActiveMQTopic activeMQProdTopic() {
              return new ActiveMQTopic(TopicDefinition.activeProdTopic);
          }
      }
          @PostMapping("/ps/sender")
          public void pushTest(@RequestParam String msg) {
              activeMqProducer.send(msg);
          }
      
    • 发布订阅模式下的消费者定义

      package com.futao.springbootdemo.foundation.mq.active.topic;
      
      import com.futao.springbootdemo.foundation.mq.active.ActiveMqConfig;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.activemq.command.ActiveMQMessage;
      import org.springframework.jms.annotation.JmsListener;
      import org.springframework.stereotype.Service;
      
      import javax.jms.JMSException;
      import javax.jms.TextMessage;
      
      /**
       * 订阅的队列是PTP模式还是Topic模式,与这边的定义无关。取决于配置
       * # 开启topic模式
       * spring:
       * jms:
       * pub-sub-domain: true
       *
       * @author futao
       * Created on 2019-06-04.
       */
      @Slf4j
      @Service
      public class ActiveMqConsumer {
      
          /**
           * 订阅testTopic  -1
           *
           * @param mqMessage
           * @throws JMSException
           */
          @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeTestTopic)
          public void testTopicConsumer1(ActiveMQMessage mqMessage) throws JMSException {
              String text = ((TextMessage) mqMessage.getMessage()).getText();
              log.info("testTopicConsumer1接收到activeMq-activeTestTopic消息:[{}]", text);
          }
      
          /**
           * 订阅testTopic  -2
           *
           * @param mqMessage
           * @throws JMSException
           */
          @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeTestTopic)
          public void testTopicConsumer2(ActiveMQMessage mqMessage) throws JMSException {
              String text = ((TextMessage) mqMessage.getMessage()).getText();
              log.info("testTopicConsumer2接收到activeMq-activeTestTopic消息:[{}]", text);
          }
      
          /**
           * 订阅prodTopic  -1
           *
           * @param mqMessage
           * @throws JMSException
           */
          @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeProdTopic)
          public void prodTopicConsumer1(ActiveMQMessage mqMessage) throws JMSException {
              String text = ((TextMessage) mqMessage.getMessage()).getText();
              log.info("prodTopicConsumer1接收到activeMq-activeProdTopic消息:[{}]", text);
          }
      
          /**
           * 订阅 prodTopic  -2
           *
           * @param mqMessage
           * @throws JMSException
           */
          @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeProdTopic)
          public void prodTopicConsumer2(ActiveMQMessage mqMessage) throws JMSException {
              String text = ((TextMessage) mqMessage.getMessage()).getText();
              log.info("prodTopicConsumer2接收到activeMq-activeProdTopic消息:[{}]", text);
          }
      }
      
    • 结果展示

      **发送到Topic的消息被所有订阅了该Topic的消费者接收

image.png
image.png

# 参考资料

SpringBoot与ActiveMQ整合实现手动ACK(事务模式与ack应答模式)

# TODO:

  • 如何保证消费者将消息发送到ActiveMQ的过程中消息不丢失
  • ActiveMQ的集群与主从
  • 消息的持久化
  • 事务
  • PTP模式下消费者多久没ACK后ActiveMQ会认为该条消息消费失败呢?(是不是有个消费超时时间设置)。还是只能等到该消费者下线。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容

  • ActiveMQ 即时通讯服务 浅析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk阅读 1,474评论 0 11
  • 应用场景:在我们的项目中是触发预警消息,在预警列表中插入一条预警记录,同时我们有水务app和河长治app等多个ap...
    蛋皮皮652阅读 1,175评论 0 0
  • 个人专题目录[https://www.jianshu.com/p/140e2a59db2c] 一、JMS简介 全称...
    Java及SpringBoot阅读 2,069评论 0 10
  • 中间件在中大型的系统中应用较为广泛,主要用来解决系统模块之间的强耦合关系;也就是说消息中间件不需要同步返回结果,也...
    帅可儿妞阅读 300评论 0 0
  • ActiveMQ 简介:ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ ...
    CoderZS阅读 2,656评论 0 23