自定义实现grpc拦截器

使用;框架进行通信时,有时候需要对编写拦截器对请求或者响应对象进行拦截。如何实现拦截呢?

服务端

服务端拦截器如下图所示:

serverCall:是响应的回调接口,可以用于直接关闭请求;

一般拦截器返回的是next.startCall(serverCall, headers);但是如果想获取到请求对象或者响应对象,需要通过装饰器模式来进行增强,在增强的时候,可以做一些处理。

public class TestServerInterceptor implements ServerInterceptor {

    public static final Metadata.Key<Long> USER_ID_KEY = Metadata.Key.of("userId", ASCII_LONG_MARSHALLER);
    public static final Metadata.Key<String> OPT_NAME_KEY = Metadata.Key.of("userName", ASCII_STRING_MARSHALLER);
    public static final ThreadLocal<Long> USER_THREAD_LOCAL = new ThreadLocal<>();


    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata headers,
                                                                 ServerCallHandler<ReqT, RespT> next) {
        /**
         * 处理请求的header,做一些特殊处理
         */
        //例如验证权限
        if (headers.get(USER_ID_KEY) == null) {
            //直接关闭请求
            serverCall.close(Status.UNAUTHENTICATED.withDescription("auth failed"), new Metadata());
        }
      //此处可以将header的值放入到ThreadLocal中 

        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(
                /**
                 * 回调监听接口
                 */
                new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(serverCall) {

                    /**
                     * 发送响应报文时候的拦截(可以打印响应报文)
                     * @param message response message.
                     */
                    @Override
                    public void sendMessage(RespT message) {
                        //可以做点什么(这里可以拿到接口的response)
                        log.info("test open sendMessage:{}", ObjectMapperUtils.toJSON(message));
                        super.sendMessage(message);
                        log.info("test close sendMessage");

                    }

                    /**
                     * 发送响应报文的拦截(可以设置响应header头)
                     * @param headers metadata to send prior to any response body.
                     */
                    @Override
                    public void sendHeaders(Metadata headers) {
                        log.info("test open sendHeaders");

                        Metadata.Key<Long> respXxx = Metadata.Key.of("respXxx", ASCII_LONG_MARSHALLER);
                        headers.put(respXxx, 1232L);
                        super.sendHeaders(headers);
                        log.info("test close sendHeaders");
                    }
                }, headers)) {

            /**
             * 打印请求报文
             */
            @Override
            public void onMessage(ReqT message) {
                //打印入参
                log.info("test open onMessage:{}", ObjectMapperUtils.toJSON(message));
                //参数处理,校验,若发现数据有误,则抛出异常
                log.info("test close onMessage");
            }


            /**
             * 代表本次请求正常结束
             */
            @Override
            public void onComplete() {
                log.info("test open onComplete()");
                //可以做点什么
                delegate().onComplete();
                log.info("test close onComplete()");

            }

            /**
             * 代表本次请求被取消掉,通常发生在服务端执行出现异常的情况会被调用。
             *
             * 例如请求超时,会执行到这个方法。
             */
            @Override
            public void onCancel() {
                log.info("test open onCancel()");
                delegate().onCancel();
                log.info("test close onCancel()");
            }

            /**
             * 贯穿整个请求的整个生命周期。
             */
            @Override
            public void onHalfClose() {
                log.info("test open onHalfClose()");
                log.info("test close onHalfClose()");
            }
        };
      //return next.startCall(serverCall, headers);

    }
}

执行的顺序:

open onMessage()
close onMessage()

open onHalfClose()
open 业务代码
open onComplete()
close onComplete()
close onHalfClose()

假设需要将请求头中的header向下传递:

public class CustomScopeServerInterceptor implements ServerInterceptor {

    public static final Metadata.Key<Long> USER_ID_KEY = Metadata.Key.of("userId", ASCII_LONG_MARSHALLER);
    public static final Metadata.Key<String> OPT_NAME_KEY = Metadata.Key.of("optName", ASCII_STRING_MARSHALLER);

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata headers,
                                                                 ServerCallHandler<ReqT, RespT> next) {


        ServerCall.Listener<ReqT> listener = next.startCall(serverCall, headers);
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(listener) {
            @Override
            public void onHalfClose() {
                /* 统一交给ProfileInterceptor处理异常 */
                IndustryUserInfoScope.setRpcMethodName(serverCall.getMethodDescriptor().getFullMethodName());
                //将header信息注入到ThreadLocal中
                injectScopeFromHeaders(headers);
                delegate().onHalfClose();
            }
        };
    }

客户端

  • SimpleForwardingClientCallListener是对响应报文的监听;
public class TestClientInterceptor implements ClientInterceptor {
    public static final Key<Long> USER_ID_KEY = Key.of("userId", ASCII_LONG_MARSHALLER);
    public static final Key<String> OPT_NAME_KEY = Key.of("userName", ASCII_STRING_MARSHALLER);

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
                                                               CallOptions callOptions, Channel next) {
        
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {


            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                /**
                 * 发送请求报文的时候,对请求的heade进行处理。
                 */
                log.info("test open injectHeadersFromScope()");
                headers.put(USER_ID_KEY, 1001L);
                log.info("test close injectHeadersFromScope()");


                //开始请求,入参即为响应报文的处理,
                super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {

                    /**
                     * 响应报文来了,进行处理
                     * @param message returned by the server
                     */
                    public void onMessage(RespT message) {
                        log.info("test open onMessage:{}", ObjectMapperUtils.toJSON(message));
                        super.onMessage(message);
                        log.info("test close onMessage ");
                    }

                    /**
                     * 响应报文来了,获取到响应的header头信息
                     * @param headers containing metadata sent by the server at the start of the response.
                     */
                    @Override
                    public void onHeaders(Metadata headers) {
                        Key<Long> respXxx = Key.of("respXxx", ASCII_LONG_MARSHALLER);
                        log.info("test open onHeaders:{}", headers.get(respXxx));
                        super.onHeaders(headers);
                        log.info("test close onHeaders");
                    }

                    /**
                     *
                     * @param status the result of the remote call.   错误码
                     * @param trailers metadata provided at call completion.
                     */
                    @Override
                    public void onClose(Status status, Metadata trailers) {
                        log.info("test open onClose() :resp {}", ObjectMapperUtils.toJSON(status));

                        super.onClose(status, trailers);
                        log.info("test close onClose()");
                    }

                    @Override
                    public void onReady() {
                        log.info("test open onReady()");
                        super.onReady();
                        log.info("test close onReady()");
                    }
                }, headers);
                log.info("test close start()");

            }

            /**
             * 发送请求报文
             * @param message message to be sent to the server.
             */
            @Override
            public void sendMessage(ReqT message) {
                log.info("test open sendMessage():{}",ObjectMapperUtils.toJSON(message));
                super.sendMessage(message);
                log.info("test close sendMessage()");

            }

            @Override
            public void halfClose() {
                log.info("test open halfClose()");
                super.halfClose();
                log.info("test close halfClose()");
            }

            @Override
            public void cancel(String message, Throwable cause) {
                log.info("test open cancel()");
                super.cancel(message, cause);
                log.info("test close cancel()");
            }
        };
    }
}

如果仅仅是将ThreadLocal的值通过header向下传递,可以这样重写:

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
                                                               CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                injectHeadersFromScope(headers);
                // @1 在Header中设置需要透传的值
                super.start(responseListener, headers);
            }
        };
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 193,968评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,682评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,254评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,074评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,964评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,055评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,484评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,170评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,433评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,512评论 2 308
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,296评论 1 325
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,184评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,545评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,150评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,437评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,630评论 2 335

推荐阅读更多精彩内容