package com.ctgu.flink.project;
import com.ctgu.flink.entity.LoginEvent;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
 * Flink CEP job that reports users with two consecutive failed logins within two seconds.
 *
 * <p>The job reads comma-separated login records from {@code data/LoginLog.csv}, turns each
 * line that has at least four fields into a {@link LoginEvent} with an event-time timestamp,
 * keys the stream by user id, and prints one tuple for every match of the pattern
 * "failed login directly followed by another failed login".
 */
public class Flink_Sql_Login_CEP {

    /** Path of the login log read by the job. */
    private static final String INPUT_PATH = "data/LoginLog.csv";

    /** Value of {@link LoginEvent#getLoginState()} that marks a failed attempt. */
    private static final String FAIL_STATE = "fail";

    /** Pattern stage names; shared by the pattern definition and the select function. */
    private static final String FIRST_FAIL = "firstFail";
    private static final String SECOND_FAIL = "secondFail";

    /**
     * How far the watermark trails the newest timestamp seen so far.
     *
     * <p>The log is not sorted by timestamp, and CEP discards records whose timestamp is
     * already behind the watermark, so a zero tolerance can silently lose events.
     * NOTE(review): 10 s covers the lag visible in the sample data (up to 6 s); tune it
     * for the real input.
     */
    private static final Duration MAX_OUT_OF_ORDERNESS = Duration.ofSeconds(10);

    /**
     * Builds and runs the job, then prints the elapsed wall-clock time in seconds.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> dataStream = env.readTextFile(INPUT_PATH);

        DataStream<LoginEvent> loginDataStream = dataStream
                // Lines with fewer than four fields cannot be parsed and are skipped.
                .filter(line -> line.split(",").length >= 4)
                .map(line -> {
                    String[] fields = line.split(",");
                    // The last field is multiplied by 1000 before being used as the
                    // event timestamp (presumably epoch seconds -> milliseconds).
                    return new LoginEvent(
                            Long.parseLong(fields[0]),
                            fields[1],
                            fields[2],
                            Long.parseLong(fields[3]) * 1000L);
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(MAX_OUT_OF_ORDERNESS)
                                .withTimestampAssigner((event, timestamp) -> event.getTimestamp()));

        // next() demands strict contiguity: the second failure must directly follow the
        // first one in the same user's stream, and both must fall within two seconds.
        Pattern<LoginEvent, LoginEvent> loginFailPattern =
                Pattern.<LoginEvent>begin(FIRST_FAIL).where(new LoginFailCondition())
                        .next(SECOND_FAIL).where(new LoginFailCondition())
                        .within(Time.seconds(2));

        PatternStream<LoginEvent> patternStream =
                CEP.pattern(loginDataStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        patternStream.select(new MyPatternSelectFunction()).print();

        env.execute("Table SQL");

        System.out.println("耗时: " + (System.currentTimeMillis() - start) / 1000);
    }

    /** Accepts only login events whose state is {@code "fail"}. */
    private static class LoginFailCondition extends SimpleCondition<LoginEvent> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean filter(LoginEvent loginEvent) throws Exception {
            return FAIL_STATE.equals(loginEvent.getLoginState());
        }
    }

    /**
     * Converts one pattern match into
     * {@code (userId, firstFailTimestamp, secondFailTimestamp, "fail N times")}.
     */
    public static class MyPatternSelectFunction
            implements PatternSelectFunction<LoginEvent, Tuple4<Long, Long, Long, String>> {
        private static final long serialVersionUID = 1L;

        /**
         * @param map matched events, grouped by pattern stage name
         * @return the user id, the timestamps of both failures and a failure-count message
         */
        @Override
        public Tuple4<Long, Long, Long, String> select(Map<String, List<LoginEvent>> map) throws Exception {
            LoginEvent firstFail = map.get(FIRST_FAIL).get(0);
            LoginEvent secondFail = map.get(SECOND_FAIL).get(0);

            // Count matched events, not pattern stages, so the message stays correct
            // even if a stage ever matches more than one event.
            int failCount = 0;
            for (List<LoginEvent> stageEvents : map.values()) {
                failCount += stageEvents.size();
            }

            return new Tuple4<>(firstFail.getUserId(), firstFail.getTimestamp(),
                    secondFail.getTimestamp(), "fail " + failCount + " times");
        }
    }
}
Test data
5402,83.149.11.115,success,1558430815
23064,66.249.3.15,fail,1558430826
5692,80.149.25.29,fail,1558430833
7233,86.226.15.75,success,1558430832
5692,80.149.25.29,success,1558430840
29607,66.249.73.135,success,1558430841
1035,83.149.9.216,fail,1558430842
1035,83.149.9.216,success,1558430845
1035,83.149.9.216,fail,1558430843
1035,83.149.9.216,success,1558430846
1035,83.149.24.26,fail,1558430844
7328,193.114.45.13,success,1558430848
29607,66.249.73.135,success,1558430847
2133,50.16.19.13,success,1558430857
6745,66.249.73.185,success,1558430859
76456,110.136.166.128,success,1558430853
8345,46.105.14.53,success,1558430855
76456,110.136.166.128,success,1558430857
76456,110.136.166.128,success,1558430854
76456,110.136.166.128,fail,1558430859
76456,110.136.166.128,success,1558430861
3464,123.125.71.35,success,1558430860
76456,110.136.166.128,success,1558430865
65322,50.150.204.184,success,1558430866
23565,207.241.237.225,fail,1558430862
8455,200.49.190.101,success,1558430867
8455,200.49.190.100,success,1558430865
8455,200.49.190.101,success,1558430869
8455,200.49.190.101,success,1558430872
32031,66.249.73.185,success,1558430875
12018,66.249.73.135,success,1558430874
12018,66.249.73.135,success,1558430879
12018,66.249.73.135,success,1558430881
21419,67.214.178.190,success,1558430882
21419,67.214.178.190,success,1558430880
23565,207.241.237.220,success,1558430881
2386,46.105.14.53,success,1558430883
23565,207.241.237.227,success,1558430884
83419,91.177.205.119,success,1558430881
83419,91.177.205.119,fail,1558430882
83419,91.177.205.119,success,1558430885
83419,91.177.205.119,fail,1558430886
83419,91.177.205.119,success,1558430884
83419,91.177.205.119,success,1558430886
4325,26.249.73.15,success,1558430888
2123,207.241.237.228,success,1558430887
21083,207.241.237.101,success,1558430889
13490,87.169.99.232,success,1558430886
93765,209.85.238.199,success,1558430890
93765,209.85.238.199,success,1558430892