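Ordered Messages
I had been using client version 3.6.2 locally, while the company server was running version 3.2.6. As a result, my ordered-message tests kept failing. Only after I downgraded the client to 3.2.6 did the test pass. So make sure the client and server versions match, or you may run into strange errors.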
producer
package com.yunsheng.orderExample;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;

import java.util.List;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        producer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
        // Launch the instance.
        producer.start();
        for (int i = 0; i < 10; i++) {
            // Fixed order ID: all ten messages belong to the same order.
            int orderId = 0;
            // Create a message instance, specifying topic, tag, key and message body.
            Message msg = new Message("TopicOrder", "TagA", "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes());
            // The third argument (orderId) is passed to the selector as arg.
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down the producer.
        producer.shutdown();
    }
}
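Analysis:
To keep a group of messages in order, every message in the group must be sent to the same queue. (By default, a topic has 4 queues per broker.)
In the code above, orderId represents an order number. It is fixed at 0 here, so all ten messages belong to the same order.
The send call is given a selector. The selector takes orderId modulo the number of queues, which guarantees that all messages with the same orderId land on the same queue.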
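To see the modulo rule with more than one order ID, here is a minimal sketch that runs without RocketMQ. The class QueueSelectSketch, the helper selectIndex, and the string queue names are all hypothetical stand-ins, not part of the RocketMQ API. It also uses Math.floorMod instead of %, on the assumption that a key might be negative or a non-integer type whose hashCode is negative.

```java
import java.util.Arrays;
import java.util.List;

public class QueueSelectSketch {

    // Hypothetical helper: maps a shard key (e.g. an order ID) to a queue index.
    // Math.floorMod keeps the result non-negative even when hashCode() is
    // negative, which a plain % would not.
    static int selectIndex(Object shardKey, int queueCount) {
        return Math.floorMod(shardKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        // Stand-ins for the MessageQueue list that the client passes to the selector.
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");

        // Messages from three different orders, interleaved.
        int[] orderIds = {1001, 1002, 1001, 1003, 1002, 1001};
        for (int orderId : orderIds) {
            String queue = queues.get(selectIndex(orderId, queues.size()));
            System.out.println("orderId " + orderId + " -> " + queue);
        }
    }
}
```

Every message for order 1001 maps to queue-1, 1002 to queue-2, and 1003 to queue-3, so ordering holds per order while different orders can still be spread across queues.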
consumer
package com.yunsheng.orderExample;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicOrder", "TagA");
        // An orderly listener consumes the messages of each queue in order.
        consumer.registerMessageListener(new MessageListenerOrderly() {
            Random random = new Random(10);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // Pass the body as an argument so a '%' in it is not parsed as a format specifier.
                    System.out.printf("%s Receive New Messages: %s%n",
                            Thread.currentThread().getName(), new String(msg.getBody()));
                }
                try {
                    // Simulate processing time. The bound is required: an unbounded
                    // nextInt() can be negative, which makes Thread.sleep throw.
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
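Analysis:
The producer side above guarantees the order in which messages are sent, so the consumer side must also consume them in order. This is done with MessageListenerOrderly: within a queue, a message is consumed only after the messages before it have been consumed.
A sleep was added to the code to verify this.
Result: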
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ 1
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ 2
ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ 3
ConsumeMessageThread_6 Receive New Messages: Hello RocketMQ 4
ConsumeMessageThread_5 Receive New Messages: Hello RocketMQ 5
ConsumeMessageThread_7 Receive New Messages: Hello RocketMQ 6
ConsumeMessageThread_8 Receive New Messages: Hello RocketMQ 7
ConsumeMessageThread_9 Receive New Messages: Hello RocketMQ 8
ConsumeMessageThread_10 Receive New Messages: Hello RocketMQ 9
As the output shows, the messages were not handled by a single thread, yet they were still consumed in order.
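Different threads consuming in strict order is what you would expect if access to a queue is serialized by a lock. The sketch below is a simplified model of that idea in plain Java, not RocketMQ's actual implementation. The class OrderlyConsumeSketch, the method consumeInOrder, and the single queueLock object are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderlyConsumeSketch {

    // Drains the queue with several worker threads. Only the thread holding
    // queueLock may take and process the next message, so processing order
    // equals queue order regardless of which thread does the work.
    static List<String> consumeInOrder(Queue<String> queue, int threads) {
        final Object queueLock = new Object();
        final List<String> consumed = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        int tasks = queue.size();
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                synchronized (queueLock) {
                    String msg = queue.poll();
                    if (msg != null) {
                        System.out.println(Thread.currentThread().getName() + " consumed " + msg);
                        consumed.add(msg);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what was consumed so far.
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 10; i++) {
            queue.add("Hello RocketMQ " + i);
        }
        System.out.println(consumeInOrder(queue, 4));
    }
}
```

The thread names printed will vary from run to run, but the returned list is always in queue order, because taking a message and processing it happen under the same lock.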