Version: Flink 1.17.0
Start the cluster:
/usr/local/Cellar/apache-flink/1.17.0/libexec/bin/start-cluster.sh
Submit a job:
flink run /Volumes/Documents/project/java/flink-demo/out/artifacts/flink_demo_jar/flink-demo.jar
Building the jar in IDEA:
Project Settings => Artifacts => JAR => From modules with dependencies => select the main class
You may run into Java version problems:
For example:
java.lang.UnsupportedClassVersionError: com/test/WindowWordCount has been compiled by a more recent version of the Java Runtime (class file version 61.0), this version of the Java Runtime only recognizes class file versions up to 55.0
Solution: pin the Java version used at compile time (class file version 61.0 is Java 17, while a runtime that only accepts up to 55.0 is Java 11):
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>11</source>
                <target>11</target>
            </configuration>
        </plugin>
    </plugins>
</build>
<properties>
    <mainClass>com.source.FlinkCleanKafka</mainClass>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
</properties>
Also add the corresponding dependency:
<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-compiler-plugin -->
<dependency>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.8.1</version>
</dependency>
Flink connector downloads:
- Look it up on the Flink website: find the entry matching your Flink version, then choose the corresponding download (connector/client jar).
Watermark issues:
- The number of Kafka partitions versus the job parallelism can make the watermark misbehave, so no output is produced. Therefore, in a parallel job, for multi-partition sources such as Kafka, watermark generation should be placed in the source operator (both the legacy SourceFunction API and the new Source API provide methods for this) rather than being configured in the schema builder; see the sketch below.
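A minimal sketch of attaching the watermark strategy directly to the Kafka source operator (broker/topic/group are taken from the example further below; the 1-minute idleness value is illustrative and keeps idle partitions from holding back the watermark):
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("192.168.229.100:9092")
        .setTopics("topic-test1")
        .setGroupId("topic-test1_2")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
// Watermarks are generated inside the source operator, per Kafka partition;
// partitions that stop emitting are marked idle after 1 minute so they cannot stall the watermark.
DataStream<String> stream = env.fromSource(
        kafkaSource,
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withIdleness(Duration.ofMinutes(1)),
        "Kafka Source");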
Flink RocksDB checkpoint configuration (flink-conf.yaml):
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs:///Volumes/Documents/flink_data # where checkpoints are stored; /Volumes/Documents/flink_data is a custom directory. Since RocksDB is used, an SSD gives the best performance.
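The same backend can also be set per job in code; a minimal sketch, assuming the flink-statebackend-rocksdb dependency is on the classpath (the interval and checkpoint directory are illustrative):
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);                              // checkpoint every 60 s
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));   // true = incremental checkpoints
env.getCheckpointConfig().setCheckpointStorage("hdfs:///Volumes/Documents/flink_data");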
Flink local configuration parameters (flink-conf.yaml):
jobmanager.memory.process.size: 2600m
taskmanager.memory.process.size: 2728m
taskmanager.memory.flink.size: 2280m
taskmanager.numberOfTaskSlots: 4
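If the job is run directly from the IDE (mini-cluster) rather than on the local standalone cluster, comparable settings can be passed programmatically; a rough sketch, assuming the flink-runtime-web dependency is present for the web UI:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
conf.setString("taskmanager.numberOfTaskSlots", "4");
conf.setString("rest.port", "8081");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);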
Flink production cluster configuration:
vim flink-conf.yaml
# IP address of the JobManager
jobmanager.rpc.address: node1
# JobManager RPC port
jobmanager.rpc.port: 6123
# Total process memory of the JobManager
jobmanager.memory.process.size: 1024m
# Total process memory of each TaskManager
taskmanager.memory.process.size: 1024m
# Number of task slots provided by each TaskManager
taskmanager.numberOfTaskSlots: 2
# Whether to preallocate memory; off by default, so the cluster does not hold resources while no Flink job is running
taskmanager.memory.preallocate: false
# Default parallelism for jobs
parallelism.default: 1
# Port of the JobManager web UI (default: 8081)
rest.port: 8081
Example: consuming data from Kafka with Flink and writing it into PostgreSQL:
// TODO: impact of restarts? rewinding the consumer group offset? (see the note after the source builder below)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
env.enableCheckpointing(1000); // checkpoint every 1000 ms
KafkaSource<String> testKafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("192.168.229.100:9092")
.setProperty("enable.auto.commit", "true")
.setProperty("auto.offset.reset", "latest") // how the Kafka client behaves when there is no initial offset, or the committed offset no longer exists on the broker (matters for committed offsets of multi-partition consumer groups): latest = reset to the latest offset on start; earliest = reset to the earliest offset on start; none = throw an exception if no previous consumer offset is found
.setProperty("auto.commit.interval.ms", "1000")
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setTopics("topic-test1")
.setGroupId("topic-test1_2")
.build();
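// Note on the TODO above: one way to rewind/replay the consumer group instead of resuming from
// the committed offsets is to swap the setStartingOffsets(...) call above, for example
// (timestamp value illustrative):
//   .setStartingOffsets(OffsetsInitializer.earliest())                 // replay everything still retained
//   .setStartingOffsets(OffsetsInitializer.timestamp(1700000000000L))  // replay from an epoch-millisecond timestamp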
DataStream<String> stream = env.fromSource(testKafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), "Kafka Source"); // the Kafka source needs a watermark strategy
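// Clean step: split each tab-separated log line, take the request path from field 2, and extract the course id from /class URLs; records without a course id are filtered out below.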
DataStream<Tuple5<String, String, String, String, String>> CleanData = stream.map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
@Override
public Tuple5<String, String, String, String, String> map(String value) throws ParseException {
String[] data = value.split("\\t");
String CourseID = null;
String url = data[2].split(" ")[1];
if (url.startsWith("/class")) {
String CourseHTML = url.split("/")[2];
CourseID = CourseHTML.substring(0, CourseHTML.lastIndexOf("."));
}
return Tuple5.of(data[0], data[1], CourseID, data[3], data[4]);
}
})
.filter(new FilterFunction<Tuple5<String, String, String, String, String>>() {
@Override
public boolean filter(Tuple5<String, String, String, String, String> value) {
return value.f2 != null;
}
});
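// Convert the cleaned stream into a Table with an event-time column (rectime, derived from f1) and a 5-second watermark.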
Table table = tEnv.fromDataStream(CleanData, Schema.newBuilder()
.column("f0", DataTypes.STRING())
.column("f1", DataTypes.STRING())
.column("f2", DataTypes.STRING())
.column("f3", DataTypes.STRING())
.column("f4", DataTypes.STRING())
.columnByExpression("rectime", "CAST(TO_TIMESTAMP(`f1`) as TIMESTAMP_LTZ(3))") // use a local-time-zone timestamp for the watermark; otherwise standard (UTC) time is used
.watermark("rectime", "rectime - INTERVAL '5' SECOND")
.build());
tEnv.createTemporaryView("log_info", table);
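// PostgreSQL sink via the JDBC connector; the (recdate, hr) primary key makes the writes upserts.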
tEnv.executeSql("CREATE TEMPORARY TABLE stat_out_sink (" +
"recdate date," +
"hr int," +
"pv int," +
"uv int," +
"PRIMARY KEY (recdate, hr) NOT ENFORCED" +
")" +
"WITH ('connector' = 'jdbc'," +
"'url' = 'jdbc:postgresql://192.168.229.100:5432/postgres'," +
"'table-name' = 'stat_out'," +
"'username' = 'postgres'," +
"'password' = 'postgres')");
// Cumulative window: within each 1-hour window, emit updated pv/uv every 5 seconds.
tEnv.executeSql("insert into stat_out_sink select" +
" CAST(date_format(window_start, 'yyyy-MM-dd') as date) recdate," +
" CAST(date_format(window_end, 'HH') as int) hr," +
" CAST(count(f0) as int) pv," +
" CAST(count(distinct f0) as int) uv" +
" FROM TABLE(CUMULATE(TABLE log_info, DESCRIPTOR(rectime), INTERVAL '5' SECOND, INTERVAL '1' HOUR))" +
" GROUP BY window_start, window_end");