[TOC]
数据转换将数据流从一种形式转换为另一种形式,也就是说输入可以是一个或多个数据流,输出也可以是零,或一个或多个数据流。Flink1.7对transform另起一个新的名字“Operators ”--Operators transform 。程序可以将多个transform组合成复杂的数据流拓扑。
1.Map
Map [DataStream->DataStream]
Map: 一对一转换,即一条转换成另一条。
输入一个元素并生成一个元素。 一个map函数,它将输入流的值加倍:
dataStream.map { x => x * 2 }
package com.bigdata.flink.dataStreamMapOperator;
import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
* Summary:
* Map: 一对一转换
*/
public class DataStreamMapOperator {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
new UserAction("userID1", 1293984000, "click", "productID1", 10),
new UserAction("userID2", 1293984001, "browse", "productID2", 8),
new UserAction("userID1", 1293984002, "click", "productID1", 10)
));
// 转换: 商品的价格乘以8
SingleOutputStreamOperator<UserAction> result = source.map(new MapFunction<UserAction, UserAction>() {
@Override
public UserAction map(UserAction value) throws Exception {
int newPrice = value.getProductPrice() * 8;
return new UserAction(value.getUserID(), value.getEventTime(), value.getEventType(), value.getProductID(), newPrice);
}
});
// 输出: 输出到控制台
// UserAction(userID=userID1, eventTime=1293984002, eventType=click, productID=productID1, productPrice=80)
// UserAction(userID=userID1, eventTime=1293984000, eventType=click, productID=productID1, productPrice=80)
// UserAction(userID=userID2, eventTime=1293984001, eventType=browse, productID=productID2, productPrice=64)
result.print();
env.execute();
}
}
2.FlatMap
转换:DataStream → DataStream
FlatMap [DataStream->DataStream]
FlatMap: 一行变零到多行。如下,将一个句子(一行)分割成多个单词(多行)。
输入一个元素并生成零个,一个或多个元素。 将句子分割为单词的flatmap函数:
dataStream.flatMap { str => str.split(" ") }
package com.bigdata.flink.dataStreamFlatMapOperator;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* Summary:
* FlatMap: 一行变任意行(0~多行)
*/
public class DataStreamFlatMapOperator {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 输入: 英文电影台词
DataStreamSource<String> source = env
.fromElements(
"You jump I jump",
"Life was like a box of chocolates"
);
// 转换: 将包含chocolates的句子转换为每行一个单词
SingleOutputStreamOperator<String> result = source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
if(value.contains("chocolates")){
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}
}
});
// 输出: 输出到控制台
// Life
// was
// like
// a
// box
// of
// chocolates
result.print();
env.execute();
}
}
Filter [DataStream->DataStream]
DataStream → DataStream
计算每个元素的布尔函数,并保留函数返回true的元素。 过滤掉零值的过滤器,通俗来讲就是过滤掉等于0的元素,转换成新的数据流
dataStream.filter { _ != 0 }
4.KeyBy
转换:DataStream → KeyedStream
KeyBy [DataStream->KeyedStream]
KeyBy: 按指定的Key对数据重分区。将同一Key的数据放到同一个分区。
逻辑分区流分为不同的分区。 具有相同key的所有记录都分配给同一分区。 在内部,keyBy()是使用hash分区实现的。 指定key有不同的方法。此Transformations返回KeyedStream,
注意:
- 分区结果和KeyBy下游算子的并行度强相关。如下游算子只有一个并行度,不管怎么分,都会分到一起。
- 对于POJO类型,KeyBy可以通过keyBy(fieldName)指定字段进行分区。
- 对于Tuple类型,KeyBy可以通过keyBy(fieldPosition)指定字段进行分区。
- 对于一般类型,如上, KeyBy可以通过keyBy(new KeySelector {...})指定字段进行分区。
Reduce [KeyedStream->DataStream]
Reduce: 基于ReduceFunction进行滚动聚合,并向下游算子输出每次滚动聚合后的结果。
注意: Reduce会输出每一次滚动聚合的结果。