Disruptor深度解析-消费者Consumer

前言

上一篇文章介绍了RingBuffer的基本信息,本文将对Disruptor的消费者进行进一步的解析,并对其中可能存在的坑点进行分析;

消费者继承体系


从接口体系上来看,消费者主要分为Work和Event两种类型,这两种类型的差别如下:

  • 同一层次的WorkProcessor只有一个可以处理成功RingBuffer的事件,类似消息体系中的点对点模式;
  • 同一层次的BatchEventProcessor并行处理(每一个消费者)成功RingBuffer事件,类似消息体系中的Topic模式

这两种实现上差别到底在哪里呢,我们进一步进行分析。从接口上,我们可以看到它们都继承了Runnable,所以联想到Disruptor在使用时生成的Executor,我们可以猜测不同的消费者都是在不同的线程中进行处理的,我们直接对其进行分析。

WorkProcessor

主要属性:

    // Processor是否已经开始运行
    private final AtomicBoolean running = new AtomicBoolean(false);
    // 序列,初始时为-1
    private final Sequence        sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    // 持有的RingBuffer引用
    private final RingBuffer<T>   ringBuffer;
    // Processor上游的序列屏障
    private final SequenceBarrier sequenceBarrier;
    // 用户自定义的业务逻辑处理器
    private final WorkHandler<? super T>      workHandler;
    // 异常处理器
    private final ExceptionHandler<? super T> exceptionHandler;
    // 消费的sequence
    private final Sequence                    workSequence;
    
    private final EventReleaser eventReleaser = new EventReleaser() {
        @Override
        public void release() {
            sequence.set(Long.MAX_VALUE);
        }
    };
    // 超时处理器
    private final TimeoutHandler timeoutHandler;

上述关键属性分析下来,主要有sequence和workSequence两个属性有一定的迷惑性,探究下它们是如何来的:

// 构造方法
public WorkProcessor(
            final RingBuffer<T> ringBuffer,
            final SequenceBarrier sequenceBarrier,
            final WorkHandler<? super T> workHandler,
            final ExceptionHandler<? super T> exceptionHandler,
            final Sequence workSequence) {
        ...
        // 上游传递进来的
        this.workSequence = workSequence;
        ...
    }

继续往上翻:

public WorkerPool(
            final RingBuffer<T> ringBuffer,
            final SequenceBarrier sequenceBarrier,
            final ExceptionHandler<? super T> exceptionHandler,
            final WorkHandler<? super T>... workHandlers) {
        this.ringBuffer = ringBuffer;
        final int numWorkers = workHandlers.length;
        workProcessors = new WorkProcessor[numWorkers];
        // 循环构造processor,共享workSequence
        for (int i = 0; i < numWorkers; i++) {
            workProcessors[i] = new WorkProcessor<>(
                    ringBuffer,
                    sequenceBarrier,
                    workHandlers[i],
                    exceptionHandler,
                    workSequence);
        }
    }

可以发现,同一WorkPool的processor共享同一个sequence,因此其实所谓的只有一个能够消费成功本质上依靠的就是同一个sequence(volatile语义)。
在Disruptor的使用中,我们一般需要手动调用下Disruptor#start()方法来启动整个框架:

  public RingBuffer<T> start(final Executor executor) {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
        }
        // 生产者cursor,初始时为-1
        final long cursor = ringBuffer.getCursor();
        workSequence.set(cursor);

        for (WorkProcessor<?> processor : workProcessors) {
            processor.getSequence().set(cursor);
            executor.execute(processor);
        }

        return ringBuffer;
    }

所以由此判断,框架启动时workSequence和sequence的值都与生产者保持一致即-1,此时我们回到WorkProcessor的run()方法。

public void run() {
        // 判断是否重复启动
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Thread is already running");
        }
        // 清除警告状态
        sequenceBarrier.clearAlert();
        // 如果有实现LifecycleAware接口,回调其onStart()逻辑
        notifyStart();  

        // 上一个sequence的slot是否处理成功,刚开始为-1,所以默认处理成功
        boolean processedSequence = true;
        // 缓存的可用sequence的下标
        long cachedAvailableSequence = Long.MIN_VALUE;
        // 下一个待处理的sequence
        long nextSequence = sequence.get();
        T event = null;
        while (true) {
            try {
                // 如果上一个处理成功,则更新workSequence的值
                if (processedSequence) {
                    processedSequence = false;
                    do {
                        // 从当前workSequence的后移1位,为新的需要处理的序列
                        nextSequence = workSequence.get() + 1L;
                        // sequence存储该processor已经处理成功的序列最大值,初始时为-1;
                        sequence.set(nextSequence - 1L);
                    }
                    // workSequence所有processor线程共享同一变量
                    // 假设此时发生并发问题,由于其它线程已经处理成功,那么此处更新失败,则下次nextSequence可能获取到跳跃的值
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
                // 逻辑走到这里,可以确定一个事情:sequence的值=workSequence的值-1,所以这里workSequence
                // 本质上代表了该Processor成功抢占成功可以处理的sequence数据
                // cachedAvailableSequence为等待之后下一个可用的序列
                if (cachedAvailableSequence >= nextSequence) {
                    // 获取workSequence对应的slot数据
                    event = ringBuffer.get(nextSequence);
                    // 调用用户逻辑进行处理
                    workHandler.onEvent(event);
                    // 处理成功,更新标记为true
                    processedSequence = true;
                } else {
                    // 说明当前processor抢占到的sequence可用数据超前,需要判断该sequence数据是否可用
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            } catch (final TimeoutException e) {
                // 调用对应的TimeoutHandler进行处理
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                // 当发生警告通知时,如果该线程状态已经被暂停,则直接中断
                if (!running.get()) {
                    break;
                }
            } catch (final Throwable ex) {
                // 处理异常,将该sequence标识为处理成功  !important
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }

        notifyShutdown();

        running.set(false);
    }

从run()的代码逻辑可以看出,不同的work消费者都是通过CAS来抢占需要处理的slot数据,每个processor维护了自身已经处理成功的sequence以及大家公共持有的workSequence,同时该processor是否可以处理sequence是由barrier来维护的。有一点需要注意,在exceptionHandler.handleEventException(ex, nextSequence, event);中,默认的异常处理器为FatalExceptionHandler,其打印日志结束后会抛出RuntimeException,从而导致消费者线程中断,所以在实际使用中一定要实现业务自己的ExceptionHandler或者在WorkHandler中自己处理异常

那么这里每个processor自身的sequence有什么作用?
思考下如下场景,假设现在有10个消费者,现在有10个slot需要处理,那么极端情况下可能workSequence被更新到了10,但是可能最小的sequence此时为0,说明第一个申请sequence成功的线程还未处理完毕,那么这整批消费者(一个WorkPool)最慢的下标其实就是0,一旦其处理成功,则sequence就有可能被更新为10;所以这里sequence的集合其实就是用来标识整个消费者中最慢的进度;

关于waitFor

上面提到了消费者能否处理sequence是由SequenceBarrier#waitFor来决定的,下面探究下该方法的实现机制。

public long waitFor(final long sequence)
            throws AlertException, InterruptedException, TimeoutException {
        checkAlert();
        // 转移职责,交由waitStrategy处理
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence) {
            return availableSequence;
        }
        // 该方法在上一期已经分析过,此处不再分析
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
...
// 以BlockingWaitStrategy为例
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
            throws AlertException, InterruptedException {
        long availableSequence;
        // 要消费的sequence超过了生产者的sequence,则消费者等待;
        if (cursorSequence.get() < sequence) {
            synchronized (mutex) {
                while (cursorSequence.get() < sequence) {
                    barrier.checkAlert();
                    mutex.wait();
                }
            }
        }
        // 该processor要消费的sequence超过了上游processor的最小值,自旋等待
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
            // 尝试寻找实现了onSpinWait的静态方法Thread进行调用
            ThreadHints.onSpinWait();
        }
        // 返回上游最小sequence
        return availableSequence;
}

BatchEventProcessor

主要属性

    // 空闲状态
    private static final int IDLE    = 0;
    // 暂停状态
    private static final int HALTED  = IDLE + 1;
    // 运行状态
    private static final int RUNNING = HALTED + 1;
    // 实际运行状态
    private final AtomicInteger               running          = new AtomicInteger(IDLE);
    // 异常处理器
    private       ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    // RingBuffer
    private final DataProvider<T>             dataProvider;
    // 屏障
    private final SequenceBarrier             sequenceBarrier;
    // 用户业务逻辑处理器
    private final EventHandler<? super T>     eventHandler;
    // 消费者sequence
    private final Sequence                    sequence         = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    // 超时处理器
    private final TimeoutHandler              timeoutHandler;
    private final BatchStartAware             batchStartAware;

主要属性与WorkProcessor基本类似,查看其run()方法逻辑:

private void processEvents() {
        T event = null;
        // 从当前sequence后移一个进行消费
        long nextSequence = sequence.get() + 1L;

        while (true) {
            try {
                // 获取可用的sequence长度
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null && availableSequence >= nextSequence) {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }
                // 如果当前处理的sequence落后,就循环挨个处理
                while (nextSequence <= availableSequence) {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                // 处理完毕后进行更新
                sequence.set(availableSequence);
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (running.get() != RUNNING) {
                    break;
                }
            } catch (final Throwable ex) {
                exceptionHandler.handleEventException(ex, nextSequence, event);  
                // 业务逻辑出现异常时,若无新的异常抛出,则更新sequence
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
}

从上面的代码可以看出,假设Disruptor调用halt之后,则该批次数据仍会处理完毕,在新的一轮waitFor判断中抛出AlertException异常;

关于消费者线程池

此处把线程池单独拿出来看是有一定原因的,现在Disruptor把显示传入线程池的构造方法置为了@Deprecated,那么我们在使用时应该注意什么呢?
从前文我们已经大致分析过生产者和消费者是如何协同的,回顾下结论:

  • sequencer会显示维护消费最慢slot的下标;
  • 生产者在发布事件时需要先调用next()进行显示申请或者占用;
  • 消费者线程在消费时会主动调用barrier#waitFor进行判断;
  • 每个消费者占用一个线程;

这里有一个死锁问题,假设说消费者个数为N,线程个数为M,其中N > M,从前面的代码我们已经分析过,消费者线程的逻辑都是死循环,那么很有可能出现饥饿消费者,即无法被线程池调用,那么从生产者端来看,最慢slot一直未改变,从而导致生产者等待,而生产者等待又会促使消费者waitFor方法无法通过,从而出现互相等待死锁问题;那么Disruptor是如何进行解决的呢,代码如下:

public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory) {
        this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
...
// BaseExecutor
private final ThreadFactory factory;
private final Queue<Thread> threads = new ConcurrentLinkedQueue<>();

public BasicExecutor(ThreadFactory factory) {
  this.factory = factory;
}

@Override
public void execute(Runnable command) {
  final Thread thread = factory.newThread(command);
  if (null == thread) {
    throw new RuntimeException("Failed to create thread to run: " + command);
  }

  thread.start();

  threads.add(thread);
}

代码非常简单,即创建足够消费者使用的线程数量进行消费;
本文先写到这里,下一篇文章对消费者的等待策略进行具体分析。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容