一、启动方式
// 创建 RpcClient 实例:主要是初始化心跳处理器HeartbeatHandler和相应消息分发器RpcHandler
client =new RpcClient();
// 初始化 netty 客户端:主要是初始化SOFA的相关组件
client.init();
二、源码分析
client.init();
==this.connectionManager.setAddressParser(this.addressParser);//设置地址编解码器
==this.connectionManager.init();
===this.connectionEventHandler.setConnectionManager(this);
===this.connectionEventHandler.setConnectionEventListener(connectionEventListener);//添加连接监听器
===this.connectionFactory.init(connectionEventHandler);//初始化连接工厂
====bootstrap = new Bootstrap();
bootstrap.group(workerGroup).channel(NettyEventLoopUtil.getClientSocketChannelClass())
    .option(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())
    .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())
    .option(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());
// init netty write buffer water mark
initWriteBufferWaterMark();
// init byte buf allocator
if (ConfigManager.netty_buffer_pooled()) { this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);} else {
this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);}
bootstrap.handler(new ChannelInitializer<SocketChannel>()
{ @Override protected void initChannel(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("decoder", codec.newDecoder());
pipeline.addLast("encoder", codec.newEncoder());
boolean idleSwitch = ConfigManager.tcp_idle_switch(); if (idleSwitch) {
pipeline.addLast("idleStateHandler", new IdleStateHandler(ConfigManager.tcp_idle(), ConfigManager.tcp_idle(), 0, TimeUnit.MILLISECONDS));
pipeline.addLast("heartbeatHandler", heartbeatHandler); }//添加心跳连接处理器
pipeline.addLast("connectionEventHandler", connectionEventHandler);
pipeline.addLast("handler", handler); }});
==this.rpcRemoting =new RpcClientRemoting(new RpcCommandFactory(),this.addressParser,
this.connectionManager);//初始化客户端远程调用
==this.taskScanner.add(this.connectionManager);//使用scanner定时扫描完成的链接,并移除连接池
==this.taskScanner.start();
==if (monitorStrategy == null) { ScheduledDisconnectStrategy strategy = new ScheduledDisconnectStrategy(); connectionMonitor = new DefaultConnectionMonitor(strategy, this.connectionManager);} else { connectionMonitor = new DefaultConnectionMonitor(monitorStrategy, this.connectionManager);}
==connectionMonitor.start();//Monitor定时监听已经关闭的连接
==if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) { reconnectManager = new ReconnectManager(connectionManager); connectionEventHandler.setReconnectManager(reconnectManager); logger.warn("Switch on reconnect manager");}//设置重连管理器,内部通过HealConnectionRunner子线程循环处理需要重连的任务