前言: 对于调用链跟踪,如果我们想从服务层面,了解服务之间的调用情况,包括服务之间调用次数,异常次数等。那么就需要一个链路拓扑展示功能。链路拓扑图可以帮助我们快速了解所有服务之间的调用情况:谁调用谁,总调用量多少,正常调用以及异常调用量情况。
快速开始
Zipkin的拓扑服务zipkin-dependencies是作为zipkin的一个独立的离线服务,也就是说,只启动zipkin服务,是没法看到拓扑的,还需要自己离线启动zipkin-dependencues服务。
$ STORAGE_TYPE=elasticsearch ES_HOSTS=host1,host2 java -jar zipkin-dependencies.jar `date -u -d '1 day ago' +%F`
# To override the http port, add it to the host string
$ STORAGE_TYPE=elasticsearch ES_HOSTS=host1:9201 java -jar zipkin-dependencies.jar `date -u -d '1 day ago' +%F`
其中ES配置参数如下:
* `ES_INDEX`: The index prefix to use when generating daily index names. Defaults to zipkin.
* `ES_DATE_SEPARATOR`: The separator used when generating dates in index.
Defaults to '-' so the queried index look like zipkin-yyyy-DD-mm
Could for example be changed to '.' to give zipkin-yyyy.MM.dd
* `ES_HOSTS`: A comma separated list of elasticsearch hosts advertising http. Defaults to
localhost. Add port section if not listening on port 9200. Only one of these hosts
needs to be available to fetch the remaining nodes in the cluster. It is
recommended to set this to all the master nodes of the cluster. Use url format for
SSL. For example, "https://yourhost:8888"
* `ES_NODES_WAN_ONLY`: Set to true to only use the values set in ES_HOSTS, for example if your
elasticsearch cluster is in Docker. Defaults to false
* `ES_USERNAME` and `ES_PASSWORD`: Elasticsearch basic authentication. Use when X-Pack security
(formerly Shield) is in place. By default no username or
password is provided to elasticsearch.
Zipkin出了支持elasticsearch存储,还有mysql,cassard,详细配置信息请看源码Readme
效果展示
1、图中线条说明
服务之间的线条,遵循以下原则:
- 越细调用越少,越粗调用越多
- 越黑错误越少,越红错误越多
2、主调被调次数说明
点开每一个服务,可以看到主调被调,比如我在拓扑图中点击
某个服务,可以与此服务有直接调用关系的服务有哪些,效果如下:
其中Uses by表示此服务作为被调服务,被哪些服务调用了;Uses表示此服务调用了哪些其他服务。
在上面的图中点击某个主调或被调服务,即可看到具体的调用次数,以及失败次数,效果如下:
通过拓扑图,宏观上,我们可以快速了解服务之间的调用关系,同时也可以知道哪些服务间调用有问题,且可以知道出现问题的一个量级是多少(失败数,调用总数)。
原理分析
Zipkin拓扑denpendencies是基于上报的链路span数据再次构建出的描述链路拓扑的一种新的数据结构。
构建链路的第一步就是读取Span数据。Zipkin外部数据源支持三种,分别是Mysql,Cassandra,Elasticsearch,因此构建拓扑时,将从这三种数据源中读取Span数据。
读取Span数据源后,需要对其处理,计算出链路的拓扑。因为Span的数据量很大,普通程序计算处理无法完成任务,因此需要用到大数据框架。Zipkin官方选用的是Spark框架。Spark对Span数据进行处理,最后生成拓扑数据DenpendencyLink,然后持久化到存储中。
前端请求拓扑(DependencyLink)时,即按照查询条件,查询已经持久化后的DependencyLink,然后经过UI渲染,进行页面展示。
服务主逻辑
启动Zipkin-dependencies服务时,会传入几个参数,分别是时间day和存储类型storageType。Zipkin-dependencies服务是以天为单位进行建立拓扑,因此day将决定建立那一天的拓扑;而storageType将决定从什么储存中读取数据。
1、获取日期:
long day = args.length == 1 ? parseDay(args[0]) : System.currentTimeMillis();
2、获取存储类型:
String storageType = System.getenv("STORAGE_TYPE");
3、根据不同的存储启动不同的jOb:
switch (storageType) {
case "cassandra":
CassandraDependenciesJob.builder()
.logInitializer(logInitializer)
.jars(jarPath)
.day(day)
.build()
.run();
break;
case "cassandra3":
zipkin2.dependencies.cassandra3.CassandraDependenciesJob.builder()
.logInitializer(logInitializer)
.jars(jarPath)
.day(day)
.build()
.run();
break;
case "mysql":
MySQLDependenciesJob.builder()
.logInitializer(logInitializer)
.jars(jarPath)
.day(day)
.build()
.run();
break;
case "elasticsearch":
ElasticsearchDependenciesJob.builder()
.logInitializer(logInitializer)
.jars(jarPath)
.day(day)
.build()
.run();
break;
default:
throw new UnsupportedOperationException("Unsupported STORAGE_TYPE: " + storageType);
}
Job构建拓扑
不同的存储会定义不同Job类,因此有CassandraDependenciesJob,MySQLDependenciesJob,MySQLDependenciesJob,ElasticsearchDependenciesJob。 不同的Job主要区别在于读取Span的方式不同,而Spark对Span进行处理计算的方式基本都是相同的。 本文主要分析ElasticsearchJOb。
Job中主要逻辑都在run方法中,ElastichserchJob的Run方法定义如下:
void run(String spanResource, String dependencyLinkResource, SpanBytesDecoder decoder) {
JavaSparkContext sc = new JavaSparkContext(conf);
try {
JavaRDD<Map<String, Object>> links =
JavaEsSpark.esJsonRDD(sc, spanResource)
.groupBy(JSON_TRACE_ID)
.flatMapValues(new TraceIdAndJsonToDependencyLinks(logInitializer, decoder))
.values()
.mapToPair(l -> Tuple2.apply(Tuple2.apply(l.parent(), l.child()), l))
.reduceByKey((l, r) -> DependencyLink.newBuilder()
.parent(l.parent())
.child(l.child())
.callCount(l.callCount() + r.callCount())
.errorCount(l.errorCount() + r.errorCount())
.build())
.values()
.map(DEPENDENCY_LINK_JSON);
if (links.isEmpty()) {
log.info("No dependency links could be processed from spans in index {}", spanResource);
} else {
JavaEsSpark.saveToEs(
links,
dependencyLinkResource,
Collections.singletonMap("es.mapping.id", "id")); // allows overwriting the link
}
} finally {
sc.stop();
}
}
主要步骤如下:
1、首先通过Spark的配置属性Conf,创建一个JavaSparkContext对象sc:
JavaSparkContext sc = new JavaSparkContext(conf);
2、然后读取elasticsearch span数据源:
JavaEsSpark.esJsonRDD(sc, spanResource)
3、读取数据源后,就可以对Span进行处理了,首先按照TraceId 进行Group分组:
JavaEsSpark.esJsonRDD(sc, spanResource)
.groupBy(JSON_TRACE_ID)
其中JSON_TRACE_ID Function定义如下:
static final Function<Tuple2<String, String>, String> JSON_TRACE_ID = new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> pair) throws IOException {
JsonReader reader = new JsonReader(new StringReader(pair._2));
reader.beginObject();
while (reader.hasNext()) {
String nextName = reader.nextName();
if (nextName.equals("traceId")) {
String traceId = reader.nextString();
return traceId.length() > 16 ? traceId.substring(traceId.length() - 16) : traceId;
} else {
reader.skipValue();
}
}
throw new MalformedJsonException("no traceId in " + pair);
}
@Override
public String toString() {
return "pair._2.traceId";
}
};
4、Span按照TraceId Group 分组后,接着对Span进行处理, 创建出DenpendencyLink。
JavaEsSpark.esJsonRDD(sc, spanResource)
.groupBy(JSON_TRACE_ID)
.flatMapValues(new TraceIdAndJsonToDependencyLinks(logInitializer, decoder))
5、上面方法最终返回的是个Map类型,将其转化为pari类型,再对其进行一个reduceByKey操作:
JavaEsSpark.esJsonRDD(sc, spanResource)
.groupBy(JSON_TRACE_ID)
.flatMapValues(new TraceIdAndJsonToDependencyLinks(logInitializer, decoder))
.values()
.mapToPair(l -> Tuple2.apply(Tuple2.apply(l.parent(), l.child()), l))
.reduceByKey((l, r) -> DependencyLink.newBuilder()
.parent(l.parent())
.child(l.child())
.callCount(l.callCount() + r.callCount())
.errorCount(l.errorCount() + r.errorCount())
.build())
6、Spark对Span的计算操作到这儿基本就完成了,最后将DependencyLink转化为Jso形式:
JavaEsSpark.esJsonRDD(sc, spanResource)
...
.map(DEPENDENCY_LINK_JSON);
7、对于计算好的拓扑Links,将其持久化到Elasticsearch中:
JavaEsSpark.saveToEs(
links,
dependencyLinkResource,
Collections.singletonMap("es.mapping.id", "id"));
整个过程到此完毕,其中最复杂也是最核心的逻辑就是计算出链路拓扑Denpendencylink,此步骤在Function TraceIdAndJsonToDependencyLinks(logInitializer, decoder)中。接下来详细分析TraceIdAndJsonToDependencyLinks完成的工作。
DenpendencyLink
首先介绍一下DenpendencyLink数据结构。DenpendencyLink就是最终与页面交互的拓扑结构数据单元,字端有:
字端名 | 描述 |
---|---|
parent | 父服务名 |
child | 子服务名 |
callCount | 这两个服务之间的总调用次数 |
errorCount | 这两个服务之间的总调用失败次数 |
DenpendencyLink类定义如下:
public final class DependencyLink implements Serializable {
public static Builder newBuilder() {
return new Builder();
}
/** parent service name (caller) */
public String parent() {
return parent;
}
/** child service name (callee) */
public String child() {
return child;
}
/** total traced calls made from {@link #parent} to {@link #child} */
public long callCount() {
return callCount;
}
/** How many {@link #callCount calls} are known to be errors */
public long errorCount() {
return errorCount;
}
}
构建DependencyLinks
TraceIdAndJsonToDependencyLinks类的定义如下:
final class TraceIdAndJsonToDependencyLinks
implements Serializable, Function<Iterable<Tuple2<String, String>>, Iterable<DependencyLink>> {
private static final long serialVersionUID = 0L;
private static final Logger log = LoggerFactory.getLogger(TraceIdAndJsonToDependencyLinks.class);
@Nullable final Runnable logInitializer;
final SpanBytesDecoder decoder;
TraceIdAndJsonToDependencyLinks(Runnable logInitializer, SpanBytesDecoder decoder) {
this.logInitializer = logInitializer;
this.decoder = decoder;
}
@Override
public Iterable<DependencyLink> call(Iterable<Tuple2<String, String>> traceIdJson) {
List<Span> sameTraceId = new ArrayList<>();
for (Tuple2<String, String> row : traceIdJson) {
decoder.decode(row._2.getBytes(ElasticsearchDependenciesJob.UTF_8), sameTraceId);
DependencyLinker linker = new DependencyLinker();
linker.putTrace(sameTraceId);
return linker.link();
}
}
其中call方法中,首先完成对同一TraceId的Span解码:
decoder.decode(row._2.getBytes(ElasticsearchDependenciesJob.UTF_8), sameTraceId);
然后,通过DependencyLinker类构造出DependendyLink,首先构造一个SpanNode Tree:
SpanNode traceTree = builder.build(spans);
然后利用深度优先遍历方法遍历整个,统计出CallCounts和errorCounts:
void addLink(String parent, String child, boolean isError) {
Pair key = new Pair(parent, child);
if (callCounts.containsKey(key)) {
callCounts.put(key, callCounts.get(key) + 1);
} else {
callCounts.put(key, 1L);
}
if (!isError) return;
if (errorCounts.containsKey(key)) {
errorCounts.put(key, errorCounts.get(key) + 1);
} else {
errorCounts.put(key, 1L);
}
}
其中callCounts和errorCounts定义如下:
//Pair key = new Pair(parent, child);
final Map<Pair, Long> callCounts = new LinkedHashMap<>();
final Map<Pair, Long> errorCounts = new LinkedHashMap<>();
最后,再通过callCounts和errorCounts生成List<DependencyLink>:
static List<DependencyLink> link(Map<Pair, Long> callCounts,
Map<Pair, Long> errorCounts) {
List<DependencyLink> result = new ArrayList<>(callCounts.size());
for (Map.Entry<Pair, Long> entry : callCounts.entrySet()) {
Pair parentChild = entry.getKey();
result.add(DependencyLink.newBuilder()
.parent(parentChild.left)
.child(parentChild.right)
.callCount(entry.getValue())
.errorCount(errorCounts.containsKey(parentChild) ? errorCounts.get(parentChild) : 0L)
.build());
}
return result;
}
这样,最终构建出了DependencyLink。
后记
本文为我的调用链系列文章之一,已有文章如下:
祝大家工作顺利,天天开心!