RabbitMq与RocketMq知识点

1.RabbitMq是什么

Rabbitmq是使用erlang语言编写,利用高效可靠的消息传递机制进行与平台无关的数据交流。通过提供消息传递和消息排队模型,可以在分布式环境下提供应用之间的解耦,削峰填谷,异步通信功能。他的重要组成部分包括
exchange交换机:用于接收消息根据路由及绑定规则分发到某一个或多个队列上
queue队列:用于存储与持久化消息
bindingkey绑定规则:将队列与交换机按照某种规则绑定在一起。
rabbitmq模型图:

图片.png

Queue

Queue是rabbitMq中内部一个对象,用于存储producer发送的消息。Exchange不参与存储,所有的消息只会在QUeue中持久化,consumer可以订阅queue中消息进行消费,一个队列可以被多个消费者监听,消息是通过轮询机制分配给消费者。队列是不支持广播消费模式,广播只针对Exchange的fanout模式可以分发到不同binding的队列上。

Exchange

生产者投递的消息进行接收与转发到相应的队列上。生产者投递消息需要指定routingkey,exchange根据routingkey匹配对应的bindingkey将消息投递到队列上。
bindingkey是将exchange与queue相互绑定的关系。有了这层关系之后exchange根据routingkey就好路由到对应的队列了。
Exchange交换机上有几种类型分别是:Direct,Topic,Fanout,Header
Direct:精准匹配,当Exchange类型为Direct时,生产者投递消息需要携带routingkey精准匹配到bindingkey才可以投递到队列上。
Topic:模糊匹配,*代表一个单词,#代表多个单词,如 *.key 在生产者投递消息如果时rout.key的routingkey可以路由到该队列,如果时top.rout.key就不能路由到。如果时#.key那么rout.key和top.rout.key都可以匹配到。
Fanout:广播类型,生产者投递的消息不关心routingkey值,只要与该交换机绑定的队列都会投递。
Header:消息头类型,生产者在投递消息需要将消息头中设置routingkey与之匹配才可以分发到队列上,这种类型性能差很少使用。

Vhost

虚拟host,启动rabbit服务可以创建多个vhost配置不同的权限,每个vhost都有独立的exchange,queue和binding规则,可以有效的规避不同application可用拥有独立的rabbitmq。

2.持久化配置

持久化包括Exchange,Queue,Message持久化

Exchange持久化

设置durable=true,表示将Exchange持久化到磁盘,在重启rabbitMq也不会丢失该Exchange。
autoDelete:是否自动删除,至少有一个队列与之绑定,之后与该绑定的queue自动删除
internal:是否内置exchange,ture表示内置exchange,生产者不能直接发送消息到内置的exchange,只能通过另一个非内置的exchange路由到该exchange发送消息。

Queue持久化

durable=ture,表示将queue持久化到磁盘,rabbitmq服务重启也不会丢失queue(这里是不丢失queue而不是消息持久化)
autodelete=true自动删除,前提是至少有一个消费者监听该队列,当所有消费者断开连接后该队列自动删除
exclusive:独占队列,同一个connection创建的队列其他connection是不可以创建相同名的queue

Queue消息持久化

设置basicProperties的属性deliveryMode=2消息会持久化到磁盘。

3死信队列

死信队列(Dead-letter-Queue)用来保存过期或者失败的消息,当队列的参数arguments配置Dead-Letter-Exchange当消息过期或者失败,会投递到Dead-Letter-Exchange中死信交换机会投递到死信队列中。

4.rabbitMq之Consumer

RabbitMq消费模式有两种:服务端推送(push)和 客户端拉取(pull),push适合持续订阅,pull适合单条消费。

5.RabbitMqListener注解

当方法使用@RabbitMqListener注解会监听队列消费消息。RabbitMqAnnotationBeanPostProcessor处理器实现了BeanPostProcessor接口的后置处理器postProcessAfterInitialization,当bean初始化完毕之后会调用此接口,首先获取该bean的所有方法使用@RabbitMqListener注解,
1.创建MethodRabbitListenerEndPoint对象设置endpoint属性,
2.紧接着创建RabbitListenerContainerFactory容器
3.将endpoint保存在RabbitListenerEndpointRegister中
4.在endpoint创建MessageListenerContainer
5.在MessageListenerContainer创建MessageListener,
6.在MessageListener创建一个HandlerAdapter,adapter与rabbit的broker建立连接,接收broker投递的消息到对应的blocking queue上。这样消费者接口就可以从该queue上进行poll消息进行消费。

6.Mq面试解答

mq解决问题:解耦,异步,削峰填谷。
架构引入Mq存在的问题:系统可用性降低,系统复杂性变高,数据一致性问题。

RabbitMq集群

普通模式集群:队列存在于多个节点的rabbitmq队列中,但是每个节点数据是不一样的,(某个queue上有10条数据,有3个节点rabbitmq实例,每个节点rabbitmq可能被分配3千多条消息)当某个节点mq实例宕机了,该实例上的数据是不可以被消费的,无法保证高可用性能。
镜像模式集群:多个节点的mq实例的队列,消息都是一样的,每次写消息都会同步到其他节点的mq上,任何一个节点宕机了,其他节点上数据是完整的,可以继续消费,保证高可用,但不是分布式的。queue的数据量可能会越来越大。

如何保证mq消息不丢失

rabbitmq丢失数据可能场景

1.生产者投递消息,mq接收消息发生异常,消息丢失
2.mq接收到消息,存放在mq内存中,在持久化到磁盘时候mq宕机了,消息丢失
3.消费者在接收到消息在未处理完数据消费者宕机了,消息丢失

针对上述三种丢失数据情况解决方案

事务机制,Confirm机制

情况一丢失:事务机制

事务机制是同步的,生产者发送消息会阻塞等待此消息成功还是失败,导致投递消息的性能严重下降(吞吐量低)需要手动执行开启事务,事务回滚,事务提交

try{
    channel.txSelect;
    //执行投递
   }catch(Exception e) {
     channel.txRollback; 
}
channel.txCommit; 
情况一丢失:Confirm机制

强烈推荐使用这中机制,异步吞吐量高不会阻塞
1.设置channel是confirm模式
2.生产者投递消息到mq之后就不用管了,mq收到消息处理消息无论成功还是失败,都会给生产者一个通知,生产者可以根据消息的结果进行处理,是重新投递还是丢弃消息。

情况二丢失解决方案

设置queue的durable=true表示持久化队列,在生产者发送消息将MessageProperties的deliveryMode=2表示持久化消息,持久化消息的前提是队列持久化。这样配合生产者的confirm机制,只有消息持久化之后或者处理失败的消息,mq才会通知生产者成功与否。
可能会丢失部分数据:当mq收到消息,在持久化消息之前,mq宕机了,生产者无法接受mq的ack消息,可以定时扫描未投递的消息重新发送保证消息。

情况三丢失解决方案

消费者开启的自动autoAck机制,会丢失数据。关闭autoAck机制,每次处理完消息再发送给rabbitmq,告知消费完成,如果消费者接收消息未来的及处理导致无法ack给mq,mq有检测机制,当消费者与broker断开连接会重新投递此条消息给其他的消费者消费。

如何保证mq消息顺序执行

场景:某些数据执行有先后顺序,如binglog日志对于一条数据的曾改改操作,不需要保证其顺便。如果顺序发生错乱数据也就错乱了。
rabbitmq顺序消费,一个队列一个生产者,只要投递的消息顺序是正确的,那么执行的顺序也是正确的。

>RocketMq

RocketMq介绍

RocketMq是使用Java语言编写的高可用的分布式消息中间件,

RocketMq在使用中架构图

RocektMq包括:生产者,消费者,broker,nameServer四个重要组成部分


图片.png
Broker

1.可以理解为Rocketmq本身
2.broker的主要作用是接收生产者的消息,和发送给消费者处理消息
3.broker会定时30s发送topic等路由信息到nameServer上。
4.是消息中间件的存储消息和转发服务器
5.每个Broker节点启动都会和nameserver集群的每个nameserver服务进行长连接,注册自身的信息,之后定时上报

NameServer

1.可以理解为zookeeper效果,没使用zookeeper而是自己开发了Nameserver代替zookepeer。
2.提供了服务注册,服务发现,路由管理等是一个无状态的节点。
3.nameserver是一个服务发现者,集群中的各个角色(producer,consumer,broker)都需要定时上报自身信息到nameserver上,当长时间未上报nameserver会从列表上移除。
4.nameserver可以集群部署,各个nameserver间是不互通的,其他各个角色定时上报信息,保证nameserver的高可用。
5.nameserver是内存式存储,nameserver中信息不会持久化,所以是无状态节点。

Producer

1.生产者,在与nameserver集群连接只需要从中随机选择一个nameserver进行长连接,获取topic路由信息(包括topic下的queue,queue在哪个broker上)
2.生产者会向提供topic的broker的master节点进行长连接,定时向master发送心跳。

consumer

1.消费者,与nameserver集群连接只需要随机选择一个namerserver进行长连接,获取topic路由信息连接到提供topic路由的broker上,master和slave都会进行长连接。定时会向nameserver上报信息

核心流程

broker会注册到每台的nameserver上
producer会从某一个的nameserver上拉取topic路由信息
producer与broker的master节点长连接发送消息
consumer从某一个节点的nameserver上拉取topic信息
consumer会与所有的master和slave节点长连接接收消息

概念篇

1.Message

消息的载体,Message发送或者消费必须指定一个topic,message有个可选的tag项可以过滤消息。

2.topic

消息的逻辑分类,发消息之前必须指定一个topic才可以发送,消费者消费时候也必须指定这个topic上消费,就是逻辑分类

3.queue

1个topic可以有N个queue,数量可以配置,message本身其实是存储到queue上,消费者也是通过queue消息拉取消费,

4.tag

tag是topic进一步细化,也就是标签,发送每个消息都可以打上tag标签,消费时候可以根据标签进行选择性消费。

5.Message模型

集群(每个消息只会被一个消费者消费)
广播(每条消息会被所有消费者消费)设置consumer.setMessageModel(MessageModel.BROADCASTING);

发送模式

单条同步发送,批量同步发送,异步发送(sendCallback实现类操作成功与否逻辑),sendoneway(只发不追求结果)

RocketMq事物消息和顺序消费

RocketMq支持事物消息

生产者先发送一个预发布的消息(Half message)到broker,broker收到消息会回调接口,开发者根据接收的成功与否处理本地事物,本地事物如果处理成功,将该事物进行commit,失败进行rollback,如果是commit,则将half的消息投递到真正的topic队列上,让consumer消费
图片.png
消息顺序执行

方案一:生产者只要保证需要顺序执行的消息投递到同一个队列即可,消费端使用一个线程进行消费。由于我们在创建topic会创建多条队列,在代码中需要自定义队列选择器(MessageSelectQueue)

RocketMq消息不丢失

图片.png

丢失三个阶段:
1.生产阶段:producer发送个broker,可能会丢失,网络延迟不可达等。
2.存储阶段:当broker接收到producer消息先存储到page cache中,准备持久化磁盘时候,宕机了缓存页数据丢失。
3.消费阶段:消费未启用ack机制,消费消息失败导致丢失。

解决方案

1.发送阶段数据丢失,使用同步方式实时响应这种(效率低,保证数据不丢失),异步方式成功与否会回调接口。sendOneway只负责投递(无法保证)
同步发送+自动重试+多个master+多个slave
2.broker存储阶段:Mq持久化两个行为:同步刷盘 和 异步刷盘。同步刷盘:指的是接收到消息待持久化完成后通知producer投递成功,异步刷盘:接收到消息存储到page cache缓存就通知producer投递成功,异步刷盘是10s持久化一次。
3.消费端丢失消息
设置ack机制 或者使用消息失败重试机制配置messageDelayLevel默认15次

RocketMq生产者

启动实例MQClientAPIImpl,这里封装了客户端与Broker进行通信的方法。
启动各种定时任务,与Broker之间的心跳等等。
启动消息拉取服务。
启动负载均衡服务。

启动默认的Producer服务(重复启动了,因为客户端一开始就启动了这个)。

RocketMq如何发送消息的

配招生产者组,namesever地址和topic以及要发送的内容,启动producer的star()方法,完成后调用send进行发送。
start()方法内部进行检查nameserver,生产组名等参数验证,内部会获取mqClientFactory对象,此对象包含了所有与Broker进行通信的api,通过mqclientfactory启动请求响应,启动一些定时任务,与broker的心跳,启动负载均衡等。
send()发送消息有三种方式:单向发送,同步发送,异步发送,区别在与异步的是线程池一步发出请求,同步当前线程同步调用,核心都是:先选择一个适合的queue存储消息,组装header参数,发送给broker,发送失败会自动重试默认发送次数3次,

Broker收到消息如何持久化

broker收到消息持久化有两种:同步和异步,一般选择异步持久化,同步效率比异步低但是同步更可靠。
MappedFile对应的每个CommitLog文件,MappedFileQueue相当于文件夹,管理所有的文件,还有一个管理者CommitLog对象,他负责写入,读取操作。broker拿到消息,将消息 topic queue等内容存储到byteBuffer中,然后持久化到commitlog文件中,默认文件大小1G,超出1G重新创建commitlog文件来存储。

消息投递到队列的算法

从接口调用方可以看出有两种:一种是直接发送内部使用内置的投递queue的算法(三种),第二种是使用自定义的队列选择器。
内置的queue选择算法有三种:随机算法(selectMessageQueueByRandom),hash算法(SelectMessageQueuebyHash)
选择过程:在不开启容错策略情况下,轮询队列进行发送,如果失败了,重试时候会过滤掉发送失败的broker。
在开启容错策略情况下,会通过RocketMq预测机制来预测broker是否可用,如果上次失败的broker可用,还是会选择这个broker,如果不可用,会随机选择一个进行发送,在发送消息的时候会记录一下调用时间和是否报错,根据该时间预测broker是否可用

Consumer如何做负载均衡

queue的个数大于consumer个数,且queue能被consumer整除,那么queue会平均分配给consumer。()如4queue 2consumer,那么每个consumer分配两个queue。4个queue 4个consumer那么每个consumer分配1个queue)
queue的个数大于consumer个数,queue的个数不能被consumer整除,会出现一个consumer比其他consumer多分配一个queue(如:4queue 3个consumer,有一个consumer分配到2个queue)
queue的个数小于consumer个数,多余的consumer会被闲置,也就浪费consumer资源。(如 4queue 5个consumer,就会有一个consumer浪费)
策略:allocateMessageQueueAveragely(平均分配)
allocatemessageQueueAveragelyByCircle 环状的平均消费。
allocateMessageQueueConsistentHash 一致性hash算法
allocateMachineRoomNearby:就近原则,consumer离机房近的就近消费
allocateMessageQueueByConfig:配置方式

Mq选型

activemq 性能不如 rabbitmq,rocketmq,kafka等。适用于小型系统,不支持分布式部署
rabbitmq 不支持分布式部署,镜像部署,无法扩容,对于消息堆积影响mq性能,对于顺序消费,只能部署对于该exchange绑定的queue单台服务。(小型的系统)
rocketmq 天然的分布式部署,水平扩展,每秒几十万的并发量,对于顺序消费可以有效的保证,无需考虑consumer端是否是集群部署,只要确保投递到一个queue中即可(推荐使用)
kafka适用于日志收集系统,分布式部署,

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,911评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,014评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,129评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,283评论 1 264
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,159评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,161评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,565评论 3 382
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,251评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,531评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,619评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,383评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,255评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,624评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,916评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,199评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,553评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,756评论 2 335

推荐阅读更多精彩内容