1 生产者线程
生产者一般就是我们的应用线程,在发布通常使用一个EventTranslator将数据转移到RingBuffer上,因为不涉及共享数据和实例变量,通常使用同一个EventTranslator实例进行操作(注:translate经常是“翻译”的意思,但其实还有“ move from one place or condition to another.”的转移、转换的意思)。
* 生产者在发布事件时,使用翻译器将原始对象设置到RingBuffer的对象中
static class IntToExampleEventTranslator implements EventTranslatorOneArg<ExampleEvent, Integer>{
static final IntToExampleEventTranslator INSTANCE = new IntToExampleEventTranslator();
public void translateTo(ExampleEvent event, long sequence, Integer arg0) {
event.data = arg0 ;
System.err.println("put data "+sequence+", "+event+", "+arg0);
// 生产线程0
Thread produceThread0 = new Thread(new Runnable() {
public void run() {
int x = 0;
while(x++ < events / 2){ // 除了下面这行代码,其他都没有关系
disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
// 生产线程1
Thread produceThread1 = new Thread(new Runnable() {
public void run() {
int x = 0;
while(x++ < events / 2){
disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
2 生产事件的整体逻辑
// Disruptor.java
* Publish an event to the ring buffer. 使用给定的事件翻译器,发布事件
* @param eventTranslator the translator that will load data into the event.
* @param arg A single argument to load into the event
public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, final A arg)
ringBuffer.publishEvent(eventTranslator, arg);
// RingBuffer
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
final long sequence = sequencer.next(); // 第一步 占坑
translateAndPublish(translator, sequence, arg0); // 第二步 填坑
3 Disruptor的核心--Sequencer接口
为什么说Sequencer是Disruptor的核心呢?其实这也不是我说的,是Disruptor官方Wiki Introduction上说的:
下边是Sequencer接口及其父接口Cursored、Sequenced 定义。
// Sequencer
* Coordinates claiming sequences for access to a data structure while tracking dependent {@link Sequence}s
public interface Sequencer extends Cursored, Sequenced
* Set to -1 as sequence starting point
* 序号开始位置
* Claim a specific sequence. Only used if initialising the ring buffer to
* a specific value.
* @param sequence The sequence to initialise too.
* 声明指定序号,只用在初始化RingBuffer到指定值,基本上不用了
void claim(long sequence);
* Confirms if a sequence is published and the event is available for use; non-blocking.
* @param sequence of the buffer to check
* @return true if the sequence is available for use, false if not
* 用非阻塞方式,确认某个序号是否已经发布且事件可用。
boolean isAvailable(long sequence);
* Add the specified gating sequences to this instance of the Disruptor. They will
* safely and atomically added to the list of gating sequences.
* @param gatingSequences The sequences to add.
* 增加门控序列(消费者序列),用于生产者在生产时避免追尾消费者
void addGatingSequences(Sequence... gatingSequences);
* Remove the specified sequence from this sequencer.
* @param sequence to be removed.
* @return <tt>true</tt> if this sequence was found, <tt>false</tt> otherwise.
* 从门控序列中移除指定序列
boolean removeGatingSequence(Sequence sequence);
* Create a new SequenceBarrier to be used by an EventProcessor to track which messages
* are available to be read from the ring buffer given a list of sequences to track.
* @param sequencesToTrack
* @return A sequence barrier that will track the specified sequences.
* @see SequenceBarrier
* 消费者使用,用于追踪指定序列(通常是上一组消费者的序列)
SequenceBarrier newBarrier(Sequence... sequencesToTrack);
* Get the minimum sequence value from all of the gating sequences
* added to this ringBuffer.
* @return The minimum gating sequence or the cursor sequence if
* no sequences have been added.
* 获取追踪序列中最小的序列
long getMinimumSequence();
* Get the highest sequence number that can be safely read from the ring buffer. Depending
* on the implementation of the Sequencer this call may need to scan a number of values
* in the Sequencer. The scan will range from nextSequence to availableSequence. If
* there are no available values <code>>= nextSequence</code> the return value will be
* <code>nextSequence - 1</code>. To work correctly a consumer should pass a value that
* is 1 higher than the last sequence that was successfully processed.
* @param nextSequence The sequence to start scanning from.
* @param availableSequence The sequence to scan to.
* @return The highest value that can be safely read, will be at least <code>nextSequence - 1</code>.
* 获取能够从环形缓冲读取的最高的序列号。依赖Sequencer的实现,可能会扫描Sequencer的一些值。扫描从nextSequence
* 到availableSequence。如果没有大于等于nextSequence的可用值,返回值将为nextSequence-1。为了工作正常,消费者
* 应该传递一个比最后成功处理的序列值大1的值。
long getHighestPublishedSequence(long nextSequence, long availableSequence);
<T> EventPoller<T> newPoller(DataProvider<T> provider, Sequence... gatingSequences);
// Cursored.java
* Implementors of this interface must provide a single long value
* that represents their current cursor value. Used during dynamic
* add/remove of Sequences from a
* {@link SequenceGroups#addSequences(Object, java.util.concurrent.atomic.AtomicReferenceFieldUpdater, Cursored, Sequence...)}.
* 游标接口,用于获取生产者当前游标位置
public interface Cursored
* Get the current cursor value.
* @return current cursor value
long getCursor();
// Sequenced.java
public interface Sequenced
* The capacity of the data structure to hold entries.
* @return the size of the RingBuffer.
* 获取环形缓冲的大小
int getBufferSize();
* Has the buffer got capacity to allocate another sequence. This is a concurrent
* method so the response should only be taken as an indication of available capacity.
* @param requiredCapacity in the buffer
* @return true if the buffer has the capacity to allocate the next sequence otherwise false.
* 判断是否含有指定的可用容量
boolean hasAvailableCapacity(final int requiredCapacity);
* Get the remaining capacity for this sequencer.
* @return The number of slots remaining.
* 剩余容量
long remainingCapacity();
* Claim the next event in sequence for publishing.
* @return the claimed sequence value
* 生产者发布时,申请下一个序号
long next();
* Claim the next n events in sequence for publishing. This is for batch event producing. Using batch producing
* requires a little care and some math.
* <pre>
* int n = 10;
* long hi = sequencer.next(n);
* long lo = hi - (n - 1);
* for (long sequence = lo; sequence <= hi; sequence++) {
* // Do work.
* }
* sequencer.publish(lo, hi);
* </pre>
* @param n the number of sequences to claim
* @return the highest claimed sequence value
* 申请n个序号,用于批量发布
long next(int n);
* Attempt to claim the next event in sequence for publishing. Will return the
* number of the slot if there is at least <code>requiredCapacity</code> slots
* available.
* @return the claimed sequence value
* @throws InsufficientCapacityException
* next()的非阻塞模式
long tryNext() throws InsufficientCapacityException;
* Attempt to claim the next n events in sequence for publishing. Will return the
* highest numbered slot if there is at least <code>requiredCapacity</code> slots
* available. Have a look at {@link Sequencer#next()} for a description on how to
* use this method.
* @param n the number of sequences to claim
* @return the claimed sequence value
* @throws InsufficientCapacityException
* next(n)的非阻塞模式
long tryNext(int n) throws InsufficientCapacityException;
* Publishes a sequence. Call when the event has been filled.
* @param sequence
* 数据填充后,发布此序号
void publish(long sequence);
* Batch publish sequences. Called when all of the events have been filled.
* @param lo first sequence number to publish
* @param hi last sequence number to publish
* 批量发布序号
void publish(long lo, long hi);
3.1 单生产者发布事件
// SingleProducerSequencer.java
public long next()
return next(1);
* @see Sequencer#next(int)
public long next(int n)
if (n < 1)
throw new IllegalArgumentException("n must be > 0");
// 复制上次申请完毕的序列值
long nextValue = this.nextValue;
// 加n,得到本次需要申请的序列值,单个发送n为1
long nextSequence = nextValue + n; // 本次要验证的值
// 可能发生绕环的点,本次申请值 - 一圈长度
long wrapPoint = nextSequence - bufferSize;
long cachedGatingSequence = this.cachedValue; // 数值最小的序列值,也就是最慢消费者
// wrapPoint 等于 cachedGatingSequence 将发生绕环行为,生产者将在环上,从后方覆盖未消费的事件。
// 如果即将生产者超一圈从后方追消费者尾(要申请的序号落了最慢消费者一圈)或 消费者追生产者尾,将进行等待。后边这种情况应该不会发生吧?
// 针对以上值举例:400米跑道(bufferSize),小明跑了599米(nextSequence),小红(最慢消费者)跑了200米(cachedGatingSequence)。小红不动,小明再跑一米就撞翻小红的那个点,叫做绕环点wrapPoint。
// 没有空坑位,将进入循环等待。
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
// 只有当消费者消费,向前移动后,才能跳出循环
// 由于外层判断使用的是缓存的消费者序列最小值,这里使用真实的消费者序列进行判断,并将最新结果在跳出while循环之后进行缓存
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{ // 唤醒等待的消费者
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
// 当消费者向前消费后,更新缓存的最小序号
this.cachedValue = minSequence;
// 将成功申请的序号赋值给对象实例变量
this.nextValue = nextSequence;
return nextSequence;
// RingBuffer.java
private void translateAndPublish(EventTranslator<E> translator, long sequence)
translator.translateTo(get(sequence), sequence);
public interface EventTranslatorOneArg<T, A>
* Translate a data representation into fields set in given event
* @param event into which the data should be translated.
* @param sequence that is assigned to event.
* @param arg0 The first user specified argument to the translator
void translateTo(final T event, long sequence, final A arg0);
// SingleProducerSequencer.java
public void publish(long sequence)
{ // 在发布此位置可用时,需要更新Sequencer内部游标值,并在使用阻塞等待策略时,通知等待可用事件的消费者进行继续消费
// 除signalAllWhenBlocking外都是空实现
// BlockingWaitStrategy.java
public void signalAllWhenBlocking()
3.2 插播Disruptor中的高效AtomicLong--Sequence
class LhsPadding
protected long p1, p2, p3, p4, p5, p6, p7;
class Value extends LhsPadding
{ // value的前后各有7个long变量,用于缓存行填充,前后各7个保证了不管怎样,当64位的缓存行加载时value,不会有其他变量共享缓存行,从而解决了伪共享问题
protected volatile long value;
class RhsPadding extends Value
protected long p9, p10, p11, p12, p13, p14, p15;
* <p>Concurrent sequence class used for tracking the progress of
* the ring buffer and event processors. Support a number
* of concurrent operations including CAS and order writes.
* <p>Also attempts to be more efficient with regards to false
* sharing by adding padding around the volatile field.
* Sequence可以按照AtomicLong来理解,除了Sequence消除了伪共享问题,更加高效
public class Sequence extends RhsPadding
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
UNSAFE = Util.getUnsafe();
VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
catch (final Exception e)
throw new RuntimeException(e);
* Create a sequence initialised to -1.
public Sequence()
* Create a sequence with a specified initial value.
* @param initialValue The initial value for this sequence.
public Sequence(final long initialValue)
UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
* Perform a volatile read of this sequence's value.
* @return The current value of the sequence.
public long get()
return value;
* Perform an ordered write of this sequence. The intent is
* a Store/Store barrier between this write and any previous
* store.
* @param value The new value for the sequence.
* 此方法等同于AtomicLong#lazySet(long newValue),
* 和直接修改volatile修饰的value相比,非阻塞,更高效,但更新的值会稍迟一点看到
public void set(final long value)
UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
* Performs a volatile write of this sequence. The intent is
* a Store/Store barrier between this write and any previous
* write and a Store/Load barrier between this write and any
* subsequent volatile read.
* @param value The new value for the sequence.
public void setVolatile(final long value)
UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
* Perform a compare and set operation on the sequence.
* @param expectedValue The expected current value.
* @param newValue The value to update to.
* @return true if the operation succeeds, false otherwise.
public boolean compareAndSet(final long expectedValue, final long newValue)
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
* Atomically increment the sequence by one.
* @return The value after the increment
public long incrementAndGet()
return addAndGet(1L);
* Atomically add the supplied value.
* @param increment The value to add to the sequence.
* @return The value after the increment.
public long addAndGet(final long increment)
long currentValue;
long newValue;
currentValue = get();
newValue = currentValue + increment;
while (!compareAndSet(currentValue, newValue));
return newValue;
public String toString()
return Long.toString(get());
3.3 多生产者发布事件
public long next()
return next(1);
* @see Sequencer#next(int)
public long next(int n)
if (n < 1)
throw new IllegalArgumentException("n must be > 0");
long current;
long next;
current = cursor.get(); // 当前游标值,初始化时是-1
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
else if (cursor.compareAndSet(current, next))
while (true);
return next;
// MultiProducerSequencer.java
private static final Unsafe UNSAFE = Util.getUnsafe();
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class); // 获取int[]数组类的第一个元素与该类起始位置的偏移。
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class); // 每个元素需要占用的位置,也有可能返回0。BASE和SCALE都是为了操作availableBuffer
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// availableBuffer tracks the state of each ringbuffer slot
// see below for more details on the approach
private final int[] availableBuffer; // 初始全是-1
private final int indexMask;
private final int indexShift;
public void publish(final long sequence)
waitStrategy.signalAllWhenBlocking(); // 如果使用BlokingWaitStrategy,才会进行通知。否则不会操作
public void publish(long lo, long hi)
for (long l = lo; l <= hi; l++)
* availableBuffer设置可用标志
* 主要原因是避免发布者线程之间共享一个序列对象。
* 游标和最小门控序列的差值应该永远不大于RingBuffer的大小(防止生产者太快,覆盖未消费完的数据)
private void setAvailable(final long sequence)
{ // calculateIndex 求模%, calculateAvailabilityFlag 求除/
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
private void setAvailableBufferValue(int index, int flag)
{ // 使用Unsafe更新属性,因为是直接操作内存,所以需要计算元素位置对应的内存位置bufferAddress
long bufferAddress = (index * SCALE) + BASE;
// availableBuffer是标志可用位置的int数组,初始全为-1。随着sequence不断上升,buffer中固定位置的flag(也就是sequence和bufferSize相除的商)会一直增大。
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
private int calculateAvailabilityFlag(final long sequence)
{ // 求商 就是 sequence / bufferSize , bufferSize = 2^indexShift。
return (int) (sequence >>> indexShift);
private int calculateIndex(final long sequence)
{ // 计算位置即求模,直接使用序号 与 掩码(2的平方-1,也就是一个全1的二进制表示),相当于 sequence % (bufferSize), bufferSize = indexMask + 1
return ((int) sequence) & indexMask;
4 剖析SingleProducerSequencer设计
// SingleProducerSequencer.java
abstract class SingleProducerSequencerPad extends AbstractSequencer
protected long p1, p2, p3, p4, p5, p6, p7;
public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
super(bufferSize, waitStrategy);
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)
super(bufferSize, waitStrategy);
* Set to -1 as sequence starting point
protected long nextValue = Sequence.INITIAL_VALUE; // 生产者申请的下一个序列值
protected long cachedValue = Sequence.INITIAL_VALUE; // 缓存上一次比较的门控序列组和next的较小值(最慢消费者序列值)
* <p>Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s.
* Not safe for use from multiple threads as it does not implement any barriers.</p>
* <p>
* <p>Note on {@link Sequencer#getCursor()}: With this sequencer the cursor value is updated after the call
* to {@link Sequencer#publish(long)} is made.
public final class SingleProducerSequencer extends SingleProducerSequencerFields
protected long p1, p2, p3, p4, p5, p6, p7;
// ...省略
使用HSDB(HotSpot Debugger,可通过 java -cp .;"%JAVA_HOME%/lib/sa-jdi.jar" sun.jvm.hotspot.HSDB 启动)跟踪demo对应的已断点的HotSpot进程,从Object Histogram对象图中筛选出SingleProducerSequencer实例,并通过Inspector工具对SingleProducerSequencer实例进行查看。
本例中,0x00000000828026f8为com.lmax.disruptor.SingleProducerSequencer实例在JVM中的内存起始位置。以此内存地址通过mem命令查看后续的30个内存地址内容。为啥要30个呢?其实20个就够了,可以看到"Object Histogram"中SingleProducerSequencer实例的size是160字节,mem打印一行表示一字长,对应到我本机的64位机器即8字节,所以长度选择大于等于160/8=20就可以看到SingleProducerSequencer实例的内存布局全貌。
cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size
HotSpot中一个对象(非数组)的内存布局大概是这样的:对象头(Mark Word + klass pointer) + 实际数据 + 为了保持8字节对齐的填充。其中对象头的Mark Word和klass pointer长度各为一机器字(machine-word),即32位机器对应32bit(4字节),64位机器对应64bit(8字节)。如64位JVM开启了指针压缩,klass pointer将压缩到4字节。
jinfo -flag UseCompressedOops pid 返回-XX:+UseCompressedOops即为开启,或jinfo -flags pid 查看全部选项。
使用HSDB Inspector查看实例。
hsdb> mem 0x00000000828026f8 20
0x00000000828026f8: 0x0000000000000009 // mark word 存储对象运行时数据,如哈希码、GC分代年龄、锁状态标志、线程持有锁、偏向线程ID、偏向时间戳
0x0000000082802700: 0x000000082000de38 // 高4位(82802704~82802707):int bufferSize 8 ,低4位(82802700~8280273):2000de38。由于开启了指针压缩,低4位表示klass pointer,由于使用的JDK1.8,klass metadata保存在Metadataspace中。
0x0000000082802708: 0x828028e082809e98 // 高4位:ref cursor,低4位: ref waitStrategy
0x0000000082802710: 0x000000008284b390 // ref gatingSequences ObjArray
0x0000000082802718: 0x0000000000000000 // 包括当前行的以下7行 SingleProducerSequencerPad中定义的p1~p7
0x0000000082802720: 0x0000000000000000
0x0000000082802728: 0x0000000000000000
0x0000000082802730: 0x0000000000000000
0x0000000082802738: 0x0000000000000000
0x0000000082802740: 0x0000000000000000
0x0000000082802748: 0x0000000000000000
0x0000000082802750: 0x0000000000000001 // nextValue 1
0x0000000082802758: 0xffffffffffffffff // cachedValue -1
0x0000000082802760: 0x0000000000000000 // SingleProducerSequencer定义的p1~p7
0x0000000082802768: 0x0000000000000000
0x0000000082802770: 0x0000000000000000
0x0000000082802778: 0x0000000000000000
0x0000000082802780: 0x0000000000000000
0x0000000082802788: 0x0000000000000000
0x0000000082802790: 0x0000000000000000
计算此对象的Shallow Heap size 和 Retained Heap size:
可以发现此对象一共占用20*8=160B内存,此值即Shallow Heap size。也可以手工计算:mark_word[8] + klass_pointer[4] + 2 * ref[4] + ObjArray_ref[8] + 16 * long[8] + int[4] = 160B
而保留内存大小Retained Heap size = Shallow Heap size + (当前对象的引用对象排除GC Root引用对象)的Shallow Heap size。
这里涉及到的引用为:cursor 0x00000000828028e0 ,waitStrategy 0x0000000082809e98 ,gatingSequences 0x000000008284b390。
分别使用revptrs命令查找反向引用,发现只有gatingSequences为此对象唯一引用,故计算gatingSequences(com.lmax.disruptor.Sequence[1] ) Shallow Heap size = 12 + 4 + 1 * 4 + 4 = 24B。这里由于开启了压缩指针,引用指针占用4B,此时占用20B,需要填充4B补满24B。故对象的Retained Heap size为160+24=184。
hsdb> mem 0x000000008284b390 3
0x000000008284b390: 0x0000000000000009
0x000000008284b398: 0x000000012000e08d
0x000000008284b3a0: 0x000000008284abc0
数组对象的Shallow Heap size=引用对象头大小12字节+存储数组长度的空间大小4字节+数组的长度*数组中对象的Shallow Heap size+padding大小
Heap Parameters:
ParallelScavengeHeap [ PSYoungGen [
eden = [0x00000000d6300000,0x00000000d66755d0,0x00000000d8300000] ,
from = [0x00000000d8300000,0x00000000d8300000,0x00000000d8800000] ,
to = [0x00000000d8800000,0x00000000d8800000,0x00000000d8d00000] ]
PSOldGen [ [0x0000000082800000,0x00000000829d79c0,0x0000000084a00000] ] ]
- Java对象内存布局(推荐,写的很棒) http://www.jianshu.com/p/91e398d5d17c
- JVM——深入分析对象的内存布局 http://www.cnblogs.com/zhengbin/p/6490953.html
- 借HSDB来探索HotSpot VM的运行时数据 http://rednaxelafx.iteye.com/blog/1847971
- markOop.hpp https://github.com/dmlloyd/openjdk/blob/jdk8u/jdk8u/hotspot/src/share/vm/oops/markOop.hpp
- Shallow and retained sizes http://toolkit.globus.org/toolkit/testing/tools/docs/help/sizes.html
- AtomicLong.lazySet是如何工作的? http://ifeve.com/how-does-atomiclong-lazyset-work/
- 《深入理解Java虚拟机》2.3.2 对象的内存布局