一、启动方式
1.业务逻辑处理器定义
/**
* 自定义的业务逻辑用户处理器
* SyncUserProcessor属于同步,soaf bolt还提供了异步的方式AsyncUserProcessor
* 二者的区别在于,前者需要在当前处理线程以return返回值的形式返回处理结果;而后者,有一个 AsyncContext 客户端的调用,调用 sendResponse 方法,内部通过穿件RCMD对象,可以在当前线程,也可以在异步线程,返回处理结果
* 如果一个处理器需要对多种数据模型感兴趣,两种方式,一种是使用基类的方式处理,第二种方式通过MultiInterestUserProcessor
*/
public class TestServerUserProcessorextends SyncUserProcessor {
@Override
public Object handleRequest(BizContext bizCtx, TestRequest request)throws Exception {
TestResponse response =new TestResponse();
if (request !=null) {
System.out.println(request);
response.setResp("from server -> " + request.getReq());
}
return response;
}
/**
* 业务处理器感兴趣的消息体,
*/
@Override
public String interest() {
return TestRequest.class.getName();
}
}
2.服务端启动
//创建RpcServer实例,指定监听ip和port,指定是否使用链接管理器
RpcServer server =new RpcServer("127.0.0.1",8888,false);
//注册业务逻辑处理器 UserProcessor
server.registerUserProcessor(new TestServerUserProcessor());
//启动服务端
server.start();
3.server.start()原理分析
server.start()主要有两步
=doInit();
==this.addressParser =new RpcAddressParser();//初始化地址解析器
==this.connectionEventHandler =new ConnectionEventHandler(switches());//初始化链接事件处理器,主要负责入站出站数据的处理
==this.connectionEventHandler.setConnectionManager(this.connectionManager);//添加初始化的连接管理器到链接事件处理器
==this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);//添加时间监听器
==initRpcRemoting();//初始化远程调用对象RpcServerRemoting
==this.bootstrap =new ServerBootstrap();
this.bootstrap.group(bossGroup,workerGroup)
.channel(NettyEventLoopUtil.getServerSocketChannelClass())
. option(ChannelOption.SO_BACKLOG, ConfigManager.tcp_so_backlog())//tcp等待队列大小,默认1024
.option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())//端口快速释放,默认true
.childOption(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())
.childOption(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());
==initWriteBufferWaterMark();//设置出站的低水位和高水位
==if (ConfigManager.netty_buffer_pooled()) { this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } else { this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT); }//根据配置决定是否使用对象池化
==NettyEventLoopUtil.enableTriggeredMode(bootstrap)//设置selector的epoll模式是边缘触发还是水平触发,默认水平触发
==this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("decoder", codec.newDecoder()); pipeline.addLast("encoder", codec.newEncoder());if (idleSwitch) { pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime, TimeUnit.MILLISECONDS));pipeline.addLast("serverIdleHandler", serverIdleHandler); } pipeline.addLast("connectionEventHandler", connectionEventHandler); pipeline.addLast("handler", rpcHandler); createConnection(channel); } /** * create connection operation<br> * <ul> * <li>If flag manageConnection be true, use {@link DefaultConnectionManager} to add a new connection, meanwhile bind it with the channel.</li> * <li>If flag manageConnection be false, just create a new connection and bind it with the channel.</li> * </ul> */ private void createConnection(SocketChannel channel) { Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel)); if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) { connectionManager.add(new Connection(channel, url), url.getUniqueKey()); } else { new Connection(channel, url); } channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT); } });//添加加密、解密、心跳(原理后面讲)、远程请求(原理后面讲)处理器,并创建一个和监听端口绑定的链接(业务层面的绑定)
=doStart();
==this.channelFuture =this.bootstrap.bind(new InetSocketAddress(ip(), port())).sync();//绑定指定的端口