资料
官网地址 http://lmax-exchange.github.io/disruptor/
github https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
API文档 http://lmax-exchange.github.io/disruptor/docs/index.html
Maven地址 http://mvnrepository.com/artifact/com.lmax/disruptor
特点
Disruptor 开发的系统单线程能支撑每秒600万订单
Disruptor 是一个高性能异步处理队列,它实现了观察者模式 (生产者消费者)。它是无锁的、CAS操作、它不会清除缓存中的数据,只会覆盖,降低垃圾回收机制启动的频率。
Disruptor 读取cpu缓存,从cpu到L1-L3cpu缓存需要1-15纳秒,从cpu到主内容需要60-80纳秒
核心
RingBuffer 拥有一个序号,这个序号指向数组中下一个可用的元素。
RingBuffer 一个首尾相接的环, 由SequenceBuffer 控制不同消费者获取的序号。
RingBuffer 预分配缓存数组对象,没有首尾指针,缓存的长度总是2的n次方,这样可以用位运算 i & (length - 1) 替代 i % length 。
依赖链
指定两个消费者(并行)
disruptor.handleEventsWith(userEventHandler1,userEventHandler2);
指定两个消费者(串行)
disruptor.handleEventsWith(userEventHandler1).handleEventsWith(userEventHandler2);
先并行分组,并行完成后在串行
EventHandlerGroup<UserEvent> group1 = disruptor.handleEventsWith(userEventHandler1).handleEventsWith(userEventHandler2);
EventHandlerGroup<UserEvent> group2 = disruptor.handleEventsWith(userEventHandler3).handleEventsWith(userEventHandler4);// group1.and(group2).handleEventsWith(userEventHandler1);
初始化
// 初始化线程池-用户执行
ConsumerExecutor executor = Executors.newCachedThreadPool();
// 初始化EventFactory
LongEventFactory factory = new LongEventFactory();
// 初始化RingBuffer的大小,必须是2的指数 长度设置为2的N次方,有利于二进制计算机计算。
int bufferSize = 1024;
// 初始化RingBuffer
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor);
// 指定事件处理器
disruptor.handleEventsWith(new LongEventHandler());
// 开启Disruptor,开启所有线程,(此方法只能调用一次,并且所有的EventHandler必须在start之前添加,包括ExceptionHandler)
disruptor.start();
// 获取RingBuffer
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 获取下一个序号
long sequence = ringBuffer.next();
// 根据下一个序号获取Event
LongEvent event = ringBuffer.get(sequence);
// 发布序号(发布后可以被消费)
ringBuffer.publish(sequence);