对于多线程编程,如何优雅的终止子线程,始终是一个值得考究的问题。如果直接终止线程,可能会产生三个问题:
- 子线程当前执行的任务可能必须要原子的执行,即其要么成功执行,要么就不执行;
- 当前任务队列中还有未执行完的任务,直接终止线程可能导致这些任务被丢弃;
- 当前线程占用了某些外部资源,比如打开了某个文件,或者使用了某个Socket对象,这些都是无法被垃圾回收的对象,必须由调用方进行清理。
由此可见,如何优雅的终止一个线程,并不是一个简单的问题。常见的终止线程的方式是,声明一个标志位,如果调用方要终止其创建的线程的执行,就将该标志位设置为需要终止状态,子线程每次执行任务之前会检查该标志位,如果为需要终止状态,就不继续执行任务,而是进行当前线程所占用资源的一些清理工作,如关闭Socket和备份当前未完成的任务,清理完成之后结束当前线程的调用。
两阶段终止模式使用的就是上述方式进行多线程的终止的,只不过其将线程的终止封装为一个特定的框架,使用者只需要关注特定的任务执行方式即可,从而实现了线程的终止与任务的执行的关注点的分离。两阶段终止模式的UML图如下:
其各角色的作用如下:
- ThreadOwner:客户端程序,由其创建线程并执行任务,Terminatable提供的终止方法也是由其调用;
- Terminatable:终止方法提供的一个抽象接口,提供了一个terminate()方法供外部调用;
- TerminatableSupport:实现了Terminatable接口的抽象类,封装了具体的终止模板,其doRun()是一个抽象方法,子类必须实现,用于编写相关的任务的代码,doTermiate()和doCleanup()方法都是钩子方法,提供了空的实现,子类根据具体情况判断是否需要实现该方法;
- ConcreteTerminatable:用户具体的终止类,其doRun()方法用于实现具体的任务;
- TerminationToken:包含了一个标志位,并且记录了当前线程还需要执行的任务数量,默认情况下,只有其标志位为true,并且剩余需要执行的任务数为0时才会真正的终止当前线程的执行。
如下是两阶段终止模式各个类的实现,我们首先看看Terminatable接口及其抽象实现TerminatableSupport:
public interface Terminatable {
void terminate();
}
public abstract class TerminatableSupport extends Thread implements Terminatable {
public final TerminationToken terminationToken; // 记录当前的标志位
public TerminatableSupport() {
this(new TerminationToken()); // 初始化当前标志位
}
public TerminatableSupport(TerminationToken terminationToken) {
super();
this.terminationToken = terminationToken; // 初始化标志位
terminationToken.register(this); // 注册当前对象的标志位
}
protected abstract void doRun() throws Exception; // 供子类实现具体任务的方法
// 钩子方法,用于子类进行一些清理工作
protected void doCleanup(Exception cause) {}
// 钩子方法,用于子类进行终止时的一些定制化操作
protected void doTerminate() {}
@Override
public void run() {
Exception ex = null;
try {
// 在当前线程中执行任务时,会判断是否标识为终止,并且剩余任务数小于等于0,是才会真正终止当前线程
while (!terminationToken.isToShutdown() || terminationToken.reservations.get() > 0) {
doRun();
}
} catch (Exception e) {
ex = e;
} finally {
try {
doCleanup(ex); // 当前线程终止后需要执行的操作
} finally {
terminationToken.notifyThreadTermination(this);
}
}
}
@Override
public void interrupt() {
terminate();
}
@Override
public void terminate() {
terminationToken.setToShutdown(true); // 设置终止状态
try {
doTerminate(); // 执行客户端定制的终止操作
} finally {
if (terminationToken.reservations.get() <= 0) {
super.interrupt(); // 如果当前线程处于终止状态,则强制终止当前线程
}
}
}
// 提供给客户端调用的,即客户端线程必须等待终止完成之后才会继续往下执行
public void terminate(boolean waitUntilThreadTerminated) {
terminate();
if (waitUntilThreadTerminated) {
try {
this.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
当客户端调用termiante()方法时,其首先会将当前的终止状态设置为true,然后调用doTerminate()方法,这里需要注意的一点是,如果当前线程在doRun()方法中处于等待状态,比如Thread.sleep()、Thread.wait()方法等,那么即使设置了终止状态,也无法使其被唤醒,因为其无法运行到检测终止状态的代码处,其只能使用intertupt()方法才能使其被唤醒并终止,但是对于Socket.read()方法,即使调用了interrupt()方法,也无法使其终止,因而这里设置了doTerminate()方法,用于子类在该方法中关闭Socket。最后在finally块中,调用super.interrupt()方法,该调用的作用也即如果当前线程在doRun()方法中被阻塞,就强制终止其执行。
public class TerminationToken {
protected volatile boolean toShutdown = false; // 终止状态的标志位
public final AtomicInteger reservations = new AtomicInteger(0); // 记录当前剩余任务数
// 记录了所有注册了TerminationToken的实例,这里使用Queue是因为可能会有多个
// Terminatable实例共享同一个TeraminationToken,如果是共享的,那么reservations
// 实例就保存了所有共享当前TerminationToken实例的线程所需要执行的任务总数
private final Queue<WeakReference<Terminatable>> coordinatedThreads;
public TerminationToken() {
coordinatedThreads = new ConcurrentLinkedQueue<>();
}
public boolean isToShutdown() {
return toShutdown;
}
public void setToShutdown(boolean toShutdown) {
this.toShutdown = toShutdown;
}
// 将当前Terminatable实例注册到当前TerminationToken中
protected void register(Terminatable thread) {
coordinatedThreads.add(new WeakReference<>(thread));
}
// 如果是多个Terminatable实例注册到当前TerminationToken中,
// 则广播当前的终止状态,使得这些实例都会终止
protected void notifyThreadTermination(Terminatable thread) {
WeakReference<Terminatable> wrThread;
Terminatable otherThread;
while (null != (wrThread = coordinatedThreads.poll())) {
otherThread = wrThread.get();
if (null != otherThread && otherThread != thread) {
otherThread.terminate();
}
}
}
}
关于Terminatable和TerminationToken的关系是一对多的关系,即多个Terminatable实例可共用一个TerminationToken实例,而其reservations属性所保存的则是这多个Terminatable实例所共同要完成的任务数量。这里典型的多个Terminatable共用一个TerminationToken实例的例子是当有多个工作者线程时,这几个线程所消费的任务是共用的,因而其TermiantionToken实例也需要共用。
两阶段终止模式的使用场景非常的多,基本上只要是使用了子线程的位置都需要使用一定的方式来优雅的终止该线程的执行。我们这里使用生产者和消费者的例子来演示两阶段终止模式的使用,如下是该例子的代码:
public class SomeService {
private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
private final Producer producer = new Producer();
private final Consumer consumer = new Consumer();
public static void main(String[] args) throws InterruptedException {
SomeService ss = new SomeService();
ss.init();
TimeUnit.SECONDS.sleep(500);
ss.shutdown();
}
// 停止生产者和消费者的执行
public void shutdown() {
producer.terminate(true); // 先停止生产者,只有在生产者完全停止之后才会停止消费者
consumer.terminate(); // 停止消费者
}
// 启动生产者和消费者
public void init() {
producer.start();
consumer.start();
}
// 生产者
private class Producer extends TerminatableSupport {
private int i = 0;
@Override
protected void doRun() throws Exception {
queue.put(String.valueOf(i++)); // 将任务添加到任务队列中
consumer.terminationToken.reservations.incrementAndGet(); // 更新需要执行的任务数量
}
}
// 消费者
private class Consumer extends TerminatableSupport {
@Override
protected void doRun() throws Exception {
String product = queue.take(); // 获取任务
System.out.println("Processing product: " + product);
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(100)); // 模拟消费者对任务的执行
} catch (InterruptedException e) {
// ignore
} finally {
terminationToken.reservations.decrementAndGet(); // 更新需要执行的任务数量
}
}
}
}
可以看到,在子类使用两阶段终止模式时,其只需要实现各自所需要执行的任务,并且更新当前任务的数量即可。在某些情况下,当前任务的数量也可以不进行更新,比如在进行终止时,不关心当前剩余多少任务需要执行。