Java8 Stream

Java8 Stream

Stream讲解示例: 找出前3短的单词并转成大写形式

List<String> words = Arrays.asList(
      "One",
      "Two",
      "Three",
      "Four",
      "Five",
      "Six"
  );

实现一:命令式——迭代:

  @Test
  public void test_takeTop3ShortWords_iterative() {
    
    List<String> filtered = new ArrayList<>();
    for(String word : words){
      if(word.length()<=4){
        filtered.add(word);
      }
    }
    
    filtered.sort(Comparator.comparing(String::length));
    
    List<String> top3 = filtered.subList(0, Math.min(3, filtered.size())); //节省了一个迭代
    
    List<String> result = new ArrayList<>(top3);
    for(String word : top3){
      result.add(word.toUpperCase());
    }
  }

缺点:

  • 代码过于繁杂, 不太好直接看出意图。
  • 代码的中间变量过多,命名困难。

实现二:声明式——stream

  @Test
  public void test_takeTop3ShortWords_stream(){
    
    List<String> result =  words.stream()
        .filter(s-> s.length()<=4)
        .sorted(Comparator.comparing(String::length))
        .map(String::toUpperCase)
        .limit(3)
        .collect(Collectors.toList());
  }

Stream的实现原理

操作的分类

操作的分类

操作的串联

  1. 通过相关的操作对象PipelineHelper的构造函数,将操作先后关系维护起来(是反向的),维护一个代码块Sink链式结构: D(C(B(A)))
  2. 调用wrapSink,将代码块正向串连起来: A->B->C->D
  3. 分别调用begin、foreach accecpt, end 代码块

操作的协作契约

interface Sink<T> extends Consumer<T> {
    void begin(long size); //执行一些初始化工作,如排序时要建一个临时表用于存储排序后的数据
    void accept(T t); //继承自Consumer,指明如何处理单个数据。
    void end(); //指明本操作结束时(所有数据都已经处理完),如何进行后续的处理。
    boolean cancellationRequested(); //当下游不再需要更多数据时,可以用这个通知到上游操作。一般为短路操作。
}

自带拆分功能的新版迭代器-数据流的驱动器

public interface Iterator<E> {
    boolean hasNext();  //是否还有更多数据
    E next(); //获取下一个数据
    void remove(); //删除最近一个数据
    
    default void forEachRemaining(Consumer<? super E> action) {
            Objects.requireNonNull(action);
            while (hasNext())
                    action.accept(next());
    }
}
public interface Spliterator<T> {

    long estimateSize(); //估算还有多少数据待迭代
    boolean tryAdvance(Consumer<? super T> action); //处理单个数据
    Spliterator<T> trySplit(); //用于并行流分解子任务

    default void forEachRemaining(Consumer<? super T> action) {//默认调用tryAdvance去遍历,类似foreach
        do { } while (tryAdvance(action));
    }
}

流程示例

流程示例

Collection.stream()

  • StreamSupport.stream(spliterator(), false)
    • Spliterators.spliterator(this, 0)
    • new ReferencePipeline.Head

AbstractPipeline

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;

        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
                sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
}
        
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
}

 final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
                sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
}

        @Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
                wrappedSink.begin(spliterator.getExactSizeIfKnown());
                spliterator.forEachRemaining(wrappedSink);
                wrappedSink.end();
        }
        else {
                copyIntoWithCancel(wrappedSink, spliterator);
        }
}

Spliterator

class ArraySpliterator<E> implements Spliterator<E> {

        @SuppressWarnings("unchecked")
        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Object[] a; int i, hi; // hoist accesses and checks from loop
            if (action == null)
                throw new NullPointerException();
            if ((a = array).length >= (hi = fence) &&
                (i = index) >= 0 && i < (index = hi)) {
                do { action.accept((T)a[i]); } while (++i < hi);
            }
        }
}

filter

    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

sorted

  • SortedOps.makeRef(this)
class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
        @Override
        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
                Objects.requireNonNull(sink);

                // If the input is already naturally sorted and this operation
                // also naturally sorted then this is a no-op
                if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
                        return sink;
                else if (StreamOpFlag.SIZED.isKnown(flags))
                        return new SizedRefSortingSink<>(sink, comparator);
                else
                        return new RefSortingSink<>(sink, comparator);
        }
}               
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
        private ArrayList<T> list;

        RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
            super(sink, comparator);
        }

        @Override
        public void begin(long size) {
            if (size >= Nodes.MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(Nodes.BAD_SIZE);
            list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
        }

        @Override
        public void end() {
            list.sort(comparator);
            downstream.begin(list.size());
            if (!cancellationWasRequested) {
                list.forEach(downstream::accept);
            }
            else {
                for (T t : list) {
                    if (downstream.cancellationRequested()) break;
                    downstream.accept(t);
                }
            }
            downstream.end();
            list = null;
        }

        @Override
        public void accept(T t) {
            list.add(t);
        }
    }

map

    @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

limit

 Sink<T> opWrapSink(int flags, Sink<T> sink) {
                return new Sink.ChainedReference<T, T>(sink) {
                    long n = skip;
                    long m = limit >= 0 ? limit : Long.MAX_VALUE;

                    @Override
                    public void begin(long size) {
                        downstream.begin(calcSize(size, skip, m));
                    }

                    @Override
                    public void accept(T t) {
                        if (n == 0) {
                            if (m > 0) {
                                m--;
                                downstream.accept(t);
                            }
                        }
                        else {
                            n--;
                        }
                    }

                    @Override
                    public boolean cancellationRequested() {
                        return m == 0 || downstream.cancellationRequested();
                    }
                };
            }

collect

ReduceOp

 @Override
    public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();  //new ReducingSink()
    }
        
class ReducingSink extends Box<R> implements AccumulatingSink<T, R, ReducingSink> {
        @Override
        public void begin(long size) {
                state = seedFactory.get();
        }

        @Override
        public void accept(T t) {
                accumulator.accept(state, t);
        }

        @Override
        public void combine(ReducingSink other) {
                reducer.accept(state, other.state);
        }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,636评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,890评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,680评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,766评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,665评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,045评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,515评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,182评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,334评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,274评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,319评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,002评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,599评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,675评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,917评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,309评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,885评论 2 341

推荐阅读更多精彩内容

  • 转自: Java 8 中的 Streams API 详解 为什么需要 Stream Stream 作为 Java ...
    普度众生的面瘫青年阅读 2,911评论 0 11
  • Int Double Long 设置特定的stream类型, 提高性能,增加特定的函数 无存储。stream不是一...
    patrick002阅读 1,267评论 0 0
  • 一 基础篇 1.1 Java基础 面向对象的特征抽象:将一类对象的共同特征总结出来构建类的过程。继承:对已有类的一...
    essential_note阅读 685评论 0 0
  • 刚刚看了一下自己写的文章,居然没有过千字的,就这样的速度,就这样隔一个月才写上的几个字,什么时候才能成为想象中的自...
    钩钩手阅读 101评论 0 1
  • 我明白你会来,所以我等。——沈从文 人的一生可以遇到八万人,但是最后陪在你身边的也不过是寥寥无几,有的人会日赶夜赶...
    茶不二阅读 240评论 0 2