Functional Programming (1): lambda, FunctionalInterface, Method Reference
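Stream is one of the most prominent additions in Java 8. It enhances collections with a focus on efficient, declarative data processing. Combined with lambda expressions, the Stream API makes code shorter and more readable. A stream can run its aggregate operations sequentially or in parallel, and parallel mode can take advantage of multi-core processors, so lambda expressions plus the Stream API make it convenient to write concurrent data-processing code.
When working with a database, the expressive semantics of SQL make data operations simple. Stream brings a similar style to collections: readable, efficient collection-processing code becomes easy to write.
This post covers the basic concepts, structure, and methods of Stream and Collector; for an analysis of the source code, see the linked post.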
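class Student{
String name;
int age;
int score;
public Student(String name, int age, int score) {
super();
this.name = name;
this.age = age;
this.score = score;
}
// Getters used by the examples below (Student::getName, Student::getAge, Student::getScore)
public String getName() { return name; }
public int getAge() { return age; }
public int getScore() { return score; }
@Override
public String toString() {
return "Student [name=" + name + ", age=" + age + ", score=" + score + "]";
}
}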
Data source:
List<Student> list = Arrays.asList(new Student("wang", 20, 90),
new Student("zhao", 30, 80), new Student("li", 25, 99),
new Student("sun", 20, 80), new Student("zhou", 30, 70));
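Requirement:
From the student list, select the students whose score is at least 80 and group them by age. The Map key is the age and the value is the list of student names.
With traditional code this looks like: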
Map<Integer, List<String>> rst = new HashMap<>();
for (Student stu : list) {
if(stu.getScore() >= 80) {
List<String> names = rst.getOrDefault(stu.getAge(), new ArrayList<String>());
names.add(stu.getName());
rst.put(stu.getAge(), names);
}
}
With streams the code becomes:
Map<Integer, List<String>> map = list.stream().filter(s -> s.score>=80).
collect(Collectors.groupingBy(Student::getAge, Collectors.mapping(Student::getName, Collectors.toList())));
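Comparing the two snippets above, the Stream version states the intent (filter, group, map) directly and is far more readable. This is the strength of the functional style that Java 8 Stream brings.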
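To make the comparison concrete, here is a minimal runnable sketch of both versions on the sample data. The class name `GroupingDemo` and the helpers `sample`, `byLoop`, and `byStream` are hypothetical names introduced only for this illustration; the loop variant uses `computeIfAbsent` as one possible way to write it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupingDemo {
    // Compact stand-in for the Student class of this post.
    static final class Student {
        final String name; final int age; final int score;
        Student(String name, int age, int score) { this.name = name; this.age = age; this.score = score; }
        String getName() { return name; }
        int getAge() { return age; }
        int getScore() { return score; }
    }

    static List<Student> sample() {
        return Arrays.asList(new Student("wang", 20, 90), new Student("zhao", 30, 80),
                new Student("li", 25, 99), new Student("sun", 20, 80), new Student("zhou", 30, 70));
    }

    // Loop version: computeIfAbsent creates the name list the first time an age is seen.
    static Map<Integer, List<String>> byLoop(List<Student> students) {
        Map<Integer, List<String>> result = new HashMap<>();
        for (Student s : students) {
            if (s.getScore() >= 80) {
                result.computeIfAbsent(s.getAge(), k -> new ArrayList<>()).add(s.getName());
            }
        }
        return result;
    }

    // Stream version: filter, then group by age while mapping each student to a name.
    static Map<Integer, List<String>> byStream(List<Student> students) {
        return students.stream()
                .filter(s -> s.getScore() >= 80)
                .collect(Collectors.groupingBy(Student::getAge,
                        Collectors.mapping(Student::getName, Collectors.toList())));
    }

    public static void main(String[] args) {
        System.out.println(byLoop(sample()));
        System.out.println(byStream(sample()));
    }
}
```

Both methods should produce equal maps, which is an easy way to check a refactoring from loops to streams.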
Stream
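The JavaDoc opens its description of Stream with: "A sequence of elements supporting sequential and parallel aggregate operations." A stream, in other words, supports both sequential and parallel processing of a sequence of data.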
A stream pipeline consists of a source (which might be an array, a collection, a generator function, an I/O channel, etc), zero or more intermediate operations (which transform a stream into another stream), and a terminal operation (which produces a result or side-effect). Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed.
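A stream pipeline therefore has three parts:
- Head (source): where the stream originates; it can be an array, a collection, a generator function, or an I/O channel
- Intermediate operations: zero or more, lazy, each transforming one stream into another
- TerminalOp: the terminal operation, which triggers the computation
Tips: the source code uses two distinct stream flags, Sorted and Ordered. Sorted means the elements are arranged according to their sort order. Ordered means the source has a defined encounter order: arrays and List are Ordered, whereas a HashSet (or the key set of a HashMap) is not.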
1.Head
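A stream can be finite or infinite, and can be created from an array, a collection, a generator function, or an I/O channel.
Stream.generate(Math::random);// build a stream from a generator
Stream.of(1,2,3,5);// build a stream with the static of method
Arrays.stream(strings);// build a stream from an array
list.stream();// build a stream from a collection
A stream can be consumed only once. After a terminal operation has run, the stream cannot be reused, so the following code is wrong:
Stream<Student> stream =list.stream();
System.out.println(stream.count());
stream.forEach(System.out::println);// throws IllegalStateException: the stream has already been operated upon or closed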
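The single-use rule can be observed directly. The sketch below (with the hypothetical class name `SingleUseDemo`) runs a second terminal operation on the same stream object and reports whether it was rejected; the usual remedy is to ask the source for a fresh stream each time.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class SingleUseDemo {
    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondUseFails(List<String> items) {
        Stream<String> stream = items.stream();
        stream.count();                    // first terminal operation consumes the stream
        try {
            stream.forEach(s -> { });      // second terminal operation on the same object
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("wang", "zhao", "li");
        System.out.println(secondUseFails(names));
        // Obtain a new stream from the source for each pipeline instead.
        System.out.println(names.stream().count());
        names.stream().forEach(System.out::println);
    }
}
```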
2.Intermediate Op
Operation | Type | Description |
---|---|---|
filter | StatelessOp | Returns a stream consisting of the elements of this stream that match the given predicate. |
map | StatelessOp | Returns a stream consisting of the results of applying the given function to the elements of this stream. |
flatMap | StatelessOp | Returns a stream consisting of the results of replacing each element of this stream with the contents of a mapped stream produced by applying the provided mapping function to each element. Each mapped stream is closed after its contents have been placed into this stream. (If a mapped stream is null an empty stream is used, instead.) |
distinct | StatefulOp | Returns a stream consisting of the distinct elements of this stream. |
sorted | StatefulOp | Returns a stream consisting of the elements of this stream, sorted according to the provided Comparator or natural order. |
peek | StatelessOp | Returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream. |
limit | StatefulOp | Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length. |
skip | StatefulOp | Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of the stream. If this stream contains fewer than n elements then an empty stream will be returned. |
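Intermediate operations are either stateless (StatelessOp) or stateful (StatefulOp). A stateless operation processes each element independently of the others; filter and map are examples. A stateful operation must take previously seen elements into account; sorted and distinct are examples. Intermediate operations are lazy: a pipeline without a Terminal Op is never evaluated.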
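Laziness is easy to observe with a small experiment. In this sketch (the class name `LazinessDemo` and the log messages are made up for the illustration), `peek` records each element it sees; nothing is recorded while the pipeline is being built, only once the terminal `collect` runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazinessDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Building the pipeline: peek and map are only registered, not executed.
        Stream<Integer> pipeline = Stream.of(3, 1, 2)
                .peek(i -> log.add("peek " + i))
                .map(i -> i * 10);
        log.add("pipeline built");
        // The terminal operation pulls the elements through peek and map.
        List<Integer> out = pipeline.collect(Collectors.toList());
        log.add("result " + out);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The "pipeline built" entry comes first in the log, before any "peek" entry, even though `peek` appears earlier in the source code.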
- filter
filter acts as a filter; its parameter is the Predicate functional interface.
Stream<T> filter(Predicate<? super T> predicate)
// print the names of students whose score is above 80
list.stream().filter(s->s.score>80).forEach(s -> System.out.println(s.name));
- map
map converts each element of type T in the stream to an element of type R
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
// map converts the Student elements to String
String rst = list.stream().map(Student::getName).collect(Collectors.joining(", "));
- flatMap
The flatMap operation has the effect of applying a one-to-many transformation to the elements of the stream, and then flattening the resulting elements into a new stream.
That is, flatMap maps each element of the stream to zero or more elements and then flattens the results into a single new stream.
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
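Note: the Function takes a T, the element type of the input stream, and returns a Stream<? extends R>. That is, the Function returns a stream whose elements have the output element type.
// count the distinct letters used across all student names
long count = list.stream().flatMap(s->Arrays.stream(s.name.split(""))).distinct().count();
Here the input element type T is Student and the output element type R is String. The Function takes a Student and returns the Stream<String> produced by Arrays.stream(String[]).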
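The effect of flattening can be checked on the five sample names. This sketch works on plain strings rather than Student objects to stay short; `FlatMapDemo`, `totalLetters`, and `distinctLetters` are hypothetical names. It relies on `split("")` returning one string per character, which is the behaviour on Java 8 and later.

```java
import java.util.Arrays;
import java.util.List;

class FlatMapDemo {
    // One stream element per letter, across all names.
    static long totalLetters(List<String> names) {
        return names.stream()
                .flatMap(n -> Arrays.stream(n.split("")))
                .count();
    }

    // Same pipeline, with duplicates removed before counting.
    static long distinctLetters(List<String> names) {
        return names.stream()
                .flatMap(n -> Arrays.stream(n.split("")))
                .distinct()
                .count();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("wang", "zhao", "li", "sun", "zhou");
        System.out.println(totalLetters(names));
        System.out.println(distinctLetters(names));
    }
}
```

For the sample names, the five inner streams hold 4, 4, 2, 3 and 4 letters, and `distinct` reduces the flattened stream to the letters w, a, n, g, z, h, o, l, i, s, u.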
- distinct
Removes duplicate elements (compared with equals).
Stream<T> distinct();
- sorted
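Sorting. sorted() uses natural ordering (if the elements are not Comparable, a ClassCastException may be thrown when the terminal operation runs); sorted(Comparator<? super T> comparator) sorts with the supplied comparator.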
Stream<T> sorted();
Stream<T> sorted(Comparator<? super T> comparator);
// print students from highest to lowest score
list.stream().sorted((s1,s2)->s2.score-s1.score).forEach(System.out::println);
- peek
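peek, as the name suggests, takes a look at each element: its signature shows that it runs a Consumer on every element without changing the stream itself, which makes it useful for debugging.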
Stream<T> peek(Consumer<? super T> action);
- limit
Truncates the stream to at most maxSize elements
Stream<T> limit(long maxSize);
// generate produces an infinite stream; limit caps the number of elements
Stream.generate(Math::random).limit(7).forEach(System.out::println);
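- skip
The counterpart of limit: skip discards the first n elements of the stream. If the stream has no more than n elements, the result is an empty stream.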
Stream<T> skip(long n);
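A common use of sorted, skip, and limit together is paging. The following is a minimal sketch under the assumption that a "page" is a fixed-size slice of the scores sorted from high to low; `PagingDemo` and `page` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class PagingDemo {
    // Returns one page of scores, highest first. pageIndex is zero-based.
    static List<Integer> page(List<Integer> scores, int pageIndex, int pageSize) {
        return scores.stream()
                .sorted(Comparator.reverseOrder())
                .skip((long) pageIndex * pageSize)   // drop the earlier pages
                .limit(pageSize)                     // keep at most one page
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> scores = Arrays.asList(90, 80, 99, 80, 70);
        System.out.println(page(scores, 0, 2));
        System.out.println(page(scores, 1, 2));
        System.out.println(page(scores, 2, 2));
        System.out.println(page(scores, 3, 2));   // past the end: empty list
    }
}
```

Requesting a page past the end is not an error; skip simply yields an empty stream, as described above.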
3.Terminal Op
Operation | Type | Description |
---|---|---|
forEach | ForEachOps | Performs an action for each element of this stream. |
forEachOrdered | ForEachOps | Performs an action for each element of this stream, in the encounter order of the stream if the stream has a defined encounter order. |
reduce | ReduceOps | Performs a reduction on the elements of this stream |
collect | see Collector | Performs a mutable reduction operation on the elements of this stream. (Note the difference from reduce: mutable.) |
min | ReduceOps | Returns the minimum element of this stream according to the provided Comparator. This is a special case of a reduction. |
max | ReduceOps | Returns the maximum element of this stream according to the provided Comparator. This is a special case of a reduction. |
count | ReduceOps | Returns the count of elements in this stream. |
anyMatch | MatchOps(短路) | Returns whether any elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then false is returned and the predicate is not evaluated. |
allMatch | MatchOps(短路) | Returns whether all elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then true is returned and the predicate is not evaluated. |
noneMatch | MatchOps(短路) | Returns whether no elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then true is returned and the predicate is not evaluated. |
findFirst | FindOps(短路) | Returns an Optional describing the first element of this stream, or an empty Optional if the stream is empty. If the stream has no encounter order, then any element may be returned. |
findAny | FindOps(短路) | Returns an Optional describing some element of the stream, or an empty Optional if the stream is empty. |
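Terminal operations are either short-circuiting or non-short-circuiting. A short-circuiting operation may return a result without processing every element. For example, findFirst in FindOps returns as soon as it has the first element and ignores the rest, and allMatch in MatchOps returns false as soon as one element fails the predicate, without testing the remaining elements. A non-short-circuiting operation has to process all elements.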
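Short-circuiting is what makes terminal operations usable on infinite streams. In this sketch (hypothetical names `ShortCircuitDemo` and `evaluationsUntilGreaterThan`), anyMatch runs over an endless sequence of integers and a counter records how many elements were actually pulled through the pipeline before the match ended it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class ShortCircuitDemo {
    // Counts how many elements of the infinite stream 1, 2, 3, ... are visited
    // before anyMatch finds one greater than the given limit.
    static int evaluationsUntilGreaterThan(int limit) {
        AtomicInteger seen = new AtomicInteger();
        boolean found = Stream.iterate(1, i -> i + 1)   // infinite sequential stream
                .peek(i -> seen.incrementAndGet())
                .anyMatch(i -> i > limit);
        return found ? seen.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(evaluationsUntilGreaterThan(5));
    }
}
```

A non-short-circuiting operation such as count or forEach on the same infinite stream would never return.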
- forEach
Runs the Consumer action on every element (of type T) of the stream.
void forEach(Consumer<? super T> action);
// printing from a parallel stream: the output order is nondeterministic
list.parallelStream().map(s->s.name + ": " + (s.score>=80 ? "A" : "B")).
forEach(System.out::println);
Do not confuse this with the forEach method of the list itself (inherited from Iterable):
default void forEach(Consumer<? super T> action) {
Objects.requireNonNull(action);
for (T t : this) {
action.accept(t);
}
}
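As the code above shows, list.forEach simply calls action.accept on each element in turn. For a sequential stream, Stream.forEach ends up doing much the same, but the extra pipeline machinery adds some overhead. On the other hand, a stream lets you apply intermediate operations before forEach, and a parallel forEach can use multiple cores. Weigh these two factors when choosing between Stream.forEach and Iterable.forEach.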
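- forEachOrdered
If the stream has a defined encounter order, the action is applied to the elements in that order.
void forEachOrdered(Consumer<? super T> action);
// the output order always matches the order of the elements in list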
list.parallelStream().map(s->s.name + ": " + (s.score>=80 ? "A" : "B")).
forEachOrdered(System.out::println);
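The ordering guarantee can be verified without relying on console output. This sketch (hypothetical class `ForEachOrderedDemo`) appends the results of a parallel pipeline to a list from inside forEachOrdered. Collecting with Collectors.toList() would be the usual way to build such a list; the side-effecting version is used here only to make the encounter order visible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ForEachOrderedDemo {
    // Doubles every element in parallel, but appends the results in encounter order.
    static List<Integer> doubledInOrder(List<Integer> source) {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        source.parallelStream().map(i -> i * 2).forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        System.out.println(doubledInOrder(source));
    }
}
```

Replacing forEachOrdered with forEach in this sketch would still add every element, but the order of the output list would no longer be guaranteed.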
- reduce
Reductions take a little more effort to write, but they parallelize well. First, it helps to understand what an associative accumulation function is.
An operator or function op is associative if the following holds:
(a op b) op c == a op (b op c)
The importance of this to parallel evaluation can be seen if we expand this to four terms:
a op b op c op d == (a op b) op (c op d)
So we can evaluate (a op b) in parallel with (c op d), and then invoke op on the results.
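Compared with a loop, the strength of a reduction is how well it parallelizes. Associativity means that the way the operations are grouped does not change the final result, which is what allows the work to be split into independent parts.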
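A small sketch may help separate associative from non-associative operators. Addition is associative, so a parallel reduction yields the same sum as a sequential one; subtraction is not, as the two groupings below show. The class and method names are hypothetical.

```java
import java.util.stream.IntStream;

class AssociativityDemo {
    // Sums 1..n with reduce, either sequentially or in parallel.
    static int sum(int n, boolean parallel) {
        IntStream s = IntStream.rangeClosed(1, n);
        return (parallel ? s.parallel() : s).reduce(0, Integer::sum);
    }

    // Subtraction grouped from the left: (a - b) - c
    static int leftGrouped(int a, int b, int c) { return (a - b) - c; }

    // Subtraction grouped from the right: a - (b - c)
    static int rightGrouped(int a, int b, int c) { return a - (b - c); }

    public static void main(String[] args) {
        System.out.println(sum(1000, false));
        System.out.println(sum(1000, true));
        System.out.println(leftGrouped(10, 4, 3));
        System.out.println(rightGrouped(10, 4, 3));
    }
}
```

Because the two groupings of subtraction disagree, using it as the accumulator of a parallel reduce would give results that depend on how the stream happens to be split.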
reduce is a reduction operation with three overloads.
- T reduce(T identity, BinaryOperator<T> accumulator);
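The prototype shows that T is the element type of the stream, the first parameter is also a T, and the second parameter is a binary operator. This overload suits aggregations that start from an initial value: identity supplies the initial value and accumulator combines it with each element. Note that identity must be an identity for the accumulator, that is, accumulator.apply(identity, t) must equal t for every t.
The reduction behaves like the following code (but is not constrained to execute sequentially):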
T result = identity;
for (T element : this stream)
result = accumulator.apply(result, element)
return result;
JavaDoc: While this may seem a more roundabout way to perform an aggregation compared to simply mutating a running total in a loop, reduction operations parallelize more gracefully, without needing additional synchronization and with greatly reduced risk of data races.
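// concatenate all student names:
String names = list.stream().map(Student::getName).reduce("names:", String::concat);
// highest score among all students
int max = list.stream().mapToInt(Student::getScore).reduce(0, Integer::max);
These two snippets only illustrate how this form of reduce is used; neither is good practice. The first creates many intermediate String objects, and "names:" is not a true identity for concat, so a parallel stream could prepend it more than once. The second can simply be replaced by the max method.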
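As a hedged sketch of the alternatives, the code below uses Collectors.joining for the concatenation and IntStream.max for the maximum. `ReduceAlternativesDemo` and its method names are hypothetical; `highestWithZeroSeed` is included only to show what goes wrong when 0 is used as the seed for max on negative input.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ReduceAlternativesDemo {
    // joining builds the result with a single internal builder and adds the prefix once.
    static String joinNames(List<String> names) {
        return names.stream().collect(Collectors.joining(", ", "names: ", ""));
    }

    // max() returns an empty OptionalInt for an empty stream instead of a made-up value.
    static OptionalInt highest(int... scores) {
        return IntStream.of(scores).max();
    }

    // 0 is not an identity for max when negative values are possible.
    static int highestWithZeroSeed(int... scores) {
        return IntStream.of(scores).reduce(0, Integer::max);
    }

    public static void main(String[] args) {
        System.out.println(joinNames(Arrays.asList("wang", "zhao", "li")));
        System.out.println(highest(90, 80, 99));
        System.out.println(highest(-5, -3));
        System.out.println(highestWithZeroSeed(-5, -3));
    }
}
```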
- Optional<T> reduce(BinaryOperator<T> accumulator);
T is the element type of the stream and accumulator is the accumulation function.
JavaDoc:Performs a reduction on the elements of this stream, using an associative accumulation function, and returns an Optional describing the reduced value, if any. This is equivalent to:
boolean foundAny = false;
T result = null;
for (T element : this stream) {
if (!foundAny) {
foundAny = true;
result = element;
}
else
result = accumulator.apply(result, element);
}
return foundAny ? Optional.of(result) : Optional.empty();
but is not constrained to execute sequentially. The equivalent code makes the behaviour of this overload clear. Optional was introduced in Java 8 to help avoid NPEs and will be covered separately later.
// the student name that sorts last in case-insensitive string order
String name = list.stream().map(Student::getName)
.reduce((o1,o2)->o1.compareToIgnoreCase(o2)>0 ? o1 :o2).orElse("");
The lambda expression serves as the binary operator: it compares two strings and returns the larger one.
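- <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
Note how this overload differs from the first one. The stream elements have type T, but the first parameter has type U, which may differ from T, and the result is also a U. The overload exists to save a separate map step: if the T elements were first mapped to U, the first overload could do the reduction. The accumulator therefore both maps and accumulates. The combiner merges partial results; in practice it is only invoked when the stream runs in parallel.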
JavaDoc:Many reductions using this form can be represented more simply by an explicit combination of map and reduce operations. The accumulator function acts as a fused mapper and accumulator, which can sometimes be more efficient than separate mapping and reduction, such as when knowing the previously reduced value allows you to avoid some computation.
The equivalent code is:
U result = identity;
for (T element : this stream)
result = accumulator.apply(result, element);
return result;
The following two snippets both compute the total score of all students and illustrate a few points:
Integer scoreSequentialSum = list.stream().
reduce(0, (t, s) -> t + s.getScore(), (t1,t2)-> {throw new NullPointerException();});
Integer scoreParallelSum = list.parallelStream().
reduce(0, (sum, student) -> sum + student.getScore(), Integer::sum);
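1. The stream being reduced has element type Student, so T is Student and U is Integer. The accumulator lambda adds up the scores: its first parameter is a U (Integer), its second is a T (Student), and it returns a U (Integer). The combiner takes two U (Integer) values and returns a U. As the JavaDoc notes, the code could equally map the Student stream to an Integer stream of scores first and then reduce.
2. For a sequential stream the combiner is not used: the combiner in the first snippet throws an exception, yet the code runs without any problem. This reflects how the implementation behaves; real code should still pass a combiner that is compatible with the accumulator.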
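The relationship between the fused form and an explicit map followed by reduce can be sketched as follows. To keep the example self-contained it sums the lengths of strings instead of student scores, so T is String and U is Integer; `ReduceFormsDemo` and its method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class ReduceFormsDemo {
    // Three-argument form: the accumulator maps (String -> length) and accumulates in one step.
    static int fused(List<String> names) {
        return names.parallelStream().reduce(0, (sum, n) -> sum + n.length(), Integer::sum);
    }

    // Explicit map, then the two-argument reduce on Integer.
    static int mapThenReduce(List<String> names) {
        return names.stream().map(String::length).reduce(0, Integer::sum);
    }

    // Primitive stream, which avoids boxing altogether.
    static int primitive(List<String> names) {
        return names.stream().mapToInt(String::length).sum();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("wang", "zhao", "li", "sun", "zhou");
        System.out.println(fused(names));
        System.out.println(mapThenReduce(names));
        System.out.println(primitive(names));
    }
}
```

All three methods compute the same total; for plain numeric sums the primitive-stream variant is usually the simplest to read.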
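- collect
collect, like reduce, is a reduction that parallelizes well without additional synchronization. The key difference is that reduce is an immutable reduction, while collect is a mutable reduction.
collect has two overloads: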
- <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);
JavaDoc: Performs a mutable reduction operation on the elements of this stream. A mutable reduction is one in which the reduced value is a mutable result container, such as an ArrayList, and elements are incorporated by updating the state of the result rather than by replacing the result. This produces a result equivalent to:
R result = supplier.get();
for (T element : this stream)
accumulator.accept(result, element);
return result;
Like reduce, collect operations can be parallelized without requiring additional synchronization.
Comparing reduce with collect:
R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
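1. The first parameter of collect is a Supplier, described in the previous post: a functional interface used as a factory that creates an object, which may be a container, a plain object, an array, and so on. The first parameter of reduce is a value.
2. The second parameter accumulates in both cases, but note the types. For collect it is a BiConsumer with no return value: each stream element updates the object obtained from the first parameter. For reduce it is a BiFunction returning U: each stream element is combined with the running result, and the returned value becomes the new result.
3. The third parameter merges the partial results of a parallel stream in both cases. For collect it is a BiConsumer; for reduce it is a BinaryOperator.
With the equivalent code of both methods side by side, the designers' reasoning is easy to see.
// store all student names in a List<String>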
List<String> names = list.stream().map(Student::getName).
collect(ArrayList<String>::new, List::add, List::addAll);
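The mutable versus immutable distinction shows up clearly with string concatenation. In this sketch (hypothetical class `CollectVsReduceDemo`), collect appends into StringBuilder containers and merges them, while reduce creates a new String at every step. The three-argument collect call mirrors the StringBuilder example given in the Stream JavaDoc.

```java
import java.util.Arrays;
import java.util.List;

class CollectVsReduceDemo {
    // Mutable reduction: each partial result is a StringBuilder that is updated in place.
    static String withCollect(List<String> words) {
        return words.parallelStream()
                .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
                .toString();
    }

    // Immutable reduction: every concat call returns a brand-new String.
    static String withReduce(List<String> words) {
        return words.parallelStream().reduce("", String::concat);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("wang", "zhao", "li");
        System.out.println(withCollect(words));
        System.out.println(withReduce(words));
    }
}
```

Both methods return the same text, even in parallel, because the list has an encounter order and both operations respect it; they differ in how many intermediate objects are created along the way.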
- <R, A> R collect(Collector<? super T, A, R> collector);
This overload is mainly used with the Collector implementations provided by Collectors, which make multi-level aggregation easy; for example, nesting groupingBy calls gives multi-level grouping. Even with a non-thread-safe result type such as ArrayList, a parallel reduction needs no additional synchronization. Collector and Collectors will be covered in detail later. JavaDoc: Performs a mutable reduction operation on the elements of this stream using a Collector. A Collector encapsulates the functions used as arguments to collect(Supplier, BiConsumer, BiConsumer), allowing for reuse of collection strategies and composition of collect operations such as multiple-level grouping or partitioning. When executed in parallel, multiple intermediate results may be instantiated, populated, and merged so as to maintain isolation of mutable data structures. Therefore, even when executed in parallel with non-thread-safe data structures (such as ArrayList), no additional synchronization is needed for a parallel reduction.
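// store all student names in a List<String>; compare with the first overload of collect
List<String> names = list.stream().map(Student::getName).collect(Collectors.toList());
Both versions store the student names in a List, but the Collectors method makes the code much shorter.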
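As an illustration of composing collectors, the sketch below groups the sample students by age and, within each age, partitions them by whether the score is at least 80 (the threshold from the opening example). `MultiLevelGroupingDemo`, `sample`, and `byAgeThenPassed` are hypothetical names, and the nested Student class is a compact stand-in for the one in this post.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class MultiLevelGroupingDemo {
    static final class Student {
        final String name; final int age; final int score;
        Student(String name, int age, int score) { this.name = name; this.age = age; this.score = score; }
        String getName() { return name; }
        int getAge() { return age; }
        int getScore() { return score; }
    }

    static List<Student> sample() {
        return Arrays.asList(new Student("wang", 20, 90), new Student("zhao", 30, 80),
                new Student("li", 25, 99), new Student("sun", 20, 80), new Student("zhou", 30, 70));
    }

    // Outer level: age. Inner level: true/false for score >= 80. Leaves: student names.
    static Map<Integer, Map<Boolean, List<String>>> byAgeThenPassed(List<Student> students) {
        return students.stream().collect(
                Collectors.groupingBy(Student::getAge,
                        Collectors.partitioningBy(s -> s.getScore() >= 80,
                                Collectors.mapping(Student::getName, Collectors.toList()))));
    }

    public static void main(String[] args) {
        System.out.println(byAgeThenPassed(sample()));
    }
}
```

Each downstream collector handles one level, so adding another level is a matter of nesting one more collector rather than rewriting the loop logic.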
- min
min is a special case of reduction.
// declaration in Stream
Optional<T> min(Comparator<? super T> comparator);
// min as implemented in ReferencePipeline: it delegates to reduce
public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
return reduce(BinaryOperator.minBy(comparator));
}
// print the youngest student in the list
list.stream().min(Comparator.comparingInt(Student::getAge)).ifPresent(System.out::println);
Comparator.comparingInt(Student::getAge) builds a comparator on student age using the static comparingInt method that Java 8 added to Comparator.
- max
max is a special case of reduction, analogous to min.
// declaration in Stream
Optional<T> max(Comparator<? super T> comparator);
// max as implemented in ReferencePipeline: it delegates to reduce
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
return reduce(BinaryOperator.maxBy(comparator));
}
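- count
count is also a special case of reduction. It is simple; the prototype and the Java 8 implementation are shown below.
// declaration in Stream
long count();
// count as implemented in ReferencePipeline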
public final long count() {
return mapToLong(e -> 1L).sum();
}
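- anyMatch
boolean anyMatch(Predicate<? super T> predicate);
anyMatch is a short-circuiting terminal operation with clear semantics: does any element of the stream satisfy the predicate? As soon as one does, it returns true and processing stops (short circuit). Note: for an empty stream it returns false.
// is there any student aged 20 or younger?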
boolean b = list.stream().anyMatch(s -> s.getAge() <= 20);
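- allMatch
boolean allMatch(Predicate<? super T> predicate);
allMatch is a short-circuiting terminal operation: do all elements of the stream satisfy the predicate? As soon as one does not, it returns false and processing stops (short circuit). Note: for an empty stream it returns true.
// do all students have a score of at least 80?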
boolean b = list.stream().allMatch(s -> s.getScore() >= 80);
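- noneMatch
boolean noneMatch(Predicate<? super T> predicate);
noneMatch is a short-circuiting terminal operation: does no element of the stream satisfy the predicate? As soon as one does, it returns false and processing stops (short circuit). Note: for an empty stream it returns true.
// does no student have a score of at least 80?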
boolean b = list.stream().noneMatch(s -> s.getScore() >= 80);
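The empty-stream results of the three match operations are worth memorizing, since allMatch and noneMatch returning true on empty input can hide bugs. A minimal sketch, with hypothetical names `EmptyMatchDemo` and `POSITIVE`:

```java
import java.util.function.Predicate;
import java.util.stream.Stream;

class EmptyMatchDemo {
    // Any predicate would do; on an empty stream it is never evaluated.
    static final Predicate<Integer> POSITIVE = i -> i > 0;

    static boolean anyOnEmpty()  { return Stream.<Integer>empty().anyMatch(POSITIVE); }
    static boolean allOnEmpty()  { return Stream.<Integer>empty().allMatch(POSITIVE); }
    static boolean noneOnEmpty() { return Stream.<Integer>empty().noneMatch(POSITIVE); }

    public static void main(String[] args) {
        System.out.println("anyMatch:  " + anyOnEmpty());
        System.out.println("allMatch:  " + allOnEmpty());
        System.out.println("noneMatch: " + noneOnEmpty());
    }
}
```

If "all students passed" should be false when there are no students, the emptiness check has to be made explicitly before calling allMatch.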
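- findFirst
Optional<T> findFirst();
findFirst returns the first element of the stream. If the stream has no encounter order, any element may be returned. If the stream is Ordered, parallel and sequential streams give the same result. JavaDoc: If the stream has no encounter order, then any element may be returned.
// print the score of the first student scoring at least 90; repeated parallel runs give the same result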
list.parallelStream().filter(s->s.score>=90)
.findFirst().ifPresent(s->System.out.println(s.score));
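- findAny
Optional<T> findAny();
findAny returns some element of the stream; the result may differ from run to run.
// print the score of any student scoring at least 90; repeated parallel runs may give different results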
list.parallelStream().filter(s->s.score>=90)
.findAny().ifPresent(s->System.out.println(s.score));
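The difference between the two can be sketched on a list of scores. `FindDemo`, `firstAtLeast`, and `anyAtLeast` are hypothetical names, and -1 is an arbitrary marker for "nothing found". With a List source, which has an encounter order, findFirst on a parallel stream always returns the first match, while findAny only promises some match.

```java
import java.util.Arrays;
import java.util.List;

class FindDemo {
    // First score in encounter order that reaches the threshold, or -1 if none does.
    static int firstAtLeast(List<Integer> scores, int threshold) {
        return scores.parallelStream().filter(s -> s >= threshold).findFirst().orElse(-1);
    }

    // Some score that reaches the threshold, or -1; which one is not specified.
    static int anyAtLeast(List<Integer> scores, int threshold) {
        return scores.parallelStream().filter(s -> s >= threshold).findAny().orElse(-1);
    }

    public static void main(String[] args) {
        List<Integer> scores = Arrays.asList(90, 80, 99, 80, 70);
        System.out.println(firstAtLeast(scores, 90));
        System.out.println(anyAtLeast(scores, 90));
        System.out.println(firstAtLeast(scores, 100));
    }
}
```

When any match is acceptable, findAny gives a parallel pipeline more freedom and may finish sooner; when the position matters, findFirst is the one to use.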
That concludes the overview of the Stream methods and how to use them. Later posts will analyze Collector, Collectors, and the Stream source code.
WalkeR_ZG