消息中间件的设计思路一般基于主题的订阅发布机制消息生产者( Producer )发送某一主题的消息到消息服务器,消息服务器负责该消息的持久化存储,消息消费者
(Consumer)订阅感兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费者( PUSH 模式)或者消息消费者主动向消息服务器拉取消息( PULL 模式),从而实现消息生产者与消息消费者解调。
为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那消息生产者如何知道消息要发往哪台消息服务器呢?如果某一台消息服务器若机了,那么生产者如何在不重启服务的情况下感知呢?
RocketMQ为了避免上述问题的产生设计了一个NameServer,用于类似注册中心一样去感知当前有哪些消息服务器(Broker)以及他们的状态。
看一下他的物理部署图:
下面我们来讲一下大致的流程:
1、Broker 消息服务器在启动时向所有Name Server 注册
2、消息生产者(Producer)在发送消息之前先从Name Server 获取Broker 服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息发送。
3 、NameServer 与每台Broker 服务器保持长连接,并间隔30s 检测Broker 是否存活,如果检测到Broker 宕机, 则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者,为什么要这样设计呢?这是为了降低NameServer 实现的复杂性,在消息发送端会提供容错机制来保证消息发送的高可用性。
4、同时Broker会定时的向所有NameServer中发送自身的状态等等一系列信息(后面看源码实现会讲),对NameServer来说就是第三步定时的去检查broker上一次连接的时间,来判断是否断开连接了。
NameServer与NameServer之间的关系
NameServer 本身的高可用可通过部署多台NameServer 服务器来实现,但彼此之间
互不通信,也就是NameServer 服务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响。
下面从几个方面来讲下NameServer的实现。
一、启动流程
启动流程可以在这个类中看到源码的实现:org.apache.rocketmq.namesrv.NamesrvStartup
public static NamesrvController main0(String[] args) {
try {
// 1.构造基础信息,解析配置文件启动参数,填充NameServerConfig 、NettyServerConfig 属性
NamesrvController controller = createNamesrvController(args);
// 2. 开始启动
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
先看createNamesrvController里面包含了启动参数的解析和设置
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
需要填充NameServerConfig 、NettyServerConfig 属性值。
NameServerConfig的一些属性包括:
rocketmqhome: rocketmq主目录,可以通过-D rocketmq. home.dir= path 或通过设置环境变量R O C K E T M Q _H O M E 来配置R o c k e t M Q 的主目录。
kvConfigPath: NameServer存储KV配置属性的持久化路径。
configStorePath:nameServer默认配置文件路径,不生效。nameServer启动时如果要
通过配置文件配置NameServer启动属性的话,请使用-c选项。
orderMessageEnable: 是否支持顺序消息,默认是不支持。
而NettyServerConfig的属性包括以下(NettyServerConfig该类可以看到):
private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;
口listenPort: N ameServer 监昕端口,该值默认会被初始化为9876 0
口serverWorkerThreads: Net ty 业务线程池线程个数。
口serverCallbackExecutorThreads : Netty public 任务线程池线程个数, Netty 网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型( R e que stCode )未注册线程池, 则由public 线程池执行。
口serverSelectorThreads: IO 线程池线程个数,主要是NameServer 、Brok e r 端解析请求、返回相应的线程个数,这类线程主要是处理网络请求的,解析请求包, 然后转发到各个业务线程池完成具体的业务操作,然后将结果再返回调用方。
0 serverOnewaySemaphore Value: send oneway 消息请求井发度( Broker 端参数) 。
0 serverAsyncSemaphore Value : 异步消息发送最大并发度( Broker 端参数) 。
0 serverChannelMaxld l eTimeSeconds :网络连接最大空闲时间,默认120s 。如果连接
空闲时间超过该参数设置的值,连接将被关闭。
0 serverSocketSndBufSize :网络socket 发送缓存区大小, 默认64k 。
0 serverSocketRcvBufSize :网络socket 接收缓存区大小,默认6 4k 。
口serverPooledByteBufAllocatorEnable: ByteBuffer 是否开启缓存, 建议开启。
口useEpollNativeSelector : 是否启用E poll IO 模型, Linux 环境建议开启。
紧接着,我们看最上面,接着会执行Start(),在里面实现时会去初识化该实例NameServerConfig
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 1、执行实例化
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
2、添加jvm钩子函数
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
controller.start();
return controller;
}
看
public boolean initialize() {
//加载KV 配置, 创建NettyServer 网络处理对象
this.kvConfigManager.load();
//然后开启两个定时任务, 在RocketMQ中此类定时任务统称为心跳检测。
//O 定时任务1 : NameServer 每隔1 0 s 扫描一次Broker , 移除处于不激活状态的Broker
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//O 定时任务2 : n a m e S e r v e r 每隔1 0 分钟打印一次K V 配置。
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
.....
第三步,注册J V M 钩子函数并启动服务器, 以便监听B r o k e r 、消息生产者的网络请求。
为什么需要这个呢???
主要是向读者展示一种常用的编程技巧,如果代码中使用了线程池,一种优雅停
机的方式就是注册一个JV M 钩子函数,在JV M 进程关闭之前,先将线程池关闭,及时释放资源。
启动流程总结:
1、首先来解析配置文件, 需要填充NameServerConfig 、NettyServerConfig 属性值。
2、根据启动属性创建NamesrvController实例并初始化该实例,NameServerController实例为NameServer 核心控制器。
3、注册J V M 钩子函数并启动服务器, 以便监听Broker 、消息生产者的网络请求。
NameServer具备的作用
NameServer 主要作用是为消息生产者和消息消费者提供关于主题T opic 的路由信息,那么N am eServer 需要存储路由的基础信息,还要能够管理B roker 节点,包括路由注册、路由删除等功能。
二、NameServer路由的实现
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager
基础属性
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock()
//topicQueueTable: Topic 消息队列路由信息,消息发送时根据路由表进行负载均衡。
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//brokerAddrTable : Broker 基础信息, 包含brokerName 、所属集群名称、主备Broker地址。
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// clusterAddrTable: Broker 集群信息,存储集群中所有Broker 名称。
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// brokerLiveTable: Broker 状态信息。NameServer 每次收到心跳包时会替换该信息。
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */
// filterServerTable : Broker 上的FilterServer 列表,用于类模式消息过滤
List<String>/* Filter Server */> filterServerTable;
RocketMQ 基于订阅发布机制,一个Topic 拥有多个消息队列,一个Broker 为每一主题默认创建4 个读队列4 个写队列。多个B roker 组成一个集群,BrokerNam e 由相同的多台Broker组成Master-Slave 架构,brokerld 为0 代表Master, 大于0表示Slave。B rokerLivelnfo 中的lastUpdateTimestamp 存储上次收到Broker 心跳包的时间。
以上的四个重要的类存储的数据结构如下:
对应运行时数据结构如图所示:
三、NameServer路由注册实现
RocketMQ 路由注册是通过Broker 与NameServer 的心跳功能实现的。Broker 启动时向集群中所有的NameServer 发送心跳语旬,每隔3 0 s 向集群中所有NameServer 发送心跳包NameServer 收到Broker 心跳包时会更新brokerLiveTable 缓存中Bro kerLivelnfo 的lastUpdateTimestamp , 然后NameServer 每隔10s 扫描brokerLiveTab le , 如果连续120s 没有收到心跳包,NameServer 将移除该Broker的路由信息同时关闭Socket连接。
- Broker 发送心跳包
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
该方法主要是遍历NameServer 列表,Broker 消息服务器依次向NameServer 发送心跳包。
处理心跳包的逻辑:
首先封装请求包头(H ea d er)。
brokerAddr: broker 地址。
broker Id: brokerld,O:Master; 大于0 : Slave 。
brokerName : broker 名称。
clusterName: 集群名称。
haServerAddr: master地址,初次请求时该值为空,slave向Nameserver注册后返回。
requestBody:
• filterServerList。消息过滤服务器列表。
• topicConfigWrapper。主题配置
(扩展:RocketMQ网络传输基于Netty,,在这里介绍一下网络跟踪方法:每一个请求,RocketMQ都会定义一个RequestCode, 然后在服务端会对应相应的网络处理器(processor包中),只需整库搜索RequestCode即可找到相应的处理逻辑。)
2、NameServer处理心跳包
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor网络处理器解析请求类型,如果请求类型为RequestCode.REGISTER—BROKER, 则请求最终转发到RoutelnfoManager#registerBroker。
// 1、路由注册需要加写锁,防止并发修改RoutelnfoManager中的路由表。
this.lock.writeLock().lockInterruptibly();
// 2首先判断Broker所属集群是否存在,如果不存在,则创建,然后将broker名加入到集群Broker集合中。
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
// 3.维护BrokerData 信息,
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//将从设备切换为主设备:首先在namesrv中删除<1,IP:PORT>,然后添加<0,IP:PORT>
//同一IP:PORT在brokerAddrTable中必须只有一条记录
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
// 维护BrokerData 信息
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
//如果Broker 为Master ,并且BrokerTopic 配置信息发生变化或者是初次注册,则需要创建或更新Topic 路由元数据,填充topicQueueTable
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
// 看下面代码
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 设置BrokerLiveInfo的值
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
// 注册过滤服务器列表
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
createAndUpdateQueueData(根据TopicConfig 创建QueueData 数据结构,然后更新topicQueueTable 。)
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
if (null == queueDataList) {
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
boolean addNewOne = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
if (qd.getBrokerName().equals(brokerName)) {
if (qd.equals(queueData)) {
addNewOne = false;
} else {
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
queueData);
it.remove();
}
}
}
if (addNewOne) {
queueDataList.add(queueData);
}
}
}
讲下大致执行的流程:
Step 1 : 路由注册需要加写锁,防止并发修改RoutelnfoManager中的路由表。首先判断Broker所属集群是否存在,如果不存在,则创建,然后将broker名加入到集群Broker集合中。
Step2 :维护BrokerData 信息,首先从brokerAddrTable 根据BrokerName 尝试获取
Broker 信息,如果不存在, 则新建BrokerData 并放入到brokerAddrTable , registerFirst 设置为true ;如果存在, 直接替换原先的, registerFirst 设置为false,表示非第一次注册。
Step3 :如果Broker 为Master ,并且BrokerTopic 配置信息发生变化或者是初次注册,则需要创建或更新Topic 路由元数据,填充topicQueueTable , 其实就是为默认主题自动注册路由信息,其中包含MixAII.DEFAULT TOPIC 的路由信息。当消息生产者发送主题时,如果该主题未创建并且BrokerConfig 的autoCreateTopicEnable 为true 时, 将返回MixAII.DEFAULT TOPIC 的路由信息。
Step4 : 更新BrokerLivelnfo ,存活Broker 信息表, BrokeLivelnfo 是执行路由删除的重要依据。
Step5 : 注册Broker 的过滤器Server 地址列表,一个Broker 上会关联多FilterServer
消息过滤服务器,如果此Broker 为从节点,则需要查找该Broker 的Master 的节点信息,并更新对应的masterAddr 属性。
设计亮点: NameServe与Broker保持长连接,Broker状态存储在brokerLiveTable中,NameServer每收到一个心跳包,将更新brokerLiveTable中关于Broker的状态信息以及路由表(topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable)。更新上述路由表(HashTable)使用了锁粒度较少的读写锁,允许多个消息发送者(Producer)并发读,保证消息发送时的高并发。但同一时刻NameServer只处理一个Broker心跳包,多个心跳包请求串行执行。这也是读写锁经典使用场景
四、NameServer路由删除的实现
NameServer会每隔10 s 扫描brokerLiveTable状态表,如果BrokerLive的
lastUpdateTimestamp的时间戳距当前时间超过120s, 则认为Broker失效,移除该Broker,关闭与Broker连接,并同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。
RocktMQ 有两个触发点来触发路由删除:
1 ) NameServer 定时扫描brokerLiveTable 检测上次心跳包与当前系统时间的时间差,如果时间戳大于120s ,则需要移除该Broker 信息。
2 ) Broker 在正常被关闭的情况下,会执行unregisterBroker 指令。
由于不管是何种方式触发的路由删除,路由删除的方法都是一样的,就是从topicQueueTable 、brokerAddrTable 、brokerLiveTable 、filterServerTable 删除与该Broker 相关的信息,但RocketMQ 这两种方式维护路由信息时会抽取公共代码。
scanNotActi veBroker 在NameServer 中每10s 执行一次。逻辑也很简单,遍历brokerLivelnfo 路由表( HashMap ),检测BrokerLiveInfo lastUpdateTimestamp
上次收到心跳包的时间如果超过当前时间120s, NameServer 则认为该Broker 已不可用,故需要将它移除,关闭Channel ,然后删除与该Broker 相关的路由信息,路由表维护过程,需要申请写锁。
五、路由发现
RocketMQ 路由发现是非实时的,当Topic 路由出现变化后, NameServer 不主动推送给客户端, 而是由客户端定时拉取主题最新的路由。根据主题名称拉取路由信息的命令编码为: GET ROUTEINTOBY_TOPIC 。
orderTopicConf :顺序消息配置内容,来自于kvConfig 。
List< QueueData> queueData: topic 队列元数据。
List<BrokerData> brokerDatas: topic 分布的broker 元数据。
HashMap< String/* brokerAdress/ List<String> / filterServer* /> : broker 上过滤服务器地址列表。
NameServer 路由发现实现类: DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
} else {
try {
topicRouteData = adminExt.examineTopicRouteInfo(requestHeader.getTopic());
} catch (Exception e) {
log.info("get route info by topic from product environment failed. envName={},", productEnvName);
}
}
if (topicRouteData != null) {
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
顺序:
Step 1:调用RouterlnfoManager 的方法,从路由表topicQueueTable 、brokerAddrTable 、filterServerTable 中分别填充TopicRouteData 中List<QueueData>、List<BrokerData>和filterServer 地址表
Step2 : 如果找到主题对应的路由信息并且该主题为顺序消息,则从NameServer KVconfig 中获取关于顺序消息相关的配置填充路由信息。如果找不到路由信息CODE 则使用TOPIC NOT_EXISTS ,表示没有找到对应的路由。
本篇文章总结如下图所示:
摘自《RocketMQ技术内幕实现》阿里巴巴推荐