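Many Spark components communicate through either RPC or an event/message mechanism. RPC handles communication between processes, while events are the more efficient option for communication within a single process.
The ListenerBus trait
Spark defines a trait, ListenerBus, that receives events and delivers each one to the registered event listeners.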
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]
// Marked `private[spark]` for access in tests.
private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
/**
* Returns a CodaHale metrics Timer for measuring the listener's event processing time.
* This method is intended to be overridden by subclasses.
*/
protected def getTimer(listener: L): Option[Timer] = None
/**
* Add a listener to listen events. This method is thread-safe and can be called in any thread.
*/
final def addListener(listener: L): Unit = {
listenersPlusTimers.add((listener, getTimer(listener)))
}
/**
* Remove a listener and it won't receive any events. This method is thread-safe and can be called
* in any thread.
*/
final def removeListener(listener: L): Unit = {
listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
listenersPlusTimers.remove(listenerAndTimer)
}
}
/**
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
* `postToAll` in the same thread for all events.
*/
def postToAll(event: E): Unit = {
// JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently. To avoid the wrapper cost, here we use
// Java Iterator directly.
val iter = listenersPlusTimers.iterator
while (iter.hasNext) {
val listenerAndMaybeTimer = iter.next()
val listener = listenerAndMaybeTimer._1
val maybeTimer = listenerAndMaybeTimer._2
val maybeTimerContext = if (maybeTimer.isDefined) {
maybeTimer.get.time()
} else {
null
}
try {
doPostEvent(listener, event)
} catch {
case NonFatal(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
} finally {
if (maybeTimerContext != null) {
maybeTimerContext.stop()
}
}
}
}
/**
* Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
* thread for all listeners.
*/
protected def doPostEvent(listener: L, event: E): Unit
private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
val c = implicitly[ClassTag[T]].runtimeClass
listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
}
}
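ListenerBus is a generic trait with type parameters [L <: AnyRef, E]: L is the listener type and E is the event type. Its job is to deliver events to the corresponding listeners.
- listenersPlusTimers: holds every registered listener together with its Timer. It is a thread-safe CopyOnWriteArrayList of (L, Option[Timer]) tuples. L is the listener reference, and the Timer is a Metrics (CodaHale) timer that measures how long the listener takes to process events.
- getTimer: returns the Timer for a listener. It is protected, defaults to None, and is meant to be overridden by subclasses.
- addListener: adds a listener to listenersPlusTimers. It is final and thread-safe.
- removeListener: removes a listener from listenersPlusTimers. The match is by reference (`eq`), and it is also thread-safe.
- postToAll: posts an event to every listener. It is not designed for concurrent callers, so the caller must call it from the same thread for all events. It iterates with a Java Iterator to avoid the wrapper cost of asScala. It times each call with the listener's Timer when one exists, and it catches every NonFatal throwable so that one failing listener does not stop the others.
- doPostEvent: posts an event to a single listener. Subclasses override it with the concrete logic.
- findListenersByClass: returns the listeners whose class is exactly T. implicitly[ClassTag[T]].runtimeClass recovers the runtime class of the type parameter T. The filter compares getClass == c, so subclasses of T are not matched. asInstanceOf casts each match to T.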
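The member list above can be made concrete with a minimal Java sketch of the same contract. It assumes plain listener objects and omits the metrics Timer. SimpleListenerBus, Listener, StringBus, and RecordingListener are hypothetical names for illustration, not Spark classes. Catching Exception is only a rough stand-in for Scala's NonFatal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical Java analogue of ListenerBus[L, E]: L is the listener type, E the event type.
abstract class SimpleListenerBus<L, E> {
    // Copy-on-write list: iteration works on a snapshot, so add/remove are safe from any thread.
    private final CopyOnWriteArrayList<L> listeners = new CopyOnWriteArrayList<>();

    final void addListener(L listener) {
        listeners.add(listener);
    }

    // Remove by reference identity, like `_._1 eq listener` in the Scala code.
    final void removeListener(L listener) {
        listeners.removeIf(l -> l == listener);
    }

    // Deliver the event to every listener; a failing listener does not stop the others.
    void postToAll(E event) {
        for (L listener : listeners) {
            try {
                doPostEvent(listener, event);
            } catch (Exception e) { // rough stand-in for Scala's NonFatal
                System.err.println("Listener " + listener.getClass().getName() + " threw " + e);
            }
        }
    }

    // Exact class match (getClass() == c), so instances of subclasses of c are not returned.
    <T extends L> List<T> findListenersByClass(Class<T> c) {
        List<T> result = new ArrayList<>();
        for (L l : listeners) {
            if (l.getClass() == c) {
                result.add(c.cast(l));
            }
        }
        return result;
    }

    protected abstract void doPostEvent(L listener, E event);
}

// Hypothetical listener and event types for illustration.
interface Listener {
    void onEvent(String event);
}

class StringBus extends SimpleListenerBus<Listener, String> {
    @Override
    protected void doPostEvent(Listener listener, String event) {
        listener.onEvent(event);
    }
}

class RecordingListener implements Listener {
    final List<String> seen = new ArrayList<>();

    @Override
    public void onEvent(String event) {
        seen.add(event);
    }
}
```

If one listener throws, the bus logs the exception and skips it, and the remaining listeners still receive the event.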
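ListenerBus class hierarchy
- SparkListenerBus: a trait. Implements doPostEvent() to deliver SparkListenerEvent events to SparkListenerInterface listeners. SparkListenerInterface and SparkListenerEvent are also traits.
  - AsyncEventQueue: provides start, stop, dispatch, and post for events, which are buffered in a LinkedBlockingQueue.
  - ReplayListenerBus: replays events from a serialized event stream.
- ExternalCatalog: an abstract class. Declares the operations on databases, tables, and functions, and calls postToAll to emit an event before and after each operation.
  - HiveExternalCatalog: the concrete implementation backed by the Hive metastore.
  - InMemoryCatalog: keeps database and table metadata in HashMaps and operates on them there.
- StreamingListenerBus: delivers events for the stages of a Spark Streaming application.
- StreamingQueryListenerBus: delivers streaming (Data Stream) query events.
SparkListenerBus in detail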
private[spark] trait SparkListenerBus
extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {
protected override def doPostEvent(
listener: SparkListenerInterface,
event: SparkListenerEvent): Unit = {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
listener.onStageSubmitted(stageSubmitted)
case stageCompleted: SparkListenerStageCompleted =>
listener.onStageCompleted(stageCompleted)
...
case _ => listener.onOtherEvent(event)
}
}
}
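SparkListenerBus extends ListenerBus and implements doPostEvent(). SparkListenerInterface is the trait that abstracts a listener, and SparkListenerEvent is the trait that abstracts an event. SparkListener is an abstract class that serves as a default adapter for SparkListenerInterface and provides empty implementations of its methods.
In other words, SparkListenerBus binds each concrete event type to the listener callback that handles it.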
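The pattern match in doPostEvent, combined with the SparkListener adapter, can be sketched in Java using instanceof checks and an abstract class of no-op methods. All type names here (ListenerEvent, StageSubmitted, StageCompleted, ListenerInterface, NoOpListener, TypedDispatch) are hypothetical stand-ins for the Spark traits and case classes.

```java
// Hypothetical event hierarchy standing in for SparkListenerEvent and its case classes.
interface ListenerEvent {}

final class StageSubmitted implements ListenerEvent {
    final int stageId;
    StageSubmitted(int stageId) { this.stageId = stageId; }
}

final class StageCompleted implements ListenerEvent {
    final int stageId;
    StageCompleted(int stageId) { this.stageId = stageId; }
}

// Stand-in for SparkListenerInterface: one callback per event type, plus a catch-all.
interface ListenerInterface {
    void onStageSubmitted(StageSubmitted e);
    void onStageCompleted(StageCompleted e);
    void onOtherEvent(ListenerEvent e);
}

// Stand-in for the SparkListener adapter: empty defaults, so users override only what they need.
abstract class NoOpListener implements ListenerInterface {
    public void onStageSubmitted(StageSubmitted e) {}
    public void onStageCompleted(StageCompleted e) {}
    public void onOtherEvent(ListenerEvent e) {}
}

final class TypedDispatch {
    // Equivalent of SparkListenerBus.doPostEvent: route the runtime event type to a callback.
    static void doPostEvent(ListenerInterface listener, ListenerEvent event) {
        if (event instanceof StageSubmitted) {
            listener.onStageSubmitted((StageSubmitted) event);
        } else if (event instanceof StageCompleted) {
            listener.onStageCompleted((StageCompleted) event);
        } else {
            listener.onOtherEvent(event); // the `case _` branch
        }
    }
}
```

A listener that overrides only onStageCompleted silently ignores StageSubmitted events, because the adapter's default method does nothing.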
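AsyncEventQueue in detail
AsyncEventQueue extends SparkListenerBus, and therefore ListenerBus. It holds the CopyOnWriteArrayList of listeners inherited from ListenerBus plus a LinkedBlockingQueue of events. The listener type is SparkListenerInterface and the event type is SparkListenerEvent.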
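The structure described above rests on three mechanisms: a bounded queue that drops events instead of blocking the producer, one daemon dispatch thread, and a poison pill for shutdown. MiniAsyncQueue below is a hypothetical, simplified sketch of those mechanisms. It uses java.util.function.Consumer as the listener type and leaves out the eventCount bookkeeping, metrics, and rate-limited logging of the real class.

```java
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical, simplified analogue of AsyncEventQueue; listeners are plain Consumers.
final class MiniAsyncQueue<E> {
    private static final Object POISON_PILL = new Object(); // sentinel: "no more events"

    private final LinkedBlockingQueue<Object> queue;         // bounded: a full queue drops, no OOM
    private final CopyOnWriteArrayList<Consumer<E>> listeners = new CopyOnWriteArrayList<>();
    private final AtomicLong dropped = new AtomicLong();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final Thread dispatchThread;

    MiniAsyncQueue(String name, int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
        dispatchThread = new Thread(this::dispatch, "listener-group-" + name);
        dispatchThread.setDaemon(true);
    }

    void addListener(Consumer<E> listener) { listeners.add(listener); }

    void start() { dispatchThread.start(); }

    // Producer side: offer() never blocks the caller; a full queue means the event is dropped.
    boolean post(E event) {
        if (stopped.get()) return false;
        if (queue.offer(event)) return true;
        dropped.incrementAndGet();
        return false;
    }

    @SuppressWarnings("unchecked")
    private void dispatch() {
        try {
            Object next = queue.take(); // blocks until something arrives
            while (next != POISON_PILL) {
                for (Consumer<E> l : listeners) {
                    try {
                        l.accept((E) next);
                    } catch (RuntimeException e) {
                        System.err.println("Listener threw " + e);
                    }
                }
                next = queue.take();
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    // The pill goes in behind all pending events, so join() returns only after they are drained.
    void stop() throws InterruptedException {
        if (stopped.compareAndSet(false, true)) {
            queue.put(POISON_PILL);
        }
        dispatchThread.join();
    }

    long droppedCount() { return dropped.get(); }
}
```

post() uses offer(), so a full queue costs the producer only a dropped event. stop() uses put() for the pill, so the pill itself is never dropped.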
private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
extends SparkListenerBus
with Logging {
import AsyncEventQueue._
// Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.
// Blocking queue that holds the events; capacity comes from LISTENER_BUS_EVENT_QUEUE_CAPACITY (10000 by default)
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
// Keep the event count separately, so that waitUntilEmpty() can be implemented properly;
// this allows that method to return only when the events in the queue have been fully
// processed (instead of just dequeued).
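// Number of events posted but not yet fully processed. Unlike eventQueue.size(), it is decremented only after postToAll returns, not when the event is dequeued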
private val eventCount = new AtomicLong()
/** A counter for dropped events. It will be reset every time we log it. */
// Number of dropped events; reset to 0 when logged: droppedEventsCounter.compareAndSet(droppedCount, 0)
private val droppedEventsCounter = new AtomicLong(0L)
/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
private val logDroppedEvent = new AtomicBoolean(false)
private var sc: SparkContext = null
private val started = new AtomicBoolean(false)
private val stopped = new AtomicBoolean(false)
private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")
// Remove the queue size gauge first, in case it was created by a previous incarnation of
// this queue that was removed from the listener bus.
metrics.metricRegistry.remove(s"queue.$name.size")
metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] {
override def getValue: Int = eventQueue.size()
})
// Create a daemon dispatchThread that dispatches the events
private val dispatchThread = new Thread(s"spark-listener-group-$name") {
setDaemon(true)
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
dispatch()
}
}
// Dispatch events: take an event, check whether it is POISON_PILL, otherwise deliver it to the listeners with postToAll
private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
try {
var next: SparkListenerEvent = eventQueue.take()
while (next != POISON_PILL) {
val ctx = processingTime.time()
try {
super.postToAll(next)
} finally {
ctx.stop()
}
eventCount.decrementAndGet()
// take() blocks until an event is available, so next is never null; the loop keeps running until POISON_PILL is taken
next = eventQueue.take()
}
eventCount.decrementAndGet()
} catch {
case ie: InterruptedException =>
logInfo(s"Stopping listener queue $name.", ie)
}
}
override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
}
/**
* Start an asynchronous thread to dispatch events to the underlying listeners.
*
* @param sc Used to stop the SparkContext in case the async dispatcher fails.
*/
// Start the asynchronous dispatch thread
private[scheduler] def start(sc: SparkContext): Unit = {
if (started.compareAndSet(false, true)) {
this.sc = sc
dispatchThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
}
}
/**
* Stop the listener bus. It will wait until the queued events have been processed, but new
* events will be dropped.
*/
// Stop dispatching: set stopped, put POISON_PILL into eventQueue, and wait for the thread to finish with dispatchThread.join(). POISON_PILL gives a graceful shutdown
private[scheduler] def stop(): Unit = {
if (!started.get()) {
throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
}
if (stopped.compareAndSet(false, true)) {
eventCount.incrementAndGet()
eventQueue.put(POISON_PILL)
}
dispatchThread.join()
}
// Receive (enqueue) an event
def post(event: SparkListenerEvent): Unit = {
// Return immediately once stopped
if (stopped.get()) {
return
}
eventCount.incrementAndGet()
// offer the event to eventQueue: return on success; on false, drop the event
if (eventQueue.offer(event)) {
return
}
eventCount.decrementAndGet()
droppedEvents.inc()
// The queue is full: drop the event and increment droppedEventsCounter
droppedEventsCounter.incrementAndGet()
if (logDroppedEvent.compareAndSet(false, true)) {
// Only log the following message once to avoid duplicated annoying logs.
logError(s"Dropping event from queue $name. " +
"This likely means one of the listeners is too slow and cannot keep up with " +
"the rate at which tasks are being started by the scheduler.")
}
logTrace(s"Dropping event $event")
val droppedCount = droppedEventsCounter.get
if (droppedCount > 0) {
// Don't log too frequently
// Log at most once per minute
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
// compareAndSet makes this thread-safe. Double-checked locking on lastReportTimestamp would also work, but CAS is lighter
if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
val previous = new java.util.Date(prevLastReportTimestamp)
logWarning(s"Dropped $droppedCount events from $name since $previous.")
}
}
}
}
}
// A "poison pill" event marking the end of the events in the queue
private object AsyncEventQueue {
val POISON_PILL = new SparkListenerEvent() { }
}
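The rate-limited drop logging at the end of post() relies on compareAndSet. Only the thread whose CAS from the observed count to 0 succeeds gets to log. If another thread increments the counter in between, the CAS fails and a later call reports the larger count. The hypothetical DropReporter below isolates that logic. As an assumption for testability, it takes the current time as a parameter instead of calling System.currentTimeMillis().

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of "report dropped events at most once per interval" using CAS.
final class DropReporter {
    private final AtomicLong droppedSinceReport = new AtomicLong();
    private volatile long lastReportMillis = 0L;
    private final long intervalMillis;

    DropReporter(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Called for each dropped event; returns the count reported, or 0 if this call did not report.
    long onDrop(long nowMillis) {
        droppedSinceReport.incrementAndGet();
        long count = droppedSinceReport.get();
        if (count > 0 && nowMillis - lastReportMillis >= intervalMillis) {
            // Only the thread that swaps `count` for 0 reports; a concurrent increment
            // changes the value, this CAS fails, and the newer count is reported later.
            if (droppedSinceReport.compareAndSet(count, 0)) {
                lastReportMillis = nowMillis;
                System.out.println("Dropped " + count + " events");
                return count;
            }
        }
        return 0;
    }
}
```

Drops that occur inside the interval only accumulate. The first drop after the interval reports the whole batch and resets the counter.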
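LiveListenerBus in detail
LiveListenerBus manages the AsyncEventQueues. It uses four of them, shared, appStatus, executorManagement, and eventLog, and keeps them in a CopyOnWriteArrayList; a queue is created the first time a listener is added to it. Because each queue has its own dispatch thread, a slow listener only delays its own queue, which keeps event handling responsive.
post() delivers each event to every queue, and each queue, identified by its name, holds a different set of listeners. On start(), LiveListenerBus iterates over the CopyOnWriteArrayList and calls AsyncEventQueue.start() on every queue, so several threads dispatch events concurrently.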
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
/** Add a listener to queue shared by all non-internal listeners. */
def addToSharedQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, SHARED_QUEUE)
}
/** Add a listener to the executor management queue. */
def addToManagementQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
}
/** Add a listener to the application status queue. */
def addToStatusQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, APP_STATUS_QUEUE)
}
/** Add a listener to the event log queue. */
def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, EVENT_LOG_QUEUE)
}
private[spark] def addToQueue(
listener: SparkListenerInterface,
queue: String): Unit = synchronized {
if (stopped.get()) {
throw new IllegalStateException("LiveListenerBus is stopped.")
}
queues.asScala.find(_.name == queue) match {
case Some(queue) =>
queue.addListener(listener)
case None =>
val newQueue = new AsyncEventQueue(queue, conf, metrics)
newQueue.addListener(listener)
if (started.get()) {
newQueue.start(sparkContext)
}
queues.add(newQueue)
}
}
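The fan-out over named queues can be sketched as follows. MiniLiveBus is hypothetical. Each named queue is backed by a single-thread ExecutorService rather than an AsyncEventQueue, so the sketch has no capacity limit, no poison pill, and no separate start() step.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of LiveListenerBus: several named queues, each drained by its own thread.
final class MiniLiveBus<E> {
    private static final class NamedQueue<T> {
        final String name;
        final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();
        final ExecutorService thread; // one daemon thread per queue keeps per-queue order

        NamedQueue(String name) {
            this.name = name;
            this.thread = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "listener-group-" + name);
                t.setDaemon(true);
                return t;
            });
        }
    }

    private final List<NamedQueue<E>> queues = new CopyOnWriteArrayList<>();

    // Like addToQueue: reuse the queue with this name, or create it on first use.
    synchronized void addToQueue(Consumer<E> listener, String queueName) {
        for (NamedQueue<E> q : queues) {
            if (q.name.equals(queueName)) {
                q.listeners.add(listener);
                return;
            }
        }
        NamedQueue<E> q = new NamedQueue<>(queueName);
        q.listeners.add(listener);
        queues.add(q);
    }

    // Fan out: every queue gets the event; a slow listener only delays its own queue.
    void post(E event) {
        for (NamedQueue<E> q : queues) {
            q.thread.execute(() -> q.listeners.forEach(l -> l.accept(event)));
        }
    }

    // shutdown() still runs already-submitted events before the threads exit.
    void stop() throws InterruptedException {
        for (NamedQueue<E> q : queues) {
            q.thread.shutdown();
            q.thread.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}
```

A single-thread executor preserves event order within each queue. This matches the one-dispatch-thread-per-queue model, while queues make progress independently of each other.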
Hadoop's AsyncDispatcher in detail
Hadoop's AsyncDispatcher does much the same job as Spark's AsyncEventQueue, although some implementation details differ.
public interface Event<TYPE extends Enum<TYPE>> {
TYPE getType();
long getTimestamp();
String toString();
}
public interface EventHandler<T extends Event> {
void handle(T event);
}
public interface Dispatcher {
// Configuration to make sure dispatcher crashes but doesn't do system-exit in
// case of errors. By default, it should be false, so that tests are not
// affected. For all daemons it should be explicitly set to true so that
// daemons can crash instead of hanging around.
public static final String DISPATCHER_EXIT_ON_ERROR_KEY =
"yarn.dispatcher.exit-on-error";
public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;
EventHandler getEventHandler();
void register(Class<? extends Enum> eventType, EventHandler handler);
}
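EventHandler.handle(Event) is the complete method signature. EventHandler is the abstract event-handler interface, handle() is the concrete handling method, and Event is the abstract event.
How can one EventHandler handle several kinds of Event? By checking instanceof or a type field, a single handle method can process many kinds of events. Hadoop introduces an Enum type for this purpose: TYPE extends Enum<TYPE>.
The Dispatcher interface binds an event type to an EventHandler. Here eventType is a Class, not an Enum constant. For example, JobEventHandler only needs to be bound once, to the JobEventType class. Binding individual Enum constants would require one registration per constant: JobEventType.JOB_INIT to JobEventHandler, JobEventType.JOB_START to JobEventHandler, and so on.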
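The Class-keyed registration and the getType().getDeclaringClass() lookup that AsyncDispatcher.dispatch() performs can be sketched without the queue and thread. SyncDispatcher, SimpleEvent, JobEventType, and TaskEventType are hypothetical. Event and EventHandler are trimmed copies of the interfaces above. Raw types are kept on purpose to mirror the Hadoop signatures.

```java
import java.util.HashMap;
import java.util.Map;

// Trimmed copies of the Event / EventHandler shapes above.
interface Event<T extends Enum<T>> { T getType(); }
interface EventHandler<T extends Event> { void handle(T event); }

// Hypothetical enum event types.
enum JobEventType { JOB_INIT, JOB_START }
enum TaskEventType { T_SCHEDULE }

final class SimpleEvent<T extends Enum<T>> implements Event<T> {
    private final T type;
    SimpleEvent(T type) { this.type = type; }
    public T getType() { return type; }
}

// Synchronous sketch of the routing inside AsyncDispatcher (no queue, no thread).
final class SyncDispatcher {
    // Keyed by the enum *class*: one registration covers every constant of that enum.
    private final Map<Class<? extends Enum>, EventHandler> handlers = new HashMap<>();

    void register(Class<? extends Enum> eventType, EventHandler handler) {
        handlers.put(eventType, handler);
    }

    @SuppressWarnings("unchecked")
    void dispatch(Event event) {
        // getDeclaringClass() rather than getClass(): it stays correct for constants with bodies.
        Class<? extends Enum> type = event.getType().getDeclaringClass();
        EventHandler handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + type);
        }
        handler.handle(event);
    }
}
```

One register(JobEventType.class, ...) call is enough for both JOB_INIT and JOB_START, while an event whose enum class was never registered fails loudly.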
public abstract class AbstractEvent<TYPE extends Enum<TYPE>>
implements Event<TYPE> {
private final TYPE type;
private final long timestamp;
// use this if you DON'T care about the timestamp
public AbstractEvent(TYPE type) {
this.type = type;
// We're not generating a real timestamp here. It's too expensive.
timestamp = -1L;
}
// use this if you care about the timestamp
public AbstractEvent(TYPE type, long timestamp) {
this.type = type;
this.timestamp = timestamp;
}
@Override
public long getTimestamp() {
return timestamp;
}
@Override
public TYPE getType() {
return type;
}
@Override
public String toString() {
return "EventType: " + getType();
}
}
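Every Event is instantiated as a concrete subclass, so the shared type and timestamp fields are pulled into the constructors of an abstract class, AbstractEvent. AbstractEvent could be declared as an ordinary class and instantiated, but an instance of it would have no meaning on its own, so it is made abstract.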
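With this design, a concrete event only passes its type up to AbstractEvent and adds its own payload. The JobEvent below is a hypothetical, simplified example, not Hadoop's actual class. AbstractEvent is trimmed to the members shown above and does not implement the Event interface, to keep the sketch self-contained.

```java
// Hypothetical enum of job event types.
enum JobEventType { JOB_INIT, JOB_KILL }

// Trimmed copy of AbstractEvent: shared type/timestamp handling lives here.
abstract class AbstractEvent<T extends Enum<T>> {
    private final T type;
    private final long timestamp;

    // -1 means "timestamp not recorded", as in the original constructor.
    protected AbstractEvent(T type) { this(type, -1L); }

    protected AbstractEvent(T type, long timestamp) {
        this.type = type;
        this.timestamp = timestamp;
    }

    public T getType() { return type; }
    public long getTimestamp() { return timestamp; }

    @Override
    public String toString() { return "EventType: " + getType(); }
}

// A concrete event only adds its payload; type and timestamp are inherited.
final class JobEvent extends AbstractEvent<JobEventType> {
    private final String jobId;

    JobEvent(String jobId, JobEventType type) {
        super(type);
        this.jobId = jobId;
    }

    String getJobId() { return jobId; }
}
```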
public class AsyncDispatcher extends AbstractService implements Dispatcher {
private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);
private final BlockingQueue<Event> eventQueue;
private volatile boolean stopped = false;
// Configuration flag for enabling/disabling draining dispatcher's events on
// stop functionality.
private volatile boolean drainEventsOnStop = false;
// Indicates all the remaining dispatcher's events on stop have been drained
// and processed.
private volatile boolean drained = true;
private Object waitForDrained = new Object();
// For drainEventsOnStop enabled only, block newly coming events into the
// queue while stopping.
private volatile boolean blockNewEvents = false;
private EventHandler handlerInstance = null;
private Thread eventHandlingThread;
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
private boolean exitOnDispatchException;
// Unbounded LinkedBlockingQueue (capacity Integer.MAX_VALUE)
public AsyncDispatcher() {
this(new LinkedBlockingQueue<Event>());
}
public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
super("Dispatcher");
this.eventQueue = eventQueue;
this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
}
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
// blockNewEvents is only set when dispatcher is draining to stop,
// adding this check is to avoid the overhead of acquiring the lock
// and calling notify every time in the normal run of the loop.
if (blockNewEvents) {
synchronized (waitForDrained) {
if (drained) {
waitForDrained.notify();
}
}
}
Event event;
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
dispatch(event);
}
}
}
};
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.exitOnDispatchException =
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
//start all the components
super.serviceStart();
eventHandlingThread = new Thread(createThread());
eventHandlingThread.setName("AsyncDispatcher event handler");
eventHandlingThread.start();
}
public void setDrainEventsOnStop() {
drainEventsOnStop = true;
}
@Override
protected void serviceStop() throws Exception {
if (drainEventsOnStop) {
blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
synchronized (waitForDrained) {
while (!drained && eventHandlingThread != null
&& eventHandlingThread.isAlive()) {
waitForDrained.wait(1000);
LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
eventHandlingThread.getState());
}
}
}
stopped = true;
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
try {
eventHandlingThread.join();
} catch (InterruptedException ie) {
LOG.warn("Interrupted Exception while stopping", ie);
}
}
// stop all the components
super.serviceStop();
}
@SuppressWarnings("unchecked")
protected void dispatch(Event event) {
//all events go thru this loop
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
}
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
Thread shutDownThread = new Thread(createShutDownThread());
shutDownThread.setName("AsyncDispatcher ShutDown handler");
shutDownThread.start();
}
}
}
@SuppressWarnings("unchecked")
@Override
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
/* check to see if we have a listener registered */
EventHandler<Event> registeredHandler = (EventHandler<Event>)
eventDispatchers.get(eventType);
LOG.info("Registering " + eventType + " for " + handler.getClass());
if (registeredHandler == null) {
eventDispatchers.put(eventType, handler);
} else if (!(registeredHandler instanceof MultiListenerHandler)){
/* for multiple listeners of an event add the multiple listener handler */
MultiListenerHandler multiHandler = new MultiListenerHandler();
multiHandler.addHandler(registeredHandler);
multiHandler.addHandler(handler);
eventDispatchers.put(eventType, multiHandler);
} else {
/* already a multilistener, just add to it */
MultiListenerHandler multiHandler
= (MultiListenerHandler) registeredHandler;
multiHandler.addHandler(handler);
}
}
@Override
public EventHandler getEventHandler() {
if (handlerInstance == null) {
handlerInstance = new GenericEventHandler();
}
return handlerInstance;
}
class GenericEventHandler implements EventHandler<Event> {
public void handle(Event event) {
if (blockNewEvents) {
return;
}
drained = false;
/* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size();
if (qSize !=0 && qSize %1000 == 0) {
LOG.info("Size of event-queue is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue: "
+ remCapacity);
}
try {
eventQueue.put(event);
} catch (InterruptedException e) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e);
}
// Need to reset drained flag to true if event queue is empty,
// otherwise dispatcher will hang on stop.
drained = eventQueue.isEmpty();
throw new YarnRuntimeException(e);
}
};
}
/**
* Multiplexing an event. Sending it to different handlers that
* are interested in the event.
* @param <T> the type of event these multiple handlers are interested in.
*/
// One event type handled by multiple EventHandlers
static class MultiListenerHandler implements EventHandler<Event> {
List<EventHandler<Event>> listofHandlers;
public MultiListenerHandler() {
listofHandlers = new ArrayList<EventHandler<Event>>();
}
@Override
public void handle(Event event) {
for (EventHandler<Event> handler: listofHandlers) {
handler.handle(event);
}
}
void addHandler(EventHandler<Event> handler) {
listofHandlers.add(handler);
}
}
Runnable createShutDownThread() {
return new Runnable() {
@Override
public void run() {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
};
}
@VisibleForTesting
protected boolean isEventThreadWaiting() {
return eventHandlingThread.getState() == Thread.State.WAITING;
}
@VisibleForTesting
protected boolean isDrained() {
return this.drained;
}
}
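The escalation in register() works in three steps. The first handler is stored directly. The second causes both handlers to be wrapped in a MultiListenerHandler. Later handlers are appended to that wrapper. The sketch below isolates this logic. RegistrySketch and MultiHandler are hypothetical, and as a simplification the event type is a String key and handlers are Consumer<String>, instead of an enum Class and EventHandler.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical simplification of AsyncDispatcher.register() and MultiListenerHandler.
final class RegistrySketch {
    // Fans one event out to several handlers, like MultiListenerHandler.
    static final class MultiHandler implements Consumer<String> {
        final List<Consumer<String>> handlers = new ArrayList<>();

        @Override
        public void accept(String event) {
            for (Consumer<String> h : handlers) {
                h.accept(event);
            }
        }
    }

    private final Map<String, Consumer<String>> dispatchers = new HashMap<>();

    void register(String eventType, Consumer<String> handler) {
        Consumer<String> existing = dispatchers.get(eventType);
        if (existing == null) {
            dispatchers.put(eventType, handler);             // first handler: store directly
        } else if (!(existing instanceof MultiHandler)) {
            MultiHandler multi = new MultiHandler();         // second handler: wrap both
            multi.handlers.add(existing);
            multi.handlers.add(handler);
            dispatchers.put(eventType, multi);
        } else {
            ((MultiHandler) existing).handlers.add(handler); // third and later: append
        }
    }

    void dispatch(String eventType, String event) {
        Consumer<String> h = dispatchers.get(eventType);
        if (h != null) {
            h.accept(event);
        }
    }

    boolean isMulti(String eventType) {
        return dispatchers.get(eventType) instanceof MultiHandler;
    }
}
```

The common case of a single handler therefore pays no wrapper cost, and handlers run in registration order.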
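AsyncDispatcher vs. AsyncEventQueue
Similarities and differences
Similarities:
- Both keep events in a LinkedBlockingQueue.
- Both have start and stop methods that control their lifecycle.
- Both start a separate thread that loops over the queue and dispatches events.
- Both provide a method for producing events: AsyncDispatcher uses the inner class GenericEventHandler.handle, and AsyncEventQueue uses post.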
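Differences:
- AsyncDispatcher provides register() to bind an event type to an EventHandler, and uses the EventType at dispatch time to pick which handler runs. AsyncEventQueue relies on doPostEvent (SparkListenerBus.doPostEvent) to call a different listener method for each event type.
- AsyncEventQueue's queue is bounded (LISTENER_BUS_EVENT_QUEUE_CAPACITY), and post() drops events when it is full. AsyncDispatcher's default queue is unbounded, and GenericEventHandler.handle() uses put(), which blocks only if a bounded queue was passed to the constructor.
When AsyncDispatcher dispatches an event, it looks up the registered EventHandler in a HashMap by the event's type and calls its handle method. When AsyncEventQueue dispatches an event, it can only iterate over all listeners and call doPostEvent for each one, which pattern-matches the event and invokes the matching listener callback.
Hadoop's abstractions are Event for events, EventHandler for handlers, and handle() as the method. Spark goes one step further with the generic ListenerBus[L, E]: SparkListenerInterface, StreamingListener, and ExternalCatalogEventListener are the traits used for L, and SparkListenerEvent, StreamingListenerEvent, and ExternalCatalogEvent are the traits used for E.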
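The cost difference between the two dispatch styles shows up in how many handler calls a single event triggers. The hypothetical DispatchStyles class below counts them. The routed (Hadoop-style) dispatcher calls only the handler registered for the event's type. The broadcast (Spark-style) bus calls every listener, and each listener would then pattern-match and possibly ignore the event. Keying by getClass() is an assumption made for brevity; AsyncDispatcher keys by the enum class of getType().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical comparison of the two dispatch styles; `calls` counts handler/listener invocations.
final class DispatchStyles {
    // Hadoop style: one map lookup, then only the handler registered for that type runs.
    static final class Routed {
        final Map<Class<?>, Consumer<Object>> byType = new HashMap<>();
        int calls = 0;

        void dispatch(Object event) {
            Consumer<Object> handler = byType.get(event.getClass());
            if (handler != null) {
                calls++;
                handler.accept(event);
            }
        }
    }

    // Spark style: every listener is called and decides for itself what to do with the event.
    static final class Broadcast {
        final List<Consumer<Object>> listeners = new ArrayList<>();
        int calls = 0;

        void postToAll(Object event) {
            for (Consumer<Object> listener : listeners) {
                calls++;
                listener.accept(event);
            }
        }
    }
}
```

With N listeners, the broadcast style makes N calls per event. In Spark, many of those calls reach SparkListener's empty default methods and return immediately.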
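Usage scenarios
AsyncDispatcher:
ResourceManager, NodeManager, MRAppMaster, and similar classes call createDispatcher() in serviceInit() to create an AsyncDispatcher. Each of them handles a whole set of Enum event classes rather than a single one, for example: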
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
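AsyncDispatcher is not used as a global event dispatcher. Instead, each major component has its own AsyncDispatcher, which keeps event handling responsive.
AsyncEventQueue:
LiveListenerBus manages all AsyncEventQueues in one place, which is architecturally cleaner than Hadoop's approach of every component running its own dispatcher. SparkContext creates the LiveListenerBus during initialization and passes the reference to JobScheduler, DAGScheduler, and other components, so LiveListenerBus must be thread-safe.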