通过在 FlinkKafkaConsumer 上设置 maxRecordsPerPoll 来动态调整消费速率:
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
/**
 * Flink {@link SourceFunction} that reads a single Kafka topic with a plain
 * {@link KafkaConsumer} and caps the number of records returned by each poll.
 *
 * <p>The cap is applied through the Kafka consumer setting {@code max.poll.records}, which is
 * read when the underlying consumer is created in {@link #run}. Each poll waits at most
 * {@link #POLL_TIMEOUT} and may return fewer records than the cap.
 *
 * <p>NOTE(review): this source does not snapshot offsets into Flink checkpoints. Offset commits
 * follow whatever the supplied Kafka properties configure (e.g. {@code enable.auto.commit}),
 * so it does not provide exactly-once guarantees — confirm this is acceptable for the job.
 *
 * @param <T> type of the elements produced by the deserialization schema
 */
public class DynamicKafkaConsumer<T> implements SourceFunction<T> {

    private static final long serialVersionUID = 1L;

    /** Upper bound on how long a single poll blocks when no records are available. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000);

    private final String topic;
    private final KafkaDeserializationSchema<T> schema;
    private final Properties props;
    private final int maxRecordsPerPoll;

    private volatile boolean running;

    // Created inside run() because KafkaConsumer is not serializable; volatile so that
    // cancel(), which is invoked from a different thread, sees the live instance.
    private transient volatile KafkaConsumer<byte[], byte[]> consumer;

    /**
     * Creates the source.
     *
     * @param topic Kafka topic to subscribe to
     * @param schema schema used to turn each raw Kafka record into a {@code T}
     * @param props Kafka consumer properties (bootstrap servers, group id, ...); copied, the
     *     caller's instance is not modified
     * @param maxRecordsPerPoll maximum number of records a single poll may return; must be positive
     * @throws NullPointerException if {@code topic}, {@code schema} or {@code props} is null
     * @throws IllegalArgumentException if {@code maxRecordsPerPoll} is not positive
     */
    public DynamicKafkaConsumer(String topic, KafkaDeserializationSchema<T> schema, Properties props, int maxRecordsPerPoll) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.schema = Objects.requireNonNull(schema, "schema");
        Objects.requireNonNull(props, "props");
        if (maxRecordsPerPoll <= 0) {
            throw new IllegalArgumentException("maxRecordsPerPoll must be positive, got " + maxRecordsPerPoll);
        }
        this.maxRecordsPerPoll = maxRecordsPerPoll;

        // Defensive copy so the overrides below never leak into the caller's Properties.
        Properties effective = new Properties();
        effective.putAll(props);
        effective.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.toString(maxRecordsPerPoll));
        // Records are fetched as raw bytes; decoding is delegated to the Flink schema.
        effective.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        effective.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Start from the latest offset when the consumer group has no committed position.
        effective.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        this.props = effective;

        this.running = true;
    }

    /**
     * Polls the topic until {@link #cancel()} is called or the schema reports end of stream,
     * emitting every non-null deserialized record to {@code sourceContext}.
     *
     * @param sourceContext Flink context used to emit records
     * @throws Exception if polling or deserialization fails; failures are propagated rather
     *     than swallowed so that the task fails visibly
     */
    @Override
    public void run(SourceContext<T> sourceContext) throws Exception {
        KafkaConsumer<byte[], byte[]> kafka = new KafkaConsumer<>(props);
        consumer = kafka;
        try {
            kafka.subscribe(Collections.singletonList(topic));
            while (running) {
                ConsumerRecords<byte[], byte[]> batch = kafka.poll(POLL_TIMEOUT);
                for (ConsumerRecord<byte[], byte[]> record : batch) {
                    T value = schema.deserialize(record);
                    if (value == null) {
                        // The schema signals "skip this record" by returning null.
                        continue;
                    }
                    if (schema.isEndOfStream(value)) {
                        running = false;
                        break;
                    }
                    // Emit under the checkpoint lock, as the SourceFunction contract requires.
                    synchronized (sourceContext.getCheckpointLock()) {
                        sourceContext.collect(value);
                    }
                }
            }
        } catch (WakeupException e) {
            // wakeup() is only issued by cancel(); anything else is unexpected.
            if (running) {
                throw e;
            }
        } finally {
            consumer = null;
            kafka.close();
        }
    }

    /**
     * Requests the polling loop to stop and interrupts a poll that is currently blocking.
     * The Kafka consumer itself is closed by the thread executing {@link #run}.
     */
    @Override
    public void cancel() {
        running = false;
        KafkaConsumer<byte[], byte[]> kafka = consumer;
        if (kafka != null) {
            // wakeup() is the one KafkaConsumer method that is safe to call from another thread.
            kafka.wakeup();
        }
    }
}
在此代码中,maxRecordsPerPoll 变量决定每次轮询最多获取的记录数。通过动态调整该值,可以控制记录的消费速率。请注意,pollTimeout 设置为 1000 毫秒,这意味着即使未达到 maxRecordsPerPoll,轮询也会在 1000 毫秒后返回。这确保消费者不会被阻塞太久,并且可以根据需求调整其消费速率。
要使用此消费者,请创建 DynamicKafkaConsumer 类的实例,并传入 Kafka 主题、反序列化模式(deserialization schema)、Kafka 属性以及每次轮询要获取的最大记录数。