RocketMQ 与 Spring Boot整合(一、3种发送消息的方式)

一、概述

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  1. 能够保证严格的消息顺序
  2. 提供丰富的消息拉取模式
  3. 高效的订阅者水平扩展能力
  4. 实时的消息订阅机制
  5. 亿级消息堆积能力

在本文中,提供更多的生产者 Producer 和消费者 Consumer 的使用示例。例如说:

  • Producer 三种发送消息的方式。
  • Producer 发送顺序消息,Consumer 顺序消费消息。
  • Producer 发送定时消息。
  • Producer 批量发送消息。
  • Producer 发送事务消息。
  • Consumer 广播集群消费消息。

二、快速入门

我们先来对 RocketMQ-Spring 做一个快速入门,实现 Producer 三种发送消息的方式的功能,同时创建一个 Consumer 消费消息。

考虑到一个应用既可以使用生产者 Producer ,又可以使用消费者 Consumer ,所以2角色都进行了配置。

2.1 引入依赖

在 [pom.xml] 文件中,引入相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rocketmq</artifactId>

    <dependencies>
        <!-- 实现对 RocketMQ 的自动化配置 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

        <!-- 方便等会写单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

2.2 应用配置文件

resources 目录下,创建 application.yaml 配置文件。配置如下:

# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
  name-server: 101.133.227.13:9876 # RocketMQ Namesrv
  # Producer 配置项
  producer:
    group: erbadagang-producer-group # 生产者分组
    send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
    compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
    retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
    access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
    secret-key: # Secret Key
    enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
  # Consumer 配置项
  consumer:
    listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
      erbadagang-consumer-group:
        topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
  • rocketmq 配置项,设置 RocketMQ 的配置,对应 RocketMQProperties 配置类。
  • RocketMQ-Spring RocketMQAutoConfiguration 自动化配置类,实现 RocketMQ 的自动配置,创建相应的 Producer 和 Consumer 。
  • rocketmq.name-server 配置项,设置 RocketMQ Namesrv 地址。如果多个,使用逗号分隔。
  • rocketmq.producer 配置项,一看就知道是 RocketMQ Producer 所独有。
    • group 配置,生产者分组。
    • retry-next-server 配置,发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false 。如果使用多 Broker 的情况下,需要设置 true ,这样才会在发送消息失败时,重试另外一台 Broker 。
    • 其它配置,一般默认即可。
  • rocketmq.consumer 配置项,一看就知道是 RocketMQ Consumer 所独有。
    • listener 配置,配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。一般情况下,只有我们在想不监听消费某个消费分组的某个 Topic 时,才需要配 listener 配置。

2.3 Demo01Message

创建 [Demo01Message 消息类,提供给当前示例使用。代码如下:

package com.ebadagang.springboot.rocketmq.message;

/**
 * 示例 01 的 Message 消息
 */
public class Demo01Message {

    public static final String TOPIC = "DEMO_01";

    /**
     * 编号
     */
    private Integer id;

    public Demo01Message setId(Integer id) {
        this.id = id;
        return this;
    }

    public Integer getId() {
        return id;
    }

    @Override
    public String toString() {
        return "Demo01Message{" +
                "id=" + id +
                '}';
    }

}
  • TOPIC 静态属性,我们设置该消息类对应 Topic 为 "DEMO_01" 。

2.4 Demo01Producer

它会使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现三种(同步、异步、oneway)发送消息的方式。代码如下:

package com.ebadagang.springboot.rocketmq.producer;

import com.ebadagang.springboot.rocketmq.message.Demo01Message;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Demo01Producer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public SendResult syncSend(Integer id) {
        // 创建 Demo01Message 消息
        Demo01Message message = new Demo01Message();
        message.setId(id);
        // 同步发送消息
        return rocketMQTemplate.syncSend(Demo01Message.TOPIC, message);
    }

    public void asyncSend(Integer id, SendCallback callback) {
        // 创建 Demo01Message 消息
        Demo01Message message = new Demo01Message();
        message.setId(id);
        // 异步发送消息
        rocketMQTemplate.asyncSend(Demo01Message.TOPIC, message, callback);
    }

    public void onewaySend(Integer id) {
        // 创建 Demo01Message 消息
        Demo01Message message = new Demo01Message();
        message.setId(id);
        // oneway 发送消息
        rocketMQTemplate.sendOneWay(Demo01Message.TOPIC, message);
    }

}
  • 三个方法,对应三个 RocketMQ 发送消息的方式,分别调用 RocketMQTemplate 提供的 #syncSend(...)#asyncSend(...) 以及 #sendOneWay(...) 方法。

我们来简单聊下 RocketMQTemplate 类,它继承 Spring Messaging 定义的 AbstractMessageSendingTemplate 抽象类,以达到融入 Spring Messaging 体系中。

在 RocketMQTemplate 中,会创建一个 RocketMQ DefaultMQProducer 生产者 producer ,所以 RocketMQTemplate 后续的各种发送消息的方法,都是使用它。当然,因为 RocketMQTemplate 的封装,所以我们可以像使用 Spring Messaging 一样的方式,进行消息的发送,而无需直接使用 RocketMQ 提供的 Producer 发送消息。

2.5 Demo01Consumer

实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:

package com.ebadagang.springboot.rocketmq.consumer;

import com.ebadagang.springboot.rocketmq.message.Demo01Message;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
        topic = Demo01Message.TOPIC,
        consumerGroup = "demo01-consumer-group-" + Demo01Message.TOPIC
)
public class Demo01Consumer implements RocketMQListener<Demo01Message> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(Demo01Message message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }

}
  • 在类上,添加了 @RocketMQMessageListener 注解,声明消费的 Topic 是 "DEMO_01" ,消费者分组是 "demo01-consumer-group-DEMO_01" 。一般情况下,我们建议一个消费者分组,仅消费一个 Topic 。这样做会有两个好处:
    • 每个消费者分组职责单一,只消费一个 Topic 。
    • 每个消费者分组是独占一个线程池,这样能够保证多个 Topic 隔离在不同线程池,保证隔离性,从而避免一个 Topic 消费很慢,影响到另外的 Topic 的消费。

当然如果是同样消费注册topic的积分和会员子系统他们的消费者分组是不同的,来实现分别消费topic。

  • 实现 RocketMQListener 接口,在 T 泛型里,设置消费的消息对应的类。此处,我们就设置了 Demo01Message 类。

2.6 Demo01AConsumer

实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:

package com.ebadagang.springboot.rocketmq.consumer;

import com.ebadagang.springboot.rocketmq.message.Demo01Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
        topic = Demo01Message.TOPIC,
        consumerGroup = "demo01-A-consumer-group-" + Demo01Message.TOPIC
)
public class Demo01AConsumer implements RocketMQListener<MessageExt> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(MessageExt message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }

}
  • 整体和上面 [Demo01Consumer]是一致的,主要有两个差异点,也是为什么我们又额外创建了这个消费者的原因。

差异一,在类上,添加了 @RocketMQMessageListener 注解,声明消费的 Topic 还是 "DEMO_01" ,消费者分组修改成"demo01-A-consumer-group-DEMO_01" 。这样,我们就可以测试 RocketMQ 集群消费的特性。

集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。

  • 也就是说,如果我们发送一条 Topic 为 "DEMO_01" 的消息,可以分别被 "demo01-A-consumer-group-DEMO_01""demo01-consumer-group-DEMO_01" 都消费一次。
  • 但是,如果我们启动两个该示例的实例,则消费者分组 "demo01-A-consumer-group-DEMO_01""demo01-consumer-group-DEMO_01" 都会有多个 Consumer 实例。此时,我们再发送一条 Topic 为 "DEMO_01"的消息,只会被 "demo01-A-consumer-group-DEMO_01" 的一个 Consumer 消费一次,也同样只会被 "demo01-consumer-group-DEMO_01" 的一个 Consumer 消费一次。

好好理解上述的两段话,非常重要。

通过集群消费的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER" 的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:

  • 积分模块:判断如果是手机注册,给用户增加 20 积分。
  • 优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。
  • 站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。
  • … 等等

这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。

差异二,实现 RocketMQListener 接口,在 T 泛型里,设置消费的消息对应的类不是 Demo01Message 类,而是 RocketMQ 内置的 MessageExt 类。通过 MessageExt 类,我们可以获取到消费的消息的更多信息,例如说消息的所属队列、创建时间等等属性,不过消息的内容(body)就需要自己去反序列化。当然,一般情况下,我们不会使用 MessageExt 类。

2.7 测试

创建 [Demo01ProducerTest]测试类,编写三个单元测试方法,调用 Demo01Producer 三种发送消息的方式。代码如下:

package com.ebadagang.springboot.rocketmq.producer;

import com.ebadagang.springboot.rocketmq.Application;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo01ProducerTest {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private Demo01Producer producer;

    @Test
    public void testSyncSend() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        SendResult result = producer.syncSend(id);
        logger.info("[testSyncSend][发送编号:[{}] 发送结果:[{}]]", id, result);

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }

    @Test
    public void testASyncSend() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        producer.asyncSend(id, new SendCallback() {

            @Override
            public void onSuccess(SendResult result) {
                logger.info("[testASyncSend][发送编号:[{}] 发送成功,结果为:[{}]]", id, result);
            }

            @Override
            public void onException(Throwable e) {
                logger.info("[testASyncSend][发送编号:[{}] 发送异常]]", id, e);
            }

        });

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }

    @Test
    public void testOnewaySend() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        producer.onewaySend(id);
        logger.info("[testOnewaySend][发送编号:[{}] 发送完成]", id);

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }

}

2.7.1 测试#testSyncSend()

启动执行#testSyncSend()方法,测试同步发送消息,生产者会报这样的错误 :

RemotingTooMuchRequestException: sendDefaultImpl call timeout

其实是连不上远程mq,解决方法:
在conf/broker.conf 中 加入 两行配置

namesrvAddr = 你的公网IP:9876
brokerIP1=你的公网IP

重新启动 broker,启动broker的指令要修改下, 要将这个配置文件指定加载:
nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf autoCreateTopicEnable=true &

正常情况我们看到控制台输出consumer的注册信息:

running container: DefaultRocketMQListenerContainer{consumerGroup='demo01-consumer-group-DEMO_01', nameServer='101.133.227.13:9876', topic='DEMO_01', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}

信息生产和消费信息:

2020-08-04 15:48:43.059  INFO 14056 --- [           main] c.e.s.r.producer.Demo01ProducerTest      : [testSyncSend][发送编号:[1596527322] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A36E818B4AAC212D7A7160000, offsetMsgId=6585E30D00002A9F0000000000031954, messageQueue=MessageQueue [topic=DEMO_01, brokerName=broker-a, queueId=3], queueOffset=0]]]
2020-08-04 15:48:43.605  INFO 14056 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01AConsumer  : [onMessage][线程编号:168 消息内容:MessageExt [brokerName=broker-a, queueId=3, storeSize=308, queueOffset=0, sysFlag=0, bornTimestamp=1596527322965, bornHost=/202.99.106.26:38252, storeTimestamp=1596527322642, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F0000000000031954, commitLogOffset=203092, bodyCRC=68977144, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DEMO_01', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1596527323605, id=8007a731-58b8-bee7-c3e5-755cf737b5bd, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A36E818B4AAC212D7A7160000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, timestamp=1596527322658}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 50, 55, 51, 50, 50, 125], transactionId='null'}]]
2020-08-04 15:49:04.155  INFO 14056 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01Consumer   : [onMessage][线程编号:172 消息内容:Demo01Message{id=1596527322}]

上面信息看到queueId=3,可以到控制台去验证一下。

查看具体的存储queue

通过日志我们可以看到,我们发送的消息,分别被 Demo01AConsumer 和 Demo01Consumer 两个消费者(消费者分组)都消费了一次。
同时,两个消费者在不同的线程池中,消费了这条消息。虽然说,我们看到两条日志里,我们都看到了线程名为 "MessageThread_1" ,但是线程编号分别是 168 和 172 。 因为,每个 RocketMQ Consumer 的消费线程池创建的线程都是以 "MessageThread_" 开头,同时这里相同的线程名结果不同的线程编号,很容易判断出时候用了两个不同的消费线程池。

2.7.3 测试#testASyncSend()方法

注意,不要关闭上一个 #testSyncSend() 单元测试方法,Springboot每次启动的时候会向nameServer注册consumer,启动2个相当于2个consumer集群消费同样topic。这里我们要模拟每个消费者集群,都有多个 Consumer 节点。
控制台输出如下:

2020-08-04 16:06:04.245  INFO 14916 --- [ublicExecutor_1] c.e.s.r.producer.Demo01ProducerTest      : [testASyncSend][发送编号:[1596528363] 发送成功,结果为:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A3A4418B4AAC212E78A480000, offsetMsgId=6585E30D00002A9F000000000003220A, messageQueue=MessageQueue [topic=DEMO_01, brokerName=broker-a, queueId=3], queueOffset=1]]]
2020-08-04 16:06:04.291  INFO 14916 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01Consumer   : [onMessage][线程编号:166 消息内容:Demo01Message{id=1596528363}]
2020-08-04 16:06:04.463  INFO 14916 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01AConsumer  : [onMessage][线程编号:175 消息内容:MessageExt [brokerName=broker-a, queueId=3, storeSize=308, queueOffset=1, sysFlag=0, bornTimestamp=1596528364166, bornHost=/202.99.106.26:38723, storeTimestamp=1596528363846, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F000000000003220A, commitLogOffset=205322, bodyCRC=408850356, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DEMO_01', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1596528364463, id=ee6e1a9c-4a78-e840-1820-616637e5a9b3, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A3A4418B4AAC212E78A480000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, timestamp=1596528364026}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 50, 56, 51, 54, 51, 125], transactionId='null'}]]

和 #testSyncSend() 方法执行的结果,是一致的。此时,我们打开 #testSyncSend() 方法所在的控制台,不会看到有新的消息消费日志。说明,符合集群消费的机制:集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。而广播模式是每个 Consumer 实例都消费消息。
不过如上的日志,也可能出现在 #testSyncSend() 方法所在的控制台,而不在 #testASyncSend() 方法所在的控制台,但只有一个地方消费。

2.7.4 #testOnewaySend()方法

执行后控制台输出发送消息,日志输出发送编号:[1596528903]。

2020-08-04 16:15:03.479  INFO 9364 --- [           main] c.e.s.r.producer.Demo01ProducerTest      : [testOnewaySend][发送编号:[1596528903] 发送完成]

消费消息在第一个控制台输出:
被集群中的其他消费者消费

三、@RocketMQMessageListener

在 [ Demo01Consumer] 中,我们已经使用了 @RocketMQMessageListener 注解,设置每个 RocketMQ 消费者 Consumer 的消息监听器的配置。

@RocketMQMessageListener 注解的常用属性如下:

/**
 * Consumer 所属消费者分组
 *
 * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
 * load balance. It's required and needs to be globally unique.
 *
 * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
 */
String consumerGroup();

/**
 * 消费的 Topic
 *
 * Topic name.
 */
String topic();

/**
 * 选择器类型。默认基于 Message 的 Tag 选择。
 *
 * Control how to selector message.
 *
 * @see SelectorType
 */
SelectorType selectorType() default SelectorType.TAG;
/**
 * 选择器的表达式。
 * 设置为 * 时,表示全部。
 *
 * 如果使用 SelectorType.TAG 类型,则设置消费 Message 的具体 Tag 。
 * 如果使用 SelectorType.SQL92 类型,可见 https://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/ 文档
 *
 * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
 */
String selectorExpression() default "*";

/**
 * 消费模式。可选择并发消费,还是顺序消费。
 *
 * Control consume mode, you can choice receive message concurrently or orderly.
 */
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

/**
 * 消息模型。可选择是集群消费,还是广播消费。
 *
 * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
 */
MessageModel messageModel() default MessageModel.CLUSTERING;

/**
 * 消费的线程池的最大线程数
 *
 * Max consumer thread number.
 */
int consumeThreadMax() default 64;

/**
 * 消费单条消息的超时时间
 *
 * Max consumer timeout, default 30s.
 */
long consumeTimeout() default 30000L;

@RocketMQMessageListener 注解的不常用属性如下:

// 默认从配置文件读取的占位符
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

/**
 * The property of "access-key".
 */
 String accessKey() default ACCESS_KEY_PLACEHOLDER;
 /**
 * The property of "secret-key".
 */
String secretKey() default SECRET_KEY_PLACEHOLDER;

/**
 * Switch flag instance for message trace.
 */
boolean enableMsgTrace() default true;
/**
 * The name value of message trace topic.If you don't config,you can use the default trace topic name.
 */
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

/**
 * Consumer 连接的 RocketMQ Namesrv 地址。默认情况下,使用 `rocketmq.name-server` 配置项即可。
 *
 * 如果一个项目中,Consumer 需要使用不同的 RocketMQ Namesrv ,则需要配置该属性。
 *
 * The property of "name-server".
 */
String nameServer() default NAME_SERVER_PLACEHOLDER;

/**
 * 访问通道。目前有 LOCAL 和 CLOUD 两种通道。
 *
 * LOCAL ,指的是本地部署的 RocketMQ 开源项目。
 * CLOUD ,指的是阿里云的 ONS 服务。具体可见 https://help.aliyun.com/document_detail/128585.html 文档。
 *
 * The property of "access-channel".
 */
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;

四、 @ExtRocketMQTemplateConfiguration

RocketMQ-Spring 考虑到开发者可能需要连接多个不同的 RocketMQ 集群,所以提供了 @ExtRocketMQTemplateConfiguration 注解,实现配置连接不同 RocketMQ 集群的 Producer 的 RocketMQTemplate Bean 对象。

@ExtRocketMQTemplateConfiguration 注解的具体属性,和我们在 [ 应用配置文件」]的 rocketmq.producer 配置项是一致的,就不重复赘述啦。

@ExtRocketMQTemplateConfiguration 注解的简单使用示例,代码如下:

@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer:demo.rocketmq.name-server}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

在类上,添加 @ExtRocketMQTemplateConfiguration 注解,并设置连接的 RocketMQ Namesrv 地址。
同时,需要继承 RocketMQTemplate 类,从而使我们可以直接使用 @Autowire 或 @Resource 注解,注入 RocketMQTemplate Bean 属性。

底线


本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,165评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,503评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,295评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,589评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,439评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,342评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,749评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,397评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,700评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,740评论 2 313
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,523评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,364评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,755评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,024评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,297评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,721评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,918评论 2 336