Stream流如何提高遍历集合效率

Stream是Java8的新特性,相当于是高级版的Iterator,可以通过Lambda表达式对集合进行各种非常便利、高效的聚合操作,或者大批量数据操作。Stream的聚合操作与数据库SQL的聚合操作类似。我们可以在应用层就可以实现类似数据库的聚合操作,在数据处理方面,Stream不仅支持串行的方式,还支持并行的方式,在大批量数据的情况下使用并行操作可以显著的提高效率。

先Stream的简洁与强大:

举例:过滤分组一天中所有的销售订单中已支付未发货的订单,先用传统的for循环的方式来实现:

  HashMap<String, List<SaleOrder>> orderMap = new HashMap<String, List<SaleOrder>>();
        for (SaleOrder order : orderList) {
            if (order.getIsPay) {
                if (order.get(order.getDeliver()) == null) {  //该发货状态还没分类
                    List<SaleOrder> list = new ArryList<SaleOrder>();
                    list.add(order);
                    orderMap.put(order.getDeliver(),list);
                }else {
                    orderMap.get(order.getDeliver()).add(order);
                }
            }
        }

我们在使用Java8中的StreamAPI实现:

  1. 串行实现
HashMap<String, List<SaleOrder>> orderMap = orderList.stream().filter((SaleOrder order) -> order.getIsPay).collect(Collectors.groupingBy(SaleOrder::getDeliver));
  1. 并行实现
HashMap<String, List<SaleOrder>> orderMap = orderList.parallelStream().filter((SaleOrder order) -> order.getIsPay).collect(Collectors.groupingBy(SaleOrder::getDeliver));

通过简单的例子,我们可以看到Stream结合Lambda表达式实现的遍历筛选功能非常的简洁。

使用Stream实现遍历非常简单,但是如何让遍历的效率更高,我们还需要透过源码看Stream的实现原理。

Stream介绍

通过官方文档我们可以了解到,Stream操作可以分为两大类:中间操作和终结操作。中间操作只对操作进行记录,只会返回一个流。不进行计算操作,而终结操作是实现了计算操作的。

中间操作又可以分为无状态(Stateless)操作(比如:filter、map、flatMap等)与有状态(Stateful)操作(如:distinct、sorted、limit),无状态操作是指元素的处理不受之前元素的影响,有状态是指操作只有拿到所有元素之后才能继续下去。

终结操作包含了非短路操作(short-circuiting)(如:forEach、reduce、collect)与短路操作(如:findFirst、findAny)。短路终结操作指的是不用处理所有元素才能返回结果,比如findFirst,只要找到第一个符合条件的元素就返回结果。非短路终结操作则必须处理完所有的元素才能返回。

Stream源码实现

先通过一个类图了解Stream是由哪些结构类组成的。

BaseStreamStream是顶级的接口类,BaseStream主要定义了流的基本接口方法,如:spliterator、isParallel等;Stream则定义了一些流的常用操作方法,如:map、filter等。

ReferencePipeline是描述中间操作管道流和源管道流的一个结构类,它通过定义内部类组装了各种操作流。它定义了Head、StatelessOp、StatefulOp三个内部类,实现了BaseStream和Stream的接口方法。

这里其实还有一个很重要的接口就是Sink接口,定义了每个Stream之间关系的协议,包含begin()、end()、cancellationRequested()、accpt()四个方法。ReferencePipeline最终将整个Stream流操作组装成一个调用链,而这条调用链上的各个Stream操作的上下级关系是通过Sink接口协议来定义实现的。

Stream操作叠加

我们都知道,一个Stream的各个操作都是由处理管道组装,并统一完成数据处理的,在JDK中每次的中断操作会以使用阶段(Stage)命名。

管道结构是由ReferencePipeline类实现,前面已经说了它有三个内部类。Head类主要用来定义数据源操作,在我们初次调用names.stream()方法时,会初次加载Head对象,此时位加载数据源操作;接着加载的是中间操作,分别为无状态中间操作StatelessOp对象和有状态操作对象StatefulOp对象,此时的Stage并没有执行,而是通过AbstractPipeline生成了一个中间操作Stage链表;当我们调用中间操作的时候会生成最后一个Stage,通过最后一个终结Stage触发之前的中间操作,从最后一个Stage开始递归产生一个Sink链,就像这样:

下边通过一个例子实践一下:

需求:在一个集合中查出一个长度最长的并且以张为姓氏的名字。

List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");

String maxLenStartWithZ = names.stream()
                  .filter(name -> name.startsWith("张"))
                  .mapToInt(String::length)
                  .max()
                  .toString();

第一步:因为names是一个ArrayList集合,使用names.stream()方法将会调用集合类基础接口Collection的Stream方法;

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

第二步:Stream方法就会调用StreamSupport类的Stream方法,方法中初始化了ReferencePipeline的Head内部类对象:


    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

第三步:调用filter和map方法,这两个方法是无状态的中间操作,所以执行filter和map操作时,并没有进行任何的操作,而是分别创建了一个Stage来标识每一步操作。

通常情况下Stream的操作又需要一个回调函数,所以一个完整的Stage是由数据来源、操作、回调函数组成的三元组来表示。ReferencePipeline的filter和map方法:

    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }
    @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

new StatelessOp将调用父类AbstractPipeline的构造函数,这个构造函数将前后的Stage链接起来,生成一个Stage链表。

    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;//将当前的stage的next指针指向之前的stage

        this.previousStage = previousStage;//赋值当前stage当全局变量previousStage 
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }

因为在创建每一个Stage的时候都会包含一个opWrapSink()方法,该方法会把一个操作的具体实现封装到Sink类中,Sink采用(处理->转发)的模式来叠加操作。

当执行到max()时,会调用ReferencePipeline的max方法,此时由于max方法是一个终结操作,所以会创建一个TerminalOp操作,同时创建一个ReducingSink,并且将操作封装在Sink类中。

最后,调用AbstractPipeline的wrapSink方法,该方法会调用opWrapSink生成一个Sink链表,Sink链表中的每一个Sink都封装了一个操作的具体实现。

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

在Sink链表生成以后,Stream开始执行,通过spliterator迭代集合,执行Sink链表的具体操作。

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

Java8 中的 Spliterator 的 forEachRemaining 会迭代集合,每迭代一次,都会执行一次 filter 操作,如果 filter 操作通过,就会触发 map 操作,然后将结果放入到临时数组 object 中,再进行下一次的迭代。完成中间操作后,就会触发终结操作 max

Stream并行处理

还是刚刚的demo需求,只需稍作修改就可以变成并行处理

List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");

String maxLenStartWithZ = names.stream()
                    .parallel()
                  .filter(name -> name.startsWith("张"))
                  .mapToInt(String::length)
                  .max()
                  .toString();

在执行终结操作之前都跟串行操作的一样,主要的不同就是在调用终结方法之后,并行处理会调用TerminalOp的evaluateParallel方法进行并行处理;

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

这里的并行处理指的是,Stream 结合了 ForkJoin 框架,对 Stream 处理进行了分片,Splititerator 中的 estimateSize 方法会估算出分片的数据量。通过预估的数据量获取最小处理单元的阈值,如果当前分片大小大于最小处理单元的阈值,就继续切分集合。每个分片将会生成一个 Sink 链表,当所有的分片操作完成后,ForkJoin 框架将会合并分片任何结果集。

合理使用Stream

综上,对Stream有了一定的认识,但是实际使用中我们该如何选择,还是需要一个测试说明:

  • 多核 CPU 服务器配置环境下,对比长度 100 的 int 数组的性能;

  • 多核 CPU 服务器配置环境下,对比长度 1.00E+8 的 int 数组的性能;

  • 多核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能;

  • 单核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能。

多次测试结果:

  • 常规的迭代 <Stream 并行迭代 <Stream 串行迭代

  • Stream 并行迭代 < 常规的迭代 <Stream 串行迭代

  • Stream 并行迭代 < 常规的迭代 <Stream 串行迭代

  • 常规的迭代 <Stream 串行迭代 <Stream 并行迭代

以上测试结果,我们可以看到:在循环迭代次数较少的情况下,常规的迭代方式性能反而更好;在单核 CPU 服务器配置环境中,也是常规迭代方式更有优势;而在大数据循环迭代中,如果服务器是多核 CPU 的情况下,Stream 的并行迭代优势明显。所以我们在平时处理大数据的集合时,应该尽量考虑将应用部署在多核 CPU 环境下,并且使用 Stream 的并行迭代方式进行处理。

具体使用哪种方式我们还是需要结合应用场景进行选择。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容