多线程
概述
Thread 线程 => 是操作系统能够进行运算调度的最小单位。大部分的情况下,它被包含在进程中,是进程中的实际运作单位
- Java 的执行模型是同步 | 阻塞(block)的
- JVM 默认情况下只有一个线程 => 具有严重的性能问题
- Java 中只有
Thread
代表线程 -
Thread
类的每一个实例代表一个 JVM 中的线程 -
Thread.start()
之后,JVM 中就增加一个线程,增加一个执行流,增加了一套方法栈 => 开启新线程 =>new Thread(Runnable target).start()
+run
-
start
方法调用 => 开始并发执行 - 主线程的入口一定是
Main
方法,其余新开的线程的最底部一定是Thread.run
方法 - 方法栈(局部变量)是线程私有的
- 静态变量 | 类变量是被所有线程共享的
- 多线程中的线程会被线程调度机制打断
- 不同的执行流的同步执行是一切线程问题的根源
生命周期
-
NEW
=> Thread state for a thread which has not yet started => 初始态 => 新创建一个线程对象,但还没有调用start()
方法 -
RUNNABLE
=> Thread state for a runnable thread. A thread in the runnable state is executing in the Java virtual
machine but it may be waiting for other resources from the operating system such as the processor. => 运行 -
BLOCKED
=> Thread state for a thread blocked waiting for a monitor lock. A thread in the blocked state is waiting for a monitor lock to enter a synchronized block/method or reenter a synchronized block/method after calling Object.wait. => synchronized => 阻塞 => 表示线程阻塞于锁 -
WAITING
=> Thread state for a waiting thread. A thread is waiting due to calling one of the following methods: Object.wait with no timeout, Thread.join with no timeout & LockSupport.park. A thread in the waiting state is waiting for another thread to perform a particular action. => 等待 -
TIMED_WAITING
=> Thread state for a waiting thread with a specified waiting time. A thread is in the timed waiting state due to calling one of the following methods with a specified positive waiting time: Thread.sleep, Object.wait with a timeout, Thread.join with a timeout, LockSupport.parkNanos & LockSupport.parkUntil => 超时等待 => 调用锁 |monitor
|
对象的wait()
方法之后,就进入了WAITING
状态 -
TERMINATED
=> Thread state for a terminated thread. The thread has completed execution => 终止 => 表示该线程已经执行完毕
多线程
- CPU 的速度远远大于内存的读取、硬盘的读取以及网络IO的速度,在读取相关数据的时候,CPU 可以做其他有意义的事情,故产生了多线程 => 尽可能的运用 CPU 的效率
- 现代 CPU 都是多核的 =>
Q = I^2 *RT
=> CPU 大于 4GHz 之后发热量承受不住 => 推出多核 => 发热量并没有显著变化 => 多线程可以更大限度发挥多核 CPU 的好处 - 原子操作 -> atomic operation => 不会被线程调度机制打断的操作。
线程难的原因:需要看着同一份代码,想象不同的人在疯狂的以乱序执行它 => 多个线程同时访问同一个共享变量时,由于变量不是原子的,以致于过程是乱序的,不知道什么时候会发生不正常的,有可能是正常的,有可能是不正常的
// 最终结果不是 1000
public class Test {
private static int j = 0;
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(++j);
}).start();
}
}
}
Java 为多线程提供了语言级的支持
-
sychoronized
关键字 -
Thread
类 Object.wait | Object.notify | Object.notifyAll
多线程适用场景
- 对于 IO 密集型应用极其有用 => 网络 IO (通常包括数据库) | 文件 IO
- 对于 CPU 密集型应用稍有折扣 => 多线程带来的提升有限 => CPU 密集型:计算机完成一项任务的时间是取决于 CPU 的速度,其 CPU 占用率高,也许在某段时间内保持 100% 占用率
- 性能提升的上线 => 单核CPU 100% | 多核CPU N * 100%
多线程问题
- 正确性
- 安全 => 竞争条件 & 死锁
- 协同 => 同时、随机执行的线程,如何让他们协同工作?
- 效率 & 易用性
- 执行的越快越好
- 用起来越不容易出错越好
java.lang.Object
- Java 从一开始就把线程作为语言特性,提供语言级的支持
为什么 Java 中所有对象都可以成为锁?
-
Object.wait()
|Object.wait(long)
|Object.wait(long, int)
=> Causes the current thread to wait until
another thread invokes the java.lang.Object#notify() method or the java.lang.Object#notifyAll() method for this
object. The current thread must own this object's monitor. => Error java.lang.IllegalMonitorStateException -
Object.notify
=> Wakes up a single thread that is waiting on this object's monitor. If any threads are waiting on
this object, one of them is chosen to be awakened. The choice is arbitrary and occurs at the discretion of the
implementation. -
Object.notifyAll()
=> Wakes up all threads that are waiting on this object's monitor. The awakened threads will
not be able to proceed until the current thread relinquishes the lock on this object. The awakened threads will
compete in the usual manner with any other threads that might be actively competing to synchronize on this object; for
example, the awakened threads enjoy no reliable privilege or disadvantage in being the next thread to lock this
object.
合理的使用 wait
+ notify
就可以达到调度不同线程的目的 => 先 wait()
之后 notify
进程才会一直走下去,如果先 notify
后 wait
那么进程将会一直等待 notify
synchronized(obj) {
while(condition does no hold) {
obj.wait();
}
}
synchronized(obj) {
obj.notify();
}
线程不安全
线程不安全的表现
- 数据错误
- 死锁
数据错误
数据错误 => 不是原子操作
i++
-
if-then-do
public class Test { private static final Map<Integer, Integer> map = new HashMap<>(); public static void main(String[] args) { for (int i = 0; i < 1000; i++) { new Thread(() -> { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } int random = new Random().nextInt(10); if (!map.containsKey(random)) { map.put(random, random); System.out.println("put: " + random); } }).start(); } } }
死锁
synchronized
=> 同步 => 锁 => 同一个时刻只能有一个线程拿到同一把锁 => 在 Java 中任何一个对象都可以当成锁🔐
public class Test {
// Thread1 和 Thread2 使用了不同的顺序来获得资源锁
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
public static void main(String[] args) {
new Thread1().start();
new Thread2().start();
}
static class Thread1 extends Thread {
@Override
public void run() {
synchronized (lock1) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock2) {
System.out.println("");
}
}
}
}
static class Thread2 extends Thread {
@Override
public void run() {
synchronized (lock2) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock1) {
System.out.println("");
}
}
}
}
}
排查死锁问题
-
jps
=> 查看当前所有的 Java 进程 - 找到死锁所运行的进程 ID
-
jstack <java process id> > <file>
=> 将死锁的进程栈信息保存到指定文件 - 分析文件 | 排查调用栈,看每个 Thread 在哪个方法处停滞了
预防死锁产生的原则
所有的线程都按照相同的顺序获得资源的锁🔐
线程安全
实现线程安全的基本手段
- 不可变类 => 数据错误「多个线程同时修改同一个数据」=>
Integer
|String
| ... -
synchronized
同步块 - JUC 包
synchronized
- Java 语音级的支持,1.6 之后性能极大的提升
- 字节码层面的实现 =>
MONITORENTER
+MONITOREXIT
- 锁住的是什么?
// 锁住对象 LOCK public static void Main() { // 在语句外加 synchronized synchronized(LOCK) { } } // 锁住 Class 对象 public synchronized static void foo() { } // 锁住 this (当前对象) // 在方法上声明 synchronized 关键字 public synchronized void bar() { } // 等价于 => 效果等价,字节码不等价 public void bar() { synchronized(this) { // do something } } // 将 ` this ` 当成锁 => 将该实例当成锁 Main object = new Main(); new Thread(object::modifySharedVariable).start(); private synchronized void modifySharedVariable() { // do something } // 等价于 private void modifySharedVariable() { synchronized(this) { } }
- 非公平锁 => 谁获得锁取决于操作系统的调度
- 可重入锁
缺点
-
synchronized
不能知道当前锁是否能被拿到 - 只有悲观锁、排他锁,没有乐观锁、共享锁
- 性能相对稍差
JUC 包
Java 的协同机制一定要和锁(monitor | 监视器 | 管程)一起来工作
- JUC => java.util.concurrent => Java 并发工具包
- java.util.concurrent.ConcurrentHashMap => 任何使用
HashMap
有线程安全问题的地方,都无脑使用ConcurrentHashMap
替换即可 - java.util.concurrent.atomic.AtomicBoolean | java.util.concurrent.atomic.AtomicInteger | java.util.concurrent.atomic.AtomicLong =>
a += 1
不是原子操作 =>AtomicInteger
中的addAndGet()
方法是原子操作 - java.util.concurrent.locks.ReentrantLock => 可重入锁 => 与
synchronized
不同点:ReentrantLock
可以在一个地方加锁,在另一个地方解锁 - java.util.concurrent.CountDownLatch
Lock & Condition
- 同一个锁可以有多个条件
- 读写分离 =>
ReadLock
|WriteLock
=> 读写锁 => 可以实现多个线程读,只有一个线程写 -
Lock.tryLock()
=> Acquires the lock only if it is free at the time of invocation. =>boolean tryLock()
- 可以方便的实现更加灵活的优先级 & 公平性 => 公平锁 & 非公平锁
-
ReentrantLock
=>sync
+NonfairSync
+FairSync
=> 默认的ReentrantLock
是非公平锁
- 锁的重入问题 => 当一个线程重复的去获取锁,是否可以获取到 =>
Lock
支持可重入锁
private static Lock LOCK = new ReentrantLock();
public static void Main() {
LOCK.lock();
LOCK.lock();
LOCK.lock();
# lock 了几次就要 unlock 几次,不然其他线程获取不到锁
LOCK.unlock();
LOCK.unlock();
LOCK.unlock();
}
CountDownLatch
- 倒数闭锁
- 用于协调一组线程的工作
-
countDown
+await
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class Main {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
int second = new Random().nextInt(10);
try {
Thread.sleep(second * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("线程" + finalI + "活干完了");
latch.countDown();
}).start();
}
latch.await();
System.out.println("我是老板,所有工人的活都干完了");
}
}
CyclicBarrier
-
CyclicBarrier
=> 循环屏障 => 等待所有线程到达一个屏障之后所有线程再一起出发 await
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Main {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(10);
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
int second = new Random().nextInt(10);
try {
Thread.sleep(second * 1000);
System.out.println("线程" + finalI + "活干完了");
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
System.out.println("等其他人到了才能继续");
}).start();
}
}
}
Semaphore
- 信号量 => 当信号量为1时,就是锁,排它锁就是一个信号量为1的特殊的锁
-
acquire
+release
BlockingQueue & BlockingDeque
- 传统的集合框架的操作要么正常返回,要么丢出异常,BlockingQueue | BlockingDeque 提供了一种等待的可能
- queue => 先进先出
- deque => double ended queue => 头尾都可以进出
-
BlockingQueue
=> capacity +put
+take
线程池 ThreadPoolExecutor
- 线程是非常昂贵的 => Java 线程模型的缺陷
- Java 的线程调度完全依赖于操作系统的依赖调度,所以线程是非常昂贵的,每个线程都要占用操作系统一部分资源
- 使用线程池完成多线程协同和任务分解
- 线程池是预先定义好的若干个线程 => 省去每次创建 | 销毁线程的开销 => java.concurrent.Executors + java.util.Callable + java.util.concurrent.Future
- java.util.Callable => 用于描述一个抽象任务的 interface => 泛型 + 有返回值 + 可以抛出异常
- java.util.concurrent.Future
ExecutorService executorService = Executors.newFixedThreadPool(threadNum); Future submit = executorService.submit(Callable); submit.get();
参数
new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
-
corePoolSize
=> the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set => 核心员工数量 -
maximunPoolSize
=> the maximum number of threads to allow in the pool => 最大招募的员工数量 -
keepAliveTime
&unit
=> 员工闲下来之后多久炒掉他们-
keepAliveTime
=> when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating -
unit
=> the time unit for the keepAliveTime argument
-
-
workQueue
=> the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method => 订单队列 -
threadFactory
=> the factory to use when the executor creates a new thread => 造人的工厂 -
handler
=> the handler to use when execution is blocked because the thread bounds and queue capacities are reached => 订单实在太多的处理策略
Rejected Execution Handler Policy
-
AboutPolicy
=> A handler for rejected tasks that throws a RejectedExecutionException -
CallerRunsPolicy
=> A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method, unless the executor has been shut down, in which case the task is discarded -
DiscardOldestPolicy
=> A handler for rejected tasks that discards the oldest unhandled request and then retries execute, unless the executor is shut down, in which case the task is discarded -
DiscardPolicy
=> A handler for rejected tasks that silently discards the rejected task
ExecutorService
一个多线程执行器框架,最常用的实现是线程池。屏蔽了线程的细节,提供了并发执行任务机制
Future
- 代表未来才会发生的事情
- Future 本身是立即返回的
-
Future.get
会阻塞并返回执行结果,并抛出可能的异常 => 使用get
异常会从其他线程中转移到当前线程,方便处理异常
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPoll = Executors.newFixedThreadPool(10);
Future<?> future = threadPoll.submit(() -> {
try {
Thread.sleep(10000);
System.out.println("我结束工作了");
return "Future Result";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
System.out.println("我把工作提交了");
// 获取 future 结果
System.out.println(future.get());
System.out.println("我提交的工作做完了");
}
}
创建多线程方法
- for loop +
new Thread()
- 线程池 => java.concurrent.Executors + java.util.Callable + java.util.concurrent.Future
- java.util.concurrent.ForkJoinPool
- Arrange async execution =>
execute
method - Await and obtain result =>
invoke
method - Arrange exec and obtain Future =>
submit
method
- Arrange async execution =>
- java.util.Collection.parallelStream() =>
Collection.parallelStream().forEach(item => // do something)
调度线程方法
- java.lang.Object.wait() + java.lang.Object.notify()
- java.util.concurrent.locks.Lock + java.util.concurrent.locks.ReentrantLock + java.util.concurrent.locksCondition => 注:
Condition
等待的方法是await()
- java.util.concurrent.BlockingQueue => implements => java.util.concurrent.LinkedBlockingQueue +
java.util.concurrent.ArrayBlockingQueue 注意:java.util.concurrent.BlockingQueue 如果要调度线程可以采用两个 java.util.concurrent.BlockingQueue,如queue
+signalQueue
,其中signalQueue
主要用于调度线程 - java.util.concurrent.Semaphore
- java.util.concurrent.Exchanger
Runnable & Callable
- Runnable | Callable 都不是线程,只是一个任务,只是一小段代码,这个任务会被哪个线程执行取决于线程池本身,可以被任何一个线程执行
- Runnable 的限制
- 不能丢出来异常,Runnable 方法签名上没有
Throws
- Runnable 没有返回值
- 不能丢出来异常,Runnable 方法签名上没有
基于 Runnable 的问题,创造了 Callable,Callable 就是 Runnable 的高级版本,Callable 解决了 Runnable 的问题
Java Memory Modal
JMM => Java 线程模型 => 方法中的局部变量是私有的,其他都是公有的
当两个线程执行时,方法栈是私有的,方法栈中有局部变量,局部变量也是私有的,当在线程中 new Object()
的时候,每个线程都会在 Heap 中生成一个 Object
,方法栈中的局部变量指向 Heap 中的 Object
,两个线程会生成两个 Object
由于 CPU 和 Main Memory 交换是很慢的,CPU 进行一次内存寻址大约是 1 微秒,大约相当于 CPU 的 1000 - 3000 个时钟周期。所以 Java 模型允许每一个线程自己有一份私有的副本,CPU
和私有的副本进行读写。Java 线程模型定期的把每个线程的私有变量同步给 Main Memory
上述导致的问题:
- 线程A更新一个公有变量
globalVariable
,线程B感知不到,需要等到线程A同步至 Main Memory,并且线程B的副本同步 Main Memory,才能感知到globalVariable
的更新 - 在现代 CPU 中,如果两条指令没有依赖关系,编译器可以进行一个指令重排
volatile
volatile
关键字声明的变量在多线程中,所有的线程写该变量的时候都会直接写入 Main Memory,对于共享变量所做的修改会立刻写回 Main Memory。当读该变量时,会立刻从 Main Memory 中读取刷新至当前 CPU 的副本中,再从副本中读取
- 可见性,并非原子性
- 写入 volatile 变量会直接写入 Main Memory
- 从 volatile 变量读取会直接读取 Main Memory
- 非常弱的同步机制
- 禁止指令重排 => 编译器和处理器都可能对指令进行重排,以提高效率,提高程序执行的速度,在多线程中会导致问题 => 可以保证自己的读写前后插入屏障,使得不会发生指令重排
- 有同步的时候无需
volatile
=> synchronized | Lock | AtomicInteger => 既保证可见性又保证原子性
知识点
- 如何查看一个数据是否是线程安全的?
进入方法后,搜索thread
=><strong>Note that this implementation is not synchronized.</strong>
=> 线程不安全 - 除非特意提及线程安全,不然都是线程不安全的
-
Collections.synchronized
方法可以将一些线程不安全的类 | 方法变为synchronized
-
monitor
=> 在 Java 世界中,使用monitor
代表synchronized
锁住的对象 -
Runnable
中的run
- 没有返回值
- 没有声明抛出的异常
- 线程是一种很昂贵的资源,不能频繁的创建它。在 IO 密集型应用中,创建线程数量最大值约为 CPU 数量 + 1
-
Object.wait()
和Thread.sleep()
有什么区别?-
Object.wait()
=> 持有锁之后调用锁的wait
方法后 sleep,之后放弃锁,等待被notify
-
Thread.sleep()
=> sleep 的时候持有锁,sleep 可以不持有锁,当不持有锁的时候sleep
,不会影响其他线程工作
-
- 创建线程池里有
new Thread
=> TODO: 找到相应代码 - TODO
- 排它锁
- 共享锁
- 乐观锁
- 悲观锁
- 公平锁
- 非公平锁