kafka在配置不合理的情况,会丢失消息,当业务的需求不允许消息丢失的场景,就需要配置kafka的一些参数来保证消息不丢失,但是鱼和熊掌不可兼得,所以如果保证消息不丢失,就要牺牲TPS(吞吐量)
kafka消息丢失分为两种,一是Producer丢失消息,二是Consumer端丢失消息
一、Producer端配置
1、acks设置为-1
设置为0表示producer不需要等待任何确认收到的信息,副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)
设置为1意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失
设置为-1意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
2、retries设置大一些
设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
3、max.in.flight.requests.per.connection设置为1
kafka可以在一个connection中发送多个请求,叫作一个flight,这样可以减少开销,但是如果产生错误,可能会造成数据的发送顺序改变,默认是5 (修改)
这个参数其实为了避免消息乱序,如果你的场景不需求消息顺序,可以不设置此值,设置此值为1表示kafka broker在响应请求之前client不能再向同一个broker发送请求
4、使用KafkaProducer.send(record, callback)方法而不是send(record)方法
自定义回调逻辑处理消息发送失败
>producer.send(new ProducerRecord<String, String>(topic, key, msg),
new Callback() {
@Override
public void onCompletion(RecordMetadata metadata,
Exception exception) {
//处理消息发送失败的业务逻辑
}
});
5、min.insync.replicas > 1
消息至少要被写入到这么多副本才算成功,也是提升数据持久性的一个参数。与acks配合使用
6、replication.factor > min.insync.replicas
如果两者相等,当一个副本挂掉了分区也就没法正常工作了。通常设置replication.factor = min.insync.replicas + 1即可
二、Consumer端
消费者端防止消息丢失相对简单,在消息处理完成前就提交了offset这样就会有可能造成消息丢失,所以要在消息处理完成后在提交offset
首先要关闭enable.auto.commit=false,此参数默认是true,所以需要改成false,然后需要在你消费消息后在提交offset