Java并发编程(五):生产者和消费者

1 概述

维基百科上有对“生产者和消费者模型”的名词解释:

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多进程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

那为什么要使用生产者消费者模型呢?考虑Web场景(尤其是秒杀活动),如果同一时间请求量很大,服务端处理速率远远比不上请求速率,这样请求方(即用户)就会明显感受到很大延迟,影响用户体验,甚至可能会导致数据库被“击穿”,整个系统崩溃等。而生产者消费者模型就可以解决这个问题,加入一个中间层(即缓冲区),请求先打到中间层,服务端根据自身的处理能力从中间层“取”出请求,并处理。可能你要问了,这样干是能防止数据库被“击穿”等问题,但服务端处理能力没变,请求时间还是会比较长是吧?对,确实是这样,但对于用户来说,关键是请求开始到获得响应这段时间的大小。所以,只要请求发送到中间层成功,那么就可以给用户响应了(例如“您已成功参加活动,请到个人中心参看详情”),至于什么时候“真正”处理请求,这就要看服务端的处理能力了。

可见,生产者消费者模型能解决并发环境下,请求方和处理方处理速率不匹配的问题,同时还将请求发和处理方“解耦”了,即请求方(生产者)不需要关心处理方(消费者)如何处理数据,只需要将数据“扔”到中间层缓冲区即可。

下面我将介绍几种生产者和消费者模型的实现方案:

  • 使用wait()和notify()或者notifyAll()来实现
  • 使用阻塞队列BlockingQueue来实现
  • 使用信号量Semaphore来实现

2 使用wait()和notify()或者notifyAll()来实现

这是最基础的实现方案,当缓冲区慢或者空的时候,线程调用wait()方法进入阻塞等待状态,当生产者生产了产品(数据)或者消费者消费了产品(数据)之后,调用notify()或者notifyAll()唤醒线程,然后继续进行生产或者消费。

下面是一个示例:

//生产者
public class Producer {

    //缓冲区
    private final int[] values;

    //索引
    private final AtomicInteger index;

    //锁对象
    private final Object lock;

    //产品
    private final AtomicInteger count;

    public Producer(int[] values, AtomicInteger index, Object lock) {
        this.values = values;
        this.index = index;
        this.lock = lock;
        count = new AtomicInteger(0);
    }

    public void produce() {
        //需要先加锁,否则在多线程竞争的环境下,会出现线程安全问题
        synchronized (lock) {
            //判断当前索引是否等于缓冲区大小
            if (index.get() == values.length) {
                //如果是,就说明缓冲区满了,应该调用wait()方法进入等待阻塞状态
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //否则,就将产品放入到缓冲区中
            values[index.getAndIncrement()] = count.incrementAndGet();
            System.out.println("produce produce value : " + count.get());
            //如果之前的状态是空的状态,就说明可能会有线程被阻塞了,那么就调用notifyAll()唤醒线程
            if (index.get() == 1)
                lock.notifyAll();
        }
    }
}

//消费者
public class Consumer {


    private final int[] values;

    private final AtomicInteger index;

    private final Object lock;

    public Consumer(int[] values, AtomicInteger index, Object lock) {
        this.values = values;
        this.index = index;
        this.lock = lock;
    }

    public void consume() {
        //和生产者一样,需要加锁
        synchronized (lock) {
            //判断缓冲区是否为空
            if (index.get() == 0) {
                try {
                    //如果是,那么就应该进入阻塞状态
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //否则消费该产品
            System.out.println("consumer consume : " + values[index.decrementAndGet()]);
            //如果之前缓冲器是满的状态,也许会有生产者被阻塞,那么应该尝试唤醒生产者
            if (index.get() == values.length-1)
                lock.notifyAll();
        }
    }
}

//测试类
public class Main {

    public static void main(String[] args) throws InterruptedException {
        int[] value = new int[3]; //缓冲区容量为10
        AtomicInteger index = new AtomicInteger(0); //索引从0开始
        final Object lock = new Object();
        Producer producer = new Producer(value, index, lock);
        Consumer consumer = new Consumer(value, index, lock);
        ExecutorService service = Executors.newFixedThreadPool(4);

        service.execute(() -> {
            for (int i = 0; i < 10; i++) {
                producer.produce();
            }
        });

        service.execute(() -> {
            for (int i = 0; i < 10; i++) {
                consumer.consume();
            }
        });

        service.shutdown();
        service.awaitTermination(1000, TimeUnit.SECONDS);
    }
}

输出如下所示(由于线程执行顺序问题,可能会不太一样):

produce produce value : 1
produce produce value : 2
produce produce value : 3
consumer consume : 3
consumer consume : 2
consumer consume : 1
produce produce value : 4
produce produce value : 5
produce produce value : 6
consumer consume : 6
consumer consume : 5
consumer consume : 4
produce produce value : 7
produce produce value : 8
produce produce value : 9
consumer consume : 9
consumer consume : 8
consumer consume : 7
produce produce value : 10
consumer consume : 10

可以修改几个参数多试几次,只要不出现“程序永远不会停止”的情况就表示这样的实现应该没什么问题了。

3 使用阻塞队列BlockingQueue来实现

Java5添加了阻塞队列BlockingQueue接口以及几个实现,下图是该接口中几个操作:

每一列是一组配对的方法。例如add和remove配对使用,offer()和poll()配对使用,在使用的时候最好不用弄错了,否则会导致一些难以发现的问题。

  • 插入操作,当阻塞队列满的时候,执行插入操作,线程会被阻塞(put()方法)或者返回false(offer()方法)或抛出异常(add()方法)。
  • 移除操作,当阻塞队列为空的时候,执行移除操作,线程会被阻塞(take()方法)或者返回false(poll()方法)或抛出异常(remove()方法)。

主要就是这两个操作,JDK中默认有几个BlockingQueue的实现,例如基于数组的ArrayBlockingQueue,基于链表的LinkedBlockingQueue,优先队列PriorityQueue,同步队列SynchronousQueue。

下面是一个使用LinkedBlockingQueue实现生产者消费者模型的示例:

//生产者
public class Producer {

    private final BlockingQueue<Integer> blockingQueue;

    private final AtomicInteger count = new AtomicInteger(0);

    public Producer(BlockingQueue<Integer> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    public void produce() {
        try {
            //直接调用put方法把产品放入队列里即可,同样不需要外部加锁
            blockingQueue.put(count.incrementAndGet());
            System.out.println("producer produce value : " + count.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

//生产者
public class Consumer {

    //阻塞队列
    private final BlockingQueue<Integer> blockingQueue;

    public Consumer(BlockingQueue<Integer> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    public void consume() {
        try {
            //消费就直接调用take方法即可,不再需要在外部加锁了
            System.out.println("consumer consume value : " + blockingQueue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

输出结果和上面没什么差别,不多说了。

4 使用信号量Semaphore来实现

先简单说一下信号量Semaphore的简单使用。Semaphore是众多并发工具类之中的一员,有两个不同形式的构造函数,如下所示:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

参数permits表示可用许可证的数量,可是是负数,fair表示公平性。sync是AQS的实现类的一个实例,在此先不讨论AQS。除此之外,Semaphore还有两个重要且常用的操作:acquire(),release()。在Semaphore实例内部的许可证数量大于0的时候,acquire()才会成功(即不会阻塞),否则线程就会进入阻塞状态,成功之后会把许可证的数量减1。release()与其相反,调用release()没有什么要求,无论许可证数量是否大于0,都会成功调用(发生异常除外)并且把许可证数量加1(即相当于把许可证归还)。

下面是使用Semaphore来实现生产者消费者模型:

public class Producer {

    //缓冲区
    private final int[] values;

    //索引
    private final AtomicInteger index;

    //产品
    private final AtomicInteger count;

    //表示缓冲区未满
    private final Semaphore notFull;

    //表示缓冲区未空
    private final Semaphore notEmpty;

    //当做锁来用
    private final Semaphore mutex;

    public Producer(int[] values, AtomicInteger index, Semaphore notFull, Semaphore notEmpty, Semaphore mutex) {
        this.values = values;
        this.index = index;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
        this.mutex = mutex;
        this.count = new AtomicInteger(0);
    }

    public void produce()  {
        try {
            //缓冲区不满的话,会成功,否则会阻塞
            notFull.acquire();
            //加锁
            mutex.acquire();
            //操作
            values[index.getAndIncrement()] = count.incrementAndGet();
            System.out.println("produce produce value : " + count.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //释放锁,顺序不要弄错了!!
            mutex.release();
            //调用notEmpty的release,表示已经非空了
            notEmpty.release();
        }
    }
}

public class Consumer {

    //缓冲区
    private final int[] values;

    //索引
    private final AtomicInteger index;

    private final Semaphore notFull;

    private final Semaphore notEmpty;

    private final Semaphore mutex;

    public Consumer(int[] values, AtomicInteger index, Semaphore notFull, Semaphore notEmpty, Semaphore mutex) {
        this.values = values;
        this.index = index;
        this.notFull = notFull;
        this.notEmpty = notEmpty;
        this.mutex = mutex;
    }

    public void consume() {
        try {
            //缓冲区非空的话,会成功,否则会阻塞
            notEmpty.acquire();
            mutex.acquire();
            System.out.println("consumer consume : " + values[index.decrementAndGet()]);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            mutex.release();
            //调用notFull的release,表示已经不满了
            notFull.release();
        }
    }
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Semaphore notFull = new Semaphore(3);
        Semaphore notEmpty = new Semaphore(0);
        Semaphore mutex = new Semaphore(1);
        int[] value = new int[3];
        AtomicInteger index = new AtomicInteger(0);

        Producer producer = new Producer(value, index, notFull, notEmpty, mutex);
        Consumer consumer = new Consumer(value, index, notFull, notEmpty, mutex);
        ExecutorService service = Executors.newFixedThreadPool(4);

        service.execute(() -> {
            for (int i = 0; i < 10; i++) {
                producer.produce();
            }
        });

        service.execute(() -> {
            for (int i = 0; i < 10; i++) {
                consumer.consume();
            }
        });

        service.shutdown();
        service.awaitTermination(1000, TimeUnit.SECONDS);
    }
}

解释已经在代码中写清楚了,现在可能会让人不明白的是:如何保证线程安全?Semaphore的实现是基于AQS的,本身就已经具备了同步功能,只有一个许可的信号量其实就可以当做一个锁来使用(前提是控制好release()的使用),在代码中的mutex就具备这样的功能。

5 小结

本文介绍了什么是“生产者消费者模型”以及为什么需要该模型,之后还介绍了在Java中实现这三种模型的方法。实际上,生产者消费者模型应用非常广,例如现在流行的消息中间件都实现了这种模型(但并不是只有这种模型),例如Kafka,RabbitMQ,ActiveMQ等,理解生产者消费者模型对理解这些消息中间件非常有帮助。

用Java实现还算是比较简单了,如果接触过操作系统,应该都感受过被C语言支配的恐惧!

6 参考资料

《Java并发编程实战》

Java实现生产者和消费者的5种方式

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容