一、trait Source
Source 必须不断地到达数据以进行流式查询。 Source 必须具有单调递增的进度概念,用 offset 表示。 Spark 将定期查询每个 Source 以查看是否有更多数据可用
// 返回此 Source 的数据的 schema
def schema: StructType
// 返回此 Source 的最大可用 offset
// 如果此 Source 从未接收过任何数据,则返回 None
def getOffset: Option[Offset]
// 返回 (start,end] 偏移量之间的数据。当 start 为 None 时,批处理应以第一个记录开头。此方法必须始终为特定的 start 和 end 对返回相同的数据; 即使在另一个节点上重新启动 Source 之后也是如此。
// 更上层总是调用此方法,其值 start 大于或等于传递给 commit 的最后一个值,而 end 值小于或等于 getOffset 返回的最后一个值
// 当从日志中获取数据时,offset 的类型可能是 SerializedOffset。此外,StreamExecution 仅比较 Offset JSON 以确定两个对象是否相等。修改 Offset JSON 格式时可能会产生冲突,在这种情况下,Source应该返回一个空的DataFrame
def getBatch(start: Option[Offset], end: Offset): DataFrame
// 通知 Source 已完成处理到 end 偏移量的所有数据,并且将来只会请求大于 end 的偏移量
def commit(end: Offset) : Unit
// 停止此 Source 并释放它已分配的所有资源
def stop(): Unit
1.1、Offset
// Offset 的 JSON 序列化表示,用于将偏移量保存到 offsetLog
// 注意:我们假设 等效/相等 offset 序列化为相同的 JSON 字符串
public abstract String json();
@Override
public boolean equals(Object obj) {
if (obj instanceof Offset) {
return this.json().equals(((Offset) obj).json());
} else {
return false;
}
}
1.2、SerializedOffset
case class SerializedOffset(override val json: String) extends Offset
用于从外部存储加载 JSON 序列化偏移时使用。 目前不会将 JSON 序列化数据转换为特定的 offset 对象。Source 应在其对应的 Offset 伴生 object 中定义工厂方法,该对象接受 SerializedOffset 进行转换。
比如,object KafkaSourceOffset
的 def apply(offset: SerializedOffset): KafkaSourceOffset
方法将从 hdfs 文件上读取并转化为 KafkaSourceOffset。
二、KafkaSource(extends Source)
使用以下设计从 Kafka 读取数据的 Source
- KafkaSourceOffset 是为此 Source 定义的自定义偏移量,其包含 TopicPartition 到 offset 的映射。
KafkaSource 主要流程如下:
- 创建 Source 后,预配置的 KafkaOffsetReader 将用于查询此 Source 应开始读取的初始 offset。这用于创建 first batch。
-
getOffset()
使用 KafkaOffsetReader 查询最新的可用 offset,以 KafkaSourceOffset 类型返回 -
getBatch()
返回一个 DF,它从 start offset 读取,直到每个分区的 end offset 为止。排除 end offset,以与KafkaConsumer.position()
的语义一致 - 返回的 DF 基于 KafkaSourceRDD
删除 topic 时无法保证不丢失数据。如果丢失零数据至关重要,则用户必须确保在删除 topic 时已处理 topic 中的所有消息
2.1、KafkaSource#schema
def kafkaSchema: StructType = StructType(Seq(
StructField("key", BinaryType),
StructField("value", BinaryType),
StructField("topic", StringType),
StructField("partition", IntegerType),
StructField("offset", LongType),
StructField("timestamp", TimestampType),
StructField("timestampType", IntegerType)
))
2.2、KafkaSource#getOffset
若有新增的 kafka partitonis,getOffset 中也会返回要 fetch 新增的 partitions 的数据到哪个 end offset
返回该 Source 可用的最大的 offset(会考虑到设置的单个 batch 允许消费的最大 offset 数),kafka 的 offset 类型如下:
// 包含各个 topic partition 对应的 offset
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
override val json = JsonUtils.partitionOffsets(partitionToOffsets)
}
KafkaSource#getOffset
实现如下:
override def getOffset: Option[Offset] = {
// 获取 init offsets
initialPartitionOffsets
// 获取 lastest offsets
val latest = kafkaReader.fetchLatestOffsets()
// maxOffsetsPerTrigger 为每次 trigger 拉取的 offset 数
val offsets = maxOffsetsPerTrigger match {
case None =>
// 若为指定,则拉取到 lastest
latest
// currentPartitionOffsets 上一次消费到的 offsets
case Some(limit) if currentPartitionOffsets.isEmpty =>
rateLimit(limit, initialPartitionOffsets, latest)
case Some(limit) =>
rateLimit(limit, currentPartitionOffsets.get, latest)
}
currentPartitionOffsets = Some(offsets)
logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
Some(KafkaSourceOffset(offsets))
}
核心逻辑:
- 获取 init offsets
- 获取 latest offsets
- 根据 maxOffsetsPerTrigger 以及上次消费到的 offsets,来确定本次消费的 end offsets:
- 若 maxOffsetsPerTrigger 为 None => end offsets 为 latest offsets
- 若 maxOffsetsPerTrigger 为 limit 且上次消费到 currentPartitionOffsets =>
rateLimit(limit, initialPartitionOffsets, latest)
- 若 maxOffsetsPerTrigger 为 limit 且本次为第一次消费 =>
rateLimit(limit, currentPartitionOffsets.get, latest)
2.2.1、获取 init offsets
KafkaSource#initialPartitionOffsets
如下:
private lazy val initialPartitionOffsets = {
val metadataLog =
// metadataPath 为该 KafkaSource 对应的 meta 持久化到 hdfs 的地址
new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
// 将 KafkaSourceOffset 写到 hdfs
override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
writer.write("v" + VERSION + "\n")
writer.write(metadata.json)
writer.flush
}
// 从 hdfs 中读出 KafkaSourceOffset
override def deserialize(in: InputStream): KafkaSourceOffset = {
in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
// HDFSMetadataLog guarantees that it never creates a partial file.
assert(content.length != 0)
if (content(0) == 'v') {
val indexOfNewLine = content.indexOf("\n")
if (indexOfNewLine > 0) {
val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
} else {
throw new IllegalStateException(
s"Log file was malformed: failed to detect the log file version line.")
}
} else {
// The log was generated by Spark 2.1.0
KafkaSourceOffset(SerializedOffset(content))
}
}
}
// EarliestOffsetRangeLimit: 希望返回最小的 offset
// LatestOffsetRangeLimit: 希望返回最大的 offset
// SpecificOffsetRangeLimit: 希望返回指定的 offset
metadataLog.get(0).getOrElse {
val offsets = startingOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
}
metadataLog.add(0, offsets)
logInfo(s"Initial offsets: $offsets")
offsets
}.partitionToOffsets
}
核心逻辑即:
- 定义一个用于读写 meta 持久化在 hdfs 上文件的 metadataLog(持久化文件路径在 KafkaSource 构造函数中传入)
- 读取持久化 meta 文件:
- 若存在,则以读取到的 offsets 为 init offsets
- 若不存在,则根据 KafaSource 构造函数中的 startingOffsets 类型来决定使用最小、最大还是指定的 offsets 作为 init offsets
2.2.2、rateLimit
private def rateLimit(
limit: Long,
from: Map[TopicPartition, Long],
until: Map[TopicPartition, Long]): Map[TopicPartition, Long]
根据 from 到 until 每个 topic partition 的 offset diff 来计算每个 topic partition 应该占用 limit 的比例,算出比例后,结合 from 就能得到 end offsets 了
2.3、KafkaSource#getBatch
返回 [start.get.partitionToOffsets, end.partitionToOffsets)
之间的数据
def getBatch(start: Option[Offset], end: Offset): DataFrame
核心流程如下:
上面的流程图中,以下几个点需要额外关注:
- 对于可能的数据丢失,是否需要抛异常来中止,如:新增的 partitions 被删除,新增的 partitions 的起始 offsets 不为 0
2.4、KafkaSource#commit
do nothing
2.4、KafkaSource#stop
override def stop(): Unit = synchronized {
kafkaReader.close()
}