1. 为什么要用流?
先看下流处理有什么优势吧:
下面两段代码都是用来返回低热量的菜肴名称的,并按照卡路里排序,一个是用Java 7写的,另一个是用Java 8的流写的。
之前(Java 7):
List<Dish> lowCaloricDishes = new ArrayList<>();
for(Dish d: menu){
if(d.getCalories() < 400){
lowCaloricDishes.add(d);
}
}
Collections.sort(lowCaloricDishes, new Comparator<Dish>() {
public int compare(Dish d1, Dish d2){
return Integer.compare(d1.getCalories(), d2.getCalories());
}
});
List<String> lowCaloricDishesName = new ArrayList<>();
for(Dish d: lowCaloricDishes){
lowCaloricDishesName.add(d.getName());
}
在这段代码中,你用了一个“垃圾变量”lowCaloricDishes。它唯一的作用就是作为一次
性的中间容器。在Java 8中,实现的细节被放在它本该归属的库里了。
之后(Java 8):
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
List<String> lowCaloricDishesName = menu.stream().filter(d -> d.getCalories() < 400)
.sorted(comparing(Dish::getCalories))
.map(Dish::getName)
.collect(toList());
为了利用多核架构并行执行这段代码,你只需要把stream()换成parallelStream():
List<String> lowCaloricDishesName = menu.parallelStream().filter(d -> d.getCalories() < 400)
.sorted(comparing(Dishes::getCalories))
.map(Dish::getName)
.collect(toList());
总结一下,Java 8中的Stream API可以让你写出这样的代码:
- 声明性——更简洁,更易读
- 可复合——更灵活
- 可并行——性能更好
2. 流到底是什么呢?
简短的定义就是“从支持数据处理操作的源生成的元素序列”。
- 元素序列——就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。因为集合是数据结构,所以它的主要目的是以特定的时间/空间复杂度存储和访问元素(如ArrayList 与 LinkedList)。但流的目的在于表达计算,比如你前面见到的filter、sorted和map。集合讲的是数据,流讲的是计算。我们会在后面几节中详细解释这个思想。
- 源——流会使用一个提供数据的源,如集合、数组或输入/输出资源。 请注意,从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。
- 数据处理操作——流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,如filter、map、reduce、find、match、sort等。流操作可以顺序执行,也可并行执行。
此外,流操作有两个重要的特点。 - 流水线——很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大的流水线。这让我们下一章中的一些优化成为可能,如延迟和短路。流水线的操作可以看作对数据源进行数据库式查询。
- 内部迭代——与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的
3. 流与集合
请注意,和迭代器类似,流只能遍历一次。遍历完之后,我们就说这个流已经被消费掉了。
4. 流操作
可以连接起来的流操作称为中间操作,关闭流的操作称为终端操作。
总而言之,流的使用一般包括三件事:
- 一个数据源(如集合)来执行一个查询;
- 一个中间操作链,形成一条流的流水线;
- 一个终端操作,执行流水线,并能生成结果。
5. 使用流
- 筛选和切片
- 筛选 -> filter(Lambda表达式)
- 去重 -> distinct()
- 截短 -> limit(n)
- 跳过 -> skip(n)
-映射
map:
给定一个数字列表,如何返回一个由每个数的平方构成的列表呢?例如,给定[1, 2, 3, 4,5],应该返回[1, 4, 9, 16, 25]。
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> squares =numbers.stream().map(n -> n * n).collect(toList());
flatmap:
给定两个数字列表,如何返回所有的数对呢?例如,给定列表[1, 2, 3]和列表[3, 4],应该返回[(1, 3), (1, 4), (2, 3), (2, 4), (3, 3), (3, 4)]。为简单起见,你可以用有两个元素的数组来代表数对。
List<Integer> numbers1 = Arrays.asList(1, 2, 3);
List<Integer> numbers2 = Arrays.asList(3, 4);
List<int[]> pairs =numbers1.stream().
flatMap(i -> numbers2.stream().map(j -> new int[]{i, j})).
collect(toList());
查找和匹配
Stream API通过allMatch、anyMatch、noneMatch、findFirst和findAny方法提供了这样的工具归约
元素求和:
int sum = numbers.stream().reduce(0, (a, b) -> a + b);
实践
(1) 找出2011年发生的所有交易,并按交易额排序(从低到高)
List<Transaction> sorted = transactions.stream().sorted(Comparator.comparing(Transaction::getValue)).collect(toList());
(2) 交易员都在哪些不同的城市工作过?
List<String> cities = transactions.stream().map(transaction -> transaction.getTrader().getCity()).distinct().collect(toList());
(3) 查找所有来自于剑桥的交易员,并按姓名排序。
List<Trader> cambridge = transactions.stream().filter(transaction -> transaction.getTrader().getCity() == "Cambridge").
map(transaction -> transaction.getTrader()).
distinct().
sorted(Comparator.comparing(Trader::getName)).
collect(toList());
(4) 返回所有交易员的姓名字符串,按字母顺序排序。
List<String> traders = transactions.stream().map(transaction -> transaction.getTrader().getName()).
distinct().
sorted(Comparator.comparing(String::toString)).
collect(toList());
(5) 有没有交易员是在米兰工作的?
boolean match = transactions.stream().anyMatch(transaction -> transaction.getTrader().getCity() == "Milan");
(6) 打印生活在剑桥的交易员的所有交易额。
int total = transactions.stream().filter(transaction -> transaction.getTrader().getCity() == "Cambridge").map(transaction -> transaction.getValue()).reduce(0, (a,b) -> a+b);
(7) 所有交易中,最高的交易额是多少?
int max = transactions.stream().map(transaction -> transaction.getValue()).reduce(Integer::max).get()
(8) 找到交易额最小的交易。
Transaction min = transactions.stream().reduce((a,b)-> a.getValue() > b.getValue()? b:a).get();
6. 创建流
- 由值创建流
Stream<String> stream = Stream.of("Java 8 ", "Lambdas ", "In ", "Action");
- 由数组创建流
int[] numbers = {2, 3, 5, 7, 11, 13};
int sum = Arrays.stream(numbers).sum();
由文件生成流
Java中用于处理文件等I/O操作的NIO API(非阻塞 I/O)已更新,以便利用Stream API。java.nio.file.Files中的很多静态方法都会返回一个流。 例如,一个很有用的方法是Files.lines,它会返回一个由指定文件中的各行构成的字符串流。由函数生成流:创建无限流
1. 迭代
Stream.iterate(0, n -> n + 2).limit(10).forEach(System.out::println);
2. 生成
Stream.generate(Math::random).limit(5).forEach(System.out::println);
7. 用流收集数据
归约和汇总
(1) 查找流中的最大值和最小值
Optional<Dish> mostCalorieDish = menu.stream() .collect(maxBy(dishCaloriesComparator));
(2) 汇总
int totalCalories = menu.stream().collect(summingInt(Dish::getCalories));
double avgCalories = menu.stream().collect(averagingInt(Dish::getCalories));
(3) 连接字符串
String shortMenu = menu.stream().map(Dish::getName).collect(joining());
分组
(1) 如果有定义的function
Map<Dish.Type, List<Dish>> dishesByType = menu.stream().collect(groupingBy(Dish::getType));
(2) 自己分组:
public enum CaloricLevel { DIET, NORMAL, FAT }
Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect(groupingBy(
dish -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
} ));
(3) 还可以多级分组
Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel =
menu.stream().collect(
groupingBy(Dish::getType,
groupingBy(dish -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
} )
)
);
(4) 按子组收集数据
Map<Dish.Type, Long> typesCount = menu.stream().collect(groupingBy(Dish::getType, counting()));
8. 自定义Collector
下面是Collector的接口
public interface Collector<T, A, R> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
Function<A, R> finisher();
BinaryOperator<A> combiner();
Set<Characteristics> characteristics();
}
- 建立新的结果容器:supplier方法
- 将元素添加到结果容器:accumulator方法
- 对结果容器应用最终转换:finisher方法
- 合并两个结果容器:combiner方法
- characteristics方法
9. 并行数据处理
- 使用RecursiveTask
要定义RecursiveTask,只需实现它唯一的抽象方法compute:
protected abstract R compute();
示例:
public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 10_000;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
leftTask.fork();
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
return leftResult + rightResult;
}
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {{
sum += numbers[i];
}
return sum;
}
//执行
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
- Spliterator
示例
class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string) {
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
action.accept(string.charAt(currentChar++));
return currentChar < string.length();
}
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
if (currentSize < 10) {
return null;
}
for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
if (Character.isWhitespace(string.charAt(splitPos))) {
Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
currentChar = splitPos;
return spliterator;
}
}
return null;
}
@Override
public long estimateSize() {
return string.length() - currentChar;
}
@Override
public int characteristics() {
return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
}
}