一.概述
1.JUC是JDK1.5中提供的一套并发包及其子包。包含以下:
java.util.concurrent,
java.util.concurrent.atmoic,
java.util.concurrent.lock
2.JUC中包含了5套接口:BlockingQueue、ConcurrentMap、ExecutorService,Lock和Automic
二.BlockingQueue-阻塞式队列
1.特征:阻塞、FIFO(先进先出)
2.BlockingQueue不同于之前学习的Queue,不能进行扩容。即BlockingQueue在使用的时候指定的容量是多少就是多少
3.当队列已满时,试图放入元素的线程会被阻塞;当队列为空时,似乎获取元素的线程会被阻塞。
阻塞式队列不允许元素为空。
4.重要方法:
特征 | 抛出异常 | 返回值 | 阻塞 | 定时阻塞 |
---|---|---|---|---|
添加元素 | add- java.lang.IllegalStateException | offer-false | put | offer |
移除元素 | remove-java.util.NoSuchElementException | poll-null | take | poll |
5.常见的实现类:
- ArrayBlockingQueue-阻塞式顺序队列
a.底层依靠数组来存储数据
b.使用的时候需要指定容量
package blockingqueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
//构建队列
ArrayBlockingQueue<String> queue=new ArrayBlockingQueue<>(5);
queue.add("5");
queue.add("5");
queue.add("5");
queue.add("5");
queue.add("5");
//添加元素
//队列已满
//抛出异常,java.lang.IllegalStateException
//queue.add("a");
//返回false
boolean r = queue.offer("b");
System.out.println(r);
//产生阻塞
//queue.put("c");
//定时阻塞
boolean re = queue.offer("d", 5, TimeUnit.SECONDS);
System.out.println(re);
System.out.println(queue);
}
}
- LinkedBlockingQueue-阻塞式链式队列
a.底层依靠单向节点来存储数据
b.在使用的时候可以指定容量也可以不指定,如果指定了容量,则容量不可变。如果没有指定容量,则容量为Integer.Max_VALUE,即2^31-1;此时因为这个容量相对较大,一般认为队列是无限的。
package blockingqueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class LinkeBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<String> queue=new LinkedBlockingQueue<>();
//队列为空
//抛出异常java.util.NoSuchElementException
//System.out.println(queue.remove());
//返回null
//System.out.println(queue.poll());
//产生阻塞
//System.out.println(queue.take());
//定时阻塞
System.out.println(queue.poll(5, TimeUnit.SECONDS));
}
}
- PriorityBlockingQueue-具有优先级的阻塞式队列
a.在使用的时候可以不指定容量。如果不指定,则默认初始容量为11-在容量不够,会进行扩容
b.底层依靠数组存储元素
c.PriorityBlockingQueue会对放入其中的元素进行排序,要求元素对应的类必须实现Comparable接口,覆盖compareTo方法
d.如果需要给队列单独指定比较规则,那么可以传入Comparator对象
e.迭代遍历不保证排序
package blockingqueue;
import java.util.concurrent.PriorityBlockingQueue;
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<String> queue=new PriorityBlockingQueue<>();
queue.put("Amy");
queue.put("Maray");
queue.put("Peter");
queue.put("Bob");
for(int i=0;i<4;i++){
System.out.println(queue.take());
}
PriorityBlockingQueue<Studnet> studnets=new PriorityBlockingQueue<>();
studnets.put(new Studnet("Amy",98,18));
studnets.put(new Studnet("Bob",48,17));
studnets.put(new Studnet("Cindy",85,20));
studnets.put(new Studnet("Lue",56,22));
studnets.put(new Studnet("Maray",100,18));
for(int i=0;i<5;i++){
System.out.println(studnets.take());
}
System.out.println("-----------------------------------------------");
//需要给队列单独指定比较规则
PriorityBlockingQueue<Studnet> studnet2=new PriorityBlockingQueue<>(
5,(s1,s2)->s1.getAge()-s2.getAge());
studnet2.put(new Studnet("Amy",98,18));
studnet2.put(new Studnet("Bob",48,17));
studnet2.put(new Studnet("Cindy",85,20));
studnet2.put(new Studnet("Lue",56,22));
studnet2.put(new Studnet("Maray",100,18));
for(int i=0;i<5;i++){
System.out.println(studnet2.take());
}
}
}
class Studnet implements Comparable<Studnet>{
private String name;
private int age;
private int score;
public Studnet(String name, int score,int age) {
this.name = name;
this.score = score;
this.age=age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "Studnet{" +
"name='" + name + '\'' +
", age=" + age +
", score=" + score +
'}';
}
//按照分数进行排序
//在这个方法指定比较规则
//升序 this-o
//降序 o-this
@Override
public int compareTo(Studnet o) {
return o.score-this.score;
}
}
- SynchronousQueue-同步队列
a.不需要指定容量,容量默认为1且只能为1 - 扩展:BlockingDeque-阻塞式双向队列-允许两端存放两端拿
三.ConcurrentMap-并发映射
1.ConcurrentMap是JDK1.5提供的一套用于应对高并发以及保证数据安全的映射机制。
2.ConcurrentMap包含了ConcurrentHashMap和ConcurrentNavigableMap
3.ConcurrentHashMap-并发哈希映射
1)底层是基于数组加链表来实现的。数组的每一个位置称之为桶,每一个桶中维系一个链表。
2)如果不指定,默认情况下,初始容量为16,默认加载因子是0.75,扩容的时候是在原来的基础上增加一倍。
3)ConcurrentHashMap的最大容量(最大桶数)是2^30。
4)无论指定初始容量是多少,那么经过计算,最终容量一定是2^n的形式。
5)从JDK1.8开始,ConcurrentHashMap引入了红黑树的机制。当ConcurrentHashMap桶中的元素数量达到8个,会将这个桶中的链表扭转成为一个红黑树;如果桶中的元素数量不足7个的时候,会将这个桶中的红黑树再扭转会链表。在ConcurrentHashMap中,使用红黑树的前提是容量>=64
6)转化红黑树的前提是容量为>=64的原因如下:
假设现在容量为16,当其中桶中的链表数量达到了8个,这时需要将其转化为红黑树,但是这时候concurrentHashMap进行了插入操作,这时同样也达到了扩容的条件,这时需要同步进行红黑树的转化以及rehash操作,产生了资源冲突。所以给定前提为容量>=64,这时扩容概率不会很大。
7)红黑树(Red-Black Tree)
- 本质上是一种自平衡二叉查找树
- 二叉查找树的特征:
a.左子树小于根,右子树大于根
b.没有相等的节点 - 特征:
a.所有节点非红即黑
b.根节点必须是黑节点
c.红节点的子节点必须是黑色的
d.最底层的叶子节点必须是黑色的空节点
e.从根节点到任意一个叶子节点经过的路径的黑色节点个数一致,即黑节点高度相同
f.新添的节点颜色必须是红色的 - 红黑树的修正-前提:父子节点为红
a.叔父节点为红,那么将父节点和叔父节点涂黑,祖父节点涂红
b.叔父节点为黑,且当前节点为右子叶,则以当前节点为轴进行左旋
c.叔父节点为黑,且当前节点为左子叶,则以当前节点为轴进行右旋 - 在红黑树中,每添加一个元素,都需要考虑这棵树是否需要修正
-
红黑树的查询时间复杂度为O(log n)
8)ConcurrentHashMap是一个异步线程安全的映射-支持并发。不同于Hashtable,ConcurrentHashMap采用了分段/桶锁机制来保证线程安全。----宏观同步,微观异步(映射异步,桶同步)
HashMap:异步线程不安全
Hashtable:同步线程安全-凡是对外提供的方法都是同步方法,静态方法锁对象是当前类的字节码对象,非静态方法的锁对象是this
9)线程在使用锁的时候,会产生非常大的开销(线程状态切换、线程的上下文调度、CPU资源的切换等)
因此在JDK1.8中,引入了一套无锁算法CAS(Compare And Swap 比较和交换)-CAS过程中涉及到线程的重新调度问题,所以CAS需要结合具体的CPU内核架构实现。目前市面上几乎所有的CPU内核都是支持CAS的。Java中的CAS底层是依靠C语言实现的。
10)ConcurrentHashMap的用法是和HashMap一致。
4.ConcurrentNavigableMap-并发导航映射
1)ConcurrentNavigableMap提供了用于截取子映射的方法--headMap、tailMap、subMap
package concurrentmap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class ConcurrentNavigableMapDemo {
public static void main(String[] args) {
//实现类ConcurrentSkipListMap-并发跳跃表映射
ConcurrentNavigableMap<String,Integer> map=new ConcurrentSkipListMap<>();
map.put("Amy",98);
map.put("Bob",78);
map.put("Jack",75);
map.put("Rose",88);
map.put("Tony",45);
System.out.println(map);
//从头开始截取到指定位置
System.out.println(map.headMap("Jack"));
//从指定位置截取到尾部
System.out.println(map.tailMap("Bob"));
//截取指定范围的数据
System.out.println(map.subMap("Bob","Rose"));
}
}
2)ConcurrentNavigableMap本身是一个接口,在JDK中提供了唯一的实现类ConcurrentSkipListMap-并发跳跃表映射--底层是基于跳跃表实现的
3)跳跃表:
原理参考连接:https://blog.csdn.net/qpzkobe/article/details/80056807
- 针对有序列表来使用
- 适合于读多写少的场景
- 跳跃表可以进行多层提取,但是最后一层的元素个数不能少于2个
- 典型的“以空间换时间”的产物
- 当新增元素的时候,这个元素是否要提取到上层跳跃表中遵循“抛硬币”原则
- 跳跃表的时间复杂度为O(log n),空间复杂度为O(n)
四.ExecutorService-执行器服务
1.本质上是一个线程池。意义:减少线程的创建和销毁,减少服务器资源的浪费,做到线程的复用
2.线程池在刚定义的时候是空的,没有任何线程。
3.如果接收到一个请求,线程池中就会创建一个线程(core-thread -核心线程)用于处理这个请求
4.核心线程用完之后不会销毁而是会去等待下一个请求。
5.在定义线程池的时候需要去给定核心线程的数量。
6.在核心线程达到指定数量之前,每次来的请求都会触发创建一个新的核心线程。
7.如果核心线程被全部占用,那么后来的线程将会放到工作队列(work queue)中临时存储。工作队列本质上是一个阻塞式队列
8.如果工作队列被全部占用,那么后来的请求会被交给一个临时线程(temproary thread)来处理
9.在定义线程池的时候需要给定临时线程的数量
10.临时线程在处理完请求之后,会存活指定的一段时间。如果在这段时间内接受到新的请求,那么临时线程会继续处理新的请求而暂时不会被销毁;如果超过这段时间临时线程没有接收到新的请求,那么这个临时线程就会被销毁。
11.如果临时线程被全部占用,那么后来的请求会被交给拒绝执行处理器(RejectedExecutionHandler)来进行拒绝处理。
12.代码:
submit和execute的区别:
execute()用于提交Runnable线程
submit()既可以提交Runnable线程也可以提交Callable线程
package executorservice;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceDemo {
public static void main(String[] args) {
//构建一个线程池
/**
*int corePoolSize---核心线程数量
*int maximumPoolSize---最大线程数量=核心线程数+临时线程数
*long keepAliveTime---临时线程存活时间
*TimeUnit unit---时间单位
*BlockingQueue<Runnable> workQueue---工作队列
*RejectedExecutionHandler handler---拒绝执行处理器--如果有具体的拒绝流程,需要覆盖这个接口
*/
ExecutorService es=new ThreadPoolExecutor(5,
10,
5,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
//实际过程中,会有一套明确的拒绝流程
(r,e)-> System.out.println("拒绝执行线程")
);
//new Thread(new ExecutorThread()).start();
//可以通过线程池来执行这个线程
/**
* submit和execute的区别:
* execute()用于提交Runnable线程
* submit()既可以提交Runnable线程也可以提交Callable线程
*/
//es.execute(new ExecutorThread());
es.submit(new Thread());
//关闭线程池
es.shutdown();
}
}
class ExecutorThread implements Runnable{
@Override
public void run() {
System.out.println("hello");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
13.Callable<T>
1)Callable是JDK1.5中提供的一套用于定义线程的方式,通过泛型来定义返回值类型
2)创建Callable线程的两种方式:
package executorservice;
import java.util.concurrent.*;
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//方式一:将Callable包装成Runnable,通过Thread来启动
//Callable->FutureTask->RunnableFuture->Runnable
FutureTask<String> f=new FutureTask<>(new CallableThread());
new Thread(f).start();
//获取指定结果
System.out.println(f.get());
//方式二:通过线程池来启动Callable线程
ExecutorService es=new ThreadPoolExecutor(5,10,5,TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5));
Future<String> f1 = es.submit(new CallableThread());
System.out.println(f1.get());
es.shutdown();
}
}
//泛型定义的是返回值类型
class CallableThread implements Callable<String>{
@Override
public String call() throws Exception {
return "SUCCESS";
}
}
3)Runnable和Callable比较:
比较 | Runnable | Callable |
---|---|---|
返回值 | 没有返回值 | 通过泛型来定义返回值 |
启动方式 | 1.通过Thread直接启动 2.通过线程池的execute或者submit来启动 |
1.包装成Runnable之后通过Thread来启动 2.通过线程池的submit方法来启动 |
异常机制 | 不允许抛出异常,一旦出现异常需要立即捕获处理,就没有办法利用全局机制来进行处理 | 允许抛出异常,意味着可以选择用全局机制(例如Spring中的异常通知)来统一处理异常 |
14.预定义的线程池
package executorservice;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecutorServiceDemo2 {
public static void main(String[] args) {
//预定义的线程池
/**
* newCachedThreadPool特点:
* 1.没有核心线程,全部都是临时线程
* 2.临时线程的数量为Integer.MAX_VALUE,即2^31-1
* 一台服务器能够承载的线程数量远低于这个值
* 所以此时认为这个线程池能够处理无限多的请求
* 3.临时线程的存活时间是一分钟
* 4.工作队列是一个同步队列(容量为1)
*/
//大池子小队列
//适合于高并发的短任务场景,例如即时通信
//不适合于长任务场景
ExecutorService es= Executors.newCachedThreadPool();
/**
* newFixedThreadPool特点:
* 1.没有临时线程,全部都是核心线程
* 2.工作队列是一个阻塞式链式队列,且容量为Integer.MAX_VALUE,
* 此时认为这个线程池能够处理无限多的请求
*/
//小池子大队列
//适合于并发低的长任务场景,例如文件下载
//不适合高并发的短任务的场景
ExecutorService es1=Executors.newFixedThreadPool(5);
}
}
15.ScheduledExecutorService--定时调度执行器任务。能够起到定时调度的效果
package executorservice;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorServiceDemo {
public static void main(String[] args) {
ScheduledExecutorService ses= Executors.newScheduledThreadPool(5);
//延时执行
//ses.schedule(new ScheduleThread(),5, TimeUnit.SECONDS);
//每隔5秒执行一次
//从上次的开始来计算下一次的启动时间
//实际间隔时间=max(指定时间,线程执行时间)
//ses.scheduleAtFixedRate(new ScheduleThread(),0,5,TimeUnit.SECONDS);
//每隔5秒执行一次
//从下一次的结束来计算下一次启动时间
//实际间隔时间=指定时间+线程执行时间
ses.scheduleWithFixedDelay(new ScheduleThread(),0,5,TimeUnit.SECONDS);
}
}
class ScheduleThread implements Runnable{
@Override
public void run() {
System.out.println("hello");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
16.ForkJoinPool分叉合并池
- 分叉:将一个大的任务拆分成多个小的任务交给多个线程来执行
- 合并:将拆分出去的小的任务的计算结果来进行汇总
- 求1-100000000000L的和
package executorservice;
import java.util.concurrent.*;
public class ForkJoinPoolDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
long start=System.currentTimeMillis();
//求1-100000000000L的和
//主函数所在的类默认是一个线程类-主线程
//一个线程只能落到一个CPU核上
//运行时间:39780
/*long sum=0;
for(long i=1;i<=100000000000L;i++){
sum+=i;
}
System.out.println(sum);*/
//运行时间:23613
ForkJoinPool pool=new ForkJoinPool();
Future<Long> f = pool.submit(new Sum(1, 100000000000L));
System.out.println(f.get());
pool.shutdown();
long end=System.currentTimeMillis();
System.out.println(end-start);
}
}
class Sum extends RecursiveTask<Long>{
private long start;
private long end;
public Sum(long start, long end) {
this.start = start;
this.end = end;
}
//分叉合并的逻辑就是覆盖在这个方法中
@Override
protected Long compute() {
//拆分,如果拆分出去的范围较大,那么继续拆分
//如果拆分出去的范围较小,那么将这个小的范围的数字进行求和
if(end-start<=10000){
long sum=0;
for(long i=start;i<=end;i++){
sum+=i;
}
return sum;
}else {
long mid=(start+end)/2;
Sum left=new Sum(start,mid);
Sum right=new Sum(mid+1,end);
//分叉
left.fork();
right.fork();
//合并
return left.join()+right.join();
}
}
}
- 在数据量比较小的时候,使用循环的效率反而比较高,数据量越大,分叉合并的效率越高
- 分叉合并通过大量的线程抢占CPU,从而能够有效地提高CPU的利用率,可能就会导致其他线程被挤占。因此在实际生产过程中,慎用分叉合并。如果需要使用分叉合并,放在相对空闲的时间来执行
- 在分叉合并中,当一个核上的任务执行完毕之后,这个和不会空闲下来,而是随机扫描一个核,从这个被扫描的核的任务队列尾端来“偷取”一个任务回来执行--“work-stealing”(工作窃取)策略
五.Lock-锁
1.Lock是JDK1.5提供的一套锁机制,在实际生产过程中更推荐使用Lock代替synchronized---相对而言,Lock比synchronized更加灵活。
package Lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockDemo {
static int i=0;
public static void main(String[] args) throws InterruptedException {
Lock lock=new ReentrantLock();
new Thread(new Add(lock)).start();
new Thread(new Add(lock)).start();
//main所在的类是一个线程类-主线程
//这个线程在执行过程中需要启动两个Add的线程
//这两个Add线程在启动过程中,主线程会抢占CPU继续执行
//考虑:主线程即使抢占到CPU也需要阻塞
Thread.sleep(3000);
System.out.println(i);
}
}
class Add implements Runnable{
private Lock lock;
public Add(Lock lock) {
this.lock = lock;
}
@Override
public void run() {
//加锁
lock.lock();
for (int i = 0; i < 10000; i++) {
LockDemo.i++;
}
//解锁
lock.unlock();
}
}
2.ReentrantLock-重入锁
a.重入锁:当锁资源被释放之后,这个锁资源可以再次被线程占用
b.非重入锁:当锁资源被释放之后,不能被再次使用--非重入锁更多的是在校验中使用
3.大部分排他锁和自旋锁
- 自旋锁也是排他锁
- 对于其他的陪他锁而言,当一个线程占用锁对象之后,其他的线程会陷入阻塞状态,持续等待。当锁资源被释放之后,被阻塞的线程需要被唤醒之后才能抢占,这个过程中就涉及到了线程的状态的变化
- 自旋锁的特点在于,当发现锁资源被占用之后,线程不会陷入阻塞,而是持续判断锁资源是否被释放
- 自旋锁因为没有状态的转化,所以效率相对要高一些;但是相对而言,自旋锁会持续占用CPU资源
4.ReadWriteLock-读写锁
- 读锁:允许多个人同时读,不允许写入--本质上是共享锁
- 写锁:只允许一个人写,不允许读--本质上是排他锁
//获取写锁
ReadWriteLock rw=new ReentrantReadWriteLock();
Lock lock=rw.writeLock();
5.公平策略和非公平策略
- 在资源有限的情况下,虽然理论上各个线程抢占的几率相等,但是实际上各个线程的抢占次数并不相等,这种现象称之为非公平策略
-
在公平策略的前提下,各个线程并不能直接抢占资源,而是需要抢占入队顺序。此时,各个线程的执行次数大致是相等的
- 公平策略需要涉及到大量线程调度的问题,所以相对而言,非公平策略的效率更高。
- synchronized、Lock默认是非公平的
//非公平的
ReadWriteLock rw=new ReentrantReadWriteLock(false);
//公平的
ReadWriteLock rw=new ReentrantReadWriteLock(true);
6.其他
- CountDownLatch:闭锁/线程递减锁。对线程进行计数,在计数归零之前,线程会陷入阻塞;直到计数归零之后,会自动放开阻塞-上一组线程结束需要开启下一组线程
package Lock;
import java.util.concurrent.CountDownLatch;
/**
* 案例:考试
* 考官和考生到达考场之后,开始考试
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch cdl=new CountDownLatch(7);
new Thread(new Student(cdl)).start();
new Thread(new Student(cdl)).start();
new Thread(new Student(cdl)).start();
new Thread(new Student(cdl)).start();
new Thread(new Student(cdl)).start();
new Thread(new Teacher(cdl)).start();
new Thread(new Teacher(cdl)).start();
//需要等上面的线程执行完毕之后才能继续执行下面的逻辑
//在上面的线程执行完毕之前,当前主线程需要阻塞
cdl.await();
System.out.println("开始考试!!!");
}
}
class Teacher implements Runnable{
private CountDownLatch cdl;
public Teacher(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
//模拟:考官走到考场时间
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println("考官到达考场~");
cdl.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Student implements Runnable{
private CountDownLatch cdl;
public Student(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
//模拟:考生走到考场时间
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println("考生到达考场~");
cdl.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- CyclicBarrier:栅栏。对线程进行计数。在计数归零之前,线程会陷入阻塞。直到线程计数归零,会自动放开阻塞。-所有线程到达同一个点之后再分别继续执行
package Lock;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* 案例:跑步比赛
* 运动员先到起跑线,人齐之后,听到枪响之后在跑出去
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cb=new CyclicBarrier(6);
new Thread(new Runner(cb),"1号").start();
new Thread(new Runner(cb),"2号").start();
new Thread(new Runner(cb),"3号").start();
new Thread(new Runner(cb),"4号").start();
new Thread(new Runner(cb),"5号").start();
new Thread(new Runner(cb),"6号").start();
}
}
class Runner implements Runnable{
private CyclicBarrier cb;
public Runner(CyclicBarrier cb) {
this.cb = cb;
}
@Override
public void run() {
try {
//模拟运动员走到起跑线
Thread.sleep((long) (Math.random()*10000));
String name=Thread.currentThread().getName();
System.out.println(name+"运动员走到了起跑线");
//先到起跑线的人需要等待,直到人齐了,听到枪响之后再跑
//阻塞,减少计数--计数归零会自动放开阻塞
cb.await();
System.out.println(name+"跑了出去~");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
- Exchanger<V>:交换机,用于交换两个线程之间的信息。
package Lock;
import java.util.concurrent.Exchanger;
/**
* 案例:购物
* 一手交钱一手交货
*/
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> ex=new Exchanger<>();
new Thread(new Seller(ex)).start();
new Thread(new Consumer(ex)).start();
}
}
class Consumer implements Runnable{
private final Exchanger<String> ex;
public Consumer(Exchanger<String> ex) {
this.ex = ex;
}
@Override
public void run() {
String info="钱";
//挑好东西之后,需要付款,商家需要将商品交换给消费者
String msg= null;
try {
msg = ex.exchange(info);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者收到商家给的:"+msg);
}
}
class Seller implements Runnable{
private final Exchanger<String> ex;
public Seller(Exchanger<String> ex) {
this.ex = ex;
}
@Override
public void run() {
String info="商品";
//商家将商品交付给消费者之后,需要收到消费者的付款
try {
String msg = ex.exchange(info);
System.out.println("商家收到消费者给的:"+msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- Semaphore:信号量。在执行指定逻辑之前,线程需要先获取信号。当信号被全部获取完,那么后来的线程就会被阻塞;直到有信号被释放,那么被阻塞的线程才会获取信号执行逻辑-在实际生产过程中,信号量适用于限流的
package Lock;
import java.util.concurrent.Semaphore;
/**
* 案例:去餐馆吃饭
* 餐馆中的桌子的数量有限。如果所有的桌子被占用,那么后来的人就会被阻塞
*/
public class SemaphoreDemo {
public static void main(String[] args) {
//6个信号->6张桌子
Semaphore s=new Semaphore(6);
for (int i = 0; i < 10; i++) {
new Thread(new Eater(s)).start();
}
}
}
class Eater implements Runnable{
private Semaphore s;
public Eater(Semaphore s) {
this.s = s;
}
@Override
public void run() {
//占用一张桌子
//桌子->信号
try {
//获取一个信号
s.acquire();
System.out.println("来了一波客人,占用了一张桌子");
//模拟用餐时间
Thread.sleep((long) (Math.random()*10000));
System.out.println("客人用餐完毕离开,空出来一张桌子");
//一张桌子就空出来相当于一个信号被释放
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
六.Atomic操作-原子性操作
1.原子性操作实际上针对属性提供了大量的线程安全的方法。在jdk1.8中,采用了CAS+volatile机制来保证属性的线程安全。
2.volatile是Java中的关键字之一,是Java提供的一种轻量级的线程间的通信机制
- 保证线程的可见性。当共享资源发生变化的时候,其他线程能够立即感知到这种变化并且做出对应的操作,这个过程称之为可见性
- 不保证线程的原 子性。原子性指的是线程的执行过程不可分割。换而言之,就是线程的执行过程不会被打断不会被抢占。加锁实际上保证的线程的原子性。
- 禁止指令重排。指令重排指的是预先定义的顺序和指令的实际执行顺序执行不一致。 指令重排可能发生在每一步(java-class-System-CPU),但是注意,每一步过程中,发生指令重排的概率不足百万分之一。指令重排不能违背happen-before(先发生)原则-使用的变量必须先产生。在多线程的情况下,执行完全相同的代码可能会因为指令重排获取到不同的结果,这种现象称之为结果的二相性。