1 首先定义ClientInterceptor
public HeaderClientInterceptor(String clientId, String accessToken) {
this.clientId = clientId;
this.accessToken = accessToken;
}
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(CLIENT_ID, clientId);
if (StringUtils.isNotEmpty(accessToken)) {
headers.put(ACCESS_TOKEN, accessToken);
}
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
/**
* {@inheritDoc}
* @see io.grpc.ForwardingClientCallListener#onClose(Status, Metadata)
*/
@Override
public void onClose(Status status, Metadata trailers) {
CLOG.debug("关闭原因:{}", status.getCode().toString());
CLOG.debug("获取请求返回头部信息,{}", trailers.toString());
String isexpire = trailers.get(IS_EXPIRE);
if (Boolean.getBoolean(isexpire)) {
OAuthService oAuthService = SpringContextUtils.getBean(OAuthService.class);
String accessToken = oAuthService.oauth();
CLOG.debug("refresh Token,{}", accessToken);
}
super.onClose(status, trailers);
}
@Override
public void onHeaders(Metadata headers) {
CLOG.debug("获取请求返回头部信息,{}", headers.toString());
super.onHeaders(headers);
}
}, headers);
}
};
}
2 Auth,项目中做成了切面。
public String verifying() {
ManagedChannel channel = channelPool.getManagedChannel();
OAuthBlockingStub blockingStub = OAuthGrpc.newBlockingStub(channel)
.withInterceptors(new HeaderClientInterceptor(clientProp.getClientId(), null));
OAuthRequest oAuthRequest = OAuthRequest.newBuilder().setClientId(clientProp.getClientId())
.setClientSecret(clientProp.getSecret()).setGrantType(clientProp.getGrant()).build();
CLOG.info("OAuthRequest: " + JsonUtils.toJson(oAuthRequest), tagMap);
OAuthResponse oauthResponse = null;
try {
oauthResponse = blockingStub.getAccessToken(oAuthRequest);
RedisHelper.setHNATokenAndEffectTime(oauthResponse.getAccessToken(), new Date());
} catch (StatusRuntimeException e) {
CLOG.info(e, tagMap);
return "";
} finally {
if (null != channel) {
channelPool.returnObject(channel);
}
}
return oauthResponse.getAccessToken();
}
3 具体clientStub调用
public AirLowFareSearchRS airLowFareSearch(AirLowFareSearchRQ airLowFareSearchRQ) {
ManagedChannel channel = channelPool.getManagedChannel();
Map<String, String> tagMap = this.tagMap(airLowFareSearchRQ);
AirLowFareSearchBlockingStub blockingStub = AirLowFareSearchGrpc.newBlockingStub(channel)
.withInterceptors(new HeaderClientInterceptor(clientProp.getClientId(), RedisHelper.getToken()));
CLOG.info("AirLowFareSearchRQ :" + JsonUtils.toJson(airLowFareSearchRQ), tagMap);
AirLowFareSearchRS airLowFareSearchRS = null;
try {
airLowFareSearchRS = blockingStub.airLowFareSearch(airLowFareSearchRQ);
CLOG.info("AirLowFareSearchRS :" + JsonUtils.toJson(airLowFareSearchRS), tagMap);
} catch (Exception e) {
CLOG.info(e, tagMap);
ExceptionHandler.tokenUselessToEmpty(e);
} finally {
if (null != channel) {
channelPool.returnObject(channel);
}
}
return airLowFareSearchRS;
}
4 深入源码 (调了半天有点晕,下次再续)
/**
* Executes a unary call and returns a {@link ListenableFuture} to the response.
*
* @return a future for the single response message.
*/
public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
ClientCall<ReqT, RespT> call,
ReqT param) {
GrpcFuture<RespT> responseFuture = new GrpcFuture<RespT>(call);
asyncUnaryRequestCall(call, param, new UnaryStreamToFuture<RespT>(responseFuture), false);
return responseFuture;
}