前言
Flink提供了一个Apache Kafka连接器,我们可以很方便的实现从Kafka主题读取数据和向其写入数据。
Flink附带了提供了多个Kafka连接器:universal
通用版本,0.10
,0.11
官方文档解释说universal
(通用版本)的连接器,会尝试跟踪Kafka最新版本,兼容0.10
或者之后的Kafka版本,官方文档也说对于绝大多数情况使用这个即可。在最新的官方文档上有这个通用版本连接器的迁移介绍:
Migrating Kafka Connector from 0.11 to universal
In order to perform the migration, see the upgrading jobs and Flink versions guide and:
- Use Flink 1.9 or newer for the whole process.
- Do not upgrade Flink and user operators at the same time.
- Make sure that Kafka Consumer and/or Kafka Producer used in your job have assigned unique identifiers (
uid
): - Use stop with savepoint feature to take the savepoint (for example by using
stop --withSavepoint
)CLI command.
But:如果你使用的Kafka版本是0.11.x
或者0.10.x
,官方建议使用专用版本的Kafka连接器。
Flink DataStream中的Kafka消费者
Kafka的消费者用来消费Kafka中Topic的数据,在Flink中FlinkKafkaConsumer提供了订阅一个Topic或者多个Topic,一下是FlinkKafkaConsumer011部分构造器源码:
public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
// ...省略
// 订阅单个Topic
public FlinkKafkaConsumer011(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
this(Collections.singletonList(topic), valueDeserializer, props);
}
// 订阅多个Topic
public FlinkKafkaConsumer011(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
// ...省略
}
可以看到,在连接器的构造器中,主要有3个参数,分别是:
-
String topic/List<String> topics
: 订阅的Topic或者Topic的列表 -
DeserializationSchema<T> valueDeserializer
(注意泛型):指定kafka反序列化的Schema -
Properties props
: 消费者的配置参数,其中必须的参数有:bootstrap.servers
和group.id
举个栗子:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "bigdata01:9092")
properties.setProperty("group.id", "flink-consumer-kafka-test")
stream = env.addSource(new FlinkKafkaConsumer[String]("flink-consumer-kafka-topic", new SimpleStringSchema(), properties))
在上面的例子中,要注意泛型的使用,从构造器源码上可以看出在类上边声明了泛型T,所以在创建Kafka消费者连接器的时候声明了FlinkKafkaConsumer的泛型为String,同时在构造器中传入SimpleStringSchema这个类的实例,为什么要这么做呢?大家结合在上面给出的构造器来看一下:
构造器中的第二个参数:DeserializationSchema<T> valueDeserializer
中的泛型T会变成在创建构造器传入的String,然后大家继续看一下SimpleStringSchema的类源码:
public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
// ...省略
}
是不是瞬间明白了?原因就是SimpleStringSchema继承了DeserializationSchema<String>
DeserializationSchema的实现类Flink已经帮我们做了很多,常用的除了我们上面列出的SimpleStringSchema,还有关于TypeInformationSerializationSchema、JsonDeserializationSchema、AvroDeserializationSchema、CsvRowDeserializationSchema、CanalJsonDeserializationSchema的反序列化schema等等。
需要注意的是使用Avro的反序列化需要引入对应的依赖。
Flink DataStream中的Kafka消费者偏移量配置
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest() // start from the earliest record possible
myConsumer.setStartFromLatest() // start from the latest record
myConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromSpecificOffsets() // the default behaviour
val stream = env.addSource(myConsumer)
...
-
setStartFromGroupOffsets()
默认行为,默认读取上次保存的偏移量信息,如果是应用第一次启动,读取不到上次的偏移量信息,则会根据这个参数auto.偏移量.reset的值来进行消费数据
-
setStartFromEarliest()/setStartFromLatest()
从最早/最新的数据开始进行消费,忽略存储的偏移量信息
-
setStartFromTimestamp(long startupOffsetsTimestamp)
从topic中指定的时间点开始消费,指定时间戳(以毫秒为单位)之前的数据忽略
-
setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)
从指定位置进行消费,需要注意的是如果Topic没有消费者指定的分区偏移量时,此配置会退化到
setStartFromGroupOffsets()
。eg:val offsets = new util.HashMap[KafkaTopicPartition, lang.Long]() offsets.put(new KafkaTopicPartition(topic, 0), 0L); offsets.put(new KafkaTopicPartition(topic, 1), 0L); myConsumer.setStartFromSpecificOffsets(offsets)
需要注意:当作业从故障中自动恢复或使用保存点手动恢复时,这些启动位置配置方法不会影响启动位置。在恢复时,每个Kafka分区的起始位置由存储在保存点或checkpoint中的偏移量决定
Flink DataStream中的Kafka消费者与容错
启用了Flink的checkpoint后,Flink Kafka消费者在消费来自topic的记录的同时,并定期检查其所有Kafka偏移量以及其他操作的状态。在作业失败的情况下,Flink将把流程序恢复到最新的checkpoint的状态,并从checkpoint中存储的偏移量开始,重新消费来自Kafka的记录。
如果禁用checkpoint,Kafka消费者将定期向Zookeeper提交偏移量。
Flink DataStream中Kafka消费者Topic和Partition Discovery
-
Partition Discovery
在Flink Kafka中分区发现默认是禁用的,如需要可以配置
flink.partition-discovery.interval-millis
表示发现间隔(以毫秒为单位)。 -
Topic Discovery
支持通过正则表达式来实现Topic发现
val env = StreamExecutionEnvironment.getExecutionEnvironment() val properties = new Properties() properties.setProperty("bootstrap.servers", "bigdata01:9092") properties.setProperty("group.id", "flink-consumer-kafka") val myConsumer = new FlinkKafkaConsumer[String]( java.util.regex.Pattern.compile("test-topic-[0-9]"), new SimpleStringSchema, properties) val stream = env.addSource(myConsumer)
上面的例子仅仅是在启动时订阅了 0-9 Topic,如果程序动态的发现新创建的Topic,需要配置
flink.partition-discovery.interval-millis
表示发现间隔(以毫秒为单位)
Flink DataStream中Kafka消费者偏移量提交行为配置
Flink Kafka消费者允许配置如何将偏移量提交回Kafka Brokers的行为。
注意,Flink Kafka消费者并不依赖于提交的偏移量来保证容错。提交的偏移量只是为了监视目的而暴露消费者进度进度的一种手段。
Flink Kafka消费者的提交行为取决于在程序中是否配置了checkpoint
-
Checkpointing disabled
如果禁用了ck,Flink Kafka消费者将自动的周期性的向Kafka提交偏移量,要禁用offse只需在提供的属性配置中将
enable.auto.commit / auto.commit.interval.ms
设置为适当的值。 -
Checkpointing enabled
如果启动了ck,Flink Kafka消费者会在ck完成时,将偏移量保存在ck的状态中,这样做保证了Kafka Borkers中提交的偏移量与检查点状态中的偏移量一致。用户可以通过调用消费者的
setCommitOffsetsOnCheckpoints(boolean)
方法来选择禁用或启用偏移量提交(默认情况下,该行为为true)。注意,在这个场景消费者配置中的自动提交会被忽略。
Flink DataStream中的Kafka生产者
同上面的消费者一样,在Flink中,Kafka 生产者也提供了universal
(通用版本)和0.10
,0.11
等版本
支持将数据写入一个或者多个Topic中,下面给出部分源码
public class FlinkKafkaProducer<IN> extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer.KafkaTransactionState, FlinkKafkaProducer.KafkaTransactionContext> {
// 内部的一个语义枚举类
public enum Semantic {
// 精确一次,Flink生产者将在Kafka的事务中写入所有消息,这些消息将提交给检查点上的Kafka。
EXACTLY_ONCE,
// 至少一次,消息不会丢失,但可能会重复 (默认)
AT_LEAST_ONCE,
// 没有语义,意味着消息可能会对视或者重复
NONE;
private Semantic() {}
}
// ...省略
public FlinkKafkaProducer(
String topicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
Semantic semantic) {
this(topicId,
serializationSchema,
producerConfig,
Optional.of(new FlinkFixedPartitioner<IN>()),
semantic,
DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
// ...省略
}
由上面代码可知,创建一个Kafka生产者需要必须的4个参数
-
String topicId
生产者发送的主题
-
KeyedSerializationSchema<IN> serializationSchema
序列化schema,使用KafkaSerializationSchema可以配置header,定义每一条数据的key或者指定数据的自定义分区
-
Properties producerConfig
生产者配置信息,其中必须的参数有:
bootstrap.servers
-
Semantic semantic
容错语义,源码体现在一个内部枚举类,默认是至少一次
需要注意的时,在构造器中还有一个参数指定池中kafkaproducer的数量,默认是5个
/**
* Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}.
*/
public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
Semantic.EXACTLY_ONCE
语义为每个 FlinkKafkaProducer011 实例使用固定大小的KafkaProducers池。每个检查点使用其中一个生产者。如果并发检查点的数量超过池大小, FlinkKafkaProducer011 将引发异常并将使整个应用程序失败。请相应地配置最大池大小和最大并发检查点数。
Flink DataStream中的Kafka生产者的容错
因为在启用了Flink的检查点后,FlinkKafkaProducer
和FlinkKafkaProducer011
通过二阶段提交(2PC,以后专门开一个帖子总结)可以提供精确一次的保证,以下内容围绕0.11
版本来讲。
除了使用Flink的检查点机制外,我们也可以在创建Kafka生产者的时候使用内部的容错语义(默认是至少一次)
这里需要注意的一个问题是Semantic.EXACTLY_ONCE
模式依赖于提交在接受检查点之前启动的事务,以及在从上述检查点恢复之后启动的事务。如果Flink应用程序崩溃和完成重启之间的时间大于Kafka的事务超时(transaction.timeout.ms
),那么就会出现数据丢失(Kafka会自动中止超过超时的事务)。所以在配置时需要考虑事务超时不要小于崩溃和完成重启之间的时间。
这里列出源码中对Kafka事务超时的配置信息:
ProducerConfig.class
public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction." +
"If this value is larger than the max.transaction.timeout.ms setting in the broker, the request will fail with a `InvalidTransactionTimeout` error.";
如何在代码中使用呢?举个栗子:
// 二阶段提交事务 保证端到端一致性
val outprop: Properties = new Properties()
outprop.setProperty("bootstrap.servers", "hadoop01:9092")
//设置事务超时时间,这里设置了15分钟的超时时间
outprop.setProperty("transaction.timeout.ms", 60000 * 15 + "")
val kafkaProducer = new FlinkKafkaProducer011[String](
"outputTopic",
new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema),
outprop,Semantic.EXACTLY_ONCE)
dataStream.addSink(kafkaProducer)
这里要说明的事,在Kafka Brokers中默认的transaction.max.timeout.ms
是15分钟,此属性不允许小于生产者设置的事务超时。FlinkKafkaProducer011
默认的 transaction.timeout.ms
为1个小时:
FlinkKafkaProducer011.class
/**
* Default value for kafka transaction timeout.
*/
public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1);
所以如果在Semantic.EXACTLY_ONCE
模式下,需要调大transaction.max.timeout.ms
Flink DataStream中的Kafka生产者写入EventTime
FlinkKafkaProducer011.class
/**
* If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
* Timestamps must be positive for Kafka to accept them.
*
* @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
*/
public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
this.writeTimestampToKafka = writeTimestampToKafka;
}
public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception {
checkErroneous();
byte[] serializedKey = schema.serializeKey(next);
byte[] serializedValue = schema.serializeValue(next);
String targetTopic = schema.getTargetTopic(next);
if (targetTopic == null) {
targetTopic = defaultTopicId;
}
Long timestamp = null;
if (this.writeTimestampToKafka) {
// 获取时间戳
timestamp = context.timestamp();
}
ProducerRecord<byte[], byte[]> record;
int[] partitions = topicPartitionsMap.get(targetTopic);
if (null == partitions) {
partitions = getPartitionsByTopic(targetTopic, transaction.producer);
topicPartitionsMap.put(targetTopic, partitions);
}
if (flinkKafkaPartitioner != null) {
record = new ProducerRecord<>(
targetTopic,
flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
timestamp,
serializedKey,
serializedValue);
} else {
record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
}
pendingRecords.incrementAndGet();
transaction.producer.send(record, callback);
}
Flink Table中使用Kafka连接器
使用SQL的DDL方式创建
CREATE TABLE kafkaTable (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv',
'scan.startup.mode' = 'earliest-offset'
)
参数说明:
Option | Required | Description |
---|---|---|
connector | required | 指定要连接的Kafka版本,可选项有: 'kafka' , 'kafka-0.11' , 'kafka-0.10' . |
topic | required | 订阅的Topic |
properties.bootstrap.servers | required | Kafka Brokers(逗号分隔) |
properties.group.id | required by source | group |
format | required | 用于反序列化和序列化Kafka消息的格式。可选项有 'csv' , 'json' , 'avro' , 'debezium-json' and 'canal-json' . 更多格式请点击 Formats
|
scan.startup.mode | optional | 设置Kafka偏移量,默认为group-offsets ,可选项有 'earliest-offset' , 'latest-offset' , 'group-offsets' , 'timestamp' and 'specific-offsets' 更多细节请点击Start Reading Position
|
scan.startup.specific-offsets | optional | 在 'specific-offsets' 的模式下指定partition的offset eg:'partition:0,offset:42;partition:1,offset:300' . |
scan.startup.timestamp-millis | optional | 在timestamp 启动模式下,从指定的epoch时间戳(毫秒)开始。 |
sink.partitioner | optional | 指定 Kafka Sink时候的分区,可选项有round-robin ,也可以继承FlinkKafkaPartitioner 并实现自己分区逻辑: e.g: 'org.mycompany.MyPartitioner' . |
这里要提一句:
配置选项sink.partitioner
可以在当Flink的分区到Kafka的分区时指定输出分区。默认情况下,Kafka Sink最多写入与其自身并行性相同的分区(每个接收器的并行实例只写入一个分区)。为了将写操作分发到更多的分区或控制行在分区中的路由,可以提供自定义接收器分区器。需要注意的是,该配置的可选项round-robin
对于避免不平衡分区非常有用。但是,它将导致所有Flink实例和所有Kafka代理之间的大量网络连接。
使用Table Api方式创建
tableEnv.connect(new Kafka()
.version("0.11")
.topic("user_behavior")
.property("zookeeper.connect", "bigdata01:2181")
.property("bootstrap.servers", "bigdata01:9092")
.startFromLatest()
)
.withFormat(new Json())
.withSchema(
new Schema()
.field("user_id", DataTypes.BIGINT())
.field("item_id", DataTypes.BIGINT())
.field("category_id", DataTypes.BIGINT())
.field("behavior", DataTypes.BIGINT())
.field("ts", DataTypes.DataTypes.TIMESTAMP(3))
).createTemporaryTable("kafkaTable")