简介
官方简介:
- RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
1.还是先理解一些重要概念及说明
- Disk Flush(磁盘刷新/同步操作):就是将内存的数据落地,存储在磁盘中。RocketMQ提供了以下两种模式:
- SYNC_FLUSH(同步刷盘):生产者发送的每一条消息都在保存到磁盘成功后才返回告诉生产者成功。这种方式不会存在消息丢失的问
题,但是有很大的磁盘IO开销,性能有一定影响。- ASYNC_FLUSH(异步刷盘):生产者发送的每一条消息并不是立即保存到磁盘,而是暂时缓存起来,然后就返回生产者成功。随后再异步>的将缓存数据保存到磁盘,有两种情况:1是定期将缓存中更新的数据进行刷盘,2是当缓存中更新的数据条数达到某一设定值后进行刷盘。这种>方式会存在消息丢失(在还未来得及同步到磁盘的时候宕机),但是性能很好。默认是这种模式。
- Broker Replication(Broker间数据同步/复制):集群环境下需要部署多个Broker,Broker分为两种角色:一种是master,即可以写也可以>
读,其brokerId=0,只能有一个;另外一种是slave,只允许读,其brokerId为非0。一个master与多个slave通过指定相同的brokerName被归为一>个broker set(broker集)。通常生产环境中,我们至少需要2个broker set。Broker Replication只的就是slave获取或者是复制master的数据。
- Sync Broker:生产者发送的每一条消息都至少同步复制到一个slave后才返回告诉生产者成功,即“同步双写”。
- Async Broker:生产者发送的每一条消息只要写入master就返回告诉生产者成功。然后再“异步复制”到slave。
- 推荐的几种Broker集群方式:(官网提供了下面几种集群方式的配置文件供参考,在$ROCKETMQ_HOME/target/apache-rocketmq-all/conf目>录下)
- 2m-2s-sync:两主两从同步双写(两个master,两个slave,数据同步双写到master和slave)
- 2m-2s-async:两主两从异步复制(两个master,两个slave,master数据通过异步复制到slave)
- 2m-noslave:两主(只有两个master,没有slave)
注意:
1、上述“2”只是说作为一个集群的最低配置数量,可以根据实际情况扩展。
2、所有的刷盘(Dish Flush)操作全部默认为:ASYNC_FLUSH(异步刷盘)。- Name Server集群:Name Server集群比较简单,只要部署多个实例就行了,多个实例间不需要进行数据共享,只要保证一个实例存活就可>以正常运转。
2、三种Broker集群方式优缺点
上面三种集群方式的优缺点(主要区别在于主从复制方式):
多Master模式(2m-noslave)
一个集群无Slave,全是Master,例如2个Master或者3个Master
优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。多Master多Slave模式,异步复制(2m-2s-async)
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级。
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master宕机后,消费者仍然可以从Slave消费,此过程对应用透明。不需要人工干预。性能同多Master模式几乎一样。
缺点:Master宕机,磁盘损坏情况,会丢失少量消息。多Master多Slave模式,同步双写(2m-noslave)
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写成功,向应用返回成功。
优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低,大约低10%左右,发送单个消息的RT会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。
3、安装集群 (broker-a)(ip :99.48.66.80)
- 为了避免乱七八糟的错误建议先关闭防火墙
- 目录 /usr/local/src
1.这是一个master的配置
cd /usr/local/src
unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-all-4.2.0-bin-release
#创建文件夹
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
vi /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-a.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
brokerClusterName=DefaultCluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
waitTimeMillsInSendQueue=300
brokerId=0
#nameServer地址,分号分割
namesrvAddr=99.48.66.80:9876;99.48.66.82:9876
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#Broker 对外服务的监听端口
listenPort=11911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
#mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
#mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=80
#存储路径
storePathRootDir=/usr/local/src/rocketmq-all-4.2.0-bin-release/store
#commitLog 存储路径
storePathCommitLog=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/abort
#限制的消息大小 默认4M
#maxMessageSize=4194304
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
sendMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true
********************这里是公共可选配置 (maste ,slave)********************************
crunserver.sh 配置( /usr/local/src/rocketmq-all-4.2.0-bin-release/bin) 此处根据自己的硬件配置来调整
-server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m
runbroker.sh( /usr/local/src/rocketmq-all-4.2.0-bin-release/bin) 此处根据自己的硬件配置来调整
-server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m
**********************这里是公共配置 (maste ,slave)********************************
启动命令如下:
#启动master:
nohup sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqbroker -c /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
1.这是一个slave的配置
cd /usr/local/src
unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-all-4.2.0-bin-release
#创建文件夹
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
vi /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-a-s.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
brokerClusterName=DefaultCluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
waitTimeMillsInSendQueue=300
brokerId=1
#nameServer地址,分号分割
namesrvAddr=99.48.66.80:9876;99.48.66.82:9876
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#Broker 对外服务的监听端口
listenPort=11911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
#mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
#mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=80
#存储路径
storePathRootDir=/usr/local/src/rocketmq-all-4.2.0-bin-release/store
#commitLog 存储路径
storePathCommitLog=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/abort
#限制的消息大小 默认4M
#maxMessageSize=4194304
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
sendMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true
#启动name server:
nohup sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqnamesrv &
#启动slave:
nohup sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqbroker -c /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &
4、安装集群 (broker-b)(ip :99.48.66.82)
- 为了避免乱七八糟的错误建议先关闭防火墙
- 目录 /usr/local/src
1.这是一个master的配置
cd /usr/local/src
unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-all-4.2.0-bin-release
#创建文件夹
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
vi /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-b.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
brokerClusterName=DefaultCluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
waitTimeMillsInSendQueue=300
brokerId=0
#nameServer地址,分号分割
namesrvAddr=99.48.66.80:9876;99.48.66.82:9876
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#Broker 对外服务的监听端口
listenPort=11911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
#mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
#mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=80
#存储路径
storePathRootDir=/usr/local/src/rocketmq-all-4.2.0-bin-release/store
#commitLog 存储路径
storePathCommitLog=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/abort
#限制的消息大小 默认4M
#maxMessageSize=4194304
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
sendMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true
#启动master:
nohup sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqbroker -c /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-b.properties >/dev/null 2>&1 &
1.这是一个slave的配置
cd /usr/local/src
unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-all-4.2.0-bin-release
#创建文件夹
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
mkdir /usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
vi /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-b-s.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
brokerClusterName=DefaultCluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
waitTimeMillsInSendQueue=300
brokerId=1
#nameServer地址,分号分割
namesrvAddr=99.48.66.80:9876;99.48.66.82:9876
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#Broker 对外服务的监听端口
listenPort=11911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
#mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
#mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=80
#存储路径
storePathRootDir=/usr/local/src/rocketmq-all-4.2.0-bin-release/store
#commitLog 存储路径
storePathCommitLog=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/src/rocketmq-all-4.2.0-bin-release/store/abort
#限制的消息大小 默认4M
#maxMessageSize=4194304
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
sendMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true
#启动name server:
nohup sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqnamesrv &
#启动slave:
nohup sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqbroker -c /usr/local/src/rocketmq-all-4.2.0-bin-release/conf/2m-2s-async/broker-b-s.properties >/dev/null 2>&1 &
这样就安装好啦 下面是关闭服务的命令
#关闭服务 namesrv 和broker
sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqshutdown broker
sh /usr/local/src/rocketmq-all-4.2.0-bin-release/bin/mqshutdown namesrv
测试是否成功 最后关闭防火墙 开通 9876,11911,11912,11909端口
*******特别注意以下几段英文说明*****************
Consumer Group and Subscriptions
The first thing you should be aware of is that different Consumer Group can consume the same topic independently, and each of them will have their own consuming offsets.
Please make sure each Consumer within the same Group to subscribe the same topics.
MessageListener
Orderly
The Consumer will lock each MessageQueue to make sure it is consumed one by one in order. This will cause a performance loss,
but it is useful when you care about the order of the messages. It is not recommended to throw exceptions, you can return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead.
Concurrently
As the name tells, the Consumer will consume the messages concurrently. It is recommended to use this for good performance.
It is not recommended to throw exceptions, you can return ConsumeConcurrentlyStatus.RECONSUME_LATER instead.
Consume Status
For MessageListenerConcurrently, you can return RECONSUME_LATER to tell the consumer that you can not consume it right now and want to reconsume it later.
Then you can continue to consume other messages. For MessageListenerOrderly, because you care about the order, you can not jump over the message,
but you can return SUSPEND_CURRENT_QUEUE_A_MOMENT to tell the consumer to wait for a moment.
ConsumeFromWhere
When a new Consumer Group is established, it will need to decide whether it needs to consume the historical messages which had already existed in the Broker.
CONSUME_FROM_LAST_OFFSET will ignore the historical messages, and consume anything produced after that.
CONSUME_FROM_FIRST_OFFSET will consume every message existed in the Broker. You can also use CONSUME_FROM_TIMESTAMP to consume messages produced after the specified timestamp.
(注意,这个CONSUME_FROM_LAST_OFFSET 是对一个新的Consumer Group 生效,如果这个Consumer Group 原来就已经有过,那么是不生效的)