基于JDK12
Stream的获取过程
import java.util.stream.Stream;
// 创建一个String流
Stream<String> xx = Stream.of("xx", "xx");
//Stream.of方法调用链
public static<T> Stream<T> of(T... values) {
return Arrays.stream(values);
}
import java.util.Arrays;
public static <T> Stream<T> stream(T[] array) {
return stream(array, 0, array.length);
}
public static <T> Stream<T> stream(T[] array, int startInclusive,
int endExclusive) {
return StreamSupport.stream(
spliterator(array, startInclusive, endExclusive), false);
}
Q1:什么是ArraySpliterator?
A1:ArraySpliterator是一个维护一个数组的可分割的迭代器,可用于并行迭代。
// spliterator方法的调用链
import java.util.Arrays;
public static <T> Spliterator<T> spliterator(T[] array, int startInclusive,
int endExclusive) {
return Spliterators.spliterator(array, startInclusive, endExclusive,
Spliterator.ORDERED | Spliterator.IMMUTABLE);
}
import java.util.Spliterators;
public static <T> Spliterator<T> spliterator(Object[] array, int fromIndex,
int toIndex, int additionalCharacteristics) {
checkFromToBounds(Objects.requireNonNull(array).length,fromIndex,toIndex);
// 最后返回一个数组分离迭代器
return new ArraySpliterator<>(array, fromIndex, toIndex,
additionalCharacteristics);
}
package java.util;
public interface Spliterator<T> {
// 定义许多特征量表示此分离迭代器维护的数据的特征和操作数据的特征
/* ORDERED == 1 << 4,代表数组元素根据索引顺序访问是有有意义的,
例如List而hash-based的集合根据索引顺序访问是没有意义的
*/
public static final int ORDERED = 0x00000010;
// DISTINCT == 1,表示元素不重复,x.equals(y) == false;
public static final int DISTINCT = 0x00000001;
/* SORTED == 1 << 2,代表元素是有序的(可能是自然顺序),
getComparator方法返回null时,表示自然排序
必须同时具有ORDERED特征
*/
public static final int SORTED = 0x00000004;
/* SIZED == 1 << 6,表示元素的数量是有限的大小
从estimateSize()遍历或分割之前返回的值是有限且精确的
*/
public static final int SIZED = 0x00000040;
// NONNULL == 1 << 8,表示元素不能为null
public static final int NONNULL = 0x00000100;
// IMMUTABLE == 1 << 10,表示不能CRUD
public static final int IMMUTABLE = 0x00000400;
/* CONCURRENT == 1 << 12,代表并发CURD
在最上层分离器中不能和SIZED同时出现,子分离器可以
在最上层分离器中不能和IMMUTABLE同时出现,子分离器可以
*/
public static final int CONCURRENT = 0x00001000;
/* SUBSIZED == 1 << 14代表trySplit()分离出来的子分离器都将是SIZED和SUBSIZED
但是没有SIZED,却有SUBSIZED是不恰当的
*/
public static final int SUBSIZED = 0x00004000
// 在Spliterator,判断是否具有某个特征
default boolean hasCharacteristics(int characteristics) {
return (characteristics() & characteristics) == characteristics;
}
// 在Spliterator,存在SIZED特征返回estimateSize(),否则返回-1
default long getExactSizeIfKnown() {
return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
}
}
package java.util;
static final class ArraySpliterator<T> implements Spliterator<T> {
/* array表示分离迭代器维护的数组
index至fence表示当前分离迭代器维护array的范围(不包括fence)
characteristics表示特征量
*/
private final Object[] array;
private int index;
private final int fence;
private final int characteristics;
public ArraySpliterator(Object[] array, int additionalCharacteristics) {
this(array, 0, array.length, additionalCharacteristics);
}
public ArraySpliterator(Object[] array, int origin, int fence,
int additionalCharacteristics) {
/* this.origin = 0, this.fence = array.length,
也就是本分离器维护整个数组
this.characteristics特征为Spliterator.SIZED | Spliterator.SUBSIZED
| Spliterator.ORDERED | Spliterator.IMMUTABLE
*/
this.array = array;
this.index = origin;
this.fence = fence;
this.characteristics = additionalCharacteristics | Spliterator.SIZED
| Spliterator.SUBSIZED;
}
/* 此方法将创一个新的分离迭代器用于维护原分离迭代器的一半范围的元素
并将原分离迭代器的范围设置成原来的一半
*/
@Override
public Spliterator<T> trySplit() {
int lo = index, mid = (lo + fence) >>> 1; // 中间索引
/* lo >= mid 维护的索引范围为1,或者出现lo > fence的错误,返回null
否则,创建一个新的ArraySpliterator对象,
index = mid, 原分离迭代维护mid至fence范围,新分离迭代器维护lo至mid范围
*/
return (lo >= mid)
? null
: new ArraySpliterator<>(array, lo, index = mid,
characteristics);
}
/* 这个是一个终止操作类型的方法,使用在流操作的最后一个操作
对维护范围内的元素进行消费,index = fence导致分离迭器代维护0个元素
也就是分离迭代器不能使用了
*/
@SuppressWarnings("unchecked")
@Override
public void forEachRemaining(Consumer<? super T> action) {
Object[] a; int i, hi;
if (action == null)
throw new NullPointerException();
/* 判断fence是否<= array.length
判断index是否>=0并且<fence
最后index=fence,终结迭代器
*/
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
// 消费元素
do { action.accept((T)a[i]); } while (++i < hi);
}
}
/* 每次消费一个元素位于index处,并且index += 1
对比forEachRemaining不是一个终止操作的方法,
但是index == fence 的时候必定终结
*/
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (action == null)
throw new NullPointerException();
if (index >= 0 && index < fence) {
@SuppressWarnings("unchecked") T e = (T) array[index++];
action.accept(e);
return true;
}
return false;
}
// 迭代器维护的元素的数量
@Override
public long estimateSize() { return (long)(fence - index); }
// 迭代器的特征
@Override
public int characteristics() {
return characteristics;
}
/* 获取元素比较器,具有SORTED特征则返回null
由于构造链中没有传递SORTED特征,所以永远返回null
*/
@Override
public Comparator<? super T> getComparator() {
if (hasCharacteristics(Spliterator.SORTED))
return null;
throw new IllegalStateException();
}
}
Q2:StreamOpFlag是什么?
A2:StreamOpFlag是流特征及流操作特征的标志的枚举类。用流的优化计算的状态控制以及优化计算。
import java.util.stream.StreamSupport;
public static <T> Stream<T> stream(Spliterator<T> spliterator,
boolean parallel) {
Objects.requireNonNull(spliterator);
/* StreamOpFlag.fromCharacteristics(spliterator) == 0b0101_0101
parallel == false
*/
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
package java.util.stream;
enum StreamOpFlag {
/*
StreamOpFlag类似于Spliterator的特征量,表示stream特征和stream操作的特征,
主要用于优化运算
Stream对象内部使用int类型变量维护特征的集合
一个stream的一生分为3个阶段:
1. stream sources, 资源阶段,e.g. 一个Stream.of('xx', 'xx')生成一个stream
2. intermediate operations, 中间操作阶段,e.g. .filter(x -> x > 5)
3. terminal operations, 终止阶段,e.g. .foreach(System.out::println)
分离器的一些特征是与Stream的特征相匹配的, 共有特征包括
1. DISTINCT, 表示内部元素不重复
2. SORTED, 表示内部元素是有序的
3. ORDERED, 表示元素是操作元素是自然顺序,
4. SIZED, 表示元素数量有限且精确
SHORT_CIRCUIT是流独有的特征,‘短路’, 不会遍历所有元素的操作的特征
.e.g. findAny方法在匹配到元素的就结束遍历
*/
// Matches Spliterator.DISTINCT
DISTINCT(0, set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),
// Matches Spliterator.SORTED
SORTED(1, set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),
// Matches Spliterator.ORDERED
ORDERED(2, set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)
.clear(Type.TERMINAL_OP.clear(Type.UPSTREAM_TERMINAL_OP)),
// Matches Spliterator.SIZED
SIZED(3, set(Type.SPLITERATOR).set(Type.STREAM).clear(Type.OP)),
SHORT_CIRCUIT(12, set(Type.OP).set(Type.TERMINAL_OP));
/*
枚举特征量的构造方法
1. position
与Splitrerator特征不同的是:stream特征做了更细化的分类
e.g. 只有DISTINCT特征时,
spl有:0b01, spl无:0b00
stream有:0b01, stream无:0b10, stream未知:0b11
stream使用两个bit存储一个特征
使用DISTINCT.set存储特征值‘有’
使用DISTINCT.clear存储特征值‘无’
使用DISTINCT.preserve存储未知,也是该特征值的掩码,
一个特征值即没有set,也没有clear,那么就应该是preserve
*/
private StreamOpFlag(int position, MaskBuilder maskBuilder) {
this.maskTable = maskBuilder.build();
// Two bits per flag
position *= 2;
// bitPosition是存储位置.
this.bitPosition = position;
this.set = SET_BITS << position;
this.clear = CLEAR_BITS << position;
this.preserve = PRESERVE_BITS << position;
}
private static final int SET_BITS = 0b01;
private static final int CLEAR_BITS = 0b10;
private static final int PRESERVE_BITS = 0b11;
private final Map<Type, Integer> maskTable;
private final int bitPosition;
private final int set;
private final int clear;
private final int preserve;
/*
2. maskBuilder 用来生成不同时期的特征的掩码
思考一个问题:stream有三个阶段,特征set,clear在这三个阶段是否都是有意义的?
e.g. SIZED表示容量有限且精确,
如果使用了filter方法,那么SIZED就不精确,可以设置成SIZED.clear
如果SIZED特征本身就是preserve,只有可能在中间操作之后设置成SIZED.clear
也就是无法将一个非SIZED数据源操作成SIZED.
所以SIZED.set在中间操作时期是没有意义的
因此,特征在不同的时期有不同的具体表现
enum Type表示更细化的时期
*/
enum Type {
SPLITERATOR, //分离器时期
STREAM, // 分离器刚转化为流,还没有操作的时期,
OP, // 中间操作时期
TERMINAL_OP, // 终止操作时期
UPSTREAM_TERMINAL_OP // 略,未应用
}
/*
以DISTINCT(0, set(Type.SPLITERATOR).set(Type.STREAM)
. setAndClear(Type.OP)),为例
set(Type.SPLITERATOR)创建了MaskBuilder代理了一个EnumMap
以Type枚举值为key,
Maskbuilder.this.set(t) k = t, v = SET_BITS
Maskbuilder.this.clear(t) k = t, v = CLEAR_BITS
Maskbuilder.this.setAndClear(t) k = t, v = PRESERVE_BITS
// 表示set和clear都有可能
意义是,不同的时期(Type), 由不同的具体特征值
DISTINCT.maskBuilder {Type.SPLITERATOR = SET_BITS,
TYPE.STREAM = SET_BITS,
Type.OP = PRESERVE_BITS}
this.maskTable = DISTINCT.maskBuilder.build()方法将
没有加入的Type枚举值的v设置成0,表示此时期无效
总结:就是为了生成不同时期的所有特征的掩码
每一列表示特征在不同时期的具体值
每一行表示时期内不同特征的具体值
01 = set
10= clear
11 = preserve
*/
DISTINCT | SORTED | ORDERED | SIZED | SHORT_CIRCUIT | |
---|---|---|---|---|---|
SPLITERATOR | 01 | 01 | 01 | 01 | 00 |
STREAM | 01 | 01 | 01 | 01 | 00 |
OP | 11 | 11 | 11 | 10 | 01 |
TERMINAL_OP | 00 | 00 | 10 | 00 | 10 |
UPSTREAM_TERMINAL_OP | 00 | 00 | 10 | 00 | 00 |
private static MaskBuilder set(Type t) {
return new MaskBuilder(new EnumMap<>(Type.class)).set(t);
}
private static class MaskBuilder {
final Map<Type, Integer> map;
MaskBuilder(Map<Type, Integer> map) { this.map = map; }
MaskBuilder mask(Type t, Integer i) { map.put(t, i); return this; }
MaskBuilder set(Type t) { return mask(t, SET_BITS); }
MaskBuilder clear(Type t) { return mask(t, CLEAR_BITS); }
MaskBuilder setAndClear(Type t) { return mask(t, PRESERVE_BITS); }
Map<Type, Integer> build() {
for (Type t : Type.values()) {
map.putIfAbsent(t, 0b00);
}
return map;
}
}
// 不同时期的掩码
static final int SPLITERATOR_CHARACTERISTICS_MASK = createMask(
Type.SPLITERATOR); // 0b0101_0101
static final int STREAM_MASK = createMask(Type.STREAM); // 0b0101_0101
static final int OP_MASK = createMask(
Type.OP); // 0b01_0000_0000_0000_0000_1011_1111
static final int TERMINAL_OP_MASK = createMask(
Type.TERMINAL_OP); // 0b01_0000_0000_0000_0000_1010_0000
static final int UPSTREAM_TERMINAL_OP_MASK = createMask(
Type.UPSTREAM_TERMINAL_OP); // 未应用
private static int createMask(Type t) {
int mask = 0;
for (StreamOpFlag flag : StreamOpFlag.values()) {
mask |= flag.maskTable.get(t) << flag.bitPosition;
}
return mask;
}
// 该特征是否是Stream时期有效的特征
boolean isStreamFlag() { return maskTable.get(Type.STREAM) > 0; }
// flags是否是包含该特征的set
boolean isKnown(int flags) { return (flags & preserve) == set; }
// flags是否是包含该特征的clear
boolean isCleared(int flags) { return (flags & preserve) == clear; }
// flags是否包含该特征的preserve
boolean isPreserved(int flags) { return (flags & preserve) == preserve; }
// 该特征在t时期能否设置成set
boolean canSet(Type t) { return (maskTable.get(t) & SET_BITS) > 0; }
// 全特征掩码,0b11_0000_0000_0000_0000_1111_1111
private static final int FLAG_MASK = createFlagMask();
private static int createFlagMask() {
int mask = 0;
for (StreamOpFlag flag : StreamOpFlag.values()) {
mask |= flag.preserve;
}
return mask;
}
/*
FlAG_MASK_IS,0b0101_0101,取流特征中的所有set的掩码
FLAG_MASK_NOT, 0b1010_1010, 取流特征中的所有的clear的掩码
INITAL_OPS_VALUE, 0b1111_1111, 分离器特征转化成流特征的掩码
*/
private static final int FLAG_MASK_IS = STREAM_MASK;
private static final int FLAG_MASK_NOT = STREAM_MASK << 1;
static final int INITIAL_OPS_VALUE = FLAG_MASK_IS | FLAG_MASK_NOT;
// 用于设置流特征
static final int IS_DISTINCT = DISTINCT.set;
static final int NOT_DISTINCT = DISTINCT.clear;
static final int IS_SORTED = SORTED.set;
static final int NOT_SORTED = SORTED.clear;
static final int IS_ORDERED = ORDERED.set;
static final int NOT_ORDERED = ORDERED.clear;
static final int IS_SIZED = SIZED.set;
static final int NOT_SIZED = SIZED.clear;
static final int IS_SHORT_CIRCUIT = SHORT_CIRCUIT.set;
/*
从分离器特征转化为流的分离器时期特征
分离器具有SORTED和对象比较器,那么分离器SORTED特征不会被传递到分离器时期特征
characteristics & 0b0101_0101 保证了特征为spl和streamopflag的交集
*/
static int fromCharacteristics(Spliterator<?> spliterator) {
int characteristics = spliterator.characteristics();
if ((characteristics & Spliterator.SORTED) != 0
&& spliterator.getComparator() != null) {
return characteristics & SPLITERATOR_CHARACTERISTICS_MASK
& ~Spliterator.SORTED;
}
else {
return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
}
}
static int fromCharacteristics(int characteristics) {
return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
}
// Stream特征转化回分离器特征
static int toCharacteristics(int streamFlags) {
return streamFlags & SPLITERATOR_CHARACTERISTICS_MASK;
}
/*
中间操作和终止操作使得流特征发生变化,combineOpFlags结合原特征与新产生的特征
e.g. 原0b0101_0101 新0b0000_0010
需要把DISTINCT从set变成clear
getMask(0b0000_0010)
如果flags == 0;
return 0b11_0000_0000_0000_0000_1111_11111, 全掩码
否则((FLAG_MASK_IS & flags) << 1) | ((FLAG_MASK_NOT & flags) >> 1)
将低位特征码中存在 set,clear,preserve码
全变成presreve码 0b0000_0011
| flags 目的是取高位特征SHORT_CIRCUIT,
~ 将新特征中为00(没有设置set,clear,preserve)
取反为11生成掩码 0b1111...1111_1100
prevCombOpFlags & StreamOpFlag.getMask(newStreamOrOpFlags)
原特征中取新特征没有设置的特征(00)
| newStreamOrOpFlags 生成新的特征
整体逻辑:生成的特征码是新特征码和原特征码中新特征中没有的设置的特征 的并集
*/
static int combineOpFlags(int newStreamOrOpFlags, int prevCombOpFlags) {
return (prevCombOpFlags & StreamOpFlag.getMask(newStreamOrOpFlags))
| newStreamOrOpFlags;
}
private static int getMask(int flags) {
return (flags == 0)
? FLAG_MASK
: ~(flags | ((FLAG_MASK_IS & flags) << 1)
| ((FLAG_MASK_NOT & flags) >> 1));
}
// 重后期特征码转换成stream时期特征码
static int toStreamFlags(int combOpFlags) {
// By flipping the nibbles 0x11 become 0x00 and 0x01 become 0x10
return ((~combOpFlags) >> 1) & FLAG_MASK_IS & combOpFlags;
}
}
Q3:ReferencePipeline.Head是什么?
A3:ReferencePipeline.Head是一个数据源Stream对象。
package java.util.stream;
// Reference.Head对象的构造方法链
Head(Spliterator<?> source, int sourceFlags, boolean parallel) {
// sourceFlags == 0b0101_0000,parallel == false
super(source, sourceFlags, parallel);
}
// super
ReferencePipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
// super
AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
/*
StreamOpFlag.STREAM_MASK表示数据源流特征掩码 == 0b0101_0101
StreamOpFlag.INITIAL_OPS_VALUE表示初始化操作流掩码 == 0b1111_1111
Stream.of方法生成的资源流的特征是sourceFlags == 0b0101_0000
this.sourceOrFlags = 0b0101_0000
this.combinedFlags = 0b0101_1111
combinedFlags对应了StreamOpFlag有效特征未知予以保留为preserve
本资源流的特征是:
DISTINCT.preserve
SORTED.preserve
SIZED.set
ORDERED.set
*/
// 上一个阶段的流对象,这是一个数据源流对象,没有上一个阶段
this.previousStage = null;
// 数据源分离迭代器
this.sourceSpliterator = source;
// 指向资源阶段的流对象
this.sourceStage = this;
// 资源流或操作流特征
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// 结合特征,下一个操作流特征与其结合生成操作流的结合特征
this.combinedFlags = (~(sourceOrOpFlags << 1))
& StreamOpFlag.INITIAL_OPS_VALUE;
// 操作深度,资源流没有操作
this.depth = 0;
// 是否并发处理流,false
this.parallel = parallel;
}
Stream的非并发执行流程
Q4:什么是StatelessOp,StatefulOp?
A4:都是中间操作流,StatelessOp为无状态中间操作流(操作元素时,元素之间没有影响,例如map),StatefulOp为有状态中间操作流(操作元素时,元素之间有影响,例如distinct)。
package java.util.stream;
abstract static class StatelessOp<E_IN, E_OUT> extends
ReferencePipeline<E_IN, E_OUT> {
/*
使用无状态中间方法的时候,就会直接返回一个StatelessOp流
e.g. Stream.of("xx","xx").filter(x -> x > 5).map(x -> x - 5)
upstream在filter方法中就是Stream.of产生的资源流
在map方法中就是filter生产的StatelessOp流
中间操作方法的基本逻辑就是流对流的包裹
enum StreamShape {
REFERENCE,
INT_VALUE,
LONG_VALUE,
DOUBLE_VALUE
}
Stream.of生产的资源都是Object[], 也就是REFERENCE引用类型
assert upstream.getOutputShape() == inputShape; // 没意义
opFlags为本此操作对流产生的特征
e.g. filter方法导致流的数据size不精确了,
opFlags可以是StreamOpFlag.NOT_SIZED
*/
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape, int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
// 是否是有状态的流,当然不是
@Override
final boolean opIsStateful() { return false; }
}
// super
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
super(upstream, opFlags);
}
// super
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
// linkedOrConsumed默认为false, 流被链接或被消费为true
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
// 链接两个流,可以想象多个中间操作会产生一条流链表
previousStage.nextStage = this;
this.previousStage = previousStage;
/* StreamOpFlag.OP_MASK == // 0b01_0000_0000_0000_0000_1011_1111
这个掩码可以看出SIZED的为10,表示操作特征永远不能是IS_SIZED
只能是NOT_SIZED,或者是无效00,也就是流操作默认不是IS_SIZED
this.combinedFlags由被链接的previousStage的结合特征及操作特征产生
资源流为head流,其sourceStage指向自己
this.sourceStage也是指向Head,就是说中间操作流的sourceStage都是指向Head流
那么sourceStage.sourceAnyStateful永远是在Head流中更新
这是一个对并发操作有影响的变量,暂且不提
*/
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags,
previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
abstract static class StatefulOp<E_IN, E_OUT> extends
ReferencePipeline<E_IN, E_OUT> {
/*
与StatelessOp基本一致
opIsStateful返回true导致sourceStage.sourceAnyStateful = true;
opEvaluateParallel方法为并发处理的方法
*/
StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape, int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful() { return true; }
@Override
abstract <P_IN> Node<E_OUT>
opEvaluateParallel(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator,
IntFunction<E_OUT[]> generator);
}
Q5:什么是Sink,什么是Sink.ChainedReference?
A5:Sink是Consumer的扩展接口,Sink.ChainedReference是Sink的具体实现类。
package java.util.stream;
interface Sink<T> extends Consumer<T> {
/*
begin必须在accept接收数据之前调用,完成某些状态的初始化
arg. size是本Sink具体接收到的数据个数,未知或无限为-1
end方法必须在accept处理完所有数据后调用,完成一些收尾工作
end之后Sink对象可以再次复用
Sink对象是用来封装本次操作的和下次操作之间的逻辑
*/
default void begin(long size) {}
default void end() {}
/*
取消被请求。
主要是一些短路操作使用,可以监控下个Sink对象是否短路
如果短路,那么本Sink在有些情况下也应该短路。
*/
default boolean cancellationRequested() { return false; }
}
abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
/*
对Rederence流的函数式接口对象进行链式的连接
downstream是下个操作的Sink对象
e.g. aStreamObj.filter(predicate).map(function).forEach(consumer);
forEach的Sink.this.accept(T t) {
consumer.accept(t);
}
map的Sink.this.accept(T t) {
downstream.accept(function.apply(t))
}
filter的Sink.this.accept(T t) {
if (predicate) {
downstream.accept(t)
}
}
调用,predicate -> function -> consumer
可以看出整个操作流程成为一条执行链
并且封装所有的Sink为一个Sink,逻辑:
filterSink(mapSink(forEachSink))
begin,end,cancellationRequested方法也被封装成一条调用链
downstream方法的调用必须有
否则,对一个Sink调用方法将无法传播到下一个Sink
*/
protected final Sink<? super E_OUT> downstream;
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) { downstream.begin(size); }
@Override
public void end() { downstream.end(); }
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
Q6:什么是TerminalOp, TerminalSink?
A6:TerminalOp是终止操作对象,与中间操作建立一个操作流维护操作对象不同,终止操作并不建立一个流。TerminalSink是终止操作的Sink包装。
package java.util.stream;
interface TerminalOp<E_IN, R> {
/*
由于流的惰性求值,终止操作必须对流进行
终止操作后面不能继续使用流方法,所以终止操作不必是一个流对象
由此,该接口诞生。
*/
default StreamShape inputShape() { return StreamShape.REFERENCE; }
/*
返回终止操作特征,不同的终止操作有不同的结果。
终止操作特征只有:IS_SHORT_CIRCUIT 和 NOT_ORDERED有效
*/
default int getOpFlags() { return 0; }
// 并发处理
default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
Spliterator<P_IN> spliterator) {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(),
"{0} triggering TerminalOp.evaluateParallel serial default");
return evaluateSequential(helper, spliterator);
}
// 非并发时处理
<P_IN> R evaluateSequential(PipelineHelper<E_IN> helper,
Spliterator<P_IN> spliterator);
}
// 支持了Supplier接口,有些终结方法是有返回值的
interface TerminalSink<T, R> extends Sink<T>, Supplier<R> { }
Q7:举例说明中间方法的具体实现
Q7:
// 一个无状态的中间方法,在并发和非并发时逻辑是一致的
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
/*
e.g. Stream.of("x", "xx").filter(i -> i.length() >= 2).map(略)
filter方法生成了一个无状态中间流,使得.map可以继续
this是filter的调用流, 即Stream.of产生的流
StreamOpFlag.NOT_SIZED是新的操作特征
重写了AbstractPipline的opWrapSink方法
该方法就是将filter的sink和map的sink链接
并且前面的sink是依赖后面的sink实例化的
惰性求值,会在终止操作的时候调用
*/
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
/* sink是下一个操作的sink对象,在ChainedReference是downstream变量
flags是上一次操作的结合特征
*/
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
/*
size可能是具体的值,但是经过fliter操作,size就不是精准的
所以传递给下一个方法的size为-1
-1表示容量是未知或无限的
*/
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
/*
predicate就是被保存操作流的opWrapSink方法内的匿名类中
downstream.accept(u)下一个sink的accept方法
逻辑上preidicate.test(u) == true, u才会继续传播
所以流操作是不对数据直接进行修改
*/
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
/* 一个有状态的中间方法,任何短路方法和元素的之间互有影响的方法都是有状态的
可能在accpet方法中收集元素,处理好收集好的元素,之后在向下一个sink传播,例如sorted
也可能使用逻辑,满足向下sink传播,不满足不传播,例如distinct
在并发时逻辑可能和非并发时不一致
*/
private static final class SizedRefSortingSink<T>
extends AbstractRefSortingSink<T> {
private T[] array;
private int offset;
// 这是一排序方法
SizedRefSortingSink(Sink<? super T> sink,
Comparator<? super T> comparator) {
super(sink, comparator);
}
// 当中件方法执行到这里的时候,所有的元素都被收集array中,并不向下执行了
@Override
public void accept(T t) {
array[offset++] = t;
}
@Override
@SuppressWarnings("unchecked")
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
array = (T[]) new Object[(int) size];
}
/*
在end方法时才进行排序,
调用downstream继续执行
非短路调用流程:
sink.begin(-1);
spliterator.forEachRemaining(sink);
执行完所有元素的sortedsink的accept方法,退出
sink.end(); // 调用排序方法,继续向下一个sink执行
*/
@Override
public void end() {
Arrays.sort(array, 0, offset, comparator);
downstream.begin(offset);
/*
cancellationRequestedCalled == false
说明sink链中是没有短路方法的,可以将所有的元素向后传播
== true的说面有短路情况,需要监视传播
*/
if (!cancellationRequestedCalled) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
else {
for (int i = 0; i < offset &&
!downstream.cancellationRequested(); i++)
downstream.accept(array[i]);
}
downstream.end();
array = null;
}
/*
sink链中有任何一个方法是短路方法时的执行逻辑
do { } while (!(cancelled = sink.cancellationRequested())
&& spliterator.tryAdvance(sink));
sink.cancellationRequestedCalled方法永远监控不到sorted后面的短路情况
因为sortedsink.cancellationRequestedCalled方法是不向下传播的
当sortedsink.cancellationRequestedCalled方法为被调用的时候,
说明sink链中是一定有短路的方法
sorted.cancellationRequested = true;说明要监控后面sink的短路情况
*/
@Override
public final boolean cancellationRequested() {
cancellationRequestedCalled = true;
return false;
}
}
Q8:举例说明终止方法的实现及执行流程
A8:
// 终止方法forEach
@Override
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
// 终止执行方式和终止sink的生成
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfRef<>(action, ordered);
}
/*
ForEachOp实现了TerminalOp是一个终止执行对象
实现了TerminalSink也是一个终止sink对象
*/
abstract static class ForEachOp<T>
implements TerminalOp<T, Void>, TerminalSink<T, Void> {
/* ordered是终止操作的特征的逻辑值,true表示0无效,false表示NOT_ORDERED
*/ ordered在并行处理时有不同的意义
private final boolean ordered;
protected ForEachOp(boolean ordered) { this.ordered = ordered; }
@Override
public int getOpFlags() { return ordered ? 0 : StreamOpFlag.NOT_ORDERED; }
// 非并行处理的执行方法
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
// TerminalSink也是Suppier,get是某些需要终止方法有返回值需要实现的
@Override
public Void get() { return null; }
static final class OfRef<T> extends ForEachOp<T> {
final Consumer<? super T> consumer;
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
// begin和end直接是默认的空方法,不需要向下传播,也不是短路操作
@Override
public void accept(T t) {
consumer.accept(t);
}
}
}
// 执行,执行,执行
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
/*
终止方法forEach的调用者是中间操作的最后一个流
也就是forEach的前一个方法产生的流对象
linkedOrConsumed必定是false
*/
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this,
sourceSpliterator(terminalOp.getOpFlags()))
/* 只讨论非并发的处理方式
terminalOp.evaluateSequential是终止执行对象的具体实现
*/
: terminalOp.evaluateSequential(this,
sourceSpliterator(terminalOp.getOpFlags()));
}
private Spliterator<?> sourceSpliterator(int terminalFlags) {
// 本方法是获取资源分离迭代器
Spliterator<?> spliterator = null;
if (sourceStage.sourceSpliterator != null) {
spliterator = sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
}
else if (sourceStage.sourceSupplier != null) {
spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}
/* 并行处理的情况下,如果有任何一个有状态方法,才会特殊处理
因为无状态方法,在并行时逻辑是没有变化的
*/
if (isParallel() && sourceStage.sourceAnyStateful) { // 略 }
if (terminalFlags != 0) {
// 将终结操作特征与流结合特征结合
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags,
combinedFlags);
}
return spliterator;
}
// 调用具体的终止方法的shxian
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
/* helper就是forEach方法的调用流
this是终止对象,
wrapAndCopyInto返回this
get()是针对有结果的对象,这里没意义
*/
return helper.wrapAndCopyInto(this, spliterator).get();
}
// wrap是将sink对象链接,copyInto是具体的执行
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink,
Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
// 将sink对象从后向前链接
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
/*
中间操作的depth都是大于0的
p = p.previousStage从后向前
*/
for ( @SuppressWarnings("rawtypes") AbstractPipeline
p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
/*
所有的中间操作流都重协了opWrapSink方法用于生成具体的Sink
p.previousStage.combinedFlags是前一流的结合特征
对本流的结合特征可能是有帮助的
*/
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
// 这个sink是第一个中间操作的sink
return (Sink<P_IN>) sink;
}
// 执行
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink,
Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
/*
短路操作是:只执行一部分元素就可以停止
e.g. limit(100),只需要前一百个元素,那么就不需要从spl中迭代所有的元素
*/
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
/*
短路特征会在结合特征里一直向下传播
没有短路操作,forEachRemaining所有的元素
wrappedSink是sink链
end之后,流操作就已经完成了
*/
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
// 有短路操作
copyIntoWithCancel(wrappedSink, spliterator);
}
}
@Override
@SuppressWarnings("unchecked")
final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink,
Spliterator<P_IN> spliterator) {
@SuppressWarnings({"rawtypes","unchecked"})
AbstractPipeline p = AbstractPipeline.this;
/* 获取到的p为资源流,没有任何意义
可能是非REFERENCE流中有其他的forEachWithCancel实现
*/
while (p.depth > 0) {
p = p.previousStage;
}
wrappedSink.begin(spliterator.getExactSizeIfKnown());
boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);
wrappedSink.end();
return cancelled;
}
@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator,
Sink<P_OUT> sink) {
boolean cancelled;
/*
tryAdvance是一个一个迭代元素的
必须满足sink.cancellationRequestedCalled() == false
当有一个短路操作满足时,迭代分离器就停止迭代
e.g. limit(100).limit(50);
limit(50)满足是就停止迭代
对于limit(100),逻辑上也是没有问题的
任何的短路accept方法都会实现逻辑上的停止
e.g. limit.accept
limit = 50
accept(T t) {
if (limit > 0) {
limit--;
downstream.accept(t)
}
}
实现了只向下传播50个数据
但是分离迭代器没有停止,产生了没有必要的迭代
cancellationRequested() {
return limit == 0 ? true : false;
}
所以,cancellationRequested是让迭代器停止的信号
*/
do { } while (!(cancelled = sink.cancellationRequested())
&& spliterator.tryAdvance(sink));
return cancelled;
}
Stream的并发执行流程
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this,
sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this,
sourceSpliterator(terminalOp.getOpFlags()));
}
// 在并发执行执行中重要是sourceSpliterator方法,不是evaluateParallel
private Spliterator<?> sourceSpliterator(int terminalFlags) {
Spliterator<?> spliterator = null;
// 获取sourceStage.spliterator略
// 条件是并发,并且有有状态中间方法才进行特殊处理
if (isParallel() && sourceStage.sourceAnyStateful) {
int depth = 1;
// 遍历所有的中间操作流,
for (@SuppressWarnings("rawtypes") AbstractPipeline
u = sourceStage, p = sourceStage.nextStage, e = this;
u != e;
u = p, p = p.nextStage) {
int thisOpFlags = p.sourceOrOpFlags;
/*
如果p是有状态的流进行特殊处理,先看depth
e.g. 有-有状态流
无-无状态流
资源流 -> 无1 -> 无2 -> 有1 -> 无4 -> 有2 -> 无5 -> 终止操作
depth: 0 1 2 0 1 0 1
wrapSink方法:
for ( @SuppressWarnings("rawtypes") AbstractPipeline
p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
从‘有1’开始向前链接sink,可得 无1 - 无2 - 有1 sink链
从‘有2’开始向前链接sink,可得 无4 - 有2 sink链
从终止操作开始向前链接sink, 可的 无5 - 终止操作 sink链
至此引出了一个分段执行的方法:
先执行‘有1’sink链,得出结果封装成Spliterator
交给‘有2’sink链执行,得出Spliterator
交给terminalOp.evaluateParallel执行,结束
为什么要分段执行?
因为有状态的中间方法大多数需要收集前面流过来的所有元素
才能继续执行,可能执行完毕才能传递给下一个sink,从此逻辑,分段执行。
*/
if (p.opIsStateful()) {
depth = 0;
/* 短路方法一定是有状态方法,并行处理时,需要去掉短路特征
1. 并行时每个线程都维护了资源的一部分。不可能因为一个短路信号就中断
2. 分段处理,短路信号较短,没有很大的影响
*/
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
thisOpFlags = thisOpFlags &
~StreamOpFlag.IS_SHORT_CIRCUIT;
}
/* opEvaluateParallelLazy是分段处理重要方法
就是像上面分段处理一样,执行p前面sink的流程
有状态方法是不调用非并行的sink逻辑的,也就是onWrapSink
p具体的并行逻辑,不同方法有不同的实现
StatefulOp必须实现的是opEvaluateParallelLazy,但是很少使用,
使用更多的是opEvaluateParallelLazy,
所以StatefulOp都实现了opEvaluateParallelLazy;
*/
spliterator = p.opEvaluateParallelLazy(u, spliterator);
// sized是很有用特征
thisOpFlags = spliterator.hasCharacteristics(
Spliterator.SIZED)
? (thisOpFlags & ~StreamOpFlag.NOT_SIZED)
| StreamOpFlag.IS_SIZED
: (thisOpFlags & ~StreamOpFlag.IS_SIZED)
| StreamOpFlag.NOT_SIZED;
}
p.depth = depth++;
// 去掉了短路特征,新的spl的Sized特征也可能发生了变化
p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags,
u.combinedFlags);
}
}
if (terminalFlags != 0) {
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags,
combinedFlags);
}
return spliterator;
}
// 举个例子distinct
static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?>
upstream) {
return new ReferencePipeline.StatefulOp<T, T>(upstream,
StreamShape.REFERENCE,
StreamOpFlag.IS_DISTINCT
| StreamOpFlag.NOT_SIZED) {
<P_IN> Node<T> reduce // 略
@Override
<P_IN> Node<T> opEvaluateParallel //略
@Override
<P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T>
helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags()))
{
/* helper是此中间方法流的前一个流
其特征是满足DISTINCT,那么去重就不用做了
但是distinct前面的无状态方法都没有执行阿!
wrapSpliterator就是将前面对sink链和spliterator封装到一起
可以想象,使用静态代理spliterator;
在迭代元素时,每个都经过sink链的处理,
那么,这个代理spl就是distinct方法sink链符合逻辑的spl
*/
return helper.wrapSpliterator(spliterator);
}
else if (StreamOpFlag.ORDERED.isKnown(
helper.getStreamAndOpFlags())) {
/*
如果helper有ORDERED特征,那么distinct方法没有
NOT_ORDERED属性就需要保持OREDERED
ORDERED是自然顺序
必须在此以保持自然顺序的方式得出此sink阶段的结果spl
像的下面代理方式处理,是否保持ORERED特征,
只能看后面真正并行执行时是不是保序方法了
所以有ORDERED特征的方法应该立即执行,不去依赖后面的情况
reduce使用了ReduceOp,此sink使用了hashset接收元素
并行时,使用CountedCompleter任务
compute主要逻辑是:将spl分成小的spl,
每个小任务持有一个小spl,
每个小任务持有自己的sink链,保证了sink中hashset是单线程安全的
最后将所有任务对hashset合并成一个hashset
hashset在转化成spl
*/
return reduce(helper, spliterator).spliterator();
}
else {
/*
没有ORERED特征
wrapSpliterator代理了spl和前面的无状态sink链
在使用一个DistinctSpliterator代理
DistinctSpliterator迭代时,先经过无状态的sink,
再经过去重逻辑(ConcurrentHashMap)就符合去重逻辑了
这种代理的方式只是封装了spl和sink链的逻辑,没有执行
再下一个StatefulOp处可能执行,也可能继续封装sink逻辑代理,
那么只有在终结方法处执行了
*/
return new StreamSpliterators.DistinctSpliterator<>(helper
.wrapSpliterator(spliterator));
}
}
@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) { //略 }
}
Q9:什么是SpinedBuffer?
A9:SpindedBuffer是二维数组结构的缓冲,优势是减小了扩容的次数和代价。
package java.util.stream
abstract class AbstractSpinedBuffer {
/*
SpinedBuffer维护了一个二维数组,可增、查,不可删、改
SpinedBuffer -> [[], [], [], null, null]
外部数组叫spine,内部数组叫chunk
spine最小容量:8
chunk最小容量:1 << 4, 最大容量:1 << 30
chunk是填满一个在建立下一个chunk
elementIndex是当前可写的chunk的可写位置的索引
spineIndex是当前可写chunk在spine中的索引位置
priorElementCount数组索引i位置上是spine索引i位置之前的所有chunk的容量和
总容量:priorElementCount[spineIndex] + spine[spineIndex].length
总数量:priorElementCount[spineIndex] + elementIndex
*/
public static final int MIN_CHUNK_POWER = 4;
public static final int MIN_CHUNK_SIZE = 1 << MIN_CHUNK_POWER;
public static final int MAX_CHUNK_POWER = 30;
public static final int MIN_SPINE_SIZE = 8;
protected final int initialChunkPower;
protected int elementIndex;
protected int spineIndex;
protected long[] priorElementCount;
protected AbstractSpinedBuffer() {
this.initialChunkPower = MIN_CHUNK_POWER; // 4
}
protected AbstractSpinedBuffer(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal Capacity: "
+ initialCapacity);
/*
将容量换成大致的power
initialCapacity - 1是为了防止
e.g. 0b0001_0000不减1 power == 5
实际上power应该为4
减1,0b0000_1111 power == 4
只对特定的数字有优化。所以最好的initialCapacity是2的n次方
*/
this.initialChunkPower = Math.max(MIN_CHUNK_POWER,
Integer.SIZE - Integer.numberOfLeadingZeros(initialCapacity - 1));
}
public boolean isEmpty() {
return (spineIndex == 0) && (elementIndex == 0);
}
public long count() {
return (spineIndex == 0)
? elementIndex
: priorElementCount[spineIndex] + elementIndex;
}
/*
chunk的大小是倍增的,n为spine索引
0或1,建立 1 << initialChunkPower
大于1,建立 1 << initialChunkPower + n - 1
e.g. spine[chunk[16], chunk[16], chunk[32], chunk[64], chunk[128]]
这种情况非常容易二分,任何一个位置chunk的容量都是其前面的chunk的容量和
*/
protected int chunkSize(int n) {
int power = (n == 0 || n == 1)
? initialChunkPower
: Math.min(initialChunkPower + n - 1,
AbstractSpinedBuffer.MAX_CHUNK_POWER);
return 1 << power;
}
public abstract void clear();
}
class SpinedBuffer<E> extends AbstractSpinedBuffer
implements Consumer<E>, Iterable<E> {
// 指向当前可写的chunk
protected E[] curChunk;
// 二维数组
protected E[][] spine;
// 生成迭代分离器的特征
private static final int SPLITERATOR_CHARACTERISTICS
= Spliterator.SIZED | Spliterator.ORDERED | Spliterator.SUBSIZED;
@SuppressWarnings("unchecked")
SpinedBuffer(int initialCapacity) {
super(initialCapacity);
// 会直接初始化一个chunk
curChunk = (E[]) new Object[1 << initialChunkPower];
}
@SuppressWarnings("unchecked")
SpinedBuffer() {
super();
curChunk = (E[]) new Object[1 << initialChunkPower];
}
// 容量
protected long capacity() {
return (spineIndex == 0)
? curChunk.length
: priorElementCount[spineIndex] + spine[spineIndex].length;
}
// 初始化spine,priorElementCount,curChunk放在spine[0]
@SuppressWarnings("unchecked")
private void inflateSpine() {
if (spine == null) {
spine = (E[][]) new Object[MIN_SPINE_SIZE][];
priorElementCount = new long[MIN_SPINE_SIZE];
spine[0] = curChunk;
}
}
// 根据目标容量扩容
@SuppressWarnings("unchecked")
protected final void ensureCapacity(long targetSize) {
long capacity = capacity();
if (targetSize > capacity) {
inflateSpine();
for (int i=spineIndex+1; targetSize > capacity; i++) {
if (i >= spine.length) {
// 这种情况spine完全填满,扩容spine
int newSpineSize = spine.length * 2;
spine = Arrays.copyOf(spine, newSpineSize);
priorElementCount = Arrays.copyOf(priorElementCount,
newSpineSize);
}
// chunk填满,建立下一个chunk
int nextChunkSize = chunkSize(i);
spine[i] = (E[]) new Object[nextChunkSize];
priorElementCount[i] = priorElementCount[i-1]
+ spine[i-1].length;
capacity += nextChunkSize;
}
}
}
protected void increaseCapacity() {
ensureCapacity(capacity() + 1);
}
// 查找,index是针对总size的
public E get(long index) {
if (spineIndex == 0) {
if (index < elementIndex)
return curChunk[((int) index)];
else
throw new IndexOutOfBoundsException(Long.toString(index));
}
if (index >= count())
throw new IndexOutOfBoundsException(Long.toString(index));
for (int j=0; j <= spineIndex; j++)
if (index < priorElementCount[j] + spine[j].length)
return spine[j][((int) (index - priorElementCount[j]))];
throw new IndexOutOfBoundsException(Long.toString(index));
}
// 从索引offset开始拷贝进入array
public void copyInto(E[] array, int offset) {
long finalOffset = offset + count();
if (finalOffset > array.length || finalOffset < offset) {
throw new IndexOutOfBoundsException("does not fit");
}
if (spineIndex == 0)
System.arraycopy(curChunk, 0, array, offset, elementIndex);
else {
for (int i=0; i < spineIndex; i++) {
System.arraycopy(spine[i], 0, array, offset, spine[i].length);
offset += spine[i].length;
}
if (elementIndex > 0)
System.arraycopy(curChunk, 0, array, offset, elementIndex);
}
}
// 转化为一维数组
public E[] asArray(IntFunction<E[]> arrayFactory) {
long size = count();
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
E[] result = arrayFactory.apply((int) size);
copyInto(result, 0);
return result;
}
// 清空,只保留spine[0]处的chunk为curChunk,并清空
@Override
public void clear() {
if (spine != null) {
curChunk = spine[0];
for (int i=0; i<curChunk.length; i++)
curChunk[i] = null;
spine = null;
priorElementCount = null;
}
else {
for (int i=0; i<elementIndex; i++)
curChunk[i] = null;
}
elementIndex = 0;
spineIndex = 0;
}
@Override
public Iterator<E> iterator() {
return Spliterators.iterator(spliterator());
}
@Override
public void forEach(Consumer<? super E> consumer) {
for (int j = 0; j < spineIndex; j++)
for (E t : spine[j])
consumer.accept(t);
for (int i=0; i<elementIndex; i++)
consumer.accept(curChunk[i]);
}
// 添加元素
@Override
public void accept(E e) {
// chunk填满需要扩容
if (elementIndex == curChunk.length) {
// 可能只有curChunk,没有spine,需要初始化
inflateSpine();
// 没有意义的if
if (spineIndex+1 >= spine.length || spine[spineIndex+1] == null)
increaseCapacity();
elementIndex = 0;
++spineIndex;
curChunk = spine[spineIndex];
}
curChunk[elementIndex++] = e;
}
@Override
public String toString() {
List<E> list = new ArrayList<>();
forEach(list::add);
return "SpinedBuffer:" + list.toString();
}
// 生成分离迭代器
public Spliterator<E> spliterator() {
class Splitr implements Spliterator<E> {
// 维护的第一个chunk在spine中的Index
int splSpineIndex;
// 维护的最后一个chunk在spine中的Index
final int lastSpineIndex;
/*
分离器中的chunk在被进行消费时,可能需要经过多个chunk
splChunk指向当前被消费的chunk
splElementIndex被消费的元素在chunk中的Index
*/
E[] splChunk;
int splElementIndex;
// 维护的最后一个chunk的容量
final int lastSpineElementFence;
Splitr(int firstSpineIndex, int lastSpineIndex,
int firstSpineElementIndex, int lastSpineElementFence) {
this.splSpineIndex = firstSpineIndex;
this.lastSpineIndex = lastSpineIndex;
this.splElementIndex = firstSpineElementIndex;
this.lastSpineElementFence = lastSpineElementFence;
assert spine != null || firstSpineIndex == 0
&& lastSpineIndex == 0;
splChunk = (spine == null) ? curChunk :spine[firstSpineIndex];
}
// 分离器剩余未被消费的元素的数量
@Override
public long estimateSize() {
return (splSpineIndex == lastSpineIndex)
? (long) lastSpineElementFence - splElementIndex
: priorElementCount[lastSpineIndex]
+ lastSpineElementFence
- priorElementCount[splSpineIndex]
- splElementIndex;
}
@Override
public int characteristics() {
return SPLITERATOR_CHARACTERISTICS;
}
@Override
public boolean tryAdvance(Consumer<? super E> consumer) {
// 略
}
@Override
public void forEachRemaining(Consumer<? super E> consumer) {
// 略
}
// 这里就体现出了chunkSize()生成的chunk使spine容易二分
@Override
public Spliterator<E> trySplit() {
if (splSpineIndex < lastSpineIndex) {
Spliterator<E> ret = new Splitr(splSpineIndex,
lastSpineIndex - 1,
splElementIndex,
spine[lastSpineIndex-1].length);
splSpineIndex = lastSpineIndex;
splElementIndex = 0;
splChunk = spine[splSpineIndex];
return ret;
}
else if (splSpineIndex == lastSpineIndex) {
int t = (lastSpineElementFence - splElementIndex) / 2;
if (t == 0)
return null;
else {
// 只有一个chunk的时候生成ArraySpliterator
Spliterator<E> ret = Arrays.spliterator(splChunk,
splElementIndex, splElementIndex + t);
splElementIndex += t;
return ret;
}
}
else {
return null;
}
}
}
return new Splitr(0, spineIndex, 0, elementIndex);
}
}
Q10:什么是WrapSpliterator?
A10:有状态方法并发时,lazy执行时,使用WrapSpliterator代理spl并封装有状态方法之前的sink链,而有状态方法的sink需要再使用其他的spl代理WrapSpliterator进行封装。no-lazy执行,使用Nodes类代理,只代理处理完的spl,在调用spliterator方法生成spl。
// 有状态方法的helper(有状态方法的前一个流)调用
final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN>
sourceSpliterator) {
/* 如果helper也是一个有状态的流,depth等于0,
直接返回helper封装好的sourceSpliterator
*/
if (depth == 0) {
return (Spliterator<E_OUT>) sourceSpliterator;
}
else {
// helper是无状态的流,需要封装无状态的sink链
return wrap(this, () -> sourceSpliterator, isParallel());
}
}
@Override
final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
Supplier<Spliterator<P_IN>> supplier,
boolean isParallel) {
// 直接生成了一个代理对象
return new StreamSpliterators.WrappingSpliterator<>(ph, supplier,
isParallel);
}
private abstract static class AbstractWrappingSpliterator<P_IN, P_OUT,
T_BUFFER extends AbstractSpinedBuffer>
implements Spliterator<P_OUT> {
/* 只能由StatefulOp的helper创建的spl代理,
主要是将无状态的sink链封装进入迭代的逻辑中
由于size的不确定性,使用buffer缓冲容器
但是这里buffer最多只能有一个元素,真的不懂,有什么用
bufferSink是将buffer的accept方法链接到无状态链最后
accept方法将经由无状态链的元素放入buffer中
pusher是无状态sink链的调用者,完成accept动作
finished在accept所有对元素之后为true
nextToConsume下一被消费的元素在buffer中的索引
*/
final boolean isParallel;
final PipelineHelper<P_OUT> ph;
private Supplier<Spliterator<P_IN>> spliteratorSupplier;
Spliterator<P_IN> spliterator;
Sink<P_IN> bufferSink;
BooleanSupplier pusher;
long nextToConsume;
T_BUFFER buffer;
boolean finished;
AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
Supplier<Spliterator<P_IN>> spliteratorSupplier,
boolean parallel) {
this.ph = ph;
this.spliteratorSupplier = spliteratorSupplier;
this.spliterator = null;
this.isParallel = parallel;
}
AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
Spliterator<P_IN> spliterator,
boolean parallel) {
this.ph = ph;
this.spliteratorSupplier = null;
this.spliterator = spliterator;
this.isParallel = parallel;
}
final void init() {
if (spliterator == null) {
spliterator = spliteratorSupplier.get();
spliteratorSupplier = null;
}
}
// 使用在tryAdvance中使用,时候buffer中有下一个元素
final boolean doAdvance() {
if (buffer == null) {
if (finished)
return false;
init();
initPartialTraversalState();
nextToConsume = 0;
bufferSink.begin(spliterator.getExactSizeIfKnown());
// buffer填添加一个元素,之后会被tryAdvance消耗
return fillBuffer();
}
else {
// buffer不为null
++nextToConsume; // 1
boolean hasNext = nextToConsume < buffer.count(); // 肯定是false
if (!hasNext) {
nextToConsume = 0;
buffer.clear(); // 清空
hasNext = fillBuffer(); // 添加一个
}
return hasNext;
}
}
abstract AbstractWrappingSpliterator<P_IN, P_OUT, ?>
wrap(Spliterator<P_IN> s);
abstract void initPartialTraversalState();
@Override
public Spliterator<P_OUT> trySplit() {
if (isParallel && buffer == null && !finished) {
init();
Spliterator<P_IN> split = spliterator.trySplit();
return (split == null) ? null : wrap(split);
}
else
return null;
}
private boolean fillBuffer() {
while (buffer.count() == 0) { // 这条件太苛刻
/* bufferSink都是无状态sink,cancellationRequested肯定是false
pusher.getAsBoolean()向buffer中添加一个元素
如果是fliter之类的截停sink,while循环
*/
if (bufferSink.cancellationRequested() ||
!pusher.getAsBoolean()) {
if (finished)
return false;
else {
/* 无状态sink中end也没啥操作阿
如果sink链中有sorted,end填充buffer,这buffer还有点用
*/
bufferSink.end(); // might trigger more elements
finished = true;
}
}
}
return true;
}
@Override
public final long estimateSize() // 略
@Override
public final long getExactSizeIfKnown() // 略
@Override
public final int characteristics() // 略
@Override
public Comparator<? super P_OUT> getComparator() // 略
@Override
public final String toString() // 略
}
static final class WrappingSpliterator<P_IN, P_OUT> extends
AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> {
WrappingSpliterator(PipelineHelper<P_OUT> ph,
Supplier<Spliterator<P_IN>> supplier,
boolean parallel) {
super(ph, supplier, parallel);
}
WrappingSpliterator(PipelineHelper<P_OUT> ph,
Spliterator<P_IN> spliterator,
boolean parallel) {
super(ph, spliterator, parallel);
}
@Override
WrappingSpliterator<P_IN, P_OUT> wrap(Spliterator<P_IN> s) {
return new WrappingSpliterator<>(ph, s, isParallel);
}
@Override
void initPartialTraversalState() {
SpinedBuffer<P_OUT> b = new SpinedBuffer<>();
buffer = b;
bufferSink = ph.wrapSink(b::accept); // 链接sink
pusher = () -> spliterator.tryAdvance(bufferSink); // 填充buffer的方法
}
@Override
public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
Objects.requireNonNull(consumer);
boolean hasNext = doAdvance();
if (hasNext)
consumer.accept(buffer.get(nextToConsume));
return hasNext;
}
@Override
public void forEachRemaining(Consumer<? super P_OUT> consumer) {
// 没有调用过tryAdvance,buffer == null,finished == false
if (buffer == null && !finished) {
Objects.requireNonNull(consumer);
init();
// 不经过buffer了,直接链接sink链执行
ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator);
finished = true;
}
else {
// 有buffer
do { } while (tryAdvance(consumer));
}
}
}
Q11:并行任务的逻辑是什么?
A11:主要使用CountedCompleter任务。先将spl分割成多个spl,每个task持有一个spl,之后将spl使用sink消费迭代。有状态方法,可能会对sink消费产生对元素进行另外的操作。
总结
并行处理ORDERED特征的spl时,每一段都需要并行处理之后传递给下一段再并行处理。处理NOT_ORDERED特征的spl,可以惰性求值,效率更高。使用unordered方法可以将特征变成NOT_ORDERED(结果不保持自然顺序),应该使用在StatefulOp之前,sorted方法是特征变成IS_ORDERED的唯一方法,配合unordered使用需要注意逻辑。
虽然并行处理将所用StatefulOp的短路特征去掉,但是StatefulOp并行处理的sink逻辑都是特别实现的,其内部是完成逻辑上的短路,并且是分段执行,前一段有短路逻辑,下一段的数据必然是比前一段没有短路逻辑的要少。效率情况很模糊,和线程的切换、数据的总量、短路数据的量、OREDED特征的保持有关。并向效率和非并行效率的对比需要实测。
只有可以惰性求值的方法才需要重写opEvaluateParallelLazy,无法惰性求值的方法(e.g. sorted)是调用默认的opEvalutateParallelLazy调用opEvalutateParallel这个非惰性方法直接求值。