Java并发系列7-Disruptor无锁缓存框架

声明:原创文章,转载请注明出处。http://www.jianshu.com/u/e02df63eaa87

1、从生产者消费者说起

在传统的生产者消费者模型中,通常是采用BlockingQueue实现。其中生产者线程负责提交需求,消费者线程负责处理任务,二者之间通过共享内存缓冲区进行通信。由于内存缓冲区的存在,允许生产者和消费者之间速度的差异,确保系统正常运行。

下图展示一个简单的生产者消费者模型,生产者从文件中读取数据,将数据内容写入到阻塞队列中,消费者从队列的另一边获取数据,进行计算并将结果输出。其中Main负责创建两类线程并初始化队列。

生产者-消费者

Main:

public class Main {
    public static void main(String[] args) {
        // 初始化阻塞队列
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1000);
        // 创建生产者线程
        Thread producer = new Thread(new Producer(blockingQueue, "temp.dat"));
        producer.start();
        // 创建消费者线程
        Thread consumer = new Thread(new Consumer(blockingQueue));
        consumer.start();
    }
}

生产者:

public class Producer implements Runnable {
    private BlockingQueue<String> blockingQueue;
    private String fileName;
    private static final String FINIDHED = "EOF";

    public Producer(BlockingQueue<String> blockingQueue, String fileName)  {
        this.blockingQueue = blockingQueue;
        this.fileName = fileName;
    }

    @Override
    public void run() {
        try {
            BufferedReader reader = new BufferedReader(new FileReader(new File(fileName)));
            String line;
            while ((line = reader.readLine()) != null) {
                blockingQueue.put(line);
            }
            // 结束标志
            blockingQueue.put(FINIDHED);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者:

public class Consumer implements Runnable {
    private BlockingQueue<String> blockingQueue;
    private static final String FINIDHED = "EOF";

    public Consumer(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        String line;
        String[] arrStr;
        int ret;
        try {
            while (!(line = blockingQueue.take()).equals(FINIDHED)) {
                // 消费
                arrStr = line.split("\t");
                if (arrStr.length != 2) {
                    continue;
                }
                ret = Integer.parseInt(arrStr[0]) + Integer.parseInt(arrStr[1]);
                System.out.println(ret);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

生产者-消费者模型可以很容易地将生产和消费进行解耦,优化系统整体结构,并且由于存在缓冲区,可以缓解两端性能不匹配的问题。

2、BlockingQueue的不足

上述使用了ArrayBlockingQueue,通过查看其实现,完全是使用锁和阻塞等待实现线程同步。在高并发场景下,性能不是很优越。

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }

但是,ConcurrentLinkedQueue却是一个高性能队列,这是因为其实现使用了无锁的CAS操作。

3、Disruptor初体验

Disruptor是由LMAX公司开发的一款高效无锁内存队列。使用无锁方式实现了一个环形队列代替线性队列。相对于普通的线性队列,环形队列不需要维护头尾两个指针,只需维护一个当前位置就可以完成出入队操作。受限于环形结构,队列的大小只能初始化时指定,不能动态扩展。

如下图所示,Disruptor的实现为一个循环队列,ringbuffer拥有一个序号(Seq),这个序号指向数组中下一个可用的元素。

Disruptor循环队列

随着不停地填充这个buffer(可能也会有相应的读取),这个序号会一直增长,直到超过这个环。


Disruptor循环队列

Disruptor要求数组大小设置为2的N次方。这样可以通过Seq & (QueueSize - 1) 直接获取,其效率要比取模快得多。这是因为(Queue - 1)的二进制为全1等形式。例如,上图中QueueSize大小为8,Seq为10,则只需要计算二进制1010 & 0111 = 2,可直接得到index=2位置的元素。

在RingBuffer中,生产者向数组中写入数据,生产者写入数据时,使用CAS操作。消费者从中读取数据时,为防止多个消费者同时处理一个数据,也使用CAS操作进行数据保护。
这种固定大小的RingBuffer还有一个好处是,可以内存复用。不会有新空间需要分配或者旧的空间回收,当数组填充满后,再写入数据会将数据覆盖。

4、Disruptor小试牛刀

同样地,使用Disruptor处理第一节中的生产者消费者的案例。

4.1 添加Maven依赖
<dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.2</version>
</dependency>
4.2 定义事件对象

由于我们只需要将文件中的数据行读出,然后进行计算。因此,定义FileData.class来保存文件行。

public class FileData {
    private String line;

    public String getLine() {
        return line;
    }

    public void setLine(String line) {
        this.line = line;
    }
}
4.3 定义工厂类

用于产生FileData的工厂类,会在Disruptor系统初始化时,构造所有的缓冲区中的对象实例。

public class DisruptorFactory implements EventFactory<FileData> {

    public FileData newInstance() {
        return new FileData();
    }
}
4.4 定义消费者

消费者的作用是读取数据并进行处理。数据的读取已经由Disruptor封装,onEvent()方法为Disruptor框架的回调方法。只需要进行简单的数据处理即可。

public class DisruptorConsumer implements WorkHandler<FileData> {
    private static final String FINIDHED = "EOF";

    @Override
    public void onEvent(FileData event) throws Exception {
       String line = event.getLine();
        if (line.equals(FINIDHED)) {
            return;
        }
        // 消费
        String[] arrStr = line.split("\t");
        if (arrStr.length != 2) {
            return;
        }
        int ret = Integer.parseInt(arrStr[0]) + Integer.parseInt(arrStr[1]);
        System.out.println(ret);
    }
}
4.5 定义生产者

生产者需要一个Ringbuffer的引用。其中pushData()方法是将生产的数据写入到RingBuffer中。具体的过程是,首先通过next()方法得到下一个可用的序列号;取得下一个可用的FileData,并设置该对象的值;最后,进行数据发布,这个FileData对象会传递给消费者。

public class DisruptorProducer {
    private static final String FINIDHED = "EOF";
    private final RingBuffer<FileData> ringBuffer;

    public DisruptorProducer(RingBuffer<FileData> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void pushData(String line) {
        long seq = ringBuffer.next();
        try {
            FileData event = ringBuffer.get(seq);   // 获取可用位置
            event.setLine(line);                    // 填充可用位置
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ringBuffer.publish(seq);        // 通知消费者
        }
    }

    public void read(String fileName) {
        try {
            BufferedReader reader = new BufferedReader(new FileReader(new File(fileName)));
            String line;
            while ((line = reader.readLine()) != null) {
                // 生产数据
                pushData(line);
            }
            // 结束标志
            pushData(FINIDHED);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
4.6 定义Main函数

最后需要一个DisruptorMain()将上述的数据、生产者和消费者进行整合。

public class DisruptorMain {
    public static void main(String[] args) {
        DisruptorFactory factory = new DisruptorFactory();          // 工厂
        ExecutorService executor = Executors.newCachedThreadPool(); // 线程池
        int BUFFER_SIZE = 16;   // 必须为2的幂指数

        // 初始化Disruptor
        Disruptor<FileData> disruptor = new Disruptor<>(factory,
                BUFFER_SIZE,
                executor,
                ProducerType.MULTI,         // Create a RingBuffer supporting multiple event publishers to the one RingBuffer
                new BlockingWaitStrategy()  // 默认阻塞策略
                );
        // 启动消费者
        disruptor.handleEventsWithWorkerPool(new DisruptorConsumer(),
                new DisruptorConsumer()
        );
        disruptor.start();
        // 启动生产者
        RingBuffer<FileData> ringBuffer = disruptor.getRingBuffer();
        DisruptorProducer producer = new DisruptorProducer(ringBuffer);
        producer.read("temp.dat");

        // 关闭
        disruptor.shutdown();
        executor.shutdown();
    }
}

5、Disruptor策略

Disruptor生产者和消费者之间是通过什么策略进行同步呢?Disruptor提供了如下几种策略:

  • BlockingWaitStrategy:默认等待策略。和BlockingQueue的实现很类似,通过使用锁和条件(Condition)进行线程同步和唤醒。此策略对于线程切换来说,最节约CPU资源,但在高并发场景下性能有限。
  • SleepingWaitStrategy:CPU友好型策略。会在循环中不断等待数据。首先进行自旋等待,若不成功,则使用Thread.yield()让出CPU,并使用LockSupport.parkNanos(1)进行线程睡眠。所以,此策略数据处理数据可能会有较高的延迟,适合用于对延迟不敏感的场景。优点是对生产者线程影响小,典型应用场景是异步日志。
  • YieldingWaitStrategy:低延时策略。消费者线程会不断循环监控RingBuffer的变化,在循环内部使用Thread.yield()让出CPU给其他线程。
  • BusySpinWaitStrategy:死循环策略。消费者线程会尽最大可能监控缓冲区的变化,会占用所有CPU资源。

6、Disruptor解决CPU Cache伪共享问题

为了解决CPU和内存速度不匹配的问题,CPU中有多个高速缓存Cache。在Cache中,读写数据的基本单位是缓存行,缓存行是内存复制到缓存的最小单位。

伪共享问题

若两个变量放在同一个Cache Line中,在多线程情况下,可能会相互影响彼此的性能。如上图所示,CPU1上的线程更新了变量X,则CPU上的缓存行会失效,同一行的Y即使没有更新也会失效,导致Cache无法命中。
同样地,若CPU2上的线程更新了Y,则导致CPU1上的缓存行又失效。如果CPU经常不能命中缓存,则系统的吞吐量则会下降。这就是伪共享问题

解决伪共享问题

解决伪共享问题,可以在变量的前后都占据一定的填充位置,尽量让变量占用一个完整的缓存行。如上图中,CPU1上的线程更新了X,则CPU2上的Y则不会失效。同样地,CPU2上的线程更新了Y,则CPU1的不会失效。

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

/**
 * <p>Concurrent sequence class used for tracking the progress of
 * the ring buffer and event processors.  Support a number
 * of concurrent operations including CAS and order writes.
 *
 * <p>Also attempts to be more efficient with regards to false
 * sharing by adding padding around the volatile field.
 */
public class Sequence extends RhsPadding
{
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static
    {
        UNSAFE = Util.getUnsafe();
        try
        {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        }
        catch (final Exception e)
        {
            throw new RuntimeException(e);
        }
    }
... ...
}

Sequence的实现中,主要使用的是Value,但通过LhsPaddingRhsPadding在Value的前后填充了一些空间,使Value无冲突的存在于缓存行中。

参考
http://ifeve.com/dissecting-disruptor-whats-so-special/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容

  • 本文是笔者在研究Disruptor过程中翻译的Disruptor1.0论文精选,中间穿插了一些感想和说明,均以“译...
    coder_jerry阅读 5,143评论 3 52
  • Java-Review-Note——4.多线程 标签: JavaStudy PS:本来是分开三篇的,后来想想还是整...
    coder_pig阅读 1,629评论 2 17
  • 距离是什么? 你在的地方下雪了, 而我还在过夏天。 幅员辽阔的中国, 终究隔开了你我。 渠艺 2016.10.26
    渠六亿阅读 193评论 0 3
  • 首先登陆Navicat官网下载Linux版本: https://www.navicat.com.cn/downlo...
    吕志豪阅读 894评论 0 0
  • 我写的东西,你最好别看。因为我知道,我写得不好。可是我需要,我需要表达。负能量退散,正能量快来。写得不好是,因为太...
    夜深月明阅读 149评论 0 0