stream描述
用于支持元素流上的功能样式操作的类,例如集合上的map-reduce转换。例如:
int sum = widgets.stream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
这里我们使用widgets
a Collection<Widget>
作为流的源,然后对流执行filter-map-reduce以获得红色小部件的权重之和。(求和是减少 操作的一个例子。)
此包中引入的关键抽象是流。类Stream
,IntStream
, LongStream
,和DoubleStream
超过目的和原始流int
,long
和 double
类型。Streams在几个方面与集合不同:
- 没有存储空间 流不是存储元素的数据结构; 相反,它通过计算操作管道传递来自诸如数据结构,数组,生成器函数或I / O通道的源的元素。
- 功能性。对流的操作会产生结果,但不会修改其源。例如,过滤
Stream
从集合中获取的内容会生成一个Stream
没有过滤元素的新元素,而不是从源集合中删除元素。 - 懒惰寻求。许多流操作(例如过滤,映射或重复删除)可以懒惰地实现,从而暴露出优化的机会。例如,“找到
String
具有三个连续元音的第一个”不需要检查所有输入字符串。流操作分为中间(生成Stream
)操作和终端(产生价值或副作用)操作。中间操作总是很懒惰。 - 可能是无限的。虽然集合的大小有限,但流不需要。短路操作,例如
limit(n)
或findFirst()
可以允许无限流上的计算在有限时间内完成。 - 耗材。流的元素仅在流的生命期间访问过一次。像
Iterator
a 一样,必须生成一个新流来重新访问源的相同元素。
流可以通过多种方式获得。一些例子包括:
- 来自
Collection
通过stream()
和parallelStream()
方法; - 从阵列通过
Arrays.stream(Object[])
; - 来自流类的静态工厂方法,例如
Stream.of(Object[])
,IntStream.range(int, int)
或Stream.iterate(Object, UnaryOperator)
; - 文件的行可以从
BufferedReader.lines()
; - 文件路径流可以从方法中获得
Files
; - 随机数的流可以从
Random.ints()
; - 在JDK许多其他流的轴承的方法,包括
BitSet.stream()
,Pattern.splitAsStream(java.lang.CharSequence)
,和JarFile.stream()
。
使用这些技术,第三方库可以提供其他流源 。
流操作和管道
流操作分为中间操作和 终端操作,并组合成流管道。流管道由源(例如 Collection
,数组,生成器函数或I / O通道)组成; 然后是零或更多的中间操作,如 Stream.filter
或Stream.map
; 和终端操作,如Stream.forEach
或Stream.reduce
。
中间操作返回一个新流。他们总是 懒惰 ; 执行中间操作,例如 filter()
实际上不执行任何过滤,而是创建一个新流,当遍历时,它包含与给定谓词匹配的初始流的元素。在执行管道的终端操作之前,不会开始遍历管道源。
终端操作(例如Stream.forEach
或 IntStream.sum
)可以遍历流以产生结果或副作用。在执行终端操作之后,流管道被认为已消耗,并且不能再使用; 如果需要再次遍历同一数据源,则必须返回数据源以获取新流。在几乎所有情况下,终端操作都很渴望,在返回之前完成数据源的遍历和管道的处理。只有终端操作iterator()
而 spliterator()
不是; 这些是作为“逃生舱口”提供的,以便在现有操作不足以执行任务时启用任意客户端控制的管道遍历。
懒惰地处理流可以显着提高效率; 在诸如上面的filter-map-sum示例的流水线中,过滤,映射和求和可以融合到数据的单个传递中,具有最小的中间状态。懒惰还允许在没有必要时避免检查所有数据; 对于诸如“查找超过1000个字符的第一个字符串”之类的操作,只需要检查足够的字符串以找到具有所需特征的字符串,而不检查源中可用的所有字符串。(当输入流是无限的而不仅仅是大的时候,这种行为变得更加重要。)
中间操作进一步分为无状态操作 和有状态操作。无状态操作(例如filter
和)map
在处理新元素时不保留先前看到的元素的状态 - 每个元素都可以独立于其他元素的操作进行处理。状态操作(例如 distinct
和sorted
)可以在处理新元素时包含先前看到的元素的状态。
有状态操作可能需要在生成结果之前处理整个输入。例如,在查看流的所有元素之前,不能通过对流进行排序来产生任何结果。因此,在并行计算下,一些包含有状态中间操作的管道可能需要对数据进行多次传递,或者可能需要缓冲重要数据。仅包含无状态中间操作的管道可以在一次通过中处理,无论是顺序还是并行,具有最小的数据缓冲。
此外,一些操作被认为是短路操作。如果在呈现无限输入时,它可能产生有限流,则中间操作是短路的。如果在呈现无限输入时它可以在有限时间内终止,则终端操作是短路的。在流水线中进行短路操作是处理无限流以在有限时间内正常终止的必要但不充分的条件。
并行
具有显式for-
循环的处理元素本质上是串行的。Streams通过将计算重新定义为聚合操作的流水线而不是作为每个单独元素的命令操作来促进并行执行。所有流操作都可以串行或并行执行。除非明确请求并行性,否则JDK中的流实现会创建串行流。例如,Collection
有方法Collection.stream()
和Collection.parallelStream()
分别生成顺序和并行流; 其他流方法,例如IntStream.range(int, int)
产生顺序流,但这些流可以通过调用它们的BaseStream.parallel()
方法有效地并行化。要并行执行先前的“窗口小部件权重总和”查询,我们会这样做:
int sumOfWeights = widgets.
此示例的串行和并行版本之间的唯一区别是创建初始流,使用“ parallelStream()
”而不是“ stream()
”。当启动终端操作时,根据调用它的流的方向,顺序地或并行地执行流管道。可以使用该isParallel()
方法确定流是以串行还是并行方式执行,并且可以使用BaseStream.sequential()
和BaseStream.parallel()
操作来修改流的方向 。当启动终端操作时,根据调用它的流的模式,顺序地或并行地执行流管道。
除了被识别为明确不确定的操作之外,例如findAny()
,流是顺序执行还是并行执行不应该改变计算结果。
大多数流操作接受描述用户指定行为的参数,这些参数通常是lambda表达式。为了保持正确的行为,这些行为参数必须是非干扰的,并且在大多数情况下必须是无状态的。这些参数总是函数接口的实例,例如Function
,并且通常是lambda表达式或方法引用。
不干涉
Streams使您能够在各种数据源上执行可能并行的聚合操作,包括甚至非线程安全的集合,例如 ArrayList
。只有在执行流管道期间我们能够防止干扰数据源时,才有可能实现这一点 。除了转义舱口的操作iterator()
和 spliterator()
,被调用的终端操作时开始执行,并且终端操作完成时结束。对于大多数数据源,防止干扰意味着确保数据源根本 不被修改在流管道的执行期间。值得注意的例外是其源是并发集合的流,这些集合专门用于处理并发修改。并发流源是那些Spliterator
报告 CONCURRENT
特征的源。
因此,源流可能不是并发的流管道中的行为参数永远不应该修改流的数据源。如果行为参数修改或导致修改流的数据源,则该行为参数会干扰非并发数据源。不干涉的需要适用于所有管道,而不仅仅是并行管道。除非流源是并发的,否则在执行流管道期间修改流的数据源可能会导致异常,错误答案或不一致的行为。对于性能良好的流源,可以在终端操作开始之前修改源,并且这些修改将反映在所覆盖的元素中。例如,请考虑以下代码:
List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
String s = sl.collect(joining(" "));
首先创建一个包含两个字符串的列表:“one”; 和“两个”。然后从该列表创建流。接下来,通过添加第三个字符串来修改列表:“three”。最后,收集流的元素并将它们连接在一起。由于在终端collect
操作开始之前修改了列表,因此结果将是一串“一二三”。从JDK集合和大多数其他JDK类返回的所有流都以这种方式表现良好; 对于其他库生成的流,请参阅 低级流构造,以了解构建行为良好的流的要求。
无国籍行为
如果流操作的行为参数是有状态的,则流管道结果可能是不确定的或不正确的。有状态lambda(或实现适当功能接口的其他对象)的结果取决于在流管道执行期间可能发生变化的任何状态。有状态lambda的一个示例是map()
in中的参数:
Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
这里,如果映射操作是并行执行的,则由于线程调度差异,相同输入的结果可能因运行而不同,而对于无状态lambda表达式,结果将始终相同。
另请注意,尝试从行为参数访问可变状态会给您在安全性和性能方面做出错误的选择; 如果您不同步对该状态的访问,则会出现数据争用,因此您的代码已损坏,但如果您同步访问该状态,则存在争用的风险会破坏您希望从中受益的并行性。最好的方法是避免有状态的行为参数完全流动操作; 通常有一种方法可以重构流管道以避免有状态。
副作用
通常,不鼓励行为参数对流操作的副作用,因为它们通常会导致无意中违反无国籍要求以及其他线程安全危险。
如果行为参数确实有副作用,除非明确说明,否则不能保证 这些副作用对其他线程的 可见性,也不保证对同一流管道中“相同”元素的不同操作在同一个线程中执行。此外,这些效果的排序可能令人惊讶。即使管道被约束以产生与流源的遭遇顺序一致的 结果(例如,IntStream.range(0,5).parallel().map(x -> x*2).toArray()
必须产生[0, 2, 4, 6, 8]
),也不保证将映射器函数应用于各个元素的顺序,或者什么线程为给定元素执行任何行为参数。
许多可能试图使用副作用的计算可以更安全和有效地表达而没有副作用,例如使用 减少而不是可变累加器。但是,诸如println()
用于调试目的的副作用通常是无害的。少量的流操作,例如 forEach()
和peek()
,只能通过副作用操作; 这些应该小心使用。
作为如何将不适当地使用副作用的流管道转换为不使用副作用的流管道的示例,以下代码在字符串流中搜索与给定正则表达式匹配的那些,并将匹配放在列表中。
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!
此代码不必要地使用副作用。如果并行执行,非线程安全性ArrayList
将导致不正确的结果,并且添加所需的同步将导致争用,从而破坏并行性的好处。此外,在这里使用副作用是完全没有必要的; 在forEach()
可以简单地用一个缩小操作是更安全,更有效,并且更适合于并行替换:
List<String>results =
stream.filter(s -> pattern.matcher(s).matches())
.collect(Collectors.toList()); // No side-effects!
订购
流可能有也可能没有已定义的遭遇顺序。流是否具有遭遇顺序取决于源和中间操作。某些流源(例如List
或数组)本质上是有序的,而其他(例如HashSet
)则不是。一些中间操作(例如sorted()
,可以)在其他无序流上施加遭遇顺序,而其他中间操作可以呈现无序的有序流,例如BaseStream.unordered()
。此外,一些终端操作可以忽略遭遇顺序,例如 forEach()
。
如果订购了流,则大多数操作都被约束为对其遭遇顺序中的元素进行操作; 如果流的源是List
包含的[1, 2, 3]
,那么执行的结果map(x -> x*2)
必须是[2, 4, 6]
。但是,如果源没有定义的遭遇顺序,则值的任何排列[2, 4, 6]
都将是有效结果。
对于顺序流,遭遇顺序的存在与否不会影响性能,只影响确定性。如果订购了流,则在相同的源上重复执行相同的流管道将产生相同的结果; 如果没有订购,重复执行可能会产生不同的结果。
对于并行流,放宽排序约束有时可以实现更高效的执行。如果元素的排序不相关,则可以更有效地实现某些聚合操作,例如过滤重复项(distinct()
)或分组缩减(Collectors.groupingBy()
)。类似地,与遇到订单本质上相关的操作(例如limit()
)可能需要缓冲以确保正确排序,从而破坏并行性的好处。如果流具有遭遇顺序,但用户并不特别关心该遭遇顺序,则使用明确地对流进行排序unordered()
可以改善某些有状态或终端操作的并行性能。然而,大多数流管道,例如上面的“块的权重总和”示例,即使在排序约束下仍然有效地并行化。
减少操作
甲减少操作(也被称为倍)取输入元素的序列,并通过组合操作的反复应用,例如找到一组数字,或累积元素的总和或最大到一个列表它们组合成一个单一的汇总结果。该流的类具有普遍减少操作,所谓的多种形式 reduce()
和collect()
,以及多个专业化还原的形式,如 sum()
,max()
或count()
。
当然,这样的操作可以很容易地实现为简单的顺序循环,如:
int sum = 0;
for (int x : numbers) {
sum += x;
}
然而,有充分理由优先考虑减少操作而不是如上所述的变异累积。简化不仅“更抽象” - 它作为一个整体而不是单个元素在整个流上运行 - 但正确构造的reduce操作本质上是可并行化的,只要用于处理元素的函数是关联的和 无国籍的。例如,给定我们想要找到总和的数字流,我们可以写:
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
要么:
int sum = numbers.stream().reduce(0, Integer::sum);
这些减少操作可以安全地并行运行,几乎不需要修改:
int sum = numbers.parallelStream().reduce(0, Integer::sum);
减少并行很好,因为实现可以并行地对数据的子集进行操作,然后组合中间结果以获得最终的正确答案。(即使语言具有“并行for-each”结构,变异累积方法仍然需要开发人员为共享累积变量提供线程安全更新sum
,然后所需的同步可能会消除并行性带来的任何性能提升。 )reduce()
相反,使用删除了并行化还原操作的所有负担,并且库可以提供有效的并行实现,而无需额外的同步。
前面显示的“小部件”示例显示了简化如何与其他操作结合使用批量操作替换循环。如果widgets
是Widget
具有getWeight
方法的对象集合,我们可以找到最重的小部件:
OptionalInt heaviest = widgets.parallelStream()
.mapToInt(Widget::getWeight)
.max();
在更一般的形式中,reduce
对类型元素的操作<T>
产生类型 的结果<U>
需要三个参数:
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);
这里,identity元素既是缩减的初始种子值,也是没有输入元素的默认结果。所述累加器 函数接受部分结果和下一个元素,并产生一个新的部分结果。该组合功能结合了两个部分结果产生一个新的部分结果。(组合器在并行缩减中是必需的,其中输入被分区,为每个分区计算部分累积,然后组合部分结果以产生最终结果。)
更正式地说,该identity
值必须是组合器函数的标识。这意味着,对所有人来说u
, combiner.apply(identity, u)
等于u
。此外,该 combiner
功能必须是关联的,必须与兼容accumulator
功能:对所有u
和t
,combiner.apply(u, accumulator.apply(identity, t))
一定要equals()
到accumulator.apply(u, t)
。
三参数形式是两参数形式的概括,将映射步骤结合到累积步骤中。我们可以使用更一般的形式重新构建简单的权重总和示例,如下所示:
int sumOfWeights = widgets.stream()
.reduce(0,
(sum, b) -> sum + b.getWeight())
Integer::sum);
虽然显式的map-reduce形式更具可读性,因此通常应该是首选。通过将映射和缩减组合成单个函数,可以优化远离重要工作的情况提供通用形式。
可变减少
甲可变归约运算累积输入元件到一个可变的结果的容器,如一个Collection
或StringBuilder
,作为其处理流中的元素。
如果我们想要获取字符串流并将它们连接成一个长字符串,我们可以通过普通减少来实现:
String concatenated = strings.reduce("", String::concat)
我们会得到理想的结果,甚至可以并行工作。但是,我们可能对性能不满意!这样的实现将进行大量的字符串复制,并且运行时间将是字符数的O(n ^ 2)。一种更高效的方法是将结果累积到a中StringBuilder
,这是一个用于累积字符串的可变容器。我们可以使用相同的技术来并行化可变缩减,就像我们使用普通缩减一样。
调用可变缩减操作 collect()
,因为它将所需结果收集到诸如a的结果容器中Collection
。甲collect
操作要求三个功能:一个供应商功能构建结果容器,蓄能器功能并入一个输入元件到结果容器和组合功能的新实例的一个结果容器的内容物合并到另一个。这种形式与普通减少的一般形式非常相似:
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);
与此同样reduce()
,collect
以这种抽象方式表达的好处是它直接适用于并行化:我们可以并行累积部分结果然后将它们组合,只要累积和组合函数满足适当的要求即可。例如,要将流中元素的String表示形式收集到一个中ArrayList
,我们可以为每个表单编写明显的顺序:
ArrayList<String> strings = new ArrayList<>();
for (T element : stream) {
strings.add(element.toString());
}
或者我们可以使用可并行化的收集表单:
ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
(c, e) -> c.add(e.toString()),
(c1, c2) -> c1.addAll(c2));
或者,将映射操作从累加器函数中拉出来,我们可以更简洁地表达它:
List<String> strings = stream.map(Object::toString)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
在这里,我们的供应商只是ArrayList constructor
,累加器将字符串化元素添加到一个 ArrayList
,而组合器只是用于addAll
将字符串从一个容器复制到另一个容器。
collect
供应商,累加器和组合器三个方面紧密耦合。我们可以使用a的抽象 Collector
来捕获所有这三个方面。上面用于将字符串收集到a中的示例List
可以使用标准重写Collector
:
List<String> strings = stream.map(Object::toString)
.collect(Collectors.toList());
将可变减少包装到收集器中具有另一个优点:可组合性。该类Collectors
包含许多用于收集器的预定义工厂,包括将一个收集器转换为另一个收集器的组合器。例如,假设我们有一个收集器来计算员工流的工资总和,如下所示:
Collector<Employee, ?, Integer> summingSalaries
= Collectors.summingInt(Employee::getSalary);
(在?
第二个类型参数仅仅表明我们不关心这个收集器所使用的中间表示)。如果我们想创造一个收藏家制表按部门工资的总和,我们可以重复summingSalaries
使用 groupingBy
:
Map<Department, Integer> salariesByDept
= employees.stream().collect(Collectors.groupingBy(Employee::getDepartment,
summingSalaries));
与常规还原操作一样,collect()
只有满足适当的条件才能并行化操作。对于任何部分累积的结果,将其与空结果容器组合必须产生等效结果。也就是说,对于作为p
任何一系列累加器和组合器调用的结果的部分累积结果 ,p
必须等效于 combiner.apply(p, supplier.get())
。
此外,然而,计算是分开的,它必须产生等效的结果。对于任何输入元件t1
和t2
,结果 r1
和r2
在计算下面必须是等价的:
A a1 = supplier.get();
accumulator.accept(a1, t1);
accumulator.accept(a1, t2);
R r1 = finisher.apply(a1); // result without splitting
A a2 = supplier.get();
accumulator.accept(a2, t1);
A a3 = supplier.get();
accumulator.accept(a3, t2);
R r2 = finisher.apply(combiner.apply(a2, a3)); // result with splitting
在这里,等价通常意味着根据Object.equals(Object)
。但在某些情况下,可以放宽等同性以解释顺序的差异。
减少,并发和排序
通过一些复杂的简化操作,例如collect()
产生一个Map
,例如:
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.collect(Collectors.groupingBy(Transaction::getBuyer));
实际上并行执行操作可能会适得其反。这是因为Map
对于某些Map
实现,组合步骤(通过密钥将一个合并到另一个)可能是昂贵的。
但是,假设在此缩减中使用的结果容器是可同时修改的集合 - 例如a ConcurrentHashMap
。在这种情况下,累加器的并行调用实际上可以将它们的结果同时存入同一个共享结果容器中,从而消除了组合器合并不同结果容器的需要。这可能会提升并行执行性能。我们称之为 同时减少。
Collector
支持并发缩减的A 标记为Collector.Characteristics.CONCURRENT
特征。然而,并发收集也有缺点。如果多个线程同时将结果存入共享容器,则存储结果的顺序是不确定的。因此,只有在对正在处理的流不重要的情况下,才能实现并发减少。如果Stream.collect(Collector)
实现,实现将仅执行并发减少
- 流是平行的;
- 收集器具有
Collector.Characteristics.CONCURRENT
特征,并且; - 流是无序的,或者收集器具有
Collector.Characteristics.UNORDERED
特征。
您可以使用该BaseStream.unordered()
方法确保流无序 。例如:
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.unordered()
.collect(groupingByConcurrent(Transaction::getBuyer));
(Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>)
并发等效于何处groupingBy
)。
请注意,如果给定键的元素按照它们在源中出现的顺序出现很重要,那么我们就不能使用并发缩减,因为排序是并发插入的牺牲品之一。然后,我们将被限制为实现顺序缩减或基于合并的并行缩减。
关联性
如果满足以下条件,则 运算符或函数op
是关联的:
(a op b) op c == a op (b op c)
如果我们将其扩展为四个术语,可以看出这对并行评估的重要性:
a op b op c op d == (a op b) op (c op d)
所以我们可以(a op b)
并行评估(c op d)
,然后调用op
结果。
关联操作的示例包括数字加法,最小值和最大值以及字符串连接。
低级流建设
到目前为止,所有流示例都使用了类似 Collection.stream()
或Arrays.stream(Object[])
获取流的方法。这些流式方法是如何实现的?
该类StreamSupport
有许多用于创建流的低级方法,所有方法都使用某种形式的 Spliterator
。分裂器是一个并行的模拟器 Iterator
; 它描述了一个(可能是无限的)元素集合,支持顺序前进,批量遍历,并将输入的某些部分分成另一个可以并行处理的分裂器。在最低级别,所有流都由分裂器驱动。
在实现spliterator时有许多实现选择,几乎所有这些都是使用该spliterator在实现的简单性和流的运行时性能之间进行权衡。创建spliterator的最简单但性能最差的方法是使用迭代器创建一个 Spliterators.spliteratorUnknownSize(java.util.Iterator, int)
。虽然这样的分裂器可以工作,但它可能会提供较差的并行性能,因为我们丢失了大小调整信息(基础数据集有多大),以及被限制为简单的分裂算法。
更高质量的分裂器将提供平衡且已知大小的分割,准确的大小调整信息,以及characteristics
可由实现用于优化执行的许多其他 分裂器或数据。
可变数据源的Spliterators还有一个挑战; 绑定到数据的时间,因为数据可能在创建分裂器的时间和流管道的执行时间之间发生变化。理想情况下,流的分裂器会报告IMMUTABLE
或的特征 CONCURRENT
; 如果不是它应该是 迟到的。如果某个源无法直接提供推荐的分裂器,它可能使用a间接提供分裂器Supplier
,并通过Supplier
-accepting版本 构造流 stream()
。仅在流管道的终端操作开始之后才从供应商获得分裂器。
这些要求显着减少了流源突变和流管道执行之间潜在干扰的范围。基于具有所需特征的分裂器的流或使用基于供应商的工厂形式的流不受在终端操作开始之前对数据源的修改的影响(假设流操作的行为参数满足非操作的要求标准)干涉和无国籍状态)