架构图
基本概念
Producer
消息生产者,负责产生消息,一般由业务系统负责产生消息
Consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费
Broker
消息中转角色,负责存储消息,转収消息,一般也称为 Server。
NameServer
类似于zookeeper
概念模型
即消息是根据主题(即我们图里面的Topic)进行订阅,而每个Topic下面又可以有多个队列,只是这里的队列并不真正存储消息,而是起到类似索引的作用,消息真正存储在CommitLog里面,如下图
所有数据单独存储到一个 Commit Log,完全顺序写,随机读。对最终用户展现的队列实际只存储消息在 Commit Log 的位置信息
Message Queue
在RocketMQ中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,offset为java long类型,64位,理论上在100年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。也可以认为Message Queue是一个长度无限的数组,offset就是下标。
这样做的好处
(1)队列轻量化,单个队列数据量非常少
(2)对磁盘的访问串行化,避免磁盘竟争,丌会因为队列增加导致 IOWAIT 增高
这样做的缺点
(1)写虽然完全是顺序写,但是读却发成了完全的随机读。
(2)读一条消息,会先读 Consume Queue,再读 Commit Log,增加了开销。
(3)要保证 Commit Log 不 Consume Queue 完全的一致,增加了编程的复杂度
RocketMq的解决方案
随机读(主要是指磁盘随机读),尽可能让读命中 PAGECACHE,减少 IO 读操作,所以内存越大越好。同时由于缓存的局部性原理,可以很快的在内存上读取到消息
RocketMq里面的消息类型
顺序消息
消费消息的顺序要同収送消息的顺序一致,在 RocketMQ 中,主要指的是局部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序发送到同一个队列,返样 Consumer 就可以按照 Producer 发送的顺序去消费消息。
普通顺序消息
顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker重启,由于队列总数发生发化,哈希取模后定位的队列会发化,产生短暂的消息顺序不一致。如果业务能容忍在集群异常情况(如某个Broker宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
严格顺序消息
严格顺序消息顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自动切换,自动切换功能目前还未实现)目前已知的应用只有数据库binlog同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。
获取消息的方式
Broker主动进行推送至消费者
缺点:消费者可能消费过慢造成堆积,同时如果有很多消费者对于Broker也是一件很繁重的事情长轮询
即消费者会主动去拉取,缺点是可能获取不及时,但长轮询指的是我会多等一会,类似于长连接
RocketMq里面消息的几种消费方式
涉及到磁盘,就会有零拷贝,RocketMq也不例外,常用的零拷贝有如下两种方式
内存映射
对应的java代码
File file = new File("data.zip");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
FileChannel fileChannel = raf.getChannel();
MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
真正的零拷贝
对应的java代码
File file = new File("test.zip");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
FileChannel fileChannel = raf.getChannel();
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("", 1234));
// 直接使用了transferTo()进行通道间的数据传输
fileChannel.transferTo(0, fileChannel.size(), socketChannel);
这两种方式的比较
使用mmap + write方式
优点:即使频繁调用,使用小块文件传输,效率也很高
缺点:不能很好的利用DMA方式,会比sendfile多消耗CPU,内存安全性控制复杂,需要避免JVM Crash问题。
使用sendfile方式
优点:可以利用DMA方式,消耗CPU较少,大块文件传输效率高,无内存安全新问题。
缺点:小块文件效率低于mmap方式,只能是BIO方式传输,不能使用NIO。
RocketMq采用的是基于内存映射的方式,因为小块数据传输的更为频繁
消息的持久化
异步刷盘
写入到Page Cache后就立马返回了,然后再调用fsync函数异步的去将数据刷到磁盘
优点
效率高又快
缺点
断点或者重启,内存里面的数据还没来得及刷入到磁盘就没有了,所以会有丢消息的概率
同步刷盘
当然就是写入到Page Cache后就立马调用fsync函数立马刷入到磁盘
优点
可以做到不丢消息
缺点
当然就是牺牲性能了
接着再来分析下 几种消费消息的方式
At least Once
是指每个消息必须投递一次,RocketMQConsumer先pull消息到本地,消费完成后,才吐服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性
Exactly Only Once
(1).发送消息阶段,不允许収送重复的消息。
(2).消费消息阶段,不允许消费重复的消息。
只有以上两个条件都满足情况下,才能称为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以RocketMQ为了追求高性能,并不保证此特性,要求在业务上进行去重,也就是说消费消息要做到幂等性。RocketMQ虽然步能严格保证不重复,但是正常情冴下很少会出现重复収送、消费情况,只有网络异常,Consumer启停等异常情况下会出现消息重复。此问题的本质原因是网络调用存在不确定性,即不成功也不失败的第三种状态,所以才产生了消息重复性问题。
定时消息
定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意的时间精度,在Broker局面,必须要做消息排序,如果再涉及到持久化,那消息排序要不可避免的产生巨大性能开销。RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。
消息过滤
RocketMQ的消息过滤方式有别于其他消息中间件,是在订阅时,再做过滤,先来看下Consume Queue的存储结构
(1)在Broker端迕行Message Tag比对,先遍历Consume Queue,如果存储的Message Tag与订阅的Message Tag不符合,则跳过,继续比对下一个,符合则传输给Consumer。注意:Message Tag是字符串形式,Consume Queue中存储的是其对应的hashcode,比对时也是比对hashcode。(2).Consumer收到过滤后的消息后,同样也要执行在Broker端的操作,但是比对的是真实的Message Tag字符串,而不是Hashcode
这么做的原因?
(1)Message Tag存储Hashcode,是为了在Consume Queue定长方式存储,节约空间
(2)过滤过程中不会访问Commit Log数据,可以保证堆积情况下也能高效过滤
(3)即使存在Hash冲突,也可以在Consumer端进行修正,保证万无一失
高可用
谈到高可用,自然就想到集群,那么多台机器间消息的同步方式就有同步双写和异步复制两种
异步复制
异步复制的实现思路非常简单,Slave启劢一个线程,不断从Master拉取Commit Log中的数据,然后在异步build出Consume Queue数据结构。整个实现过程基本同Mysql主从同步类似。
同步双写
也类似于Mysql的半同步复制,即主上写完,其中一台从也要写完才统一返回给客户端ok.整体思想是类似的
上面我们谈到RocketMq没有使用Zookeeper而是自己实现了NameServer
public boolean initialize() {
.....
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
.....
}
- 定时任务1:NameServer每隔10s扫描一次Broker,移除处于不激活状态的Broker
- 定时任务2:NameServer每隔10分钟打印一次KV配置
我们可以看到,集群其实就是维护心跳,这里面其实还有很多细节,还没看完,看完再更新吧
Producer最佳实践
发送消息注意事项
(1)一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤。message.setTags("TagA");
(2)每个消息在业务局面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询返条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,返样可以避免潜在的哈希冲突。// 订单IdString orderId = "20034568923546";message.setKeys(orderId);
(3)消息发送成功或者失败,要打印消息日志,务必要打印sendresult和key字段。
(4)send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义
SEND_OK消息发送成功
FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
SLAVE_NOT_AVAILABLE:消息发送成功,但是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失对与精确发送顺序消息的应用,由亍顺序消息的局限性,可能会涉及到主备自动切换问题,所以如果sendresult中的status字段不等于SEND_OK,就应该尝试重试。对于其他应用,则没有必要这样。
(5)对于消息不可丢失应用,务必要有消息重发机制
例如如果消息发送失败,存储到数据库,能有定时程序尝试重发,或者人工触发重发。
Consumer最佳实践
(1)将消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等,消费之前判断是否在Db或(全局KV存储)中存在,如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入入,如果报主键冲突,则插入失败,直接跳过)msgId一定是全局唯一标识符,但是可能会存在同样的消息有两个不同msgId的情冴(有多种原因),返种情况可能会使业务上重复消费,建议最好使用消息内容中的唯一标识字段去重。
2.使用业务局面的状态机去重
具体可见幂等总结
最后讲一下集群的搭建
Master-Slave 方式
1.服务器环境
序号 | IP | 用户名 | 密码 | 角色 | 模式 |
---|---|---|---|---|---|
1 | 192.168.11.128 | root | *** | nameServer1,brokerServer1 | Master1 |
2 | 192.168.11.129 | root | *** | nameServer2,brokerServer2 | Master2 |
2.Hosts 添加信息
IP | NAME |
---|
192.168.11.128 rocketmq-nameserver1
192.168.11.128 rocketmq-master1
192.168.11.129 rocketmq-nameserver2
192.168.11.129 rocketmq-master1-slave
vi /etc/hosts
3.上传解压【两台机器】
# 上传 apache-rocketmq.tar.gz 文件至/usr/local
# tar -zxvf apache-rocketmq.tar.gz -C /usr/local
# ln -s apache-rocketmq rocketmq ll /usr/local
4.创建存储路径【两台机器】
# mkdir /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store/commitlog
# mkdir /usr/local/rocketmq/store/consumequeue
# mkdir /usr/local/rocketmq/store/index
5.RocketMQ 配置文件【两台机器】
# vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties
# vim /usr/local/rocketmq/conf/2m-2s-async /broker-a-s.properties
配置文件如下
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数 defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
6.修改日志配置文件【两台机器】
# mkdir -p /usr/local/rocketmq/logs
# cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
7.修改启动脚本参数【两台机器】
runbroker.sh脚本
# vim /usr/local/rocketmq/bin/runbroker.sh
脚本如下
修改为1个g就好了
#开发环境
JVM Configuration JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"
runserver.sh脚本
# vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"
8.启动 NameServer【两台机器】
# cd /usr/local/rocketmq/bin
# nohup sh mqnamesrv &
9.启动 BrokerServer A【192.168.11.128】
# cd /usr/local/rocketmq/bin
# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties
# netstat -ntlp
# jps
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
10.启动 BrokerServer B【192.168.11.129】
# cd /usr/local/rocketmq/bin
# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a-s.properties
# netstat -ntlp
# jps
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
11.停止命令
# cd /usr/local/rocketmq/bin
# sh mqshutdown broker
# sh mqshutdown namesrv
# --等待停止
# rm -rf /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store/commitlog
# mkdir /usr/local/rocketmq/store/consumequeue
# mkdir /usr/local/rocketmq/store/index
# --按照上面步骤重启 NameServer 与 BrokerServer