RabbitMQ 可靠投递

RabbitMQ 可靠投递

标签: RabbitMQ shovel-plugin ConfirmCallback RabbitMQ消息投递

背景

confirmCallback 确认模式

returnCallback 未投递到 queue 退回模式

shovel-plugin 跨机房可靠投递

背景

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两个选项用来控制消息的投递可靠性模式。

rabbitmq 整个消息投递的路径为:

producer->rabbitmq broker cluster->exchange->queue->consumer

message 从 producer 到 rabbitmq broker cluster 则会返回一个 confirmCallback 。

message 从 exchange->queue 投递失败则会返回一个 returnCallback 。我们将利用这两个 callback 控制消息的最终一致性和部分纠错能力。

confirmCallback 确认模式

在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback 。

CachingConnectionFactory factory =newCachingConnectionFactory();factory.setPublisherConfirms(true);//开启confirm模式

RabbitTemplate rabbitTemplate =newRabbitTemplate(factory);rabbitTemplate.setConfirmCallback((data, ack, cause)->{if(!ack) {              log.error("消息发送失败!"+ cause + data.toString());        }else{            log.info("消息发送成功,消息ID:"+ (data !=null? data.getId() :null));        }    });

我们来看下 ConfirmCallback 接口。

publicinterfaceConfirmCallback{/**        * Confirmation callback.        *@paramcorrelationData correlation data for the callback.        *@paramack true for ack, false for nack        *@paramcause An optional cause, for nack, when available, otherwise null.        */voidconfirm(CorrelationData correlationData,booleanack, String cause);    }

重点是 CorrelationData 对象,每个发送的消息都需要配备一个 CorrelationData 相关数据对象,CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。

发送的时候创建一个 CorrelationData 对象。

User user = new User();user.setID(1010101L);user.setUserName("plen");rabbitTemplate.convertAndSend(exchange, routing, user,message-> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);returnmessage;        },new CorrelationData(user.getID().toString()));

这里将 user ID 设置为当前消息 CorrelationData id 。当然这里是纯粹 demo,真实场景是需要做业务无关消息 ID 生成,同时要记录下这个 id 用来纠错和对账。

消息只要被 rabbitmq broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback

被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。

在此我向大家推荐一个架构学习交流群。交流学习群号:478030634 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

returnCallback 未投递到queue退回模式

confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式。

同样创建 ConnectionFactory 到时候需要设置 PublisherReturns(true) 选项。

CachingConnectionFactory factory =newCachingConnectionFactory();factory.setPublisherReturns(true);//开启return模式

rabbitTemplate.setMandatory(true);//开启强制委托模式rabbitTemplate.setReturnCallback((message, replyCode, replyText,                    exchange, routingKey) ->log.info(MessageFormat.format("消息发送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));

这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。

shovel-plugin 跨机房可靠投递

RabbitMQ 在跨机房集成提供了一个不错的插件 shovel 。使用 shovel-plugin 插件非常方便,shovel 可以接受机房之间的网络断开、机器下线等不稳定因素。

这里有两个 broker :

10.211.55.3 rabbit_node1

10.211.55.4 rabbit_node2

我们希望将发送给 rabbit_node1 plen.queue 的消息传输到 rabbit_node2 plen.queue 中。我们先开启 rabbit_node1 的 shovel-plugin

先看下当前 RabbitMQ 版本是否安装了 shovel-plugin,如果有的话直接开启。

rabbitmq-plugins  listrabbitmq-pluginsenablerabbitmq_shovelrabbitmq-pluginsenablerabbitmq_shovel_management

然后就可以在 Admin 面板里看到这个设置选项,怎么设置这里就不介绍了。主要就是配置下 amqp 协议地址,amqp://user:password@server-name/my-vhost 。

如果配置没有问题的话,应该是这样的一个状态,说明已经顺利连接到 rabbit_node2 broker 。

我们来看下 rabbit_node1 和 rabbit_node2 的 Connections 面板。

rabbit_node1(10.211.55.3):

rabbit_node2(10.211.55.4):

RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 创建了两个 tcp 连接,端口 39544 连接是用来消费 plen.queue 里的消息,端口 55706 连接是用来推送消息给 rabbit_node2 。

我们来看下 rabbit_node1 tcp 连接状态:

tcp60      0 10.211.55.3:567210.211.55.3:39544ESTABLISHEDtcp0      0 10.211.55.3:5570610.211.55.4:5672ESTABLISHED

rabbit_node2 tcp 连接状态:

tcp60      0 10.211.55.4:567210.211.55.3:55706ESTABLISHED

为了验证 shovel-plugin 稳定性,我们将 rabbit_node2 下线。

然后再发送消息,发现消息会现在 rabbit_node1 plen.queue 里待着,一旦 shovel-plugin 连接恢复将消费 rabbit_node1 plen.queue 消息,然后投递给 rabbit_node2 plen.queue 。

在此我向大家推荐一个架构学习交流群。交流学习群号:478030634 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

大家觉得文章对你还是有一点点帮助的,大家可以点击下方二维码进行关注。《Java烂猪皮》公众号聊的不仅仅是Java技术知识,还有面试等干货,后期还有大量架构干货。大家一起关注吧!关注烂猪皮,你会了解的更多..............

原文出处:https://www.cnblogs.com/wangiqngpei557/p/9381478.html

作者:王清培 (沪江集团资深JAVA架构师)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,590评论 18 139
  • 应用场景 异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种: 1.串行的方式 2.并行的...
    lijun_m阅读 1,805评论 0 3
  • 前言 在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来...
    Chandler_珏瑜阅读 6,555评论 2 39
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,861评论 2 11
  • 整体架构 部署步骤 基于 Docker 基本概念内存节点只保存状态到内存,例外情况是:持久的 queue 的内容将...
    mvictor阅读 12,741评论 5 30