作者:Arnold
缓冲区与刷新时长的配置
max.buffered.records(默认值 20000)
The maximum number of records each task will buffer before blocking acceptance of more records. This config can be used to limit the memory usage for each task.
connector任务不会直接把每条消息写入ES,而是先把它们放入一个缓冲区中,由另一个异步线程将缓冲区中的消息批量的写入ES。
这个配置限制了缓冲区的消息条数,也可以说是限制了所有未发送的消息条数(Buffer中的消息数量 + 请求已发往ES但还未返回结果的消息数量)
这个配置过高会占用过多kafka connect cluster的内存。
flush.timeout.ms
The timeout in milliseconds to use for periodic flushing, and when waiting for buffer space to be made available by completed requests as records are added. If this timeout is exceeded the task will fail.
有两种场景:
- 当上述的消息缓冲区满了的时候,connect任务会进入睡眠,另一个异步线程继续将缓冲区的内容取出并写入ES。最大的睡眠时长就是这个参数,但如果睡醒后缓冲区还是满的(一条也没写成),connector任务就会失败,并出现
Add timeout expired before buffer availability
的报错日志。 - 当connector尝试提交offset前,会希望缓冲区已经没数据了。因此会陷入睡眠,等待时间由此参数限制,睡眠期间另一个异步线程把缓冲区清空并全部写入ES。如果在睡醒后缓冲区里仍有东西,就认为刷新失败了,打出一条
Flush timeout expired with unflushed records
日志。
写操作配置
max.retries(默认值3)
The maximum number of retries that are allowed for failed indexing requests. If the retry attempts are exhausted the task will fail.
发请求写入ES失败后的重试次数。达到最大重试次数时task失败,并打出Failed to execute batch {} of {} records
。需要检查ES健康程度。
retry.backoff.ms(默认值100)
How long to wait in milliseconds before attempting to retry a failed indexing request. This avoids retrying in a tight loop under failure scenarios.
这个参数指定了在每次重试写入ES失败后,需要等待多久再重试。
常见异常日志与对应的调参建议
** es_rejected_execution_exception**
请求被ES拒绝,增加max.retries
或增加ES结点个数。更多信息参见AWS文档
** Failed to execute batch**
增加max.retries
或retry.backoff.ms
** Add timeout expired before buffer availability | Flush timeout expired with unflushed records**
增加flush.timeout.ms
,如果流量实在非常大也可以考虑增加max.buffered.records