Exectuor中任务与线程的取消

任务的取消


1.轮询volatile变量

任务取消最简单的方式就是每次循环入口都去轮询volatile变量,但问题也显而易见

public class Producer implements Runnable{
  private volatile boolean cancelled;
  private final List<Product> productsQueue
                            =new LinkedBlockingQueue<>(20);
  public void run(){
    Product p =new Product();
    while(!cancelled)
        p=produce();
        productsQueue.put(p);//当队列空间不足,put可能会阻塞,那么就无法去轮询cancelled变量,造成的结果就是可能长时间内无法响应取消
    }
} 

2.轮询线程的中断状态

这里先介绍下我对Thread.interrupt()的理解,所谓interrupt,并非是强制性的打断,而是提出一个中断建议,建议你这个线程要停下来,至于你这个线程到底是听我的停下来了,还是直接无视我,亦或是保留我这个意见,都由你自己决定,和我不相干。这三种情况,也对应了三种代码实现,抛出InterruptedException提前返回(听我的停下来了),catch了这个异常但是毫无处理(直接无视我),catch后返回上层调用函数时再次调用interrupt()恢复中断状态,交给上层处理(保留意见)。下面见其代码核心部分

public void interrupt() {
        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }

简而言之,interrupt方法只是去设置一个flag,不同的方法对这个flag的反应也不同,我再贴上jdk对于“不同方法不同反应”的解释

4个if即为4种情况

不完全概括

if the thread has started, and was not in a sleep() or wait() method, then calling interrupt() on it does not throw an exception. Instead it sets the thread's interrupted flag.

java这种中断机制优于抢占式中断是好疑问的,抢占式的中断可能会造成数据的不一致,而java的中断机制则更优雅,但是处理起来也需要更加细心。
言归正传,用检测线程中断的方式代替上述的volatile的变量后,run核心代码变化如下

    public void run(){
    try{
     Product p=null;
     while(!Thread.currentThread.isInterrupted()){ //循环入口检测一次
        p=produce();
        productsQueue.put(p);//这里会检测第二次
     }
    }catch(InterruptedException e){
        /*自己决定如何处理*/
    }
}

因为put方法能检测到线程中断,所以就避免了上面用volatile变量时问题。
关于为什么put函数能检测线程中断,我谈一下我的理解,我贴一段LinkedBlockingQueue.put()的代码(一些其他代码被我省去了,有心的读者可以自己去jdk里面再细看)

public void put(E e) throws InterruptedException {
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try { 
            while (count.get() == capacity) {
                notFull.await(); //当队列已满时,生产者会调用Condition.await()阻塞自己
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
上面代码中的notFull.await()方法实质上调用的就是这个方法

3.通过Future取消

Future有个cancel方法

boolean cancel(boolean mayInterruptIfRunning);

对于这个参数,官方解释如下

@param mayInterruptIfRunning true if the thread executing this
task should be interrupted; otherwise, in-progress tasks are allowed
to complete

在FutureTask中非常直观的展现这个参数的意义

if (mayInterruptIfRunning) {
      Thread r = runner;
      if (r != null){
          r.interrupt();
      }
 }

Future一般都是由ExecutorService的submit方法返回的(大家看源代码就知道,就是把callable封装了返回),示例代码如下

public void taskRun(Runnable task){
  Future f=executorService.submit(task);
  try{
    f.get(10,Timeunit.SECONDS);
  }catch(Exception e1){    
  }finally{
    f.cancel(true);
  }
}

这里补充一下submit和execute的区别,submit比execute的功能更丰富些,可以返回一个future,并且他们对于异常的处理是不同的。

There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with execute this exception will go to the uncaught exception handler (when you don't have provided one explicitly, the default one will just print the stack trace to System.err). If you submitted the task with submit any thrown exception, checked or not, is then part of the task's return status. For a task that was submitted with submit and that terminates with an exception, the Future.get will rethrow this exception, wrapped in an ExecutionException

关于ExecutorService对于异常的处理,可以看下这里的讨论,下篇博客会好好分析异常。

处理不可中断的阻塞

大部分阻塞方法都会抛出InterruptedException来响应中断请求,然而也有一些阻塞方法并不响应中断,对于这类方法,调用interrupt方法除了设置flag以外毫无作用(并不会提前返回),对于这类方法,Doug Lea总结了一下(文章后面会有重写newTaskFor方法来处理这一类问题)

In the case of socket I/O, if a thread closes the socket, blocking I/O operations on that socket in other threads will complete early with a SocketException. The nonblocking I/O classes in java.nio also do not support interruptible I/O, but blocking operations can similarly be canceled by closing the channel or requesting a wakeup on the Selector. Similarly, attempting to acquire an intrinsic lock (enter a synchronized block) cannot be interrupted, but ReentrantLock supports an interruptible acquisition mode.

4.重写AbstractExecutorService中newTaskFor实现取消

这一点也是Doug Lea书上7.1.7上所述的,一开始我也没太看懂,后来看了下jdk中对于AbstractExecutorService的文档,对这点也算是稍微理解了

Provides default implementations of ExecutorService execution methods. This class implements the submit, invokeAny and invokeAll methods using a RunnableFuture returned by newTaskFor, which defaults to the FutureTask class provided in this package. For example, the implementation of submit(Runnable) creates an associated RunnableFuture that is executed and returned. Subclasses may override the newTaskFor methods to return RunnableFuture implementations other than FutureTask.

简单的解释下这段文字的意思,AbstractExecutorService实现了几个 ExecutorService的方法,比如submit,invokeAny,invokeAll,这些方法都是将传进来的callable通过newTaskFor方法转化为RunnableFuture(就是一个可runnable也可作为future的类,FutureTask就是实现了这样的接口),submit(Runnable)方法就是生成了对应的可执行可返回的RunnableFuture,子类可以重写newTaskFor方法以返回除了FutureTask外其他的RunnableFuture了。

先贴上jdk里面给的示例代码

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    //自定义一个RunnableFuture
   static class CustomTask<V> implements RunnableFuture<V> {...}
  //重写newTaskFor方法
   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
       return new CustomTask<V>(c);
   }
   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
       return new CustomTask<V>(r, v);
   }
   // ... add constructors, etc.
 }

这样后,再看下书上给的例子,可能会更好理解

   public interface CancellableTask<T> extends Callable<T> {
        void cancel();
        RunnableFuture<T> newTask();
    }
    public class CancellingExecutor extends ThreadPoolExecutor {
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable){
            //判断传进来的callable是否是我们要特殊处理的task
            if (callable instanceof CancellableTask)
                return ((CancellableTask<T>) callable).newTask();
            else
                return super.newTaskFor(callable);
        }
    }
    public abstract class SocketUsingTask<T> implements CancellableTask<T> {
        private Socket socket;
        protected synchronized void setSocket(Socket s) {
            socket = s;
        }
        public synchronized void cancel() {
            try {
                if (socket != null)
                    socket.close();
            } catch (IOException ignored) {
            }
        }

        public RunnableFuture<T> newTask() {
            return new FutureTask<T>(this) {
                public boolean cancel(boolean mayInterruptIfRunning) {
                    try {
                        SocketUsingTask.this.cancel();
                    } finally {
                        return super.cancel(mayInterruptIfRunning);
                    }
                }
            };
        }
    }

这例子实质上也非常粗糙,只是提供了一个思路,和jdk所给的例子一样。只是对于任务的取消多了一个选择,可重写newTaskFor方法实现自定义的取消。

线程的取消(停止)


1.Poison对象

线程的取消书上介绍了“Poison”方法即加一个毒药对象进入工作队列,当消费者碰到这个对象时就结束自己的工作,打个比方,就是工厂流水线里面放一个和产品不同的标记物,一旦流水线过来工人看到这个标记物了,那么就停止生产,但是这样做的缺点就是缺乏灵活性,只有生产者和消费者数量是已知的情况下,才能用毒药对象,不然控制起来难度太大。

2.关闭ExecutorService

因为Thread大多情况下是有线程池去管理的,所以关闭ExectuorService相比于前面提到的方法,更为普遍。

void shutdown();               //拒绝新任务,没完成的旧任务接着干
List<Runnable> shutdownNow();  //拒绝新任务,没完成的旧任务挨个取消,返回提交了但没有被实行的任务

shutdownNow的局限性也在于此,对于那些已经开始但尚未结束的任务,我们无法收集。如果有需求,我们可以在和上文中的newTaskFor一样,去覆盖AbstractExecutorService的execute方法。

public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    private final Set<Runnable> tasksCancelledAtShutdown =
            Collections.synchronizedSet(new HashSet<Runnable>());
    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated())
            throw new IllegalStateException(...);
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }
   //注意finally代码块
    public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                } finally {
                    if (isShutdown()
                            && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }
// delegate other ExecutorService methods to exec
}

3.非正常线程的终止

所谓的非正常中止,大部分情况下都是由RuntimeException造成的,在Java中,为每个线程设置UncaughtExceptionHandler可以捕捉这些uncheckedException,在executorService中,你可以选择重写ThreadFactory的newThread方法,或者讲异常的捕捉全部放在Runnable&Callable中,也可以改写ThreadPoolExecutor.afterExcute方法,当然,这三种方法不是互相全等的,都有自己的使用场景,这里也要牵涉到execute和submit的差异,还有为什么会造成这种差异,都是需要深究的,因为篇幅受限(写太长看不下去啊),我准备在下一篇博客中,好好地分析下在ExecutorService中的异常处理机制。

4.JVM ShutdownHook

最后提及下所谓的JVM钩子,定义如下:

A shutdown hook is simply an initialized but unstarted thread. When the virtual machine begins its shutdown sequence it will start all registered shutdown hooks in some unspecified order and let them run concurrently.

简单的翻译下,ShutdownHook就是一个注册在jvm上当jvm退出时才会启动的线程,可用来做一些结尾扫荡什么的,若注册了多个钩子,那么他们会在jvm退出时一起运行,但顺序是随机的。
对于ShutdownHook,设计时需要满足以下几点:

  1. 所做之事一定要简短并尽快结束,因为该方法可能会被某些内部错误而中止(比如操作系统直接kill了,因此甚至极端情况下这个方法都不会被调用),而且不能因为这个方法导致jvm退出需要极长的时间。
  2. 所做之事一定要是线程安全的,并且要避免死锁。
    给一段代码示例加深印象
public void start() {
        //注册钩子
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                try { LogService.this.stop(); }
                catch (InterruptedException ignored) {}
            }
        });
    }

5.Finalize

只说一点,看完这方法就别当这方法存在就行了。

本文参考资料
http://ibruce.info/2013/12/19/how-to-stop-a-java-thread/
http://forward.com.au/javaProgramming/HowToStopAThread.html
https://www.ibm.com/developerworks/java/library/j-jtp05236/index.html
https://dzone.com/articles/know-jvm-series-2-shutdown

这篇博客主要是自己对并发编程实战中所提及的任务与线程的取消的一些个人思路梳理,写着写着发现异常处理也是大坑,准备下一篇开坑,争取两天内整理好,今天情人节,这篇文章前前后后写了五六个小时,因为怕误导也不想含糊其辞。总之收获很多!最后祝单身狗们,年年有今日,岁岁有今朝,哈哈!晚安,明天好好分析下Executor中的异常处理

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,980评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,422评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,130评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,553评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,408评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,326评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,720评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,373评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,678评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,722评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,486评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,335评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,738评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,283评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,692评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,893评论 2 335

推荐阅读更多精彩内容

  • 下面是我自己收集整理的Java线程相关的面试题,可以用它来好好准备面试。 参考文档:-《Java核心技术 卷一》-...
    阿呆变Geek阅读 14,729评论 14 507
  • 一.线程安全性 线程安全是建立在对于对象状态访问操作进行管理,特别是对共享的与可变的状态的访问 解释下上面的话: ...
    黄大大吃不胖阅读 811评论 0 3
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,510评论 18 139
  • 先看几个概念:线程:进程中负责程序执行的执行单元。一个进程中至少有一个线程。多线程:解决多任务同时执行的需求,合理...
    yeying12321阅读 536评论 0 0