一、概述
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
在本文中,提供更多的生产者 Producer 和消费者 Consumer 的使用示例。例如说:
- Producer 三种发送消息的方式。
- Producer 发送顺序消息,Consumer 顺序消费消息。
- Producer 发送定时消息。
- Producer 批量发送消息。
- Producer 发送事务消息。
- Consumer 广播和集群消费消息。
二、快速入门
我们先来对 RocketMQ-Spring 做一个快速入门,实现 Producer 三种发送消息的方式的功能,同时创建一个 Consumer 消费消息。
考虑到一个应用既可以使用生产者 Producer ,又可以使用消费者 Consumer ,所以2角色都进行了配置。
2.1 引入依赖
在 [pom.xml
] 文件中,引入相关依赖。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq</artifactId>
<dependencies>
<!-- 实现对 RocketMQ 的自动化配置 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<!-- 方便等会写单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
2.2 应用配置文件
在 resources
目录下,创建 application.yaml
配置文件。配置如下:
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
name-server: 101.133.227.13:9876 # RocketMQ Namesrv
# Producer 配置项
producer:
group: erbadagang-producer-group # 生产者分组
send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
secret-key: # Secret Key
enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
# Consumer 配置项
consumer:
listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
erbadagang-consumer-group:
topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
- 在
rocketmq
配置项,设置 RocketMQ 的配置,对应 RocketMQProperties 配置类。 - RocketMQ-Spring RocketMQAutoConfiguration 自动化配置类,实现 RocketMQ 的自动配置,创建相应的 Producer 和 Consumer 。
-
rocketmq.name-server
配置项,设置 RocketMQ Namesrv 地址。如果多个,使用逗号分隔。 -
rocketmq.producer
配置项,一看就知道是 RocketMQ Producer 所独有。-
group
配置,生产者分组。 -
retry-next-server
配置,发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为false
。如果使用多主 Broker 的情况下,需要设置true
,这样才会在发送消息失败时,重试另外一台 Broker 。 - 其它配置,一般默认即可。
-
-
rocketmq.consumer
配置项,一看就知道是 RocketMQ Consumer 所独有。-
listener
配置,配置某个消费分组,是否监听指定 Topic 。结构为Map<消费者分组, <Topic, Boolean>>
。默认情况下,不配置表示监听。一般情况下,只有我们在想不监听消费某个消费分组的某个 Topic 时,才需要配listener
配置。
-
2.3 Demo01Message
创建 [Demo01Message 消息类,提供给当前示例使用。代码如下:
package com.ebadagang.springboot.rocketmq.message;
/**
* 示例 01 的 Message 消息
*/
public class Demo01Message {
public static final String TOPIC = "DEMO_01";
/**
* 编号
*/
private Integer id;
public Demo01Message setId(Integer id) {
this.id = id;
return this;
}
public Integer getId() {
return id;
}
@Override
public String toString() {
return "Demo01Message{" +
"id=" + id +
'}';
}
}
- TOPIC 静态属性,我们设置该消息类对应 Topic 为 "DEMO_01" 。
2.4 Demo01Producer
它会使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现三种(同步、异步、oneway)发送消息的方式。代码如下:
package com.ebadagang.springboot.rocketmq.producer;
import com.ebadagang.springboot.rocketmq.message.Demo01Message;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Demo01Producer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult syncSend(Integer id) {
// 创建 Demo01Message 消息
Demo01Message message = new Demo01Message();
message.setId(id);
// 同步发送消息
return rocketMQTemplate.syncSend(Demo01Message.TOPIC, message);
}
public void asyncSend(Integer id, SendCallback callback) {
// 创建 Demo01Message 消息
Demo01Message message = new Demo01Message();
message.setId(id);
// 异步发送消息
rocketMQTemplate.asyncSend(Demo01Message.TOPIC, message, callback);
}
public void onewaySend(Integer id) {
// 创建 Demo01Message 消息
Demo01Message message = new Demo01Message();
message.setId(id);
// oneway 发送消息
rocketMQTemplate.sendOneWay(Demo01Message.TOPIC, message);
}
}
- 三个方法,对应三个 RocketMQ 发送消息的方式,分别调用 RocketMQTemplate 提供的
#syncSend(...)
和#asyncSend(...)
以及#sendOneWay(...)
方法。
我们来简单聊下 RocketMQTemplate 类,它继承 Spring Messaging 定义的 AbstractMessageSendingTemplate 抽象类,以达到融入 Spring Messaging 体系中。
在 RocketMQTemplate 中,会创建一个 RocketMQ DefaultMQProducer 生产者 producer
,所以 RocketMQTemplate 后续的各种发送消息的方法,都是使用它。当然,因为 RocketMQTemplate 的封装,所以我们可以像使用 Spring Messaging 一样的方式,进行消息的发送,而无需直接使用 RocketMQ 提供的 Producer 发送消息。
2.5 Demo01Consumer
实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:
package com.ebadagang.springboot.rocketmq.consumer;
import com.ebadagang.springboot.rocketmq.message.Demo01Message;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
topic = Demo01Message.TOPIC,
consumerGroup = "demo01-consumer-group-" + Demo01Message.TOPIC
)
public class Demo01Consumer implements RocketMQListener<Demo01Message> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
- 在类上,添加了
@RocketMQMessageListener
注解,声明消费的 Topic 是"DEMO_01"
,消费者分组是"demo01-consumer-group-DEMO_01"
。一般情况下,我们建议一个消费者分组,仅消费一个 Topic 。这样做会有两个好处:- 每个消费者分组职责单一,只消费一个 Topic 。
- 每个消费者分组是独占一个线程池,这样能够保证多个 Topic 隔离在不同线程池,保证隔离性,从而避免一个 Topic 消费很慢,影响到另外的 Topic 的消费。
当然如果是同样消费注册topic的积分和会员子系统他们的消费者分组是不同的,来实现分别消费topic。
- 实现 RocketMQListener 接口,在
T
泛型里,设置消费的消息对应的类。此处,我们就设置了 Demo01Message 类。
2.6 Demo01AConsumer
实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:
package com.ebadagang.springboot.rocketmq.consumer;
import com.ebadagang.springboot.rocketmq.message.Demo01Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
topic = Demo01Message.TOPIC,
consumerGroup = "demo01-A-consumer-group-" + Demo01Message.TOPIC
)
public class Demo01AConsumer implements RocketMQListener<MessageExt> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(MessageExt message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
- 整体和上面 [Demo01Consumer]是一致的,主要有两个差异点,也是为什么我们又额外创建了这个消费者的原因。
差异一,在类上,添加了 @RocketMQMessageListener
注解,声明消费的 Topic 还是 "DEMO_01"
,消费者分组修改成了 "demo01-A-consumer-group-DEMO_01"
。这样,我们就可以测试 RocketMQ 集群消费的特性。
集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
- 也就是说,如果我们发送一条 Topic 为
"DEMO_01"
的消息,可以分别被"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都消费一次。 - 但是,如果我们启动两个该示例的实例,则消费者分组
"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都会有多个 Consumer 实例。此时,我们再发送一条 Topic 为"DEMO_01"
的消息,只会被"demo01-A-consumer-group-DEMO_01"
的一个 Consumer 消费一次,也同样只会被"demo01-consumer-group-DEMO_01"
的一个 Consumer 消费一次。
好好理解上述的两段话,非常重要。
通过集群消费的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER"
的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:
- 积分模块:判断如果是手机注册,给用户增加 20 积分。
- 优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。
- 站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。
- … 等等
这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。
差异二,实现 RocketMQListener 接口,在 T
泛型里,设置消费的消息对应的类不是 Demo01Message 类,而是 RocketMQ 内置的 MessageExt 类。通过 MessageExt 类,我们可以获取到消费的消息的更多信息,例如说消息的所属队列、创建时间等等属性,不过消息的内容(body
)就需要自己去反序列化。当然,一般情况下,我们不会使用 MessageExt 类。
2.7 测试
创建 [Demo01ProducerTest]测试类,编写三个单元测试方法,调用 Demo01Producer 三种发送消息的方式。代码如下:
package com.ebadagang.springboot.rocketmq.producer;
import com.ebadagang.springboot.rocketmq.Application;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CountDownLatch;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo01ProducerTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo01Producer producer;
@Test
public void testSyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
SendResult result = producer.syncSend(id);
logger.info("[testSyncSend][发送编号:[{}] 发送结果:[{}]]", id, result);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
@Test
public void testASyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
producer.asyncSend(id, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
logger.info("[testASyncSend][发送编号:[{}] 发送成功,结果为:[{}]]", id, result);
}
@Override
public void onException(Throwable e) {
logger.info("[testASyncSend][发送编号:[{}] 发送异常]]", id, e);
}
});
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
@Test
public void testOnewaySend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
producer.onewaySend(id);
logger.info("[testOnewaySend][发送编号:[{}] 发送完成]", id);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
}
2.7.1 测试#testSyncSend()
启动执行#testSyncSend()
方法,测试同步发送消息,生产者会报这样的错误 :
RemotingTooMuchRequestException: sendDefaultImpl call timeout
其实是连不上远程mq,解决方法:
在conf/broker.conf 中 加入 两行配置
namesrvAddr = 你的公网IP:9876
brokerIP1=你的公网IP
重新启动 broker,启动broker的指令要修改下, 要将这个配置文件指定加载:
nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf autoCreateTopicEnable=true &
正常情况我们看到控制台输出consumer的注册信息:
running container: DefaultRocketMQListenerContainer{consumerGroup='demo01-consumer-group-DEMO_01', nameServer='101.133.227.13:9876', topic='DEMO_01', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}
信息生产和消费信息:
2020-08-04 15:48:43.059 INFO 14056 --- [ main] c.e.s.r.producer.Demo01ProducerTest : [testSyncSend][发送编号:[1596527322] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A36E818B4AAC212D7A7160000, offsetMsgId=6585E30D00002A9F0000000000031954, messageQueue=MessageQueue [topic=DEMO_01, brokerName=broker-a, queueId=3], queueOffset=0]]]
2020-08-04 15:48:43.605 INFO 14056 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01AConsumer : [onMessage][线程编号:168 消息内容:MessageExt [brokerName=broker-a, queueId=3, storeSize=308, queueOffset=0, sysFlag=0, bornTimestamp=1596527322965, bornHost=/202.99.106.26:38252, storeTimestamp=1596527322642, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F0000000000031954, commitLogOffset=203092, bodyCRC=68977144, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DEMO_01', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1596527323605, id=8007a731-58b8-bee7-c3e5-755cf737b5bd, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A36E818B4AAC212D7A7160000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, timestamp=1596527322658}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 50, 55, 51, 50, 50, 125], transactionId='null'}]]
2020-08-04 15:49:04.155 INFO 14056 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01Consumer : [onMessage][线程编号:172 消息内容:Demo01Message{id=1596527322}]
上面信息看到queueId=3
,可以到控制台去验证一下。
通过日志我们可以看到,我们发送的消息,分别被 Demo01AConsumer 和 Demo01Consumer 两个消费者(消费者分组)都消费了一次。
同时,两个消费者在不同的线程池中,消费了这条消息。虽然说,我们看到两条日志里,我们都看到了线程名为 "MessageThread_1" ,但是线程编号分别是 168 和 172 。 因为,每个 RocketMQ Consumer 的消费线程池创建的线程都是以 "MessageThread_" 开头,同时这里相同的线程名结果不同的线程编号,很容易判断出时候用了两个不同的消费线程池。
2.7.3 测试#testASyncSend()方法
注意,不要关闭上一个 #testSyncSend() 单元测试方法,Springboot每次启动的时候会向nameServer注册consumer,启动2个相当于2个consumer集群消费同样topic。这里我们要模拟每个消费者集群,都有多个 Consumer 节点。
控制台输出如下:
2020-08-04 16:06:04.245 INFO 14916 --- [ublicExecutor_1] c.e.s.r.producer.Demo01ProducerTest : [testASyncSend][发送编号:[1596528363] 发送成功,结果为:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A3A4418B4AAC212E78A480000, offsetMsgId=6585E30D00002A9F000000000003220A, messageQueue=MessageQueue [topic=DEMO_01, brokerName=broker-a, queueId=3], queueOffset=1]]]
2020-08-04 16:06:04.291 INFO 14916 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01Consumer : [onMessage][线程编号:166 消息内容:Demo01Message{id=1596528363}]
2020-08-04 16:06:04.463 INFO 14916 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01AConsumer : [onMessage][线程编号:175 消息内容:MessageExt [brokerName=broker-a, queueId=3, storeSize=308, queueOffset=1, sysFlag=0, bornTimestamp=1596528364166, bornHost=/202.99.106.26:38723, storeTimestamp=1596528363846, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F000000000003220A, commitLogOffset=205322, bodyCRC=408850356, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DEMO_01', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1596528364463, id=ee6e1a9c-4a78-e840-1820-616637e5a9b3, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A3A4418B4AAC212E78A480000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, timestamp=1596528364026}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 50, 56, 51, 54, 51, 125], transactionId='null'}]]
和 #testSyncSend() 方法执行的结果,是一致的。此时,我们打开 #testSyncSend() 方法所在的控制台,不会看到有新的消息消费日志。说明,符合集群消费的机制:集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。而广播模式是每个 Consumer 实例都消费消息。
不过如上的日志,也可能出现在 #testSyncSend() 方法所在的控制台,而不在 #testASyncSend() 方法所在的控制台,但只有一个地方消费。
2.7.4 #testOnewaySend()方法
执行后控制台输出发送消息,日志输出发送编号:[1596528903]。
2020-08-04 16:15:03.479 INFO 9364 --- [ main] c.e.s.r.producer.Demo01ProducerTest : [testOnewaySend][发送编号:[1596528903] 发送完成]
消费消息在第一个控制台输出:三、@RocketMQMessageListener
在 [ Demo01Consumer] 中,我们已经使用了 @RocketMQMessageListener
注解,设置每个 RocketMQ 消费者 Consumer 的消息监听器的配置。
@RocketMQMessageListener
注解的常用属性如下:
/**
* Consumer 所属消费者分组
*
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
* load balance. It's required and needs to be globally unique.
*
* See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
*/
String consumerGroup();
/**
* 消费的 Topic
*
* Topic name.
*/
String topic();
/**
* 选择器类型。默认基于 Message 的 Tag 选择。
*
* Control how to selector message.
*
* @see SelectorType
*/
SelectorType selectorType() default SelectorType.TAG;
/**
* 选择器的表达式。
* 设置为 * 时,表示全部。
*
* 如果使用 SelectorType.TAG 类型,则设置消费 Message 的具体 Tag 。
* 如果使用 SelectorType.SQL92 类型,可见 https://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/ 文档
*
* Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
*/
String selectorExpression() default "*";
/**
* 消费模式。可选择并发消费,还是顺序消费。
*
* Control consume mode, you can choice receive message concurrently or orderly.
*/
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
/**
* 消息模型。可选择是集群消费,还是广播消费。
*
* Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
*/
MessageModel messageModel() default MessageModel.CLUSTERING;
/**
* 消费的线程池的最大线程数
*
* Max consumer thread number.
*/
int consumeThreadMax() default 64;
/**
* 消费单条消息的超时时间
*
* Max consumer timeout, default 30s.
*/
long consumeTimeout() default 30000L;
@RocketMQMessageListener 注解的不常用属性如下:
// 默认从配置文件读取的占位符
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
/**
* The property of "access-key".
*/
String accessKey() default ACCESS_KEY_PLACEHOLDER;
/**
* The property of "secret-key".
*/
String secretKey() default SECRET_KEY_PLACEHOLDER;
/**
* Switch flag instance for message trace.
*/
boolean enableMsgTrace() default true;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
/**
* Consumer 连接的 RocketMQ Namesrv 地址。默认情况下,使用 `rocketmq.name-server` 配置项即可。
*
* 如果一个项目中,Consumer 需要使用不同的 RocketMQ Namesrv ,则需要配置该属性。
*
* The property of "name-server".
*/
String nameServer() default NAME_SERVER_PLACEHOLDER;
/**
* 访问通道。目前有 LOCAL 和 CLOUD 两种通道。
*
* LOCAL ,指的是本地部署的 RocketMQ 开源项目。
* CLOUD ,指的是阿里云的 ONS 服务。具体可见 https://help.aliyun.com/document_detail/128585.html 文档。
*
* The property of "access-channel".
*/
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
四、 @ExtRocketMQTemplateConfiguration
RocketMQ-Spring 考虑到开发者可能需要连接多个不同的 RocketMQ 集群,所以提供了 @ExtRocketMQTemplateConfiguration
注解,实现配置连接不同 RocketMQ 集群的 Producer 的 RocketMQTemplate Bean 对象。
@ExtRocketMQTemplateConfiguration
注解的具体属性,和我们在 [ 应用配置文件」]的 rocketmq.producer
配置项是一致的,就不重复赘述啦。
@ExtRocketMQTemplateConfiguration
注解的简单使用示例,代码如下:
@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer:demo.rocketmq.name-server}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
在类上,添加 @ExtRocketMQTemplateConfiguration 注解,并设置连接的 RocketMQ Namesrv 地址。
同时,需要继承 RocketMQTemplate 类,从而使我们可以直接使用 @Autowire 或 @Resource 注解,注入 RocketMQTemplate Bean 属性。
底线
本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址
下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。