Thrift说明及使用

thrift

thrift是一个快速实现rpc的框架,
1、通过安装thrift可执行软件
2、编写idl内容
3、输出指定语言代码
4、编写方法实现
5、编写框架代码即可快速实现rpc的调用

1、 安装thrift软件

wget http://qiniu.asdu.cn/thrift-0.19.0.exe

#  创建环境变量将下载的包放到目标位置;D:\Program Files\thrift
# 暴露环境变量
# 编辑PATH ,增加路径D:\Program Files\thrift

thrift -version
#输出 Thrift version 0.19.0则安装成功

[图片上传失败...(image-9e159a-1704700347281)]

2、idl语法

thrift提供多种类型,通过这些类型定义建立对语言的映射关系,以便快速生成rpc 框架类

2.1、基本类型

类型 说明 备注
bool 布尔值
byte 8 bit的数字
i16 16位数字 对应short
i32 32为数字 对应int
i64 64位数字 对应long
double 64为长度数字
string 文本
# 这里是java的thrift文件写法,为了使用方便定义了类型。如果想兼容性强就不用定义类型快捷方式,直接使用对应的标识符。下面给的不是一个好的实践
typedef i16 short
typedef i32 int
typedef i64 long
typedef bool boolean
typedef string String

2.2、structs

对象类,该类是用来存放数据对象的

struct Person {
    1:optional String username,
    2:optional i32 age,
    3:optional bool married
}

2.3、union

字段类,该类使用来存放多种数据的。创建一个中间类,这个类可以设置多个类型,通过set防范设置数据。
与struct基本没啥区别,可以直接用struct代替

#标识TTypeQualifierValue的值可以设置多个类型,有对应的构造方法
union TTypeQualifierValue {
  1: optional i32 intname
  2: optional string stringname
}

2.4、containers

复杂的元素可以用这些
list 对应java ArrayList
set 对应java set
map 对应java map

  • list:元素的有序列表,对应转换到STL的vector,Java的ArrayList
  • set:无序的元素集合,对应转换到STL的set,Java的HashSet
  • map:对应转到到STL的map,Java的HashMap
    为了获得最大的兼容性,map的键类型最好是基本数据类型,而不是struct或者container
typedef list<string> listType
typedef set<string> setType
typedef map<string, string> mapType

#这是定义的,使用跟基本类型一致。

2.5、Services

用来定义rpc传输的接口防范,需要自定义实现接口方法用于thrift框架调用。
rpc主要是要将调用的方法序列化传输到调用到server端,再有server端继续往下触发。

exception DataException {
    1:optional String message,
    2:optional String callback,
    3:optional String date
}

service PersonService {
    Person getPersonByUsername(1:required String username) throws (1:DataException dataException),
    void savePerson(1:required Person person) throws(1:DataException dataException)
}

3、输出java代码

thrift --gen java .\thriftTest\src\main\resources\tutorial.thrift

4、编写实现

package com.tianzehao;

import com.tianzehao.tutorial.DataException;
import com.tianzehao.tutorial.Person;
import com.tianzehao.tutorial.PersonService;
import org.apache.thrift.TException;

public class PersonImpl implements PersonService.Iface {
    @Override
    public Person getPersonByUsername(String username) throws DataException, TException {
        System.out.println("getPersonByUsername被调用并收到参数:" + username);
        Person person = new Person();
        person.setUsername("公众号:霸道的程序猿");
        person.setAge(100);
        person.setMarried(true);

        return person;
    }

    @Override
    public void savePerson(Person person) throws DataException, TException {
        System.out.println("savePerson方法被调用,接收到的参数为:");
        System.out.println(person.getUsername());
        System.out.println(person.getAge());
        System.out.println(person.isMarried());
    }
}

5、thrift rpc介绍

5.1 协议层protocol

定义序列化模式,主要在网络传输中编码与解码。比如二进制、json格式

协议类型 说明 备注
TBinaryProtocol 二进制编码格式进行数据传输 常用方式,客户端无需使用TFramedTransport或者TFastFramedTransport包装客户端的transport进行包裹
TCompactProtocol 高效率、密集的二进制编码格式进行传输 常用方式,如果服务端开启协议为TCompactProtocol的话,客户端则需要使用TFramedTransport或者TFastFramedTransport包装客户端的transport,不然无法连接客户端
TJSONProtocol 使用Json字符串数据编码传输
TSimpleJSONProtocol 只提供JSON只写的协议,适用脚本语言解析
TProtocolDecorator 用于扩展协议的,比如我们可以再协议中添加处理逻辑

使用TProtocolDecorator的demo

package com.tianzehao.servertest;

import com.tianzehao.PersonImpl;
import com.tianzehao.tutorial.Person;
import com.tianzehao.tutorial.PersonService;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolDecorator;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.*;

public class DecoratorTest {


    static void startServer() throws TTransportException {
        Thread server = new Thread(new Runnable() {
            @Override
            public void run() {
                TServerTransport transport = null;
                try {
                    transport = new TServerSocket(8890);
                } catch (TTransportException e) {
                    throw new RuntimeException(e);
                }
                TServer.Args Args = new TServer.Args(transport);
                PersonService.Processor<PersonImpl> processor = new PersonService.Processor<>(new PersonImpl());
                Args.protocolFactory(new TBinaryProtocol.Factory()).processor(processor);

                TSimpleServer tServer = new TSimpleServer(Args);
                tServer.serve();
            }
        });
        server.start();
    }

    static class TProtocolDecoratorTest extends TProtocolDecorator {
        /**
         * Encloses the specified protocol.
         *
         * @param protocol All operations will be forward to this protocol. Must be non-null.
         */
        public TProtocolDecoratorTest(TProtocol protocol) {
            super(protocol);
        }
        @Override
        public void writeString(String str) throws TException {
            System.out.println("Before writing string: " + str);
            String prefix = "tainzehao ";
            super.writeString(prefix + str);
            System.out.println("After writing string");
        }

        @Override
        public String readString() throws TException {
            System.out.println("Before reading string");
            String str = super.readString();
            String prefix = "tainzehao ";
            str = prefix + str;
            return str;
        }
    }



    public static void main(String[] args) throws TTransportException {
        startServer();
        TTransport transport =new TSocket("localhost", 8890);

        TProtocol protocol = new TBinaryProtocol(transport);

        TProtocol tProtocol = new DecoratorTest.TProtocolDecoratorTest(protocol);
        PersonService.Client client = new PersonService.Client(tProtocol);

        try {
            transport.open();
            System.out.println("------------------------");

            Person person1 = new Person();
            person1.setUsername("公众号:霸道的程序猿");
            person1.setAge(50);
            person1.setMarried(true);
            Person preson = client.getPersonByUsername("xiaoyu");
            System.out.println(preson.getUsername());
            System.out.println(preson.getAge());
            System.out.println(preson.getFieldValue(Person._Fields.MARRIED));
    
            transport.close();
        }catch (Exception ex){
            throw new RuntimeException(ex.getMessage(),ex);
        }finally {
            transport.close();
        }
    }
    
}

5.2、客户端传输层说明

定义网络交互模式 TTransport
决定了网络传输的方式、比如使用阻塞方式进行传输、非阻塞方式传输等。
也有扩展的TTransport,比如TSaslClientTransport用于需要进行认证的方式传输

客户端传输层名称 说明 备注
TSocket 使用阻塞式io进行传输 客户端的常用方式
TNonblockingTransport 使用非阻塞方式、用于构建异步客户端
TFramedTransport 包装TSocket,TNonblockingTransport,按快的大小进行传输 需要与协议配合使用,服务端开启协议为TCompactProtocol的话,别需要使用TFramedTransport或者TFastFramedTransport包装客户端的transport
TSaslClientTransport,TSaslServerTransport 包装transport的类、用于认证方式传输
TFastFramedTransport 包装transport的类、更紧凑的传输块
。。。 还有更多的tansport,都是与网络传输不相关的类,可以不用过多的关注

5.3、服务器的网络模型说明

5.3.1、TServer

TServer是顶层的抽象类定义了静态内部类Args、Args继承自抽象类AbstractServerArgs。AbstractServerArg采用建造者模式、向TServer提供各种工厂
创建一个Server一般需要四个参数

  • processor 必须的 自行编写代码实现服务的PersonService.Iface接口类
  • protocolFactory 非必须 默认TBinaryProtocol.Factory, 可以只传入一个Factory,也可以是两个不同类型,分别对应输入和输出。如果服务端设置输入、输出不同协议,则客户端也要要进行相应设置。一般情况直接一个即可。如果服务端使用了
  • TTransportFactory 非必须 默认值无。可以只传入一个,也可以是多个,默认是TTransportFactory,分别对应输入输出。
  • TServerTransport, 服务端的传输层配置。 如果是非阻塞的则TServerTransport必须是TNonblockingServerTransport,因为开启非阻塞服务时TServerTransport会强制转换TNonblockingServerTransport,原理是nio中的socketChannel的 + selector机制。阻塞模式则没有限制Transport类型

Tserver说明

服务端名称 说明 备注
TSimpleServer 阻塞式服务
TThreadPoolServer 接收请求同步,处理请求异步
TNonblockingServer 1个处理线程异步接收请求,同步处理请求
THsHaServer 1个处理线程异步接收请求,异步处理请求 生产用的最多的方案
TTHreadeSelectorServer 多个异步线程接收请求,异步处理请求 生产最好的解决方案,后续版本才有的功能
TSaslNonblockingServer 1个线程异步线程处理请求,异步处理请求 应对客户端带有认证的需求场景,备注可以参考hive的 org.apache.hive.service.auth.PlainSaslHelper

特殊说明:TSimpleServer支持两种服务端TServerTransport的传输层;但是非阻塞的Transport传入,仍然是以阻塞的方式访问等待链接进入

processor实现demo

package com.tianzehao;
    
import com.tianzehao.tutorial.DataException;
import com.tianzehao.tutorial.Person;
import com.tianzehao.tutorial.PersonService;
import org.apache.thrift.TException;

public class PersonImpl implements PersonService.Iface {
    @Override
    public Person getPersonByUsername(String username) throws DataException, TException {
        Person person = new Person();
        person.setUsername(username);
        person.setAge(100);
        person.setMarried(true);
        return person;
    }

    @Override
    public void savePerson(Person person) throws DataException, TException {
        System.out.println(person.getUsername());
        System.out.println(person.getAge());
        System.out.println(person.isMarried());
    }
}

服务端开启demo

高可用的服务端

package com.tianzehao;

import com.tianzehao.tutorial.PersonService;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.*;
import org.apache.thrift.transport.layered.TFramedTransport;

public class ThriftServerTest {
    public static void main(String[] args) throws Exception
    {
        //设置服务器端口  TNonblockingServerSocket-非堵塞服务模型
        TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(8890);


        //参数设置
        THsHaServer.Args arg = new THsHaServer.Args(serverSocket).minWorkerThreads(2).maxWorkerThreads(4);
        //  minWorkerThreads 处理请求的线程数最小值,maxWorkerThreads处理请求线程数最大值
        
        //处理器
        PersonService.Processor<PersonImpl> processor = new PersonService.Processor<>(new PersonImpl());
        arg.protocolFactory(new TCompactProtocol.Factory()).processorFactory(new TProcessorFactory(processor));

        TServer server = new THsHaServer(arg);
        System.out.println("Thrift 服务端启动成功");
        server.serve();
    }
}

验证demo服务

package com.tianzehao.servertest;

import com.tianzehao.PersonImpl;
import com.tianzehao.tutorial.PersonService;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TServerTransport;

public class SimpleServer {
    public static void main(String[] args) throws Exception {
//        TServerTransport tServerSocket = new TServerSocket(8890);
        //虽然这里用了非阻塞的传输层,但是服务端还是阻塞的
        TServerTransport transport = new TNonblockingServerSocket(8890);

        TServer.Args tserverArgs = new TServer.Args(transport);
        tserverArgs.protocolFactory(new TBinaryProtocol.Factory());
        PersonService.Processor<PersonImpl> processor = new PersonService.Processor<>(new PersonImpl());
        tserverArgs.processor(processor);

        TSimpleServer tSimpleServer = new TSimpleServer(tserverArgs);
        System.out.println("服务器启动");
        tSimpleServer.serve();
    }
}

hive的认证都是在thrift链接时做的校验, org.apache.hive.service.auth.PlainSaslHelper。 就是通过Sal
常见的SASL机制包括PLAIN、GSSAPI(Kerberos)、DIGEST-MD5等

简单服务器server
服务端通过TServerTransport监听端口,
Server端的transport只有两个,
阻塞式
TServerTransport原理基于socket的,创建ServerSocket,只能阻塞线程;
非阻塞式
TNonblockingServerSocket,默认是非阻塞模式了。但是TSimpleServer循环扫描TServerTransport,相当于阻塞的方式等待链接的进入
所以TSimpleServer支持两种服务端TServerTransport的传输层;但是还是阻塞方式访问等待链接进入

异步客户端使用方式demo

public class Client {
    public static void main(String[] args) throws Exception {
        // 异步调用管理器
        TAsyncClientManager clientManager = new TAsyncClientManager();
        // 客户端应该使用非阻塞 IO
        TNonblockingTransport transport = new TNonblockingSocket("localhost",8181);
        // 协议与服务端需要一致
        TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory();
        // 异步调用
        UserService.AsyncClient asyncClient = new UserService.AsyncClient(tProtocolFactory,clientManager,transport);
        asyncClient.getUser(123, new AsyncMethodCallback<User>() {
            // 处理服务返回的结果值
            public void onComplete(User user) {
                System.out.println(user);
            }

            // 处理调用服务过程中出现的异常
            public void onError(Exception e) {

            }
        });
        // 让线程等待一秒,以免主线程先于异步调用结果之前结束,导致结果未被输出
        TimeUnit.SECONDS.sleep(1);
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,905评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,140评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,791评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,483评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,476评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,516评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,905评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,560评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,778评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,557评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,635评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,338评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,925评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,898评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,142评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,818评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,347评论 2 342

推荐阅读更多精彩内容