The previous section moved data from Kafka into MySQL. In many cases, though, the processed data can just as well be written back to Kafka for downstream consumers.
FlinkKafkaProducer
Flink provides a matching Kafka sink, FlinkKafkaProducer010. Let's look at its implementation.
public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {

    private static final long serialVersionUID = 1L;

    private boolean writeTimestampToKafka;

    public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), producerConfig, (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<T> customPartitioner) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), producerConfig, (FlinkKafkaPartitioner)customPartitioner);
    }

    public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) {
        this(topicId, (KeyedSerializationSchema)serializationSchema, getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
        this(topicId, (KeyedSerializationSchema)serializationSchema, producerConfig, (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<T> customPartitioner) {
        super(topicId, serializationSchema, producerConfig, customPartitioner);
        this.writeTimestampToKafka = false;
    }

    public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
        this.writeTimestampToKafka = writeTimestampToKafka;
    }

    // ......
}
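The only 010-specific knob surfaced here is setWriteTimestampToKafka, which controls whether each record's Flink timestamp is attached to the produced Kafka message. A minimal usage sketch, where the stream variable is a placeholder DataStream<String> and the broker address and topic are borrowed from the example further down:

// Sketch only: attach Flink record timestamps (e.g. event timestamps) to the produced messages.
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
        "localhost:9092",           // broker list
        "dest_testjin",             // destination topic
        new SimpleStringSchema());  // value serialization
producer.setWriteTimestampToKafka(true); // write each record's timestamp into the Kafka message
stream.addSink(producer).name("kafka sink with timestamps");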
Apart from the setWriteTimestampToKafka setter, these are basically all constructors. Tracing up the class hierarchy, you can see that the producer ultimately extends RichSinkFunction:
public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {}
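For orientation, a bare-bones RichSinkFunction could look like the sketch below (LoggingSink is a made-up example, not part of Flink). FlinkKafkaProducerBase fills the same hooks with a KafkaProducer, creating it in open(), sending records in invoke(), and closing it in close(), and adds state handling through CheckpointedFunction.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Minimal custom sink, for comparison with the Kafka producer sink.
public class LoggingSink extends RichSinkFunction<String> {
    @Override
    public void open(Configuration parameters) throws Exception {
        // set up clients / connections here
    }

    @Override
    public void invoke(String value) throws Exception {
        // called once per record reaching the sink
        System.out.println("sink received: " + value);
    }

    @Override
    public void close() throws Exception {
        // flush and release resources
    }
}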
From Kafka To Kafka
The following example reads data from Kafka and writes it into another Kafka topic. Other processing could of course be done in between; for simplicity that step is omitted here. The code is as follows:
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

public class KafkaToKafkaJob {

    public static void main(String[] args) throws Exception {
        // use Kafka as the data source
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // use event time as the time characteristic
        // event time: when the data was produced at the source; independent of Flink, fixed when the data is created
        // processing time: when the data starts being processed inside Flink
        // ingestion time: when the data arrives at the Flink cluster
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Kafka and ZooKeeper connection settings
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "metric-group");
        properties.put("auto.offset.reset", "latest"); // start from the latest offset when no committed offset exists
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        SingleOutputStreamOperator<String> dataStreamSource =
                env.addSource(new FlinkKafkaConsumer010<String>(
                        "testjin", // topic
                        new SimpleStringSchema(),
                        properties
                )).setParallelism(1)
                // .map(string -> JSON.parseObject(string, UrlInfo.class))
                ;

        // write to Kafka; kafka-topics.sh can be used to check that a new topic dest_testjin was created
        dataStreamSource.addSink(new FlinkKafkaProducer010<String>(
                "localhost:9092",
                "dest_testjin", // destination topic; a new one will be created
                new SimpleStringSchema()
        )).setParallelism(1).name("add to kafka dest topic");

        env.execute("execute from kafka to kafka");
    }
}
As you can see, we simply call addSink(new FlinkKafkaProducer010), passing the Kafka broker list, the topic, and the serialization schema (again the simplest SimpleStringSchema) to the constructor. After running the job, you can verify that the new topic dest_testjin shows up on the corresponding Kafka broker.
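The constructors above also accept a KeyedSerializationSchema, which lets each record carry a Kafka message key and, if needed, choose its own target topic. Below is a hypothetical sketch that reuses dataStreamSource from the example and is meant to sit inside the same main method (it additionally needs imports for KeyedSerializationSchema from org.apache.flink.streaming.util.serialization and java.nio.charset.StandardCharsets); the "key,payload" record layout and the keyedSchema name are assumptions, not something from the original job:

// Hypothetical keyed schema: extracts a message key so records are spread by key
// instead of going through the default FlinkFixedPartitioner.
KeyedSerializationSchema<String> keyedSchema = new KeyedSerializationSchema<String>() {
    @Override
    public byte[] serializeKey(String element) {
        return element.split(",")[0].getBytes(StandardCharsets.UTF_8); // assumed "key,payload" layout
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null; // null falls back to the topic passed to the constructor
    }
};

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");

// passing null as the custom partitioner lets Kafka partition by the key above
dataStreamSource.addSink(
        new FlinkKafkaProducer010<String>("dest_testjin", keyedSchema, producerProps, null));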