Flink 常用 API 详解

@[TOC]

<font color=red size=50> [还有视频讲解在我的B站-宝哥chbxw](https://www.bilibili.com/video/BV1Ga4y1Y7C4/), 希望大家可以支持一下,谢谢。</font>

# 前言之分层 API 

Flink 根据抽象程度分层,提供了三种不同的 API。每一种 API 在简洁性和表达力上有着不同的侧重,并且针对不同的应用场景。

![在这里插入图片描述](https://img-blog.csdnimg.cn/20200622173937180.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3d1eGludGRyaA==,size_16,color_FFFFFF,t_70)

- **ProcessFunction** 是 Flink 所提供最底层接口。ProcessFunction 可以处理一或两条输入数据流中的单个事件或者归入一个特定窗口内的多个事件。它提供了对于时间和状态的细粒度控制。开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的某一时刻触发回调函数。因此,你可以利用 ProcessFunction 实现许多有状态的事件驱动应用所需要的基于单个事件的复杂业务逻辑。

-  **DataStream API** 为许多通用的流处理操作提供了处理原语。这些操作包括窗口、逐条记录的转换操作,在处理事件时进行外部数据库查询等。DataStream API 支持 Java 和Scala 语言,预先定义了例如 map()、reduce()、aggregate() 等函数。你可以通过扩展实现预定义接口或使用 Java、Scala 的 lambda 表达式实现自定义的函数。

-  **SQL & Table API**:Flink 支持两种关系型的 API,Table API 和 SQL。这两个 API 都是批处理和流处理统一的 API,这意味着在无边界的实时数据流和有边界的历史记录数据流上,关系型 API 会以相同的语义执行查询,并产生相同的结果。Table API 和 SQL借助了 Apache Calcite 来进行查询的解析,校验以及优化。它们可以与 DataStream 和DataSet API 无缝集成,并支持用户自定义的标量函数,聚合函数以及表值函数。

-  **扩展库**

-  **复杂事件处理(CEP)**:模式检测是事件流处理中的一个非常常见的用例。Flink 的 CEP库提供了 API,使用户能够以例如正则表达式或状态机的方式指定事件模式。CEP 库与Flink 的 DataStream API 集成,以便在 DataStream 上评估模式。CEP 库的应用包括网络入侵检测,业务流程监控和欺诈检测。

- **DataSet API**:DataSet API 是 Flink 用于批处理应用程序的核心 API。DataSet API 所提供的基础算子包括 map、reduce、(outer) join、co-group、iterate 等。所有算子都有相应的算法和数据结构支持,对内存中的序列化数据进行操作。如果数据大小超过预留内存,则过量数据将存储到磁盘。Flink 的 DataSet API 的数据处理算法借鉴了传统数据库算法的实现,例如混合散列连接(hybrid hash-join)和外部归并排序(external merge-sort)。

-  **Gelly**: Gelly 是一个可扩展的图形处理和分析库。Gelly 是在 DataSet API 之上实现的,并与 DataSet API 集成。因此,它能够受益于其可扩展且健壮的操作符。Gelly 提供了内置算法,如 label propagation、triangle enumeration 和 page rank 算法,也提供了一个简化自定义图算法实现的 Graph API。

# 一、DataStream 的编程模型

DataStream 的编程模型包括四个部分:Environment,DataSource,Transformation,Sink。

![在这里插入图片描述](https://img-blog.csdnimg.cn/20200602181529395.png)

# 二、Flink 的 DataSource 数据源

## 2.1、基于文件,此处是HDFS

```java

package com.chb.flink.source

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FileSource {

    def main(args: Array[String]): Unit = {

        //初始化Flink的Streaming(流计算)上下文执行环境

        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        streamEnv.setParallelism(1)

        //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题

        import org.apache.flink.streaming.api.scala._

        //读取数据

        val stream = streamEnv.readTextFile("hdfs://10.0.0.201:9000/README.txt")

        //转换计算

        val result: DataStream[(String, Int)] = stream.flatMap(_.split(","))

            .map((_, 1))

            .keyBy(0)

            .sum(1)

        //打印结果到控制台

        result.print()

        //启动流式处理,如果没有该行代码上面的程序不会运行

        streamEnv.execute("wordcount")

    }

}

```

## 2.2、基于集合的源

有点像Spark的序列化

```java

package com.chb.flink.source

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object CollectionSource {

    def main(args: Array[String]): Unit = {

        //初始化Flink的Streaming(流计算)上下文执行环境

        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        streamEnv.setParallelism(1)

        //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题

        import org.apache.flink.streaming.api.scala._

        //读取数据

        var dataStream = streamEnv.fromCollection(Array(

            new StationLog("001", "186", "189", "busy", 1577071519462L, 0),

            new StationLog("002", "186", "188", "busy", 1577071520462L, 0),

            new StationLog("003", "183", "188", "busy", 1577071521462L, 0),

            new StationLog("004", "186", "188", "success", 1577071522462L, 32)

        ))

        dataStream.print()

        streamEnv.execute()

    }

}

/*

    * 通信基站日志数据

    * @param sid 基站ID

    * @param callOut 主叫号码

    * @param callIn 被叫号码

    * @param callType 通话类型eg:呼叫失败(fail),占线(busy),拒接(barring),接通(success)

    * @param callTime 呼叫时间戳,精确到毫秒

    * @Param duration 通话时长 单位:秒

*/

class StationLog(sid: String, callOut: String, callIn: String, callType: String, callTime: Long, duration: Long)

```

## 2.3、Kafka

首 先 需 要 配 置 Kafka 连 接 器 的 依 赖 , [另 外 更 多 的 连 接 器 可 以 查 看 官 网](https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/connectors/)

### 2.3.1、引入依赖

```java

        <!-- Kafka connector-->

        <dependency>

            <groupId>org.apache.flink</groupId>

            <artifactId>flink-connector-kafka_2.11</artifactId>

            <version>1.10.1</version>

            <exclusions>

                <exclusion>

                    <!-- 排除对Jackson 的引用 ; -->

                    <groupId>com.fasterxml.jackson.core</groupId>

                    <artifactId>*</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

        <dependency>

            <groupId>org.apache.kafka</groupId>

            <artifactId>kafka-clients</artifactId>

            <version>2.4.1</version>

        </dependency>

```

### 2.3.2、Kafka第一种Source

```java

package com.chb.flink.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import org.apache.kafka.common.serialization.StringDeserializer

object KafkaSourceByString {

    def main(args: Array[String]): Unit = {

        //初始化Flink的Streaming(流计算)上下文执行环境

        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        streamEnv.setParallelism(1)

        //导入隐式转换

        import org.apache.flink.streaming.api.scala._

        // kafka配置

        val props = new Properties()

        props.setProperty("bootstrap.servers", "ShServer:9092")

        props.setProperty("group.id", "chb01")

        props.setProperty("key.deserializer", classOf[StringDeserializer].getName)

        props.setProperty("value.deserializer", classOf[StringDeserializer].getName)

        props.setProperty("auto.offset.reset", "latest")

        //设置kafka为数据源

        val flinkKafkaConSumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), props)

        val stream = streamEnv.addSource(flinkKafkaConSumer)

        stream.print()

        streamEnv.execute()

    }

}

```

### 2.3.3、Kafka第二种Source

```java

package com.chb.flink.source

import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}

import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.kafka.common.serialization.StringDeserializer

object KafkaSourceByKeyValue {

    def main(args: Array[String]): Unit = {

        //初始化Flink的Streaming(流计算)上下文执行环境

        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        streamEnv.setParallelism(1)

        //导入隐式转换

        import org.apache.flink.streaming.api.scala._

        val props = new Properties()

        props.setProperty("bootstrap.servers", "ShServer:9092")

        props.setProperty("group.id", "fink02")

        props.setProperty("key.deserializer", classOf[StringDeserializer].getName)

        props.setProperty("value.deserializer", classOf[StringDeserializer].getName)

        props.setProperty("auto.offset.reset", "latest")

        //设置kafka为数据源

        val stream = streamEnv.addSource(new

                FlinkKafkaConsumer[(String, String)]("test", new KafkaDeserializationSchema[(String, String)] {

                    //流是否结束

                    override def isEndOfStream(t: (String, String)) = false

                    override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]) = {

                        if (consumerRecord != null) {

                            var key = "null"

                            var value = "null"

                            if (consumerRecord.key() != null)

                                key = new String(consumerRecord.key(), "UTF-8")

                            if (consumerRecord.value() != null)

                                value = new String(consumerRecord.value(), "UTF-8")

                            (key, value)

                        } else { //如果kafka中的数据为空返回一个固定的二元组

                            ("null", "null")

                        }

                    }

                    //设置返回类型为二元组

                    override def getProducedType =

                        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[

                            String])

                }

                    , props).setStartFromEarliest())

        stream.print()

        streamEnv.execute()

    }

}

```

#### 2.3.3.1、Kafka生产测试

```java

package com.chb.flink.source

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Random

object MyKafkaProducer {

    def main(args: Array[String]): Unit = {

        val props = new Properties()

        props.setProperty("bootstrap.servers", "ShServer:9092")

        // 注意此处是序列化

        props.setProperty("key.serializer", classOf[StringSerializer].getName)

        props.setProperty("value.serializer", classOf[StringSerializer].getName)

        val producer = new KafkaProducer[String, String](props)

        val random = new Random()

        while(true) {

            producer.send(new ProducerRecord[String, String]("test", "key" + random.nextInt(), "value" + random.nextInt()))

            Thread.sleep(1000)

        }

    }

}

```

## 2.4、自定义Source

自定义数据源,有两种方式实现:

 通过实现 SourceFunction 接口来自定义无并行度(也就是并行度只能为 1)的 Source。

 通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 来自定义有并行度的数据源。

### 2.4.1、实现SourceFunction的自定义Source

```java

package com.chb.flink.source

import org.apache.flink.streaming.api.functions.source.SourceFunction

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.util.Random

/**

* 当然也可以自定义数据源,有两种方式实现:

*  通过实现 SourceFunction 接口来自定义无并行度(也就是并行度只能为 1)的 Source。

*  通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 来自

* 定义有并行度的数据源。

* *

* 写一个实现SourceFunction接口

*/

class MyCustomerSource extends SourceFunction[StationLog] {

    //是否终止数据流的标记

    var flag = true;

    /**

    * 主要的方法

    * 启动一个Source

    * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了

    *

    * @param sourceContext * @throws Exception

    */

    override def run(sourceContext: SourceFunction.SourceContext[StationLog]):

    Unit = {

        val random = new Random()

        var types = Array("fail", "busy", "barring", "success")

        while (flag) { //如果流没有终止,继续获取数据

            1.to(5).map(i => {

                var callOut = "1860000%04d".format(random.nextInt(10000))

                var callIn = "1890000%04d".format(random.nextInt(10000))

                new StationLog("station_" + random.nextInt(10), callOut, callIn, types(random.nextInt(4)), System.currentTimeMillis(), 0)

            }).foreach(sourceContext.collect(_)) //发数据

            Thread.sleep(2000) //每发送一次数据休眠2秒

        }

    }

    //终止数据流

    override def cancel(): Unit = flag = false

}

object CustomerSource {

    def main(args: Array[String]): Unit = {

        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        env.setParallelism(1)

        import org.apache.flink.streaming.api.scala._

        val stream: DataStream[StationLog] = env.addSource(new MyCustomerSource)

        stream.print()

        env.execute()

    }

}

```

# 三、 Flink 的 Sink 数据目标

Flink 针对 DataStream 提供了大量的已经实现的数据目标(Sink),包括文件、Kafka、Redis、HDFS、Elasticsearch 等等。

## 3.1、HDFS Sink

### 3.1.1、配置支持 Hadoop FileSystem 的连接器依赖

```java

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-filesystem_2.11</artifactId>

<version>1.10.1</version>

</dependency>

```

### 3.1.2、Streaming File Sink

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html

&emsp;&emsp;Streaming File Sink 能把数据写入 HDFS 中,还可以支持分桶写入,每一个**分桶**就对应 HDFS 中的一个目录。默认按照**小时来分桶**,在一个桶内部,会进一步将输出基于滚动策略**切分成更小**的文件。这有助于防止桶文件变得过大。滚动策略也是可以配置的,默认策略会根据文件大小和超时时间来滚动文件,超时时间是指没有新数据写入部分文件(part file)的时间。

![在这里插入图片描述](https://img-blog.csdnimg.cn/20200629195643155.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3d1eGludGRyaA==,size_16,color_FFFFFF,t_70)

#### 3.1.2.1、滚动策略

- DefaultRollingPolicy

- CheckpointRollingPolicy

#### 3.1.2.2、分桶策略

- DateTimeBucketAssigner : Default time based assigner

- BasePathBucketAssigner : Assigner that stores all part files in the base path (single global bucket)

**注意必须开启checkpoint, 否则生成的文件都是inprocess状态**

#### 3.1.2.3、代码实现

```java

package com.chb.flink.sink

import com.chb.flink.source.{MyCustomerSource, StationLog}

import org.apache.flink.api.common.serialization.SimpleStringEncoder

import org.apache.flink.core.fs.Path

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object HDFSFileSink {

    def main(args: Array[String]): Unit = {

        //初始化Flink的Streaming(流计算)上下文执行环境

        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        streamEnv.setParallelism(1)

        //导入隐式转换

        import org.apache.flink.streaming.api.scala._

        // 启动checkPoint, 否则,生成的文件都是inprocess状态的

        streamEnv.enableCheckpointing(1000)

        // 数据源

        val data: DataStream[StationLog] = streamEnv.addSource(new MyCustomerSource)

        //创建一个文件滚动规则

        val rolling: DefaultRollingPolicy[StationLog, String] = DefaultRollingPolicy.create()

            .withInactivityInterval(2000) //不活动的间隔时间。

            .withRolloverInterval(2000) //每隔两秒生成一个文件 ,重要

            .build()

        //创建一个HDFS Sink

        var hdfsSink = StreamingFileSink.forRowFormat[StationLog](

            // 注意此处是flink的Path

            new Path("hdfs://ShServer:9000/sink001/"), new SimpleStringEncoder[StationLog]("UTF-8"))

            .withBucketCheckInterval(1000) //检查分桶的间隔时间

            //            .withBucketAssigner(new MemberBucketAssigner)

            .withRollingPolicy(rolling)

            .build()

        // 添加sink

        data.addSink(hdfsSink)

        streamEnv.execute()

    }

    import org.apache.flink.core.io.SimpleVersionedSerializer

    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner

    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

    /**

    * 自定义分桶策略

    */

    class MemberBucketAssigner extends BucketAssigner[StationLog, String] {

        // 指定桶名 yyyy-mm-dd

        override def getBucketId(info: StationLog, context: BucketAssigner.Context): String = {

            val date = new Date(info.callTime)

            new SimpleDateFormat("yyyy-MM-dd/HH").format(date)

        }

        override def getSerializer: SimpleVersionedSerializer[String] = SimpleVersionedStringSerializer.INSTANCE

    }

}

```

## 3.2、基于 Redis 的 Sink

Flink 除了内置的连接器外,还有一些额外的连接器通过 Apache **Bahir** 发布,包括:

 Apache ActiveMQ (source/sink)

 Apache Flume (sink)

 Redis (sink)

 Akka (sink)

 Netty (source)

### 3.2.1、依赖

```java

        <dependency>

            <groupId>org.apache.bahir</groupId>

            <artifactId>flink-connector-redis_2.11</artifactId>

            <version>1.0</version>

        </dependency>

```

### 3.2.2、将结果写道redis

```java

package com.chb.flink.sink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.streaming.connectors.redis.RedisSink

import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisSink {

    def main(args: Array[String]): Unit = {

        //初始化Flink的Streaming(流计算)上下文执行环境

        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        streamEnv.setParallelism(1)

        //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题

        import org.apache.flink.streaming.api.scala._

        //读取数据

        val stream = streamEnv.socketTextStream("hadoop01", 8888)

        //转换计算

        val result = stream.flatMap(_.split(","))

            .map((_, 1))

            .keyBy(0)

            .sum(1)

        //连接redis的配置

        val config = new FlinkJedisPoolConfig.Builder().setDatabase(1).setHost("hadoop01").setPort(6379).build()

        //写入redis

        result.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] {

            override def getCommandDescription = new

                    RedisCommandDescription(RedisCommand.HSET, "t_wc")

            override def getKeyFromData(data: (String, Int)) = {

                data._1 //单词

            }

            override def getValueFromData(data: (String, Int)) = {

                data._2 + "" //单词出现的次数

            }

        }))

        streamEnv.execute()

    }

}

```

## 3.3、Kafka Sink

### 3.3.1、第一种

```java

package com.chb.flink.sink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

/**

* Kafka Sink

*/

object KafkaSinkByString {

    def main(args: Array[String]): Unit = {

        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        streamEnv.setParallelism(1) //默认情况下每个任务的并行度为1

        import org.apache.flink.streaming.api.scala._

        //读取netcat流中数据 (实时流)

        val stream1: DataStream[String] = streamEnv.socketTextStream("hadoop01", 8888)

        //转换计算

        val result = stream1.flatMap(_.split(","))

        //数据写入Kafka,并且是KeyValue格式的数据

        result.addSink(new FlinkKafkaProducer[String]("hadoop01:9092", "t_topic", new SimpleStringSchema()))

        streamEnv.execute()

    }

}

```

### 3.3.2、第二种

```java

package com.chb.flink.sink

import java.lang

import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}

import org.apache.kafka.clients.producer.ProducerRecord

import org.apache.kafka.common.serialization.StringSerializer

/**

* Kafka Sink

*/

object KafkaSinkByKeyValue {

    def main(args: Array[String]): Unit = {

        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        streamEnv.setParallelism(1) //默认情况下每个任务的并行度为1

        import org.apache.flink.streaming.api.scala._

        //读取netcat流中数据 (实时流)

        val stream1: DataStream[String] = streamEnv.socketTextStream("hadoop01", 8888)

        //转换计算

        val result = stream1.flatMap(_.split(","))

            .map((_, 1))

            .keyBy(0)

            .sum(1)

        //Kafka生产者的配置

        val props = new Properties()

        props.setProperty("bootstrap.servers", "hadoop01:9092")

        props.setProperty("key.serializer", classOf[StringSerializer].getName)

        props.setProperty("value.serializer", classOf[StringSerializer].getName)

        //数据写入Kafka,并且是KeyValue格式的数据

        result.addSink(new FlinkKafkaProducer[(String, Int)]("t_topic",

            new KafkaSerializationSchema[(String, Int)] {

                override def serialize(element: (String, Int), aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {

                    new ProducerRecord("t_topic", element._1.getBytes, (element._2 + "").getBytes())

                }

            }, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)) //EXACTLY_ONCE 精确一次

        streamEnv.execute()

    }

}

```

## 3.4、自定义Sink

```java

package com.chb.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.chb.flink.source.{MyCustomerSource, StationLog}

import org.apache.flink.configuration.Configuration

import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/*

*从自定义的Source中读取StationLog数据,通过Flink写入Mysql数据库

*

* 当然你可以自己定义 Sink,有两种实现方式:

* 1、实现 SinkFunction 接口。

* 2、实现RichSinkFunction 类。后者增加了生命周期的管理功能。

* 比如需要在 Sink 初始化的时候创建连接对象,则最好使用第二种。

* 案例需求:把 StationLog 对象写入 Mysql 数据库中。

*/

object CustomJdbcSink {

    //自定义一个Sink写入Mysql

    class MyCustomSink extends RichSinkFunction[StationLog] {

        var conn: Connection = _

        var pst: PreparedStatement = _

        //生命周期管理,在Sink初始化的时候调用

        override def open(parameters: Configuration): Unit = {

            conn = DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "123123")

            pst = conn.prepareStatement("insert into t_station_log(sid, call_out, call_in, call_type, call_time, duration) values(?, ?, ?, ?, ?, ?)")

        }

        //把StationLog 写入到表t_station_log

        override def invoke(value: StationLog, context: SinkFunction.Context[_]): Unit = {

            pst.setString(1, value.sid)

            pst.setString(2, value.callOut)

            pst.setString(3, value.callIn)

            pst.setString(4, value.callType)

            pst.setLong(5, value.callTime)

            pst.setLong(6, value.duration)

            pst.executeUpdate()

        }

        override def close(): Unit = {

            pst.close()

            conn.close()

        }

    }

    def main(args: Array[String]): Unit = {

        //初始化Flink的Streaming(流计算)上下文执行环境

        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        streamEnv.setParallelism(1)

        //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题

        import org.apache.flink.streaming.api.scala._

        val data: DataStream[StationLog] = streamEnv.addSource(new

                MyCustomerSource)

        //数据写入msyql

        data.addSink(new MyCustomSink)

        streamEnv.execute()

    }

}

```

# 四、DataStream 转换算子

这个非常简单,看api就知道

# 五、函数类和富函数类

上节的所有算子几乎都可以自定义一个**函数类、富函数类**作为参数。因为Flink暴露了这两种函数类的接口,常见的函数接口有:

-  MapFunction

-  FlatMapFunction

-  ReduceFunction

-  。。。。。

**富函数**接口同其他常规函数接口的不同在于:**可以获取运行环境的上下文,在上下文环境中可以管理状态(State),并拥有一些生命周期方法**,所以可以实现更复杂的功能。富函数的接口有:

-  RichMapFunction

-  RichFlatMapFunction

-  RichFilterFunction

- 。。。。。

## 5.1、普通函数类举例:按照指定的时间格式输出每个通话的拨号时间和结束时间

## 5.2、富函数类举例:把呼叫成功的通话信息转化成真实的用户姓名

通话用户对应的用户表(在 Mysql 数据中)为:

![在这里插入图片描述](https://img-blog.csdnimg.cn/20200603082439421.png)

由于需要从数据库中查询数据,就需要创建连接,创建连接的代码必须写在生命周期的open方法中。所以需要使用富函数类。

Rich Function 有一个生命周期的概念。**典型的生命周期方法**有:

- open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter 被调用之前 open()会被调用。

- close()方法是生命周期中的最后一个调用的方法,做一些清理工作。

- getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态

```java

package com.chb.flink.func

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import java.text.SimpleDateFormat

import com.chb.flink.source.StationLog

import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}

import org.apache.flink.configuration.Configuration

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**

* 富函数类举例:把呼叫成功的通话信息转化成真实的用户姓名

*/

object TestFunction {

    def main(args: Array[String]): Unit = {

        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

        // 隐式转换

        import org.apache.flink.streaming.api.scala._

        val data: DataStream[StationLog] = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)

            .map(line => {

                val arr = line.split(",")

                new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)

            })

        //定义时间输出格式

        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

        //过滤那些通话成功的

        data.filter(_.callType.equals("success"))

            .map(new CallMapFunction(format))

            .print()

        streamEnv.execute()

    }

}

//自定义的富函数类

class CallRichMapFunction() extends RichMapFunction[StationLog, StationLog] {

    var conn: Connection = _

    var pst: PreparedStatement

    = _

    //生命周期管理,初始化的时候创建数据连接

    override def open(parameters: Configuration): Unit = {

        conn = DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "123456")

        pst = conn.prepareStatement("select name from t_phone where phone_number =?")

    }

    override def map(in: StationLog): StationLog = {

        //查询主叫用户的名字

        pst.setString(1, in.callOut)

        val set1: ResultSet = pst.executeQuery()

        if (set1.next()) {

            in.callOut = set1.getString(1)

        }

        //查询被叫用户的名字

        pst.setString(1, in.callIn)

        val set2: ResultSet = pst.executeQuery()

        if (set2.next()) {

            in.callIn = set2.getString(1)

        }

        in

    }

    //关闭连接

    override def close(): Unit = {

        pst.close()

        conn.close()

    }

}

```

# 六、底层 ProcessFunctionAPI

ProcessFunction 是一个**低层次的流处理操作**,允许返回所有 Stream 的基础构建模块:

- 访问 Event 本身数据(比如:Event 的时间,Event 的当前 Key 等)

- 管理状态 State(仅在 Keyed Stream 中)

-  管理定时器 Timer(包括:注册定时器,删除定时器等)

总而言之,ProcessFunction 是 Flink 最底层的 API,也是功能最强大的。

例如:监控每一个手机,如果在 5 秒内呼叫它的通话都是失败的,发出警告信息。

```java

package com.chb.flink.func

import java.text.SimpleDateFormat

import java.util.Date

import com.chb.flink.source.StationLog

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

import org.apache.flink.streaming.api.functions.KeyedProcessFunction

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.util.Collector

/**

* 监控每一个手机号,如果在5秒内呼叫它的通话都是失败的,发出警告信息

* 在5秒中内只要有一个呼叫不是fail则不用警告

*/

object TestProcessFunction {

    def main(args: Array[String]): Unit = {

        //初始化Flink的Streaming(流计算)上下文执行环境

        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        streamEnv.setParallelism(1)

        //导入隐式转换

        import org.apache.flink.streaming.api.scala._

        //读取socket数据

        val data = streamEnv.socketTextStream("10.0.0.201", 8888)

            .map(line => {

                var arr = line.split(",")

                new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)

            })

        //处理数据

        data.keyBy(_.callOut)

            .process(new MonitorCallFail())

            .print()

        streamEnv.execute()

    }

    class MonitorCallFail() extends KeyedProcessFunction[String, StationLog, String] {

        // 定义一个状态记录时间

        lazy val timeState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time", classOf[Long]))

        // 处理数据

        override def processElement(value: StationLog,

                                    context: KeyedProcessFunction[String, StationLog, String]#Context,

                                    collector: Collector[String]): Unit = {

            val time = timeState.value() // 从状态中取出时间

            if (value.callType.equals("fail") && time == 0) { // 第一次失败

                // 获取当前时间, 注册定时器

                val now = context.timerService().currentProcessingTime()

                var onTime = now + 5000L // 5秒后触发

                context.timerService().registerProcessingTimeTimer(onTime);

                println("first time: " + new Date())

                timeState.update(onTime)

            }

            // 有呼叫成功, 取消触发器

            if (!value.callType.equals("fail") && time != 0) {

                context.timerService().deleteProcessingTimeTimer(time)

                timeState.clear()

            }

        }

        // 时间到, 执行触发器,发出告警

        override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, StationLog, String]#OnTimerContext,

                            out: Collector[String]): Unit = {

            val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

            var warnStr = "触发时间:" + df.format(new Date(timestamp)) + " 手机号:" + ctx.getCurrentKey

            out.collect(warnStr)

            timeState.clear()

        }

    }

}

```

#  七、侧输出流 Side Output

&emsp;&emsp;在 flink 处理数据流时,我们经常会遇到这样的情况:在处理一个数据源时,往往**需要将该源中的不同类型的数据做分割处理**

- 如果使用 filter 算子对数据源进行筛选分割的话,势必会造成数据流的多次复制,造成不必要的性能浪费;

- 侧输出就是将数据流进行分割,而不对流进行复制的一种分流机制。

- flink 的侧输出的另一个作用就是对延时迟到的数据进行处理,这样就可以不必丢弃迟到的数据。

案例:根据基站的日志,请把呼叫成功的 Stream(主流)和不成功的 Stream(侧流)分别输出。

```java

package com.chb.flink.func

import com.chb.flink.source.StationLog

import org.apache.flink.streaming.api.functions.ProcessFunction

import org.apache.flink.util.Collector

/**

* 把呼叫成功的Stream(主流)和不成功的Stream(侧流)分别输出。

*/

object TestSideOutputStream {

    //侧输出流首先需要定义一个流的标签 , 此处需要将隐式转换放在前面

    import org.apache.flink.streaming.api.scala._

    var notSuccessTag = new OutputTag[StationLog]("not_success")

    def main(args: Array[String]): Unit = {

        //初始化Flink的Streaming(流计算)上下文执行环境

        val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        streamEnv.setParallelism(1)

        //读取文件数据

        val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)

            .map(line => {

                var arr = line.split(",")

                new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)

            })

        val mainStream: DataStream[StationLog] = data.process(new CreateSideOutputStream(notSuccessTag))

        //得到侧流

        val sideOutput: DataStream[StationLog] = mainStream.getSideOutput(notSuccessTag)

        mainStream.print("main")

        sideOutput.print("sideoutput")

        streamEnv.execute()

    }

    class CreateSideOutputStream(tag: OutputTag[StationLog]) extends ProcessFunction[StationLog, StationLog] {

        override def processElement(value: StationLog, ctx: ProcessFunction[StationLog, StationLog]#Context, out: Collector[StationLog]): Unit = {

            if (value.callType.equals("success")) { //输出主流

                out.collect(value)

            } else { //输出侧流

                ctx.output(tag, value)

            }

        }

    }

}

```

# [还有视频讲解在我的B站-宝哥chbxw](https://www.bilibili.com/video/BV1Ga4y1Y7C4/), 希望大家可以支持一下,谢谢。

<br>

# [Flink目录导读](https://chbxw.blog.csdn.net/article/details/106452793)

# 关注我的公众号【宝哥大数据】, 更多干货。

![在这里插入图片描述](https://img-blog.csdnimg.cn/20210623213623230.png)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容