十三、多线程执行器

十三、多线程执行器

FutureTask 和 Callable

可以用来启动一个需要很长时间的计算任务


        // Callable 与 Runnable 类似,但是有返回值
        Callable<Integer> callable=()->{
            int sum=0;
            for (int i = 0; i < 1000000000; i++) {
                sum+=i;
            }
            return sum;
        };


        // FutureTask 可以将 Callable 包装成实现了 Runnable 和 Future 接口的对象
        FutureTask<Integer> futureTask = new FutureTask<>(callable);

        // 开启线程运行任务
        Thread thread=new Thread(futureTask);
        thread.start();

        
        // 获取任务的执行结果,未计算完会阻塞
        // 如果计算线程被中断,抛出InterruptedException
        futureTask.get();
        //
        try {
            // 设置最长阻塞时间,超时还没得出结果会抛出异常
            futureTask.get(10L, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


        // 如果计算还没开始,取消该任务。如果计算处于运行中,且参数为 true ,则计算会被中断
        futureTask.cancel(true);

        futureTask.isDone();
        futureTask.isCancelled();

        System.out.println(futureTask.get());
线程池

线程创建是有消耗的,可以提前创建一些线程,需要的时候直接使用。维护这些线程的池子,也就是线程池

执行器

通过 Executors 的工厂方法获得线程池的执行器 ExecutorService

用 ExecutorService 来执行线程并得到一个 Future

用 Future 监视线程执行状况


        // 执行器 
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 必要时创建新线程,空闲线程会被保留 60秒
        Executors.newCachedThreadPool();
        // 包含固定数量的线程,空闲线程会一直被保留,参数为线程数
        Executors.newFixedThreadPool(32);
        // 只有一个线程的“池”,会按顺序执行提交的任务
        Executors.newSingleThreadExecutor();
        // 用于预定执行一些任务而构建的固定线程池,支持定时及周期性的执行任务,参数为线程数
        Executors.newScheduledThreadPool(32);
        // ScheduledThreadPool 的单线程“池”版
        Executors.newSingleThreadScheduledExecutor();


        Runnable runnable=()->{};
        Callable callable=()->1;
        Future future;
        // 提交一个 Runnable 或者 Callable 对象给 Executor 并返回一个 future 来监控执行
        // 调用 future.get() 会返回 null ,runnable 没返回值
        future=executorService.submit(runnable);
        // 调用 future.get() 会在任务完成后,把第二个参数传入的对象返回
        future=executorService.submit(runnable,new Object());

        future=executorService.submit(callable);


        // 当不在需要提交任何任务的时候调用,关闭执行器,不再接受新任务,当所有任务完成后,线程池的线程死亡
        executorService.shutdown();
        // 会取消尚未开始的任务,并且试图中断正在运行的线程
        executorService.shutdownNow();

预定和周期性的执行任务

        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(12);
        
        Runnable runnable=()->{};
        Callable callable=()->1;
        // 在指定延迟后执行任务 , 在 10s 后执行任务
        executorService.schedule(runnable,10, TimeUnit.SECONDS);
        executorService.schedule(callable,10, TimeUnit.SECONDS);
        // 在指定延迟后周期性的执行任务, 10s 后开始,每天都在这个时间执行任务
        executorService.scheduleAtFixedRate(runnable,10,60*60*24,TimeUnit.SECONDS);
        // 在指定延迟后,固定间隔的执行任务, 10s 后开始,执行完任务隔 1 小时后,再次执行任务
        executorService.scheduleWithFixedDelay(runnable,10,60*60*1, TimeUnit.SECONDS);
任务组

可以控制一组任务的执行

invokeAny : 任一任务得出结果,即返回结果

ExecutorCompletionService:可以先取出有结果的任务


        ExecutorService executorService = Executors.newFixedThreadPool(20);
        Callable<Integer> callable1=()->1;
        Callable<Integer> callable2=()->2;
        //Callable<Integer> callable2=()->{while (true);};

        List<Callable<Integer>> callables=new ArrayList();
        callables.add(callable1);
        callables.add(callable2);

        // 执行集合中的所有任务,且任意一个执行完成得出结果,即将结果返回,但是其他未执行完的线程仍会继续执行
        // 可以用在比如说多线程分段查找,任何一个线程找到了就可以返回结果了
        Integer result = executorService.invokeAny(callables);
        // 可以设置超时时间,超时未得出结果会抛出 TimeOutException
        executorService.invokeAny(callables,1,TimeUnit.SECONDS);


        // 2,1,1,2,2,1,1,1,1,1,  且每次程序运行结果不尽相同
        for (int i = 0; i < 10; i++) {System.out.print(executorService.invokeAny(callables)+",");}



        // 执行集合中所有任务,并返回 Future 的集合 ,此方法会等所有任务都有结果了才返回
        List<Future<Integer>> futures = executorService.invokeAll(callables);
        // 与 invokeAny 一样也有带超时参数的版本
        executorService.invokeAll(callables,1,TimeUnit.SECONDS);



        // 构建一个能收集完成服务执行器,内部会维护一个阻塞队列 BlockingQueue,包含已完成执行的任务的future
        ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
        for (Callable<Integer> callable : callables) {
            executorCompletionService.submit(callable);
        }
        System.out.println("******");
        // take 方法取出一个已经执行完的任务的 future ,如果没有完成的任务,则会阻塞(底层的 BlockingQueue.take() 阻塞了)
        executorCompletionService.take().get();
        executorCompletionService.take().get();
        // 只有两个任务结果已经全部取出,要是再取就阻塞了
        //executorCompletionService.take().get();

        // 取出一个完成的任务结果,如果没有 返回 null
        executorCompletionService.poll();
        // 没有结果的时候会等待给定时间 (就是BlockingQueue.poll)
        executorCompletionService.poll(2,TimeUnit.SECONDS);

        executorService.shutdown();
Fork-Join 框架

Jave SE 7 提供了 fork-join 框架,可以将任务分段并行执行

一个计算1-1000(不含1000)的和 ,用法如下,



public class MyForkJoin {

    public static void main(String[] args) {

        int start=1;
        int end=1000;
        // 创建线程池
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        // 创建任务,
        MyTask myTask = new MyTask(start, end);
        // 执行任务
        forkJoinPool.invoke(myTask);
        // 获得任务执行结果
        System.out.println(myTask.join());

        // 一般算法对比看计算结果是否准确
        System.out.println(sum(start,end));
    }


    static  int sum(int start,int end){
        int sum=0;
        for (int i = start; i < end; i++) {
            sum+=i;
        }
        return sum;
    }
}
// 如果需要返回计算结果 则继承 RecursiveTask<T>
// 如果不需要生成结果,则继承 RecursiveAction
class MyTask extends RecursiveTask<Integer> {
    // 设置分组每组长度的界限
    static int threshold = 10;
    // 分组起始
    int start;
    // 分组结束
    int end;

    public MyTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        // 小于每组长度就不分了 直接执行计算
        if (end - start <= threshold) {
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += i;
            }
            return sum;
        } else {
            // 继续分组
            int mid = (start + end) / 2;
            MyTask first = new MyTask(start, mid);
            MyTask second = new MyTask(mid, end);
            // 执行两个子任务 , 这明明是个任务类,那么线程池是从哪里来的呢, ForkJoinWorkerThread 内有一个 pool ,通过 Thread.currentThread() 拿到线程再拿到 pool
            invokeAll(first, second);
            // 因为这个任务的逻辑是求总和,将两个子任务求和,作为本任务的结果返回
            return first.join() + second.join();
        }
    }
}
CompletableFuture

可以在任务完成后,按顺序执行一些工作的 Future

并没有明白具体怎么用,见 执行器-可完成的Future


        // public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
        CompletableFuture completableFuture = new CompletableFuture();

        // 每个方法都返回 CompletableFuture ,在 Future 完成后执行后面的操作,且这些方法是非阻塞的
        completableFuture.thenApply((t)->1);
        completableFuture.thenCompose((t)->1);
        //  completableFuture.thenXxxxx 还有很多方法

        // 可以实现一个任务完成后按顺序执行一些工作
        completableFuture.thenApply((t)->1).thenCompose((t)->1).thenAccept((t)->{});
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,302评论 5 470
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,232评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,337评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,977评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,920评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,194评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,638评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,319评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,455评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,379评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,426评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,106评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,696评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,786评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,996评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,467评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,043评论 2 341