Java并发包工具

一、CountDownLatch用法

CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。

3个最重要的方法:

//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { }; 
 //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; 
//将count值减1
public void countDown() { };  

example1

package com.yincb.until;

import com.sun.xml.internal.stream.util.ThreadLocalBufferAllocator;

import java.util.Random;
import java.util.concurrent.*;

public class CountDownLatchExample1 {

    private static Random random = new Random(System.currentTimeMillis());
    private static ExecutorService executor = Executors.newFixedThreadPool(2);
    private static CountDownLatch latch = new CountDownLatch(10);

    public static void main(String[] args) throws InterruptedException {
        //step 1
        int[] data = query();
        //step 2
        for (int i = 0; i < data.length; i++) {
            executor.execute(new SimpleRunnble(data,i,latch));
        }
        // step3
        latch.await();
        System.out.println("all of finish.");
        executor.shutdown();
        /*executor.shutdown();
        executor.awaitTermination(1, TimeUnit.HOURS);
        System.out.println("all of finish.");*/
    }

    static class SimpleRunnble implements Runnable{
        private final int[] data;
        private final int index;
        private final CountDownLatch latch;
        SimpleRunnble(int[] data, int index,CountDownLatch latch) {
            this.data = data;
            this.index = index;
            this.latch=latch;
        }


        @Override
        public void run() {
            try {
                Thread.sleep(random.nextInt(2000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int value = data[index];
            if(value%2 == 0){
                data[index] = value*2;
            }else {
                data[index] = value*10;
            }
            System.out.println(Thread.currentThread().getName()+" finish.");
            latch.countDown();
        }
    }
    private static int[] query(){
        return new int[]{1,2,3,4,5,6,7,8,9,10};
    }
}

运行结果:
pool-1-thread-1 finish.
pool-1-thread-2 finish.
pool-1-thread-1 finish.
pool-1-thread-2 finish.
pool-1-thread-1 finish.
pool-1-thread-2 finish.
pool-1-thread-1 finish.
pool-1-thread-2 finish.
pool-1-thread-2 finish.
pool-1-thread-1 finish.
all of finish.

example2

public class CountDownLatchExample2 {
    public static void main(String[] args) throws InterruptedException {

        final CountDownLatch latch = new CountDownLatch(1);

        new Thread(){
            @Override
            public void run() {
                System.out.println("initial do thing.");
                try {
                    Thread.sleep(1000);
                    latch.await();
                    System.out.println("doing thing");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();

        new Thread(){
            @Override
            public void run() {
                System.out.println("sync prepare data.");
                try {
                    Thread.sleep(1000);
                    System.out.println("prepare data done.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    latch.countDown();
                }
            }
        }.start();

        new Thread(){
            @Override
            public void run() {
                try {
                    latch.await();
                    System.out.println("relase");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();


        Thread.currentThread().join();
    }
}

运行结果:
initial do thing.
sync prepare data.
prepare data done.
relase
doing thing

二、CyclicBarrier用法

它可以对一组线程进行协同。同一个CyclicBarrier下的所有线程一旦到达了拦杆处,它就必须等待,只有等到所有线程都到达了拦杆处处,才能继续运行。 CyclicBarrier是一个同步辅助器。 通俗点讲就是:让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活

example

public class CyclicBarrierExample1 {

    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        // new Runnable() 回调
        CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
            @Override
            public void run() {
                System.out.println("all other thread finished");
            }
        });

        new Thread(){
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(20);
                    System.out.println("t1 finished");
                    barrier.await();
                    System.out.println("t1 ==> other thread finished too");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();

        new Thread(){
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(10);
                    System.out.println("t2 finished");
                    barrier.await();
                    System.out.println("t2 ==> other thread finished too");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }

}

运行结果:
t2 finished
t1 finished
all other thread finished
t1 ==> other thread finished too
t2 ==> other thread finished too

example

public class CyclicBarrierExample2 {

    public static void main(String[] args) throws InterruptedException {

        final CyclicBarrier barrier = new CyclicBarrier(2);

        new Thread(){
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(5);
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }.start();

        new Thread(){
            @Override
            public void run() {
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }.start();

        TimeUnit.SECONDS.sleep(4);
        System.out.println(barrier.getNumberWaiting());
        System.out.println(barrier.getParties());
        System.out.println(barrier.isBroken());
        TimeUnit.SECONDS.sleep(2);

        barrier.reset();

        System.out.println(barrier.getNumberWaiting());
        System.out.println(barrier.getParties());
        System.out.println(barrier.isBroken());

    }

}

运行结果:
1
2
false
0
2
false

example

public class CyclicBarrierExample3 {

    static class MyCountDownLatch extends CountDownLatch{
        private final Runnable runnable;
        public MyCountDownLatch(int count,Runnable runnable) {
            super(count);
            this.runnable = runnable;
        }

        @Override
        public void countDown() {
            super.countDown();
            if(getCount()==0){
                this.runnable.run();
            }
        }
    }

    public static void main(String[] args) {
        final MyCountDownLatch myCountDownLatch = new MyCountDownLatch(2, new Runnable() {
            @Override
            public void run() {
                System.out.println("all thread finished");
            }
        });

        new Thread(){
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                myCountDownLatch.countDown();
                System.out.println(Thread.currentThread().getName()+" finished");
            }
        }.start();

        new Thread(){
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                myCountDownLatch.countDown();
                System.out.println(Thread.currentThread().getName()+" finished");
            }
        }.start();

    }

}

运行结果:
Thread-0 finished
all thread finished
Thread-1 finished

CountDownLatch 和 CyclicBarrier 的区别

1.CountDownLatch 不能reset,CyclicBarrier可以循环使用
2.工作线程之间互不关心,工作线程必须要等到同一个点才去执行

三、Exchanger用法

Exchanger是自jdk1.5起开始提供的工具套件,一般用于两个工作线程之间交换数据。

首先我们来看看官方的api文档中的叙述:
A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.

从官方的javadoc可以知道,当一个线程到达exchange调用点时,如果它的伙伴线程此前已经调用了此方法,那么它的伙伴会被调度唤醒并与之进行对象交换,然后各自返回。如果它的伙伴还没到达交换点,那么当前线程将会被挂起,直至伙伴线程到达——完成交换正常返回;或者当前线程被中断——抛出中断异常;又或者是等候超时——抛出超时异常。

Exchange 常用API

public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

example

public class ExchangeTest1 {


    public static void main(String[] args) {

        final Exchanger<String> exchanger = new Exchanger<>();

        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+ " start.");
                try {
                    String result = exchanger.exchange("thread A data.");
                    System.out.println(Thread.currentThread().getName()+" get "+result);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+ " end.");
            }
        },"A").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+ " start.");
                try {
                    String result = exchanger.exchange("thread B data.");
                    System.out.println(Thread.currentThread().getName()+" get "+result);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+ " end.");
            }
        },"B").start();

    }

}

运行结果:
A start.
B start.
B get thread A data.
A get thread B data.
B end.
A end.

example

public class ExchangeTest2 {
    
    public static void main(String[] args) {

        final Exchanger<String> exchanger = new Exchanger<>();

        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+ " start.");
                try {
                    String result = exchanger.exchange("thread A data.",10,TimeUnit.SECONDS);
                    System.out.println(Thread.currentThread().getName()+" get "+result);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+ " end.");
            }
        },"A").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+ " start.");
                try {
                    TimeUnit.SECONDS.sleep(20);
                    String result = exchanger.exchange("thread B data.");
                    System.out.println(Thread.currentThread().getName()+" get "+result);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+ " end.");
            }
        },"B").start();

    }

}

运行结果:A start.
B start.
A end.
java.util.concurrent.TimeoutException
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at com.yincb.utils.exchange.ExchangeTest2$1.run(ExchangeTest2.java:19)
at java.lang.Thread.run(Thread.java:748)

example
交换的对象是源对象,同一个对象,不是源对象的副本

public class ExchangeTest4 {

    public static void main(String[] args) {

        final Exchanger<Object> exchanger = new Exchanger<>();

        new Thread(){
            @Override
            public void run() {
                Object aobj = new Object();
                System.out.println("A send obj="+aobj);
                try {
                    final Object result = exchanger.exchange(aobj);
                    System.out.println("A accept obj="+result);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();

        new Thread(){
            @Override
            public void run() {
                Object bobj = new Object();
                System.out.println("B send obj="+bobj);
                try {
                    Object result = exchanger.exchange(bobj);
                    System.out.println("B accept obj="+result);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }
}

运行结果:
A send obj=java.lang.Object@19230e0d
B send obj=java.lang.Object@1020132c
B accept obj=java.lang.Object@19230e0d
A accept obj=java.lang.Object@1020132c

四、Semaphore用法

百度百科:
Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。
Semaphore分为单值和多值两种,前者只能被一个线程获得,后者可以被若干个线程获得。
以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。
在这个中,车位是公共资源,每辆车好比一个线程,看门人起的就是[信号量]的作用。
更进一步,信号量的特性如下:信号量是一个非负整数(车位数),所有通过它的线程(车辆)都会将该整数减一(通过它当然是为了使用资源),当该整数值为零时,所有试图通过它的线程都将处于等待状态。在信号量上我们定义两种操作: Wait(等待) 和 Release(释放)。 当一个线程调用Wait(等待)操作时,它要么通过然后将信号量减一,要么一直等下去,直到信号量大于一或超时。Release(释放)实际上是在信号量上执行加操作,对应于车辆离开停车场,该操作之所以叫做“释放”是因为加操作实际上是释放了由信号量守护的资源。
在java中,还可以设置该信号量是否采用公平模式,如果以公平方式执行,则线程将会按到达的顺序(FIFO)执行,如果是非公平,则可以后请求的有可能排在队列的头部。
JDK中定义如下:
Semaphore(int permits, boolean fair)
创建具有给定的许可数和给定的公平设置的Semaphore。

example

public class SemaphoreExample {
    public static void main(String[] args) {
        final SemaphoreLock semaphoreLock = new SemaphoreLock();
        for (int i = 0; i < 2; i++) {
            new Thread(){
                @Override
                public void run() {
                    try {
                        try {
                            System.out.println(Thread.currentThread().getName()+" running.");
                            semaphoreLock.lock();
                            System.out.println(Thread.currentThread().getName()+" get semaphore lock.");
                            TimeUnit.SECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }finally {
                        semaphoreLock.unlock();
                    }
                    System.out.println(Thread.currentThread().getName()+" release semaphore lock.");
                }
            }.start();
        }
    }

    static class SemaphoreLock{
        private final Semaphore semaphore = new Semaphore(1);
        public void lock() throws InterruptedException{
            semaphore.acquire();
        }
        public void unlock(){
            semaphore.release();
        }
    }
    
}

run result:
Thread-0 running.
Thread-1 running.
Thread-0 get semaphore lock.
Thread-0 release semaphore lock.
Thread-1 get semaphore lock.
Thread-1 release semaphore lock.

example

public class SemaphoreExample2 {

    public static void main(String[] args) {
        final Semaphore semaphore = new Semaphore(1);
        for (int i = 0; i < 2; i++) {
            new Thread(){
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+" in.");
                    try {
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName()+" get semaphore");
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        semaphore.release();
                    }
                    System.out.println(Thread.currentThread().getName()+" out.");
                }
            }.start();
        }
    }
}

运行结果:
Thread-0 in.
Thread-1 in.
Thread-0 get semaphore
Thread-0 out.
Thread-1 get semaphore
Thread-1 out.

常用API

  1. public void acquire() throws InterruptedException
    获取一个许可证,没有许可证就阻塞
  2. public void acquire(int permits) throws InterruptedException
    获取传入数量的许可证,许可证数量不够就阻塞
    3.public int availablePermits()
    获取可用许可证数量
    4.public final int getQueueLength()
    当前被block的线程数量,该值是一个评估值,不一定正确

五、ReentrantLock - 显示锁(排他锁)

在Java中通常实现锁有两种方式,一种是synchronized关键字,另一种是Lock。二者其实并没有什么必然联系,但是各有各的特点,在使用中可以进行取舍的使用。
首先我们先对比下两者
实现:
首先最大的不同:synchronized是基于JVM层面实现的,而Lock是基于JDK层面实现的。曾经反复的找过synchronized的实现,可惜最终无果。但Lock却是基于JDK实现的,我们可以通过阅读JDK的源码来理解Lock的实现。
使用:
对于使用者的直观体验上Lock是比较复杂的,需要lock和realse,如果忘记释放锁就会产生死锁的问题,所以,通常需要在finally中进行锁的释放。但是synchronized的使用十分简单,只需要对自己的方法或者关注的同步对象或类使用synchronized关键字即可。但是对于锁的粒度控制比较粗,同时对于实现一些锁的状态的转移比较困难。
example

public class ReentrantLockExample {

    private static final Lock lock = new ReentrantLock();
    // private static final Lock lock = new ReentrantLock(true); // true 公平锁 false 非公平锁

    public static void main(String[] args) throws InterruptedException {
        /*IntStream.range(0,2).forEach(i -> new Thread(){
            @Override
            public void run() {
                needLock();
            }
        }.start());*/


       /*Thread t1 = new Thread(()->testUnInterrupter());
       t1.start();
       TimeUnit.SECONDS.sleep(1);
       Thread t2 = new Thread(()->testUnInterrupter());
       t2.start();
       TimeUnit.SECONDS.sleep(1);

       t2.interrupt();*/

       Thread t1 = new Thread(()->testTryLock());
        t1.start();
        TimeUnit.SECONDS.sleep(1);

        Thread t2 = new Thread(()->testTryLock());
        t2.start();
        TimeUnit.SECONDS.sleep(1);


    }

    private static void testTryLock(){
        if(lock.tryLock()){
            try {
                Optional.of(Thread.currentThread().getName()+" get lock").ifPresent(System.out::println);
                while (true){}
            }finally {
                lock.unlock();
            }
        }else {
            Optional.of(Thread.currentThread().getName()+" not get lock").ifPresent(System.out::println);
        }
    }

    private static void testUnInterrupter(){
        try {
            // lock.lock(); // not interrupter
            lock.lockInterruptibly(); // able interrupter
            Optional.of(Thread.currentThread().getName()+" get lock").ifPresent(System.out::println);
            while (true){

            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }


    private static void needLock(){
        try {
            lock.lock();
            Optional.of(Thread.currentThread().getName()+" get lock").ifPresent(System.out::println);
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private static void needLockBySync(){
        synchronized (ReentrantLockExample.class){
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

六、ReentrantReadWriteLock- 读写锁

ReentrantReadWriteLock是Lock的另一种实现方式,我们已经知道了ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时ReentrantReadWriteLock能够提供比排他锁更好的并发性和吞吐量。

public class ReadWriterLockExample {

    private final static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
    private final static Lock readLock = readWriteLock.readLock();
    private final static Lock writerLock = readWriteLock.writeLock();

    private static final List<Long> data = new ArrayList<>();

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(ReadWriterLockExample::read);
        t1.start();

        TimeUnit.SECONDS.sleep(1);

        Thread t2 = new Thread(ReadWriterLockExample::read);
        t2.start();

    }

    public static void writer(){
        try {
            writerLock.lock();
            data.add(System.currentTimeMillis());
        }finally {
            writerLock.unlock();
        }
    }

    public static void read(){
        try {
            readLock.lock();
            data.forEach(System.out::println);
            System.out.println(Thread.currentThread().getName()+"==============");
        }finally {
            readLock.unlock();
        }
    }

}

七、Condition用法

Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。

example

public class ConditionExample {

    private final static ReentrantLock lock = new ReentrantLock();
    private final static Condition condition = lock.newCondition();

    private static int data = 0;
    private static volatile boolean noUse = true;

    private static void buildData(String threadName){
        try {
            lock.lock(); // ===synchronized key word # monitor enter
            while (noUse){
                condition.await();  // === monitor.wait()
            }
            data++;
            Optional.of(threadName+"==>P:"+data).ifPresent(System.out::println);
            TimeUnit.SECONDS.sleep(1);
            noUse = true;
            //condition.signal(); // === monitor.notify()
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();  // === synchronized end #monitor end
        }
    }

    private static void useData(String threadName){
        try {
            lock.lock();
            while (!noUse){
                condition.await();
            }
            //TimeUnit.SECONDS.sleep(1);
            Optional.of(threadName+"==>C:"+data).ifPresent(System.out::println);
            noUse=false;
            // condition.signal();
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();

        }

    }

    public static void main(String[] args) {

        for (int i = 0; i < 2; i++)
            new Thread(){
                @Override
                public void run() {
                    for (;;){
                        buildData(Thread.currentThread().getName());
                    }
                }
            }.start();


        new Thread(){
            @Override
            public void run() {
                for (;;){
                    useData(Thread.currentThread().getName());
                }
            }
        }.start();
    }

}

example

public class ConditionExample3 {

    private static final Lock lock = new ReentrantLock();

    private static final Condition PRODUCER_LOCK = lock.newCondition();
    private static final Condition CONSUMER_LOCK = lock.newCondition();

    private static final LinkedList<Long> TIMESTAMP_POOL = new LinkedList<>();
    private final static int MAX_CAPACITY = 100;

    public static void main(String[] args) {
        IntStream.range(0,6).boxed().forEach(ConditionExample3::beginProducer);
        IntStream.range(0,13).boxed().forEach(ConditionExample3::beginConsumer);
    }

    private static void beginProducer(int i){
        new Thread(()->{
            for (;;){
                producer();
                sleep(2);
            }
        },"P-"+i).start();
    }

    private static void beginConsumer(int i){
        new Thread(()->{
            for (;;){
                consumer();
                sleep(3);
            }
        },"C-"+i).start();
    }

    private static void producer(){
        try {
            lock.lock();
            while (TIMESTAMP_POOL.size()>=MAX_CAPACITY){
                PRODUCER_LOCK.await();
            }
            Long value = System.currentTimeMillis();
            System.out.println(Thread.currentThread().getName() +" P==>" + value);
            TIMESTAMP_POOL.addLast(System.currentTimeMillis());
            CONSUMER_LOCK.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private static void consumer(){
        try {
            lock.lock();
            while (TIMESTAMP_POOL.isEmpty()){
                CONSUMER_LOCK.await();
            }
            Long value = TIMESTAMP_POOL.removeFirst();
            System.out.println(Thread.currentThread().getName()+" C==>" +value);
            PRODUCER_LOCK.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private static void sleep(long seconds){
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

八、StampedLock作用

StampedLock是Java8引入的一种新的所机制,简单的理解,可以认为它是读写锁的一个改进版本,读写锁虽然分离了读和写的功能,使得读与读之间可以完全并发,但是读和写之间依然是冲突的,读锁会完全阻塞写锁,它使用的依然是悲观的锁策略.如果有大量的读线程,他也有可能引起写线程的饥饿
而StampedLock则提供了一种乐观的读策略,这种乐观策略的锁非常类似于无锁的操作,使得乐观锁完全不会阻塞写线程

example

public class StampedLockExample2 {


    private final static StampedLock lock = new StampedLock();
    private final static List<Long> data = new ArrayList<>();

    public static void main(String[] args) {

        final ExecutorService executorService = Executors.newFixedThreadPool(10);

        Runnable readTask = () ->{
            for (;;){
                read();
            }
        };

        Runnable writeTask = () -> {
            for (;;){
                write();
            }
        };

        executorService.submit(readTask);
        executorService.submit(readTask);
        executorService.submit(readTask);
        executorService.submit(readTask);
        executorService.submit(readTask);
        executorService.submit(readTask);
        executorService.submit(readTask);
        executorService.submit(readTask);

        executorService.submit(writeTask);
    }


    private static void read(){

        //乐观读
        long stamped = lock.tryOptimisticRead();
        if (lock.validate(stamped)){
            try {
                stamped = lock.readLock();
                System.out.println("stamped="+stamped);
                Optional.of(
                        data.stream().map(String::valueOf).collect(Collectors.joining("#","R-",""))
                ).ifPresent(System.out::println);
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlockRead(stamped);
            }
        }
        // 悲观读
        /*long stamped = -1;
        try {
            stamped = lock.readLock();
            Optional.of(
                    data.stream().map(String::valueOf).collect(Collectors.joining("#","R-",""))
            ).ifPresent(System.out::println);
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlockRead(stamped);
        }*/
    }

    private static void write(){
        long stamped = -1;
        try {
            stamped = lock.writeLock();
            data.add(System.currentTimeMillis());
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlockWrite(stamped);
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,905评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,140评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,791评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,483评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,476评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,516评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,905评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,560评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,778评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,557评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,635评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,338评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,925评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,898评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,142评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,818评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,347评论 2 342

推荐阅读更多精彩内容