引言
Structured Streaming非常显式的提出了Source、StreamExecution、Sink三个组件,并且在每个组件显式的做到fault-tolerant,由此得到整个streaming程序的end-to-end exactly-once guarantees。
具体到源码上,Source是一个抽象的接口trait Source,包括了Structured Streaming实现end-to-end exactly-once处理所一定需要提供的功能:
Structured Streaming只保留对可靠数据源的支持:
已经支持:
Kafka,具体实现是KafkaSource extends Srouce
HDFS-compatible file system,具体实现是FileStreamSource extends Source
RateStream,具体实现是RateStreamSource extends Source
Source:方法与功能
1.每个StreamExecution的批次最开始,会向Source询问当前Source的最新进度,即最新的offset。
(1)这里由StreamExecution调用Source的getOffset方法
(2)Kafka(KafkaSource)的具体getOffset实现,会通过在driver端的一个长时运行的consumer从kafka brokers处获取到各个topic最新的offsets,比如topicA_partition1:300, topicB_partition1:50, topicB_partition2: 60,并把offsets返回。
(3)HDFS-compatible file system(FileStreamSource)的具体offset实现,是先扫描一下最新的一组文件,给一个递增的编号并持久化下来,比如2->{c.txt, d.txt},然后把编号2作为最新的offset返回
2.这个Offset给到StreamExecution会被StreamExecution持久化到自己的WAL里。
3.由Source根据StreamExecution所要求的start offset、end offset,提供在(start, end]区间范围内的数据
(1)这里是由StreamExecution调用Source的getBatch(start: Option[Offset], end: Offset): DataFrame
(2)这里的start offset和end offset,通常就是Source在上一个执行批次里提供的最新offset,和Source在这个批次里提供的最新offset;区间范围是左开右闭
(3)数据的返回形式是一个DataFrame(这个DataFrame目前只包含数据的描述信息,还没有发生实际的取数据操作)
4.StreamExecution触发计算逻辑logicalPlan的优化与编译
5.把计算结果写出给Sink
(1)注意这时才会由Sink触发实际的取数据操作,以及计算过程
6.在数据完整写出到Sink后,StreamExecution通知Source可以废弃数据;然后把成功的批次id写入到batchCommitLog
(1)这里由StreamExecution调用Source的commit方法
(2)commit方法主要帮助Source完成垃圾回收,如果外部数据源本身具有垃圾回收功能,如Kafka,那么在Source的具体实现commit上可以为空,留给外部数据源自己去管理