thrift
thrift是一个快速实现rpc的框架,
1、通过安装thrift可执行软件
2、编写idl内容
3、输出指定语言代码
4、编写方法实现
5、编写框架代码即可快速实现rpc的调用
1、 安装thrift软件
wget http://qiniu.asdu.cn/thrift-0.19.0.exe
# 创建环境变量将下载的包放到目标位置;D:\Program Files\thrift
# 暴露环境变量
# 编辑PATH ,增加路径D:\Program Files\thrift
thrift -version
#输出 Thrift version 0.19.0则安装成功
[图片上传失败...(image-9e159a-1704700347281)]
2、idl语法
thrift提供多种类型,通过这些类型定义建立对语言的映射关系,以便快速生成rpc 框架类
2.1、基本类型
类型 | 说明 | 备注 |
---|---|---|
bool | 布尔值 | |
byte | 8 bit的数字 | |
i16 | 16位数字 | 对应short |
i32 | 32为数字 | 对应int |
i64 | 64位数字 | 对应long |
double | 64为长度数字 | |
string | 文本 |
# 这里是java的thrift文件写法,为了使用方便定义了类型。如果想兼容性强就不用定义类型快捷方式,直接使用对应的标识符。下面给的不是一个好的实践
typedef i16 short
typedef i32 int
typedef i64 long
typedef bool boolean
typedef string String
2.2、structs
对象类,该类是用来存放数据对象的
struct Person {
1:optional String username,
2:optional i32 age,
3:optional bool married
}
2.3、union
字段类,该类使用来存放多种数据的。创建一个中间类,这个类可以设置多个类型,通过set防范设置数据。
与struct基本没啥区别,可以直接用struct代替
#标识TTypeQualifierValue的值可以设置多个类型,有对应的构造方法
union TTypeQualifierValue {
1: optional i32 intname
2: optional string stringname
}
2.4、containers
复杂的元素可以用这些
list 对应java ArrayList
set 对应java set
map 对应java map
- list:元素的有序列表,对应转换到STL的vector,Java的ArrayList
- set:无序的元素集合,对应转换到STL的set,Java的HashSet
- map:对应转到到STL的map,Java的HashMap
为了获得最大的兼容性,map的键类型最好是基本数据类型,而不是struct或者container
typedef list<string> listType
typedef set<string> setType
typedef map<string, string> mapType
#这是定义的,使用跟基本类型一致。
2.5、Services
用来定义rpc传输的接口防范,需要自定义实现接口方法用于thrift框架调用。
rpc主要是要将调用的方法序列化传输到调用到server端,再有server端继续往下触发。
exception DataException {
1:optional String message,
2:optional String callback,
3:optional String date
}
service PersonService {
Person getPersonByUsername(1:required String username) throws (1:DataException dataException),
void savePerson(1:required Person person) throws(1:DataException dataException)
}
3、输出java代码
thrift --gen java .\thriftTest\src\main\resources\tutorial.thrift
4、编写实现
package com.tianzehao;
import com.tianzehao.tutorial.DataException;
import com.tianzehao.tutorial.Person;
import com.tianzehao.tutorial.PersonService;
import org.apache.thrift.TException;
public class PersonImpl implements PersonService.Iface {
@Override
public Person getPersonByUsername(String username) throws DataException, TException {
System.out.println("getPersonByUsername被调用并收到参数:" + username);
Person person = new Person();
person.setUsername("公众号:霸道的程序猿");
person.setAge(100);
person.setMarried(true);
return person;
}
@Override
public void savePerson(Person person) throws DataException, TException {
System.out.println("savePerson方法被调用,接收到的参数为:");
System.out.println(person.getUsername());
System.out.println(person.getAge());
System.out.println(person.isMarried());
}
}
5、thrift rpc介绍
5.1 协议层protocol
定义序列化模式,主要在网络传输中编码与解码。比如二进制、json格式
协议类型 | 说明 | 备注 |
---|---|---|
TBinaryProtocol | 二进制编码格式进行数据传输 | 常用方式,客户端无需使用TFramedTransport或者TFastFramedTransport包装客户端的transport进行包裹 |
TCompactProtocol | 高效率、密集的二进制编码格式进行传输 | 常用方式,如果服务端开启协议为TCompactProtocol的话,客户端则需要使用TFramedTransport或者TFastFramedTransport包装客户端的transport,不然无法连接客户端 |
TJSONProtocol | 使用Json字符串数据编码传输 | |
TSimpleJSONProtocol | 只提供JSON只写的协议,适用脚本语言解析 | |
TProtocolDecorator | 用于扩展协议的,比如我们可以再协议中添加处理逻辑 |
使用TProtocolDecorator的demo
package com.tianzehao.servertest;
import com.tianzehao.PersonImpl;
import com.tianzehao.tutorial.Person;
import com.tianzehao.tutorial.PersonService;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolDecorator;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.*;
public class DecoratorTest {
static void startServer() throws TTransportException {
Thread server = new Thread(new Runnable() {
@Override
public void run() {
TServerTransport transport = null;
try {
transport = new TServerSocket(8890);
} catch (TTransportException e) {
throw new RuntimeException(e);
}
TServer.Args Args = new TServer.Args(transport);
PersonService.Processor<PersonImpl> processor = new PersonService.Processor<>(new PersonImpl());
Args.protocolFactory(new TBinaryProtocol.Factory()).processor(processor);
TSimpleServer tServer = new TSimpleServer(Args);
tServer.serve();
}
});
server.start();
}
static class TProtocolDecoratorTest extends TProtocolDecorator {
/**
* Encloses the specified protocol.
*
* @param protocol All operations will be forward to this protocol. Must be non-null.
*/
public TProtocolDecoratorTest(TProtocol protocol) {
super(protocol);
}
@Override
public void writeString(String str) throws TException {
System.out.println("Before writing string: " + str);
String prefix = "tainzehao ";
super.writeString(prefix + str);
System.out.println("After writing string");
}
@Override
public String readString() throws TException {
System.out.println("Before reading string");
String str = super.readString();
String prefix = "tainzehao ";
str = prefix + str;
return str;
}
}
public static void main(String[] args) throws TTransportException {
startServer();
TTransport transport =new TSocket("localhost", 8890);
TProtocol protocol = new TBinaryProtocol(transport);
TProtocol tProtocol = new DecoratorTest.TProtocolDecoratorTest(protocol);
PersonService.Client client = new PersonService.Client(tProtocol);
try {
transport.open();
System.out.println("------------------------");
Person person1 = new Person();
person1.setUsername("公众号:霸道的程序猿");
person1.setAge(50);
person1.setMarried(true);
Person preson = client.getPersonByUsername("xiaoyu");
System.out.println(preson.getUsername());
System.out.println(preson.getAge());
System.out.println(preson.getFieldValue(Person._Fields.MARRIED));
transport.close();
}catch (Exception ex){
throw new RuntimeException(ex.getMessage(),ex);
}finally {
transport.close();
}
}
}
5.2、客户端传输层说明
定义网络交互模式 TTransport
决定了网络传输的方式、比如使用阻塞方式进行传输、非阻塞方式传输等。
也有扩展的TTransport,比如TSaslClientTransport用于需要进行认证的方式传输
客户端传输层名称 | 说明 | 备注 |
---|---|---|
TSocket | 使用阻塞式io进行传输 | 客户端的常用方式 |
TNonblockingTransport | 使用非阻塞方式、用于构建异步客户端 | |
TFramedTransport | 包装TSocket,TNonblockingTransport,按快的大小进行传输 | 需要与协议配合使用,服务端开启协议为TCompactProtocol的话,别需要使用TFramedTransport或者TFastFramedTransport包装客户端的transport |
TSaslClientTransport,TSaslServerTransport | 包装transport的类、用于认证方式传输 | |
TFastFramedTransport | 包装transport的类、更紧凑的传输块 | |
。。。 | 还有更多的tansport,都是与网络传输不相关的类,可以不用过多的关注 |
5.3、服务器的网络模型说明
5.3.1、TServer
TServer是顶层的抽象类定义了静态内部类Args、Args继承自抽象类AbstractServerArgs。AbstractServerArg采用建造者模式、向TServer提供各种工厂
创建一个Server一般需要四个参数
- processor 必须的 自行编写代码实现服务的PersonService.Iface接口类
- protocolFactory 非必须 默认TBinaryProtocol.Factory, 可以只传入一个Factory,也可以是两个不同类型,分别对应输入和输出。如果服务端设置输入、输出不同协议,则客户端也要要进行相应设置。一般情况直接一个即可。如果服务端使用了
- TTransportFactory 非必须 默认值无。可以只传入一个,也可以是多个,默认是TTransportFactory,分别对应输入输出。
- TServerTransport, 服务端的传输层配置。 如果是非阻塞的则TServerTransport必须是TNonblockingServerTransport,因为开启非阻塞服务时TServerTransport会强制转换TNonblockingServerTransport,原理是nio中的socketChannel的 + selector机制。阻塞模式则没有限制Transport类型
Tserver说明
服务端名称 | 说明 | 备注 |
---|---|---|
TSimpleServer | 阻塞式服务 | |
TThreadPoolServer | 接收请求同步,处理请求异步 | |
TNonblockingServer | 1个处理线程异步接收请求,同步处理请求 | |
THsHaServer | 1个处理线程异步接收请求,异步处理请求 | 生产用的最多的方案 |
TTHreadeSelectorServer | 多个异步线程接收请求,异步处理请求 | 生产最好的解决方案,后续版本才有的功能 |
TSaslNonblockingServer | 1个线程异步线程处理请求,异步处理请求 | 应对客户端带有认证的需求场景,备注可以参考hive的 org.apache.hive.service.auth.PlainSaslHelper |
特殊说明
:TSimpleServer支持两种服务端TServerTransport的传输层;但是非阻塞的Transport传入,仍然是以阻塞的方式访问等待链接进入
processor实现demo
package com.tianzehao;
import com.tianzehao.tutorial.DataException;
import com.tianzehao.tutorial.Person;
import com.tianzehao.tutorial.PersonService;
import org.apache.thrift.TException;
public class PersonImpl implements PersonService.Iface {
@Override
public Person getPersonByUsername(String username) throws DataException, TException {
Person person = new Person();
person.setUsername(username);
person.setAge(100);
person.setMarried(true);
return person;
}
@Override
public void savePerson(Person person) throws DataException, TException {
System.out.println(person.getUsername());
System.out.println(person.getAge());
System.out.println(person.isMarried());
}
}
服务端开启demo
高可用的服务端
package com.tianzehao;
import com.tianzehao.tutorial.PersonService;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.*;
import org.apache.thrift.transport.layered.TFramedTransport;
public class ThriftServerTest {
public static void main(String[] args) throws Exception
{
//设置服务器端口 TNonblockingServerSocket-非堵塞服务模型
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(8890);
//参数设置
THsHaServer.Args arg = new THsHaServer.Args(serverSocket).minWorkerThreads(2).maxWorkerThreads(4);
// minWorkerThreads 处理请求的线程数最小值,maxWorkerThreads处理请求线程数最大值
//处理器
PersonService.Processor<PersonImpl> processor = new PersonService.Processor<>(new PersonImpl());
arg.protocolFactory(new TCompactProtocol.Factory()).processorFactory(new TProcessorFactory(processor));
TServer server = new THsHaServer(arg);
System.out.println("Thrift 服务端启动成功");
server.serve();
}
}
验证demo服务
package com.tianzehao.servertest;
import com.tianzehao.PersonImpl;
import com.tianzehao.tutorial.PersonService;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TServerTransport;
public class SimpleServer {
public static void main(String[] args) throws Exception {
// TServerTransport tServerSocket = new TServerSocket(8890);
//虽然这里用了非阻塞的传输层,但是服务端还是阻塞的
TServerTransport transport = new TNonblockingServerSocket(8890);
TServer.Args tserverArgs = new TServer.Args(transport);
tserverArgs.protocolFactory(new TBinaryProtocol.Factory());
PersonService.Processor<PersonImpl> processor = new PersonService.Processor<>(new PersonImpl());
tserverArgs.processor(processor);
TSimpleServer tSimpleServer = new TSimpleServer(tserverArgs);
System.out.println("服务器启动");
tSimpleServer.serve();
}
}
hive的认证都是在thrift链接时做的校验, org.apache.hive.service.auth.PlainSaslHelper。 就是通过Sal
常见的SASL机制包括PLAIN、GSSAPI(Kerberos)、DIGEST-MD5等
简单服务器server
服务端通过TServerTransport监听端口,
Server端的transport只有两个,
阻塞式
TServerTransport原理基于socket的,创建ServerSocket,只能阻塞线程;
非阻塞式
TNonblockingServerSocket,默认是非阻塞模式了。但是TSimpleServer循环扫描TServerTransport,相当于阻塞的方式等待链接的进入
所以TSimpleServer支持两种服务端TServerTransport的传输层;但是还是阻塞方式访问等待链接进入
异步客户端使用方式demo
public class Client {
public static void main(String[] args) throws Exception {
// 异步调用管理器
TAsyncClientManager clientManager = new TAsyncClientManager();
// 客户端应该使用非阻塞 IO
TNonblockingTransport transport = new TNonblockingSocket("localhost",8181);
// 协议与服务端需要一致
TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory();
// 异步调用
UserService.AsyncClient asyncClient = new UserService.AsyncClient(tProtocolFactory,clientManager,transport);
asyncClient.getUser(123, new AsyncMethodCallback<User>() {
// 处理服务返回的结果值
public void onComplete(User user) {
System.out.println(user);
}
// 处理调用服务过程中出现的异常
public void onError(Exception e) {
}
});
// 让线程等待一秒,以免主线程先于异步调用结果之前结束,导致结果未被输出
TimeUnit.SECONDS.sleep(1);
}
}