RocketMQ的消息发送和消费

生产消息

生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。比如同步发送、异步发送、Oneway发送、延迟发送、发送事务消息等。 默认使用的是DefaultMQProducer类,发送消息要经过五个步骤:
1)设置Producer的GroupName。
2)设置InstanceName,当一个Jvm需要启动多个Producer的时候,通过设置不同的
InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。
3)设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证不丢消息,可以设置多重试几次。
4)设置NameServer地址
5)组装消息并发送。
消息发生返回状态(SendResult#SendStatus)有如下四种:
FLUSH_DISK_TIMEOUT
FLUSH_SLAVE_TIMEOUT
SLAVE_NOT_AVAILABLE
SEND_OK
不同状态在不同的刷盘策略和同步策略的配置下含义是不同的:
1) FLUSH_DISK_TIMEOUT:表示没有在规定时间内完成刷盘(需要Broker的刷盘策略被设置成SYNC_FLUSH才会报这个错误)。
2) FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式,没有在设定时间内完成主从同步。
3)SLAVE_NOT_AVAILABLE:这个状态产生的场景和FLUSH_SLAVE_TIMEOUT类似,表示在主备方式下,并且Broker被设置成SYNC_MASTER,但是没有找到被配置成Slave的Broker。
4)SEND_OK:表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave上?消息在Slave上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK。
写一个高质量的生产者程序,重点在于对发送结果的处理,要充分考虑各种异常,写清对应的处理逻辑。
提升写入的性能
发送一条消息出去要经过三步
1)客户端发送请求到服务器。
2)服务器处理该请求。
3)服务器向客户端返回应答。
一次消息的发送耗时是上述三个步骤的总和。
在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用, 可以采用Oneway方式发送Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果。
用这种方式发送消息的耗时可以缩短到微秒级。
另一种提高发送速度的方法是增加Producer的并发量,使用多个Producer同时发送,我们不用担心多Producer同时写会降低消息写磁盘的效率,RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中。
顺序写CommitLog可让RocketMQ无论在HDD还是SSD磁盘情况下都能保持较高的写入性能。
目前在阿里内部经过调优的服务器上,写入性能达到90万+的TPS,我们可以参考这个数据进行系统优化。
在Linux操作系统层级进行调优,推荐使用EXT4文件系统,IO调度算法使用deadline算法。

消息消费

简单总结消费的几个要点:

  1. 消息消费方式(Pull和Push)
  2. 消息消费的模式(广播模式和集群模式)
  3. 流量控制(可以结合sentinel来实现,后面单独讲)
  4. 并发线程数设置
  5. 消息的过滤(Tag、Key) TagA||TagB||TagC * null
    当Consumer的处理速度跟不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看消费逻辑本身有没有优化空间,除此之外还有三种方法可以提高Consumer的处理能力。
    1.提高消费并行度
    在同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提高并行度。
    通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数。
    注意:总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收不到消息。
    此外,通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加并行度来提高吞吐量(设置方法是修改consumeThreadMin和consumeThreadMax)。
    2.以批量方式进行消费
    某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间。
    可以通过批量方式消费来提高消费的吞吐量。实现方法是设置Consumer的consumeMessageBatchMaxSize这个参数,默认是1,如果设置为N,在消息多的时候每次收到的是个长度为N的消息链表。
    3.检测延时情况,跳过非重要消息
    Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆积,这个时候可以选择丢弃不重要的消息,使Consumer尽快追上Producer的进度。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,165评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,503评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,295评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,589评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,439评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,342评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,749评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,397评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,700评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,740评论 2 313
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,523评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,364评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,755评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,024评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,297评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,721评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,918评论 2 336

推荐阅读更多精彩内容