第五章 并行模式与算法

5.1 探讨单例模式

一种对象创建模式,用于产生一个对象的具体实例,确保系统中一个类只产生一个实例。
好处:
1.对于频繁使用的对象,可以省略new操作花费的时间。
2.由于new操作次数减少,系统内存使用频率降低,减轻GC压力,缩短GC停顿时间。
单例的实现

public class Singleton{
  public static int STATUS = 1;
  private Singleton(){  //构造方法为私有方法
    System.out.println("Singleyon is create");
}

  private static Singleton instance = new Singleton();
  public static Singleton getInstance(){
    return instance;  
}
}

当在任何地方引用STATUS会导致instance实例被创建, 但类初始化只有一次,因此实例instance只会被创建一次。
下面通过锁机制使得只会在第一次使用instance时创建对象:

public class LazySingleton{
    private LazySingleton(){
      System.println("LazySingleton is create");
}
    private static LazySingleton instance = null;
    public static synchronized LazySingleton getInstance(){
      if(instance == null)
          new LazySingleton();
      return instance;
}
}

这种方法好处:充分实现了加载延迟,只在真正需要时创建对象。
坏处:并发环境下加锁,竞争激烈的场合对性能产生一定影响。

双重检查模式:

public class StaticSingleton(){
    private StaticSingleton(){
      System.out.println("StaticSingleton is create");
}
private static class SingletonHolder{
      private static StaticSingleton instance = new StaticSingleton();
}

private static StaticSingleton getInstance(){
      return SingletonHolder.instance;
}
}

5.2 不变模式

核心思想:一个对象一旦被创建,它的内部状态将永远不会改变。
使用场景:
1.对象创建后,其内部状态和数据不再发生任何变化。
2.对象需要被共享,被多线程频繁访问。

java中不变模式的实现:
1.去除setter方法及所有修改自身属性的方法。
2.将所有属性设置成私有,并用final标记,确保其不可修改。
3.确保自类可以重载修改它的行为。
4.有一个可以完整创建对象的构造函数。

5.3 生产者-消费者模式

实现类图

生产者线程实现:

package chapter5.PC;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {
    private volatile boolean isRunning = true;
    private BlockingQueue<PCData> queue;                             //内存缓存区
    private static AtomicInteger count = new AtomicInteger();        //总数,原子操作
    private static final int SLEEPTIME = 1000;

    public Producer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    public void run() {
        PCData data = null;
        Random r = new Random();

        System.out.println("start producer id="+Thread.currentThread().getId());
        try {
            while (isRunning) {
                Thread.sleep(r.nextInt(SLEEPTIME));
                data = new PCData(count.incrementAndGet());
                System.out.println(data+" is put into queue");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.err.println("failed to put data:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
    public void stop() {
        isRunning = false;
    }
}

消费者实现:从BlockingQueue中取出PCData对象

package chapter5.PC;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {
    private volatile boolean isRunning = true;
    private BlockingQueue<PCData> queue;                             //内存缓存区
    private static AtomicInteger count = new AtomicInteger();        //总数,原子操作
    private static final int SLEEPTIME = 1000;

    public Producer(BlockingQueue<PCData> queue) {
        this.queue = queue;
    }

    public void run() {
        PCData data = null;
        Random r = new Random();

        System.out.println("start producer id="+Thread.currentThread().getId());
        try {
            while (isRunning) {
                Thread.sleep(r.nextInt(SLEEPTIME));
                data = new PCData(count.incrementAndGet());
                System.out.println(data+" is put into queue");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.err.println("failed to put data:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
    public void stop() {
        isRunning = false;
    }
}

PCData对象作为生产者和消费者之间的共享数据模型:

package chapter5.PC;

public final class PCData {
    private  final int intData;
    public PCData(int d){
        intData=d;
    }
    public PCData(String d){
        intData=Integer.valueOf(d);
    }
    public int getData(){
        return intData;
    }
    @Override
    public String toString(){
        return "data:"+intData;
    }
}

主函数中,创建3个生产者3个消费者,让他们协同工作。

package chapter5.PC;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class PCMain {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer1);
        service.execute(consumer2);
        service.execute(consumer3);
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();
        Thread.sleep(3000);
        service.shutdown();
    }
}
运行结果

5.4 高性能生产者-消费者:无锁的实现

5.4.1 无锁的缓存框架:Disruptor

能极大的提高系统性能。
使用环型队列,队列大小为2的整数倍。
PCData对象:

package chapter5.disruptor;

public class PCData
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }
    public long get(){
        return value;
    }
}

消费者实现来自Disrupter框架的WorkHandler接口。onEvent()方法为框架的回调方法。
PCData对象的工厂类:

package chapter5.disruptor;

import com.lmax.disruptor.EventFactory;

public class PCDataFactory implements EventFactory<PCData>
{
    public PCData newInstance()
    {
        return new PCData();
    }
}

生产者类

package chapter5.disruptor;

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

public class Producer
{
    private final RingBuffer<PCData> ringBuffer;

    public Producer(RingBuffer<PCData> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void pushData(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  //得到下一个可用的序列号
        try
        {
            PCData event = ringBuffer.get(sequence); // 通过序列号得到对象实体
            event.set(bb.getLong(0)); 
        }
        finally
        {
            ringBuffer.publish(sequence); //发布数据
        }
    }
}

其中RingBuffer为环形缓冲区,pushData()将传入的ByteBuffer对象中数据提取出来,并装载到环形缓冲区。
主函数

package chapter5.disruptor;

import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class PCMain
{
    public static void main(String[] args) throws Exception
    {
        Executor executor = Executors.newCachedThreadPool();
        PCDataFactory factory = new PCDataFactory();
        // ringBuffer的大小必须是2的整数倍
        int bufferSize = 1024;
        Disruptor<PCData> disruptor = new Disruptor<PCData>(factory,
                bufferSize,
                executor,
                ProducerType.MULTI,
                new BlockingWaitStrategy()
        );
       
        disruptor.handleEventsWithWorkerPool(
                new Consumer(),
                new Consumer(),
                new Consumer(),
                new Consumer());
        disruptor.start();

        RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            producer.pushData(bb);
            Thread.sleep(100);
            System.out.println("add data "+l);
        }
    }
}
运行结果

5.4.3 Disrupter框架的4种策略
BlockingWaitStrategy:默认策略。使用锁和条件进行数据的监控和线程的唤醒,最节省CPU,但在高并发中性能下降明显。

SleepWaitStrategy:它会在循环中不断等待数据。会进行自旋等待,如果不成功,使用Thread.yield()让出CPU,并进行线程休眠,以确保不占用太多CPU数据。
这种策略延迟较高,但对生产者影响较小。适合场景为异步日志等等。

YieldingWaitStrategy:消费者线程会不断循环监控缓存区的变化。消费者线程变成了一个内部执行了Thread.yeild()方法的死循环。最后有多于消费者线程的CPU核数。这种策略用于低延迟的场合。

BusySpinWaitStrategy:消费者线程尽最大努力监控缓存区变化。会占用所有CPU资源,CPU的物理核数必须大于消费者线程核数,否则可能会无法工作。这种策略适用于对延迟要求非常高的系统。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,099评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,473评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,229评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,570评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,427评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,335评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,737评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,392评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,693评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,730评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,512评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,349评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,750评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,017评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,290评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,706评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,904评论 2 335

推荐阅读更多精彩内容