前言
上一篇文章介绍了RingBuffer的基本信息,本文将对Disruptor的消费者进行进一步的解析,并对其中可能存在的坑点进行分析;
消费者继承体系
从接口体系上来看,消费者主要分为Work和Event两种类型,这两种类型的差别如下:
- 同一层次的WorkProcessor只有一个可以处理成功RingBuffer的事件,类似消息体系中的点对点模式;
- 同一层次的BatchEventProcessor并行处理(每一个消费者)成功RingBuffer事件,类似消息体系中的Topic模式
这两种实现上差别到底在哪里呢,我们进一步进行分析。从接口上,我们可以看到它们都继承了Runnable,所以联想到Disruptor在使用时生成的Executor,我们可以猜测不同的消费者都是在不同的线程中进行处理的,我们直接对其进行分析。
WorkProcessor
主要属性:
// Processor是否已经开始运行
private final AtomicBoolean running = new AtomicBoolean(false);
// 序列,初始时为-1
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 持有的RingBuffer引用
private final RingBuffer<T> ringBuffer;
// Processor上游的序列屏障
private final SequenceBarrier sequenceBarrier;
// 用户自定义的业务逻辑处理器
private final WorkHandler<? super T> workHandler;
// 异常处理器
private final ExceptionHandler<? super T> exceptionHandler;
// 消费的sequence
private final Sequence workSequence;
private final EventReleaser eventReleaser = new EventReleaser() {
@Override
public void release() {
sequence.set(Long.MAX_VALUE);
}
};
// 超时处理器
private final TimeoutHandler timeoutHandler;
上述关键属性分析下来,主要有sequence和workSequence两个属性有一定的迷惑性,探究下它们是如何来的:
// 构造方法
public WorkProcessor(
final RingBuffer<T> ringBuffer,
final SequenceBarrier sequenceBarrier,
final WorkHandler<? super T> workHandler,
final ExceptionHandler<? super T> exceptionHandler,
final Sequence workSequence) {
...
// 上游传递进来的
this.workSequence = workSequence;
...
}
继续往上翻:
public WorkerPool(
final RingBuffer<T> ringBuffer,
final SequenceBarrier sequenceBarrier,
final ExceptionHandler<? super T> exceptionHandler,
final WorkHandler<? super T>... workHandlers) {
this.ringBuffer = ringBuffer;
final int numWorkers = workHandlers.length;
workProcessors = new WorkProcessor[numWorkers];
// 循环构造processor,共享workSequence
for (int i = 0; i < numWorkers; i++) {
workProcessors[i] = new WorkProcessor<>(
ringBuffer,
sequenceBarrier,
workHandlers[i],
exceptionHandler,
workSequence);
}
}
可以发现,同一WorkPool的processor共享同一个sequence,因此其实所谓的只有一个能够消费成功本质上依靠的就是同一个sequence(volatile语义)。
在Disruptor的使用中,我们一般需要手动调用下Disruptor#start()方法来启动整个框架:
public RingBuffer<T> start(final Executor executor) {
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
}
// 生产者cursor,初始时为-1
final long cursor = ringBuffer.getCursor();
workSequence.set(cursor);
for (WorkProcessor<?> processor : workProcessors) {
processor.getSequence().set(cursor);
executor.execute(processor);
}
return ringBuffer;
}
所以由此判断,框架启动时workSequence和sequence的值都与生产者保持一致即-1,此时我们回到WorkProcessor的run()方法。
public void run() {
// 判断是否重复启动
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("Thread is already running");
}
// 清除警告状态
sequenceBarrier.clearAlert();
// 如果有实现LifecycleAware接口,回调其onStart()逻辑
notifyStart();
// 上一个sequence的slot是否处理成功,刚开始为-1,所以默认处理成功
boolean processedSequence = true;
// 缓存的可用sequence的下标
long cachedAvailableSequence = Long.MIN_VALUE;
// 下一个待处理的sequence
long nextSequence = sequence.get();
T event = null;
while (true) {
try {
// 如果上一个处理成功,则更新workSequence的值
if (processedSequence) {
processedSequence = false;
do {
// 从当前workSequence的后移1位,为新的需要处理的序列
nextSequence = workSequence.get() + 1L;
// sequence存储该processor已经处理成功的序列最大值,初始时为-1;
sequence.set(nextSequence - 1L);
}
// workSequence所有processor线程共享同一变量
// 假设此时发生并发问题,由于其它线程已经处理成功,那么此处更新失败,则下次nextSequence可能获取到跳跃的值
while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
}
// 逻辑走到这里,可以确定一个事情:sequence的值=workSequence的值-1,所以这里workSequence
// 本质上代表了该Processor成功抢占成功可以处理的sequence数据
// cachedAvailableSequence为等待之后下一个可用的序列
if (cachedAvailableSequence >= nextSequence) {
// 获取workSequence对应的slot数据
event = ringBuffer.get(nextSequence);
// 调用用户逻辑进行处理
workHandler.onEvent(event);
// 处理成功,更新标记为true
processedSequence = true;
} else {
// 说明当前processor抢占到的sequence可用数据超前,需要判断该sequence数据是否可用
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
} catch (final TimeoutException e) {
// 调用对应的TimeoutHandler进行处理
notifyTimeout(sequence.get());
} catch (final AlertException ex) {
// 当发生警告通知时,如果该线程状态已经被暂停,则直接中断
if (!running.get()) {
break;
}
} catch (final Throwable ex) {
// 处理异常,将该sequence标识为处理成功 !important
exceptionHandler.handleEventException(ex, nextSequence, event);
processedSequence = true;
}
}
notifyShutdown();
running.set(false);
}
从run()的代码逻辑可以看出,不同的work消费者都是通过CAS来抢占需要处理的slot数据,每个processor维护了自身已经处理成功的sequence以及大家公共持有的workSequence,同时该processor是否可以处理sequence是由barrier来维护的。有一点需要注意,在exceptionHandler.handleEventException(ex, nextSequence, event);中,默认的异常处理器为FatalExceptionHandler,其打印日志结束后会抛出RuntimeException,从而导致消费者线程中断,所以在实际使用中一定要实现业务自己的ExceptionHandler或者在WorkHandler中自己处理异常;
那么这里每个processor自身的sequence有什么作用?
思考下如下场景,假设现在有10个消费者,现在有10个slot需要处理,那么极端情况下可能workSequence被更新到了10,但是可能最小的sequence此时为0,说明第一个申请sequence成功的线程还未处理完毕,那么这整批消费者(一个WorkPool)最慢的下标其实就是0,一旦其处理成功,则sequence就有可能被更新为10;所以这里sequence的集合其实就是用来标识整个消费者中最慢的进度;
关于waitFor
上面提到了消费者能否处理sequence是由SequenceBarrier#waitFor来决定的,下面探究下该方法的实现机制。
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException {
checkAlert();
// 转移职责,交由waitStrategy处理
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence) {
return availableSequence;
}
// 该方法在上一期已经分析过,此处不再分析
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
...
// 以BlockingWaitStrategy为例
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException {
long availableSequence;
// 要消费的sequence超过了生产者的sequence,则消费者等待;
if (cursorSequence.get() < sequence) {
synchronized (mutex) {
while (cursorSequence.get() < sequence) {
barrier.checkAlert();
mutex.wait();
}
}
}
// 该processor要消费的sequence超过了上游processor的最小值,自旋等待
while ((availableSequence = dependentSequence.get()) < sequence) {
barrier.checkAlert();
// 尝试寻找实现了onSpinWait的静态方法Thread进行调用
ThreadHints.onSpinWait();
}
// 返回上游最小sequence
return availableSequence;
}
BatchEventProcessor
主要属性
// 空闲状态
private static final int IDLE = 0;
// 暂停状态
private static final int HALTED = IDLE + 1;
// 运行状态
private static final int RUNNING = HALTED + 1;
// 实际运行状态
private final AtomicInteger running = new AtomicInteger(IDLE);
// 异常处理器
private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
// RingBuffer
private final DataProvider<T> dataProvider;
// 屏障
private final SequenceBarrier sequenceBarrier;
// 用户业务逻辑处理器
private final EventHandler<? super T> eventHandler;
// 消费者sequence
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 超时处理器
private final TimeoutHandler timeoutHandler;
private final BatchStartAware batchStartAware;
主要属性与WorkProcessor基本类似,查看其run()方法逻辑:
private void processEvents() {
T event = null;
// 从当前sequence后移一个进行消费
long nextSequence = sequence.get() + 1L;
while (true) {
try {
// 获取可用的sequence长度
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null && availableSequence >= nextSequence) {
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
// 如果当前处理的sequence落后,就循环挨个处理
while (nextSequence <= availableSequence) {
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// 处理完毕后进行更新
sequence.set(availableSequence);
} catch (final TimeoutException e) {
notifyTimeout(sequence.get());
} catch (final AlertException ex) {
if (running.get() != RUNNING) {
break;
}
} catch (final Throwable ex) {
exceptionHandler.handleEventException(ex, nextSequence, event);
// 业务逻辑出现异常时,若无新的异常抛出,则更新sequence
sequence.set(nextSequence);
nextSequence++;
}
}
}
从上面的代码可以看出,假设Disruptor调用halt之后,则该批次数据仍会处理完毕,在新的一轮waitFor判断中抛出AlertException异常;
关于消费者线程池
此处把线程池单独拿出来看是有一定原因的,现在Disruptor把显示传入线程池的构造方法置为了@Deprecated,那么我们在使用时应该注意什么呢?
从前文我们已经大致分析过生产者和消费者是如何协同的,回顾下结论:
- sequencer会显示维护消费最慢slot的下标;
- 生产者在发布事件时需要先调用next()进行显示申请或者占用;
- 消费者线程在消费时会主动调用barrier#waitFor进行判断;
- 每个消费者占用一个线程;
这里有一个死锁问题,假设说消费者个数为N,线程个数为M,其中N > M,从前面的代码我们已经分析过,消费者线程的逻辑都是死循环,那么很有可能出现饥饿消费者,即无法被线程池调用,那么从生产者端来看,最慢slot一直未改变,从而导致生产者等待,而生产者等待又会促使消费者waitFor方法无法通过,从而出现互相等待死锁问题;那么Disruptor是如何进行解决的呢,代码如下:
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory) {
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
...
// BaseExecutor
private final ThreadFactory factory;
private final Queue<Thread> threads = new ConcurrentLinkedQueue<>();
public BasicExecutor(ThreadFactory factory) {
this.factory = factory;
}
@Override
public void execute(Runnable command) {
final Thread thread = factory.newThread(command);
if (null == thread) {
throw new RuntimeException("Failed to create thread to run: " + command);
}
thread.start();
threads.add(thread);
}
代码非常简单,即创建足够消费者使用的线程数量进行消费;
本文先写到这里,下一篇文章对消费者的等待策略进行具体分析。