并发编程对于很多人来说都是一个难点,因为其内部涉及到的内容非常繁杂,同时在实际开发中很多人虽然做了多年,但是仍然还停留在业务代码开发。对于并发的接触可以说是少之又少。可是现在的面试中,并发这一部分又是一个高频考点,很多人都在并发这座大山面前无法翻越。
本系列会带领大家从并发编程的基础开始,逐步深入并发的核心。让大家能够系统的将并发全部知识点拿下。
并发编程基础
Java从诞生开始,其就已经内置了对于多线程的支持。当多个线程能够同时执行时,大多数情况下都能够显著提升系统性能,尤其现在的计算机普遍都是多核的,所以性能的提升会更加明显。但是,多线程在使用中也需要注意诸多的问题,如果使用不当,也会对系统性能造成非常严重的影响。
进程、线程、协程
什么是进程
进程可以理解为就是应用程序的启动实例。如微信、Idea、Navicat等,当打开它们后,就相当于开启了一个进程。每个进程都会在操作系统中拥有独立的内存空间、地址、文件资源、数据资源等。进程是资源分配和管理的最小单位。
1.1.2)什么是线程
线程从属于进程,是程序的实际执行者,一个进程中可以包含若干个线程,并且也可以把线程称为轻量级进程。每个线程都会拥有自己的计数器、堆栈、局部变量等属性,并且能够访问共享的内存变量。线程是操作系统(CPU)调度和执行的最小单位。CPU会在这些线程上来回切换,让使用者感觉线程是在同时执行的。
一个线程具有五种状态,分别为:新建、就绪、运行、阻塞、销毁。
同时对于线程状态切换的工作,是由JVM中的TCB(Thread Control Block)来执行。
线程使用带来的问题
有很多人都会存在一个误区,在代码中使用多线程,一定会为系统带来性能提升,这个观点是错误的。并发编程的目的是为了让程序运行的更快,但是,绝对不是说启动的线程越多,性能提升的就越大,其会受到很多因素的影响,如锁问题、线程状态切换问题、线程上下文切换问题,还会受到硬件资源的影响,如CPU核数。
什么叫做线程上下文切换
不管是在多核甚至单核处理器中,都是能够以多线程形式执行代码的,CPU通过给每个线程分配CPU时间片来实现线程执行间的快速切换。 所谓的时间片就是CPU分配给每个线程的执行时间,当某个线程获取到CPU时间片后,就会在一定时间内执行,当时间片到期,则该线程会进入到挂起等待状态。时间片一般为几十毫秒,通过在CPU的高速切换,让使用者感觉是在同时执行。
同时还要保证线程在切换的过程中,要记录线程被挂起时,已经执行了哪些指令、变量值是多少,那这点则是通过每个线程内部的程序计数器来保证。
简单来说:线程从挂起到再加载的过程,就是一次上下文切换。其实比较耗费资源的。
引起上下文切换的几种情况:
- 时间片用完,CPU正常调度下一个任务。
- 被其他优先级更高的任务抢占。
- 执行任务碰到IO阻塞,调度器挂起当前任务,切换执行下一个任务。
- 用户代码主动挂起当前任务让出CPU时间。
- 多任务抢占资源,由于没有抢到被挂起。
- 硬件中断。
生产者/消费者模式实现
public class Producer extends Thread{
private static final int QUEUE_SIZE=5;
private final Queue queue;
public Producer(Queue queue){
super();
this.queue=queue;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
synchronized (queue){
while (queue.size() >= QUEUE_SIZE){
System.out.println("队列满了,等待消费");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(i);
System.out.println("入队数据:"+i);
queue.notify();
}
}
}
}
import java.util.Queue;
public class Consumer extends Thread{
private final Queue queue;
public Consumer(Queue queue){
super();
this.queue=queue;
}
@Override
public void run() {
while (true){
synchronized (queue){
while (queue.size() ==0){
System.out.println("队列没有数据,等待生产");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//取出数据
System.out.println("取出数据:"+queue.poll());
queue.notify();
}
}
}
}
public class PCTest {
public static void main(String[] args) {
final Queue<Integer> queue = new LinkedList<>();
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
producer.start();
consumer.start();
}
}
演示效果如下:
这种方式虽然实现了生产者/消费者模式,但其性能非常低下。因为涉及到了同步锁、线程等待与唤醒的状态转换和线程上下文切换。这些操作都是机器耗费性能的。
什么是协程
协程是一种比线程更加轻量级的存在,一个线程中可以包含若干个协程。同时Java本身是并不支持协程的。但是如python、go都是支持的。协程不是被操作系统内核所管理,而完全是由程序所控制(也就是在用户态执行),所以其非常轻量级。而线程是由操作系统内核管理,所以是重量级的。只要是由程序也就是用户态完成操作,无需经过操作系统内核都是轻量级的。一旦经过操作系统内核就是重量级的。协程的开销远远小于线程的开销。
CPU时间片轮转机制&优化
之前已经提到了线程的执行,是依赖于CPU给每个线程分配的时间来进行。在CPU时间片轮转机制中,如果一个线程的时间片到期,则CPU会挂起该线程并给另一个线程分片一定的时间分片。如果进程在时间片结束前阻塞或结束,则 CPU 会立即进行切换。调度程序所要做的就是维护一张就绪进程列表,当进程用完它的时间片后,它被移到队列的末尾。
时间片太短会导致频繁的进程切换,降低了 CPU 效率: 而太长又可能引起对短的交互请求的响应变差。将时间片设为 100ms 通常是一个比较合理的折衷。
并行与并发的理解
对于这两个概念,如果刚看到的话,可能会很不屑。但是真的理解什么叫做并行,什么叫做并发吗?
所谓并发即让多个任务能够交替执行,一般都会附带一个时间单位,也就是所谓的在单位时间内的并发量有多少。
所谓并行即让多个任务能够同时执行。比如说:你可以一遍上厕所,一遍吃饭。
线程启动&中止
线程的实现方式有两种:继承Thread类、实现Runnable接口。但是有一些书籍或者文章会说有三种方式,即实现Callable接口。但通过该接口定义线程并不是Java标准的定义方式,而是基于Future思想来完成。 Java官方说明中,已经明确指出,只有两种方式。
那么Thread和Runnable有什么区别和联系呢? 一般来说,Thread是对一个线程的抽象,而Runnable是对业务逻辑的抽象,并且Thread 可以接受任意一个 Runnable 的实例并执行。
线程启动
/**
* 新建线程
*/
public class NewThread {
private static class UseThread extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+": use thread");
}
}
private static class UseRunnable implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+": use runnable");
}
}
public static void main(String[] args) {
System.out.println(Thread.currentThread().getName()+": use main");
UseThread useThread = new UseThread();
useThread.start();
UseRunnable useRunnable = new UseRunnable();
new Thread(useRunnable).start();
}
}
优化:启动线程前,最好为这个线程设置特定的线程名称,这样在出现问题时,给开发人员一些提示,快速定位到问题线程。
Thread.currentThread().setName("Runnable demo");
线程中止
线程在正常下当run执行完,或出现异常都会让该线程中止。
理解suspend()、resume()、stop()
这三个方法对应的是暂停、恢复和中止。对于这三个方法的使用效果演示如下:
public class Srs {
private static class MyThread implements Runnable{
@Override
public void run() {
Thread.currentThread().setName("my thread");
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
while (true){
System.out.println(Thread.currentThread().getName()+"run at"+dateFormat.format(new Date()));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
Thread thread = new Thread(new MyThread());
//开启线程
System.out.println("开启线程");
thread.start();
TimeUnit.SECONDS.sleep(3);
//暂停线程
System.out.println("暂停线程");
thread.suspend();
TimeUnit.SECONDS.sleep(3);
//恢复线程
System.out.println("恢复线程");
thread.resume();
TimeUnit.SECONDS.sleep(3);
//中止线程
System.out.println("中止线程");
thread.stop();
}
}
执行结果
可以看到这三个方式,很好的完成了其本职工作。但是三个已经在Java源码中被标注为过期方法。那这三个方式为什么会被标记为过期方法呢?
当调用suspend()时,线程不会将当前持有的资源释放(如锁),而是占有者资源进入到暂停状态,这样的话,容易造成死锁问题的出现。
public class Srs {
private static Object obj = new Object();//作为一个锁
private static class MyThread implements Runnable{
@Override
public void run() {
synchronized (obj){
DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
while (true){
System.out.println(Thread.currentThread().getName()+"run at"+dateFormat.format(new Date()));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new MyThread(),"正常线程");
Thread thread1 = new Thread(new MyThread(),"死锁线程");
//开启线程
thread.start();
TimeUnit.SECONDS.sleep(3);
//暂停线程
thread.suspend();
System.out.println("暂停线程");
thread1.start();
TimeUnit.SECONDS.sleep(3);
//恢复线程
/*thread.resume();
System.out.println("恢复线程");
TimeUnit.SECONDS.sleep(3);*/
//中止线程
/*thread.stop();
System.out.println("中止线程");*/
}
}
在上述代码中,正常线程持有了锁,当调用suspend()时,因为该方法不会释放锁,所以死锁线程因为获取不到锁而导致无法执行。所以该方法必须与resume()成对出现。
当调用stop()时,会立即停止run()中剩余的操作,包括在catch或finally语句中的内容。因此可能会导致一些收尾性的工作的得不到完成,如文件流,数据库等关闭。并且会立即释放该线程所持有的所有的锁,导致数据得不到同步的处理,出现数据不一致的问题。
public class StopProblem {
public static void main(String[] args) throws Exception {
TestObject testObject = new TestObject();
Thread t1 = new Thread(() -> {
try {
testObject.print("1", "2");
} catch (Exception e) {
e.printStackTrace();
}
});
t1.start();
//让子线程有执行时间
//Thread.sleep(1000);
t1.stop();
System.out.println("first : " + testObject.getFirst() + " " + "second : " + testObject.getSecond());
}
}
class TestObject {
private String first = "ja";
private String second = "va";
public synchronized void print(String first, String second) throws Exception {
System.out.println(Thread.currentThread().getName());
this.first = first;
//模拟数据不一致
//Thread.sleep(10000);
this.second = second;
}
public String getFirst() {
return first;
}
public String getSecond() {
return second;
}
}
线程中止的安全且优雅姿势
Java对于线程安全中止实现设计了一个中断属性,其可以理解是线程的一个标识位属性。它用于表示一个运行中的线程是否被其他线程进行了中断操作。好比其他线程对这个线程打了一个招呼,告诉它你该中断了。通过interrupt()实现。
public class InterruptDemo {
private static class MyThread implements Runnable{
@Override
public void run() {
while (true){
System.out.println(Thread.currentThread().getName()+" is running");
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new MyThread(),"myThread");
thread.start();
TimeUnit.SECONDS.sleep(3);
thread.interrupt();
}
}
添加该方法后,会出现一个异常,但是可以发现并不会线程的继续执行。
线程通过通过检查自身是否被中断来进行响应,可以通过isInterrupted()进行判断,如果返回值为true,代表添加了中断标识,返回false,代表没有添加中断标识。通过它可以对线程进行中断操作。
public class InterruptDemo {
private static class MyThread implements Runnable{
@Override
public void run() {
//while (true){
while (!Thread.currentThread().isInterrupted()){
System.out.println(Thread.currentThread().getName()+" is running");
}
System.out.println(Thread.currentThread().getName()+" Interrupt flag is : "+Thread.currentThread().isInterrupted());
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new MyThread(),"myThread");
thread.start();
TimeUnit.SECONDS.sleep(3);
thread.interrupt();
}
}
对线程中断属性的判断,可以利用其进行线程执行的中断操作。
线程也可以通过静态方法Thread.interrupted()对中断标识进行复位,如果该线程已经被添加了中断标识,当使用了该方法后,会将线程的中断标识由true改为false。
public class InterruptDemo {
private static class MyThread implements Runnable{
@Override
public void run() {
//while (true){
//while (!Thread.currentThread().isInterrupted()){
while (!Thread.interrupted()){
System.out.println(Thread.currentThread().getName()+" is running");
}
System.out.println(Thread.currentThread().getName()+" Interrupt flag is : "+Thread.currentThread().isInterrupted());
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new MyThread(),"myThread");
thread.start();
TimeUnit.SECONDS.sleep(3);
thread.interrupt();
}
}
中断线程的注意点
线程在执行的过程中,可能会进行中断或者阻塞,比方说使用了sleep()或者wait()。那么在线程执行中一旦出现这类操作的话,则会出现InterruptedException异常。但是并不会影响线程的继续执行。
那么一般都会进行try/catch的操作。具体效果如下:
public class InterruptDemo {
private static class MyThread implements Runnable{
@Override
public void run() {
while (!Thread.interrupted()){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(Thread.currentThread().getName()+"in catch Interrupt flag is : "+Thread.currentThread().isInterrupted());
}
System.out.println(Thread.currentThread().getName()+" is running");
}
System.out.println(Thread.currentThread().getName()+" Interrupt flag is : "+Thread.currentThread().isInterrupted());
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new MyThread(),"myThread");
thread.start();
TimeUnit.SECONDS.sleep(3);
thread.interrupt();
}
}
此时可以明显观察到,线程会继续向下执行,并且catch中的中断标记属性为false。因为Thread.interrupted()不仅会判断线程中断标记属性,同时如果该线程的中断属性为true,会将true改变为false。
myThread is running
myThread is running
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at base.InterruptDemo$MyThread.run(InterruptDemo.java:16)
at java.lang.Thread.run(Thread.java:745)
myThread in catch Interrupt flag is : false
myThread is running
myThread is running
myThread is running
myThread is running
此时既然捕捉到了InterruptedException异常,代表该线程应该被中断了,应该让线程停止,并且在catch中要将线程中断掉。所以注意,在使用时,应该在catch中再次执行interrupt(),让线程中断掉。 这样可以在catch中在设置中断标记前,进行资源释放。
public class InterruptDemo {
private static class MyThread implements Runnable{
@Override
public void run() {
//while (true){
while (!Thread.currentThread().isInterrupted()){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+" in catch Interrupt flag is : "+Thread.currentThread().isInterrupted());
//todo:资源释放
//设置中断标记
Thread.currentThread().interrupt();
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" is running");
}
System.out.println(Thread.currentThread().getName()+" Interrupt flag is : "+Thread.currentThread().isInterrupted());
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new MyThread(),"myThread");
thread.start();
TimeUnit.SECONDS.sleep(3);
thread.interrupt();
}
}
同时要注意:处于死锁下的线程,无法被中断
深入线程操作常见方法
理解run()&start()
这两个方法都可以启动线程,但是它俩是有本质上的区别的。当线程执行了 start()方法后,才真正意义上的启动线程,其会让一个线程进入就绪队列等到分配CPU时间片,分到时间片后才会调用run()。注意,同一个线程的start()不能被重复调用,否则会出现异常,因为重复调用了,start方法,线程的state就不是new了,那么threadStatus就不等于0了。
//start源码分析
public synchronized void start() {
/**
Java里面创建线程之后,必须要调用start方法才能创建一个线程,该方法会通过虚拟机启动一个本地线程,本地线程的创建会调用当前系统去创建线程的方法进行创建线程。
最终会调用run()将线程真正执行起来
0这个状态,等于‘New’这个状态。
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* 线程会加入到线程分组,然后执行start0() */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
执行流程图
而run()则仅仅是一个普通方法,与类中的成员方法意义相同。在该方法中可以实现线程执行的业务逻辑。但并不会以异步的方式将线程启动,换句话说就是并不会去开启一个新的线程。其可以单独执行,也可以重复执行。
理解yield()
当某个线程调用了这个方法后,该线程立即释放自己持有的时间片。线程会进入到就绪状态,同时CPU会重新选择一个线程赋予时间分片,但注意,调用了这个方法的线程,也有可能被CPU再次选中赋予执行。
而且该方法不会释放锁。 如需释放锁的话,可以在调用该方法前自己手动释放。
public class Demo {
private static class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
if (i==5){
System.out.println(Thread.currentThread().getName());
yield();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(new MyThread());
Thread thread2 = new Thread(new MyThread());
Thread thread3 = new Thread(new MyThread());
thread1.start();
thread2.start();
thread3.start();
}
}
从结果看出,当调用了该方法后线程会让出自己的时间分片,但也有可能被再次选中执行。
理解join()
该方法的使用,在实际开发中,应用的是比较少的。但在面试中,常常伴随着产生一个问题,如何保证线程的执行顺序? 就可以通过该方法来设置。
使用
当线程调用了该方法后,线程状态会从就绪状态进入到运行状态。
public class JoinDemo {
private static class MyThread extends Thread{
int i;
Thread previousThread; //上一个线程
public MyThread(Thread previousThread,int i){
this.previousThread=previousThread;
this.i=i;
}
@Override
public void run() {
//调用上一个线程的join方法. 不使用join方法解决是不确定的
//previousThread.join();
System.out.println("num:"+i);
}
}
public static void main(String[] args) {
Thread previousThread=Thread.currentThread();
for(int i=0;i<10;i++){
//每一个线程实现都持有前一个线程的引用。
MyThread joinDemo=new MyThread(previousThread,i);
joinDemo.start();
previousThread=joinDemo;
}
}
}
可是等到开启了join之后,结果就是有序的了。
根据结果可以看到,当前线程需要等待previousThread线程终止之后才从thread.join返回。可以理解完,线程会在join处等待。
原理剖析
//源码解析
public final void join() throws InterruptedException {
join(0);
}
...
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
//判断是否携带阻塞的超时时间,等于0表示没有设置超时时间
if (millis == 0) {
//isAlive获取线程状态,无限等待直到previousThread线程结束
while (isAlive()) {
//调用Object中的wait方法实现线程的阻塞
wait(0);
}
} else { //阻塞直到超时
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
可以看到刚方法是被synchronized修饰的,因为在其内部对于线程阻塞的实现,是通过Object中wait方法实现的,而要调用wait(),则必须添加synchronized。
为什么join()阻塞的是主线程呢?按照上述的理解,很多人认为其应该阻塞的是previousThread。实际上主线程会持有previousThread这个对象的锁,然后调用wait方法去阻塞,而这个方法的调用者是在主线程中的。所以造成主线程阻塞。
为什么previousThread线程执行完毕就能够唤醒住线程呢?或者说是在什么时候唤醒的?对于wait的使用,对应的会有notify或notifyAll。在JVM内部,会设置native线程对象为null同时调用notifyAll唤醒所有线程。
总的来说,Thread.join其实底层是通过wait/notifyall来实现线程的通信达到线程阻塞的目的;当线程执行结束以后,会触发两个事情,第一个是设置native线程对象为null、第二个是通过notifyall方法,让等待在previousThread对象锁上的wait方法被唤醒。
线程优先级
操作对于线程执行,是通过CPU时间片来调用运行的。那么一个线程被分配的时间片的多少,就决定了其使用资源的多少。而线程优先级就是决定线程需要能够使用资源多少的线程属性。
线程优先级的范围是1~10。一个线程的默认优先级是5,可以在构建线程时,通过setPriority()修改该线程的优先级。优先级高的线程分配时间片的数量会高于优先级低的线程。
一般来说对于频繁阻塞的线程需要设置优先级高点,而偏重计算的线程优先级会设置低些,确保处理器不会被独占。
但注意,线程优先级不能作为线程执行正确性的依赖,因为不同的操作系统可能会忽略优先级的设置。
守护线程
守护线程是一种支持形的线程,我们之前创建的线程都可以称之为用户线程。通过守护线程可以完成一些支持性的工作,如GC、分布式锁续期。守护线程会伴随着用户线程的结束而结束。
对于守护线程的创建,可以通过setDaemon()设置。
public class DaemonDemo {
private static class MyThread implements Runnable{
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()){
try {
System.out.println(Thread.currentThread().getName());
} finally {
System.out.println("thread run into finally");
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new MyThread());
//设置守护线程
thread.setDaemon(true);
thread.start();
TimeUnit.SECONDS.sleep(5);
}
}
当线程实例没有被设置为守护线程时,该线程并不会随着主线程的结束而结束。但是当被设置为守护线程后,当主线程结束,该线程也会伴随着结束。同时守护线程不一定会执行finally代码块。所以当线程被设定为守护线程后,无法确保清理资源等操作一定会被执行。
线程状态
理解了上述方法后,再来看一下这些对于线程状态转换能起到什么样的影响。