Preface
This article takes a look at RocketMQ's SequenceProducerImpl.
SequenceProducerImpl
io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {

    private BlockingQueue<Message> msgCacheQueue;

    public SequenceProducerImpl(final KeyValue properties) {
        super(properties);
        this.msgCacheQueue = new LinkedBlockingQueue<>();
    }

    @Override
    public KeyValue properties() {
        return properties;
    }

    @Override
    public void send(final Message message) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);
        try {
            Validators.checkMessage(rmqMessage, this.rocketmqProducer);
        } catch (MQClientException e) {
            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
        }
        msgCacheQueue.add(message);
    }

    @Override
    public void send(final Message message, final KeyValue properties) {
        send(message);
    }

    @Override
    public synchronized void commit() {
        List<Message> messages = new ArrayList<>();
        msgCacheQueue.drainTo(messages);

        List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>();

        for (Message message : messages) {
            rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
        }

        if (rmqMessages.size() == 0) {
            return;
        }

        try {
            SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
            String[] msgIdArray = sendResult.getMsgId().split(",");
            for (int i = 0; i < messages.size(); i++) {
                Message message = messages.get(i);
                message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
            }
        } catch (Exception e) {
            throw checkProducerException("", "", e);
        }
    }

    @Override
    public synchronized void rollback() {
        msgCacheQueue.clear();
    }
}
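- It uses a LinkedBlockingQueue as the cache; the send method validates the message and then only adds it to the queue, without transmitting anything
- It also provides commit and rollback methods, both declared synchronized, so the two are serialized against each other; send itself is not synchronized and relies on the thread safety of LinkedBlockingQueue
- On commit, the queue contents are moved into a list with drainTo and then sent as a single batch; on rollback, the whole LinkedBlockingQueue is cleared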
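The three points above can be sketched without any RocketMQ dependency. The following is only a minimal illustration of the buffer-on-send, flush-on-commit idea, not the library's implementation: the class name BufferedSenderSketch, the pending() helper, and the batchSender function (a stand-in for the real batch send, assumed to return comma-joined message IDs) are all hypothetical, and messages are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

/** Sketch of buffer-on-send, flush-on-commit. All names here are hypothetical. */
final class BufferedSenderSketch {
    private final BlockingQueue<String> cache = new LinkedBlockingQueue<>();
    // Stand-in for the real batch send: takes the batch, returns comma-joined IDs.
    private final Function<List<String>, String> batchSender;

    BufferedSenderSketch(Function<List<String>, String> batchSender) {
        this.batchSender = batchSender;
    }

    /** Only buffers the message; nothing is transmitted here. */
    void send(String message) {
        cache.add(message);
    }

    /** Drains the buffer, sends it as one batch, and pairs each ID with its message by index. */
    synchronized List<String> commit() {
        List<String> batch = new ArrayList<>();
        cache.drainTo(batch);
        List<String> result = new ArrayList<>();
        if (batch.isEmpty()) {
            return result; // nothing buffered, so no batch send is attempted
        }
        String[] ids = batchSender.apply(batch).split(",");
        for (int i = 0; i < batch.size(); i++) {
            result.add(ids[i] + "=" + batch.get(i));
        }
        return result;
    }

    /** Discards everything buffered since the last commit. */
    synchronized void rollback() {
        cache.clear();
    }

    /** Number of messages buffered but not yet committed. */
    int pending() {
        return cache.size();
    }

    public static void main(String[] args) {
        List<List<String>> wire = new ArrayList<>(); // records what was actually "sent"
        BufferedSenderSketch producer = new BufferedSenderSketch(batch -> {
            wire.add(new ArrayList<>(batch));
            StringBuilder ids = new StringBuilder();
            for (int i = 0; i < batch.size(); i++) {
                if (i > 0) {
                    ids.append(',');
                }
                ids.append("id-").append(i);
            }
            return ids.toString();
        });

        producer.send("a");
        producer.send("b");
        System.out.println(wire.size());       // 0: send only buffered
        System.out.println(producer.commit()); // [id-0=a, id-1=b]

        producer.send("c");
        producer.rollback();
        System.out.println(producer.commit()); // []: "c" was discarded
        System.out.println(wire.size());       // 1: only one batch ever went out
    }
}
```

As in the original class, a send that races with commit is not lost: it either lands in the batch being drained or stays in the queue for the next commit.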
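Summary
RocketMQ's SequenceProducerImpl does not actually send anything in its send method; it only adds the message to a queue. The messages are sent as a batch only on commit, and rollback clears the queue. The name send therefore does not match the method's behavior very well; a name such as pending would describe it better.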