版本v8.4.0
关于Kafka-fetcher-plugin
kafka-fetcher-plugin 是Skywalking oap的一个可选module,名称为 "kafka-fetcher" ,它用来从kafka读取agent上送信息,一般与agent端的kafka-reporter-plugin配合使用。
通过在oap的配置文件application.yml
中启用该module,并配置相关kafka参数。
kafka-fetcher:
selector: ${SW_KAFKA_FETCHER:default}
default:
bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
topic的配置和自动创建topic
在KafkaFetcherConfig
类中有topic的默认名字,如果不在配置文件中修改,则默认使用之。
private String configPath = "meter-analyzer-config";
private String topicNameOfMetrics = "skywalking-metrics";
private String topicNameOfProfiling = "skywalking-profilings";
private String topicNameOfTracingSegments = "skywalking-segments";
private String topicNameOfManagements = "skywalking-managements";
private String topicNameOfMeters = "skywalking-meters";
private String topicNameOfLogs = "skywalking-logs";
private boolean createTopicIfNotExist = true;
private boolean createTopicIfNotExist = true;
private int partitions = 3;
private int replicationFactor = 2;
另外,默认情况下,oap启动后会去检查相关topic是否已经创建,否则会自动创建。 见代码 KafkaFetcherHandlerRegister.java
的构造函数: (当前版本createTopicIfNotExist 这个参数并未使用)
if (!missedTopics.isEmpty()) {
log.info("Topics" + missedTopics.toString() + " not exist.");
List<NewTopic> newTopicList = missedTopics.stream()
.map(topic -> new NewTopic(
topic,
config.getPartitions(),
(short) config.getReplicationFactor()
)).collect(Collectors.toList());
try {
adminClient.createTopics(newTopicList).all().get();
} catch (Exception e) {
throw new ModuleStartException("Failed to create Kafka Topics" + missedTopics + ".", e);
}
}
在该构造函数中还有线程池的创建:
executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue(threadPoolQueueSize),
new CustomThreadFactory("KafkaConsumer"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
这里使用的拒绝策略是CallerRunsPolicy, 意思是当线程池和队列满了后,调用者线程会来执行这个任务。
消费消息
在kafkaFetcherProvider 被oap加载后,会调用它的start方法,在start方法中会调用handlerRegister.start();
:
// 实现了Runnable接口
public class KafkaFetcherHandlerRegister implements Runnable {}
public void start() {
// start方法将自己提交到线程池中执行
executor.submit(this);
}
// run方法循环使用consumer拉取消息进行消费 ,拉取间隔为500ms
public void run() {
while (true) {
ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofMillis(500L));
while (iterator.hasNext()) {
ConsumerRecord<String, Bytes> record = iterator.next();
executor.submit(() -> handlerMap.get(record.topic()).handle(record));
}
}
}
对于拉取到的每条消息, 根据消息的topic从handlerMap中获取对应的handler,让后将handler.handle() 提交给线程池进行处理。
topic 与 handler
在kafkaFetcherProvider.start()
中可以看到topic和对应handler的关系:
handlerRegister.register(new JVMMetricsHandler(getManager(), config));
handlerRegister.register(new ServiceManagementHandler(getManager(), config));
handlerRegister.register(new TraceSegmentHandler(getManager(), config));
handlerRegister.register(new ProfileTaskHandler(getManager(), config));
if (config.isEnableMeterSystem()) {
handlerRegister.register(new MeterServiceHandler(getManager(), config));
}
if (config.isEnableLog()) {
handlerRegister.register(new LogHandler(getManager(), config));
}
总结:
oap的kafka-fetcher模块从kafka读取消息,然后根据topic获取响应的handler提交给线程池进行处理。