5.1 探讨单例模式
一种对象创建模式,用于产生一个对象的具体实例,确保系统中一个类只产生一个实例。
好处:
1.对于频繁使用的对象,可以省略new操作花费的时间。
2.由于new操作次数减少,系统内存使用频率降低,减轻GC压力,缩短GC停顿时间。
单例的实现
public class Singleton{
public static int STATUS = 1;
private Singleton(){ //构造方法为私有方法
System.out.println("Singleyon is create");
}
private static Singleton instance = new Singleton();
public static Singleton getInstance(){
return instance;
}
}
当在任何地方引用STATUS会导致instance实例被创建, 但类初始化只有一次,因此实例instance只会被创建一次。
下面通过锁机制使得只会在第一次使用instance时创建对象:
public class LazySingleton{
private LazySingleton(){
System.println("LazySingleton is create");
}
private static LazySingleton instance = null;
public static synchronized LazySingleton getInstance(){
if(instance == null)
new LazySingleton();
return instance;
}
}
这种方法好处:充分实现了加载延迟,只在真正需要时创建对象。
坏处:并发环境下加锁,竞争激烈的场合对性能产生一定影响。
双重检查模式:
public class StaticSingleton(){
private StaticSingleton(){
System.out.println("StaticSingleton is create");
}
private static class SingletonHolder{
private static StaticSingleton instance = new StaticSingleton();
}
private static StaticSingleton getInstance(){
return SingletonHolder.instance;
}
}
5.2 不变模式
核心思想:一个对象一旦被创建,它的内部状态将永远不会改变。
使用场景:
1.对象创建后,其内部状态和数据不再发生任何变化。
2.对象需要被共享,被多线程频繁访问。
java中不变模式的实现:
1.去除setter方法及所有修改自身属性的方法。
2.将所有属性设置成私有,并用final标记,确保其不可修改。
3.确保自类可以重载修改它的行为。
4.有一个可以完整创建对象的构造函数。
5.3 生产者-消费者模式
生产者线程实现:
package chapter5.PC;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue<PCData> queue; //内存缓存区
private static AtomicInteger count = new AtomicInteger(); //总数,原子操作
private static final int SLEEPTIME = 1000;
public Producer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
public void run() {
PCData data = null;
Random r = new Random();
System.out.println("start producer id="+Thread.currentThread().getId());
try {
while (isRunning) {
Thread.sleep(r.nextInt(SLEEPTIME));
data = new PCData(count.incrementAndGet());
System.out.println(data+" is put into queue");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.err.println("failed to put data:" + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop() {
isRunning = false;
}
}
消费者实现:从BlockingQueue中取出PCData对象
package chapter5.PC;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue<PCData> queue; //内存缓存区
private static AtomicInteger count = new AtomicInteger(); //总数,原子操作
private static final int SLEEPTIME = 1000;
public Producer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
public void run() {
PCData data = null;
Random r = new Random();
System.out.println("start producer id="+Thread.currentThread().getId());
try {
while (isRunning) {
Thread.sleep(r.nextInt(SLEEPTIME));
data = new PCData(count.incrementAndGet());
System.out.println(data+" is put into queue");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.err.println("failed to put data:" + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop() {
isRunning = false;
}
}
PCData对象作为生产者和消费者之间的共享数据模型:
package chapter5.PC;
public final class PCData {
private final int intData;
public PCData(int d){
intData=d;
}
public PCData(String d){
intData=Integer.valueOf(d);
}
public int getData(){
return intData;
}
@Override
public String toString(){
return "data:"+intData;
}
}
主函数中,创建3个生产者3个消费者,让他们协同工作。
package chapter5.PC;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class PCMain {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer1);
service.execute(consumer2);
service.execute(consumer3);
Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(3000);
service.shutdown();
}
}
5.4 高性能生产者-消费者:无锁的实现
5.4.1 无锁的缓存框架:Disruptor
能极大的提高系统性能。
使用环型队列,队列大小为2的整数倍。
PCData对象:
package chapter5.disruptor;
public class PCData
{
private long value;
public void set(long value)
{
this.value = value;
}
public long get(){
return value;
}
}
消费者实现来自Disrupter框架的WorkHandler接口。onEvent()方法为框架的回调方法。
PCData对象的工厂类:
package chapter5.disruptor;
import com.lmax.disruptor.EventFactory;
public class PCDataFactory implements EventFactory<PCData>
{
public PCData newInstance()
{
return new PCData();
}
}
生产者类
package chapter5.disruptor;
import java.nio.ByteBuffer;
import com.lmax.disruptor.RingBuffer;
public class Producer
{
private final RingBuffer<PCData> ringBuffer;
public Producer(RingBuffer<PCData> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
public void pushData(ByteBuffer bb)
{
long sequence = ringBuffer.next(); //得到下一个可用的序列号
try
{
PCData event = ringBuffer.get(sequence); // 通过序列号得到对象实体
event.set(bb.getLong(0));
}
finally
{
ringBuffer.publish(sequence); //发布数据
}
}
}
其中RingBuffer为环形缓冲区,pushData()将传入的ByteBuffer对象中数据提取出来,并装载到环形缓冲区。
主函数
package chapter5.disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class PCMain
{
public static void main(String[] args) throws Exception
{
Executor executor = Executors.newCachedThreadPool();
PCDataFactory factory = new PCDataFactory();
// ringBuffer的大小必须是2的整数倍
int bufferSize = 1024;
Disruptor<PCData> disruptor = new Disruptor<PCData>(factory,
bufferSize,
executor,
ProducerType.MULTI,
new BlockingWaitStrategy()
);
disruptor.handleEventsWithWorkerPool(
new Consumer(),
new Consumer(),
new Consumer(),
new Consumer());
disruptor.start();
RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
Producer producer = new Producer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
producer.pushData(bb);
Thread.sleep(100);
System.out.println("add data "+l);
}
}
}
5.4.3 Disrupter框架的4种策略
BlockingWaitStrategy:默认策略。使用锁和条件进行数据的监控和线程的唤醒,最节省CPU,但在高并发中性能下降明显。
SleepWaitStrategy:它会在循环中不断等待数据。会进行自旋等待,如果不成功,使用Thread.yield()让出CPU,并进行线程休眠,以确保不占用太多CPU数据。
这种策略延迟较高,但对生产者影响较小。适合场景为异步日志等等。
YieldingWaitStrategy:消费者线程会不断循环监控缓存区的变化。消费者线程变成了一个内部执行了Thread.yeild()方法的死循环。最后有多于消费者线程的CPU核数。这种策略用于低延迟的场合。
BusySpinWaitStrategy:消费者线程尽最大努力监控缓存区变化。会占用所有CPU资源,CPU的物理核数必须大于消费者线程核数,否则可能会无法工作。这种策略适用于对延迟要求非常高的系统。