前言
Thrift
提供的网络服务模型:单线程、多线程、事件驱动,从另一个角度划分为:阻塞服务模型、非阻塞服务模型。
阻塞服务模型:
TSimpleServer
、TThreadPoolServer
。非阻塞服务模型:
TNonblockingServer
、THsHaServer
和TThreadedSelectorServer
。
TServer
类的层次关系:
正文
TServer
TServer
定义了静态内部类Args
,Args
继承自抽象类AbstractServerArgs
。AbstractServerArgs
采用了建造者模式,向TServer
提供各种工厂:
工厂属性 | 工厂类型 | 作用 |
---|---|---|
ProcessorFactory | TProcessorFactory | 处理层工厂类,用于具体的TProcessor对象的创建 |
InputTransportFactory | TTransportFactory | 传输层输入工厂类,用于具体的TTransport对象的创建 |
OutputTransportFactory | TTransportFactory | 传输层输出工厂类,用于具体的TTransport对象的创建 |
InputProtocolFactory | TProtocolFactory | 协议层输入工厂类,用于具体的TProtocol对象的创建 |
OutputProtocolFactory | TProtocolFactory | 协议层输出工厂类,用于具体的TProtocol对象的创建 |
下面是TServer
的部分核心代码:
public abstract class TServer {
public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {
public Args(TServerTransport transport) {
super(transport);
}
}
public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {
final TServerTransport serverTransport;
TProcessorFactory processorFactory;
TTransportFactory inputTransportFactory = new TTransportFactory();
TTransportFactory outputTransportFactory = new TTransportFactory();
TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
public AbstractServerArgs(TServerTransport transport) {
serverTransport = transport;
}
}
protected TProcessorFactory processorFactory_;
protected TServerTransport serverTransport_;
protected TTransportFactory inputTransportFactory_;
protected TTransportFactory outputTransportFactory_;
protected TProtocolFactory inputProtocolFactory_;
protected TProtocolFactory outputProtocolFactory_;
private boolean isServing;
protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {
processorFactory_ = args.processorFactory;
serverTransport_ = args.serverTransport;
inputTransportFactory_ = args.inputTransportFactory;
outputTransportFactory_ = args.outputTransportFactory;
inputProtocolFactory_ = args.inputProtocolFactory;
outputProtocolFactory_ = args.outputProtocolFactory;
}
public abstract void serve();
public void stop() {}
public boolean isServing() {
return isServing;
}
protected void setServing(boolean serving) {
isServing = serving;
}
}
TServer
的三个方法:serve()
、stop()
和isServing()
。serve()
用于启动服务,stop()
用于关闭服务,isServing()
用于检测服务的起停状态。
TServer
的不同实现类的启动方式不一样,因此serve()
定义为抽象方法。不是所有的服务都需要优雅的退出, 因此stop()
方法没有被定义为抽象。
TSimpleServer
TSimpleServer
的工作模式采用最简单的阻塞IO,实现方法简洁明了,便于理解,但是一次只能接收和处理一个socket
连接,效率比较低。它主要用于演示Thrift
的工作过程,在实际开发过程中很少用到它。
(一) 工作流程
(二) 使用入门
服务端:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor processor =
new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
tArgs.processor(processor);
tArgs.protocolFactory(protocolFactory);
// 简单的单线程服务模型 一般用于测试
TServer tServer = new TSimpleServer(tArgs);
System.out.println("Running Simple Server");
tServer.serve();
客户端:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("Leo");
System.out.println("Result =: " + result);
transport.close();
(三) 源码分析
查看上述流程的源代码,即TSimpleServer.java
中的serve()
方法如下:
serve()
方法的操作:
- 设置
TServerSocket
的listen()
方法启动连接监听。 - 以阻塞的方式接受客户端地连接请求,每进入一个连接即为其创建一个通道
TTransport
对象。 - 为客户端创建处理器对象、输入传输通道对象、输出传输通道对象、输入协议对象和输出协议对象。
- 通过
TServerEventHandler
对象处理具体的业务请求。
ThreadPoolServer
TThreadPoolServer
模式采用阻塞socket
方式工作,主线程负责阻塞式监听是否有新socket
到来,具体的业务处理交由一个线程池来处理。
(一) 工作流程
(二) 使用入门
服务端:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor<HelloWorldService.Iface> processor =
new HelloWorldService.Processor<>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
ttpsArgs.processor(processor);
ttpsArgs.protocolFactory(protocolFactory);
// 线程池服务模型 使用标准的阻塞式IO 预先创建一组线程处理请求
TServer ttpsServer = new TThreadPoolServer(ttpsArgs);
System.out.println("Running ThreadPool Server");
ttpsServer.serve();
客户端:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadPoolClient");
System.out.println("Result =: " + result);
transport.close();
(三) 源码分析
ThreadPoolServer
解决了TSimpleServer
不支持并发和多连接的问题,引入了线程池。实现的模型是One Thread Per Connection
。查看上述流程的源代码,先查看线程池的代码片段:
TThreadPoolServer.java
中的serve()
方法如下:
serve()
方法的操作:
- 设置
TServerSocket
的listen()
方法启动连接监听。 - 以阻塞的方式接受客户端的连接请求,每进入一个连接,将通道对象封装成一个
WorkerProcess
对象(WorkerProcess
实现了Runnabel
接口),并提交到线程池。 -
WorkerProcess
的run()
方法负责业务处理,为客户端创建了处理器对象、输入传输通道对象、输出传输通道对象、输入协议对象和输出协议对象。 - 通过
TServerEventHandler
对象处理具体的业务请求。
WorkerProcess
的run()
方法:
(四) 优缺点
TThreadPoolServer模式的优点
拆分了监听线程(Accept Thread
)和处理客户端连接的工作线程(Worker Thread
),数据读取和业务处理都交给线程池处理。因此在并发量较大时新连接也能够被及时接受。
线程池模式比较适合服务器端能预知最多有多少个客户端并发的情况,这时每个请求都能被业务线程池及时处理,性能也非常高。
TThreadPoolServer模式的缺点
线程池模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待。
TNonblockingServer
TNonblockingServer
模式也是单线程工作,但是采用NIO
的模式,借助Channel/Selector
机制, 采用IO
事件模型来处理。
所有的socket
都被注册到selector
中,在一个线程中通过seletor
循环监控所有的socket
。
每次selector
循环结束时,处理所有的处于就绪状态的socket
,对于有数据到来的socket
进行数据读取操作,对于有数据发送的socket则进行数据发送操作,对于监听socket
则产生一个新业务socket
并将其注册到selector
上。
注意:TNonblockingServer要求底层的传输通道必须使用TFramedTransport。
(一) 工作流程
(二) 使用入门
服务端:
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
tnbArgs.processor(tprocessor);
tnbArgs.transportFactory(new TFramedTransport.Factory());
tnbArgs.protocolFactory(new TCompactProtocol.Factory());
// 使用非阻塞式IO服务端和客户端需要指定TFramedTransport数据传输的方式
TServer server = new TNonblockingServer(tnbArgs);
System.out.println("Running Non-blocking Server");
server.serve();
客户端:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 协议要和服务端一致
TProtocol protocol = new TCompactProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("NonBlockingClient");
System.out.println("Result =: " + result);
transport.close();
(三) 源码分析
TNonblockingServer
继承于AbstractNonblockingServer
,这里我们更关心基于NIO
的selector
部分的关键代码。
(四) 优缺点
TNonblockingServer模式优点
相比于TSimpleServer
效率提升主要体现在IO
多路复用上,TNonblockingServer
采用非阻塞IO
,对accept/read/write
等IO
事件进行监控和处理,同时监控多个socket
的状态变化。
TNonblockingServer模式缺点
TNonblockingServer
模式在业务处理上还是采用单线程顺序来完成。在业务处理比较复杂、耗时的时候,例如某些接口函数需要读取数据库执行时间较长,会导致整个服务被阻塞住,此时该模式效率也不高,因为多个调用请求任务依然是顺序一个接一个执行。
THsHaServer
鉴于TNonblockingServer
的缺点,THsHaServer
继承于TNonblockingServer
,引入了线程池提高了任务处理的并发能力。THsHaServer
是半同步半异步(Half-Sync/Half-Async
)的处理模式,Half-Aysnc
用于IO
事件处理(Accept/Read/Write
),Half-Sync
用于业务handler
对rpc
的同步处理上。
注意:THsHaServer和TNonblockingServer一样,要求底层的传输通道必须使用TFramedTransport。
(一) 工作流程
(二) 使用入门
服务端:
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 半同步半异步
THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
thhsArgs.processor(tprocessor);
thhsArgs.transportFactory(new TFramedTransport.Factory());
thhsArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new THsHaServer(thhsArgs);
System.out.println("Running HsHa Server");
server.serve();
客户端:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 协议要和服务端一致
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("HsHaClient");
System.out.println("Result =: " + result);
transport.close();
(三) 源码分析
THsHaServer
继承于TNonblockingServer
,新增了线程池并发处理工作任务的功能,查看线程池的相关代码:
任务线程池的创建过程:
下文的TThreadedSelectorServer囊括了THsHaServer的大部分特性,源码分析可参考TThreadedSelectorServer。
(四) 优缺点
THsHaServer的优点
THsHaServer
与TNonblockingServer
模式相比,THsHaServer
在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升。
THsHaServer的缺点
主线程仍然需要完成所有socket
的监听接收、数据读取和数据写入操作。当并发请求数较大时,且发送数据量较多时,监听socket
上新连接请求不能被及时接受。
TThreadedSelectorServer
TThreadedSelectorServer
是对THsHaServer
的一种扩充,它将selector
中的读写IO
事件(read/write
)从主线程中分离出来。同时引入worker
工作线程池,它也是种Half-Sync/Half-Async
的服务模型。
TThreadedSelectorServer
模式是目前Thrift
提供的最高级的线程服务模型,它内部有如果几个部分构成:
-
一个
AcceptThread
线程对象,专门用于处理监听socket
上的新连接。 -
若干个
SelectorThread
对象专门用于处理业务socket
的网络I/O
读写操作,所有网络数据的读写均是有这些线程来完成。 - 一个负载均衡器
SelectorThreadLoadBalancer
对象,主要用于AcceptThread
线程接收到一个新socket
连接请求时,决定将这个新连接请求分配给哪个SelectorThread
线程。 - 一个
ExecutorService
类型的工作线程池,在SelectorThread
线程中,监听到有业务socket
中有调用请求过来,则将请求数据读取之后,交给ExecutorService
线程池中的线程完成此次调用的具体执行。主要用于处理每个rpc
请求的handler
回调处理(这部分是同步的)。
(一) 工作流程
(二) 使用入门
服务端:
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 多线程半同步半异步
TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);
ttssArgs.processor(processor);
ttssArgs.protocolFactory(new TBinaryProtocol.Factory());
// 使用非阻塞式IO时 服务端和客户端都需要指定数据传输方式为TFramedTransport
ttssArgs.transportFactory(new TFramedTransport.Factory());
// 多线程半同步半异步的服务模型
TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);
System.out.println("Running ThreadedSelector Server");
server.serve();
客户端:
for (int i = 0; i < 10; i++) {
new Thread("Thread " + i) {
@Override
public void run() {
// 设置传输通道 对于非阻塞服务 需要使用TFramedTransport(用于将数据分块发送)
for (int j = 0; j < 10; j++) {
TTransport transport = null;
try {
transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadedSelector Client");
System.out.println("Result =: " + result);
transport.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭传输通道
transport.close();
}
}
}
}.start();
}
(三) 核心代码
以上工作流程的三个组件AcceptThread
、SelectorThread
和ExecutorService
在源码中的定义如下:
TThreadedSelectorServer
模式中有一个专门的线程AcceptThread
用于处理新连接请求,因此能够及时响应大量并发连接请求;另外它将网络I/O操作分散到多个SelectorThread
线程中来完成,因此能够快速对网络I/O
进行读写操作,能够很好地应对网络I/O
较多的情况。
TThreadedSelectorServer
默认参数定义如下:
- 负责网络IO读写的selector默认线程数(selectorThreads):2
- 负责业务处理的默认工作线程数(workerThreads):5
- 工作线程池单个线程的任务队列大小(acceptQueueSizePerThread):4
创建、初始化并启动AcceptThread
和SelectorThreads
,同时启动selector
线程的负载均衡器(selectorThreads
)。
AcceptThread源码
AcceptThread
继承于Thread
,可以看出包含三个重要的属性:非阻塞式传输通道(TNonblockingServerTransport
)、NIO
选择器(acceptSelector
)和选择器线程负载均衡器(threadChooser
)。
查看AcceptThread
的run()
方法,可以看出accept
线程一旦启动,就会不停地调用select()
方法:
查看select()
方法,acceptSelector
选择器等待IO
事件的到来,拿到SelectionKey
即检查是不是accept
事件。如果是,通过handleAccept()
方法接收一个新来的连接;否则,如果是IO
读写事件,AcceptThread
不作任何处理,交由SelectorThread
完成。
在handleAccept()
方法中,先通过doAccept()
去拿连接通道,然后Selector
线程负载均衡器选择一个Selector
线程,完成接下来的IO
读写事件。
[图片上传失败...(image-314d7a-1536936397051)]
接下来继续查看doAddAccept()
方法的实现,毫无悬念,它进一步调用了SelectorThread
的addAcceptedConnection()
方法,把非阻塞传输通道对象传递给选择器线程做进一步的IO
读写操作。
SelectorThreadLoadBalancer源码
SelectorThreadLoadBalancer
如何创建?
SelectorThreadLoadBalancer
是一个基于轮询算法的Selector
线程选择器,通过线程迭代器为新进来的连接顺序分配SelectorThread
。
SelectorThread源码
SelectorThread
和AcceptThread
一样,是TThreadedSelectorServer
的一个成员内部类,每个SelectorThread
线程对象内部都有一个阻塞式的队列,用于存放该线程被接收的连接通道。
[站外图片上传中...(image-ce3409-1536936397051)]
阻塞队列的大小可由构造函数指定:
上面看到,在AcceptThread
的doAddAccept()
方法中调用了SelectorThread
的addAcceptedConnection()
方法。
这个方法做了两件事:
- 将被此
SelectorThread
线程接收的连接通道放入阻塞队列中。 - 通过
wakeup()
方法唤醒SelectorThread
中的NIO
选择器selector
。
既然SelectorThread
也是继承于Thread
,查看其run()
方法的实现:
SelectorThread
方法的select()
监听IO
事件,仅仅用于处理数据读取和数据写入。如果连接有数据可读,读取并以frame
的方式缓存;如果需要向连接中写入数据,缓存并发送客户端的数据。且在数据读写处理完成后,需要向NIO
的selector
清空和注销自身的SelectionKey
。
-
数据写操作完成以后,整个
rpc
调用过程也就结束了,handleWrite()
方法如下:
-
数据读操作完成以后,
Thrift
会利用已读数据执行目标方法,handleRead()
方法如下:
handleRead
方法在执行read()
方法,将数据读取完成后,会调用requestInvoke()
方法调用目标方法完成具体业务处理。requestInvoke()
方法将请求数据封装为一个Runnable
对象,提交到工作任务线程池(ExecutorService
)进行处理。
select()
方法完成后,线程继续运行processAcceptedConnections()
方法处理下一个连接的IO
事件。
这里比较核心的几个操作:
- 尝试从
SelectorThread
的阻塞队列acceptedQueue
中获取一个连接的传输通道。如果获取成功,调用registerAccepted()
方法;否则,进入下一次循环。 -
registerAccepted()
方法将传输通道底层的连接注册到NIO
的选择器selector
上面,获取到一个SelectionKey
。 - 创建一个
FrameBuffer
对象,并绑定到获取的SelectionKey
上面,用于数据传输时的中间读写缓存。
总结
本文对Thrift
的各种线程服务模型进行了介绍,包括2种阻塞式服务模型:TSimpleServer
、TThreadPoolServer
,3种非阻塞式服务模型:TNonblockingServer
、THsHaServer
和TThreadedSelectorServer
。对各种服务模型的具体用法、工作流程、原理和源码实现进行了一定程度的分析。
鉴于篇幅较长,请各位看官请慢慢批阅!
欢迎关注技术公众号: 零壹技术栈
本帐号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。