前言
最近正在将一些原本用Spark Streaming实现的流计算任务迁移到Flink,最简单也是最有代表性的就是实时点击量(PV)统计。除了PV之外,我们还希望同时将内容的回复、点赞、收藏指标也一并统计出来,用来后续确定内容的热度。这个作业会涉及到与消息队列的对接、常用算子的使用、事件时间、窗口、水印、状态等几乎所有Flink应用中的基础内容,所以本文来记录一下过程,使用Flink版本为1.8.1。
日志清洗
数据来源是Nginx日志,原始格式如下:
log_format main '$remote_addr $hostname [$time_local] "$request" $status $request_length $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" "$http_host" "$request_time" "$upstream_response_time" "$upstream_status" "$request_body"';
日志会先预处理成JSON串,并打进RocketMQ,所以RocketMQ就是Flink程序的Source。清洗完毕之后的日志会Sink到Kafka,方便其他业务复用。
下面创建日志清洗Flink程序的StreamExecutionEnvironment。在这个阶段我们暂时还不关心事件时间,只需要将感兴趣的日志过滤出来就行,所以时间特征选用处理时间,顺便设定检查点相关的参数。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
然后接入RocketMQ Source(预先写好Consumer参数,即consumerProps属性),使用filter()算子过滤掉不合法的数据,再用flatMap()算子将需要的日志转换成统一的四元组格式,即(用户ID, 行为对象ID, 时间戳, 行为类别)
。Sink到Kafka时仍然是JSON字符串,简单方便。代码主体如下。
env
.addSource(new RocketMQSource<>(new StringDeserializationSchema(), consumerProps))
.name("rocketmq-source-log-forward-forumapi")
.filter(str -> str.contains("token=") && str.charAt(0) == '{')
.flatMap((String str, Collector<String> out) -> {
JSONObject record = JSON.parseObject(str);
String url = record.getString("url").toLowerCase();
long userId = URLHelper.getUserIdFromUrl(url);
if (userId < 0) {
return;
}
url = URLHelper.getAddress(url);
LocalDateTime dateTime = LocalDateTime.parse(record.getString("time"), DATE_TIME_FORMATTER);
long timestamp = dateTime.toInstant(UTC8_ZONE_OFFSET).getEpochSecond();
Map<String, String> postData = URLHelper.getParamMap(record.getString("post_data").toLowerCase());
String action = "", itemId = "";
switch (url) {
case "/group/topic/get":
if (postData.getOrDefault("last_id", "0").equals("0") &&
postData.getOrDefault("only_owner", "0").equals("0")) {
action = ActionType.FORUM_TOPIC_VIEW;
itemId = postData.getOrDefault("topic_id", "");
}
break;
case "/group/topic/reply":
action = ActionType.FORUM_TOPIC_REPLY;
itemId = postData.getOrDefault("topic_id", "");
break;
// 其他情况略去......
default: break;
}
if (!action.equals("") && !itemId.equals("")) {
JSONObject result = new JSONObject();
result.put("userId", userId);
result.put("itemId", itemId);
result.put("timestamp", timestamp);
result.put("action", action);
out.collect(result.toJSONString());
}
})
.returns(TypeInformation.of(String.class))
.addSink(new FlinkKafkaProducer011<>(
"p-kafka-01:9092,p-kafka-02:9092,p-kafka-03:9092,p-kafka-04:9092,p-kafka-05:9092",
"log_forward_user_forum_behavior",
new SimpleStringSchema()
))
.name("kafka-sink-log-forward-user-forum-behavior");
注意在Flink程序中使用Java Lambda表达式时,由于类型擦除,程序是无法推断出泛型类型的,所以在上述代码的flatMap()算子之后,必须要显式调用returns()方法提供type hint,这点在官方文档中已经有提及。另外,Kafka Sink的topic要预先创建,为了简化问题,创建时指定分区数为1。
看一下数据进到Kafka了没,然后下一步。
转化为POJO与设定水印
下面开始写统计内容PV等指标的Flink程序。为了方便后续操作,先定义用户行为的POJO类。根据这篇文章中给出的建议,可以用继承TupleX类的方式达到可读性与易用性的平衡(毕竟是内置类型,还不用操心序列化的事情)。代码如下。
public class UserBehavior extends Tuple4<Long, String, Long, String> {
private static final long serialVersionUID = -3144189553355270382L;
public UserBehavior() {}
public UserBehavior(long userId, String itemId, long timestamp, String action) {
this.f0 = userId;
this.f1 = itemId;
this.f2 = timestamp;
this.f3 = action;
}
public long getUserId() { return this.f0; }
public void setUserId(long userId) { this.f0 = userId; }
public String getItemId() { return this.f1; }
public void setItemId(String itemId) { this.f1 = itemId; }
public long getTimestamp() { return this.f2; }
public void setTimestamp(long timestamp) { this.f2 = timestamp; }
public String getAction() { return this.f3; }
public void setAction(String action) { this.f3 = action; }
}
这次就需要用事件时间作为时间特征了。解析一遍新的JSON,将数据封装成上述UserBehavior实例,没什么好说的。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 检查点配置略。但后面要用到状态,所以状态后端必须预先配置,在flink-conf.yaml或者这里均可
env.setStateBackend(new MemoryStateBackend(true));
DataStream<UserBehavior> forumBehaviorStream = env
.addSource(new FlinkKafkaConsumer011<>(
"log_forward_user_forum_behavior",
new SimpleStringSchema(),
consumerProps
))
.name("kafka-source-log-forward-forum")
.map(str -> {
JSONObject record = JSON.parseObject(str);
return new UserBehavior(
record.getLong("userId"),
record.getString("itemId"),
record.getLong("timestamp"),
record.getString("action")
);
})
.returns(TypeInformation.of(UserBehavior.class))
然后就得在业务数据里抽取时间戳作为水印。在上述DataStream上调用assignTimestampsAndWatermarks()方法,它接受的参数类型有AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks两种,分别对应周期性水印和打点(特定事件触发)水印。关于水印的细节之后再说,把TimestampAssigner的继承关系贴一下得了。
如果数据不会乱序(时间单调递增),就可以用简单的AscendingTimestampExtractor。但我们的日志数据可能会乱序,所以用BoundedOutOfOrdernessTimestampExtractor来抽取时间戳与打水印。注意构造它时,需要传入maxOutOfOrderness参数,表示最大能容忍的乱序区间大小。也就是说它实际发射的水印为当前程序看见的最大事件时间减去maxOutOfOrderness,这里给了1分钟。
DataStream<UserBehavior> forumBehaviorTimedStream = forumBehaviorStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<UserBehavior>(Time.minutes(1)) {
@Override
public long extractTimestamp(UserBehavior userBehavior) {
return userBehavior.getTimestamp() * 1000;
}
}
);
开窗统计
先根据itemId生成KeyedStream。这种业务没有必要用滑动窗口,固定窗口就OK了。如果要用滑动窗口,就把TumblingEventTimeWindows换成SlidingEventTimeWindows。
DataStream<TopicActionWindowStat> topicStatStream = forumBehaviorTimedStream
.keyBy(1)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new CountAggregateFunction(), new ResultWindowFunction());
我们用aggregate()算子进行聚合。更加通用的apply()算子实际上也可以,但是apply()算子会将窗口中所有数据都缓存下来,等到窗口结束再一起计算,如果数据量大的话,压力也会比较大。如果我们自己编写聚合函数(AggregateFunction),再利用aggregate()算子进行预聚合,可以减少内存中维护的数据量。这与编写Spark程序时尽量采用map side预聚合的算子(aggregateByKey、reduceByKey)是异曲同工。
下面就来编写一个聚合函数,它对一篇帖子的阅读、回复、收藏和点赞量进行累加。其中TopicActionAcc是一个特别简单的POJO,看官读源码就能知道怎么实现,因此不再贴了。
public static class CountAggregateFunction
implements AggregateFunction<UserBehavior, TopicActionAcc, TopicActionAcc> {
private static final long serialVersionUID = 8926737679356974335L;
@Override
public TopicActionAcc createAccumulator() {
return new TopicActionAcc();
}
@Override
public TopicActionAcc add(UserBehavior value, TopicActionAcc accumulator) {
switch (value.getAction()) {
case ActionType.FORUM_TOPIC_VIEW: accumulator.addRead(1); break;
case ActionType.FORUM_TOPIC_REPLY: accumulator.addReply(1); break;
case ActionType.FORUM_TOPIC_FAVOR: accumulator.addFavor(1); break;
case ActionType.FORUM_TOPIC_LIKE: accumulator.addLike(1); break;
default: break;
}
return accumulator;
}
@Override
public TopicActionAcc getResult(TopicActionAcc accumulator) {
return accumulator;
}
@Override
public TopicActionAcc merge(TopicActionAcc a, TopicActionAcc b) {
a.addRead(b.getRead());
a.addReply(b.getReply());
a.addFavor(b.getFavor());
a.addLike(b.getLike());
return a;
}
}
aggregate()算子的第二个参数是做什么的呢?它可以将聚合函数输出的结果再进行处理,称作窗口函数(WindowFunction)。由于它对窗口本身是有感知的,所以我们可以方便地获取到窗口的开始与结束时间(左闭右开区间)。代码如下。
public static class ResultWindowFunction
implements WindowFunction<TopicActionAcc, TopicActionWindowStat, Tuple, TimeWindow> {
private static final long serialVersionUID = 3431146528087070967L;
@Override
public void apply(
Tuple key,
TimeWindow window,
Iterable<TopicActionAcc> input,
Collector<TopicActionWindowStat> out
) throws Exception {
String itemId = ((Tuple1<String>) key).f0;
TopicActionAcc acc = input.iterator().next();
out.collect(new TopicActionWindowStat(
itemId,
window.getEnd(),
acc.getRead(),
acc.getReply(),
acc.getFavor(),
acc.getLike()
));
}
}
这里的TopicActionWindowStat POJO就封装了一个窗口内对一篇帖子的统计信息,包括ID、窗口结束时间戳、阅读、回复、收藏和点赞量。
使用状态输出结果
由于数据有可能是乱序的,所以我们必须保证一个窗口内的所有数据都收集齐全之后才能输出,这就需要用到状态。先根据上面的窗口结束时间戳进行分组,然后编写ProcessFunction来处理它。
topicStatStream
.keyBy(1)
.process(new OutputProcessFunction());
env.execute();
ProcessFunction是Flink中的低级(亦即自由度较大)Function,可以将它近似地理解为Spark中的transform()算子。它能够提供基于TimerService的定时器功能,只要注册一个事件时间戳为windowEndTimestamp + 1
的定时器(多次注册则只有第一次有效),在收到该时间戳的水印时,就表示该窗口内的所有数据都已到达,可以输出了。
维护已经到达的数据则需要使用状态,以保证在程序崩溃时不丢失数据。回忆一下基础知识:Flink中的状态分为Keyed State(KeyedStream级别的状态)和Operator State(算子级别的状态)。我们在这里使用Keyed State中的ListState,即基于列表的多个状态。其他常用的状态还有单值状态ValueState、映射状态MapState、Reduce状态ReduceState等。
说了这么多,还是看代码,很简单。
public static class OutputProcessFunction
extends KeyedProcessFunction<Tuple, TopicActionWindowStat, String> {
private ListState<TopicActionWindowStat> state;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ListStateDescriptor<TopicActionWindowStat> stateDescriptor = new ListStateDescriptor<>(
"topic-action-window-stat-state",
TopicActionWindowStat.class
);
state = getRuntimeContext().getListState(stateDescriptor);
}
@Override
public void processElement(TopicActionWindowStat value, Context ctx, Collector<String> out) throws Exception {
state.add(value);
ctx.timerService().registerEventTimeTimer(value.getWindowEndTimestamp() + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
for (TopicActionWindowStat stat : state.get()) {
System.out.println(stat.toString());
out.collect(stat.toString());
}
state.clear();
}
}
为了看起来直观,这里将结果直接打到了标准输出,实际应用中Sink到HBase或者Redis等进行持久化。如果要实现窗口TopN的话,排个序就行了。下面是部分输出的截图。
The End
洗洗睡了。民那晚安。