Java多线程入门不完全指南
序言
最近在读《把时间当作朋友》,序言就教导我“无论是谁,都是在某一刻意识到时间的珍贵,并且注定会因懂事太晚而多少有些后悔”。想想我,快走出奔三的泥沼,踏入奔四的深渊,越来越意识到时间的弥足珍贵。于是就想着,趁着还有些精力的时候,记录一下所闻、所学、所感。思考良久,记录些什么呢?先记录一下我这拙劣而又不放弃的Java学习之路吧,挖坑一篇"粗而广"的Java多线程介绍压压惊。
基础知识
线程与进程
- 线程:是操作系统能够进行运算调度的最小单位,是进程中的实际运作单位。一条线程是进程中一个单一顺序的控制流。
- 进程:是计算机中已运行程序的实体,是线程的容器。
同步与异步
- 同步:在发出一个调用时,在没有得到结果之前,该调用就不返回。一旦返回,必然会得到返回值。
- 异步:在调用发出之后,这个调用就直接返回。随后,被调用者通过状态、通知来通知调用者,或者通过回调函数来处理调用。
同步与异步关注的是消息通信机制。
阻塞与非阻塞
- 阻塞:调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
- 非阻塞:调用在不能立刻得到结果之前,该调用不会阻塞当前线程,当前线程仍会处理其他调用。
阻塞与非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态。
并行与并发
- 并发:在同一个处理器上“同时”处理多个任务。通过cpu调度算法,让用户看上去是同时处理。
- 并行:在多台机器上同时处理多个任务,真正的同时。
多线程与线程安全
- 多线程:一个程序实现多个线程并发执行。
- 线程安全:多个线程同时执行同一段代码,线程的调度顺序不会影响该段代码的任何结果。线程安全主要通过线程同步实现。
线程的状态
5种状态
根据线程的生命周期,可以将线程分为以下5种状态:
- NEW(新建):创建了一个线程,尚未启动
- RUNNABLE(可运行):线程创建后,其他线程调用了该线程的
start()
方法,该线程便等待被CPU调度执行。 - RUNNING(运行):RUNNABLE状态的线程被CPU调度,获取了CPU时间片,运行
run()
方法。 - BLOCKERD(阻塞):线程无法获取CPU时间片,暂时停止运行的状态。这个状态的线程只有状态转为RUNNABLE时,才有机会被CPU调度执行。阻塞有三种情况:
- 等待阻塞:如RUNNING状态的线程执行
wait()
方法。 - 同步阻塞:如RUNNING状态的线程在获取同步锁时,同步锁被其他线程占用。
- 其他阻塞:如RUNNING状态的线程执行
sleep()
方法或其他线程调用join()
方法。
- 等待阻塞:如RUNNING状态的线程执行
- DEAD(死亡):线程
run()
方法执行结束或异常退出,该线程死亡,结束生命周期。
状态转换
等待队列和锁池是如何工作的?
Thread类定义的线程状态
-
NEW(新建):线程创建后未启动。
/** * Thread state for a thread which has not yet started. */
-
RUNNABLE(可运行):等待系统资源运行的线程。
/** * Thread state for a runnable thread. A thread in the runnable * state is executing in the Java virtual machine but it may * be waiting for other resources from the operating system * such as processor. */
-
BLOCKED(阻塞):当一个线程要进入synchronized语句块/方法时,如果没有获取到锁,会变成BLOCKED。或者在调用Object.wait()后,被notify()唤醒,再次进入synchronized语句块/方法时,如果没有获取到锁,会变成BLOCKED。进入阻塞状态是被动的。
/** * Thread state for a thread blocked waiting for a monitor lock. * A thread in the blocked state is waiting for a monitor lock * to enter a synchronized block/method or * reenter a synchronized block/method after calling * {@link Object#wait() Object.wait}. */
-
WAITING(无限等待):等待其它线程执行后,显示唤醒。调用锁对象的
wait()
方法并未设置时间、其它线程调用join()
方法并未设置时间、调用LockSupport.park()
方法都会使当前线程进入此状态。进入等待状态是主动的。/** * Thread state for a waiting thread. * A thread is in the waiting state due to calling one of the * following methods: * <ul> * <li>{@link Object#wait() Object.wait} with no timeout</li> * <li>{@link #join() Thread.join} with no timeout</li> * <li>{@link LockSupport#park() LockSupport.park}</li> * </ul> * * <p>A thread in the waiting state is waiting for another thread to * perform a particular action. * * For example, a thread that has called <tt>Object.wait()</tt> * on an object is waiting for another thread to call * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on * that object. A thread that has called <tt>Thread.join()</tt> * is waiting for a specified thread to terminate. */
-
TIMED_WAITING(限期等待):等待一段时间后被系统唤醒,不需要显示被唤醒。调用
Thread.sleep()
方法、调用锁对象的wait()
方法并设置时间、其它线程调用join()
方法并设置时间、调用LockSupport.parkNanos()
方法、调用LockSupport.parkUntil()
方法都会使线程进入此状态。/** * Thread state for a waiting thread with a specified waiting time. * A thread is in the timed waiting state due to calling one of * the following methods with a specified positive waiting time: * <ul> * <li>{@link #sleep Thread.sleep}</li> * <li>{@link Object#wait(long) Object.wait} with timeout</li> * <li>{@link #join(long) Thread.join} with timeout</li> * <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li> * <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li> * </ul> */
-
TERMINATED(结束):线程
run()
方法执行结束或异常退出后的线程状态。/** * Thread state for a terminated thread. * The thread has completed execution. */
VisualVM线程监控
通过JDK自带的VisualVM工具可以监控线程的运行状态,按照下图操作可以抓取线程Dump,监控线程的状态。
VisualVM将线程的状态分为五种:运行、休眠、等待、驻留、监视,与Thread类中的线程状态对应如下:
Thread类 | VisualVM |
---|---|
RUNNABLE | 运行 |
TIMED_WAITING (sleeping) | 休眠 |
TIMED_WAITING (on object monitor) WAITING (on object monitor) | 等待 |
TIMED_WAITING (parking) WAITING (parking) | 驻留 |
BLOCKED (on object monitor) | 监视 |
内存原子性、可见性和有序性
Java内存模型
概念
- 线程之间通信由Java内存模型控制,内存模型决定了一个线程对共享变量的写入何时对另一个线程可见。
- 每个线程都被抽象出一个保存共享变量副本的工作内存,线程对共享变量的操作都在此工作内存中进行。
-
某个线程无法直接访问其他线程中的变量,线程之间的通信需要通过主内存来实现。
线程通信过程
- 线程A从主内存中拷贝共享变量1到工作内存中的副本,对副本的值进行修改。
- 线程A刷新修改后的值到主内存中。
- 线程B将主内存中共享变量1拷贝到工作内存中。
并发编程三个特征
- 原子性:类似于数据库事务,要么全部执行,要么不执行。
如:num++
不具有原子性,先取出num的值,再进行加1。
而a=1
,return a
则都具有原子性。 - 可见性:某个线程对共享变量做了修改后,其他线程可以立马感知到该变量的修改。
- 有序性:若在本线程内观察,所有操作都是有序的;若在一个线程中观察其他线程,所有的操作都是无序的。前半句指“线程内表现为串行语义”,后半句指“指令重排序”现象和“工作内存中主内存同步延迟”现象。
Sychronized与Volatile
- Sychronized:可以保证原子性、可见性和有序性。
Sychronized关键字能保证在同一时刻,只有一个线程可以获取锁执行同步代码,执行完之后释放锁之前,会将修改后变量的值刷新的主内存中。 - Volatile:可以保证可见性和有序性,不能保证操作的原子性。
被Volatile关键字修饰的变量,在写操作后会加入一条store指令,强行将共享变量最新的值刷新到主内存中。在读操作前,会加入一条load指令,强行从主内存中读取共享变量最新的值。
Java锁机制
Java中的锁
- Sychronized
- ReentrantLock
- ReentrantReadWriteLock
这三种锁是怎么实现的?什么是AQS?什么是CAS?CAS的ABA问题怎么解决?集群环境下如何实现同步?有待后续分晓
Java多线程实现
创建线程的方式
-
继承Thread类:重写
run()
方法,创建线程后调用start()
方法启动。public class ThreadOne extends Thread { private static Integer num = 100; @Override public void run() { synchronized (num) { while (num > 0) { try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("线程:" + this.getName() + "执行num--操作后,"); num--; System.out.println("num的值为:" + num); } } } public static void main(String[] args) { Thread t1 = new ThreadOne(); Thread t2 = new ThreadOne(); Thread t3 = new ThreadOne(); t1.start(); t2.start(); t3.start(); } }
-
实现Runnable接口:实现
run()
方法,创建线程后调用start()
方法启动。public class ThreadTwo implements Runnable{ private static Integer num = 100; @Override public void run() { synchronized (num) { while (num > 0) { try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("线程:" + Thread.currentThread().getName() + "执行num--操作后,"); num--; System.out.println("num的值为:" + num); } } } public static void main(String[] args) { Thread t1 = new Thread(new ThreadTwo()); Thread t2 = new Thread(new ThreadTwo()); Thread t3 = new Thread(new ThreadTwo()); t1.start(); t2.start(); t3.start(); } }
-
实现Callable接口:实现
call()
方法,可以创建有返回值的线程。public class ThreadThree implements Callable<Integer> { private static Integer num = 100; @Override public Integer call() throws Exception { synchronized (num) { while (num > 0) { try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } num--; } } return num; } public static void main(String[] args) throws InterruptedException, ExecutionException { FutureTask<Integer> task1 = new FutureTask<>(new ThreadThree()); FutureTask<Integer> task2 = new FutureTask<>(new ThreadThree()); FutureTask<Integer> task3 = new FutureTask<>(new ThreadThree()); Thread t1 = new Thread(task1); Thread t2 = new Thread(task2); Thread t3 = new Thread(task3); t1.start(); t2.start(); t3.start(); System.out.println("线程1的返回值:" + task1.get()); System.out.println("线程2的返回值:" + task2.get()); System.out.println("线程3的返回值:" + task3.get()); } }
Executor线程池框架
为什么要用线程池?
- 通过复用“池”中的已有线程,减少线程创建和销毁的开销,提高性能。
- 可以设置最大并发线程数,避免过多线程竞争资源。
Executors创建线程池
-
newFixedThreadPool创建固定线程数的线程池。若所有线程都处于活动状态,新提交的任务会在队列中等待。若某个线程异常结束,则线程池会重新补充一个新线程。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
-
newCachedThreadPool创建可缓存的线程池,若线程池中的线程超过60s未被使用会被移除。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
-
newScheduledThreadPool创建定时线程,可定时执行任务。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
-
newSingleThreadExecutor创建一个单线程来执行任务。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
线程池执行任务
-
使用
execute()
方法执行Runnable任务public class RunnableThreadPool { public static void main(String[] args) { ExecutorService eService = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { eService.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "正在执行"); } }); } eService.shutdown(); } }
-
使用
submit()
方法执行Callable任务public class CallableThreadPool { public static void main(String[] args) { ExecutorService eService = Executors.newFixedThreadPool(5); List<Future<String>> resultList = new ArrayList<Future<String>>(); for (int i = 0; i < 10; i++) { Future<String> future = eService.submit(new Callable<String>() { @Override public String call() throws Exception { return Thread.currentThread().getName() + "已执行call方法并成功返回"; } }); resultList.add(future); } eService.shutdown(); for (Future<String> future : resultList) { //等待future返回结果 while(!future.isDone()); try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } }
那么线程池的实现原理是什么?有待后续见分晓