Disruptor__快

Disruptor提供了一种线程之间信息交换的方式。

锁的缺点

并发的问题


想象有两个线程尝试修改同一个变量value:

  • 情况一:线程1先到达

变量value的值变为”blah”。
然后当线程2到达时,变量value的值变为”blahy”。

  • 情况二:线程2先到达

变量value的值变为”fluffy”。
然后当线程1到达时,值变为”blah”。

  • 情况三:线程1与线程2交互

线程2得到值"fluff"然后赋给本地变量myValue。
线程1改变value的值为”blah”。
然后线程2醒来并把变量value的值改为”fluffy”

解决办法

悲观锁

乐观锁(CAS)


在这种情况,当线程2需要去写Entry时才会去锁定它.它需要检查Entry自从上次读过后是否已经被改过了。如果线程1在线程2读完后到达并把值改为”blah”,线程2读到了这个新值,线程2不会把"fluffy"写到Entry里并把线程1所写的数据覆盖.线程2会重试(重新读新的值,与旧值比较,如果相等则在变量的值后面附上’y’),这里在线程2不会关心新的值是什么的情况.或者线程2会抛出一个异常,或者会返回一个某些字段已更新的标志,这是在期望把”fluff”改为”fluffy”的情况.举一个第二种情况的例子,如果你和另外一个用户同时更新一个Wiki的页面,你会告诉另外一个用户的线程 Thread 2,它们需要重新加载从Thread1来新的变化,然后再提交它们的内容。

死锁

死锁的原因和解决方法?

锁技术是慢的

Disruptor论文中讲述了我们所做的一个实验。这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。当单线程无锁时,程序耗时300ms。如果增加一个锁(仍是单线程、没有竞争、仅仅增加锁),程序需要耗时10000ms,慢了两个数量级。更令人吃惊的是,如果增加一个线程(简单从逻辑上想,应该比单线程加锁快一倍),耗时224000ms。使用两个线程对计数器自增5亿次比使用无锁单线程慢1000倍。并发很难而锁的性能糟糕。

Disruptor如何解决这些问题。

  • 首先,Disruptor根本就不用锁。
    取而代之的是,在需要确保操作是线程安全的(特别是,在多生产者的环境下,更新下一个可用的序列号)地方,我们使用CAS(Compare And Swap/Set)操作。这是一个CPU级别的指令,在我的意识中,它的工作方式有点像乐观锁——CPU去更新一个值,但如果想改的值不再是原来的值,操作就失败,因为很明显,有其它操作先改变了这个值。

CAS操作比锁消耗资源少的多,因为它们不牵涉操作系统,它们直接在CPU上操作。但它们并非没有代价——在上面的试验中,单线程无锁耗时300ms,单线程有锁耗时10000ms,单线程使用CAS耗时5700ms。所以它比使用锁耗时少,但比不需要考虑竞争的单线程耗时多。

回到Disruptor,在我讲生产者时讲过ClaimStrategy。在这些代码中,你可以看见两个策略,一个是SingleThreadedStrategy(单线程策略)另一个是MultiThreadedStrategy(多线程策略)。你可能会有疑问,为什么在只有单个生产者时不用多线程的那个策略?它是否能够处理这种场景?当然可以。但多线程的那个使用了AtomicLong(Java提供的CAS操作),而单线程的使用long,没有锁也没有CAS。这意味着单线程版本会非常快,因为它只有一个生产者,不会产生序号上的冲突。

  • 当然把一个数字转成AtomicLong不可能是Disruptor速度快的唯一秘密
    但这是非常重要的一点——在整个复杂的框架中,只有这一个地方出现多线程竞争修改同一个变量值。这就是秘密。还记得所有的访问对象都拥有序号吗?如果只有一个生产者,那么系统中的每一个序列号只会由一个线程写入。这意味着没有竞争、不需要锁、甚至不需要CAS。

为什么队列不能胜任这个工作

为什么队列底层用RingBuffer来实现,仍然在性能上无法与 Disruptor 相比。队列和最简单的ring buffer只有两个指针——一个指向队列的头,一个指向队尾:


如果有超过一个生产者想要往队列里放东西,尾指针就将成为一个冲突点,因为有多个线程要更新它。如果有多个消费者,那么头指针就会产生竞争,因为元素被消费之后,需要更新指针,所以不仅有读操作还有写操作了。
一个空的队列:


一个满的队列:


队列需要保存一个关于大小的变量,以便区分队列是空还是满。否则,它需要根据队列中的元素的内容来判断。
基于以上,这三个变量常常在一个cache line里面,有可能导致false sharing。因此,不仅要担心生产者和消费者同时写size变量(或者元素),还要注意由于头指针尾指针在同一位置,当头指针更新时,更新尾指针会导致缓存不命中。

神奇的缓存行填充

缓存行,伪共享

神奇的缓存行填充

Disruptor消除这个问题,至少对于缓存行大小是64字节或更少的处理器架构来说是这样的(译注:有可能处理器的缓存行是128字节,那么使用64字节填充还是会存在伪共享问题),通过增加补全来确保ring buffer的序列号不会和其他东西同时存在于一个缓存行中。

public long p1, p2, p3, p4, p5, p6, p7; // cache line padding
private volatile long cursor = INITIAL_CURSOR_VALUE;
public long p8, p9, p10, p11, p12, p13, p14; // cache line padding

因此没有伪共享,就没有和其它任何变量的意外冲突,没有不必要的缓存未命中。

内存屏障

  • 什么是内存屏障?
    它是一个CPU指令。
    a)确保一些特定操作执行的顺序;
    b)影响一些数据的可见性(可能是某些指令执行后的结果)。

如果你的字段是volatile,Java内存模型将在写操作后插入一个写屏障指令,在读操作前插入一个读屏障指令。
1、一旦你完成写入,任何访问这个字段的线程将会得到最新的值。

2、在你写入前,会保证所有之前发生的事已经发生,并且任何更新过的数据值也是可见的,因为内存屏障会把之前的写入值都刷新到缓存。

RingBuffer cursor

RingBuffer的指针(cursor)(译注:指向队尾元素)属于一个神奇的volatile变量,同时也是我们能够不用锁操作就能实现Disruptor的原因之一。

生产者将会取得下一个[Entry](或者是一批),并可对它(们)作任意改动, 把它(们)更新为任何想要的值。在所有改动都完成后,生产者对ring buffer调用commit方法来更新序列号(译注:把cursor更新为该Entry的序列号)。对volatile字段(cursor)的写操作创建了一个内存屏障,这个屏障将刷新所有缓存里的值(或者至少相应地使得缓存失效)。
这时候,消费者们能获得最新的序列号码(8),并且因为内存屏障保证了它之前执行的指令的顺序,消费者们可以确信生产者对7号Entry所作的改动已经可用。

  • 消费者那边会发生什么?
    消费者中的序列号是volatile类型的,会被若干个外部对象读取——其他的[下游消费者]可能在跟踪这个消费者。[ProducerBarrier]/RingBuffer跟踪它以确保环没有出现重叠(wrap)的情况(译注:为了防止下游的消费者和上游的消费者对同一个Entry竞争消费,导致在环形队列中互相覆盖数据,下游消费者要对上游消费者的消费情况进行跟踪)。

所以,如果你的下游消费者(C2)看见前一个消费者(C1)在消费号码为12的Entry,当C2的读取也到了12,它在更新序列号前将可以获得C1对该Entry的所作的更新。

基本来说就是,C1更新序列号前对ring buffer的所有操作(如上图黑色所示),必须先发生,待C2拿到C1更新过的序列号之后,C2才可以为所欲为(如上图蓝色所示)。

  • 内存屏障对性能的影响
    内存屏障作为另一个CPU级的指令,没有[锁那样大的开销]。内核并没有在多个线程间干涉和调度。但凡事都是有代价的。内存屏障的确是有开销的——编译器/cpu不能重排序指令,导致不可以尽可能地高效利用CPU,另外刷新缓存亦会有开销。所以不要以为用volatile代替锁操作就一点事都没。
    你会注意到Disruptor的实现对序列号的读写频率尽量降到最低。对volatile字段的每次读或写都是相对高成本的操作。但是,也应该认识到在批量的情况下可以获得很好的表现。如果你知道不应对序列号频繁读写,那么很合理的想到,先获得一整批Entries,并在更新序列号前处理它们。这个技巧对生产者和消费者都适用。以下的例子来自[BatchConsumer]
    long nextSequence = sequence + 1;
    while (running)
    {
        try
        {
            final long availableSequence = consumerBarrier.waitFor(nextSequence);
            while (nextSequence <= availableSequence)
            {
                entry = consumerBarrier.getEntry(nextSequence);
                handler.onAvailable(entry);
                nextSequence++;
            }
            handler.onEndOfBatch();
            sequence = entry.getSequence();
        }
        …
        catch (final Exception ex)
        {
            exceptionHandler.handle(ex, entry);
            sequence = entry.getSequence();
            nextSequence = entry.getSequence() + 1;
        }
    }

在上面的代码中,我们在消费者处理entries的循环中用一个局部变量(nextSequence)来递增。这表明我们想尽可能地减少对volatile类型的序列号的进行读写。

牛逼的RingBuffer

基本来说,ringbuffer拥有一个序号,这个序号指向数组中下一个可用的元素。(校对注:如下图右边的图片表示序号,这个序号指向数组的索引4的位置。
随着你不停地填充这个buffer(可能也会有相应的读取),这个序号会一直增长,直到绕过这个环。
要找到数组中当前序号指向的元素,可以通过mod操作:
以上面的ringbuffer为例(java的mod语法):12 % 10 = 2。很简单吧。

事实上,上图中的ringbuffer只有10个槽完全是个意外。如果槽的个数是2的N次方更有利于基于二进制的计算机进行计算。

(校对注:2的N次方换成二进制就是1000,100,10,1这样的数字, sequence & (array length-1) = array index,比如一共有8槽,3&(8-1)=3,HashMap就是用这个方式来定位数组元素的,这种方式比取模的速度更快。)

  • 没有尾指针。只维护了一个指向下一个可用位置的序号。这种实现是经过深思熟虑的—我们选择用环形buffer的最初原因就是想要竟可能多地保存消息(若有了尾指针,则尾指针经过的空间又浪费了)。
  • 不删除buffer中的数据
    也就是说这些数据一直存放在buffer中,直到新的数据覆盖他们。这也是不需要尾指针的原因。
  • 它是数组
    这是对CPU缓存友好的—也就是说,在硬件级别,数组中的元素是会被预加载的,因此在ringbuffer当中,cpu无需时不时去主存加载数组中的下一个元素。
  • 为数组预先分配内存
    这使得数组对象一直存在(除非程序终止)。这就意味着不需要花大量的时间用于垃圾回收。

从Ringbuffer中读取数据消费


消费者(Consumer)是一个想从Ring Buffer里读取数据的线程,它可以访问ConsumerBarrier对象——这个对象由RingBuffer创建并且代表消费者与RingBuffer进行交互。就像Ring Buffer显然需要一个序号才能找到下一个可用节点一样,消费者也需要知道它将要处理的序号——每个消费者都需要找到下一个它要访问的序号。在上面的例子中,消费者处理完了Ring Buffer里序号8之前(包括8)的所有数据,那么它期待访问的下一个序号是9。

消费者可以调用ConsumerBarrier对象的waitFor()方法,传递它所需要的下一个序号.
final long availableSeq = consumerBarrier.waitFor(nextSequence);
ConsumerBarrier返回RingBuffer的最大可访问序号——在上面的例子中是12。ConsumerBarrier有一个WaitStrategy方法来决定它如何等待这个序号。

接下来

接下来,消费者会一直原地停留,等待更多数据被写入Ring Buffer。并且,一旦数据写入后消费者会收到通知——节点9,10,11和12 已写入。现在序号12到了,消费者可以让ConsumerBarrier去拿这些序号节点里的数据了。


拿到了数据后,消费者(Consumer)会更新自己的标识(cursor)。

你应该已经感觉得到,这样做是怎样有助于平缓延迟的峰值了——以前需要逐个节点地询问“我可以拿下一个数据吗?现在可以了么?现在呢?”,消费者(Consumer)现在只需要简单的说“当你拿到的数字比我这个要大的时候请告诉我”,函数返回值会告诉它有多少个新的节点可以读取数据了。因为这些新的节点的确已经写入了数据(Ring Buffer本身的序号已经更新),而且消费者对这些节点的唯一操作是读而不是写,因此访问不用加锁。这太好了,不仅代码实现起来可以更加安全和简单,而且不用加锁使得速度更快。

另一个好处是——你可以用多个消费者(Consumer)去读同一个RingBuffer ,不需要加锁,也不需要用另外的队列来协调不同的线程(消费者)。这样你可以在Disruptor的协调下实现真正的并发数据处理。

写入 Ringbuffer

ProducerBarriers(AbstractSequencer)

写入 Ring Buffer 的过程涉及到两阶段提交 (two-phase commit)。首先,你的生产者需要申请 buffer 里的下一个节点。然后,当生产者向节点写完数据,它将会调用 ProducerBarrier 的 commit 方法。

那么让我们首先来看看第一步。 “给我 Ring Buffer 里的下一个节点”,这句话听起来很简单。的确,从生产者角度来看它很简单:简单地调用 ProducerBarrier 的 nextEntry() 方法,这样会返回给你一个 Entry 对象,这个对象就是 Ring Buffer 的下一个节点。

如何防止 Ring Buffer 重叠

在后台,由 ProducerBarrier 负责所有的交互细节来从 Ring Buffer 中找到下一个节点,然后才允许生产者向它写入数据。


在这幅图中,我们假设只有一个生产者写入 Ring Buffer。过一会儿我们再处理多个生产者的复杂问题。

ConsumerTrackingProducerBarrier 对象拥有所有正在访问 Ring Buffer 的 消费者 列表。这看起来有点儿奇怪-我从没有期望 ProducerBarrier 了解任何有关消费端那边的事情。但是等等,这是有原因的。因为我们不想与队列“混为一谈”(队列需要追踪队列的头和尾,它们有时候会指向相同的位置),Disruptor 由消费者负责通知它们处理到了哪个序列号,而不是 Ring Buffer。所以,如果我们想确定我们没有让 Ring Buffer 重叠,需要检查所有的消费者们都读到了哪里。
在上图中,有一个 消费者 顺利的读到了最大序号 12(用红色/粉色高亮)。第二个消费者 有点儿落后——可能它在做 I/O 操作之类的——它停在序号 3。因此消费者 2 在赶上消费者 1 之前要跑完整个 Ring Buffer 一圈的距离。

现在生产者想要写入 Ring Buffer 中序号 3 占据的节点,因为它是 Ring Buffer 当前游标的下一个节点。但是 ProducerBarrier 明白现在不能写入,因为有一个消费者正在占用它。所以,ProducerBarrier 停下来自旋 (spins),等待,直到那个消费者离开。

申请下一个节点

现在可以想像消费者 2 已经处理完了一批节点,并且向前移动了它的序号。可能它挪到了序号 9(因为消费端的批处理方式,现实中我会预计它到达 12,但那样的话这个例子就不够有趣了)。

上图显示了当消费者 2 挪动到序号 9 时发生的情况。在这张图中我已经忽略了ConsumerBarrier,因为它没有参与这个场景。
ProducerBarier 会看到下一个节点——序号 3 那个已经可以用了。它会抢占这个节点上的 Entry(我还没有特别介绍 Entry 对象,基本上它是一个放写入到某个序号的 Ring Buffer 数据的桶),把下一个序号(13)更新成 Entry 的序号,然后把 Entry 返回给生产者。生产者可以接着往 Entry 里写入数据。

提交新的数据


当生产者结束向 Entry 写入数据后,它会要求 ProducerBarrier 提交。

ProducerBarrier 先等待 Ring Buffer 的游标追上当前的位置(对于单生产者这毫无意义-比如,我们已经知道游标到了 12 ,而且没有其他人正在写入 Ring Buffer)。然后 ProducerBarrier 更新 Ring Buffer 的游标到刚才写入的 Entry 序号-在我们这儿是 13。接下来,ProducerBarrier 会让消费者知道 buffer 中有新东西了。它戳一下 ConsumerBarrier 上的 WaitStrategy 对象说-“喂,醒醒!有事情发生了!”

ProducerBarrier 上的批处理

有趣的是 Disruptor 可以同时在生产者和 [消费者]两端实现批处理。还记得伴随着程序运行,消费者 2 最后达到了序号 9 吗?ProducerBarrier 可以在这里做一件很狡猾的事-它知道 Ring Buffer 的大小,也知道最慢的消费者位置。因此它能够发现当前有哪些节点是可用的。


如果 ProducerBarrier 知道 Ring Buffer 的游标指向 12,而最慢的消费者在 9 的位置,它就可以让生产者写入节点 3,4,5,6,7 和 8,中间不需要再次检查消费者的位置。

多个生产者的场景

在多个生产者的场景下,你还需要其他东西来追踪序号。这个序号是指当前可写入的序号。注意这和“向 Ring Buffer 的游标加 1”不一样-如果你有一个以上的生产者同时在向 Ring Buffer 写入,就有可能出现某些 Entry 正在被生产者写入但还没有提交的情况。


让我们复习一下如何申请写入节点。每个生产者都向 ClaimStrategy 申请下一个可用的节点。生产者 1 拿到序号 13,这和上面单个生产者的情况一样。生产者 2 拿到序号 14,尽管 Ring Buffer的当前游标仅仅指向 12。这是因为 ClaimSequence 不但负责分发序号,而且负责跟踪哪些序号已经被分配。
我把生产者 1 和它的写入节点涂上绿色,把生产者 2 和它的写入节点涂上可疑的粉色-看起来像紫色。

现在假设生产者 1 还生活在童话里,因为某些原因没有来得及提交数据。生产者 2 已经准备好提交了,并且向 ProducerBarrier 发出了请求。

就像我们先前在 commit 示意图中看到的一样,ProducerBarrier 只有在 Ring Buffer 游标到达准备提交的节点的前一个节点时它才会提交。在当前情况下,游标必须先到达序号 13 我们才能提交节点 14 的数据。但是我们不能这样做,因为生产者 1 正盯着一些闪闪发光的东西,还没来得及提交。因此 ClaimStrategy 就停在那儿自旋 (spins), 直到 Ring Buffer 游标到达它应该在的位置。


现在生产者 1 从迷糊中清醒过来并且申请提交节点 13 的数据(生产者 1 发出的绿色箭头代表这个请求)。ProducerBarrier 让 ClaimStrategy 先等待 Ring Buffer 的游标到达序号 12,当然现在已经到了。因此 Ring Buffer 移动游标到 13,让 ProducerBarrier 戳一下 WaitStrategy 告诉所有人都知道 Ring Buffer 有更新了。现在 ProducerBarrier 可以完成生产者 2 的请求,让 Ring Buffer 移动游标到 14,并且通知所有人都知道。

你会看到,尽管生产者在不同的时间完成数据写入,但是 Ring Buffer 的内容顺序总是会遵循 nextEntry() 的初始调用顺序。也就是说,如果一个生产者在写入 Ring Buffer 的时候暂停了,只有当它解除暂停后,其他等待中的提交才会立即执行。

更新:最近的 RingBuffer​ 版本去掉了 Producer Barrier。如果在你看的代码里找不到 ProducerBarrier,那就假设当我讲“Producer Barrier”时,我的意思是“Ring Buffer”。
更新2:注意 Disruptor 2.0 版使用了与本文不一样的命名。如果你对类名感到困惑,请阅读我写的Disruptor 2.0更新摘要​。

Disruptor的依赖关系

菱形结构

  • 用队列实现菱形结构
  • 用 Disruptor 实现菱形结构


生产者这边比较简单,它是我在 上文​ 中描述过的单生产者模型。有趣的是,生产者并不需要关心所有的消费者。它只关心消费者 C3,如果消费者 C3 处理完了 Ring Buffer 的某一个节点,那么另外两个消费者肯定也处理完了。因此,只要 C3 的位置向前移动,Ring Buffer 的后续节点就会空闲出来。
管理消费者的依赖关系需要两个 ConsumerBarrier 对象。第一个仅仅与 Ring Buffer 交互,C1 和 C2 消费者向它申请下一个可访问节点。第二个 ConsumerBarrier 只知道消费者 C1 和 C2,它返回两个消费者访问过的消息序号中较小的那个。

Disruptor 怎样实现消费者等待(依赖)


我们从这个故事发生到一半的时候来看:生产者 P1 已经在 Ring Buffer 里写到序号 22 了,消费者 C1 已经访问和处理完了序号 21 之前的所有数据。消费者 C2 处理到了序号 18。消费者 C3,就是依赖其他消费者的那个,才处理到序号 15。

生产者 P1 不能继续向 RingBuffer 写入数据了,因为序号 15 占据了我们想要写入序号 23 的数据节点 (Slot)。


第一个 ConsumerBarrier(CB1)告诉 C1 和 C2 消费者可以去访问序号 22 前面的所有数据,这是 Ring Buffer 中的最大序号。第二个 ConsumerBarrier (CB2) 不但会检查 RingBuffer 的序号,也会检查另外两个消费者的序号并且返回它们之间的最小值。因此,三号消费者被告知可以访问 Ring Buffer 里序号 18 前面的数据。

注意这些消费者还是直接从 Ring Buffer 拿数据节点——并不是由 C1 和 C2 消费者把数据节点从 Ring Buffer 里取出再传递给 C3 消费者的。作为替代的是,由第二个 ConsumerBarrier 告诉 C3 消费者,在 RingBuffer 里的哪些节点可以安全的处理。

这产生了一个问题——如果任何数据都来自于 Ring Buffer,那么 C3 消费者如何读到前面两个消费者处理完成的数据呢?如果 C3 消费者关心的只是先前的消费者是否已经完成它们的工作(例如,把数据复制到别的地方),那么这一切都没有问题—— C3 消费者知道工作已完成就放心了。但是,如果 C3 消费者需要访问先前的消费者的处理结果,它又从哪里去获取呢?

秘密在于把处理结果写入 Ring Buffer 数据节点 (Entry) 本身。这样,当 C3 消费者从 Ring Buffer 取出节点时,它已经填充好了 C3 消费者工作需要的所有信息。这里 真正 重要的地方是节点 (Entry) 对象的每一个字段应该只允许一个消费者写入。这可以避免产生并发写入冲突 (write-contention) 减慢了整个处理过程。

  • 一些实际的 Java 代码
ConsumerBarrier consumerBarrier1 =
    ringBuffer.createConsumerBarrier();
BatchConsumer consumer1 =
    new BatchConsumer(consumerBarrier1, handler1);
BatchConsumer consumer2 =
    new BatchConsumer(consumerBarrier1, handler2);
ConsumerBarrier consumerBarrier2 =
    ringBuffer.createConsumerBarrier(consumer1, consumer2);
BatchConsumer consumer3 =
    new BatchConsumer(consumerBarrier2, handler3);
ProducerBarrier producerBarrier =
    ringBuffer.createProducerBarrier(consumer3);
  • 使用DSL
Executor executor = Executors.newCachedThreadPool();
BatchHandler handler1 = new MyBatchHandler1();
BatchHandler handler2 = new MyBatchHandler2();
BatchHandler handler3 = new MyBatchHandler3();
DisruptorWizard dw = new DisruptorWizard(ENTRY_FACTORY,
    RING_BUFFER_SIZE, executor);
dw.consumeWith(handler1, handler2).then(handler3);
ProducerBarrier producerBarrier = dw.createProducerBarrier();
dw.consumeWith(handler1a, handler2a);
dw.after(handler1a).consumeWith(handler1b);
dw.after(handler2a).consumeWith(handler2b);
dw.after(handler1b, handler2b).consumeWith(handler3);
ProducerBarrier producerBarrier = dw.createProducerBarrier();

现在你知道了——如何关联 Disruptor 与相互依赖(等待)的多个消费者。关键点是:

  • 使用多个 ConsumerBarrier 来管理消费者之间的依赖(等待)关系。
  • 使用 ProducerBarrier 监视结构图中最后一个消费者。
  • 只允许一个消费者更新数据节点 (Entry) 的每一个独立字段。

Disruptor 2.0更新摘要

对于2.0版的主要变化有3点:

  • 更贴切的命名;
  • 把producer barrier(生产者屏障)整合进了ring buffer;
  • 将Disruptor Wizard加入了主代码库。
    新版本:

你可以看到基本原理还是类似的。新版本更加简单,因为ProducerBarrier本身不再作为一个单独的实体存在,它的替代者是PublishPort 接口,且RingBuffer自身就实现了这个接口。 类似地,DependencyBarrier替代ConsumerBarrier ,厘清了此对象的职责。另外,Publisher (Producer的替代者)和EventProcessor(替代了Consumer)也更能精确地体现出它们的行为。Consumer这个名字总是会带来一些混淆,因为因为其实消费者从来不从ring buffer消费任何东西。Consumer之前仅仅是用于队列实现的一个术语。

图上没有表现出来的变动是以前存在ring buffer里的东西叫entry(输入条目),而现在改名叫Event(事件)了,相应的就是EventProcessor。

概念

RingBuffer: 被看作Disruptor最主要的组件,然而从3.0开始RingBuffer仅仅负责存储和更新在Disruptor中流通的数据。对一些特殊的使用场景能够被用户(使用其他数据结构)完全替代。
Sequence:Disruptor使用Sequence来表示一个特殊组件处理的序号。和Disruptor一样,每个消费者(EventProcessor)都维持着一个Sequence。大部分的并发代码依赖这些Sequence值的运转,因此Sequence支持多种当前为AtomicLong类的特性。
Sequencer:这是Disruptor真正的核心。实现了这个接口的两种生产者(单生产者和多生产者)均实现了所有的并发算法,为了在生产者和消费者之间进行准确快速的数据传递。
SequenceBarrier: 由Sequencer生成,并且包含了已经发布的Sequence的引用,这些的Sequence源于Sequencer和一些独立的消费者的Sequence。它包含了决定是否有供消费者来消费的Event的逻辑。
WaitStrategy:决定一个消费者将如何等待生产者将Event置入Disruptor。
Event:从生产者到消费者过程中所处理的数据单元。Disruptor中没有代码表示Event,因为它完全是由用户定义的。
EventProcessor:主要事件循环,处理Disruptor中的Event,并且拥有消费者的Sequence。它有一个实现类是BatchEventProcessor,包含了event loop有效的实现,并且将回调到一个EventHandler接口的实现对象。
EventHandler:由用户实现并且代表了Disruptor中的一个消费者的接口。
Producer:由用户实现,它调用RingBuffer来插入事件(Event),在Disruptor中没有相应的实现代码,由用户实现。
WorkProcessor:确保每个sequence只被一个processor消费,在同一个WorkPool中的处理多个WorkProcessor不会消费同样的sequence。
WorkerPool:一个WorkProcessor池,其中WorkProcessor将消费Sequence,所以任务可以在实现WorkHandler接口的worker吃间移交
LifecycleAware:当BatchEventProcessor启动和停止时,于实现这个接口用于接收通知。

应用举例

Disruptor系统的最初设计是为了支持需要按照特定的顺序发生的阶段性类似流水线事件,这种需求在企业应用系统开发中并不少见。图8显示了标准的3级流水线。


首先,每个事件都被写入硬盘(日志)作为日后恢复用。其次,这些事件被复制到备份服务器。只有在这两个阶段后,系统开始业务逻辑处理。

按顺序执行上次操作是一个合乎逻辑的方法,但是并不是最有效的方法。日志和复制操作可以同步执行,因为他们互相独立。但是业务逻辑必须在他们都执行完后才能执行。

LMAX 一种新的零售的金融交易平台

通过低延迟处理大量交易,取得低延迟和高吞吐量,而且没有并发代码的复杂性,他们是怎么做到呢?现在LMAX已经产品化一段时间了,现在应该可以揭开其神秘而迷人的面纱了。

整体结构

从最高层次看,架构有三个部分:

  • 业务逻辑处理器business logic processor[5]
  • 输入input disruptor
  • 输出output disruptors

业务逻辑处理器处理所有的应用程序的业务逻辑,这是一个单线程的Java程序,纯粹的方法调用,并返回输出。不需要任何平台框架,运行在JVM里,这就保证其很容易运行测试环境。

全部驻留在内存中

业务逻辑处理器有次序地取出消息,然后运行其中的业务逻辑,然后产生输出事件,整个操作都是在内存中,没有数据库或其他持久存储。将所有数据驻留在内存中有两个重要好处:首先是快,没有IO,也没有事务,其次是简化编程,没有对象/关系数据库的映射,所有代码都是使用Java对象模型

使用基于内存的模型有一个重要问题:万一崩溃怎么办?电源掉电也是可能发生的,“事件”(Event Sourcing )概念是问题解决的核心,业务逻辑处理器的状态是由输入事件驱动的,只要这些输入事件被持久化保存起来,你就总是能够在崩溃情况下,根据事件重演重新获得当前状态。(NOSQL存储的基于事件的事务实现)

LMAX提供业务逻辑处理的快照,从快照还原,每天晚上系统不繁忙时构建快照,重新启动商业逻辑处理器的速度很快,一个完整的重新启动 – 包括重新启动JVM加载最近的快照,和重放一天事件 – 不到一分钟。

快照虽然使启动一个新的业务逻辑处理器的速度,但速度还不够快,业务逻辑处理器在下午2时就非常繁忙甚至崩溃,LMAX就保持多个业务逻辑处理器同时运行,每个输入事件由多个处理器处理,只有一个处理器输出有效,其他忽略,如果一个处理器失败,切换到另外一个,这种故障转移失败恢复是事件源驱动(Event Sourcing)的另外一个好处。

Ref:http://ifeve.com/disruptor/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容