1. 概述
Thrift是跨语言的RPC框架,现在是一个Apache的顶级项目。Thrift通过一个中间语言--IDL接口定义语言,来定义RPC的接口和数据类型。使用Thrift的代码生成工具(thrift-0.10.0.exe编译器)读取IDL文件,生成不同语言的服务端与客户端代码,并由生成的代码负责RPC协议层和传输层的实现。目前支持语言C++,Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk
2. 下载使用
官网地址:thrift.apache.org
- 如果是Win OS , 在官网下载exe文件
- 然后编辑.thrift 文件,比如:
/**
* 实现功能:创建一个查询结果struct和一个服务接口service
* 基于:thrift-0.10.0
**/
namespace java com.thrift
struct QryResult {
/**
*返回码, 1成功,0失败
*/
1:i32 code;
/**
*响应信息
*/
2:string msg;
}
service TestQry{
/**
* 测试查询接口,当qryCode值为1时返回"成功"的响应信息,qryCode值为其他值时返回"失败"的响应信息
* @param qryCode测试参数
*/
QryResult qryTest(1:i32 qryCode)
}
- 进入命令行,执行
thrift-0.10.0.exe --gen java TestQry.thrift
,之后会生成基本代码. - 在工程中引入thrift jar 包,把生成的文件引入工程中
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.10.0</version>
</dependency>
3. 基本概念
3.1 数据类型
基本类型:
bool:布尔值,true 或 false,对应 Java 的 boolean
byte:8 位有符号整数,对应 Java 的 byte
i16:16 位有符号整数,对应 Java 的 short
i32:32 位有符号整数,对应 Java 的 int
i64:64 位有符号整数,对应 Java 的 long
double:64 位浮点数,对应 Java 的 double
string:utf-8编码的字符串,对应 Java 的 String
结构体类型:
struct:定义公共的对象,类似于 C 语言中的结构体定义,在 Java 中是一个 JavaBean
容器类型:
list:对应 Java 的 ArrayList
set:对应 Java 的 HashSet
map:对应 Java 的 HashMap
异常类型:
exception:对应 Java 的 Exception
服务类型:
service:对应服务的类
3.2 传输协议
Thrift 支持多种传输协议,用户可以根据实际需求选择合适的类型。Thrift 传输协议上总体可划分为文本 (text) 和二进制 (binary) 传输协议两大类,一般在生产环境中使用二进制类型的传输协议为多数(相对于文本和 JSON 具有更高的传输效率)。常用的协议包含:
TBinaryProtocol:是Thrift的默认协议,使用二进制编码格式进行数据传输,基本上直接发送原始数据
TCompactProtocol:压缩的、密集的数据传输协议,基于Variable-length quantity的zigzag 编码格式
TJSONProtocol:以JSON (JavaScript Object Notation)数据编码协议进行数据传输
TDebugProtocol:常常用以编码人员测试,以文本的形式展现方便阅读
3.3 服务
Thrift 包含三个主要的组件:protocol,transport 和 server。
其中,protocol 定义了消息是怎样序列化的;transport 定义了消息是怎样在客户端和服务器端之间通信的;server 用于从 transport 接收序列化的消息,根据 protocol 反序列化之,调用用户定义的消息处理器,并序列化消息处理器的响应,然后再将它们写回 transport。
Thrift 模块化的结构使得它能提供各种 server 实现。下面列出了 Java 中可用的 server 实现:
TSimpleServer
TNonblockingServer
THsHaServer
TThreadedSelectorServer
TThreadPoolServer
3.4 编码步骤
3.4.1 服务端基本步骤
- 实现服务处理接口
- 创建TProcessor
- 创建TServerTransport
- 创建TProtocol
- 创建TServer
- 启动Server
3.4.2 客户端基本步骤
- 创建Transport
- 创建TProtocol
- 基于TTransport和TProtocol创建Client
- 调用Client的相应方法
- 服务方式的选择需要根据具体的业务需求.
4. 实例演示
4.1 一些说明
- 服务端采用的是多接口的实现,因此支持多个.thrift的文件的实现.对应的在客户端,也要使用多接口的方式实现
- 在实际运行中,会出现
org.apache.thrift.transport.TTransportException: Frame size (40792739) larger than max length (16384000)!
异常,所以在代码中会修改一次性传输的大小(1638400000),这个需在客户端和服务端同时设定. - 服务模型的选择
采用Thrift的TThreadedSelectorServer 服务模式,提高并发请求的响应.TThreadedSelectorServer模式中有一个专门的线程AcceptThread用于处理新连接请求,因此能够及时响应大量并发连接请求;另外它将网络I/O操作分散到多个SelectorThread线程中来完成,因此能够快速对网络I/O进行读写操作,能够很好地应对网络I/O较多的情况;Thrift的TThreadedSelectorServer,用业务之外的小demo进行测试,并发提高能很快返回结果。
4.2 接口的实现
public class QueryImp implements TestQry.Iface{
public QryResult qryTest(int qryCode) throws TException {
QryResult result = new QryResult();
if(qryCode == 1){
result.code = 1;
result.msg = "success";
}else {
result.code = 0;
result.msg = "fail";
}
for(int i=0;i<10000;i++){
System.out.println("2");
}
return result;
}
}
4.3 服务端的实现
private final static int DEFAULT_PORT = 30002;
private static TServer server = null;
public static void main(String[] args) {
TNonblockingServerSocket socket = null;
try {
socket = new TNonblockingServerSocket(DEFAULT_PORT);
} catch (TTransportException e) {
e.printStackTrace();
}
//多接口的实现
TProcessor tProcessor1 = new TestQry.Processor<TestQry.Iface>(new QueryImp());
TProcessor tProcessor2 = new TestQry1.Processor<TestQry1.Iface>(new QueryImp1());
TThreadedSelectorServer.Args arg = new TThreadedSelectorServer.Args(socket);
TMultiplexedProcessor multiplexedProcessor = new TMultiplexedProcessor();
multiplexedProcessor.registerProcessor("processor1",tProcessor1);
multiplexedProcessor.registerProcessor("processor2",tProcessor2);
arg.processor(multiplexedProcessor);
arg.protocolFactory(new TCompactProtocol.Factory());
//如果传输数据量过大,需要修改这个地方的参数,默认16M
arg.transportFactory(new TFramedTransport.Factory(1638400000));
arg.processorFactory(new TProcessorFactory(multiplexedProcessor));
//监听线程数
arg.selectorThreads(10);
//工作线程数
ExecutorService pool = Executors.newFixedThreadPool(100);
arg.executorService(pool);
arg.getExecutorService();
server = new TThreadedSelectorServer(arg);
System.out.println("Starting server on port " + DEFAULT_PORT + "......");
server.serve();
}
4.4 客户端实现
private final static int DEFAULT_QRY_CODE = 1;
public void startClient() {
TTransport tTransport = null;
try {
tTransport = getTTransport();
} catch (Exception e) {
e.printStackTrace();
}
TProtocol protocol = new TCompactProtocol(tTransport);
//对应的客户端也要用多接口的方式实现
TMultiplexedProtocol q1 = new TMultiplexedProtocol(protocol,"processor1");
TMultiplexedProtocol q2 = new TMultiplexedProtocol(protocol,"processor2");
TestQry.Client client1 = new TestQry.Client(q1);
TestQry1.Client client2 = new TestQry1.Client(q2);
try {
QryResult result = client1.qryTest(DEFAULT_QRY_CODE);
System.out.println("code="+result.code+" msg="+result.msg);
close(tTransport);
} catch (TException e) {
e.printStackTrace();
}
}
private static TTransport getTTransport() throws Exception{
TTransport tTransport = getTTransport("127.0.0.1",30002,300000);
if(tTransport != null && !tTransport.isOpen()){
tTransport.open();
}
return tTransport;
}
private static TTransport getTTransport(String host, int port, int timeout) {
final TSocket tSocket = new TSocket(host,port,timeout);
final TTransport tTransport = new TFramedTransport(tSocket,1638400000);
return tTransport;
}
private void close(TTransport transport){
if(transport !=null && transport.isOpen()){
transport.close();
}
}