CompletionService

《Java并发编程实践》一书6.3.5节CompletionService:Executor和BlockingQueue,有这样一段话:

  "如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务CompletionService。"

这是什么意思呢?我们通过一个例子,分别使用繁琐的做法和CompletionService来完成,清晰的对比能让我们更好的理解上面的一段话和CompletionService这个API提供的初衷。考虑这样的场景,有5个Callable任务分别返回5个整数,然后我们在main方法中按照各个任务完成的先后顺序,在控制台打印返回结果。

[java] view plain copy

package net.aty.completeservice;  


import java.util.concurrent.Callable;  

import java.util.concurrent.TimeUnit;  


public class ReturnAfterSleepCallable implements Callable<Integer>  

{  

    private int sleepSeconds;  


    private int returnValue;  


    public ReturnAfterSleepCallable(int sleepSeconds, int returnValue)  

    {  

        this.sleepSeconds = sleepSeconds;  

        this.returnValue = returnValue;  

    }  


    @Override  

    public Integer call() throws Exception  

    {  

        System.out.println("begin to execute.");  


        TimeUnit.SECONDS.sleep(sleepSeconds);  


        System.out.println("end to execute.");  


        return returnValue;  

    }  

}  

这个任务会接受2个参数,睡眠指定的时间后,返回指定的结果。睡眠时间越短,意味着任务越先执行完成。

1.繁琐的做法

通过一个List来保存每个任务返回的Future,然后轮询这些Future,直到每个Future都已完成。我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,所以在调用get方式时,需要将超时时间设置为0。

[java] view plain copy

package net.aty.completeservice;  


import java.util.ArrayList;  

import java.util.List;  

import java.util.concurrent.ExecutionException;  

import java.util.concurrent.ExecutorService;  

import java.util.concurrent.Executors;  

import java.util.concurrent.Future;  

import java.util.concurrent.TimeUnit;  

import java.util.concurrent.TimeoutException;  


public class TraditionalTest  

{  

    public static void main(String[] args)  

    {  

        int taskSize = 5;  


        ExecutorService executor = Executors.newFixedThreadPool(taskSize);  


        List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();  


        for (int i = 1; i <= taskSize; i++)  

        {  

            int sleep = taskSize - i; // 睡眠时间  


            int value = i; // 返回结果  


            // 向线程池提交任务  

            Future<Integer> future = executor.submit(new ReturnAfterSleepCallable(sleep, value));  


            // 保留每个任务的Future  

            futureList.add(future);  

        }  


        // 轮询,获取完成任务的返回结果  

        while(taskSize > 0)  

        {  

            for (Future<Integer> future : futureList)  

            {  

                Integer result = null;  


                try  

                {  

                    result = future.get(0, TimeUnit.SECONDS);  

                } catch (InterruptedException e)  

                {  

                    e.printStackTrace();  

                } catch (ExecutionException e)  

                {  

                    e.printStackTrace();  

                } catch (TimeoutException e)  

                {  

                    // 超时异常需要忽略,因为我们设置了等待时间为0,只要任务没有完成,就会报该异常  

                }  


                // 任务已经完成  

                if(result != null)  

                {  

                    System.out.println("result=" + result);  


                    // 从future列表中删除已经完成的任务  

                    futureList.remove(future);    

                    taskSize--;  

                    //此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)   

                    break; // 进行下一次while循环  

                }  

            }  

        }  


        // 所有任务已经完成,关闭线程池  

        System.out.println("all over.");  

        executor.shutdown();  

    }  




}  

可见轮询future列表非常的复杂,而且还有很多异常需要处理,TimeOutException异常需要忽略;还要通过双重循环和break,防止遍历集合的过程中,出现并发修改异常。这么多需要考虑的细节,程序员很容易犯错。

2.使用CompletionService

[java] view plain copy

package net.aty.completeservice;  


import java.util.concurrent.CompletionService;  

import java.util.concurrent.ExecutionException;  

import java.util.concurrent.ExecutorCompletionService;  

import java.util.concurrent.ExecutorService;  

import java.util.concurrent.Executors;  


public class CompletionServiceTest  

{  

    public static void main(String[] args)  

    {  

        int taskSize = 5;  


        ExecutorService executor = Executors.newFixedThreadPool(taskSize);  


        // 构建完成服务  

        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(  

                executor);  


        for (int i = 1; i <= taskSize; i++)  

        {  

            int sleep = taskSize - i; // 睡眠时间  


            int value = i; // 返回结果  


            // 向线程池提交任务  

            completionService  

                    .submit(new ReturnAfterSleepCallable(sleep, value));  

        }  


        // 按照完成顺序,打印结果  

        for (int i = 0; i < taskSize; i++)  

        {  

            try  

            {  

                System.out.println(completionService.take().get());  

            } catch (InterruptedException e)  

            {  

                e.printStackTrace();  

            } catch (ExecutionException e)  

            {  

                e.printStackTrace();  

            }  

        }  


        // 所有任务已经完成,关闭线程池  

        System.out.println("all over.");  

        executor.shutdown();  

    }  

}  

可见使用CompletionService不会有TimeOutExeception的问题,不用遍历future列表,不用担心并发修改异常。

3.CompletionService和ExecutorCompletionService的实现

   JDK源码中CompletionService的javadoc说明如下:

[java] view plain copy

/** 

 * A service that decouples the production of new asynchronous tasks 

 * from the consumption of the results of completed tasks.  Producers 

 * <tt>submit</tt> tasks for execution. Consumers <tt>take</tt> 

 * completed tasks and process their results in the order they 

 * complete.  

 */  

也就是说,CompletionService实现了生产者提交任务和消费者获取结果的解耦,生产者和消费者都不用关心任务的完成顺序,由CompletionService来保证,消费者一定是按照任务完成的先后顺序来获取执行结果。

ExecutorCompletionService是CompletionService的实现,融合了线程池Executor和阻塞队列BlockingQueue的功能。

[java] view plain copy

public ExecutorCompletionService(Executor executor) {  

       if (executor == null)  

           throw new NullPointerException();  

       this.executor = executor;  

       this.aes = (executor instanceof AbstractExecutorService) ?  

           (AbstractExecutorService) executor : null;  

       this.completionQueue = new LinkedBlockingQueue<Future<V>>();  

   }  

到这里可以推测,按照任务的完成顺序获取结果,就是通过阻塞队列实现的,阻塞队列刚好具有这样的性质:阻塞和有序。

ExecutorCompletionService任务的提交和执行都是委托给Executor来完成。当提交某个任务时,该任务首先将被包装为一个QueueingFuture

[java] view plain copy

public Future<V> submit(Callable<V> task) {  

        if (task == null) throw new NullPointerException();  

        RunnableFuture<V> f = newTaskFor(task);  

        executor.execute(new QueueingFuture(f));  

        return f;  

}  

QueueingFuture是FutureTask的一个子类,通过改写FutureTask类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

[java] view plain copy

 /** 

  * FutureTask extension to enqueue upon completion 

  */  

private class QueueingFuture extends FutureTask<Void> {  

        QueueingFuture(RunnableFuture<V> task) {  

            super(task, null);  

            this.task = task;  

        }  

        protected void done() { completionQueue.add(task); }  

        private final Future<V> task;  

}  

这里简单说明下:FutureTask.done(),这个方法默认什么都不做,就是一个回调,当提交的线程池中的任务完成时,会被自动调用。这也就说时候,当任务完成的时候,会自动执行QueueingFuture.done()方法,将返回结果加入到阻塞队列中,加入的顺序就是任务完成的先后顺序。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 198,932评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,554评论 2 375
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 145,894评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,442评论 1 268
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,347评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,899评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,325评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,980评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,196评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,163评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,085评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,826评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,389评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,501评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,753评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,171评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,616评论 2 339

推荐阅读更多精彩内容