Java 8 之Stream Spliterator

定义

  • 用于遍历和分割“源”元素的对象。

数据源

  • Spliterator的元素来源可能是一个数组,一个集合,一个IO通道,一个生成函数。

处理数据源的方式

  • Spliterator可以单独或顺序地批量地遍历元素。
  • Spliterator也可以将其部分元素作为另一个Spliterator进行分区,为了并行化操作。使用不能拆分或以非常不平衡或低效的方式进行拆分Spliterator的操作不太可能从并行中获益。遍历和分解流出的元素;每个Spliterator只对单个批量计算有用。

特征 characteristics

Spliterator 还声明了 一组关于它的结构和源的特征(characteristics),包含以下以下几种:

  • ORDERED int 型 值为16 既定的顺序,Spliterator保证拆分和遍历时是按照这一顺序。
  • DISTINCT int型 值为1 表示元素都不是重复的,对于每一对元素{ x, y},{ !x.equals(y)}。例如,这适用于基于{@link Set}的Spliterator。
  • SORTED int型 值为4 表示元素顺序按照预定义的顺序,可以通过getComparator 获取排序器,若返回null ,则是按自然排序。
  • SIZED int型 值为64 表示在遍历分隔之前 estimateSize() 返回的值代表一个有限的大小,在没有修改结构源的情况下,代表了一个完整遍历时所遇到的元素数量的精确计数。
  • NONNULL init型 值为256 表示数据源保证元素不会为空
  • IMMUTABLE int 型 值为1024 表示在遍历的过程中不能添加、替换、删除元素
  • CONCURRENT int型 值为4096 表示元素可以被多个线程安全并发得修改而不需要外部的同步。
  • SUBSIZED int型 值为16384 表示trySplit()返回的结果都是SIZED和SUBSIZED

Tips

  • 一个late-binding Spliterator 在第一次遍历、分隔或者查询任何估计的大小时绑定 ,而不是在创建的时候绑定。
  • 非后期绑定的Spliterator在构建或在任何方法的第一次调用时绑定到数据源。在绑定之前对源进行的修改将在遍历Spliterator时反映出来,在绑定源之后,发现 structural interference应立即抛出ConcurrentModificationException 异常,这称为快速失败。
  • Spliterator的批量遍历方法({@link # forEachRemaining()})可以在遍历完所有元素之后优化遍历并检查 structural interference,而不是检查每个元素并立即失败。
  • Spliterator 提供估计剩余多少元素的方法,即estimateSize()方法,理想情况下,正如在characteristics SIZED反应的那样,这个值会与成功遍历所遇到的数量完全一致。但是,即使不知道确切的值,估计值对于在数据源上执行的操作来说仍然是有用的,例如帮助确定是进一步分割还是按顺序遍历其余的元素。

并行的实现

尽管在并行算法中有明显的实用功能,但spliterator并不向我们期望的那样是线程安全的;相反,使用spliterator的并行算法的实现应该确保spliterator一次只使用一个线程。这个通常很容易通过 串行线程封闭 来实现:通常使用递归分解这个经典的并行算法。调用{@link #trySplit()}的线程可以将返回的Spliterator传递给另一个线程,而这个线程又可以遍历或进一步拆分这个Spliterator。如果两个或多个线程在同一个Spliterator上同时操作,则不定义分割和遍历的行为。如果原始线程将一个spliterator传递给另一个线程进行处理,那么最好是在使用{@link #tryAdvance(Consumer) tryAdvance()}的任何元素之前进行切换,因为某些保证(例如{@link #estimateSize()}对于{@code size}spliterator的精度)只有在遍历开始之前才有效。


Spliterator分割图.png

Spliterator通过支持分割和单元素迭代,除了支持串行遍历,还支持高效的并行遍历。另外,Spliterator 不像Iterator设计的那样设计两个方法hasNext 判断是否有元素和next() 返回元素进行消费,Spliterator 设计一个tryAdvance方法,消费元素,如果有就消费并返回true,如果没有则返回false,不需要两个独立的方法。
对于可变源,如果在Spliterator绑定到其数据源和遍历结束之间对源进行结构上的干扰(添加、替换或删除元素),可能会出现随机和不确定的影响。
对于structurally interfered 可以有一下几个方法避免:

  • 数据源为java.util包的CopyOnWriteArrayList ,它是不可变的,数据源为该类实例的Spliterator同样会将characteristics声明为IMMUTABLE

  • 数据源为java.util包的ConcurrentHashMap, 数据源为该类实例的Spliterator会将特性(characteristics) 声明为CONCURRENT。
    可变的数据源会提供一个 late-binding 和快速失败的Spliterator。

    这里有一个类(除了当做例子之外,它不是一个非常有用的类),它维护一个数组,其中实际数据保存在偶数位置,而不相关的标记数据保存在奇数位置。它的Spliterator会忽略标记数据。

/**
 * @Author unyielding
 * @date 2018/7/26 0026 19:48
 * @desc 一个类(除了当做例子之外,它不是一个非常有用的类),
 * 它维护一个数组, 其中实际数据保存在偶数位置,而不相关的标记数据保存在奇数位置。
 * 它的Spliterator会忽略标记数据。
 */
public class TaggedArray<T> {
    private final Object[] elements;//创建后,不可变的
    /**
     * 构造方法
     *
     * @param data 实际数据
     * @param tags 标记数据
     */
    TaggedArray(T[] data, Object[] tags) {
        int size = data.length;
        //保证实际数据数组和标记数据数组的大小相同
        if (tags.length != size) throw new IllegalArgumentException();
        this.elements = new Object[2 * size];
        //初始化elements 数组
        for (int i = 0, j = 0; i < size; ++i) {
            elements[j++] = data[i];
            elements[j++] = tags[i];
        }
    }

    public Spliterator<T> spliterator() {
        return new TaggedArraySpliterator<>(elements, 0, elements.length);
    }

    static class TaggedArraySpliterator<T> implements Spliterator<T> {
        private final Object[] array;

        private int origin; //当前索引,在分割或者遍历时使用

        private final int fence;//最大的下标加一

        TaggedArraySpliterator(Object[] array, int origin, int fence) {
            this.array = array;
            this.origin = origin;
            this.fence = fence;
        }

        /**
         *  批量遍历
         * @param action 消费函数 {@link Consumer} 的子类,可以通过lambda表达式表示
         */
        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            for (; origin < fence; origin += 2) {
                action.accept((T) array[origin]);
            }
        }

        /**
         *  处理单个元素
         * @param action 消费函数 {@link Consumer} 的子类,可以通过lambda表达式表示
         * @return 如果有元素消费就返回true,如果没有就直接返回false
         */
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (origin < fence) {
                action.accept((T) array[origin]);
                origin += 2;
                return true;
            }
            return false;
        }

        /**
         * 分割数据源
         * @return 返回分割后生成的Spliterator
         */
        @Override
        public Spliterator<T> trySplit() {
            int lo = origin;
            int mid = ((lo + fence) >> 1) & 1;//强制中点数为偶数
            if (lo < mid) {
                origin = mid;//重置Spliterator的 当前下标
                return new TaggedArraySpliterator<>(array, lo, mid);
            }//太小不需要拆分
            return null;
        }

        /**
         * 估计剩余还有多少元素
         * @return 剩余还有多少元素
         */
        @Override
        public long estimateSize() {
            return (long) ((fence - origin) / 2);
        }

        /**
         * 获取特征值 用户可以根据 特征值 ,
         * 用户可以根据 配置更好的控制和优化它的使用
         * @return
         */
        @Override
        public int characteristics() {
            return ORDERED | IMMUTABLE | SIZED | SUBSIZED;
        }
    }

    /**
     * 并行遍历
     * @param a 一个{@link TaggedArray} 实例
     * @param action
     * @param <T> 每个元素的值
     */
    static <T> void parEach(TaggedArray<T> a, Consumer<T> action) {
        Spliterator<T> spliterator = a.spliterator();
        long targetBatchSize = spliterator.estimateSize()
                / (ForkJoinPool.getCommonPoolParallelism() * 8);
        new ParEach<>(null, spliterator, action, targetBatchSize).invoke();
    }
}

并行计算器 ,其实就是继承CountedCompleter 一个可以放到forlk/join 线程池里的类

    /**
     * 并行计算器
     * @param <T> 元素的类型
     */
    static class ParEach<T> extends CountedCompleter<T> {
        final Spliterator<T> spliterator;
        final Consumer<T> action;
        final long targetBatchSize;

        ParEach(ParEach<T> parent, Spliterator<T> spliterator,
                Consumer<T> action, long targetBatchSize) {
            super(parent);
            this.spliterator = spliterator;
            this.action = action;
            this.targetBatchSize = targetBatchSize;
        }

        @Override
        public void compute() {
            Spliterator<T> sub;
            while (spliterator.estimateSize() > targetBatchSize
                    && (sub = spliterator.trySplit()) != null) {
                addToPendingCount(1);
                new ParEach<>(this, sub, action, targetBatchSize).fork();
            }
            spliterator.forEachRemaining(action);
            propagateCompletion();
        }
    }

至于生成Stream 的姿势 详见Java 之Stream 生成姿势
代码地址
推荐阅读
https://www.jianshu.com/p/af22a9d8ce98

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容