一起来学Java8(八)——ForkJoin

在一起来学Java8(七)——Stream中我们了解了reduce的用法,其中并行流的底层是使用了分支/合并框架。

分支/合并框架的核心思想是把一个大的任务拆分成多个子任务,然后把每个子任务的执行结果整合起来,返回一个最终结果。

ForkJoinPool

分支/合并框架的核心类是java.util.concurrent.ForkJoinPool,从名称中可以看到它是一个线程池,线程数量是默认处理器数量,可以通过下面这句话来改变线程数:

System.setProperty("jaav.util.concurrent.ForkJoinPool.common.parallelism", "8");

RecursiveTask

前面说到了ForkJoinPool类是一个线程池,那么RecursiveTask的做用就是生成一个任务,然后把这个任务放到ForkJoinPool当中去。

RecursiveTask类是一个抽象类,并且有一个抽象方法

protected abstract V compute();

这个方法的主要功能是拆分任务逻辑,直到无法拆分时返回子任务的执行结果。

下面从一个简单的例子来说明各个类的使用方式。

这个例子演示将一个数组内的所有数字相加,得到一个总和。思想是将数组进行对半拆分,得到两个子数组,然后再对两个子数组进行拆分,以此类推,直到数组长度小于等于10的时候不再拆分。

@AllArgsConstructor
    static class NumberAddTask extends RecursiveTask<Long> {

        // 存放数字
        private long[] numbers;
        // 计算的起始位置
        private int startIndex;
        // 计算的结束位置
        private int endIndex;

        @Override
        protected Long compute() {
            int len = endIndex - startIndex;
            // 数组长度小于10,无法拆分,开始运算
            if (len <= 10) {
                return execute();
            }
            // 拆分左边的子任务
            NumberAddTask leftTask = new NumberAddTask(numbers, startIndex, startIndex + len / 2);
            // 将子任务加入到ForkJoinPool中去
            leftTask.fork();
            // 创建右边的任务
            NumberAddTask rightTask = new NumberAddTask(numbers, startIndex + len / 2, endIndex);
            // 执行右边的任务
            Long rightSum = rightTask.compute();
            // 读取左边的子任务结果,这里会阻塞
            Long leftSum = leftTask.join();
            // 合并结果
            return leftSum + rightSum;
        }

        private long execute() {
            long sum = 0;
            for (int i = startIndex; i < endIndex; i++) {
                sum += numbers[i];
            }
            return sum;
        }
    }

运行代码:

long startTime = System.currentTimeMillis();
// 生成一个数组,存放1,2,3,4....
long[] numbers = LongStream.rangeClosed(1, 1000000).toArray();
// 创建一个任务,起始位置是0,结束位置是数组的长度
NumberAddTask numberAddTask = new NumberAddTask(numbers, 0, numbers.length);
// 将任务加入到线程池中运行,得到总和
Long sum = new ForkJoinPool().invoke(numberAddTask);
long time = System.currentTimeMillis() - startTime;
System.out.println("sum:" + sum + ", 耗时:" + time + "毫秒");

打印:

sum:500000500000, 耗时:63毫秒

Spliterator

Spliterator是Java8新增的一个接口,从名字上可解读出两种意思:split,iterate,即该接口提供分割迭代的功能。Spliterator接口需要配合Stream一起使用。

使用方式:

// 创建顺序执行的Stream
Stream stream = StreamSupport.stream(Spliterator, false);

// 创建并行的Stream
Stream stream = StreamSupport.stream(Spliterator, true);

这两行代码即为Collection.stream()方法的默认实现。

Spliterator接口声明了4个抽象方法,需要开发者自己实现。

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);

    Spliterator<T> trySplit();

    long estimateSize();

    int characteristics();
}

boolean tryAdvance(Consumer<? super T> action):方法用于遍历获取元素,然后通过Consumer来执行,如果取到元素返回true,否则返回false

Spliterator<T> trySplit():执行分割操作,如果集合还能再继续分割,则返回一个新的Spliterator,如果不能继续分割则返回null

long estimateSize():用来返回剩余元素数量

int characteristics():指定集合一些特性。

characteristics特性列表如下:

特性 含义
ORDERD 集合中的的元素有顺序概念
DISTINCT 对任意一对遍历过的元素x,y,x.equals(y)返回false
SORTED 遍历的元素按照一个预定义的顺序排序
SIZED 集合元素大小可确定的
NONNULL 保证遍历元素没有null
IMMUTABLE 集合元素不能修改
CONCURRENT 集合可被其它线程同时修改,无需同步
SUBSIZED 该Spliterator和所有从它拆分出来的子Spliterator都有SIZED特性

characteristics()使用方式如下:

@Override
public int characteristics() {
    return Spliterator.SIZED | Spliterator.SUBSIZED;
}

下面我们来实现一个自定义的分割迭代器

static class LongSpliterator implements Spliterator<Long> {

        private long[] array;
        private int index;
        private int end;

        public LongSpliterator(long[] array, int index, int end) {
            this.array = array;
            this.index = index;
            this.end = end;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Long> action) {
            if (index >= 0 && index < end) {
                // 取出元素
                Long l = array[index++];
                // 执行
                action.accept(l);
                return true;
            }
            return false;
        }

        /**
         * 尝试分割集合。
         * 切割规则:数组一分为2,留后半段,将前半段再一分为2。
         * @return 返回null结束分割
         */
        @Override
        public Spliterator<Long> trySplit() {
            int start = 0;
            // 取中间
            int middle = (start + end) >>> 1;
            if (start < middle) {
                return null;
            }
            // 当前index变成中间值,即当前类的操作范围是:middle ~ end
            index = middle;
            // 将前半段再一分为2
            return new LongSpliterator(array, start, middle);
        }

        @Override
        public long estimateSize() {
            return end - index;
        }

        @Override
        public int characteristics() {
            return Spliterator.SIZED | Spliterator.SUBSIZED;
        }
    }

该分割迭代器中的集合是一个long数组,分割规则是将数组一分为2,留后半段,将前半段再一分为2。

现在来使用这个分割迭代器,计算从1加到1000000,测试用例如下:

    public void testDo() {
        // 创建一个数组
        long[] numbers = LongStream.rangeClosed(1, 1000000).toArray();
        long startTime = System.currentTimeMillis();
        LongSpliterator spliterator = new LongSpliterator(numbers, 0, numbers.length);
        // 申明一个并行的Stream
        Stream<Long> stream = StreamSupport.stream(spliterator, true);
        // 计算从1加到1000000,结果应该是:500000500000
        Long sum = stream.reduce((n1, n2) -> n1 + n2).orElse(0L);
        long time = System.currentTimeMillis() - startTime;
        System.out.println("sum:" + sum + ", 耗时:" + time + "毫秒");
    }

打印:

sum:500000500000, 耗时:32毫秒

https://shimo.im/docs/gGQHg8xPC3X8cPHV/ 《2020年最新Java架构师系统进阶资料免费领取》,可复制链接后用石墨文档 App 或小程序打开

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容