写在前面
当使用spring rabbit
的时候我以为生产者发送消息是有序的,直到某天线上出现问题(支付消息在前,下单消息在后)才下定决心要看看spring rabbit
的源码。
CachingConnectionFactoryd 的基本思路
有一个LinkedList
当作缓存channel
的队列,线程获取channel
的时候都是从队头拿,而且每次只有一个线程进入临界区拿channel
(channel
不能被多个线程共享),只有上一个线程退出临界区,后一个线程才能进入。当线程使用完channel
后会将channel
放入队尾。
rabbitmq
只保证在同一个channel
中,生产者发送消息到队列是有序的。详情见rabbitmq-message-order-of-delivery。
所以spring rabbitmq
的这种设计就有可能使用不同的channel
发送下单、支付消息,接着在队列里面支付消息在前,下单消息在后。
源码
获取channel
CachingConnectionFactory.getChannel
这是获取channel
的入口
// 缓存channel队列
LinkedList<ChannelProxy> channelList;
if (this.cacheMode == CacheMode.CHANNEL) {
channelList = transactional ? this.cachedChannelsTransactional
: this.cachedChannelsNonTransactional;
}
else {
channelList = transactional ? this.allocatedConnectionTransactionalChannels.get(connection)
: this.allocatedConnectionNonTransactionalChannels.get(connection);
}
ChannelProxy channel = null;
if (connection.isOpen()) {
//临界区
synchronized (channelList) {
while (!channelList.isEmpty()) {
// 从对头获取channel
channel = channelList.removeFirst();
if (logger.isTraceEnabled()) {
logger.trace(channel + " retrieved from cache");
}
if (channel.isOpen()) {
break;
}
else {
Channel target = channel.getTargetChannel();
if (target != null) {
target.close();
}
channel = null;
}
}
}
}
//队列中没有channel ,则新建channel
if (channel == null) {
channel = getCachedChannelProxy(connection, channelList, transactional);
}
return channel;
channel放进队尾
CachingConnectionFactory.CachedChannelInvocationHandler.logicalClose
.....
// Allow for multiple close calls...
if (!this.channelList.contains(proxy)) {
// channel 放入队尾
this.channelList.addLast(proxy);
setHighWaterMark();
}
...
最后
如果你的业务对生产者发送消息的顺序比较敏感,使用spring rabbit
的时候就要小心了。