1、什么是JUC
JUC:就是我们Java原生的并发包,和一些常用的工具类!
java.util.concurrent、java.util.concurrent.atomic、java.util.concurrent.locks
2、线程基础知识回顾
使用@FunctionalInterface注解的Class都可以使用lambda表达式
一个进程可以包含多个线程,一个进程至少有一个线程! Java程序至少有两个线程: GC、Main
并发:多个线程操作同一个资源,交替执行的过程!
并行:多个线程同时执行!只有在多核CPU下才能完成!
所以我们使用多线程或者并发编程的目的:提高效率,让CPU一直工作,达到最高处理性能!
线程有 6 种状态,源码中有:Thread.State
public enum State {
// java能够创建线程吗? 不能!
// 新建
NEW,
// 运行
RUNNABLE,
// 阻塞
BLOCKED,
// 等待
WAITING,
// 延时等待
TIMED_WAITING,
// 终止!
TERMINATED;
}
wait和sleep的区别:
1、wait是一个object类,sleep是一个单独的方法
正确的休眠方法:
TimeUnit.SECONDS.sleep(3);
2、sleep:抱着锁睡得,不会释放锁!wait 会释放锁!
3、wait 和 notify 是一组,一般在线程通信的时候使用!
4、sleep 就是一个单独的方法,在那里都可以用!
5、sleep 需要捕获异常!
3、synchronized锁
package com.coding.demo01;
// 传统的 Synchronized
// Synchronized 方法 和 Synchronized 块
/*
* 我们的学习是基于企业级的开发进行的;
* 1、架构:高内聚,低耦合
* 2、套路:线程操作资源类,资源类是单独的
*/
public class Demo01 {
public static void main(String[] args) throws InterruptedException {
// 1、新建资源类
Ticket ticket = new Ticket();
// 2、线程操纵资源类
new Thread(new Runnable() {
public void run() {
for (int i = 1; i <=40; i++) {
ticket.saleTicket();
}
}
},"A").start();
new Thread(new Runnable() {
public void run() {
for (int i = 1; i <=40; i++) {
ticket.saleTicket();
}
}
},"B").start();
new Thread(new Runnable() {
public void run() {
for (int i = 1; i <=40; i++) {
ticket.saleTicket();
}
}
},"C").start();
}
}
// 单独的资源类,属性和方法!
// 这样才能实现复用!
class Ticket{
private int number = 30;
// 同步锁,厕所 =>close=>
// synchronized 这是一个关键字
public synchronized void saleTicket(){
if (number>0){
System.out.println(Thread.currentThread().getName() + "卖出第"+(number--)+"票,还剩:"+number);
}
}
}
4、lock锁
package com.coding.demo01;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*
* JUC之后的操作
* Lock锁 + lambda表达式!
*/
public class Demo02 {
public static void main(String[] args) {
// 1、新建资源类
Ticket2 ticket = new Ticket2();
// 2、线程操作资源类 , 所有的函数式接口都可以用 lambda表达式简化!
// lambda表达式 (参数)->{具体的代码}
new Thread(()->{for (int i = 1; i <= 40 ; i++) ticket.saleTicket();},"A").start();
new Thread(()->{for (int i = 1; i <= 40 ; i++) ticket.saleTicket();},"B").start();
new Thread(()->{for (int i = 1; i <= 40 ; i++) ticket.saleTicket();},"C").start();
}
}
// 依旧是一个资源类
class Ticket2{
// 使用Lock,它是一个对象
// ReentrantLock 可重入锁:回家:大门 (卧室门,厕所门...)
// ReentrantLock 默认是非公平锁!
// 非公平锁: 不公平 (插队,后面的线程可以插队)
// 公平锁: 公平(只能排队,后面的线程无法插队)
private Lock lock = new ReentrantLock();
private int number = 30;
public void saleTicket(){
lock.lock(); // 加锁
try {
// 业务代码
if (number>0){
System.out.println(Thread.currentThread().getName() + "卖出第"+(number--)+"票,还剩:"+number);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); // 解锁
}
}
}
5、synchronized锁和lock锁的区别
1、Synchronized 是一个关键字、Lock 是一个对象
2、Synchronized 无法尝试获取锁,Lock 可以尝试获取锁,判断;
3、Synchronized 会自动释放锁(a线程执行完毕,b如果异常了,也会释放锁),lock锁是手动释放锁!如果你不释放就会死锁。
4、Synchronized (线程A(获得锁,如果阻塞),线程B(等待,一直等待);)lock,可以尝试获取锁,失败了之后就放弃
5、Synchronized 一定是非公平的,但是 Lock 锁可以是公平的,通过参数设置;
6、代码量特别大的时候,我们一般使用Lock实现精准控制,Synchronized 适合代码量比较小的同步问题;
6、生产者消费者
线程和线程之间本来是不能通信的,但是有时候我们需要线程之间可以协调操作:
Synchronized 普通版
package com.coding.demo01;
// Synchronized 版
/*
目的: 有两个线程:A B ,还有一个值初始为0,
实现两个线程交替执行,对该变量 + 1,-1;交替10次
*/
public class Demo03 {
public static void main(String[] args) {
Data data = new Data();
// +1
new Thread(()->{
for (int i = 1; i <=10 ; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
// -1
new Thread(()->{
for (int i = 1; i <=10 ; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}
}
// 资源类
// 线程之间的通信: 判断 执行 通知
class Data{
private int number = 0;
// +1
public synchronized void increment() throws InterruptedException {
if (number!=0){ // 判断是否需要等待
this.wait();
}
number++; // 执行
System.out.println(Thread.currentThread().getName()+"\t"+number);
// 通知
this.notifyAll(); //唤醒所有线程
}
// -1
public synchronized void decrement() throws InterruptedException {
if (number==0){ // 判断是否需要等待
this.wait();
}
number--; // 执行
System.out.println(Thread.currentThread().getName()+"\t"+number);
// 通知
this.notifyAll(); //唤醒所有线程
}
}
四条线程可以实现交替吗?不能,会产生虚假唤醒问题!
package com.coding.demo01;
// Synchronized 版
/*
目的: 有两个线程:A B ,还有一个值初始为0,
实现两个线程交替执行,对该变量 + 1,-1;交替10次
传统的 wait 和 notify方法不能实现精准唤醒通知!
*/
public class Demo03 {
public static void main(String[] args) {
Data data = new Data();
// +1
new Thread(()->{
for (int i = 1; i <=10 ; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 1; i <=10 ; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
// -1
new Thread(()->{
for (int i = 1; i <=10 ; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 1; i <=10 ; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
// 资源类
// 线程之间的通信: 判断 执行 通知
class Data{
private int number = 0;
// +1
public synchronized void increment() throws InterruptedException {
while (number!=0){ // 判断是否需要等待
this.wait();
}
number++; // 执行
System.out.println(Thread.currentThread().getName()+"\t"+number);
// 通知
this.notifyAll(); //唤醒所有线程
}
// -1
public synchronized void decrement() throws InterruptedException {
while (number==0){ // 判断是否需要等待
this.wait();
}
number--; // 执行
System.out.println(Thread.currentThread().getName()+"\t"+number);
// 通知
this.notifyAll(); //唤醒所有线程
}
}
package com.coding.demo01;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*
实现线程交替执行!
主要的实现目标:精准的唤醒线程!
三个线程:A B C
三个方法:A p5 B p10 C p15 依次循环
*/
public class Demo04 {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(()->{
for (int i = 1; i <= 10; i++) {
try {
data.print5();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(()->{
for (int i = 1; i <= 10; i++) {
try {
data.print10();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(()->{
for (int i = 1; i <= 10; i++) {
try {
data.print15();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
}
}
// 资源类
class Data2{
private int number = 1; // 1A 2B 3C
private Lock lock = new ReentrantLock();
// 实现精准访问
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void print5() throws InterruptedException {
lock.lock();
try {
// 判断
while (number!=1){
condition1.await();
}
// 执行
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 通知第二个线程干活!
number = 2;
condition2.signal(); // 唤醒
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 一定要解锁
}
}
public void print10() throws InterruptedException {
lock.lock();
try {
// 判断
while (number!=2){
condition2.await();
}
// 执行
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 通知3干活
number = 3;
condition3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print15() throws InterruptedException {
lock.lock();
try {
// 判断
while (number!=3){
condition3.await();
}
// 执行
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 通知 1 干活
number = 1;
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
7、8道题彻底理解synchronized、static、普通方法对线程锁的影响
package com.coding.lock8;
import java.util.concurrent.TimeUnit;
/*
1、标准的访问情况下,先执行 sendEmail 还是 sendSMS
答案:sendEmail
被 synchronized 修饰的方式,锁的对象是方法的调用者,所以说这里两个方法调用的对象是同一个
先调用的先执行!
*/
public class LockDemo01 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(()->{
phone.sendEmail();
},"A").start();
//Thread.sleep(200);
TimeUnit.SECONDS.sleep(2);
new Thread(()->{
phone.sendSMS();
},"B").start();
}
}
class Phone{
public synchronized void sendEmail(){
System.out.println("sendEmail");
}
public synchronized void sendSMS(){
System.out.println("sendSMS");
}
}
package com.coding.lock8;
import java.util.concurrent.TimeUnit;
/*
2、sendEmail休眠3秒后 ,先执行 sendEmail 还是 sendSMS
答案:sendEmail
被 synchronized 修饰的方式,锁的对象是方法的调用者,所以说这里两个方法调用的对象是同一个
先调用的先执行!
*/
public class LockDemo02 {
public static void main(String[] args) throws InterruptedException {
Phone2 phone = new Phone2();
new Thread(()->{
try {
phone.sendEmail();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
//Thread.sleep(200);
TimeUnit.SECONDS.sleep(2);
new Thread(()->{
phone.sendSMS();
},"B").start();
}
}
class Phone2{
public synchronized void sendEmail() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);
System.out.println("sendEmail");
}
public synchronized void sendSMS(){
System.out.println("sendSMS");
}
}
package com.coding.lock8;
import java.util.concurrent.TimeUnit;
/*
3、增加一个普通方法,请问先打印那个 sendEmail 还是 hello
答案:hello
新增加的这个方法没有 synchronized 修饰,不是同步方法,不受锁的影响!
*/
public class LockDemo03 {
public static void main(String[] args) throws InterruptedException {
Phone3 phone = new Phone3();
new Thread(()->{
try {
phone.sendEmail();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
//Thread.sleep(200);
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone.hello();
},"B").start();
}
}
class Phone3{
public synchronized void sendEmail() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
System.out.println("sendEmail");
}
// 没有 synchronized 没有 static 就是普通方式
public void hello(){
System.out.println("hello");
}
}
package com.coding.lock8;
import java.util.concurrent.TimeUnit;
/*
4、两个手机,请问先执行sendEmail 还是 sendSMS
答案:sendSMS
被 synchronized 修饰的方式,锁的对象是调用者;我们这里有两个调用者,两个方法在这里是两个锁
*/
public class LockDemo04 {
public static void main(String[] args) throws InterruptedException {
Phone4 phone1 = new Phone4();
Phone4 phone2 = new Phone4();
new Thread(()->{
try {
phone1.sendEmail();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
//Thread.sleep(200);
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone2.sendSMS();
},"B").start();
}
}
class Phone4{
public synchronized void sendEmail() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);
System.out.println("sendEmail");
}
public synchronized void sendSMS(){
System.out.println("sendSMS");
}
}
package com.coding.lock8;
import java.util.concurrent.TimeUnit;
//LockDemo05.Class 模板,只有一个 static
//new LockDemo05(),可以创建多个对象
/*
5、两个静态同步方法,同一个手机请问先执行sendEmail 还是 sendSMS
答案:sendEmail
只要方法被 static 修饰,锁的对象就是 Class模板对象,这个则全局唯一!所以说这里是同一个锁
并不是因为synchronized
*/
public class LockDemo05 {
public static void main(String[] args) throws InterruptedException {
Phone5 phone = new Phone5();
new Thread(()->{
try {
phone.sendEmail();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
//Thread.sleep(200);
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone.sendSMS();
},"B").start();
}
}
class Phone5{
public static synchronized void sendEmail() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);
System.out.println("sendEmail");
}
public static synchronized void sendSMS(){
System.out.println("sendSMS");
}
}
package com.coding.lock8;
import java.util.concurrent.TimeUnit;
//LockDemo05.Class 模板,只有一个 static
//new LockDemo05(),可以创建多个对象
/*
6、两个静态同步方法,两个手机,请问先执行sendEmail 还是 sendSMS
答案:sendEmail
只要方法被 static 修饰,锁的对象就是 Class模板对象,这个则全局唯一!所以说这里是同一个锁
并不是因为synchronized
*/
public class LockDemo06 {
public static void main(String[] args) throws InterruptedException {
Phone6 phone = new Phone6();
Phone6 phone2 = new Phone6();
new Thread(()->{
try {
phone.sendEmail();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
//Thread.sleep(200);
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone2.sendSMS();
},"B").start();
}
}
class Phone6{
public static synchronized void sendEmail() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);
System.out.println("sendEmail");
}
public static synchronized void sendSMS(){
System.out.println("sendSMS");
}
}
package com.coding.lock8;
import java.util.concurrent.TimeUnit;
//LockDemo05.Class 模板,只有一个 static
//new LockDemo05(),可以创建多个对象
/*
7、一个普通同步方法,一个静态同步方法,只有一个手机,请问先执行sendEmail 还是 sendSMS
答案:sendSMS
synchronized 锁的是这个调用的对象
static 锁的是这个类的Class模板
这里是两个锁!
*/
public class LockDemo07 {
public static void main(String[] args) throws InterruptedException {
Phone7 phone = new Phone7();
new Thread(()->{
try {
phone.sendEmail();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
//Thread.sleep(200);
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone.sendSMS();
},"B").start();
}
}
class Phone7{
public static synchronized void sendEmail() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);
System.out.println("sendEmail");
}
public synchronized void sendSMS(){
System.out.println("sendSMS");
}
}
package com.coding.lock8;
import java.util.concurrent.TimeUnit;
//LockDemo05.Class 模板,只有一个 static
//new LockDemo05(),可以创建多个对象
/*
7、一个普通同步方法,一个静态同步方法,两个手机,请问先执行sendEmail 还是 sendSMS
答案:sendSMS
synchronized 锁的是这个调用的对象
static 锁的是这个类的Class模板
这里是两个锁!
*/
public class LockDemo08 {
public static void main(String[] args) throws InterruptedException {
Phone8 phone = new Phone8();
Phone8 phone2 = new Phone8();
new Thread(()->{
try {
phone.sendEmail();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
//Thread.sleep(200);
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone2.sendSMS();
},"B").start();
}
}
class Phone8{
public static synchronized void sendEmail() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);
System.out.println("sendEmail");
}
public synchronized void sendSMS(){
System.out.println("sendSMS");
}
}
1、new this 调用的这个对象,是一个具体的对象!
2、static class 唯一的一个模板!
在我们编写多线程程序得时候,只需要搞明白这个到底锁的是什么就不会出错了!
synchronized(Demo.class){
}
synchronized(this){
}
8、线程不安全的集合类和线程安全的集合类
//list 不安全
package com.coding.unsafe;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* 故障现象:ConcurrentModificationException 并发修改异常
* 导致原因:add方法没有锁!
* 解决方案:
* 1、List<String> list = new Vector<>(); //jdk1.0 就存在的!效率低
* 2、List<String> list = Collections.synchronizedList(new ArrayList<>());
* 3、List<String> list = new CopyOnWriteArrayList<>();
*
* 什么是 CopyOnWrite; 写入是复制 (思想 COW)
* 多个调用者同时要相同的资源;这个有一个指针的概念。
* 读写分离的思想:
*/
public class UnSafeList {
public static void main(String[] args) {
// List<String> list = Arrays.asList("a", "b", "c");
// list.forEach(System.out::println);
// List<String> list = new ArrayList<>();
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 1; i <= 30; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,3));
System.out.println(list);
},String.valueOf(i)).start();
}
}
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
//set 不安全
package com.coding.unsafe;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
// ConcurrentModificationException
public class UnSafeSet {
public static void main(String[] args) {
// HashSet 底层是什么 就是 HashMap
// add,就是 HashMap 的 key;
Set<String> set = new HashSet<>();
// Set<String> set = Collections.synchronizedSet(new HashSet<>());
// Set<String> set = new CopyOnWriteArraySet();
for (int i = 1; i <=30 ; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,3));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
//map不安全
package com.coding.unsafe;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
//ConcurrentModificationException
public class UnsafeMap {
public static void main(String[] args) {
// new HashMap<>() 工作中是这样用的吗? 不是
// 加载因子0.75f;,容量 16; 这两个值工作中不一定这样用!
// 优化性能!
// HashMap 底层数据结构,链表 + 红黑树
// = = = = = = =
// Map<String, String> map = new HashMap<>();
Map<String, String> map = new ConcurrentHashMap<>();
// 人生如程序,不是选择就是循环,时常的自我总结十分重要!
for (int i = 1; i <=30 ; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,3));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
7、读写锁(了解,使用较少)
package com.coding.rwlock;
import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/*
独占锁(写锁):一次只能被一个线程占有
共享锁(读锁):该锁可以被多个线程占有!
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCacheLock myCache = new MyCacheLock();
// 模拟线程
// 写
for (int i = 1; i <= 5; i++) {
final int tempInt = i;
new Thread(()->{
myCache.put(tempInt+"",tempInt+"");
},String.valueOf(i)).start();
}
// 读
for (int i = 1; i <= 5; i++) {
final int tempInt = i;
new Thread(()->{
myCache.get(tempInt+"");
},String.valueOf(i)).start();
}
}
}
// 读、写
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
// 读 : 可以被多个线程同时读
public void get(String key){
System.out.println(Thread.currentThread().getName()+"读取" + key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取结果:"+o);
}
// 写 :应该是保证原子性 , 不应该被打扰
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName()+"写入" + key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入ok" );
}
}
// 加锁操作: 读写锁
class MyCacheLock{
private volatile Map<String,Object> map = new HashMap<>();
// 读写锁
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 读 : 可以被多个线程同时读
public void get(String key){
// 这些锁一定要匹配,否则就可能导致死锁!
readWriteLock.readLock().lock(); // 多个线程同时持有
try {
System.out.println(Thread.currentThread().getName()+"读取" + key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取结果:"+o);
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
// 写 :应该是保证原子性 , 不应该被打扰
public void put(String key,Object value){
readWriteLock.writeLock().lock(); // 只能被一个线程占用
try {
System.out.println(Thread.currentThread().getName()+"写入" + key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入ok" );
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
}
8、阻塞队列
队列 : FIFO ,先进先出,栈:Stack,后进先出
阻塞:什么情况下肯定会阻塞!
1、当队列是满的,如果继续添加元素就会阻塞
2、当队列是空的,如果继续取就会阻塞
如果多线程程序不关心唤醒,就使用阻塞队列。
阻塞队列是一个新东西吗?
List、Set 这些我们都学过,其实 BlockingQueue 和 这些是一样的;
BlockingQueue对应的4组方法
队列一般可以检测第一个元素是谁!
package com.coding.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingDemo {
public static void main(String[] args) throws InterruptedException {
// 参数,队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// java.lang.IllegalStateException: Queue full
blockingQueue.add("a");
blockingQueue.add("b");
blockingQueue.add("c");
System.out.println(blockingQueue.element());
//blockingQueue.add("d"); // 报错、抛弃不报错、一直等待、超时等待!
// System.out.println(blockingQueue.offer("a"));
// System.out.println(blockingQueue.offer("b"));
// System.out.println(blockingQueue.offer("c"));
// System.out.println(blockingQueue.offer("d",3L,TimeUnit.SECONDS)); // 尝试等待3秒,就会失败!返回false
// blockingQueue.put("a");
// blockingQueue.put("b");
// blockingQueue.put("c");
// System.out.println("准备放入第四个元素");
// blockingQueue.put("d"); // 队列满了,一直等,并且会阻塞!
System.out.println("========================");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take()); // 队列空了,一直等,并且会阻塞!
}
}
同步队列
SynchronousQueue , 只有一个容量!每一个put操作,就需要有一个 take操作!
package com.coding.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
// 同步队列 : 只能存放一个值!
public class SynchronousQueueDemo {
public static void main(String[] args) {
// 特殊的阻塞队列
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
// A 存
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + "put a");
blockingQueue.put("a");
System.out.println(Thread.currentThread().getName() + "put b");
blockingQueue.put("b");
System.out.println(Thread.currentThread().getName() + "put c");
blockingQueue.put("c");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
// B 取
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
}
}
9、线程池
池化技术:程序运行的本质:占用系统资源! 提高程序的使用率,降低我们一个性能消耗
线程池、连接池、内存池、对象池 ............
为什么要用线程池:线程复用
关于我们的线程池
三大方法、七大参数、4种拒绝策略
三大方法:
package com.coding.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo01 {
public static void main(String[] args) {
// 单例,只能有一个线程!
// ExecutorService threadPool = Executors.newSingleThreadExecutor();
// 固定的线程数
// ExecutorService threadPool = Executors.newFixedThreadPool(8);
// 遇强则强!可伸缩!
// ExecutorService threadPool = Executors.newCachedThreadPool();
try {
// 线程池的使用方式!
for (int i = 0; i < 30; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 使用完毕后需要关闭!
threadPool.shutdown();
}
}
}
七大参数
1、先看看这个三个方法的源码
new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 约等于21亿
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
public ThreadPoolExecutor(int corePoolSize, // 核心池线程数大小 (常用)
int maximumPoolSize, // 最大的线程数大小 (常用)
long keepAliveTime, // 超时等待时间 (常用)
TimeUnit unit, // 时间单位 (常用)
BlockingQueue<Runnable> workQueue, // 阻塞队列(常用)
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略(常用)) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
自定义线程池的策略
package com.coding.pool;
import java.util.concurrent.*;
public class ThreadPoolDemo01 {
public static void main(String[] args) {
// 单例,只能有一个线程!
// ExecutorService threadPool = Executors.newSingleThreadExecutor();
// 固定的线程数
// ExecutorService threadPool = Executors.newFixedThreadPool(1);
//遇强则强!可伸缩!
// ExecutorService threadPool = Executors.newCachedThreadPool();
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
try {
// 线程池的使用方式!
for (int i = 0; i < 100; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 使用完毕后需要关闭!
threadPool.shutdown();
}
}
}
四大拒绝策略
/**
* 1、ThreadPoolExecutor.AbortPolicy(); 抛出异常,丢弃任务
* 思路:看到效果一样的东西,可以研究研究!
* 2、ThreadPoolExecutor.DiscardPolicy();不抛出异常,丢弃任务
* 3、ThreadPoolExecutor.DiscardOldestPolicy(); 尝试获取任务,不一定执行!
* 4、ThreadPoolExecutor.CallerRunsPolicy(); 哪来的去哪里找对应的线程执行!
*/
请你谈谈 最大线程池 该如何设置啊?
CPU密集型: 根据CPU的处理器数量来定!保证最大效率
IO密集型: 50 个线程都是进程操作大io资源, 比较耗时! > 这个常用的 IO 任务数!
10、四个函数式接口
java.util.function
所有的函数式接口都可以用来简化编程模型: 都可以使用lambda表达式简化!
/**
* 函数式接口是我们现在必须要要掌握且精通的
* 4个!
* Java 8
*
* Function : 有一个输入参数有一个输出参数
* Consumer:有一个输入参数,没有输出参数
* Supplier:没有输入参数,只有输出参数
* Predicate:有一个输入参数,判断是否正确!
*/
package com.coding.funcation;
import java.util.function.Function;
/**
* 函数式接口是我们现在必须要要掌握且精通的
* 4个!
* Java 8
*
* Function : 有一个输入参数有一个输出参数
* Consumer:有一个输入参数,没有输出参数
* Supplier:没有输入参数,只有输出参数
* Predicate:有一个输入参数,判断是否正确!
*/
public class Demo01 {
public static void main(String[] args) {
//
// Function<String,Integer> function = new Function<String,Integer>() {
// @Override
// public Integer apply(String str) {
// return str.length();
// }
// };
// (参数)->{方法体}
Function<String,Integer> function = (str)->{return str.length();};
System.out.println(function.apply("a45645646"));
}
}
package com.coding.funcation;
import java.util.function.Predicate;
// Predicate
public class Demo02 {
public static void main(String[] args) {
// Predicate<String> predicate = new Predicate<String>() {
// @Override
// public boolean test(String str) {
// return str.isEmpty();
// }
// };
Predicate<String> predicate = str->{return str.isEmpty();};
System.out.println(predicate.test("456"));
}
}
package com.coding.funcation;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class Demo03 {
public static void main(String[] args) {
// Supplier<String> supplier = new Supplier<String>() {
// // 语法糖
// @Override
// public String get() {
// return "《hello,spring》";
// }
// };
Supplier<String> supplier = ()->{return "《hello,spring》";};
Consumer<String> consumer = s->{ System.out.println(s);};
// Consumer<String> consumer = new Consumer<String>() {
// @Override
// public void accept(String s) {
// System.out.println(s);
// }
// };
consumer.accept(supplier.get());
}
}
11、Stream流式计算
package com.coding.stream;
import java.util.Arrays;
import java.util.List;
// 数据库、集合 : 存数据的
// Stream:计算和处理数据交给 Stream
public class StreamDemo {
/**
* 按条件用户筛选:
* 1、id 为偶数
* 2、年龄大于24
* 3、用户名大写 映射
* 4、用户名倒排序
* 5、输出一个用户
*
* 请你只用一行代码完成!
*/
public static void main(String[] args) {
User u1 = new User(1,"a",23);
User u2 = new User(2,"b",24);
User u3 = new User(3,"c",22);
User u4 = new User(4,"d",28);
User u5 = new User(6,"e",26);
// 存储
List<User> users = Arrays.asList(u1, u2, u3, u4, u5);
// 计算等操作交给流
// forEach(消费者类型接口)
users.stream()
.filter(u->{return u.getId()%2==0;})
.filter(u->{return u.getAge()>24;})
.map(u->{return u.getName().toUpperCase();})
.sorted((o1,o2)->{return o2.compareTo(o1);})
.limit(1)
.forEach(System.out::println);
// 在JDK1.5 的时候,枚举:反射、注解、泛型
// 在JDK1.8 的时候 函数式接口、Stream流式计算、lambda表达式、链式编程!
// 无论何时,都还需要掌握一个东西叫 JVM;
// JVM: 你会了这个技术不会觉得你恨厉害!
}
}
12、分支合并
什么是 forkjoin
MapReduce:input->split->map->reduce->output
主要就是两步:
1、任务拆分
2、结果合并
前提:forkjoin 一定是用在大数据量的情况下
工作原理:工作窃取 底层维护的是一个双端队列;
好处:效率高
坏处:产生资源争夺
13、异步回调
Future
CompletableFuture
package com.coding.future;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 没有返回值,好比多线程,功能更强大!
// CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println(Thread.currentThread().getName() + "没有返回值!");
// });
// System.out.println("111111");
// completableFuture.get();
// 有返回值
// 任务
CompletableFuture<Integer> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName()+"=>supplyAsync!");
int i = 10/0;
return 1024;
});
System.out.println(uCompletableFuture.whenComplete((t, u) -> { // 成功
System.out.println("t=>" + t); // 正确结果
System.out.println("u=>" + u); // 错误信息
}).exceptionally(e -> { // 失败,如果错误就返回错误的结果!
System.out.println("e:" + e.getMessage());
return 500;
}).get());
}
}