In our company's wind-power big-data project, we ran into the following error:
Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {winSink-2=161803385}
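To achieve exactly-once semantics, we store the offsets ourselves: after the Spark app is submitted, it resumes reading from where the previous run stopped. This approach, however, can hit the OffsetOutOfRangeException shown above, which is usually caused by Kafka's retention expiration.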
Kafka's consumer configuration (ConsumerConfig) contains the following setting, which deserves attention:
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
public static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset<li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li></ul>";
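When you access Kafka directly, for example through the Kafka client, and specify an offset that does not exist (i.e. one above the upper bound or below the lower bound), Kafka resets your offset according to this setting, for example to earliest or latest (with none, the consumer throws an exception instead).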
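To make the three documented values concrete, here is a minimal, hypothetical model of how a plain consumer resolves a requested offset against the valid range. It is not Kafka's implementation: the class and method names are invented for illustration, "latest" is taken to mean the log-end offset, and a generic IllegalStateException stands in for Kafka's OffsetOutOfRangeException.

```java
import java.util.Locale;

/**
 * Hypothetical, simplified model of auto.offset.reset for a plain consumer.
 * NOT Kafka's implementation; names are illustrative only.
 */
final class OffsetResetModel {

    /** Resolves a requested offset against the valid range [earliest, latest]. */
    static long resolve(long requested, long earliest, long latest, String policy) {
        // An in-range offset is used as-is, whatever the policy.
        if (requested >= earliest && requested <= latest) {
            return requested;
        }
        switch (policy.toLowerCase(Locale.ROOT)) {
            case "earliest":
                return earliest;
            case "latest":
                return latest;
            default:
                // "none" (and anything else): surface the problem to the caller.
                throw new IllegalStateException(
                    "Offset " + requested + " out of range [" + earliest + ", " + latest + "]");
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(50, 100, 200, "earliest")); // 100
        System.out.println(resolve(50, 100, 200, "latest"));   // 200
        System.out.println(resolve(150, 100, 200, "none"));    // 150 (in range, no reset needed)
    }
}
```

In this model only an out-of-range offset triggers the policy; Spark's behavior, which ignores the setting on executors, is the case it does not cover.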
However, when you specify an out-of-range starting offset in Spark Streaming, Spark ignores your auto.offset.reset and instead fails with the error shown at the beginning of this post: Offsets out of range with no configured reset policy for partitions
For a discussion of this behavior, see SPARK-19680. An excerpt:
The issue here is likely that you have lost data (because of retention expiration) between the time the batch was defined on the driver, and the time the executor attempted to process the batch. Having executor consumers obey auto offset reset would result in silent data loss, which is a bad thing.
There's a more detailed description of the semantic issues around this for kafka in KAFKA-3370 and for structured streaming kafka in SPARK-17937
If you've got really aggressive retention settings and are having trouble getting a stream started, look at specifying earliest + some margin on startup as a workaround. If you're having this trouble after a stream has been running for a while, you need more retention or smaller batches.
If you got an OffsetOutOfRangeException after a job had been running for 4 days, it likely means that your job was not running fast enough to keep up, and retention expired records that had not yet been processed.
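The "earliest + some margin" startup workaround from the quote can be sketched as follows. This is an illustrative sketch, not code from SPARK-19680: the class name, the margin value, and the map-based inputs (partition number to offset, for a single topic) are assumptions. Starting a little above earliest leaves retention some room to delete the oldest data between the time the driver defines the batch and the time the executors fetch it, at the cost of skipping the first margin offsets.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the "earliest + some margin" startup workaround.
 * Keys are partition numbers of one topic; the margin is counted in offsets.
 */
final class MarginStart {

    /** Returns earliest + margin, but never past latest. Requires earliest <= latest. */
    static long startOffset(long earliest, long latest, long margin) {
        if (margin < 0 || earliest > latest) {
            throw new IllegalArgumentException("need margin >= 0 and earliest <= latest");
        }
        // latest - earliest >= 0, so the sum can never exceed latest (and cannot overflow).
        return earliest + Math.min(margin, latest - earliest);
    }

    /** Applies startOffset to every partition present in the earliest map. */
    static Map<Integer, Long> startOffsets(Map<Integer, Long> earliest,
                                           Map<Integer, Long> latest,
                                           long margin) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : earliest.entrySet()) {
            result.put(e.getKey(), startOffset(e.getValue(), latest.get(e.getKey()), margin));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> earliest = Map.of(0, 1_000L, 1, 5_000L);
        Map<Integer, Long> latest = Map.of(0, 50_000L, 1, 5_200L);
        // Partition 0 starts at 1_000 + 10_000; partition 1 is capped at its latest offset.
        System.out.println(startOffsets(earliest, latest, 10_000L));
    }
}
```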
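Therefore, to prevent this problem, the program should check the current offset bounds in Kafka during initialization: if a stored offset is below the current minimum, adjust it to that minimum. For how to perform this check, see my other post: Fetch Offset range in Kafka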
Of course, this kind of data loss should not normally happen; when it does, it should be logged, or avoided altogether.
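The startup check and the logging can be sketched as a pure function over offset maps. This is a minimal sketch under stated assumptions: the class name is hypothetical, keys are partition numbers of a single topic (real code would use Kafka's TopicPartition), and the earliest/latest maps are assumed to come from a separate lookup, for example KafkaConsumer's beginningOffsets and endOffsets. Clamping offsets above latest is an extra branch that the text does not require; drop it if you would rather fail in that case.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical startup check: clamps each stored offset into Kafka's current
 * [earliest, latest] range before the offsets are handed to Spark.
 * Every stored partition is assumed to appear in both the earliest and latest maps.
 */
final class OffsetValidator {

    static Map<Integer, Long> clamp(Map<Integer, Long> stored,
                                    Map<Integer, Long> earliest,
                                    Map<Integer, Long> latest) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : stored.entrySet()) {
            int partition = e.getKey();
            long offset = e.getValue();
            long low = earliest.get(partition);
            long high = latest.get(partition);
            if (offset < low) {
                // Offsets in [offset, low) were removed by retention before we processed them.
                System.err.printf("partition %d: stored offset %d below earliest %d, %d offsets lost%n",
                        partition, offset, low, low - offset);
                offset = low;
            } else if (offset > high) {
                // Beyond the log end: optional branch, not required by the text above.
                System.err.printf("partition %d: stored offset %d above latest %d%n",
                        partition, offset, high);
                offset = high;
            }
            result.put(partition, offset);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> stored = Map.of(0, 120L, 1, 50L);
        Map<Integer, Long> earliest = Map.of(0, 100L, 1, 100L);
        Map<Integer, Long> latest = Map.of(0, 200L, 1, 200L);
        // Partition 0 is in range and kept; partition 1 is moved up to 100 and the loss is logged.
        System.out.println(clamp(stored, earliest, latest));
    }
}
```

Keeping the check free of Kafka and Spark types makes it easy to test; the resulting map would then be converted into whatever starting-offset structure your Spark job uses.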
- For offset management, see your-own-data-store
- Comments and discussion about Flume, Kafka, Spark, and TSDB are welcome