RocketMQ源码(二):broker的启动(一)
RocketMQ源码(三):broker的启动(二)
RocketMQ源码(四):producer的启动
RocketMQ源码(五):producer发送消息
RocketMQ源码(六):broker接收消息
RocketMQ源码(七):consumer的启动
RocketMQ源码(八):consumer消息拉取(一)
RocketMQ源码(九):consumer消息拉取(二)
本系列文章主要是记录自己在学习rocketmq主流程源码时的一些心得体会,并不旨在讲诉如何去使用rocketmq,或者整体的去认识rocketmq,如果文中有不正确的地方还请大家不吝赐教,共勉。
另外推荐一个学习资料,可以比较全面和整体的看待RocketMq
1.rocketmq整体架构
引用官网的一段话对rocketmq架构中的四个部分做一个简单的说明
- Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
- NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
- BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
- Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
- Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
- Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
- HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
- Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
2.源码目录结构
- rocketmq-broker:mq的核心,它能接收producer和consumer的请求,并调用store层服务对消息进行处理。HA服务的基本单元,支持同步双写,异步双写等模式。
- rocketmq-client:mq客户端实现,目前官方仅仅开源了java版本的mq客户端,c++,go客户端有社区开源贡献。
- rocketmq-common:一些模块间通用的功能类,比如一些配置文件、常量。
- rocketmq-example:官方提供的例子,对典型的功能比如order message,push consumer,pull consumer的用法进行了示范。
- rocketmq-filter:消息过滤服务,相当于在broker和consumer中间加入了一个filter代理。
- rocketmq-namesrv:命名服务,更新和路由发现 broker服务。
- rocketmq-remoting:基于netty的底层通信实现,所有服务间的交互都基于此模块。
- rocketmq-srvutil:解析命令行的工具类ServerUtil。
- rocketmq-store:存储层实现,同时包括了索引服务,高可用HA服务实现。
- rocketmq-tools:mq集群管理工具,提供了消息查询等功能。
可以从NamesrvStartup的main方法来启动一个NameServer服务,因此以此方法为入口,来了解NameServer的启动过程,以及它是如何工作的。
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
// 读取 namesrvConfig 和 nettyServerConfig 配置,创建 NamesrvController 对象
NamesrvController controller = createNamesrvController(args);
// 启动 NamesrvController
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
从这里可以看出来,NameServer的启动其实就是NamesrvController的启动,NamesrvController是NameServer的核心类,启动过程主要分为两步:
- 读取配置,创建NamesrvController
- NamesrvController的启动
关于配置的读取这里就不详细了解了,有兴趣的可以自行研究。这里主要看下第二步,NamesrvController的启动过程
2.NamesrvController的启动
2.1 NamesrvController
这里先了解下NamesrvController的一些重要属性:
// Namesrv的配置类
private final NamesrvConfig namesrvConfig;
// netty的配置类
private final NettyServerConfig nettyServerConfig;
// 任务1、每隔 10s 扫描 broker ,维护当前存活的Broker信息。
// 任务2、每隔 10s 打印KVConfig 信息。
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
// 猜测可能是Namesrv实现配置中心的功能
private final KVConfigManager kvConfigManager;
// 数据载体,记录broker和topic信息
private final RouteInfoManager routeInfoManager;
// 处理netty的连接事件,主要是定义了几个handler
private RemotingServer remotingServer;
// 处理 Nameserver 与 Broker 通道事件
private BrokerHousekeepingService brokerHousekeepingService;
// 该线程池供 DefaultRequestProcessor 类使用,实现具体的默认的请求命令处理。
private ExecutorService remotingExecutor;
这里重点看看RouteInfoManager、RemotingServer 以及BrokerHousekeepingService
2.1.1 RouteInfoManager
// NameServer 与 Broker 空闲时长,默认2分钟,在2分钟内 Nameserver 没有收到 Broker 的心跳包,则关闭该连接
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
// 读写锁,用来保护非线程安全容器 HashMap。
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// topicQueueTable,主题与队列关系,记录一个主题的队列分布在哪些Broker上,每个Broker上存在该主题的队列个数。
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// brokerAddrTable,所有 Broker 信息,使用 brokerName 当key,BrokerData 信息描述每一个 broker 信息。
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// clusterAddrTable,broker 集群信息,每个集群包含哪些 Broker
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// brokerLiveTable,当前存活的 Broker,该信息不是实时的,NameServer 每10S扫描一次所有的 broker,
// 根据心跳包的时间得知 broker的状态,该机制也是导致当一个 Broker 进程假死后,消息生产者无法立即感知,可能继续向其发送消息,导致失败(非高可用)。
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
2.1.2 RemotingServer
看下它的默认实现类NettyRemotingServer
// 连接协议校验
private HandshakeHandler handshakeHandler;
// 信息解码
private NettyEncoder encoder;
// 封装连接建立、连接断开等事件,交给一个线程不断执行这些事件,更新RouteInfoManager中的信息
private NettyConnectManageHandler connectionManageHandler;
// 处理request和response请求
private NettyServerHandler serverHandler;
这几个Handler在后面netty的ServerBootstrap初始化中将会得到使用,并且发挥对应的作用
2.1.3 BrokerHousekeepingService
上文中提到的RemotingServer.connectionManageHandler最终就是把事件传递给BrokerHousekeepingService,然后BrokerHousekeepingService再调用RouteInfoManager中的对应方法处理连接事件。
2.2 start(controller)
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// NamesrvController 初始化
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 注册虚拟机关闭钩子函数,用于关闭 controller
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 启动 NamesrvController
controller.start();
return controller;
}
2.2.1 controller.initialize
此方法主要是用于controller的初始化
public boolean initialize() {
// 加载配置
this.kvConfigManager.load();
// 初始化 remotingServer,主要是定义线程池和netty相关参数
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 初始化 DefaultRequestProcessor 的处理线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 绑定 remotingExecutor 与 defaultRequestProcessor
this.registerProcessor();
// 每隔 10s 扫描 broker ,维护当前存活的Broker信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 每隔 10s 打印KVConfig 信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// 刷新ssl链接配置
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
这里主要做了几个事情:
- kvConfigManager.load():加载kvConfig.json中的配置,到kvConfigManager的一个2层结构hashmap中
- 创建NettyRemotingServer
- 初始化事件处理的线程池 remotingExecutor
- 绑定事件处理器和事件处理的线程池,事件指的是从broker、client发过来的比如查询topic信息、查询broker信息等事件。这里是把NamesrvController.remotingExecutor和DefaultRequestProcessor进行了绑定。
- 创建2个定时任务:1.每10秒过滤不活跃的broker。2.每10秒打印kvConfigManager中的配置
基本都比较简单就不展开讲述了
值得注意的是
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
// 遍历所有存活的broker
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
// 上次心跳时间大于配置
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
// 删除超时没有发送心跳的broker信息
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
因为是定时扫描broker,所以并不是实时更新broker的过期信息。provider会继续像这个broker发送信息,造成消息丢失
2.2.2 controller.start()
这里主要关注NettyRemotingServer的start()方法
public void start() {
// 初始化netty的EventExecutorGroup
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
// 初始化Netty事件处理器
prepareSharableHandlers();
// 初始化 ServerBootstrap
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
// 开始循环执行,connectionManageHandler提交上来的连接和断开连接事件
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
// 1秒扫描一次已经超时的response
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
这里就是开启了端口的监听,以及定义了处理事件的handler
这里重点看下NettyServerHandler
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// 根据请求code,获取已经注册的处理器与线程池配对组
// Namesrv为空,所以使用 defaultRequestProcessor
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
// 如果找不到 则使用默认处理器 在server端使用 adminprocess 作为默认处理器
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
// 执行钩子方法
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
// 发送response
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
}
};
// 同步和异步执行命令
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
// 封装任务,扔到线程池执行
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
processRequestCommand就是通过绑定的事件处理器和线程池来执行request。因为
NameSrv的processorTable为空,因此直接看DefaultRequestProcessor#processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
这里对应了各种事件的处理方法,感兴趣的可以自行研究下。