以下内容以pinpoint源码dubbo实现为例
1.新写DubboPlugin与DubboTraceMetadataProvider并添加配置文件。
DubboTraceMetadataProvider往TraceMetadataSetupContext上下文添加全局唯一的ServiceType和AnnotationKey。(遗留问题,在哪里使用到了)
DubboPlugin添加需要拦截的方法
2.定义DubboProviderDetector探针类和客户/服务端拦截器类DubboConsumerInterceptor/DubboProviderInterceptor。
DubboProviderDetector判断位置是否满足插件的要求
重要的在intercept上面
- DubboConsumerInterceptor
package com.navercorp.pinpoint.plugin.dubbo.interceptor;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.navercorp.pinpoint.bootstrap.context.*;
import com.navercorp.pinpoint.bootstrap.interceptor.AroundInterceptor1;
import com.navercorp.pinpoint.plugin.dubbo.DubboConstants;
/**
* @author Jinkai.Ma
*/
public class DubboConsumerInterceptor implements AroundInterceptor1 {
private final MethodDescriptor descriptor;
private final TraceContext traceContext;
public DubboConsumerInterceptor(TraceContext traceContext, MethodDescriptor descriptor) {
this.descriptor = descriptor;
this.traceContext = traceContext;
}
@Override
public void before(Object target, Object arg0) {
Trace trace = this.getTrace(target);
if (trace == null) {
return;
}
RpcInvocation invocation = (RpcInvocation) arg0;
if (trace.canSampled()) {
SpanEventRecorder recorder = trace.traceBlockBegin();
// RPC调用跟踪必须与RPC客户端代码范围内的服务代码一起记录。
recorder.recordServiceType(DubboConstants.DUBBO_CONSUMER_SERVICE_TYPE);
// 您必须发出此请求的接收者将使用的TraceId。
TraceId nextId = trace.getTraceId().getNextTraceId();
// 然后将其记录为下一个SpanID。
recorder.recordNextSpanId(nextId.getSpanId());
// 最后,将一些跟踪数据传递到服务器。
// 如何将它们放入消息中是特定于协议的。
// 本示例假定目标协议消息可以包含任何元数据(如HTTP标头)
invocation.setAttachment(DubboConstants.META_TRANSACTION_ID, nextId.getTransactionId());
invocation.setAttachment(DubboConstants.META_SPAN_ID, Long.toString(nextId.getSpanId()));
invocation.setAttachment(DubboConstants.META_PARENT_SPAN_ID, Long.toString(nextId.getParentSpanId()));
invocation.setAttachment(DubboConstants.META_PARENT_APPLICATION_TYPE, Short.toString(traceContext.getServerTypeCode()));
invocation.setAttachment(DubboConstants.META_PARENT_APPLICATION_NAME, traceContext.getApplicationName());
invocation.setAttachment(DubboConstants.META_FLAGS, Short.toString(nextId.getFlags()));
} else {
// 如果禁用采样此事务,则仅将该信息传递给服务器。
invocation.setAttachment(DubboConstants.META_DO_NOT_TRACE, "1");
}
}
@Override
public void after(Object target, Object arg0, Object result, Throwable throwable) {
Trace trace = this.getTrace(target);
if (trace == null) {
return;
}
RpcInvocation invocation = (RpcInvocation) arg0;
try {
SpanEventRecorder recorder = trace.currentSpanEventRecorder();
recorder.recordApi(descriptor);
if (throwable == null) {
String endPoint = RpcContext.getContext().getRemoteAddressString();
// RPC客户端必须记录端点(服务器地址)
recorder.recordEndPoint(endPoint);
// (可选)记录目标ID(服务器的逻辑名称,例如DB名称)
recorder.recordDestinationId(endPoint);
recorder.recordAttribute(DubboConstants.DUBBO_ARGS_ANNOTATION_KEY, invocation.getArguments());
recorder.recordAttribute(DubboConstants.DUBBO_RESULT_ANNOTATION_KEY, result);
} else {
recorder.recordException(throwable);
}
} finally {
trace.traceBlockEnd();
}
}
private Trace getTrace(Object target) {
Invoker invoker = (Invoker) target;
// Ignore monitor service.
if (DubboConstants.MONITOR_SERVICE_FQCN.equals(invoker.getInterface().getName())) {
return null;
}
return traceContext.currentTraceObject();
}
}
package com.navercorp.pinpoint.plugin.dubbo.interceptor;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.navercorp.pinpoint.bootstrap.context.*;
import com.navercorp.pinpoint.bootstrap.interceptor.SpanSimpleAroundInterceptor;
import com.navercorp.pinpoint.bootstrap.util.NumberUtils;
import com.navercorp.pinpoint.common.trace.ServiceType;
import com.navercorp.pinpoint.plugin.dubbo.DubboConstants;
/**
* @author Jinkai.Ma
*/
public class DubboProviderInterceptor extends SpanSimpleAroundInterceptor {
public DubboProviderInterceptor(TraceContext traceContext, MethodDescriptor descriptor) {
super(traceContext, descriptor, DubboProviderInterceptor.class);
}
@Override
protected Trace createTrace(Object target, Object[] args) {
Invoker invoker = (Invoker) target;
// Ignore monitor service.
if (DubboConstants.MONITOR_SERVICE_FQCN.equals(invoker.getInterface().getName())) {
return traceContext.disableSampling();
}
RpcInvocation invocation = (RpcInvocation) args[0];
// 如果禁用采样此事务,则仅将该信息传递给服务器。
if (invocation.getAttachment(DubboConstants.META_DO_NOT_TRACE) != null) {
return traceContext.disableSampling();
}
String transactionId = invocation.getAttachment(DubboConstants.META_TRANSACTION_ID);
//如果没有transaction,则从此处开始新的transaction。
if (transactionId == null) {
return traceContext.newTraceObject();
}
//否则,继续使用给定的数据进行跟踪
long parentSpanID = NumberUtils.parseLong(invocation.getAttachment(DubboConstants.META_PARENT_SPAN_ID), SpanId.NULL);
long spanID = NumberUtils.parseLong(invocation.getAttachment(DubboConstants.META_SPAN_ID), SpanId.NULL);
short flags = NumberUtils.parseShort(invocation.getAttachment(DubboConstants.META_FLAGS), (short) 0);
TraceId traceId = traceContext.createTraceId(transactionId, parentSpanID, spanID, flags);
return traceContext.continueTraceObject(traceId);
}
@Override
protected void doInBeforeTrace(SpanRecorder recorder, Object target, Object[] args) {
RpcInvocation invocation = (RpcInvocation) args[0];
RpcContext rpcContext = RpcContext.getContext();
//您必须在“服务器”范围内记录服务类型。
recorder.recordServiceType(DubboConstants.DUBBO_PROVIDER_SERVICE_TYPE);
// 记录rpc名称,客户端地址,服务器地址。
recorder.recordRpcName(invocation.getInvoker().getInterface().getSimpleName() + ":" + invocation.getMethodName());
recorder.recordEndPoint(rpcContext.getLocalAddressString());
recorder.recordRemoteAddress(rpcContext.getRemoteAddressString());
// 如果此事务未在此处开始,请记录父(发送此请求的客户端)信息
if (!recorder.isRoot()) {
String parentApplicationName = invocation.getAttachment(DubboConstants.META_PARENT_APPLICATION_NAME);
if (parentApplicationName != null) {
short parentApplicationType = NumberUtils.parseShort(invocation.getAttachment(DubboConstants.META_PARENT_APPLICATION_TYPE), ServiceType.UNDEFINED.getCode());
recorder.recordParentApplication(parentApplicationName, parentApplicationType);
// Pinpoint通过匹配呼叫者的端点和被呼叫者的接受方主机来找到呼叫者-被呼叫者关系。
// https://github.com/naver/pinpoint/issues/1395
recorder.recordAcceptorHost(rpcContext.getLocalAddressString());
}
}
}
@Override
protected void doInAfterTrace(SpanRecorder recorder, Object target, Object[] args, Object result, Throwable throwable) {
RpcInvocation invocation = (RpcInvocation) args[0];
recorder.recordApi(methodDescriptor);
recorder.recordAttribute(DubboConstants.DUBBO_ARGS_ANNOTATION_KEY, invocation.getArguments());
if (throwable == null) {
recorder.recordAttribute(DubboConstants.DUBBO_RESULT_ANNOTATION_KEY, result);
} else {
recorder.recordException(throwable);
}
}
}
拦截器做一些记录操作。
插件开发总结:新写DubboPlugin与DubboTraceMetadataProvider并添加配置文件,加载的时候利用spi机制生成DubboPlugin与DubboTraceMetadataProvider,自己定义的intercept拦截器记录被调用方法前后的操作(什么地方用什么api暂时不清楚)。Detector探测当前位置是否满足插件要求。