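The previous post covered the Receiver's implementation on the Driver side. This post covers how a Receiver is started on the Executor, and how it receives and stores data.
- Start from ReceiverTracker's start method, which calls launchReceivers(). That method sends a message to the endpoint with endpoint.send(StartAllReceivers(receivers)). The endpoint is the ReceiverTrackerEndpoint, so the tracker is in effect sending a message to its own RPC endpoint. Here is how the message is handled: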
case StartAllReceivers(receivers) =>
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
// Start each receiver in turn
for (receiver <- receivers) {
val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
// Start the receiver
startReceiver(receiver, executors)
}
startReceiver(receiver, executors) is called once per iteration, so each receiver is launched as its own job. The code of startReceiver is as follows:
private def startReceiver(
receiver: Receiver[_],
scheduledLocations: Seq[TaskLocation]): Unit = {
def shouldStartReceiver: Boolean = {
// It's okay to start when trackerState is Initialized or Started
!(isTrackerStopping || isTrackerStopped)
}
val receiverId = receiver.streamId
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
return
}
val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf = new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
// Function to start the receiver on the worker node
// (this is the function passed to the job's action)
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
(iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException("Could not start receiver as object not found.")
}
// Only start the receiver on the first attempt (attemptNumber == 0); a retried task skips this block
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
// Create the receiver supervisor; its start method starts the receiver, which then begins receiving data
val supervisor = new ReceiverSupervisorImpl(receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}
// Create the RDD using the scheduledLocations to run the receiver in a Spark job
// Create the RDD that carries the receiver
val receiverRDD: RDD[Receiver[_]] =
if (scheduledLocations.isEmpty) {
// No scheduled locations: create a single-partition RDD without preferred locations
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
// Create receiverRDD with the scheduled executors as preferred locations
val preferredLocations = scheduledLocations.map(_.toString).distinct
ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
}
// Configure the job
receiverRDD.setName(s"Receiver $receiverId")
ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
// Submit receiverRDD to the cluster
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
case Success(_) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
// Restart the receiver
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
case Failure(e) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logError("Receiver has been stopped. Try to restart it.", e)
logInfo(s"Restarting Receiver $receiverId")
// Restart the receiver
self.send(RestartReceiver(receiver))
}
}(submitJobThreadPool)
logInfo(s"Receiver ${receiver.streamId} started")
}
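startReceiverFunc takes one record from the iterator, which is the receiver, instantiates a ReceiverSupervisorImpl with that receiver, and then calls the supervisor's start method. Note that nothing is started at this point: the function only defines the work, and the actual execution happens on the Executor.
The receiverRDD is then submitted to the cluster to run: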
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
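The "define now, run later" behaviour of startReceiverFunc, together with its first-attempt check, can be illustrated without Spark. This is a minimal sketch under stated assumptions: FakeReceiver, started and startFunc are hypothetical names, and the attempt number is passed in as a parameter here, whereas the real code reads it from TaskContext.

```scala
import scala.collection.mutable.ArrayBuffer

object DeferredStartSketch {
  // Hypothetical stand-in for a receiver; not a Spark class.
  final case class FakeReceiver(streamId: Int)

  // Records which receivers were actually started.
  val started = ArrayBuffer.empty[Int]

  // Building the function starts nothing; it only describes the work.
  // attemptNumber is an explicit parameter in this sketch (an assumption).
  def startFunc(attemptNumber: Int): Iterator[FakeReceiver] => Unit =
    (iterator: Iterator[FakeReceiver]) => {
      if (!iterator.hasNext) {
        throw new IllegalStateException("Could not start receiver as object not found.")
      }
      if (attemptNumber == 0) {
        // First attempt: take the single receiver and "start" it.
        val receiver = iterator.next()
        started += receiver.streamId
      }
      // On a retry, do nothing so the job ends and can be rescheduled.
    }
}
```

Calling startFunc(0) alone leaves started empty; only applying the returned function to an iterator has an effect, which mirrors the split between the Driver defining the function and the Executor running it.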
- Follow startReceiverFunc to see how ReceiverSupervisorImpl runs on the Executor.
Start with supervisor.start(). The start method is as follows:
def start() {
onStart()
startReceiver()
}
The onStart method is as follows:
/**
* Called when supervisor is started.
* Note that this must be called before the receiver.onStart() is called to ensure
* things like [[BlockGenerator]]s are started before the receiver starts sending data.
*/
protected def onStart() { }
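The key point is the comment on onStart: it must be called before receiver.onStart() so that the BlockGenerators are already running and the received data can be stored. The subclass implementation of onStart is as follows: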
override protected def onStart() {
registeredBlockGenerators.foreach { _.start() }
}
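The ordering guarantee comes from the base class fixing the sequence in start() while the subclass fills in the hooks. A minimal sketch of that pattern follows; Supervisor, SupervisorImpl and the events log are hypothetical simplifications, not the Spark classes.

```scala
import scala.collection.mutable.ArrayBuffer

object StartOrderSketch {
  // Shared log of calls, used only to observe the order of events.
  val events = ArrayBuffer.empty[String]

  // Hypothetical simplified supervisor: start() fixes the order of the two steps.
  abstract class Supervisor {
    protected def onStart(): Unit = ()
    protected def startReceiver(): Unit
    def start(): Unit = {
      onStart()       // generators first
      startReceiver() // receiver second
    }
  }

  class SupervisorImpl(generatorNames: Seq[String]) extends Supervisor {
    // Start every registered generator before any data can arrive.
    override protected def onStart(): Unit =
      generatorNames.foreach(name => events += s"generator:$name")

    override protected def startReceiver(): Unit = {
      events += "receiver"
      ()
    }
  }
}
```

Running new StartOrderSketch.SupervisorImpl(Seq("default")).start() records the generator entry before the receiver entry, whatever the subclass does inside its hooks.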
registeredBlockGenerators is created when ReceiverSupervisorImpl is instantiated:
private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
BlockGenerators are added to registeredBlockGenerators in the createBlockGenerator method:
override def createBlockGenerator(blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
// Cleanup BlockGenerators that have already been stopped
registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
// Create a BlockGenerator for this receiver (streamId maps one-to-one to a receiver)
val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
registeredBlockGenerators += newBlockGenerator
newBlockGenerator
}
So when is createBlockGenerator called? See this line:
private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)
Now that registeredBlockGenerators contains a BlockGenerator, look at BlockGenerator's start() method:
def start(): Unit = synchronized {
if (state == Initialized) {
state = Active
blockIntervalTimer.start()
blockPushingThread.start()
logInfo("Started BlockGenerator")
} else {
throw new SparkException(
s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
}
}
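This starts blockIntervalTimer and blockPushingThread. blockIntervalTimer is a timer that calls updateCurrentBuffer every 200 ms by default. The interval is set with spark.streaming.blockInterval, which is also a performance-tuning parameter: too short an interval produces too many small blocks, while too long an interval can make blocks too large, so the right value depends on the workload. updateCurrentBuffer wraps the received data into a block to be stored; its code is shown later. blockPushingThread repeatedly takes blocks from the blocksForPushing queue, stores them, and reports them to the ReceiverTrackerEndpoint; its code is also shown later.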
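To make the tuning trade-off concrete: each non-empty block interval yields one block, so a receiver produces at most roughly (batch interval / block interval) blocks per batch. The helper below is a hypothetical illustration of that arithmetic, not a Spark API, and it assumes data arrives in every interval.

```scala
object BlockIntervalSketch {
  // Approximate upper bound on blocks one receiver produces per batch,
  // assuming data arrives in every block interval (an assumption of this sketch).
  def blocksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long): Long = {
    require(blockIntervalMs > 0, "block interval must be positive")
    batchIntervalMs / blockIntervalMs
  }
}
```

For example, a 2000 ms batch with the default 200 ms block interval gives 10 blocks per batch, while a 50 ms block interval would give 40 smaller ones.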
- With the BlockGenerator started, move on to the startReceiver() call in supervisor.start(). The code of startReceiver() is as follows:
def startReceiver(): Unit = synchronized {
try {
if (onReceiverStart()) {
logInfo("Starting receiver")
receiverState = Started
receiver.onStart()
logInfo("Called receiver onStart")
} else {
// The driver refused us
stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
}
} catch {
case NonFatal(t) =>
stop("Error starting receiver " + streamId, Some(t))
}
}
It first checks the return value of onReceiverStart(). The subclass implementation of onReceiverStart() is as follows:
override protected def onReceiverStart(): Boolean = {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
trackerEndpoint.askWithRetry[Boolean](msg)
}
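onReceiverStart sends a RegisterReceiver message to trackerEndpoint and waits for the Boolean reply. On receiving the message, trackerEndpoint wraps the registration information in a ReceiverTrackingInfo case class and puts it into receiverTrackingInfos as a key-value pair keyed by streamId, which again shows that one InputDStream corresponds to one receiver.
Back in startReceiver, when the call returns true, receiverState is set to Started and the receiver's onStart method is called.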
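The registration step can be pictured as a map keyed by streamId plus a Boolean answer. The sketch below is a simplification under stated assumptions: TrackingInfo is a trimmed-down hypothetical record, and the only rejection rule modelled is "the tracker is stopping"; the real tracker's acceptance logic is not reproduced here.

```scala
import scala.collection.mutable

object RegistrationSketch {
  // Hypothetical, trimmed-down tracking record; not the real ReceiverTrackingInfo.
  final case class TrackingInfo(streamId: Int, name: String, host: String, executorId: String)

  private val trackingInfos = mutable.HashMap.empty[Int, TrackingInfo]
  private var stopping = false

  // Returns true when the registration is accepted, mirroring the Boolean
  // reply the supervisor waits for before starting the receiver.
  def register(info: TrackingInfo): Boolean = synchronized {
    if (stopping) {
      false
    } else {
      trackingInfos.put(info.streamId, info) // one entry per streamId
      true
    }
  }

  def stop(): Unit = synchronized { stopping = true }

  def lookup(streamId: Int): Option[TrackingInfo] = synchronized {
    trackingInfos.get(streamId)
  }
}
```

A false reply corresponds to the "The driver refused us" branch in startReceiver, where the supervisor stops instead of calling receiver.onStart().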
- Take SocketReceiver as an example. Its onStart method starts a daemon thread that calls receive() to receive data:
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}
Next, the receive() method:
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
if (!isStopped()) {
restart("Socket data stream had no more data")
} else {
logInfo("Stopped receiving")
}
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case NonFatal(e) =>
logWarning("Error receiving data", e)
restart("Error receiving data", e)
} finally {
if (socket != null) {
socket.close()
logInfo("Closed socket to " + host + ":" + port)
}
}
}
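The receive method is straightforward: it opens a socket, reads items from it (for example, lines of text), and calls store for each item it receives. The code of store is as follows: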
def store(dataItem: T) {
supervisor.pushSingle(dataItem)
}
store calls the supervisor's pushSingle method. The supervisor here is ReceiverSupervisorImpl, the implementation of ReceiverSupervisor, and its pushSingle is as follows:
def pushSingle(data: Any) {
defaultBlockGenerator.addData(data)
}
As mentioned above, defaultBlockGenerator is a member of ReceiverSupervisorImpl. Its addData method is as follows:
def addData(data: Any): Unit = {
if (state == Active) {
waitToPush()
synchronized {
if (state == Active) {
currentBuffer += data
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}
}
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}
}
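currentBuffer += data keeps appending data to currentBuffer. So how does the data in currentBuffer get stored? This is where blockIntervalTimer and blockPushingThread, introduced earlier, come in.
- First, the updateCurrentBuffer() method that blockIntervalTimer calls periodically: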
private def updateCurrentBuffer(time: Long): Unit = {
try {
var newBlock: Block = null
synchronized {
if (currentBuffer.nonEmpty) {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[Any]
val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
listener.onGenerateBlock(blockId)
newBlock = new Block(blockId, newBlockBuffer)
}
}
if (newBlock != null) {
blocksForPushing.put(newBlock) // put is blocking when queue is full
}
} catch {
case ie: InterruptedException =>
logInfo("Block updating timer thread was interrupted")
case e: Exception =>
reportError("Error in block updating thread", e)
}
}
The current currentBuffer is handed to newBlockBuffer, and a new empty ArrayBuffer is assigned to currentBuffer. A Block is then created from newBlockBuffer, and finally newBlock is put into the blocksForPushing queue.
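The buffer swap can be reproduced in a few lines of plain Scala. This is a minimal sketch, assuming a hypothetical MiniGenerator and a simplified Block keyed by a timestamp; rate limiting, state checks and the listener callback are left out.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

object BufferSwapSketch {
  // Simplified block: an id plus the records collected in one interval.
  final case class Block(id: Long, buffer: ArrayBuffer[Any])

  // Hypothetical miniature of the generator's buffering logic.
  class MiniGenerator(queueSize: Int) {
    private var currentBuffer = new ArrayBuffer[Any]
    val blocksForPushing = new ArrayBlockingQueue[Block](queueSize)

    // Called by the receiving thread for every record.
    def addData(data: Any): Unit = synchronized {
      currentBuffer += data
      ()
    }

    // Called by the timer: swap the filled buffer out under the lock,
    // then enqueue outside the lock because put() may block.
    def updateCurrentBuffer(time: Long): Unit = {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {
          val filled = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          newBlock = Block(time, filled)
        }
      }
      if (newBlock != null) {
        blocksForPushing.put(newBlock)
      }
    }
  }
}
```

Keeping the blocking put outside the synchronized section means a full queue slows down the timer thread but does not hold the lock that addData needs.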
- Next comes the work of blockPushingThread, which runs the keepPushingBlocks method:
private def keepPushingBlocks() {
logInfo("Started block pushing thread")
def areBlocksBeingGenerated: Boolean = synchronized {
state != StoppedGeneratingBlocks
}
try {
// While blocks are being generated, keep polling for to-be-pushed blocks and push them.
while (areBlocksBeingGenerated) {
Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
case Some(block) => pushBlock(block)
case None =>
}
}
// At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
while (!blocksForPushing.isEmpty) {
val block = blocksForPushing.take()
logDebug(s"Pushing block $block")
pushBlock(block)
logInfo("Blocks left to push " + blocksForPushing.size())
}
logInfo("Stopped block pushing thread")
} catch {
case ie: InterruptedException =>
logInfo("Block pushing thread was interrupted")
case e: Exception =>
reportError("Error in block pushing thread", e)
}
}
It polls the blocksForPushing queue with a 10 ms timeout and calls pushBlock on each block it gets:
Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
case Some(block) => pushBlock(block)
case None =>
}
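The two phases of keepPushingBlocks, polling with a timeout while blocks are still being generated and then draining what is left, can be sketched as follows. keepPushing, generating and push are hypothetical names, and blocks are represented as plain strings.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object PushLoopSketch {
  // Polls while `generating()` is true, then drains whatever is left in the queue.
  def keepPushing(
      queue: ArrayBlockingQueue[String],
      generating: () => Boolean,
      push: String => Unit): Unit = {
    while (generating()) {
      // poll returns null on timeout; Option turns that into None.
      Option(queue.poll(10, TimeUnit.MILLISECONDS)).foreach(push)
    }
    // Generation has stopped, so no new blocks arrive: push the remainder.
    while (!queue.isEmpty) {
      push(queue.take())
    }
  }
}
```

The timeout on poll matters because it lets the loop re-check the generating condition regularly instead of blocking forever on an empty queue.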
Next, the pushBlock(block) method, whose key line is:
listener.onPushBlock(block.id, block.buffer)
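This calls the listener's onPushBlock method. Where does listener come from? It is passed in when BlockGenerator is instantiated, and that instantiation happens in createBlockGenerator, which receives the listener as its parameter and passes it on to BlockGenerator. Tracing the call to createBlockGenerator finally leads to the instantiation of defaultBlockGeneratorListener: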
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
def onAddData(data: Any, metadata: Any): Unit = { }
def onGenerateBlock(blockId: StreamBlockId): Unit = { }
def onError(message: String, throwable: Throwable) {
reportError(message, throwable)
}
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
pushArrayBuffer(arrayBuffer, None, Some(blockId))
}
}
So this is where onPushBlock lives. It calls pushArrayBuffer, whose code is as follows:
def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
}
Now comes the key line: pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption). The code of pushAndReportBlock is as follows:
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
This method does two things. First, it calls receivedBlockHandler to store the block.
Second, it reports the block's storage result, blockInfo, to trackerEndpoint.
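The store-then-report sequence can be sketched with the storage and the tracker passed in as plain functions. Everything here is hypothetical scaffolding (StoreResult, BlockInfo, pushAndReport); the point is only that the data is stored locally first and that only metadata travels to the tracker.

```scala
object PushAndReportSketch {
  // Hypothetical simplified result and metadata types.
  final case class StoreResult(blockId: String, numRecords: Option[Long])
  final case class BlockInfo(streamId: Int, numRecords: Option[Long], result: StoreResult)

  def pushAndReport(
      streamId: Int,
      blockId: String,
      records: Seq[Any],
      store: (String, Seq[Any]) => StoreResult,
      report: BlockInfo => Boolean): Boolean = {
    // Step 1: persist the records through the storage function.
    val result = store(blockId, records)
    // Step 2: send only the metadata about the stored block to the tracker.
    val info = BlockInfo(streamId, result.numRecords, result)
    report(info)
  }
}
```

Because the report happens after the store call returns, the tracker never learns about a block that failed to be stored.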
- receivedBlockHandler is created when ReceiverSupervisorImpl is instantiated:
private val receivedBlockHandler: ReceivedBlockHandler = {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else {
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
}
}
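There are two kinds of handler: one based on the write-ahead log (WAL) and a plain one. The WAL variant is left for a later post; here we look at BlockManagerBasedBlockHandler: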
private[streaming] class BlockManagerBasedBlockHandler(
blockManager: BlockManager, storageLevel: StorageLevel)
extends ReceivedBlockHandler with Logging {
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
var numRecords = None: Option[Long]
val putResult: Seq[(BlockId, BlockStatus)] = block match {
case ArrayBufferBlock(arrayBuffer) =>
numRecords = Some(arrayBuffer.size.toLong)
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,tellMaster = true)
case IteratorBlock(iterator) =>
val countIterator = new CountingIterator(iterator)
val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,tellMaster = true)
numRecords = countIterator.count
putResult
case ByteBufferBlock(byteBuffer) =>
blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
case o =>
throw new SparkException(
s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
}
if (!putResult.map { _._1 }.contains(blockId)) {
throw new SparkException(s"Could not store $blockId to block manager with storage level $storageLevel")
}
BlockManagerBasedStoreResult(blockId, numRecords)
}
def cleanupOldBlocks(threshTime: Long) {
// this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
// of BlockRDDs.
}
}
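This stores the block through the BlockManager and returns the block's storage metadata. That completes the walk through the receiver's whole data receiving and storage path.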
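In the IteratorBlock case the record count is read only after the put, because an iterator's length is unknown until it has been consumed. A counting wrapper in that spirit might look like the sketch below; it is an illustration with hypothetical names (including finalCount), not the class from the Spark source.

```scala
object CountingIteratorSketch {
  // Wraps an iterator and counts the elements handed out so far.
  class CountingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
    private var seen = 0L

    override def hasNext: Boolean = underlying.hasNext

    override def next(): T = {
      val item = underlying.next()
      seen += 1
      item
    }

    // The total is only known once the underlying iterator is exhausted.
    def finalCount: Option[Long] = if (underlying.hasNext) None else Some(seen)
  }
}
```

Returning an Option makes the "not fully consumed yet" case explicit, so a caller cannot mistake a partial count for the block's record count.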
- The overall flow is quite clear. A flow chart would make it even better; I will add one later. Thanks.