Stream是Java8的新特性,相当于是高级版的Iterator,可以通过Lambda表达式对集合进行各种非常便利、高效的聚合操作,或者大批量数据操作。Stream的聚合操作与数据库SQL的聚合操作类似。我们可以在应用层就可以实现类似数据库的聚合操作,在数据处理方面,Stream不仅支持串行的方式,还支持并行的方式,在大批量数据的情况下使用并行操作可以显著的提高效率。
先Stream的简洁与强大:
举例:过滤分组一天中所有的销售订单中已支付未发货的订单,先用传统的for循环的方式来实现:
HashMap<String, List<SaleOrder>> orderMap = new HashMap<String, List<SaleOrder>>();
for (SaleOrder order : orderList) {
if (order.getIsPay) {
if (order.get(order.getDeliver()) == null) { //该发货状态还没分类
List<SaleOrder> list = new ArryList<SaleOrder>();
list.add(order);
orderMap.put(order.getDeliver(),list);
}else {
orderMap.get(order.getDeliver()).add(order);
}
}
}
我们在使用Java8中的StreamAPI实现:
- 串行实现
HashMap<String, List<SaleOrder>> orderMap = orderList.stream().filter((SaleOrder order) -> order.getIsPay).collect(Collectors.groupingBy(SaleOrder::getDeliver));
- 并行实现
HashMap<String, List<SaleOrder>> orderMap = orderList.parallelStream().filter((SaleOrder order) -> order.getIsPay).collect(Collectors.groupingBy(SaleOrder::getDeliver));
通过简单的例子,我们可以看到Stream结合Lambda表达式实现的遍历筛选功能非常的简洁。
使用Stream实现遍历非常简单,但是如何让遍历的效率更高,我们还需要透过源码看Stream的实现原理。
Stream介绍
通过官方文档我们可以了解到,Stream操作可以分为两大类:中间操作和终结操作。中间操作只对操作进行记录,只会返回一个流。不进行计算操作,而终结操作是实现了计算操作的。
中间操作又可以分为无状态(Stateless)操作(比如:filter、map、flatMap等)与有状态(Stateful)操作(如:distinct、sorted、limit),无状态操作是指元素的处理不受之前元素的影响,有状态是指操作只有拿到所有元素之后才能继续下去。
终结操作包含了非短路操作(short-circuiting)(如:forEach、reduce、collect)与短路操作(如:findFirst、findAny)。短路终结操作指的是不用处理所有元素才能返回结果,比如findFirst,只要找到第一个符合条件的元素就返回结果。非短路终结操作则必须处理完所有的元素才能返回。
Stream源码实现
先通过一个类图了解Stream是由哪些结构类组成的。
BaseStream和Stream是顶级的接口类,BaseStream主要定义了流的基本接口方法,如:spliterator、isParallel等;Stream则定义了一些流的常用操作方法,如:map、filter等。
ReferencePipeline是描述中间操作管道流和源管道流的一个结构类,它通过定义内部类组装了各种操作流。它定义了Head、StatelessOp、StatefulOp三个内部类,实现了BaseStream和Stream的接口方法。
这里其实还有一个很重要的接口就是Sink接口,定义了每个Stream之间关系的协议,包含begin()、end()、cancellationRequested()、accpt()四个方法。ReferencePipeline最终将整个Stream流操作组装成一个调用链,而这条调用链上的各个Stream操作的上下级关系是通过Sink接口协议来定义实现的。
Stream操作叠加
我们都知道,一个Stream的各个操作都是由处理管道组装,并统一完成数据处理的,在JDK中每次的中断操作会以使用阶段(Stage)命名。
管道结构是由ReferencePipeline类实现,前面已经说了它有三个内部类。Head类主要用来定义数据源操作,在我们初次调用names.stream()方法时,会初次加载Head对象,此时位加载数据源操作;接着加载的是中间操作,分别为无状态中间操作StatelessOp对象和有状态操作对象StatefulOp对象,此时的Stage并没有执行,而是通过AbstractPipeline生成了一个中间操作Stage链表;当我们调用中间操作的时候会生成最后一个Stage,通过最后一个终结Stage触发之前的中间操作,从最后一个Stage开始递归产生一个Sink链,就像这样:
下边通过一个例子实践一下:
需求:在一个集合中查出一个长度最长的并且以张为姓氏的名字。
List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");
String maxLenStartWithZ = names.stream()
.filter(name -> name.startsWith("张"))
.mapToInt(String::length)
.max()
.toString();
第一步:因为names是一个ArrayList集合,使用names.stream()方法将会调用集合类基础接口Collection的Stream方法;
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
第二步:Stream方法就会调用StreamSupport类的Stream方法,方法中初始化了ReferencePipeline的Head内部类对象:
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
第三步:调用filter和map方法,这两个方法是无状态的中间操作,所以执行filter和map操作时,并没有进行任何的操作,而是分别创建了一个Stage来标识每一步操作。
通常情况下Stream的操作又需要一个回调函数,所以一个完整的Stage是由数据来源、操作、回调函数组成的三元组来表示。ReferencePipeline的filter和map方法:
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
new StatelessOp将调用父类AbstractPipeline的构造函数,这个构造函数将前后的Stage链接起来,生成一个Stage链表。
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;//将当前的stage的next指针指向之前的stage
this.previousStage = previousStage;//赋值当前stage当全局变量previousStage
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
因为在创建每一个Stage的时候都会包含一个opWrapSink()方法,该方法会把一个操作的具体实现封装到Sink类中,Sink采用(处理->转发)的模式来叠加操作。
当执行到max()时,会调用ReferencePipeline的max方法,此时由于max方法是一个终结操作,所以会创建一个TerminalOp操作,同时创建一个ReducingSink,并且将操作封装在Sink类中。
最后,调用AbstractPipeline的wrapSink方法,该方法会调用opWrapSink生成一个Sink链表,Sink链表中的每一个Sink都封装了一个操作的具体实现。
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
在Sink链表生成以后,Stream开始执行,通过spliterator迭代集合,执行Sink链表的具体操作。
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
Java8 中的 Spliterator 的 forEachRemaining 会迭代集合,每迭代一次,都会执行一次 filter 操作,如果 filter 操作通过,就会触发 map 操作,然后将结果放入到临时数组 object 中,再进行下一次的迭代。完成中间操作后,就会触发终结操作 max
Stream并行处理
还是刚刚的demo需求,只需稍作修改就可以变成并行处理
List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");
String maxLenStartWithZ = names.stream()
.parallel()
.filter(name -> name.startsWith("张"))
.mapToInt(String::length)
.max()
.toString();
在执行终结操作之前都跟串行操作的一样,主要的不同就是在调用终结方法之后,并行处理会调用TerminalOp的evaluateParallel方法进行并行处理;
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
这里的并行处理指的是,Stream 结合了 ForkJoin 框架,对 Stream 处理进行了分片,Splititerator 中的 estimateSize 方法会估算出分片的数据量。通过预估的数据量获取最小处理单元的阈值,如果当前分片大小大于最小处理单元的阈值,就继续切分集合。每个分片将会生成一个 Sink 链表,当所有的分片操作完成后,ForkJoin 框架将会合并分片任何结果集。
合理使用Stream
综上,对Stream有了一定的认识,但是实际使用中我们该如何选择,还是需要一个测试说明:
多核 CPU 服务器配置环境下,对比长度 100 的 int 数组的性能;
多核 CPU 服务器配置环境下,对比长度 1.00E+8 的 int 数组的性能;
多核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能;
单核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能。
多次测试结果:
常规的迭代 <Stream 并行迭代 <Stream 串行迭代
Stream 并行迭代 < 常规的迭代 <Stream 串行迭代
Stream 并行迭代 < 常规的迭代 <Stream 串行迭代
常规的迭代 <Stream 串行迭代 <Stream 并行迭代
以上测试结果,我们可以看到:在循环迭代次数较少的情况下,常规的迭代方式性能反而更好;在单核 CPU 服务器配置环境中,也是常规迭代方式更有优势;而在大数据循环迭代中,如果服务器是多核 CPU 的情况下,Stream 的并行迭代优势明显。所以我们在平时处理大数据的集合时,应该尽量考虑将应用部署在多核 CPU 环境下,并且使用 Stream 的并行迭代方式进行处理。
具体使用哪种方式我们还是需要结合应用场景进行选择。