Flink Kafka Connector介绍和使用(DataStream and Table)

前言

Flink提供了一个Apache Kafka连接器,我们可以很方便的实现从Kafka主题读取数据和向其写入数据。

Flink附带了提供了多个Kafka连接器:universal通用版本,0.100.11

官方文档解释说universal(通用版本)的连接器,会尝试跟踪Kafka最新版本,兼容0.10或者之后的Kafka版本,官方文档也说对于绝大多数情况使用这个即可。在最新的官方文档上有这个通用版本连接器的迁移介绍:

Migrating Kafka Connector from 0.11 to universal

In order to perform the migration, see the upgrading jobs and Flink versions guide and:

  • Use Flink 1.9 or newer for the whole process.
  • Do not upgrade Flink and user operators at the same time.
  • Make sure that Kafka Consumer and/or Kafka Producer used in your job have assigned unique identifiers (uid):
  • Use stop with savepoint feature to take the savepoint (for example by using stop --withSavepoint)CLI command.

But:如果你使用的Kafka版本是0.11.x 或者0.10.x,官方建议使用专用版本的Kafka连接器。

Flink DataStream中的Kafka消费者

Kafka的消费者用来消费Kafka中Topic的数据,在Flink中FlinkKafkaConsumer提供了订阅一个Topic或者多个Topic,一下是FlinkKafkaConsumer011部分构造器源码:

public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
    // ...省略
    // 订阅单个Topic
    public FlinkKafkaConsumer011(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(Collections.singletonList(topic), valueDeserializer, props);
    }
    // 订阅多个Topic
    public FlinkKafkaConsumer011(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
        this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
    // ...省略
}

可以看到,在连接器的构造器中,主要有3个参数,分别是:

  • String topic/List<String> topics: 订阅的Topic或者Topic的列表
  • DeserializationSchema<T> valueDeserializer注意泛型):指定kafka反序列化的Schema
  • Properties props: 消费者的配置参数,其中必须的参数有:bootstrap.serversgroup.id

举个栗子:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "bigdata01:9092")
properties.setProperty("group.id", "flink-consumer-kafka-test")
stream = env.addSource(new FlinkKafkaConsumer[String]("flink-consumer-kafka-topic", new SimpleStringSchema(), properties))

在上面的例子中,要注意泛型的使用,从构造器源码上可以看出在类上边声明了泛型T,所以在创建Kafka消费者连接器的时候声明了FlinkKafkaConsumer的泛型为String,同时在构造器中传入SimpleStringSchema这个类的实例,为什么要这么做呢?大家结合在上面给出的构造器来看一下:

构造器中的第二个参数:DeserializationSchema<T> valueDeserializer中的泛型T会变成在创建构造器传入的String,然后大家继续看一下SimpleStringSchema的类源码:

public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
     // ...省略
}

是不是瞬间明白了?原因就是SimpleStringSchema继承了DeserializationSchema<String>

DeserializationSchema的实现类Flink已经帮我们做了很多,常用的除了我们上面列出的SimpleStringSchema,还有关于TypeInformationSerializationSchema、JsonDeserializationSchema、AvroDeserializationSchema、CsvRowDeserializationSchema、CanalJsonDeserializationSchema的反序列化schema等等。

需要注意的是使用Avro的反序列化需要引入对应的依赖

Flink DataStream中的Kafka消费者偏移量配置

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromSpecificOffsets()  // the default behaviour

val stream = env.addSource(myConsumer)
...
  • setStartFromGroupOffsets()

    默认行为,默认读取上次保存的偏移量信息,如果是应用第一次启动,读取不到上次的偏移量信息,则会根据这个参数auto.偏移量.reset的值来进行消费数据

  • setStartFromEarliest()/setStartFromLatest()

    从最早/最新的数据开始进行消费,忽略存储的偏移量信息

  • setStartFromTimestamp(long startupOffsetsTimestamp)

    从topic中指定的时间点开始消费,指定时间戳(以毫秒为单位)之前的数据忽略

  • setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)

    从指定位置进行消费,需要注意的是如果Topic没有消费者指定的分区偏移量时,此配置会退化到setStartFromGroupOffsets()。eg:

    val offsets = new util.HashMap[KafkaTopicPartition, lang.Long]()
    offsets.put(new KafkaTopicPartition(topic, 0), 0L);
    offsets.put(new KafkaTopicPartition(topic, 1), 0L);
    myConsumer.setStartFromSpecificOffsets(offsets)
    

需要注意:当作业从故障中自动恢复或使用保存点手动恢复时,这些启动位置配置方法不会影响启动位置。在恢复时,每个Kafka分区的起始位置由存储在保存点或checkpoint中的偏移量决定

Flink DataStream中的Kafka消费者与容错

启用了Flink的checkpoint后,Flink Kafka消费者在消费来自topic的记录的同时,并定期检查其所有Kafka偏移量以及其他操作的状态。在作业失败的情况下,Flink将把流程序恢复到最新的checkpoint的状态,并从checkpoint中存储的偏移量开始,重新消费来自Kafka的记录。

如果禁用checkpoint,Kafka消费者将定期向Zookeeper提交偏移量。

Flink DataStream中Kafka消费者Topic和Partition Discovery

  • Partition Discovery

    在Flink Kafka中分区发现默认是禁用的,如需要可以配置flink.partition-discovery.interval-millis表示发现间隔(以毫秒为单位)。

  • Topic Discovery

    支持通过正则表达式来实现Topic发现

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "bigdata01:9092")
    properties.setProperty("group.id", "flink-consumer-kafka")
    
    val myConsumer = new FlinkKafkaConsumer[String](
        java.util.regex.Pattern.compile("test-topic-[0-9]"),
        new SimpleStringSchema,
        properties)
    
    val stream = env.addSource(myConsumer)
    

    上面的例子仅仅是在启动时订阅了 0-9 Topic,如果程序动态的发现新创建的Topic,需要配置flink.partition-discovery.interval-millis表示发现间隔(以毫秒为单位)

Flink DataStream中Kafka消费者偏移量提交行为配置

Flink Kafka消费者允许配置如何将偏移量提交回Kafka Brokers的行为。

注意,Flink Kafka消费者并不依赖于提交的偏移量来保证容错。提交的偏移量只是为了监视目的而暴露消费者进度进度的一种手段。

Flink Kafka消费者的提交行为取决于在程序中是否配置了checkpoint

  • Checkpointing disabled

    如果禁用了ck,Flink Kafka消费者将自动的周期性的向Kafka提交偏移量,要禁用offse只需在提供的属性配置中将enable.auto.commit / auto.commit.interval.ms设置为适当的值。

  • Checkpointing enabled

    如果启动了ck,Flink Kafka消费者会在ck完成时,将偏移量保存在ck的状态中,这样做保证了Kafka Borkers中提交的偏移量与检查点状态中的偏移量一致。用户可以通过调用消费者的setCommitOffsetsOnCheckpoints(boolean)方法来选择禁用或启用偏移量提交(默认情况下,该行为为true)。注意,在这个场景消费者配置中的自动提交会被忽略。

Flink DataStream中的Kafka生产者

同上面的消费者一样,在Flink中,Kafka 生产者也提供了universal(通用版本)和0.10,0.11等版本

支持将数据写入一个或者多个Topic中,下面给出部分源码

public class FlinkKafkaProducer<IN> extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer.KafkaTransactionState, FlinkKafkaProducer.KafkaTransactionContext> {
    // 内部的一个语义枚举类
    public enum Semantic {
        // 精确一次,Flink生产者将在Kafka的事务中写入所有消息,这些消息将提交给检查点上的Kafka。
        EXACTLY_ONCE,
        // 至少一次,消息不会丢失,但可能会重复 (默认)
        AT_LEAST_ONCE,
        // 没有语义,意味着消息可能会对视或者重复
        NONE;
        private Semantic() {}
    }
    
    // ...省略
    public FlinkKafkaProducer(
        String topicId,
        KeyedSerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        Semantic semantic) {
        this(topicId,
             serializationSchema,
             producerConfig,
             Optional.of(new FlinkFixedPartitioner<IN>()),
             semantic,
             DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    }
    // ...省略
}

由上面代码可知,创建一个Kafka生产者需要必须的4个参数

  • String topicId

    生产者发送的主题

  • KeyedSerializationSchema<IN> serializationSchema

    序列化schema,使用KafkaSerializationSchema可以配置header,定义每一条数据的key或者指定数据的自定义分区

  • Properties producerConfig

    生产者配置信息,其中必须的参数有:bootstrap.servers

  • Semantic semantic

    容错语义,源码体现在一个内部枚举类,默认是至少一次

需要注意的时,在构造器中还有一个参数指定池中kafkaproducer的数量,默认是5个

/**
* Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}.
*/
public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;

Semantic.EXACTLY_ONCE 语义为每个 FlinkKafkaProducer011 实例使用固定大小的KafkaProducers池。每个检查点使用其中一个生产者。如果并发检查点的数量超过池大小, FlinkKafkaProducer011 将引发异常并将使整个应用程序失败。请相应地配置最大池大小和最大并发检查点数。

Flink DataStream中的Kafka生产者的容错

因为在启用了Flink的检查点后,FlinkKafkaProducerFlinkKafkaProducer011通过二阶段提交(2PC,以后专门开一个帖子总结)可以提供精确一次的保证,以下内容围绕0.11版本来讲。

除了使用Flink的检查点机制外,我们也可以在创建Kafka生产者的时候使用内部的容错语义(默认是至少一次)

这里需要注意的一个问题是Semantic.EXACTLY_ONCE模式依赖于提交在接受检查点之前启动的事务,以及在从上述检查点恢复之后启动的事务。如果Flink应用程序崩溃和完成重启之间的时间大于Kafka的事务超时(transaction.timeout.ms),那么就会出现数据丢失(Kafka会自动中止超过超时的事务)。所以在配置时需要考虑事务超时不要小于崩溃和完成重启之间的时间。

这里列出源码中对Kafka事务超时的配置信息:

ProducerConfig.class

public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction." +
    "If this value is larger than the max.transaction.timeout.ms setting in the broker, the request will fail with a `InvalidTransactionTimeout` error.";

如何在代码中使用呢?举个栗子:

// 二阶段提交事务 保证端到端一致性
val outprop: Properties = new Properties()
outprop.setProperty("bootstrap.servers", "hadoop01:9092")
//设置事务超时时间,这里设置了15分钟的超时时间
outprop.setProperty("transaction.timeout.ms", 60000 * 15 + "")

val kafkaProducer = new FlinkKafkaProducer011[String](
    "outputTopic",
    new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema),
    outprop,Semantic.EXACTLY_ONCE)

dataStream.addSink(kafkaProducer)

这里要说明的事,在Kafka Brokers中默认的transaction.max.timeout.ms是15分钟,此属性不允许小于生产者设置的事务超时。FlinkKafkaProducer011 默认的 transaction.timeout.ms为1个小时:

FlinkKafkaProducer011.class

/**
* Default value for kafka transaction timeout.
*/
public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1);

所以如果在Semantic.EXACTLY_ONCE模式下,需要调大transaction.max.timeout.ms

Flink DataStream中的Kafka生产者写入EventTime

FlinkKafkaProducer011.class

/**
* If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
* Timestamps must be positive for Kafka to accept them.
*
* @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
*/
public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
    this.writeTimestampToKafka = writeTimestampToKafka;
}

public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception {
    checkErroneous();

    byte[] serializedKey = schema.serializeKey(next);
    byte[] serializedValue = schema.serializeValue(next);
    String targetTopic = schema.getTargetTopic(next);
    if (targetTopic == null) {
        targetTopic = defaultTopicId;
    }

    Long timestamp = null;
    if (this.writeTimestampToKafka) {
        // 获取时间戳
        timestamp = context.timestamp();
    }

    ProducerRecord<byte[], byte[]> record;
    int[] partitions = topicPartitionsMap.get(targetTopic);
    if (null == partitions) {
        partitions = getPartitionsByTopic(targetTopic, transaction.producer);
        topicPartitionsMap.put(targetTopic, partitions);
    }
    if (flinkKafkaPartitioner != null) {
        record = new ProducerRecord<>(
            targetTopic,
            flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
            timestamp,
            serializedKey,
            serializedValue);
    } else {
        record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
    }
    pendingRecords.incrementAndGet();
    transaction.producer.send(record, callback);
}

Flink Table中使用Kafka连接器

使用SQL的DDL方式创建

CREATE TABLE kafkaTable (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'csv',
    'scan.startup.mode' = 'earliest-offset'
)

参数说明:

Option Required Description
connector required 指定要连接的Kafka版本,可选项有: 'kafka', 'kafka-0.11', 'kafka-0.10'.
topic required 订阅的Topic
properties.bootstrap.servers required Kafka Brokers(逗号分隔)
properties.group.id required by source group
format required 用于反序列化和序列化Kafka消息的格式。可选项有 'csv', 'json', 'avro', 'debezium-json' and 'canal-json'. 更多格式请点击 Formats
scan.startup.mode optional 设置Kafka偏移量,默认为group-offsets,可选项有 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp'and 'specific-offsets'更多细节请点击Start Reading Position
scan.startup.specific-offsets optional 'specific-offsets'的模式下指定partition的offset eg:'partition:0,offset:42;partition:1,offset:300'.
scan.startup.timestamp-millis optional timestamp启动模式下,从指定的epoch时间戳(毫秒)开始。
sink.partitioner optional 指定 Kafka Sink时候的分区,可选项有round-robin,也可以继承FlinkKafkaPartitioner 并实现自己分区逻辑: e.g: 'org.mycompany.MyPartitioner'.

这里要提一句:

配置选项sink.partitioner可以在当Flink的分区到Kafka的分区时指定输出分区。默认情况下,Kafka Sink最多写入与其自身并行性相同的分区(每个接收器的并行实例只写入一个分区)。为了将写操作分发到更多的分区或控制行在分区中的路由,可以提供自定义接收器分区器。需要注意的是,该配置的可选项round-robin对于避免不平衡分区非常有用。但是,它将导致所有Flink实例和所有Kafka代理之间的大量网络连接。

使用Table Api方式创建

tableEnv.connect(new Kafka()
                 .version("0.11")
                 .topic("user_behavior")
                 .property("zookeeper.connect", "bigdata01:2181")
                 .property("bootstrap.servers", "bigdata01:9092")
                 .startFromLatest()
                )
.withFormat(new Json())
.withSchema(
    new Schema()
    .field("user_id", DataTypes.BIGINT())
    .field("item_id", DataTypes.BIGINT())
    .field("category_id", DataTypes.BIGINT())
    .field("behavior", DataTypes.BIGINT())
    .field("ts", DataTypes.DataTypes.TIMESTAMP(3))
).createTemporaryTable("kafkaTable")

End

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,602评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,442评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,878评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,306评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,330评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,071评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,382评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,006评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,512评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,965评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,094评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,732评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,283评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,286评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,512评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,536评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,828评论 2 345