flink使用初探

版本:flink 1.17.0

启动集群:

/usr/local/Cellar/apache-flink/1.17.0/libexec/bin/start-cluster.sh 

提交任务:

flink run /Volumes/Documents/project/java/flink-demo/out/artifacts/flink_demo_jar/flink-demo.jar

idea打jar包:

project setting => jar => from module with denpenes => main class

可能遇到版本问题:

例如:

java.lang.UnsupportedClassVersionError: com/test/WindowWordCount has been compiled by a more recent version of the Java Runtime (class file version 61.0), this version of the Java Runtime only recognizes class file versions up to 55.0
解决方案,指定编译时候的版本即可:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>11</source>
                <target>11</target>
            </configuration>
        </plugin>
    </plugins>
</build>

<properties>
    <mainClass>com.source.FlinkCleanKafka</mainClass>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
</properties>
同时添加对应的依赖:
<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-compiler-plugin -->
<dependency>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.8.1</version>
</dependency>

flink connector下载

  • 在flink官网查找 找到对应版本的flink 然后选择download client

watermark水位线问题:

  • kafka partition 并行度会导致watermark异常 从而导致不产生数据。因此, 在并行流下对于像Kafka这样的多Partition数据源, 应该把Watermark的生成放在Source算子中. 在原来的 SourceFunction API和最新的Source API中都提供了对应的方法. 而不是在schemabuild里面设置

flink 的rocksdb的checkpoint配置

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs:///Volumes/Documents/flink_data # location to store checkpoints /Volumes/Documents/flink_data 为自定义目录 由于是rocksdb使用ssd性能最佳

flink本地参数配置:

jobmanager.memory.process.size: 2600m

taskmanager.memory.process.size: 2728m

taskmanager.memory.flink.size: 2280m

taskmanager.numberOfTaskSlots: 4

flink线上集群参数配置:

vim flink-conf.yaml
# jobManager 的IP地址
jobmanager.rpc.address: node1
# JobManager 的端口号
jobmanager.rpc.port: 6123
# JobManager的总进程内存大小
jobmanager.memory.process.size: 1024m
# TaskManager的总进程内存大小
taskmanager.memory.process.size: 1024m
# 每个 TaskManager 提供的任务 slots 数量大小
taskmanager.numberOfTaskSlots: 2
#是否进行预分配内存,默认不进行预分配,这样在我们不使用flink集群时候不会占用集群资源
taskmanager.memory.preallocate: false
# 程序默认并行计算的个数
parallelism.default: 1
#JobManager的Web界面的端口(默认:8081)
rest.port: 8081

flink的kafka消费数据入库postgresql使用示例:

//todo 重启的影响 ? 回朔消费组offset? 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
env.enableCheckpointing(1000);  //检查点 每5000ms

KafkaSource<String> testKafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("192.168.229.100:9092")
        .setProperty("enable.auto.commit", "true")
        .setProperty("auto.offset.reset", "latest") // 多分区的消费组的offset提交问题 Kafka 客户端在没有初始偏移量或偏移量无效(即当前偏移量不在服务器上存在)时如何处理的设置选项 latest:在消费者启动时,将偏移量重置为最新的偏移量。 earliest:在消费者启动时,将偏移量重置为最早的偏移量。 none:如果未找到以前的消费者偏移量,则向消费者抛出异常。
        .setProperty("auto.commit.interval.ms", "1000")
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .setTopics("topic-test1")
        .setGroupId("topic-test1_2")
        .build();
        
DataStream<String> stream = env.fromSource(testKafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), "Kafka Source");//kafka需要配置watermark策略
DataStream<Tuple5<String, String, String, String, String>> CleanData = stream.map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
            @Override
            public Tuple5<String, String, String, String, String> map(String value) throws ParseException {
                String[] data = value.split("\\t");
                String CourseID = null;
                String url = data[2].split(" ")[1];
                if (url.startsWith("/class")) {
                    String CourseHTML = url.split("/")[2];
                    CourseID = CourseHTML.substring(0, CourseHTML.lastIndexOf("."));
                }
                return Tuple5.of(data[0], data[1], CourseID, data[3], data[4]);
            }
        })
        .filter(new FilterFunction<Tuple5<String, String, String, String, String>>() {
            @Override
            public boolean filter(Tuple5<String, String, String, String, String> value) {
                return value.f2 != null;
            }
        });

Table table = tEnv.fromDataStream(CleanData, Schema.newBuilder()
        .column("f0", DataTypes.STRING())
        .column("f1", DataTypes.STRING())
        .column("f2", DataTypes.STRING())
        .column("f3", DataTypes.STRING())
        .column("f4", DataTypes.STRING())
        .columnByExpression("rectime", "CAST(TO_TIMESTAMP(`f1`) as TIMESTAMP_LTZ(3))") // 水印使用本地时区时间戳 否则使用标准时间
        .watermark("rectime", "rectime - INTERVAL '5' SECOND")
        .build());

tEnv.createTemporaryView("log_info", table);
tEnv.executeSql("CREATE TEMPORARY TABLE stat_out_sink (" +
        "recdate date," +
        "hr int," +
        "pv int," +
        "uv int," +
        "PRIMARY KEY (recdate, hr) NOT ENFORCED" +
        ")" +
        "WITH ('connector' = 'jdbc'," +
        "'url' = 'jdbc:postgresql://192.168.229.100:5432/postgres'," +
        "'table-name' = 'stat_out'," +
        "'username' = 'postgres'," +
        "'password' = 'postgres')");
tEnv.executeSql("insert into stat_out_sink select" +
        " CAST(date_format(window_start, 'yyyy-MM-dd') as date) recdate," +
        " CAST(date_format(window_end, 'HH') as int) hr," +
        " CAST(count(f0) as int) pv," +
        " CAST(count(distinct f0) as int) uv" +
        " FROM TABLE(CUMULATE(TABLE log_info, DESCRIPTOR(rectime), INTERVAL '5' SECOND, INTERVAL '1' HOUR))" +
        " GROUP BY window_start, window_end");
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容