3.1 Broker Configs
基本配置如下:
1. broker.id
2. log.dirs
3. zookeeper.connect
下面将更详细地讨论主题级别的配置和默认设置。
名称 | 描述 | 类型 | 默认 | 重要性 |
---|---|---|---|---|
zookeeper.connect Zookeeper | 主机地址 | string | high | |
advertised.host.name | DEPRECATED:仅在未设置“advertised.listeners”或“listeners”时使用。用advertised.listeners 替换。 主机名发布到ZooKeeper供客户使用,会分发给所有的producer,consumer和其他broker来连接自己。。 在IaaS环境中,这可能需要与代理绑定的接口不同。 如果未设置,则将使用“host.name”的值(如果已配置)。 否则,它将使用从java.net.InetAddress.getCanonicalHostName()返回的值。 |
string | null | high |
advertised.listeners | 监听器发布到ZooKeeper供客户使用,如果与上面的监听器不同。 在IaaS环境中,这可能需要与代理绑定的接口不同。 如果没有设置,将使用listeners 的值。 |
string | null | high |
advertised.port | DEPRECATED:仅在未设置“advertised.listeners”或“listeners”时使用。 改用advertised.listeners 替代。 发布到ZooKeeper供客户端使用的端口。 在IaaS环境中,这可能需要与代理绑定的端口不同。 如果没有设置,它将使用broker绑定的相同端口。 |
int | null | high |
auto.create.topics.enable | 是否允许自动创建topic。如果设为true,那么produce,consume或者fetch metadata一个不存在的topic时,就会自动创建一个默认replication factor和partition number的topic。 | boolean | true | high |
auto.leader.rebalance.enable | 如果设为true,复制控制器会周期性的自动尝试,为所有的broker的每个partition平衡leadership,为更优先(preferred)的replica分配leadership。 | boolean | true | high |
background.threads | 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改 | int | 10 | high |
broker.id | 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况。如果未设置,则会生成唯一的代理标识。为避免zookeeper生成的代理标识与用户配置的代理标识之间的冲突,生成的代理标识从reserved.broker.max.id + 1开始。 | int | -1 | high |
compression.type | 为主题指定一个压缩类型,此配置接受标准压缩编码('gzip', 'snappy', lz4),另外接受'uncompressed‘相当于不压缩, 'producer' 意味着压缩类型由producer指定。 | string | producer | high |
delete.topic.enable | 启用删除主题。 如果此配置已关闭,则通过管理工具删除主题将不起作用。删除topic是影响注册在/admin/delete_topics的监听 | boolean | false | high |
host.name | DEPRECATED:仅在未设置“listeners”时使用。 使用listeners 来代替。 broker的主机名。 如果这个设置,它只会绑定到这个地址。 如果没有设置,它将绑定到所有interface。并将其中之一发送到ZK,但是发送到zk的不一定是正确的地址,导致消费端消费不到消息,所以这里必须要设置 |
String | "" | High |
leader.imbalance.check.interval.seconds | 分区rebalance检查的频率,由控制器触发 | long | 300 | high |
leader.imbalance.per.broker.percentage | 每个broker允许的不平衡的leader的百分比。如果每个broker超过了这个百分比,复制控制器会对分区进行重新的平衡。该值以百分比形式指定。 | int | 10 | high |
listeners | 监听器列表 - 逗号分隔的我们将监听的URI列表和监听器名称。 如果侦听器名称不是安全协议,则还必须设置listener.security.protocol.map。 指定主机名为0.0.0.0以绑定到所有接口。 保留主机名为空以绑定到默认接口。 合法侦听器列表的示例:PLAINTEXT:// myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION:// localhost:9093 | string | null | high |
log.dir | 保存日志数据的目录(对log.dirs属性的补充) | string | /tmp/kafka-logs | high |
log.dirs | 一个用逗号分隔的目录列表,可以有多个,用来为Kafka存储数据。每当需要为一个新的分区分配一个目录时,会选择当前的存储分区最少的目录来存储。如果没有配置,则使用log.dir配置的值。 | string | null | high |
log.flush.interval.messages | 在将消息刷新到磁盘之前,在日志分区上累积的消息数量。强制fsync一个分区的log文件之前暂存的消息数量。因为磁盘IO操作是一个慢操作,但又是一个“数据可靠性”的必要手段,所以检查是否需要固化到硬盘的时间间隔。需要在“数据可靠性”与“性能”之间做必要的权衡,如果此值过大,将会导致每次“fsync”的时间过长(IO阻塞),如果此值过小,将会导致”fsync“的次数较多,这也就意味着整体的client请求有一定的延迟,物理server故障,将会导致没有fsync的消息丢失。通常建议使用replication来确保持久性,而不是依靠单机上的fsync | long | 9223372036854775807 | high |
log.flush.interval.ms | 任何主题中的消息在刷新到磁盘之前都保留在内存中的最长时间(以毫秒为单位)。 如果未设置,则使用log.flush.scheduler.interval.ms中的值 | long | null | high |
log.flush.scheduler.interval.ms | 日志刷新器检查是否需要将任何日志刷新到磁盘的频率(以毫秒为单位)检查是否需要fsync的时间间隔 | long | 9223372036854775807 | high |
log.flush.offset.checkpoint.interval.ms | 记录上次把日志刷到磁盘的时间点的频率,用来日后的恢复。通常不需要改变。 | int | 60000 | high |
log.flush.start.offset.checkpoint.interval.ms | 我们更新记录起始偏移量的持续记录的频率 | int | 60000 | high |
log.retention.bytes | 日志达到删除大小的阈值。每个topic下每个分区保存数据的最大文件大小;注意,这是每个分区的上限,因此这个数值乘以分区的个数就是每个topic保存的数据总量。同时注意:如果log.retention.hours和log.retention.bytes都设置了,则超过了任何一个限制都会造成删除一个段文件。注意,这项设置可以由每个topic设置时进行覆盖。-1为不限制。 | long | -1 | high |
log.retention.hours | 每个日志文件删除之前保存的时间,单位小时。默认数据保存时间对所有topic都一样。log.retention.minutes 和 log.retention.bytes 都是用来设置删除日志文件的,如果达到任意一个条件的限制,都会进行删除。这个属性设置可以在topic基本设置时进行覆盖。 | int | 168 | high |
log.retention.minutes | 在删除日志文件之前保留日志的分钟数(以分钟为单位),次要log.retention.ms属性。 如果未设置,则使用log.retention.hours中的值 | int | null | high |
log.retention.ms | 保留日志文件的毫秒数(以毫秒为单位),如果未设置,则使用log.retention.minutes中的值 | long | null | high |
log.roll.hours | 这个设置会强制Kafka去新建一个新的log segment文件,即使当前使用的segment文件的大小还没有超过log.segment.bytes。此配置可以被覆盖 | int | 168 | high |
log.roll.jitter.hours | 从logRollTimeMillis减去的最大抖动(以小时为单位),次要log.roll.jitter.ms属性 | int | 0 | high |
log.roll.jitter.ms | 同上,如果没有设置则使用log.roll.jitter.hours | long | null | high |
log.roll.ms | 同log.roll.hours,单位ms | long | null | high |
log.segment.bytes | topic 分区的日志存放在某个目录下诸多文件中,这些文件将partition的日志切分成一段一段的,这就是段文件(segment file);一个topic的一个分区对应的所有segment文件称为log。这个设置控制着一个segment文件的最大的大小,如果超过了此大小,就会生成一个新的segment文件。此配置可以被覆盖。 int | 1073741824 | high | |
log.segment.delete.delay.ms | 在log文件被移出索引后(删除),log文件的保留时间。在这段时间内运行的任意正在进行的读操作完成操作,不用去打断它。通常不需要改变。 | long | 60000 | high |
message.max.bytes | kafka允许的最大的一个批次的消息大小。 如果这个数字增加,并且有0.10.2版本以下的消费者,那么消费者的提取大小也必须增加,以便他们可以取得这么大的记录批次。在最新的消息格式版本中,记录总是被组合到一个批次以提高效率。 在以前的消息格式版本中,未压缩的记录不会分组到批次中,并且此限制仅适用于该情况下的单个记录。可以使用主题级别max.message.bytes来设置每个主题。 | int | 1000012 | high |
min.insync.replicas | 当生产者将ack设置为“全部”(或“-1”)时,min.insync.replicas指定必须确认写入被认为成功的最小副本数(必须确认每一个repica的写数据都是成功的)。 如果这个最小值不能满足,那么生产者将会引发一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。当一起使用时,min.insync.replicas和acks允许您强制更大的耐久性保证。 一个典型的情况是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并且生产者使用“all”选项。 这将确保如果大多数副本没有写入生产者则抛出异常。 | int | 1 | high |
num.io.threads | server端处理请求时的I/O线程的数量,不要小于磁盘的数量。 | int | 8 | high |
num.network.threads | 服务器用于接收来自网络的请求并向网络发送响应的线程数 | int | 3 | high |
num.recovery.threads.per.data.dir | 每个数据目录的线程数,用于启动时的日志恢复和关闭时的刷新 | int | 1 | high |
num.replica.fetchers | 用来从leader复制消息的线程数量,增大这个值可以增加follow的I/O并行度。 | int | 1 | high |
offset.metadata.max.bytes | 允许client(消费者)保存它们元数据(offset)的最大的数据量。 | int | 4096 | high |
offsets.commit.required.acks | 在offset commit可以接受之前,需要设置acks的数目,一般不需要更改 | short | -1 | high |
offsets.commit.timeout.ms | offsets提交将被延迟,直到偏移量topic的所有副本都收到提交或达到此超时。 这与生产者请求超时类似。 | int | 5000 | high |
offsets.load.buffer.size | 每次从offset段文件往缓存加载offsets数据时的读取的数据大小。 | int | 5242880 | high |
offsets.retention.check.interval.ms | 检查失效offset的频率 | long | 600000 | high |
offsets.retention.minutes | 如果一个group在这个时间没没有提交offsets,则会清理这个group的offset数据 | int | 1440 | high |
offsets.topic.compression.codec | 用于offsets主题的压缩编解码器 - 压缩可用于实现“原子”提交 | int | 0 | high |
offsets.topic.num.partitions | Offsets topic的分区数量(部署后不应更改) | int | 50 | high |
offsets.topic.replication.factor | Offsets topic的复制因子(设置得更高以确保可用性)。 内部主题创建将失败,直到群集大小满足此复制因素要求。 | short | 3 | high |
offsets.topic.segment.bytes | 为了便于更快的日志压缩和缓存加载,偏移量的主题段字节应保持相对较小 | int | 104857600 | high |
port | DEPRECATED:仅在未设置“listeners”时使用。 使用listeners 来代替。 这个端口来监听和接受连接 |
int | 9092 | high |
queued.max.requests | I/O线程等待队列中的最大的请求数,超过这个数量,network线程就不会再接收一个新的请求。这个参数是指定用于缓存网络请求的队列的最大容量,这个队列达到上限之后将不再接收新请求。一般不会成为瓶颈点,除非I/O性能太差,这时需要配合num.io.threads等配置一同进行调整。 int | 500 | high | |
quota.consumer.default | DEPRECATED:只有在动态默认配额没有配置或者为Zookeeper时才使用。 如果一个消费者每秒消费的数据量大于此值,则暂时不会再允许消费。0.9版本新加。 | long | 9223372036854775807 | high |
quota.producer.default | DEPRECATED:只有在动态默认配额没有配置或者为Zookeeper时才使用。如果一个生产者每秒产生的数据大于此值,则暂时会推迟接受生产者数据。 | long | 9223372036854775807 | high |
replica.fetch.min.bytes | 复制数据过程中,replica收到的每个fetch响应,期望的最小的字节数,如果没有收到足够的字节数,就会等待期望更多的数据,直到达到replica.fetch.wait.max.ms。 | int | 1 | high |
replica.fetch.wait.max.ms | Replicas follow同leader之间通信的最大等待时间,失败了会重试。 此值始终应始终小于replica.lag.time.max.ms,以防止针对低吞吐量主题频繁收缩ISR | int 500 | high | |
replica.lag.time.max.ms | 如果一个follower在这个时间内没有发送fetch请求,leader将从ISR重移除这个follower,并认为这个follower已经挂了 | long | 10000 | high |
replica.high.watermark.checkpoint.interval.ms | 每一个replica follower存储自己的high watermark到磁盘的频率,用来日后的recovery。 | long | 5000 | high |
replica.socket.receive.buffer.bytes | 复制数据过程中,follower发送网络请求给leader的socket receiver buffer的大小。 | int | 65536 | high |
replica.socket.timeout.ms | 复制数据过程中,replica发送给leader的网络请求的socket超时时间。它的值应该至少是replica.fetch.wait.max.ms | int | 30000 | high |
request.timeout.ms | 在向producer发送ack之前,broker允许等待的最大时间,如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因未能成功(比如follower未能同步成功) ,客户端将在必要时重新发送请求,或者如果重试耗尽,则请求失败。 | int | 30000 | high |
socket.receive.buffer.bytes | server端用来处理socket连接的SO_RCVBUFF缓冲大小。如果值为-1,则将使用操作系统默认值。 | int | 102400 | high |
socket.request.max.bytes | server能接受的请求的最大的大小,这是为了防止server跑光内存,不能大于Java堆的大小。 | int | 104857600 | high |
socket.send.buffer.bytes | server端用来处理socket连接的SO_SNDBUFF缓冲大小。如果值为-1,则将使用操作系统默认值。 | int | 102400 | high |
transaction.max.timeout.ms | 事务的最大允许超时时间。 如果客户请求的事务时间超过这个时间,那么broker将在InitProducerIdRequest中返回一个错误。 这样可以防止客户超时时间过长,从而阻碍消费者读取事务中包含的主题。 | int | 900000 | high |
transaction.state.log.load.buffer.size | 将生产者ID和事务加载到缓存中时,从事务日志段(the transaction log segments)读取的批量大小。 | int | 5242880 | high |
transaction.state.log.min.isr | 覆盖事务主题的min.insync.replicas配置。 | int | 2 | high |
transaction.state.log.num.partitions | 事务主题的分区数量(部署后不应更改)。 | int | 50 | high |
transaction.state.log.replication.factor | 事务主题的复制因子(设置更高以确保可用性)。 内部主题创建将失败,直到群集大小满足此复制因素要求。 | short | 3 | high |
transaction.state.log.segment.bytes | 事务主题段字节应保持相对较小,以便于更快的日志压缩和缓存负载 | int | 104857600 | high |
transactional.id.expiration.ms | 事务协调器在未收到任何事务状态更新之前,主动设置生产者的事务标识为过期之前将等待的最长时间(以毫秒为单位)。 | int | 604800000 | high |
unclean.leader.election.enable | 指明了是否能够使不在ISR中replicas follower设置用来作为leader | boolean | false | high |
zookeeper.connection.timeout.ms | 连接到ZK server的超时时间,没有配置就使用zookeeper.session.timeout.ms | int | null | high |
zookeeper.session.timeout.ms | ZooKeeper的session的超时时间,如果在这段时间内没有收到ZK的心跳,则会被认为该Kafka server挂掉了。如果把这个值设置得过低可能被误认为挂掉,如果设置得过高,如果真的挂了,则需要很长时间才能被server得知。 | int | 6000 | high |
zookeeper.set.acl | 连接zookeeper是否使用 ACLs安全验证 | boolean | false | high |
broker.id.generation.enable | 服务器是否允许自动生成broker.id;如果允许则产生的值会交由reserved.broker.max.id审核 | boolean | true | medium |
broker.rack | broker的机架位置。 这将在机架感知复制分配中用于容错。 例如:RACK1 ,us-east-1d
|
string | null | medium |
connections.max.idle.ms | 空闲连接超时:服务器套接字处理器线程关闭闲置超过此的连接 | long | 600000 | medium |
controlled.shutdown.enable | 是否允许控制器关闭broker ,若是设置为true,会关闭在这个broker上所有分区的leader,并转移到其他broker,这会降低在关闭期间不可用的时间。 | boolean | true | medium |
controlled.shutdown.max.retries | 控制器在关闭时可能有多种原因导致失败,可以重新关闭的次数。 | int | 3 | medium |
controlled.shutdown.retry.backoff.ms | 在每次重新关闭前,系统需要一定的时间去恢复发生错误之前的状态,这个就是在重试期间的回退(backoff)时间。 | long | 5000 | medium |
controller.socket.timeout.ms | 控制器到broker通道的socket超时时间 | int | 30000 | medium |
default.replication.factor | 自动创建topic时的默认副本的个数 | int | 1 | medium |
delete.records.purgatory.purge.interval.requests | 详见注解 | int | 1 | medium |
fetch.purgatory.purge.interval.requests | 提取清除请求的清除间隔(请求数)详见注解 | int | 1000 | medium |
producer.purgatory.purge.interval.requests | The purge interval (in number of requests) of the producer request purgatory详见注解 | int | 1000 | medium |
group.initial.rebalance.delay.ms | 在执行第一次再平衡之前,group协调员将等待更多消费者加入group的时间。 延迟时间越长意味着重新平衡的可能性越小,但是等待处理开始的时间增加。 | int | 3000 | medium |
group.max.session.timeout.ms | 消费者向组内注册时允许的最大超时时间,超过这个时间表示注册失败。更长的超时使消费者有更多的时间来处理心跳之间的消息,代价是检测失败的时间更长。 | int | 300000 | medium |
group.min.session.timeout.ms | 消费者向组内注册时允许的最小超时时间,更短的超时以更频繁的消费者心跳为代价但有更快速的故障检测,这可能影响broker资源。 | int | 6000 | medium |
inter.broker.listener.name | 用于经纪人之间沟通的监听者名称。如果未设置,则侦听器名称由security.inter.broker.protocol定义。 同时设置此和security.inter.broker.protocol属性是错误的。 | string | null | medium |
inter.broker.protocol.version | 指定将使用哪个版本的 inter-broker 协议。 在所有经纪人升级到新版本之后,这通常会受到冲击。升级时要设置 | string | 0.11.0-IV2 | medium |
log.cleaner.backoff.ms | 检查log是否需要clean的时间间隔。 | long | 15000 | medium |
log.cleaner.dedupe.buffer.size | 日志压缩去重时候的缓存空间,在空间允许的情况下,越大越好。 | long | 134217728 | medium |
log.cleaner.delete.retention.ms | 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。 | long | 86400000 | medium |
log.cleaner.enable | 启用日志清理器进程在服务器上运行。使用了cleanup.policy = compact的主题,包括内部offsets主题,都应该启动该选项。如果被禁用的话,这些话题将不会被压缩,并且会不断增长。 | boolean | true | medium |
log.cleaner.io.buffer.load.factor | 日志清理中hash表的扩大因子,一般不需要修改。 | double | 0.9 | medium |
log.cleaner.io.buffer.size | 日志清理时候用到的I/O块(chunk)大小,一般不需要修改。 | int | 524288 | medium |
log.cleaner.io.max.bytes.per.second | 在执行log compaction的过程中,限制了cleaner每秒钟I/O的数据量,以免cleaner影响正在执行的请求。 | double | 1. | medium |
log.cleaner.min.cleanable.ratio | 控制了log compactor进行clean操作的频率。默认情况下,当log的50%以上已被clean时,就不用继续clean了。此配置可以被覆盖。 | double | 0.5 | medium |
log.cleaner.min.compaction.lag.ms | 消息在日志中保持未压缩的最短时间。 仅适用于正在压缩的日志。 | long | 0 | medium |
log.cleaner.threads | 用于日志清理的后台线程的数量 | int | 1 | medium |
log.cleanup.policy | 此配置可以设置成delete或compact。如果设置为delete,当log segment文件的大小达到上限,或者roll时间达到上限,文件将会被删除。如果设置成compact,则此文件会被清理,标记成已过时状态,详见 4.8 。此配置可以被覆盖。 | list | delete | medium |
log.index.interval.bytes | 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更耗内存,一般情况下不需要改变这个参数。 | int | 4096 | medium |
log.index.size.max.bytes | 每个log segment的最大尺寸。注意,如果log尺寸达到这个数值,即使尺寸没有超过log.segment.bytes限制,也需要产生新的log segment。 | int | 10485760 | medium |
log.message.format.version | 指定broker将用于将消息添加到日志文件的消息格式版本。 该值应该是有效的ApiVersion。 一些例子是:0.8.2,0.9.0.0,0.10.0。 通过设置特定的消息格式版本,用户保证磁盘上的所有现有消息都小于或等于指定的版本。 不正确地设置这个值将导致使用旧版本的用户出错,因为他们将接收到他们不理解的格式的消息。 | string | 0.11.0-IV2 | medium |
log.message.timestamp.difference.max.ms | broker收到消息时的时间戳和消息中指定的时间戳之间允许的最大差异。 如果log.message.timestamp.type = CreateTime,如果时间戳的差值超过此阈值,则会拒绝接受这条消息。 如果log.message.timestamp.type = LogAppendTime,则忽略此配置。允许的最大时间戳差异不应大于log.retention.ms,以避免不必要地频繁进行日志滚动。 | long | 9223372036854775807 | medium |
log.message.timestamp.type | 定义消息中的时间戳是消息创建时间还是日志追加时间。 该值应该是“CreateTime”或“LogAppendTime” | string | CreateTime | medium |
log.preallocate | 是否预创建新的段文件,windows推荐使用 | boolean | false | medium |
log.retention.check.interval.ms | 检查日志段文件的间隔时间,以确定是否文件属性是否到达删除要求。 | long | 300000 | medium |
max.connections.per.ip | broker上每个IP允许最大的连接数 | int | 2147483647 | medium |
max.connections.per.ip.overrides | 每个ip或者hostname默认的连接的最大覆盖 | String | "" | medium |
num.partitions | 新建Topic时默认的分区数 | int | 1 | medium |
principal.builder.class | The fully qualified name of a class that implements the PrincipalBuilder interface, which is currently used to build the Principal for connections with the SSL SecurityProtocol. | class | org.apache. kafka.common. security.auth. DefaultPrincipalBuilder |
medium |
replica.fetch.backoff.ms | 复制数据时失败等待时间 | int 1000 | medium | |
replica.fetch.max.bytes | 为每个分区设置获取的消息的字节数。 这不是绝对最大值,如果第一个非空分区中的第一个record batch大于此值,那么record batch仍将被返回以确保可以进行。 代理接受的最大记录批量大小通过message.max.bytes(broker config)或max.message.bytes(topic config)进行定义。 | int | 1048576 | medium |