Stream 复杂API的操作
在第一部分stream part1中,我们已经学习了部分stream的api来处理数据集合。我们先复习一下上一章节的内容,对transaction value 大于100的值进行汇总求和,我们建立了一个由中间操作(过滤器,映射)和终端操作(reduce)组成的pipeline。如下所示:
例1
double sumExpensive =
transactions.stream()
.filter(t -> t.getValue() > 100)
.map(Transaction::getValue)
.reduce(0.0, Double::sum);
但是第一部分内容没有用到以下两个操作:
- flatMap: 一个中间操作,结合map(映射)和flatten(扁平化)的一组操作
-
collect: * 一种终端操作,它支持多种Collector参数,用于将,将流中的元素累积成一个汇总结果。
这两个操作能提供更高水平的抽象和概括,解决更复杂的归约汇总。 例如,你可以使用flatMap和collect*来新生成一个Map,用于统计单词流中的每个字符的出现次数,如例2所示。不要担心这个代码最初是否显得过于晦涩难以理解。 本文的目的是更详细地学习这两个操作。
例2
Stream<String> words = Stream.of("Java", "Magazine", "is",
"the", "best");
Map<String, Long> letterToCount =
words.map(w -> w.split(""))
.flatMap(Arrays::stream)
.collect(groupingBy(identity(), counting()));
输出如下:
{a=4, b=1, e=3, g=1, h=1, i=2, J=1, M=1, n=1, s=2, t=2, v=1, z=1}
结果看起来很棒,接下来让我们深入了解一下flatMap和collect的工作原理吧。
flatMap
假设你要查找文件中有多少各不相同唯一单词。 你会怎么做?
你可能认为这很容易; 我们可以使用在上一篇文章中看到的 Files.lines()返回由文件行组成的流。 然后我们再使用map() 操作将每一行拆分为单词,最后使用distinct() 操作删除重复项。 如下所示:
例3
Files.lines(Paths.get("data.txt"))
.map(line -> line.split("\\s+")) // Stream<String[]>
.distinct() // Stream<String[]>
.forEach(System.out::println);
输出如下:
[Ljava.lang.String;@58372a00
[Ljava.lang.String;@4dd8dc3
很遗憾,输出的内容是一个String数组的地址,似乎不是我们想要的结果啊。我们其实是想要打印出一个单词字符串而已。为什么会输出这个内容呢?这是因为传递给map()的lambda表达式为文件中的每一行返回一个String数组 (String[]) .因此,map方法返回的流实际上是Stream<String[]>类型。而我们真正想要的是Stream<String>来表示一个单词流。
幸运的是,使用flatMap方法可以解决这个问题。 接下来,让我们一步一步找到正确的解决方案。
首先,我们需要一个单词流而不是数组流。而上一部分内容学过,Arrays .stream()接受一个数组参数,并生成一个流。 如下所示:
例4
String[] arrayOfWords = {"Java", "Magazine"};
Stream<String> streamOfwords = Arrays.stream(arrayOfWords);
那接下来我们改造一下上面例3的代码,如下所示:
例5
Files.lines(Paths.get("data.txt"))
.map(line -> line.split("\\s+")) // Stream<String[]>
.map(Arrays::stream) // Stream<Stream<String>>
.distinct() // Stream<Stream<String>>
.forEach(System.out::println);
输出如下:
java.util.stream.ReferencePipeline$Head@568db2f2
java.util.stream.ReferencePipeline$Head@378bf509
沮丧,解决方案仍然无效。 这是因为我们现在的结果是得到了流的流(更确切地说是Stream<Stream<String>>)。 实际上,我们原来打算将每一行转换为一个单词数组,然后使用Arrays.stream()方法将每个数组转换为单独的流。
好吧,不绕弯子,你可以像下面这样使用flatMap来解决这个问题:
例6
Files.lines(Paths.get("data.txt"))
.map(line -> line.split("\\s+")) // Stream<String[]>
.flatMap(Arrays::stream) // Stream<String>
.distinct() // Stream<String>
.forEach(System.out::println);
输出如下:
The
quick
brown
fox
jumped
over
the
lazy
dog
Perfectly,要得就是这个效果。使用flatMap方法的效果是,各个数组并不是分别映射成一个流,而是映射成流的内容。所有使用map(Arrays::stream)时生成的单个流都被合并起来,即扁平化为一个流。。下图说明了 使用flatMap方法的效果
一言以蔽之,flatmap方法让你把一个流中的每个值都换成另一个流,然后把所有的流连接 起来成为一个流。
flatMap是一种常见的模式. 在后面学习Optional 和 CompletableFuture时,我们会再次用到它。
collect
我们在stream part1中已经使用过了collect方法,我们知道,一般而言,如果一个方法返回值是一个stream(我们称之为中间操作),若返回一个具体的值,如boolean,int,或者Optional对象(我们称之为终端操作)。
collect方法是一个终端操作,它将流转换为列表。例如你想要获取transaction value 大于100的ID列表,可以使用下面的方法:
例7
List<Integer> expensiveTransactionsIds =
transactions.stream()
.filter(t -> t.getValue() > 100)
.map(Transaction::getId)
.collect(toList());
传递给collect 方法的参数是java.util.stream.Collector的.Collector对象的是干什么的呢? 它实质上描述了将流中的元素累积到最终结果中的方法. 前面使用的工厂方法Collectors.toList()具体说明了如何将流归结到集合中.而且,java.util.stream.Collector中有很多相似的收集器提供使用.
将流收集到其他集合中. 例如,使用toSet()将流转化为set集合,set集合将删除重复的元素。例如你想要获取transaction value 大于100的城市set列表,可以使用下面的方法:
例8
Set<String> cities =
transactions.stream()
.filter(t -> t.getValue() > 100)
.map(Transaction::getCity)
.collect(toSet());
Note: 再接下来的示例中,我们都是默认静态导入Collectors类的工厂方法(import static java.util.stream.Collectors.*)。
上面虽然返回的是set接口类型,但是你无法控制具体的set接口实现,如果使用toCollection()的话,那么你可以很好的指定具体的返回类型。比如,你可以通过构造器引用来指定返回类型为HashSet,如下所示:
例9
Set<String> cities =
transactions.stream()
.filter(t -> t.getValue() > 1000)
.map(Transaction::getCity)
.collect(toCollection(HashSet::new));
然而,这仅仅是collect和收集器的一小部分功能,下面的示例,看看你用collect和收集器能够做什么。
- 对一个交易列表按货币分组,获得该货币的所有交易额总和(返回一个Map<Currency, Integer>)
- 将交易列表分成两组:贵的和不贵的(返回一个Map<Boolean, List<Transaction>>)
- 创建多级分组,比如按城市对交易分组,然后进一步按照贵或不贵分组(返回一个 Map<Boolean, List<Transaction>>)。
激动吗?很好,我们先来看如何使用Stream API和收集器,我们首先从一个“summarizes” 流开始:计算流的平均值,最大值和最小值。然后我们进行简单的分组,最后我们利用收集器组合来完成复杂的功能,如多级分组。
Summarizing.我们先从一个简单的例子入手, 在上一篇文章中看到了如何使用reduce操作和使用原始流来计算元素的数量,最大值,最小值和平均值。然后,使用预定义的收集器会更简单,例如,你可以使用counting()计算元素数目,如下所示:
例10
long howManyTransactions =
transactions.stream().collect(counting());
你可以使用summingDouble(), summingInt(), summingLong()来记算Double, Int, Long类型的汇总值。例如,我们汇总transactions的value值,如下所示:
例11
double totalValue = transactions.stream().collect(
summingDouble(Transaction::getValue));
同理,你可以使用* averagingDouble(), averagingInt(), averagingLong()*计算平均值,如下所示:
例12
double average = transactions.stream().collect(
averagingDouble(Transaction::getValue));
此外,你可以通过使用maxBy() , minBy()计算流的最大和最小元素。但是你需要指定流中对象的排序方式,这也是为什么maxBy() , minBy()需要传入Comparator作为参数的原因,如图所示:
在下面的示例中,我们将使用静态方法comparing(), 该方法需要一个Function 参数,Function该函数用于从流的元素中提取可比较的key,然后生成一个*Comparator *对象. 那么我们使用transaction的value属性作为key来寻找最贵的事务.如下所示:
例13
Optional<Transaction> highestTransaction =
transactions.stream()
.collect(maxBy(comparing(Transaction::getValue)));
还有一个更广义归约收 集器reducing(),它允许你通过重复计算流中的所有元素,直到生成结果。 它在概念上类似于之前看到的reduce()方法。 例如,使用* reducing()*计算所有事务总和。如下所示:
例14
double totalValue = transactions.stream().collect(reducing(
0.0, Transaction::getValue, Double::sum));
它需要三个参数:
- 第一个参数是归约操作的起始值,也是流中没有元素时的返回值,在本示例中为0.0.
- 第二个参数是应用于流中每一个元素的Function函数,在本示例中为将Transaction转化成一个表示其值的double。
- 第三个参数是一个BinaryOperator,将两个项目累积成一个同类型的值。这里它就是 对两个double求和。
你可能会说,“等一下; 我已经可以用其流方法做到这一点,比如reduce(), max(), 和min(),那你为什么要告诉我这个呢?“ 稍后我们会看到,我们可以将收集器组合起来构建更复杂组合( 例如,分组加平均值),因此先了解这些内置收集器很有必要。
Grouping
常见的数据库查询是使用属性对数据进行分组。 例如,你可能希望按货币对事务列表进行分组。 使用显式迭代表达这样的查询有点痛苦,如下所示:
例15
Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>(); //建立累积交易分组的 Map
for (Transaction transaction : transactions) { // 迭代Transaction的List
Currency currency = transaction.getCurrency(); // 提取Transaction的货币
List<Transaction> transactionsForCurrency = transactionsByCurrencies.get(currency);
if (transactionsForCurrency == null) { // 如果分组Map中没有这种货币条目,就创建一个
transactionsForCurrency = new ArrayList<>();
transactionsByCurrencies.put(currency, transactionsForCurrency);
}
transactionsForCurrency.add(transaction); //将当前遍历的Transaction加入同一货币的Transaction的List
}
如果你是一位经验丰富的Java程序员,写这种东西可能挺顺手的,不过你必须承认,做这么 简单的一件事就得写很多代码。更糟糕的是,读起来比写起来更费劲!代码的目的并不容易看出 来,尽管换作白话的话是很直截了当的:“把列表中的交易按货币分组。不过现在好了,用 Stream中collect方法的一个更通用的Collector参数,一个名叫*groupingBy() *的收集器,你就可以用一句话实现完全相同的结果,如下所示:
例16
Map<Currency, List<Transaction>> transactionsByCurrencies =
transactions.stream().collect(groupingBy(
Transaction::getCurrency));
工厂方法groupingBy()的参数是一个Function,一个用于获取事务属性的函数。 我们称之为classification function。 在本示例中,我们传递方法引用* Transaction::getCurrency*,按货币对事务进行分组。如图所示
Partitioning
partitioningBy()是groupingBy()的特殊情况:由一个predicate(返回一个布尔值的函数)作为分类函数,它称分区函数。分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它多可以 分为两组——true是一组,false是一组。 换句话说,对事务流进行分区的返回类型为Map<Boolean, List<Transaction>>。例如,如果要将事务分组为两个列表 - 便宜(value<=100)和昂贵(value<=100),你可以使用partitioningBy收集器,如下所示:
例17
Map<Boolean, List<Transaction>> partitionedTransactions =
transactions.stream().collect(partitioningBy(
t -> t.getValue() > 100));
Composing collectors
如果您熟悉SQL,那么你知道可以将GROUP BY与COUNT()和SUM()等函数结合使用,按货币及其总和对事务进行分组。 那么,我们可以使用Stream API做类似的事情吗? 是。 实际上,有一个groupingBy() 的重载版本,它将另一个收集器对象作为第二个参数。 此附加收集器用于定义使用groupingBy收集器对函数所定义的key关联的元素进行累积。
好吧,这听起来有点抽象。所以让我们看一个简单的例子。 我们想根据每个城市的所有交易总和生成Map。 在这里,我们告诉groupingBy使用方法getCity() 作为classification function。 因此,生成的Map的key是city,sum(value)作为value。 如下所示:
例18
Map<String, Double> cityToSum =
transactions.stream().collect(groupingBy(
Transaction::getCity, summingDouble(Transaction::getValue)));
我们增加了一个summingDouble()的参数,汇总了与城市相关交易的所有值。 结果,我们返回一个 Map<String, Double>类型,它将每个城市映射到该城市所有交易的总价值。 很酷,不是吗? 想一想,其实 groupingBy (Transaction::getCity)的基本版本实际上只是groupingBy (Transaction::getCity, toList())的简写。
让我们再看另外一个例子。 如果你想统计每个城市的最高value的Transaction怎么办? 你可能已经猜到,我们可以使用之前接触过的maxBy收集器,如下所示:
例19
Map<String, Optional<Transaction>> cityToHighestTransaction =
transactions.stream().collect(groupingBy(
Transaction::getCity, maxBy(comparing(Transaction::getValue))));
你可以看到Stream API真的很有效率; 我们现在可以简洁地编写统计和分析功能。 你能想象回到原本啰嗦的迭代处理一个集合是怎么一种糟糕的体验么?
让我们看一个更复杂的例子。 groupingBy可以将另一个收集器对象作为参数,根据收集器进一步的分类。 因为groupingBy本身就是一个收集器,所以我们可以通过传递一个groupingBy收集器来创建多级分组。例如,现在我们按city对交易进行分组,然后我们再进一步按每个城市的交易货币对交易进行分组,以获得该货币的平均交易价值。如下所示:
例20
Map<String, Map<Currency, Double>> cityByCurrencyToAverage =
transactions.stream().collect(groupingBy(Transaction::getCity,
groupingBy(Transaction::getCurrency,
averagingDouble(Transaction::getValue))));
自定义收集器
到目前为止我们展示的所有收集器都实现了接口java.util.stream.Collector。Collector接口包含了一系列方法,为实现具体的归约操作(即收集器)提供了范本。这也意味着, 你可以为Collector接口提供自己的实现,从而自由地创建自定义归约操作。但是,这个主题比较适合再写一篇文章来论述,所以我们不在这里讨论。
结论
在这篇文章中,我们探讨了Stream API的两个高级操作:flatMap和collect。 它们结合起来可以提供很强大的统计归约操作。特别是,你已经看到collect方法可用于汇总,分组和分区操作。但是,本文没有论述所有可用的内置收集器。你可以自己查看Collectors并尝试使用其他收集器,例如* mapping(),joining()*和 collecting AndThen(),你会发现它们真的很好用。
另外本文的所有示例都可以再我的github上面找到源码。如StreamPart2Example ,GroupingTransactions,TestMultilevelGrouping。