前言
RingBuffer是Disruptor框架负责数据存储的模块,大部分文章也将其称之为环形缓存区,本文将对其实现原理进行深度探究。
本文依赖Disruptor版本为3.4.3
继承体系
实现接口说明
- DataProvider:获取指定位置的数据;
- EventSink:发布事件到RingBuffer的接口定义,提供了各种事件发布的操作方法;
- Cursored:获取当前游标位置;在Disruptor中都是依赖于Sequence的value;
- Sequenced:提供了RingBuffer相关的sequence判断操作方法,包括获取大小、占用slot、发布等方法;
继承类说明
- RingBufferFields:实现RingBuffer内存预分配以及数据定位等功能,此类非常关键,后续进行详细介绍;
- RingBufferPad:解决缓存行伪共享的填充类;
RingBuffer初始化流程
// RingBuffer的构造方法
RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
// 调用父类即RingBufferFields的构造方法
super(eventFactory, sequencer);
}
可以看出RingBuffer的初始化流程逻辑均在其父类中,我们详细分析RingBufferFields类的实现逻辑;
abstract class RingBufferFields<E> extends RingBufferPad {
// 用来预留填充的单侧slot个数,加速计算
private static final int BUFFER_PAD;
// RingBuffer中首个slot的偏移位置
private static final long REF_ARRAY_BASE;
// log2(scale),该值与BUFFER_PAD作用相似,主要用来加速计算
private static final int REF_ELEMENT_SHIFT;
// 反射拿到UNSAFE对象
private static final Unsafe UNSAFE = Util.getUnsafe();
static {
// 返回Object数组中一个元素指针占用的内存大小
final int scale = UNSAFE.arrayIndexScale(Object[].class);
// 指针占4个字节
if (4 == scale) {
REF_ELEMENT_SHIFT = 2;
// 指针占8个字节
} else if (8 == scale) {
REF_ELEMENT_SHIFT = 3;
} else {
throw new IllegalStateException("Unknown pointer size");
}
// BUFFER_PAD的取值为16/32,后续计算使用
BUFFER_PAD = 128 / scale;
// 计算数组首元素的偏移大小,往后偏移了128个字节,128/8=16
REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + 128;
}
// 数组的最大下标,即bufferSize - 1
private final long indexMask;
// 对象数组,RingBuffer中实际数据载体
private final Object[] entries;
// RingBuffer的长度
protected final int bufferSize;
// 序列器,分为单生产者和多生产者,用来在生产者和消费者之间传递数据,此处暂时略过
protected final Sequencer sequencer;
RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1) {
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
// bufferSize必须为2^N,否则进行报错
if (Integer.bitCount(bufferSize) != 1) {
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
// 最大下标
this.indexMask = bufferSize - 1;
// 预留32/64个空槽位
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
/**
* 一次性填充满整个数组,从BUFFER_PAD的下一位开始;
*/
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory) {
// 预留BUFFER_PAD个空slot
for (int i = 0; i < bufferSize; i++) {
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
@SuppressWarnings("unchecked")
protected final E elementAt(long sequence) {
// 元素内存偏移量=base + index * scale
// REF_ARRAY_BASE= UNSAFE.arrayBaseOffset(Object[].class) + 128;
// 因为RingBuffer的长度为2^N,所以(sequence & indexMask) = sequence % bufferSize = index
// index<<REF_ELEMENT_SHIFT = index << (2 ^ (log2(scale))) = index * scale
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
}
上述代码分析中,有几处关键点如下:
- 所谓的环形缓冲区实质上就是一个连续数组,只是包装出来了环形的概念;
- RingBuffer预分配内存时在数组的左右俩侧各预留了BUFFER_PAD个slot;
- 内存预分配时从BUFFER_PAD处开始,在64位JDK中,假设开启指针压缩,则相当于左侧预留32个slot,右侧预留32个slot;
- 在RingBuffer长度为2^N时,通过sequence & (bufferSize - 1)加速定位元素实际下标索引,通过结合<<操作实现乘法;
- 首个元素的内存偏移位置是固定的,即128位,因为BUFFER_PAD * scale=128;
在MultiProducerSequencer中后续通过>>>实现除法,作用类似;
为何预留slot?
在现代CPU中,缓存行伪共享是一个经典问题,即CPU一次load的L1缓存行大小是固定的,如果两个不同的元素被load到同一缓存行中,那么任何一个元素的修改都会导致另一元素在其它CPU核对应的缓存失效,这个问题在很多博文中都有专门讲解,此处不再赘述。引用结论就是单个频繁修改元素尽量不和其它元素在同一缓存行中。
对于entries对象,左右两侧各预留了128 bytes,这就保证了缓存行在<=128bytes时,entries对象一定不会和其它对象共享缓存行。
Sequence是什么?
Sequence是针对RingBuffer中slot位置的一个描述,与AtomicLong的功能几乎一致,其在AtomicLong的基础上解决了缓存行伪共享的问题,其继承关系如下:
其核心元素为value,根据Java的内存分配,value两次各8个long对象,即大小为8 * 8=64 bytes,那么在缓存行<=64 bytes时,可以避免缓存行伪共享问题。
下面对其关键的两个方法做下简单分析:
/**
* Perform an ordered write of this sequence. The intent is a Store/Store barrier between this write and any previous store.
*
* putOrderedLong在执行时使用store/store内存屏障,因此其在可见性上会存在一定的延时,但是由于其未使用store/load屏障,因此其写入是无阻塞的
*/
public void set(final long value) {
UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
/**
* Performs a volatile write of this sequence. The intent is a Store/Store barrier between this write and any previous write and a
* Store/Load barrier between this write and any subsequent volatile read.
*
* 本方法支持volatile语义,即在写入前插入store/store内存屏障,写入后插入store/load内存屏障,因此值的修改在其它线程是立即可见的
*/
public void setVolatile(final long value) {
UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
}
Sequencer是什么?
前面讲完了Sequence,是时候推出Sequencer了,Sequencer是Disruptor中非常核心的一个概念,其作为生产者和消费者的桥梁,通过维护两者的sequence来实现数据的扭转;
从继承体系上来看,sequencer主要分为单生产者和多生产者两类,这两者实现了各种free-lock的算法,此处以SingleProducerSequencer为例分析。
AbstractSequencer
该类主要属性说明如下:
// JDK提供的基于反射机制的volatile原子更新工具类,本更新器用来更新gatingSequences属性
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
// RingBuffer中环形缓冲区的大小
protected final int bufferSize;
// 消费者等待策略
protected final WaitStrategy waitStrategy;
// 生产者sequence,初始位置为-1
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 消费者sequence集合
protected volatile Sequence[] gatingSequences = new Sequence[0];
该类主要方法说明如下:
/**
* 增加消费者sequence列表,该方法功能由SequenceGroups代理,本质上还是AtomicReferenceFieldUpdater进行更新
*/
@Override
public final void addGatingSequences(Sequence... gatingSequences) {
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
/**
* 移除消费者sequence列表,该方法功能由SequenceGroups代理,本质上还是AtomicReferenceFieldUpdater进行更新
*/
@Override
public boolean removeGatingSequence(Sequence sequence) {
return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
}
/**
* 获取消费者sequence列表中最小的索引下标值,cursor.get()为生产者下标值
*/
@Override
public long getMinimumSequence() {
return Util.getMinimumSequence(gatingSequences, cursor.get());
}
SingleProducerSequencer
SingleProducerSequencer中最主要的属性为nextValue和cachedValue,其中nextValue表示获取下一个可用slot的索引下标,而cachedValue则是当前消费者消费最慢slot的索引下标,下面以next(int n)方法为例分析生产者获取可用sequence的逻辑。
/**
* 申请N个可用slot,用来存放生产出来的数据
* @param n 可用sequence的个数
*/
public long next(int n) {
// 申请个数不能<1,不能>bufferSize
if (n < 1 || n > bufferSize) {
throw new IllegalArgumentException("n must be > 0 and < bufferSize");
}
// 当前slot下标,初始时为-1
long nextValue = this.nextValue;
// 申请可用槽位的最大下标,默认一次申请1个,所以初始时位0
long nextSequence = nextValue + n;
// 倒转1轮,与消费者进行比较
long wrapPoint = nextSequence - bufferSize;
// 消费者中记录的最小slot下标,默认为-1
long cachedGatingSequence = this.cachedValue;
// wrapPoint > cachedGatingSequence,主要用来判断是否出现环路,即生产者生产速度过快,追上了消费者;
// cachedGatingSequence > nextValue,主要用来判断是否消费者消费过快,追上了生产者;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
// 设置生产者要使用的slot下标,底层为volatile语义
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
// 重新最小slot值,此处分两种情况讨论:
// 如果nextValue<gatingSequences最小值,说明消费者消费过快,那么此时wrapPoint=nextValue + n-bufferSize<nextValue,直接跳出循环,消费者处直接等待;
// 如果nextValue>gatingSequences最小值,那么判断是否出现环路,如果不出现,那么直接分配,否则,生产者等待;
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
// 生产者等待
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
this.cachedValue = minSequence;
}
// 更新生产者已使用的slot下标
this.nextValue = nextSequence;
return nextSequence;
}
从上述的代码分析来看,申请可用slot主要通过生产者和消费者的sequence进行比较,单生产者与多生产者的差别仅在于多生产者对sequence的更新可能产生争用,因此使用CAS机制进行更新;
publishEvent
事件发布也是RingBuffer提供的一个重要功能,其相当于为生产者提供了数据操作的入口,通过前面的继承体系可以看出,事件相关操作是由EventSink接口提供的,下面对其先进行一个简单的分析。
public void publishEvent(EventTranslator<E> translator) {
// 占用slot位置
final long sequence = sequencer.next();
// 发布事件
translateAndPublish(translator, sequence);
}
......
private void translateAndPublish(EventTranslator<E> translator, long sequence) {
try {
// 事件转换
translator.translateTo(get(sequence), sequence);
} finally {
// 更新生产者sequence的值
sequencer.publish(sequence);
}
}
......
public void publish(long sequence) {
// 更新生产者的value
cursor.set(sequence);
// 唤醒消费者线程,这个与等待策略有关,此处先不具体讨论
waitStrategy.signalAllWhenBlocking();
}
结合上面next()的实现源码,我们大致可以得出如下结论,以单事件发布为例,其是一个两阶段操作
- 预占用slot,更新sequencer的nextValue,这样下次分配时从新的nextValue往下分配
- 对slot的具体数据进行更新,同时更新生产者的sequence
由于是单生产者,这种操作方式不会产生并发问题。但是在多生产者中,逻辑会有问题,比如如果在第二步中更新sequence,在一定程度上会存在顺序错乱的问题,因此在预申请的时候就要更新生产者sequence,同时如何判断数据发布成功了呢?我们后面揭晓。
MultiProducerSequencer
// UNSAFE对象实例
private static final Unsafe UNSAFE = Util.getUnsafe();
// 获取int数组的首个元素偏移位置
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
// 返回一个int数组中对象指针占用的内存大小
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);
// 缓存的消费者最慢slot的下标
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// availableBuffer tracks the state of each ringbuffer slot
// see below for more details on the approach
private final int[] availableBuffer;
// ringbuffer最大下标
private final int indexMask;
// log2(bufferSize)
private final int indexShift;
通过这些属性定义,我们似乎有一点熟悉,比如BASE、SCALE、indexMask、indexShift这些概念,实际上这些值主要也是为了加速计算,其中比较让人懵圈的是int数组availableBuffer,所以可以想象BASE这些主要是为了对其进行内存操作;
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {
super(bufferSize, waitStrategy);
// 初始化availableBuffer大小为RingBuffer的大小
availableBuffer = new int[bufferSize];
indexMask = bufferSize - 1;
indexShift = Util.log2(bufferSize);
initialiseAvailableBuffer();
}
private void initialiseAvailableBuffer() {
// 这个写法有点奇怪,提前分配内存,值为-1
for (int i = availableBuffer.length - 1; i != 0; i--) {
setAvailableBufferValue(i, -1);
}
setAvailableBufferValue(0, -1);
}
OK,分析到这里,我们大致了解了多生产者的一些关键信息,下面回到在单生产者发布事件的问题上来继续探究,我们知道无论是哪种生产者模式,第一步都是申请可用的slot,那下面直接上多生产者申请slot的代码。
public long next(int n) {
// 基础的逻辑判断
if (n < 1 || n > bufferSize) {
throw new IllegalArgumentException("n must be > 0 and < bufferSize");
}
// 生产者实际的sequence值
long current;
// 申请可用slot的最大值
long next;
do {
// 实际sequence,初始时为-1
current = cursor.get();
// 申请可用slot的最大值,初始时为0
next = current + n;
// 下面与单生产者类似
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
// 这里补充一点,cachedGatingSequence是缓存的上一次可能出现环路或消费过快时的消费者最慢slot的下标,正常申请时可能存在多轮未更新
// 因此理论上cachedGatingSequence可能并不是最新的值,且其值相比实际值较小;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
// 获取最小的sequence值
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
// 如果出现环路,等待然后继续循环
if (wrapPoint > gatingSequence) {
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
// 未出现环路,更新消费者的sequence值
gatingSequenceCache.set(gatingSequence);
// CAS更新生产者的sequence,实现free-lock
} else if (cursor.compareAndSet(current, next)) {
// 更新成功跳出
break;
}
} while (true);
return next;
}
从上述可以看出,next方法在多生产者中永远都依赖生产者的实际sequence值进行计算,且占用成功后实时更新;
这里是与单生产者模式的第一个差别,单生产者中计算依赖的是其自身的nextValue值,其通过publish才会实际更新生产者游标,所以单生产者是一个典型的二阶段提交;
对于多生产者来说,由于生产者游标被实时更新掉了,那消费者依据什么来判断数据已经替换(发布)成功了呢?查看MultiProducerSequencer的publish方法,代码如下:
public void publish(final long sequence) {
// 关键代码
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking();
}
继续往下翻看setAvailable的代码:
private void setAvailable(final long sequence) {
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
...
private int calculateAvailabilityFlag(final long sequence) {
// 计算值=sequence / bufferSize,即该sequence属于哪一圈
return (int) (sequence >>> indexShift);
}
private int calculateIndex(final long sequence) {
// 计算值=sequence % bufferSize,即该sequence的实际数组下标
return ((int) sequence) & indexMask;
}
// 由上可知,availableBuffer中记录了对应sequence的实际圈数(round)
private void setAvailableBufferValue(int index, int flag) {
// 计算偏移量
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
由上我们可知,availableBuffer中记录了最新sequence对应的圈数,这里我们先临时插入一个消费者的代码片段,在Disruptor中,消费者的接口主要为EventProcessor,这里以WorkProcessor为例,代码如下:
if (cachedAvailableSequence >= nextSequence) {
// 获取下一个工作序列
event = ringBuffer.get(nextSequence);
// 处理序列
workHandler.onEvent(event);
// 更新标记为true
processedSequence = true;
} else {
// 获取可处理的最大sequence序列
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
...
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {
checkAlert();
// 获取可用的序列
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence) {
return availableSequence;
}
// 关键在这里 !import
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
可知消费者消费数据时强依赖sequencer.getHighestPublishedSequence的方法,多生产者的代码实现如下:
public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
// 关键代码 !import
if (!isAvailable(sequence)) {
return sequence - 1;
}
}
return availableSequence;
}
...
public boolean isAvailable(long sequence) {
// %,计算实际的下标
int index = calculateIndex(sequence);
// 计算其round
int flag = calculateAvailabilityFlag(sequence);
long bufferAddress = (index * SCALE) + BASE;
// 判断其是否与标识位相同,如果不相同,则代码数据未publish完成
return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}
由上整个分析基本结束,即消费者依靠availableBuffer的值来判断该数据是否已经发布完毕;
总结
本文主要对RingBuffer的数据结构、生产者以及数据发布流程进行了一个大致的分析,但其中有很多的细节还未来得及分析,比如生产者和消费者之间是如何进行数据联动的,Barrier在其中扮演的角色又是什么,这些留到下一篇文章进行深入探讨。