1. Flink的CheckPoint存在哪里?
默认情况下,State会保存在TaskManager的内存中,CheckPoint会存储在JobManager的内存中。State和CheckPoint的存储位置取决于StateBackend的配置。Flink一共提供了3种StateBackend。包括基于内存的MemoryStateBackend、基于文件系统的FsStateBackend,以及基于RockDB作为存储介质的RocksDBState-Backend。
checkpoint的参数设置:
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkEnvConf {
public static StreamExecutionEnvironment getEnv(){
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置每次checkpoint间隔,每隔60s一次
env.enableCheckpointing(1000 * 60);
//设置checkpoint模式,exactly_once只处理一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置时间语义,设置事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置失败重启策略,尝试重启5次,每隔120s重启1次
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, org.apache.flink.api.common.time.Time.seconds(120)));
//设置checkpoint之心超时时间,设置10分钟超时
env.getCheckpointConfig().setCheckpointTimeout(60000);
//设置两次checkpoint的最小间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//设置最大并行的checkpoint数量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//设置是够保存checkpoint的数据,RETAIN_ON_CANCELLATION会保留CheckPoint数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//设置checkpoint可以容忍失败的次数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
//设置若有最新的savepoint,优先采用savepoint来恢复成检查点
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
return env;
}
}
原文链接:https://blog.csdn.net/lzxlfly/article/details/108819754
2. Flink状态机制
state 和 checkpoint
3. Flink的checkpoint机制对比spark有什么不同和优势
从数据一致性角度来看,Spark Streaming的checkpoint是基于RDD的计算图,而Flink的checkpoint是基于状态后端的。从效率和延迟的角度来看,Spark Streaming的checkpoint可能导致较高的延迟,而Flink的checkpoint通过异步快照实现较低的延迟。从容错性角度来看,Spark Streaming依赖于可靠的分布式文件系统,而Flink可以选择不同的状态后端进行容错。最后,从精准一次语义的角度来看,Spark Streaming的checkpoint无法保证精准一次语义,而Flink的checkpoint能够保证精准一次的语义。
01数据一致性
从数据一致性的角度来看,Spark Streaming的checkpoint和Flink的checkpoint在实现上有一些不同:
1.Spark Streaming的checkpoint:
Spark Streaming的checkpoint机制是基于RDD(弹性分布式数据集)的。它通过将应用程序的元数据(包括DAG图、转换操作等)和RDD的计算图序列化到可靠的分布式文件系统(如HDFS)上来实现数据一致性。
当应用程序发生故障时,Spark Streaming可以使用checkpoint信息来恢复丢失的数据。它会从checkpoint中重新计算丢失的RDD,并继续处理后续的数据流。
2.Flink的checkpoint:
Flink的checkpoint机制是基于状态后端的,它保存了应用程序的状态和操作符的计算状态。Flink的状态后端可以选择不同的存储方式,包括内存、文件系统、分布式文件系统等,以保证数据的可靠性和一致性。
当应用程序发生故障时,Flink可以从最近一次成功的checkpoint中恢复状态,并继续处理数据流。它会确保所有操作符的状态都能够准确地恢复到最近一次的一致状态。
精确一次语义:
Spark Streaming的checkpoint机制无法保证精确一次的语义。在故障恢复时,Spark Streaming可能会有一些重复数据或丢失数据的情况发生。这是因为Spark Streaming的checkpoint是基于RDD的计算图,而RDD的转换操作是惰性执行的,当应用程序恢复时,可能会重新执行一些转换操作,导致数据的重复或丢失。
Flink的checkpoint机制可以保证精确一次的语义。它会在执行快照时,对所有状态进行一致性检查,并在故障恢复时准确地恢复到最近一次的一致状态。这意味着在Flink中,不会有重复数据或丢失数据的情况发生。
02效率和延迟
从效率和延迟的角度来看,Spark Streaming的checkpoint和Flink的checkpoint在实现上也存在一些差异:
1.Spark Streaming的checkpoint:
Spark Streaming的checkpoint机制对于大规模的流处理应用来说,可能会导致较高的延迟。当进行checkpoint时,Spark Streaming需要将RDD的计算图序列化到可靠的分布式文件系统(如HDFS)上,这会涉及磁盘IO操作,导致一定的延迟。
此外,Spark Streaming的checkpoint机制需要将RDD的计算图完整地保存在checkpoint中,这会占用较多的存储空间,并且在进行故障恢复时,需要重新计算丢失的RDD,进一步增加了延迟。
2.Flink的checkpoint:
Flink的checkpoint机制通过异步快照来实现,可以在不阻塞流处理的情况下进行状态的持久化,因此具有较低的延迟。Flink会将应用程序的状态和操作符的计算状态异步地写入到状态后端中,而不需要进行磁盘IO操作。
此外,Flink的状态后端可以选择不同的存储方式,包括内存、文件系统、分布式文件系统等,可以根据应用程序的需求进行灵活配置,从而进一步提高效率和降低延迟。
03容错性
从容错性的角度来看,Spark Streaming的checkpoint和Flink的checkpoint在实现上也有一些不同:
1.Spark Streaming的checkpoint:
Spark Streaming的checkpoint机制依赖于可靠的分布式文件系统(如HDFS)来存储checkpoint数据,以保证数据的容错性。当应用程序执行checkpoint时,它会将应用程序的元数据和RDD的计算图序列化到分布式文件系统上。
在故障恢复时,Spark Streaming可以使用checkpoint信息来恢复丢失的数据。它会重新计算丢失的RDD,并继续处理后续的数据流,以确保应用程序能够从故障中恢复并继续进行流处理。
2.Flink的checkpoint:
Flink的checkpoint机制可以选择不同的状态后端来存储checkpoint数据,包括内存、文件系统、分布式文件系统等。这些状态后端都提供了持久化存储的能力,以确保数据的容错性。
当应用程序发生故障时,Flink可以从最近一次成功的checkpoint中恢复状态,并继续处理数据流。它会确保所有操作符的状态都能够准确地恢复到最近一次的一致状态,以保证应用程序的容错性。
https://www.sohu.com/a/729473781_100110101
4. 反压
生产数据的速率比下游 task 消费数据的速率要快
解决方案
https://flink-learning.org.cn/article/detail/6e41b3bbeb59c49f66030e86b1281751
https://zhuanlan.zhihu.com/p/656037315
5. Flink CEP
https://xie.infoq.cn/article/b2ffe9c41ecb4647a1ed8a340
6. Flink双流JOIN
- window join
- join 算子
- coGroup 算子
- interval join
https://developer.huawei.com/consumer/cn/forum/topic/0202775562683000448
TTL
tableEnv.createTemporaryView("t1", waterSensor1);
tableEnv.createTemporaryView("t2", waterSensor2);
// inner join
// left: OnCreateAndWrite right: OnCreateAndWrite
// tableEnv.sqlQuery("select t1.id, t1.vc, t2.id, t2.vc from t1 join t2 on t1.id = t2.id")
// .execute()
// .print();
// left join
// left: OnReadAndWrite right: OnCreateAndWrite
// tableEnv.sqlQuery("select t1.id, t1.vc, t2.id, t2.vc from t1 left join t2 on t1.id = t2.id")
// .execute()
// .print();
// full join
// left: OnReadAndWrite right: OnReadAndWrite
tableEnv.sqlQuery("select t1.id, t1.vc, t2.id, t2.vc from t1 full join t2 on t1.id = t2.id")
.execute()
.print();