buffer()
最基本的buffer, 把Flux产生的全部元素放到一个List里面,当onComplete
的时候返回这个List, 消费者直接消费这个List.
public final Flux<List<T>> buffer()
@Test
public void test1() {
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
Flux<List<Integer>> buffer = flux.buffer();
buffer.subscribe(integers -> {
integers.stream().forEach(integer -> {
System.out.println(integer);
});
});
}
buffer(int maxSize)
buffer(int maxSize, Supplier<C> bufferSupplier)
每个数组的最大值, 当数组里面的产品个数达到这个最大值的时候就会向消费者发送这个List,这里的bufferSupplier
可以指定创建的容器类型,并返回这个容器给消费者,后面这个参数没有特殊说明都是这个作用.
public final Flux<List<T>> buffer(int maxSize)
@Test
public void test2() {
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
Flux<List<Integer>> buffer = flux.buffer(2);
buffer.subscribe(integers -> {
System.out.println("产品到达,进行卸货:");
integers.stream().forEach(integer -> {
System.out.println(integer);
});
});
}
输出
产品到达,进行卸货:
1
2
产品到达,进行卸货:
3
4
产品到达,进行卸货:
5
6
buffer(int maxSize, int skip)
buffer(int maxSize,int skip, Supplier<C> bufferSupplier)
skip参数指定经过多少个产品开始进行buffer. 每过skip个产品,开始装载maxsize个产品后发送给消费者
public final Flux<List<T>> buffer(int maxSize, int skip)
这里有下面几种情况:
maxSize < skip:这时候会丢弃多余的元素
如图, 每过3个产品开始收集2个产品给消费者
public void test3() {
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
Flux<List<Integer>> buffer = flux.buffer(2, 3);
buffer.subscribe(integers -> {
System.out.println("产品到达,进行卸货:");
integers.stream().forEach(integer -> {
System.out.println(integer);
});
});
}
输出:
产品到达,进行卸货:
1
2
产品到达,进行卸货:
4
5
maxSize > skip:重复读取数据
public void test4() {
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
Flux<List<Integer>> buffer = flux.buffer(3, 2);
buffer.subscribe(integers -> {
System.out.println("产品到达,进行卸货:");
integers.stream().forEach(integer -> {
System.out.println(integer);
});
});
}
输出:
产品到达,进行卸货:
1
2
3
产品到达,进行卸货:
3
4
5
产品到达,进行卸货:
5
6
maxSize == skip 时候与 buffer(int maxSize) 效果相同