Flink-kafka-connector
- 读写kafka
Kafka中的partition机制和Flink的并行度机制结合 - 实现数据恢复
Kafka可以作为Flink的source和sink 任务失败,通过设置kafka的offset来恢复应用
配置
- kafka启动服务
nohup zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
nohup kafka-server-start /usr/local/etc/kafka/server.properties &
Flink消费Kafka注意事项
- setStartFromGroupOffsets()【默认消费策略】
默认读取上次保存的offset信息 如果是应用第一次启动,读取不到上次的offset信息,则会根据这个参数auto.offset.reset的值来进行消费数据
setStartFromEarliest() 从最早的数据开始进行消费,忽略存储的offset信息
setStartFromLatest() 从最新的数据进行消费,忽略存储的offset信息
setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>) 从指定位置进行消费
当checkpoint机制开启的时候,KafkaConsumer会定期把kafka的offset信息还有其他operator的状态信息一块保存起来。当job失败重启的时候,Flink会从最近一次的checkpoint中进行恢复数据,重新消费kafka中的数据。
为了能够使用支持容错的kafka Consumer,需要开启checkpoint env.enableCheckpointing(5000); // 每5s checkpoint一次。
SerializationSchema&&DeserializationSchema
如何将从 kafka 中获取的字节流转换为 Java Object,则通过 DeserializationSchema 来实现转换。其中 SimpleStringSchema 将 kafka 获取的字节流转换为字符串。
其中 KeyedDeserializationSchema 支持 Key, Value 反序列化。
示例
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.12</artifactId>
<version>1.8.0</version>
</dependency>
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
public class KafkaDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
Properties properties = new Properties();// kafka&&zk 配置参数
properties.setProperty("bootstrap.servers", "localhost:9092");
//new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09("test",new SimpleStringSchema(),properties);
/*
//event-timestamp事件的发生时间
producer.setWriteTimestampToKafka(true);
*/
text.addSink(producer);
env.execute();
}
public static class MyNoParalleSource implements SourceFunction<String> {//1
//private long count = 1L;
private boolean isRunning = true;
/**
* 主要的方法
* 启动一个source
* 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<String> ctx) throws Exception {
while(isRunning){
//图书的排行榜
List<String> books = new ArrayList<>();
books.add("Pyhton从入门到放弃");//10
books.add("Java从入门到放弃");//8
books.add("Php从入门到放弃");//5
books.add("C++从入门到放弃");//3
books.add("Scala从入门到放弃");//0-4
int i = new Random().nextInt(5);
// 此处类同于storm的emit操作
ctx.collect(books.get(i));
//每2秒产生一条数据
Thread.sleep(2000);
}
}
//取消一个cancel的时候会调用的方法
@Override
public void cancel() {
isRunning = false;
}
}
}
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning