导读目录
- 线程组(ThreadGroup)
- 线程池(Thread Pool)
- Fork/Join框架和Executor框架
1.线程组
Java使用ThreadGroup来表示线程组,它可以对一批线程进行分类管理,Java允许程序直接对线程组进行控制。
用户创建的所有线程都属于指定线程组,如果没有显示指定线程属于哪个线程组,则该线程属于默认线程组(与其父线程处于同一个线程组)
一旦某个线程加入了指定线程组之后,该线程将一直属于该线程组,知道线程死亡,线程运行中途不能改变它所属的线程组
(1)指定线程组的Thread构造方法
Thread来提供的几个构造器用来设置新创建的线程属于那个线程组:
Thread(ThreadGroup group, Runnable target); //以target的run()方法为线程执行体创建新线程,属于group线程组
Thread(ThreadGroup group, Runnable target, String name);//相对于上面为线程指定了名字
Thread(ThreadGroup group, Runnable target, String name, long stackSize);
Thread(ThreadGroup group, String name);//创建新线程,名字为name,属于group线程组
//返回该线程所属的线程组,由于不能中途改变所属组,因此没有setThreadGroup()
ThreadGroup getThreadGroup();
(2)ThreadGroup相关方法
public class ThreadGroup extends Object implements Thread.UncaughtExceptionHandler
Thread.UncaughtExceptionHandler是Thread类的静态内部类,代表一个异常处理器。其作用是用于处理该线程未处理的异常
ThreadGroup类实现了Thread.UncaughtExceptionHandler接口,该接口内只有一个方法
public static interface Thread.UncaughtExceptionHandler {
//用于处理线程发生的未处理异常
void uncaughtException(Thread t, Throwable e);
}
ThreadGrou构造器
ThreadGroup(String name);//以指定的线程组名字来创建新的线程组
ThreadGroup(ThreadGroup parent, String name);//以指定的名字、指定的父线程组创建一个新的线程组
String getName(); //返回该线程组的名字,
线程组的名字一旦指定后是不允许改变的
ThreadGroup中用于操作线程组里的所有线程方法:
int activeCount(); //返回此线程组中活动线程的数目
void interrupt(); //中断此线程组中所有的线程
boolean isDaemon(); //返回此线程组是否为后台线程组
void setDaemon(boolean daemon); //设置给线程组是后台线程组
注意:后台线程组的最后一个线程执行结束后,或最后一个线程被销毁后,该后台线程组将自动销毁
void setMaxpriority(int pri);//设置线程组的最高优先级
//该方法是对Thread.UncaughtExceptionHandler接口里的方法实现。
void uncaughtException(Thread t, Throwable e); //处理该线程组内的任意线程t抛出的未处理异常。t代表出现异常的线程,e代表该线程抛出的异常
/****************/
小结:
1.每个线程都有所属的线程组(默认或指定的),而每个线程组都会异常处理方法,即这里的uncaughtException(t, e)方法。这也是所有处于该线程组里的线程默认的异常处理器(即该方法)。那么我们其实是可以单独为每一个线程指定异常处理器(即Thread.UncaughtExceptionHandler的其他实现类)
2.对于一个特定的线程实例而言,其异常处理器可能有3个:(1)单独为该线程实例指定的处理器,(2)该线程实例的所属线程类2017/8/30的处理器。(3)所在线程组的处理器
Thread类里可以设置异常处理器的方法:
static void setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh);//为该线程类的所有实例设置默认的异常处理器
void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh);//为指定的线程实例设置异常处理器
//自定义异常处理器
class MyExHandler implements Thread.UncaughtExceptionHandler {
// 实现uncaughtException方法,该方法将处理线程的未处理异常
public void uncaughtException(Thread t, Throwable e) {
System.out.println(t + " 线程出现了异常:" + e);
}
}
//程序没有正常结束,说明该处理器处理完异常后还是把异常传给其调用者(这里就是JVM了)
public class ExHandler {
public static void main(String[] args) {
// 设置主线程的异常处理器
Thread.currentThread().setUncaughtExceptionHandler(new MyExHandler());
int a = 5 / 0;
System.out.println("程序正常结束!");
}
}
注意:异常处理器和通过catch捕获异常是不同的。catch捕获异常时,该异常时不会向上一级调用者传播;但是异常处理器对异常处理之后,该异常仍会向上一级调用者传播(如上面的例子)。
2.线程池
系统启动一个线程的成本很高,因为它涉及与操作系统交互。因此,使用线程池可以很好的提高性能,尤其是当程序中需要创建大量的生存期很短的线程时。
类似于数据库连接池,线程池在系统启动的时候会创建大量的空闲的线程,当程序将一个Runnable对象或Callable对象传给线程池,线程池会启动一个线程来执行它们的run()或call()方法。执行完毕,该线程并不会死亡,而是返回线程池成为空闲状态,等待下一次被调用。
线程池的优点:
1.提高线程池的性能,不用重复创建线程。
2.线程池可以有效的控制系统中并发线程的数量。
(1)Executors工厂类
专门用于创建线程池,可以执行Runnable,Callable对象所代表的线程。
//Executors类
public class Executors extends Object
//Executor接口
public interface Executor
//ExecutorService接口,代表尽快执行线程(只要有空闲就立即执行)
public interface ExecutorService extends Executor
//ScheduledExecutorService是ExecutorService的子接口,用于表示一个可以在指定延迟后执行线程任务;
public interface ScheduledExecutorService extends ExecutorService
Executors提供如下几个静态工厂方法:
生产ExecutorService类的线程池(尽快立即执行)
static ExecutorService newCachedThreadPool();//创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会被缓存在线程池中。
static ExcecutorService newFixedThreadPool(int corePoolSize);//创建一个可重用的、具有固定线程数的线程池
static ExecutorService newSingleThreadExecutor();//创建一个只有单线程的线程池
生产ScheduledExecutorService<>类的线程池(延迟执行)
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);//创建具有指定线程数的线程池,可以在指定延迟后执行线程任务
static ScheduledExecutorService newSingleThreadScheduledExecutor();// 创建只有一个线程的线程池,可以在指定延迟后执行线程任务
Java8中新增的方法(相当于后台线程,用于支持多CPU的并行特点)
//parallelism参数代表"目标并行级别(即CPU的个数)"
static ExecutorService newWorkStealingPool(int parallelism);//生成的work-stealing线程池相当于后台线程城池,当所有的前台线程都死亡了,work-stealing池中的线程会自动死亡。
static ExecutorService newWorkStealingPool();//
(2)ExecutorService线程池
ExecutorService接口,代表尽快执行线程(只要有空闲就立即执行),需要程序将Runnable或Callable对象提交给该线程池,该线程池就会尽快执行该任务。
Future<?> submit(Runnable task);//Future代表Runnable任务的返回值,但run()没有返回值,故该方法执行结束后返回null
注意:可以使用Future的boolean isCancelled()、boolean isDone()来获得Runnable对象的执行状态
<T> Future<T> submit(Runnable task, T result); //result显示指定线程执行结束后的返回值,即run()执行结束后的值
<T> Future<T> submit(Callable<T> task);//执行Callable对象代表的任务,Future代表执行结束的返回值,即call()方法的返回值
void shutdown(); //将启动线程池的关闭序列,关闭后不再接受新任务,但会将以前提交的任务执行完成。
List<Runnable> shutdownNow(); //试图停止所有正在执行的任务,暂停处理正在等待的任务,并返回等待执行的任务列表
(3)ScheduledExecutorService线程池
代表可在指定延迟后或周期性的执行线程任务的线程池。提供如下4个方法:
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);//指定callable任务,在delay延迟后执行
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);//指定commandr任务,将在delay延迟后执行
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);//指定command任务,将在initialDelay延迟后,以period为周期重复执行
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);//在initialDelay延迟后开始执行,并在每一次执行终止和下一次执行开始之间都有固定的延迟
(4)使用线程池来执行线程任务的步骤:
1.获得线程池:调用Executors类的静态工厂方法创建一个ExecutorService对象。即获得一个线程池
2.创建执行体:创建Runnable对象或Callable对象的实例,作为线程执行任务
3.提交任务:调用ExecutorService对象的submit()方法来提交Runnable或Callable实例
4.关闭线程池:当不想提交任何任务时,调用ExecutorService的shutdown()方法来关闭线程池
例子
ExecutorService pool = Executors.newFixedThreadPool(1);//1
//2. 使用Lambda表达式创建Runnable对象
Runnable target = () -> {
for (int i = 0; i < 100 ; i++ )
{
System.out.println(Thread.currentThread().getName()
+ "的i值为:" + i);
}
};
// 3.向线程池中提交两个线程
pool.submit(target);
pool.submit(target);
//4. 关闭线程池
pool.shutdown();
3.Fork/Join框架
执行器框架(Executor Framework)将任务的创建和执行进行了分离,通过这个框架,只需要实现Runnable接口的对象和使用Executor对象,然后将Runnable对象发送给执行器。执行器再负责运行这些任务所需要的线程,包括线程的创建,线程的管理以及线程的结束。
java 7则又更进了一步,它包括了ExecutorService接口的另一种实现,用来解决特殊类型的问题,它就是Fork/Join框架,有时也称分解/合并框架。
Fork/Join框架是用来解决能够通过分治技术(Divide and Conquer Technique)将问题拆分成小任务的问题。在一个任务中,先检查将要解决的问题的大小,如果大于一个设定的大小,那就将问题拆分成可以通过框架来执行的小任务。如果问题的大小比设定的大小要小,就可以直接在任务里解决这个问题,然后,根据需要返回任务的结果。
Fork/Join框架基于以下两种操作:
分解(Fork)操作:当需要将一个任务拆分成更小的多个任务时,在框架中执行这些任务;
合并(Join)操作:当一个主任务等待其创建的多个子任务的完成执行。
Fork/Join框架和执行器框架(Executor Framework)主要的区别在于工作窃取算法(Work-Stealing Algorithm)
与执行器框架不同,使用Join操作让一个主任务等待它所创建的子任务的完成,执行这个任务的线程称之为工作者线程(Worker Thread)。工作者线程寻找其他仍未被执行的任务,然后开始执行。
Fork/Join框架的核心是由下列两个类组成的。
ForkJoinPool:这个类实现了ExecutorService接口和工作窃取算法(Work-Stealing Algorithm)。它管理工作者线程,并提供任务的状态信息,以及任务的执行信息。
ForkJoinTask:这个类是一个将在ForkJoinPool中执行的任务的基类。
Fork/Join框架提供了在一个任务里执行fork()和join()操作的机制和控制任务状态的方法。通常,为了实现Fork/Join任务,需要实现一个以下两个类之一的子类:
RecursiveAction:用于任务没有返回结果的场景。
RecursiveTask:用于任务有返回结果的场景。
ForkJoinPool(通用池)
ForkJoinPool类是一个特殊的Executor执行器类型,
为了利用多CPU的特点,将一个任务拆成多个小任务,把多个小任务放在多个处理器上并行(不是并发哦)执行;当这些小任务执行结束后,在将这些诶结果合并起来即可
插入图
构造器:
ForkJoinPool();//以Runntime.availableProcessors()方法的返回值作为parallelism参数来创建ForkJoinPool,即以实际的核心数作为目标并行级别
ForkJoinPool(int parallelism);//创建一个包含parallelism个并行线程的ForkJoinPool
Java8扩展了ForkJoinPool的功能,提供如下2个新的方法:
static ForkJoinPool commonPool();//返回一个通用池,该通用池不受shutdown()或shutdownNow()方法的影响。
static int getCommonPoolParallelism();//返回通用池的并行级别
//执行指定的任务:ForkJoinTask
<T> ForkJoinTask<T> submit(ForkJoinTask<T> task);
<T> T invoke(ForkJoinTask<T> task);
当然了,也可以直接执行Runnable任务
void execute(Runnable task)
(2)ForkJoinTask类
上面的ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是抽象类,它有两个抽象的子:
RecursiveAction(代表无返回值的任务),
protected abstract void compute();//任务的主要计算是通过这个方法, 无返回值
RecursiveTask(代表有返回值的任务)
protected abstract V compute();//有返回值
方法:
ForkJoinTask<V> fork(); //安排异步执行此任务(即执行方法的调用者里封装的任务)
V join(); //返回计算完成时的结果。
虽然ForkJoinPool类是设计用来执行ForkJoinTask对象的,但也可以直接用来执行Runnable和Callable对象。当然,也可以使用ForkJoinTask类的adapt()方法来接收一个Callable对象或者一个Runnable对象,然后将之转化为一个ForkJoinTask对象,然后再去执行。
static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable); //将一个Callable对象封装成ForkJoinTask对象,然后再通过ForkJoinPool来执行
static ForkJoinTask<?> adapt(Runnable runnable);//同上面,只是封装的Runnable对象
执行不需要返回值的任务事例
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class ForkJoinPoolTest {
public static void main(String[] args) throws Exception{
//1.创建执行器(即通用线程池)
ForkJoinPool pool = new ForkJoinPool();
//2.创建任务
PrintTask task = new PrintTask(0, 500);
//3.提交任务(即执行任务)
pool.submit(task);
pool.awaitTermination(2, TimeUnit.SECONDS);//表示当所有任务执行结束后,程序在此阻塞2s
//4.关闭线程池
pool.shutdown();
}
}
//任务类
class PrintTask extends RecursiveAction {
private static final int THRESHOLD = 50;
private int start;
private int end;
//打印从start到end的任务
public PrintTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if(end - start < THRESHOLD) {
for(int i = start; i < end; i++) {
System.out.println(Thread.currentThread().getName() + "打印 " + i);
}
}else {
int mid = (start + end) / 2;
PrintTask left = new PrintTask(start, mid);
PrintTask right = new PrintTask(mid, end);
//并行执行这两个任务
left.fork();
right.fork();
//由于没有返回值,因此这里就不用合并结果了
}
}
}
带返回值的事例
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.Future;
import java.util.concurrent.ForkJoinPool;
public class ForkJoinPoolTest2 {
public static void main(String[] args) throws Exception{
//1.创建执行器(即通用线程池)
ForkJoinPool pool = new ForkJoinPool();
//2.创建任务
int[] arrs = new int[1000];
for(int i = 0; i < arrs.length; i++) {
arrs[i] = i + 1;
}
PrintTask task = new PrintTask(arrs, 0, arrs.length);
//3.提交任务(即执行任务)
Future<Integer> f = pool.submit(task);
//4.返回结果
int sum = f.get()
System.out.println(sum);
//4.关闭线程池
pool.shutdown();
}
}
//任务
class PrintTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 20;
private int start;
private int end;
private int[] arrs; //数组
//累加数组元素值
public PrintTask(int[] arrs, int start, int end) {
this.start = start;
this.end = end;
this.arrs = arrs;
}
@Override
protected Integer compute() {
int sum = 0;
if(end - start < THRESHOLD) {
for(int i = start; i < end; i++) {
sum += arrs[i];
}
return sum;
}else {
int mid = (start + end) / 2;
PrintTask left = new PrintTask(arrs, start, mid);
PrintTask right = new PrintTask(arrs, mid, end);
//并行执行这两个任务
left.fork();
right.fork();
//把每个小任务的结果加起来
return left.join() + right.join();
}
}
}