简介
官方简介:
RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
- 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
- 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
支持拉(pull)和推(push)两种消息模式- 单一队列百万消息的堆积能力
- 支持多种消息协议,如 JMS、MQTT 等
- 分布式高可用的部署架构,满足至少一次消息传递语义
- 提供 docker 镜像用于隔离测试和云集群部署
- 提供配置、指标和监控等功能丰富的 Dashboard
网络架构
核心概念
Broker(存储数据)
broker消息存储主要包括3个部分,分别commitLog(消息)的存储,consumeQueue(用户消费消息的位置信息)的存储,index的存储,这章分享会把这三个过程分解清楚,同时会对里面涉及的存储位置的偏移量着重讲解清楚。
1.commitLog:同步存储producer发送给broker的消息。
commitLog
几乎存储所有信息,包括消息数据,ConsumeQueues、Message Key、Tag等。与关系数据库的重做日志类似,只要存在提交日志,就可以完全恢复使用队列、消息键索引和所有其他必需的数据。(All message data are stored in commit log files
)
2.consumeQueue:存储用户消费消息的位置信息(这些信息也按顺序刷新到磁盘),由于
consumequeue
只存储固定大小的元数据,主要用于记录消费进度,因此支持随机读取。利用页面缓存预取,访问ConsumeQueue的速度与访问主内存的速度一样快,即使它是在大量消息累积的情况下。因此,Consumeue不会对读取性能带来明显的消耗。(如果消费者直接基于commitlog进行消费的话,简直就是一个恶梦,因为不同的主题的消息完全顺序的存储在commitlog文件中,根据主题去查询消息,不得不遍历整个commitlog文件,显然作为一款消息中间件这是绝不允许的。RocketMQ的ConsumeQueue文件就是来解决消息消费的。首先我们知道,一个主题,在broker上可以分成多个消费对列,默认为4个,也就是消费队列是基于主题+broker。那ConsumeQueue中当然不会再存储全量消息了,而是存储为定长(20字节,8字节commitlog偏移量+4字节消息长度+4字节tag hashcode),消息消费时,首先根据commitlog offset去commitlog文件组(commitlog每个文件1G,填满了,另外创建一个文件),找到消息的起始位置,然后根据消息长度,读取整条消息。在消费模式为集群消费的情况下数据流程如下:每个 Topic 在Broker 端都会有多个消费队列,Producer每次都会选择一个MessageQueue(此MessageQueue是一个概念模型,可以理解为与ComsumeQueue对应,即相当于选择一个ComsumeQueue)发送消息,Consumer同样也会每次都选择一个ComsumeQueue拉取消息进行消费,从而更改broker端ComsumeQueue的offset并保存在broker端,达到负载均很);在广播消费模式下offset不会保存在broker端,只会保存在consumer端,其他与集群模式保持一致
)
3.indexFile:此文件主要存储消息ID(
MessageId
)。(有时候我们需要根据消息ID,来查找消息,但是还consumeQueue中没有存储消息ID,如果不采取其他措施,又得遍历commitlog文件了,为了解决这个问题,rocketmq采用inddex文件存储消息ID
)。
Index Header结构各字段的含义:
beginTimestamp:第一个索引消息落在Broker的时间戳;
endTimestamp:最后一个索引消息落在Broker的时间戳;
beginPhyOffset:第一个索引消息在commitlog的偏移量;
endPhyOffset:最后一个索引消息在commitlog的偏移量;
hashSlotCount:构建索引占用的槽位数;
indexCount:构建的索引个数;
Slot Table:
Slot Table:里面的每一项保存的是这个topic-key是第几个索引;
根据topic-key的Hash值除以500W取余得到这个Slot Table的序列号,然后将此索引的顺序个数存入此Table中。
Slottable的位置(absSlotPos)的计算公式:40+keyHash%(500W)*4;
Index Linked List的字段含义:
keyHash:topic-key(key是消息的key)的Hash值;
phyOffset:commitLog真实的物理位移;
timeOffset:时间位移,消息的存储时间与Index Header中beginTimestamp的时间差;
slotValue:当topic-key(key是消息的key)的Hash值取500W的余之后得到的Slot Table的slot位置中已经有值了(即Hash值取余后在Slot Table中有冲突时),则会用最新的Index值覆盖,并且将上一个值写入最新Index的slotValue中,从而形成了一个链表的结构。
Index Linked List的位置(absIndexPos)的计算公式: 40+ 500W*4+index的顺序数*40;
Index,ConsumerQune的存储 与 commitLog的存储是隔离开的,非同步的
Producer生产者
消息发送方,将业务系统中产生的消息发送到brokers(brokers可以理解为消息代理),rocketmq提供了以下消息发送方式:同步、异步、单向(适用于可靠性要求不高的场景,如日志收集)。(A producer sends messages generated by the business application systems to brokers. RocketMQ provides multiple paradigms of sending: synchronous, asynchronous and one-way.
)
Producer Group
相同角色的生产者被归为同一组,譬如一个应用web应用可能会部署多个实例,那么这些实例就是一个生产组,生产者分组的作用只体现在消息回查的时候(即如果一个生产者组中的一个生产者实例发送一个事务消息到broker后挂掉了,那么broker会回查此实例所在组的其他实例,从而进行消息的提交或回滚操作)。(Producers of the same role are grouped together. A different producer instance of the same producer group may be contacted by a broker to commit or roll back a transaction in case the original producer crashed after the transaction.
)
Consumer
消息的消费方,主要又一下2种方式(A Consumer pulls messages from brokers and feeds them into application. In perspective of user application, two types of consumers are provided:
)
PullConsumer
从brokers中主动获取消息并消费(Pull consumer actively pulls messages from brokers. Once batches of messages are pulled, user application initiates consuming process.
)
PushConsumer
其内部也是通过pull的方式获取消息,只是将消息获取、消息消费以及其他的维护功能进行了封装和扩展,在消息回掉时调用用户自定义i的回掉接口(Push consumer, on the other hand, encapsulates message pulling, consuming progress and maintaining other work inside, leaving a callback interface to end user to implement which will be executed on message arrival.
)
Consumer Group
和生产者组相似。主要作用体现在对消费者的负载均衡和容错方面。需要注意:相同消费者组的消费者订阅的消息主题必须相同(Similar to previously mentioned producer group, consumers of the exactly same role are grouped together and named Consumer Group.Consumer Group is a great concept with which achieving goals of load-balance and fault-tolerance, in terms of message consuming, is super easy. **Warning**: consumer instances of a consumer group must have exactly the same topic subscription(s).
)
Topic
主题就是消息传递的类型。一个生产者实例可以发送消息到多个主题,多个生产者实例也可以发送消息到同一个主题。同样的,对于消费者端来说,一个消费者组可以订阅多个主题的消息,一个主题的消息也可以被多个消费者组订阅
(Topic is a category in which producers deliver messages and consumers pull messages. Topics have very loose relationship with producers and consumers. Specifically, a topic may have zero, one or multiple producers that sends messages to it; conversely, a producer can send messages of different topics. In consumer’s perspective, a topic may be subscribed by zero, one or multiple consumer groups. And a consumer group, similarly, may subscribe to one or more topics as long as instances of this group keep their subscription consistent.
)
Message
消息就像是你发送邮件信息。每个消息必须指定一个主题,就好比每个信封上都必须写明收件人。(Message is the information to be delivered. A message must have a topic, which can be interpreted as address of your letter to mail to. A message may also have an optional tag and extra key-value pairs. For example, you may set a business key to your message and look up the message on a broker server to diagnose issues during development.
)
Tag
标签,可以被当作是子主题。主要用于区分同一个主题下的不同作用或者说不同业务的消息。同时也是避免主题定义过多引起性能问题,通常情况下一个生产者组只向一个主题发送消息,其中不同业务的消息通过标签或者说子主题来区分。
(Tag, in other words sub-topic, provides extra flexibility to users. With tag, messages with different purposes from the same business module may have the same topic and different tag. Tags would be helpful to keep your code clean and coherent, and tags also can facilitate the query system RocketMQ provides.
)
Message Queue
消息主题被划分为一个或者多个子主题,每个子主题被称为消息队列。主要作用是提高并发以及故障切换。(Topic is partitioned into one or more sub-topics, “message queues”.
)
Message Model
- Clustering:缺省的,Consumer的MessageModel就是CLUSTERING模式,也就是同1个Consumer Group内部,多个Consumer分摊同1个topic的多个queue,也就是负载均衡。
- Broadcasting:同1个Consumer Group内部也变成了广播,此时ConsumerGroup其实就没有区分的意义了。此时,不管是1个Consumer Group,还是多个Consumer Group,对同1个topic的消息,都变成了广播。
nameserver
Name Server 为 producer 和 consumer 提供路由信息。相对来说,nameserver的稳定性非常高。原因有二:
- nameserver互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用。无状态
- nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。
broker与nameserver关系
连接
单个broker和所有nameserver保持长连接
心跳
心跳间隔:每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。
心跳超时:nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。
断开
时机:broker挂掉;心跳超时导致nameserver主动关闭连接
动作:一旦连接断开,nameserver会立即感知,更新topc与队列的对应关系,但不会通知生产者和消费者
负载均衡
- 一个topic分布在多个broker上,一个broker可以配置多个topic,它们是多对多的关系。
- 如果某个topic消息量很大,应该给它多配置几个队列,并且尽量多分布在不同broker上,减轻某个broker的压力。
- topic消息量都比较均匀的情况下,如果某个broker上的队列越多,则该broker压力越大。
可用性
由于消息分布在各个broker上,一旦某个broker宕机,则该broker上的消息读写都会受到影响。所以rocketmq提供了master/slave的结构,salve定时从master同步数据,如果master宕机,则slave提供消费服务,但是不能写入消息,此过程对应用透明,由rocketmq内部解决。
这里有两个关键点:
- 一旦某个broker master宕机,生产者和消费者多久才能发现?受限于rocketmq的网络连接机制,默认情况下,最多需要30秒,但这个时间可由应用设定参数来缩短时间。这个时间段内,发往该broker的消息都是失败的,而且该broker的消息无法消费,因为此时消费者不知道该broker已经挂掉。
- 消费者得到master宕机通知后,转向slave消费(重定向,对于2次开发者透明),但是slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢的,一旦master恢复,未同步过去的消息会被消费掉。
可靠性
- 所有发往broker的消息,有同步刷盘和异步刷盘机制,总的来说,可靠性非常高
- 同步刷盘时,消息写入物理文件才会返回成功,因此非常可靠
- 异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电
消息清理
扫描间隔
默认10秒,由broker配置参数cleanResourceInterval决定
空间阈值
物理文件不能无限制的一直存储在磁盘,当磁盘空间达到阈值时,不再接受消息,broker打印出日志,消息发送失败,阈值为固定值85%
清理时机
默认每天凌晨4点,由broker配置参数deleteWhen决定;或者磁盘空间达到阈值
文件保留时长
默认72小时,由broker配置参数fileReservedTime决定
读写性能
- 文件内存映射方式操作文件,避免read/write系统调用和实时文件读写,性能非常高
- 永远一个文件在写,其他文件在读
- 顺序写,随机读
- 利用linux的sendfile???mmap+write吧机制,将消息内容直接输出到sokect管道,避免系统调用
系统特性
- 大内存,内存越大性能越高,否则系统swap会成为性能瓶颈
- IO密集
- cpu load高,使用率低,因为cpu占用后,大部分时间在IO WAIT
磁盘可靠性要求高,为了兼顾安全和性能,采用RAID10阵列
磁盘读取速度要求快,要求高转速大容量磁盘
消费者与nameserver关系
连接
单个消费者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。
心跳
与nameserver没有心跳
轮询时间
默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由DefaultMQPushConsumer的pollNameServerInteval参数决定,可手动配置。
消费者与broker关系
连接
单个消费者和该消费者关联的所有broker保持长连接。
心跳
默认情况下,消费者每隔30秒向所有broker发送心跳,该时间由DefaultMQPushConsumer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费
断开
时机:消费者挂掉;心跳超时导致broker主动关闭连接
动作:一旦连接断开,broker会立即感知到,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费
负载均衡
集群消费模式下,一个消费者集群多台机器共同消费一个topic的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。
消费机制
本地队列
消费者不间断的从broker拉取消息,消息拉取到本地队列,然后本地消费线程消费本地消息队列,只是一个异步过程,拉取线程不会等待本地消费线程,这种模式实时性非常高。对消费者对本地队列有一个保护,因此本地消息队列不能无限大,否则可能会占用大量内存,本地队列大小由DefaultMQPushConsumer的pullThresholdForQueue属性控制,默认1000,可手动设置。
轮询间隔
消息拉取线程每隔多久拉取一次?间隔时间由DefaultMQPushConsumer的pullInterval属性控制,默认为0,可手动设置。
消息消费数量
监听器每次接受本地队列的消息是多少条?这个参数由DefaultMQPushConsumer的consumeMessageBatchMaxSize属性控制,默认为1,可手动设置。
消费进度存储
每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumer的persistConsumerOffsetInterval属性控制,默认为5秒,可手动设置。
如果一个topic在某broker上有3个队列,一个消费者消费这3个队列,那么该消费者和这个broker有几个连接?
一个连接,消费单位与队列相关,消费连接只跟broker相关,事实上,消费者将所有队列的消息拉取任务放到本地的队列,挨个拉取,拉取完毕后,又将拉取任务放到队尾,然后执行下一个拉取任务
生产者与nameserver关系
连接
单个生产者者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。
轮询时间
默认情况下,生产者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败。该时间由DefaultMQProducer的pollNameServerInteval参数决定,可手动配置。
心跳
与nameserver没有心跳
生产者与broker关系
连接
单个生产者和该生产者关联的所有broker保持长连接。
心跳
默认情况下,生产者每隔30秒向所有broker发送心跳,该时间由DefaultMQProducer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接。
连接断开
移除broker上的生产者信息
负载均衡
生产者时间没有关系,每个生产者向队列轮流发送消息
Broker集群配置方式及优缺点
单个 Master
这种方式风险较大,一旦Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。
多 Master 模式
一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master(每个master之间是互不通信).
优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。
### 先启动 NameServer,例如机器 IP 为:172.16.8.106:9876
nohup sh mqnamesrv &
### 在机器 A,启动第一个 Master
nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
### 在机器 B,启动第二个 Master
nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &
多 Master 多 Slave 模式,异步复制
每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级(每个master之间是互不通信)。
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。
### 先启动 NameServer,例如机器 IP 为:172.16.8.106:9876
nohup sh mqnamesrv &
### 在机器 A,启动第一个 Master
nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
### 在机器 B,启动第二个 Master
nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
### 在机器 C,启动第一个 Slave
nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
### 在机器 D,启动第二个 Slave
nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &
多 Master 多 Slave 模式,同步双写
每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用同步双写方式,主备都写成功,向应用返回成功(每个master之间是互不通信)。
优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。
### 先启动 NameServer,例如机器 IP 为:172.16.8.106:9876
nohup sh mqnamesrv &
### 在机器 A,启动第一个 Master
nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
### 在机器 B,启动第二个 Master
nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
### 在机器 C,启动第一个 Slave
nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
### 在机器 D,启动第二个 Slave
nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &
以上 Broker 与 Slave 配对是通过指定相同的brokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave的BrokerId 必须是大与 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。
安装
下载安装
下载RocketMQ,在每个节点,解压到指定目录
alibaba-rocketmq-3.2.6.tar.gz
tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local
解压后的文件夹:alibaba-rocketmq 进入bin目录。注:RocketMQ需要jdk1.7及以上版本
启动NameServer
[root@m106 2m-2s-sync]# nohup sh mqnamesrv &
[2] 17938
[root@m106 2m-2s-sync]# nohup: ignoring input and appending output to `nohup.out'
查看nohup.out文件中:
The Name Server boot success.表示NameServer启动成功。Jps查看NameServer进程
启动BrokerServer a, BrokerServer b
在m106上启动master A
[root@m106 bin]# nohup sh mqbroker -n 172.16.8.106:9876 -c ../conf/2m-noslave/broker-a.properties &[1] 17206
在m107上启动master B
[root@m107 bin]# nohup sh mqbroker -n 172.16.8.106:9876 -c ../conf/2m-noslave/broker-b.properties &[1] 14488
Jps查看服务启动情况
[root@m106 bin]# jps
12494 HRegionServer
12240 Kafka
16556 DataNode
18499 NamesrvStartup
13101 RunJar
17210 BrokerStartup
创建topic
[root@m106 bin]# sh mqadmin updateTopic
usage: mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>]
-t <arg> [-u <arg>] [-w <arg>]
-b,--brokerAddr <arg> create topic to which broker
-c,--clusterName <arg> create topic to which cluster
-h,--help Print help
-n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-o,--order <arg> set topic's order(true|false
-p,--perm <arg> set topic's permission(2|4|6), intro[2:R; 4:W; 6:RW]
-r,--readQueueNums <arg> set read queue nums
-s,--hasUnitSub <arg> has unit sub (true|false
-t,--topic <arg> topic name
-u,--unit <arg> is unit topic (true|false
-w,--writeQueueNums <arg> set write queue nums
实例:
[root@m106 bin]# sh mqadmin updateTopic -n 172.16.8.106:9876 -c DefaultCluster -t TopicTest1
create topic to 172.16.8.107:10911 success.
TopicConfig [topicName=TopicTest1, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
删除topic
[root@m106 bin]# sh mqadmin deleteTopic -n 172.16.8.106:9876 -c DefaultCluster -t TopicTest1
delete topic [TopicTest1] from cluster [DefaultCluster] success.
delete topic [TopicTest1] from NameServer success.
查看topic信息
[root@m106 bin]# sh mqadmin topicList -n 172.16.8.106:9876
BenchmarkTest
TopicTest1
broker-a
DefaultCluster
查看topic统计信息
[root@m106 bin]# sh mqadmin topicStatus -n 172.16.8.106:9876 -t TopicTest1
#Broker Name #QID #Min Offset #Max Offset #Last Updated
broker-a 0 0 0
broker-a 1 0 0
broker-a 2 0 0
broker-a 3 0 0
broker-a 4 0 0
broker-a 5 0 0
broker-a 6 0 0
broker-a 7 0 0
查看所有消费组group
[root@m106 bin]# sh mqadmin consumerProgress -n 172.16.8.106:9876
查看指定消费组下的所有topic数据堆积情况
[root@m106 bin]# sh mqadmin consumerProgress -n 172.16.8.106:9876 -g ConsumerGroupName
使用指南
客户端寻址方式
在代码中指定NameServer地址
Producer.setNamesrvAddr(“192.168.8.106:9876”);
Consumer.setNamesrvAddr(“192.168.8.106:9876”);
Java启动参数中指定NameServer地址
-Drocketmq.namesrv.addr=192.168.8.106:9876
环境变量指定NameServer地址
export NAMESRV_ADDR=192.168.8.106:9876
http静态服务器寻址
客户端启动后,会定时访问一个静态的HTTP服务器,地址如下:
http://jmenv.tbsite.net:8080/rocketmq/msaddr
这个URL的返回内容如下:
192.168.8.106:9876
客户端默认每隔2分钟访问一次这个HTTP服务器,并更新本地的NameServer地址。URL已经在代码中写死,可通过修改/etc/hosts文件来改变要访问的服务器,例如在/etc/hosts增加如下配置:
10.232.22.67 jmenv.taobao.net
客户端的公共配置类:ClientConfig
参数名 | 默认值 | 说明 |
---|---|---|
NamesrvAddr | NameServer地址列表,多个nameServer地址用分号隔开 | |
clientIP | 本机IP | 客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 |
instanceName | DEFAULT | 客户端实例名称,客户端创建的多个Producer,Consumer实际是共用一个内部实例(这个实例包含网络连接,线程资源等) |
clientCallbackExecutorThreads | 4 | 通信层异步回调线程数 |
pollNameServerInteval | 30000 | 轮训Name Server 间隔时间,单位毫秒 |
heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 |
persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 |
3. Producer配置
参数名 | 默认值 | 说明 |
---|---|---|
producerGroup | DEFAULT_PRODUCER | Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。 |
createTopicKey | TBW102 | 在发送消息时,自动创建服务器不存在的topic,需要指定key |
defaultTopicQueueNums | 4 | 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 |
sendMsgTimeout | 10000 | 发送消息超时时间,单位毫秒 |
compressMsgBodyOverHowmuch | 4096 | 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 |
retryAnotherBrokerWhenNotStoreOK | FALSE | 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 |
maxMessageSize | 131072 | 客户端限制的消息大小,超过报错,同时服务端也会限制(默认128K) |
transactionCheckListener | 事物消息回查监听器,如果发送事务消息,必须设置 | |
checkThreadPoolMinSize | 1 | Broker回查Producer事务状态时,线程池大小 |
checkThreadPoolMaxSize | 1 | Broker回查Producer事务状态时,线程池大小 |
checkRequestHoldMax | 2000 | Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 |
PushConsumer配置
参数名 | 默认值 | 说明 |
---|---|---|
consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应将它们归为同一组 |
messageModel | CLUSTERING | 消息模型,支持以下两种1.集群消费2.广播消费 |
consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer启动后,默认从什么位置开始消费 |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
Subscription | {} | 订阅关系 |
messageListener | 消息监听器 | |
offsetStore | 消费进度存储 | |
consumeThreadMin | 10 | 消费线程池数量 |
consumeThreadMax | 20 | 消费线程池数量 |
consumeConcurrentlyMaxSpan | 2000 | 单队列并行消费允许的最大跨度 |
pullThresholdForQueue | 1000 | 拉消息本地队列缓存消息最大数 |
Pullinterval | 0 | 拉消息间隔,由于是长轮询,所以为0,但是如果应用了流控,也可以设置大于0的值,单位毫秒 |
consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 |
pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 |
PullConsumer配置
参数名 | 默认值 | 说明 |
---|---|---|
consumerGroup | Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 | |
brokerSuspendMaxTimeMillis | 20000 | 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒 |
consumerPullTimeoutMillis | 10000 | 非长轮询,拉消息超时时间,单位毫秒 |
consumerTimeoutMillisWhenSuspend | 30000 | 长轮询,Consumer拉消息请求咋broker挂起超过指定时间,客户端认为超时,单位毫秒 |
messageModel | BROADCASTING | 消息模型,支持以下两种:1集群消费 2广播模式 |
messageQueueListener | 监听队列变化 | |
offsetStore | 消费进度存储 | |
registerTopics | 注册的topic集合 | |
allocateMessageQueueStrategy | Rebalance算法实现策略 |
Broker配置参数
查看Broker默认配置
sh mqbroker -m
参数名 | 默认值 | 说明 |
---|---|---|
consumerGroup | Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 | |
listenPort | 10911 | Broker对外服务的监听端口 |
namesrvAddr | Null | Name Server地址 |
brokerIP1 | 本机IP | 本机IP地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,这种情况下可以人工配置。 |
brokerName | 本机主机名 | |
brokerClusterName | DefaultCluster | Broker所属哪个集群 |
brokerId | 0 | BrokerId,必须是大等于0的整数,0表示Master,>0表示Slave,一个Master可以挂多个Slave,Master和Slave通过BrokerName来配对 |
storePathCommitLog | $HOME/store/commitlog | commitLog存储路径 |
storePathConsumeQueue | $HOME/store/consumequeue | 消费队列存储路径 |
storePathIndex | $HOME/store/index | 消息索引存储队列 |
deleteWhen | 4 | 删除时间时间点,默认凌晨4点 |
fileReservedTime | 48 | 文件保留时间,默认48小时 |
maxTransferBytesOnMessageInMemory | 262144 | 单次pull消息(内存)传输的最大字节数 |
maxTransferCountOnMessageInMemory | 32 | 单次pull消息(内存)传输的最大条数 |
maxTransferBytesOnMessageInMemory | 65535 | 单次pull消息(磁盘)传输的最大字节数 |
maxTransferCountOnMessageInDisk | 8 | 单次pull消息(磁盘)传输的最大条数 |
messageIndexEnable | TRUE | 是否开启消息索引功能 |
messageIndexSafe | FALSE | 是否提供安全的消息索引机制,索引保证不丢 |
brokerRole | ASYNC_MASTER | Broker的角色-ASYNC_MASTER异步复制Master-SYNC_MASTER同步双写Master-SLAVE |
flushDiskType | ASYNC_FLUSH | 刷盘方式-ASYNC_FLUSH异步刷盘-SYNC_FLUSH同步刷盘 |
cleanFileForciblyEnable | TRUE | 磁盘满,且无过期文件情况下TRUE表示强制删除文件,优先保证服务可用FALSE标记服务不可用,文件不删除 |
参考
https://www.jianshu.com/p/824066d70da8
https://blog.csdn.net/zengxiaosen/article/details/78896820
https://www.jianshu.com/p/bc85c0695da0IndexWE
http://www.cnblogs.com/xiaodf/p/5075167.html