RocketMQ介绍
RocketMQ是一个纯java、分布式、队列模型的开源消息中间件,具有以下特点:
能够保证严格的消息顺序
提供丰富的消息拉取模式
高效的订阅者水平扩展能力
实时的消息订阅机制
亿级消息堆积能力
选用理由:
强调集群无单点,可扩展,任意一点高可用,水平可扩展
海量消息堆积能力,消息堆积后,写入低延迟
支持上万个队列
消息失败重试机制
消息可查询
开源社区活跃
成熟度(经过双十一考验)
RocketMQ的各部分角色介绍
角色名称功能
Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息
Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费
Broker:消息中转角色,负责存储消息,转发消息,一般也称为Server
NameServer:管理中心,一般存储Broker的信息
RocketMQ这四个角色就相当于我们现实生活中的邮政系统,其中Producer、Consumer、 Broker、NameServer分别代表发信者、收信者、负责暂存和传输的邮局、以及协调各个地方邮局的管理机构。
启动RocketMQ之前先要启动NameServer,再启动Broker,这时候消息队列已经在开始工作了。如果想要发送消息,就用Producer;接受消息就用Consumer。如果程序中既要接收也要发送,可以启动多个Producer和Consumer。如果想要增加可靠性或者增大吞吐量,防止单点故障也可以在多台机器上部署多个NameServer和Broker,并且每个Broker也可以部署一个或者过个Slave。
大致了解了基本角色功能后,再介绍两个重要的名词概念Topic(主题)和Message Queue(消息队列)。当一个企业搭建好消息平台后会有多条业务线接入进来,同一个业务也会有不同类型的消息需要投递,如何保证这消息准确地进行,就需要给不同类型的消息加上Topic名称来进行区分。所以在发送消息和接受消息时,需要先创建Topic。有了Topic后,仍然还有性能问题需要考虑。当一个Topic下的消息投递量或者发送量过大怎么办,这就需要在一个Topic下设置一个或者多个Message Queue来提高并行处理速度。有了Message Queue后,消息就可以并行地向各个Message Queue进行分发,从而消费者也可以从多个队列中读取消息,满足性能要求。
RocketMQ单点安装
参照官网:Downloading the Apache RocketMQ Releases - Apache RocketMQ
RocketMQ多级集群部署以及安装
本次先讲如何利用两台物理机,搭建出双主双从无单点故障的高可用RoketMQ集群。假设这两台物理机的ip分别为192.168.218.51和192.168.218.52,端口号默认为9876。
1.启动多个NameServer和Broker
首先按照单点部署,在两台服务器上分别安装RocketMQ,服务地址分别为192.168.218.51:9876和192.168.218.52:9876,然后启动NameServer(nohup sh bin/mqnamesrv &)
启动Broker,每台机器都需启动一个Master角色和一个Slave角色,作为主备。修改的配置文件在安装目录下的conf/2m-2s-sync下。
192.168.218.51机器上的Master Broker配置文件(conf/2m-2s-sync/broker-a.properties)
brokerClusterName=ifind-rocketmq-cluster 所属集群名字,集群比较多可以分成多个Cluster,每个供一个业务使用
brokerName=broker-a broker名字,注意此处不同的配置文件填写的不一样,2选1
brokerId=1 0表示 Master,>0表示 Slave
namesrvAddr=192.168.216.57:9876;192.168.216.61:9876;192.168.216.58:9876;192.168.216.62:9876 nameServer地址,分号分割
defaultTopicQueueNums=4 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
autoCreateTopicEnable=true 是否允许 Broker自动创建Topic,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true 是否允许 Broker自动创建订阅组,建议线下开启,线上关闭
listenPort=10911 Broker对外服务的监听端口
deleteWhen=04 删除文件时间点,默认凌晨 4点
fileReservedTime=120 文件保留时间,默认 48小时
mapedFileSizeCommitLog=1073741824 commitLog每个文件的大小默认1G
brokerRole=ASYNC_MASTER Broker的角色
flushDiskType=ASYNC_FLUSH 刷盘方式
修改192.168.218.51机器上的Master Broker配置文件(conf/2m-2s-sync/broker-a-s.properties)
修改192.168.218.52机器上的Master Broker配置文件(conf/2m-2s-sync/broker-b.properties)
修改192.168.218.52机器上的Master Broker配置文件(conf/2m-2s-sync/broker-b-s.properties)
几个配置参数的含义
参数名含义
brokerId有三种:SYNCMASTER ASYNCMASTER SLAVE,SYNC表示当Slave和Master消息同步完成 后,再返回发送成功的状态
flushDiskType表示刷盘策略,分为SYNCFLUSH和ASYNCFLUSH两种,代表同步刷盘和异步刷盘。同 步状态下,消息真正写入磁盘才返回成功状态;异步刷盘情况下,消息写入缓存后才返回成功状态
2.发送和接受消息的demo
procucer
public class SyncProducer {
public static void main(String[] args) throws Exception {
//实例化一个生产者
DefaultMQProducer producer = new DefaultMQProducer("please_input_group_name"); //生产组名称
producer.setNamesrvAddr("192.168.218.51"); //确定服务地址,集群时通过读取配置文件变量赋值
producer.start(); //生产者开始工作
//发送消息
for () {
//三个参数,第一个topic,第二个tag标识,第三个是消息内容
Message msg = new Message("test-topic","tag-a","msg");
SendResult sendResult = producer.send(msg); //生产者发送消息
}
producer.shutdown();
}
}
consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_input_group_name"); //消费组名称
consumer.subscribe("topic","*"); //消费者订阅
consumer.registerMessageListener(new MessageListenerConcurrentliy) {
public ConsumeConcurrentStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//处理的逻辑代码
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
};
consumer.start();
消息队列的协调者
NameServer的功能
NameServer是消息队列中的状态服务器,集群中的各个组件通过它了解全局的信息。同时,各个机器都要定期向NameServer上报自己的状态,如果超时不上报的话,其它组件会把这个机器从列表中移除。NameServer可以部署多个,本身是无状态的,也就是Broker、Topic等状态信息不会持久存储,是由各个角色上报存储到内存的。
集群状态的存储结构
private final HashMap> topicQueue topicQueueTable //这个map的key是Topic的名称,存储了所有topic的信息。Value存储着Broker的名称、读写Queue的数量以及同步标识等
private final HashMap //这个结构key是BrokerName,value存储着地址信息以及所属Cluster的名称
private final HashMap //这个结构的key是Broker的ip地址,value为Broker机器的实时状态,包括上次更新状态的时间戳
private final HashMap> //key为Cluster的名称,set中存储的是Broker的名称,就是集群的BrokerName的集合
private final HashMap> filterServerTable //key是Broker的地址,value是和这个Broker关联的多个过滤器的地址
以上五个变量的定义,可以清楚的看出各个组件的状态是如何进行存储的,而NameServer的作用便是维护这五个变量中存储的信息。