1. 步骤分析
- 导入MQ客户端依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
- 消息发送者步骤分析
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
- 消息消费者步骤分析
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
2. 基本样例
2.1 消息发送
1) 发送同步消息
这种可靠性同步地发送方式使用比较广发,比如:重要的消息通知,短信通知。
public class SyncProducerTest {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址
producer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数1:消息主题Topic
* 参数2:消息Tag
* 参数3:消息内容
*/
Message msg = new Message("base", "Tag1", ("Hello World!+ " + i).getBytes());
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus staus = result.getSendStatus();
//消息ID
String msgId = result.getMsgId();
//消息接收队列ID
int queueId = result.getMessageQueue().getQueueId();
System.out.println("发送状态:" + staus + ",消息ID:" + msgId + ",队列:" + queueId);
System.out.println("发送结果:" + result);
//线程睡一秒
TimeUnit.MILLISECONDS.sleep(100);
}
//6.关闭生产者producer
producer.shutdown();
}
}
2)发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
public class AsyncProducerTest {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址
producer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数1:消息主题Topic
* 参数2:消息Tag
* 参数3:消息内容
*/
Message msg = new Message("base", "Tag2", ("Hello World!+ " + i).getBytes());
//5.发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
//发送状态
SendStatus staus = result.getSendStatus();
//消息ID
String msgId = result.getMsgId();
//消息接收队列ID
int queueId = result.getMessageQueue().getQueueId();
System.out.println("发送状态:" + staus + ",消息ID:" + msgId + ",队列:" + queueId);
System.out.println("发送结果:" + result);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
//线程睡一秒
TimeUnit.MILLISECONDS.sleep(100);
}
//6.关闭生产者producer
producer.shutdown();
}
3) 单向发送消息
这种方式主要用在不特别关心发送结果的场景,比如,日志发送
public class OnewayProducerTest {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址
producer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 100; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数1:消息主题Topic
* 参数2:消息Tag
* 参数3:消息内容
*/
Message msg = new Message("base", "Tag3", ("Hello World!+单向消息 " + i).getBytes());
//5.发送消息
producer.sendOneway(msg);
// 睡眠100ms
TimeUnit.MILLISECONDS.sleep(100);
}
//6.关闭生产者producer
producer.shutdown();
}
}
2.2 消费消息
1)负载均衡模式
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。
public class ClusterConsumerTest {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer comsumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址
comsumer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
//3.订阅主题Topic和Tag
comsumer.subscribe("base", "Tag2");
//设置负载均衡消费(默认模式)
comsumer.setMessageModel(MessageModel.CLUSTERING);
//4.设置回调函数,处理消息
comsumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
comsumer.start();
}
}
2) 广播模式
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。
public class BroadcastConsumerTest {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer comsumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址
comsumer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
//3.订阅主题Topic和Tag
comsumer.subscribe("base", "Tag1");
//广播模式消费
comsumer.setMessageModel(MessageModel.BROADCASTING);
//4.设置回调函数,处理消息
comsumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
comsumer.start();
}
}