升级原因
- 根据业务需求,需要依赖kafka-0.10。
<dependencies>
<!-- 官方kafka java 客户端 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<!-- 消息云客户端扩展 -->
<dependency>
<groupId>com.xx.xx.xxxx</groupId>
<artifactId>kafka-client-ext</artifactId>
<version>1.0.2</version>
</dependency>
</dependencies>
- 客户端升级后,新版sdk访问较旧版的kafka, 发送kafka不支持的request。当前用的kafka版本为0.9.0.1, 支持的request最大id为16, 这个18是新版 kafka中的ApiVersion Request, 因此会抛这个异常出来。
[2018-07-06 17:59:51,835] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 18
at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.run(SocketServer.scala:421)
at java.lang.Thread.run(Thread.java:724)
- 事实上kafka服务端版本可以大于客户端版本(服务端kafka-0.10,客户端还是0.9的kafka版本),但会有个别的配置可能客户端不支持,比如sasl.jaas.conf这种。
升级步骤
- 下载
kafka_2.10-0.10.0.1.tgz
下载链接 - 解压
tar -xzf kafka_2.10-0.10.0.1.tgz
- 修改配置,参考0.9的配置文件进行修改
- kafka_2.10-0.10.0.1/config/zookeeper.properties
dataDir=/export/data
- kafka_2.10-0.10.0.1/config/server.properties
broker.id=1 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://<ip>:9092 num.network.threads=6 num.io.threads=16 log.dirs=/export/kafka-logs log.flush.interval.messages=10000 log.flush.interval.ms=3000 zookeeper.connect=<ip>:<port> delete.topic.enable=true inter.broker.protocol.version=0.10.0.1 log.message.format.version=0.10.0.1
其中inter.broker.protocol.version必须配置,否则重启服务后,还是原来的0.9版本
在0.9的配置中有以下几项,可以不进行配置
host.name 如果设置了它,会仅绑定这个地址。如果没有设置,则会绑定所有的网络接口,并提交一个给ZK。不推荐使用 只有当listeners没有设置时才有必要使用。
port server用来接受client连接的端口。不推荐使用,使用listeners配置项代替;只有在listeners没有配置时才使用。
advertised.host.name 会将hostname通知给生产者和消费者,在多网卡时需要设置该值为另一个ip地址。如果没有设置该值,则返回 配置项host.name设置的值,如果host.name没有设置则返回java.net.InetAddress.getCanonicalHostName()不推荐使用 只有当advertised.listeners或listeners没有设置时才有必要使用。
advertised.port 分发这个端口给所有的producer,consumer和其他broker来建立连接。如果此端口跟server绑定的端口不同,则才有必要设置。不推荐使用 只有当advertised.listeners或listeners没有设置时才有必要使用。
具体配置项含义,参见链接
- 停止kafka-0.9的服务
/bin/kafka-server-stop.sh
- 开启kafka-0.10的服务
/bin/kafka-server-start.sh -daemon config/server.properties
命令中-daemon,防止关闭远程连接(Xshell)时停止kafka服务
检查
- 查一下当前运行时是否是期望的版本
ps -ef | grep java | grep kafka
- 若不是正确的版本重新运行上述第4、5步
- 出现无法停止服务的情况,那就
kill -9 PID
吧 - kill之前可以先
cd /proc/PID
确认是否是想要kill的进程 - 关闭Xshell,检查kafka是否正常运行
更多信息,请移步个人blog