一、线程执行的策略
串行单线程执行
列:在食堂打饭的时候,食堂安排一个打饭大妈打饭,所有同学都排成一个队形,打饭大妈相当于线程,同学打饭相当于任务,这种一个大妈给一群站好队形的同同学打饭相当于java中的单线程串行执行任务。
串行执行的缺点很明显,当一个同学选菜慢的情况下,效率低下,后面的同学早已不耐烦了。
并行多线程执行
为了解决上述问题,食堂打算聘请多个打饭大妈,同时给同学打饭,这样的话同学倒是开心了,很快就能吃上饭,但是食堂承受不了了,因为食堂需要开更多的打饭窗口以及给大妈的工资,映射到多线程中也一样,如果每个任务都创建一个线程来处理的话,任务数量一旦多了,内存就会有很大的负担,甚至可能宕机。
线程池
为了解决上述问题,食堂又做了改进,就是先确定好打饭大妈的数量,然后让同学们排队,每个打饭大妈打完一次饭就叫队列中的下一个。映射到java中我们创建指定数量的线程,然后把任务放到一个队列中去处理,当每个线程处理完任务后就会去队列中取下一个任务处理,可以把线程重复利用,因为线程的创建也是消耗资源的。
所以结论就是:在一定范围内增加线程数量的确可以提升系统的处理能力,但是过多的创建线程将会降低系统的处理速度,如果无限制的创建线程,将会使系统崩溃。
二、多线程
以上三种任务的执行策略,每个执行策略都要规定很多任务的执行细节,我们要手动的去注意和关注这些细节。现在引入java中的几个多线程的类。
1、Executor:一个接口,其定义了一个接受Runnable对象的方法executor,其方法签名为executor(Runnable command)
2、ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可以跟踪一个或多个异步执行状况返回Fture的方法
3、AbstractExecutorService:ExecutroService执行方法的默认实现
4、ScheduledExecutorService:一个可定时调度任务的接口
5、ScheduledThreadPoolExecutor:ScheduledExecutorService的实现,一个可定时调度任务的线程池
6、ThreadPoolExecutor:线程池,可以通过调用Executors以下静态工程方法来创建线程池并返回一个ExecutorService对象
Executor接口
对于程序员来说,每次在执行某些任务的时候都需要设计一种新的执行策略太麻烦了,所以java就设计了这样一个接口
public interface Executor {
void execute(Runnable command);
}
Executor(执行器),有了这个执行器我们只需要把Runnable任务放到执行器的execute方法里就表示任务提交了,具体提交以后这些任务怎么分配线程怎么执行就不管了。这也就是把任务的提交和执行解耦了,我们来看一下执行器怎么用:
public void execute(List<Runnable> runnables) {
//创建包含10个线程的执行器
Executor executor = Executors.newFixedThreadPool(10);
for (Runnable r : runnables) {
//*提交任务
executor.execute(r);
}
}
其中的Executors类提供了一系列创建Executor子类的静态方法(最主要有四类),newFixedThreadPool(10)方法代表创建了一个包含10个线程Executor,可以用这10个线程去执行任务。(当然如果这些不满足业务也可以自己定义,只要继承Executor)
public class SerialExecutor implements Executor {
@Override
public void execute(Runnbale r) {
r.run();
}
}
多线程池ThreadPoolExecutor
/**
*
* @param corePoolSize 核心线程池大小
* @param maximumPoolSize 线程池最大容量
* @param keepAliveTime 线程池空闲时,线程存活的时间
* @param unit 单位
* @param workQueue 工作队列
* @param threadFactory 线程工厂
* @param handler 处理当线程队列满了,也就是执行拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){}
多线程创建的工厂类Executors
Executors类里面提供了创建适用于各种场景线程池的工具方法(静态方法,我们看一下常用的几个):
newFixedThreadPool(int nThreads)
创建一个拥有固定线程数量的线程池,具体的线程数量由nThreads参数指定。最开始该线程池中的线程数为0,之后每提交一个任务就会创建一个线程,直到线程数等于指定的nThreads参数,此后线程数量将不再变化。
newCachedThreadPool()
创建一个可缓存的线程池。会为每个任务都分配一个线程,但是如果一个线程执行完任务后长时间(60秒)没有新的任务可执行,该线程将被回收。
newSingleThreadExecutor()
创建单线程的线程池。其实只有一个线程,被提交到该线程的任务将在一个线程中串行执行,并且能确保任务可以按照队列中的顺序串行执行。
newScheduledThreadPool(int corePoolSize)
创建固定线程数量的线程池,而且以延迟或定时的方式来执行任务。怎么以延迟或定时的方式执行任务呢?我们看一下该方法的返回类型ScheduledExecutorService里提供的几个方法:
public interface ScheduledExecutorService extends ExecutorService { public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
稍微解释一下Callable和Future,Callable其实就是一个任务,只是这个任务有返回值。Future是这个线程执行中的状态可以去获取和控制这个线程。后面再详解。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class XPSchedule {
private static class PrintTask implements Runnable {
private String s;
public PrintTask(String s) {
this.s = s;
}
@Override
public void run() {
System.out.println(s);
}
}
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
//隔1秒后打印
service.schedule(new PrintTask("1"), 1, TimeUnit.SECONDS);
//首次5秒后打印,每隔1秒打印一次
service.scheduleAtFixedRate(new PrintTask("2"), 5, 1, TimeUnit.SECONDS);
}
}
Callable与Runnable详解:
Runnable接口
public interface Runnable {
public void run();
}
定义一个Task 类,它里边只有一个返回void的run方法,我们定义一个计算两个值大小的Runnable
public class Task implements Runnable {
private int num;
private int num1;
public Task(int num, int num1) {
this.num = num;
this.num1 = num1;
}
@Override
public void run() {
int sum = num/num1;
System.out.println("线程t的运算结果:" + sum);
}
}
public class Test {
public static void main(String[] args) {
Task task = new Task(4, 2);
Thread t = new Thread(task, "t");
t.start();
}
}
Callable是一个接口,它代表一个任务,与Runnable不同的是,这个任务是有返回值的
public interface Callable<V> {
V call() throws Exception;
}
把Task定义成一个Callable任务:
import java.util.concurrent.Callable;
public class Task implements Callable<Integer> {
private int num;
private int num1;
public Task(int num, int num1) {
this.num = num;
this.num1 = num1;
}
@Override
public Integer call() throws Exception {
int sum = num/num1;
System.out.println("线程t的运算结果:" + sum);
return sum;
}
}
call方法返回了结果。这种带返回值的Callable任务不能像Runnable一样直接通过Thread的构造方法传入,在Executor的子接口ExecutorService中规定了Callable任务的提交方式:
public interface ExecutorService extends Executor {
// 任务提交操作
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
// 生命周期管理
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
// ... 省略了各种方便提交任务的方法
}
各种线程池其实都是实现了 ExecutorService 的,Callable 任务需要提交到线程池中才能运行
public class Test {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
//提交任务且执行,这里需要注意一下的是submit是提交和execute功能一样
service.submit(new Task(4, 2));
}
}
运行结果是:2
不管是Runnable任务还是Callable任务,线程池执行的任务可以划分为4个生命周期阶段:
创建:创建任务对象的时期。
提交:调用线程池的excute或者submit方法后,将任务塞到任务队列的时期。
执行中:某个线程从任务队列中将任务取出开始执行的时期。
完成:任务执行结束。
Future接口:
public interface Future<V> {
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
boolean isDone();
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
}
方法描述
任务已经完成,那么get方法将会立即返回,如果任务正常完成的话,会返回执行结果,若是抛出异常完成的话,将会将该异常包装成ExecutionException后重新抛出,如果任务被取消,则调用get方法会抛出CancellationExection异常
public class Test {
public static void main(String[] args) throws Exception{
ExecutorService service = Executors.newCachedThreadPool(); //创建一个线程池
Future<Integer> future = service.submit(new Task(6, 2)); //提交一个任务
int result = future.get(); //在任务执行完成之前,该方法将一直阻塞
System.out.println("线程main的运算结果:" + result);
}
执行结果:
线程t的运算结果:3
线程main的运算结果:3
有时候我们希望在指定时间内等待另一个线程的执行结果,那就可以可以使用带时间限制的get方法,另外的几个方法我们之后再详细的说。
视线再返回到ExecutorService接口上来,除了参数类型为Callable的submit方法,这个接口还提供了两个重载方法:
Future<?> submit(Runnable task); //第1个重载方法
<T> Future<T> submit(Runnable task, T result); //第2个重载方法
对于第1个只有一个Runnable参数的重载方法来说,由于Runnable的run方法并没有返回值,也就是说任务是没有返回值的,所以在该任务完成之后,对应的Future对象的get方法的返回值就是null。虽然不能获得返回值,但是我们还是可以调用Future的其他方法,比如isDone表示任务是否已经完成,isCancelled表示任务是否已经被取消,cancel表示尝试取消一个任务。
三、自定义线程
如果Executors提供的几个创建的线程池的执行策略不能满足你的业务,你也可以自定义线程池。只要实现了ExecutorService接口,代表着一个线程池,我们可以通过不同的构造方法参数来自定义的配置我们需要的执行策略,看一下这个类的构造方法:
public XPThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// ... 省略具体实现
}
相关的参数及描述如下:
刚开始的时候线程池里并没有线程,之后每提交一个任务就会分配一个线程,直到线程数到达corePoolSize指定的值。之后即使没有新任务到达,这些线程也不会被销毁。
如果线程处理任务的速度足够快,那么将会复用这些线程去处理任务,但是如果任务添加的速度超过了处理速度的话,线程池里的线程数量可以继续增加到maximumPoolSize指定的最大线程数量值时,之后便不再增加。如果线程数量已经到达最大值,但是任务的提交速度还是超过了处理速度,那么这些任务将会被暂时放到任务队列中,等待线程个执行完任务之后从任务队列中取走(不一定)。
如果某个线程在指定的keepAliveTime时间(单位是unit)内都处于空闲状态,也就是说没有任务可执行,那这个线程将被标记为可回收的,但是必须当前线程池中线程数超过了corePoolSize值时,该线程将被终止。
我们可以通过这些参数来控制线程池中线程的创建与销毁。我们之前用到的Executors.newCachedThreadPool方法创建的线程池基本大小为0,最大大小为最大的int值,空闲存活时间为1分钟;Executors.newFixedThreadPool方法创建的线程池基本大小和最大大小都是指定的参数值,空闲存活时间为0,表示线程不会因为长期空闲而终止。
管理任务队列
线程池内部维护了一个阻塞队列,这个队列是用来存储任务的,线程池的基本运行过程就是:线程调用阻塞队列的take方法,如果当前阻塞队列中没有任务的话,线程将一直阻塞,如果有任务提交到线程池的话,会调用该阻塞队列的put方法,并且唤醒阻塞的线程来执行任务。
线程池中的阻塞队列的详细用法,那我们在自定义线程池的时候该使用哪一种阻塞队列呢?这取决于我们实际的应用场景,各种阻塞队列其实大致可以分为3类:
无界队列
其实无界在实际操作中的意思就是队列容量很大很大,比如有界队列LinkedBlockingQueue的默认容量就是最大的int值,也就是2147483647,这个大小已经超级大了,所以也可以被看作是无界的。如果在线程池中使用无界队列,而且任务的提交速度大于处理速度时,将不断的往队列里塞任务,但是内存是有限的,在队列大到一定层度的时候,内存将被用光,抛出OutOfMemoryError的错误(注意,是错误,不是异常)。所以你应该对当前任务的执行速度和提交速度有所了解,在任务不至于积压严重的情况下才使用无界队列
有界队列
正是因为无界队列可能导致内存用光,所以有界队列看上去是一个不错的选择。
但是它也有自己的问题,如果有界队列已经被塞满了,那后续提交的任务该怎么办呢?我们可以选择直接把任务舍弃,或者在提交任务的线程中抛出异常,或者别的什么处理方式,这种针对队列已满的情况下的反应措施被称为饱和策略,ThreadPoolExecutor构造方法中的handler参数就是用来干这个,我们稍后会详细说明各种策略采取的应对措施。所以有界队列 + 饱和策略的配置是我们常用的一种方案。
同步移交队列
你还记得在唠叨阻塞队列的时候提到过一种叫SynchronousQueue的队列么,
它名义上是个队列,但底层并不维护链表也没有维护数组,在一个线程调用它
的put方法时会立即将塞入的元素转交给调用take的线程,如果没有调用take
的线程则put方法会阻塞。使用这种阻塞队列的线程池肯定不能堆积任务,在提交任务后必须立即被一个线程执行,否则的话,后续的任务提交将失败。所以这种队列适用于非常大或者说无界的线程池,因为任务会被直接移交给执行它的线程,而不用先放到底层的数组或链表中,线程再从底层数组或链表中获取,所以这种阻塞队列性能更好。Executors.newCachedThreadPool()就是采用SynchronousQueue作为底层的阻塞队列的。
LinkedBlockingQueue或者ArrayBlockingQueue这样的队列都是先到达的任务会先被执行,如果你的任务有优先级的话,可以考虑使用PriorityBlockingQueue作为阻塞队列。
线程工厂
不指定这个参数,线程池就会默认创建一个非守护线程,如果不能满足业务,也可以自己定义。
例:如果希望给线程池中的线程取个名称、线程指定异常处理器,给线程设定优先级(最好不要,不好管理)、修改守护状态(最好不要)。
自定义一个的ThreadFactory:
import java.util.concurrent.ThreadFactory;
public class XPThreadFactory implements ThreadFactory {
private static int sum = 0;
private static String threadName= "xp-thread";
@Override
public Thread newThread(Runnable r) {
int i = sum++;
return new Thread(r, threadName + i);
}
}
自定义了一个ThreadFactory的子类XPThreadFactory,每次调用newThread获取的线程的名称都会加1。
饱和策略
当有界队列被任务填满之后,应该采取的措施。在ThreadPoolExecutor里定义了四个实现了RejectedExecutionHandler接口的静态内部类以表示不同的应对措施:
演示一下饱和策略的用途,定义一个耗时任务:
public class Task implements Runnable {
private int index;
public Task(int index) {
this.index = index;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() +
"线程正在执行第" + index + "个任务");
//模拟耗时操作
Thread.sleep(9000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
自定义一个线程池,里面只有一个线程并且阻塞队列的大小为1,来执行Task:
public class Test {
public static void main(String[] args) {
ExecutorService service = new ThreadPoolExecutor(
1, //基本大小为1
1, //最大大小为1
0, //表示线程不会因为长时间空闲而被停止
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1), //大小为1的阻塞队列
new XPThreadFactory(), //自定义线程工厂
new ThreadPoolExecutor.AbortPolicy()); //饱和策略
try {
//该任务会被线程立即执行
service.submit(new Task(1));
//该任务会被塞到阻塞队列中
service.submit(new Task(2));
//该任务会根据不同的饱和策略而产生不同的反应
service.submit(new Task(3));
} catch (Exception e) {
e.printStackTrace();
}
}
}
结果可以看见:
第一个任务会被线程池里唯一的线程立即执行,
第二个任务会被塞到阻塞队列中,之后阻塞队列就满了,
第三个任务的时候将会根据饱和策略来产相应的应对措施措施,当前使用的是AbortPolicy,所以执行后会抛出异常。
当把AbortPolicy饱和策略换成CallerRunsPolicy,执行结果是:
myThread0线程正在执行第1个任务
main线程正在执行第3个任务
CallerRunsPolicy饱和策略的意思是谁提交的任务谁执行,由于是main线程提交的任务,所以该任务由main线程去处理,由于该任务实在是太耗时了,所以main线程一直在执行该任务而无法执行后面的代码了。
四:注意
1、当看到new Thread(r).start()这种代码时,最好用线程池提交任务的形式来做,方便我们修改任务的执行策略。
2、线程池中的线程数量既不能太多,也不能太少。太多了的话将有大量线程在处理器和内存资源上发生竞争,太少了的话处理器资源又不能充分利用,所以在设置线程数量的时候核心原则就是:尽量使提高各种资源的利用率,而不会在线程切换上浪费过多时间,也不会因为线程过使内存溢出。
3、在设置之前我们必须分析程序是因为什么受限而不能更快的运行,如果是CPU密集型的程序,我们添加过多线程并不会起到什么效果,因为CPU的利用率一直很高,所以一般将线程数设置成:处理器数量 + 1(这个1是为了防止某个线程因为某些原因而暂停,这个线程立即替换调被暂停的线程,从而最大限度的提升处理器利用率)。在java中,我们可以通过Runtime对象来获取当前计算机的处理器数量:
int numberOfCPUs = Runtime.getRuntime().availableProcessors(); //获取当前计算机处理器数量
对于别的密集型程序,我们通常能通过常见更多的线程来提升处理器利用率。但是线程数量也受限于依赖资源的数量,比如内存一共有有20M,每个线程需要1M的内存去运行任务,这样我们创建多于20个线程也没有用,因为超过的线程会因为分配不到内存而被迫终止。所以最优的线程数量会使得各种资源的利用率处于最高水平。
4、任务运行处理时间差异较大,某些任务运行时间太长的情况
如果不同的任务的执行时间有长有短,它们被提交到了同一个线程池,一个线程中需要时间短的任务很快被执行完,可能该线程接着就获取到一个时间长的任务,久而久之,线程池的所有线程都可能运行着需要时间长的任务,哪些需要时间短的任务反而都被堵在阻塞队列中无法执行。如果出现这样的情况,最好把需要时间长的任务和需要时间短的任务分开来处理。
5、任务中使用ThreadLocal的情况
ThreadLocal是对于同一个变量,每个线程看起来都好像有一个私有的值。而在线程池中的一个线程可以执行多个任务,如果在一个线程某个任务中使用了ThreadLocal变量,那当该任务执行完之后,这个线程又开始执行别的任务,上一个任务遗留下的ThreadLocal变量对这个任务是没有意义的。除非该 ThreadLocal 变量的生命周期受限于任务的生命周期,也就是在任务执行过程中创建,在任务执行完成前销毁。
这里我非常感谢写这篇文章的作者,看完后我对多线程了解了深入了,之前都是简单实用层面
简书排版关于代码的不好看,如需要可点击这里查看