快速使用docker部署kafka
1.安装Docker及docker-compose
可参考我的另一篇文章 如何快速上手Docker
2.镜像准备
sudo docker pull wurstmeister/zookeeper
sudo docker pull wurstmeister/kafka
sudo docker pull sheepkiller/kafka-manager # 这个不需要可以去掉,主要可用于图形管理页面
3.编写docker-compose文件
由于主要是用于理解kafka的命令及python中如何应用,所以这里暂时没有集群部署的方式
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
volumes:
- ./kafka_data/data:/data
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
# 设置多brokers时,可不指定宿主机端口,会自动分配,但有报错,具体原因未知
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.101.128
# 单个消息的最大限制,目前大约是2M
KAFKA_MESSAGE_MAX_BYTES: 2000000
# 初始化创建的topic,可以不设置
# KAFKA_CREATE_TOPICS: "Topic1:1:3,Topic2:1:1:compact"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- ./kafka_data/kafka-logs:/kafka
- /var/run/docker.sock:/var/run/docker.sock
# 如果不需要可以把这里注释掉
kafka-manager:
image: sheepkiller/kafka-manager
ports:
- 9020:9000
environment:
ZK_HOSTS: zookeeper:2181
4.启动kafka
sudo docker-compose up -d
5.创建topic
- 查看容器列表,找到kafka的容器ID或者名称
sudo docker ps -a
如果不加-a,只能看到启动状态的所有容器信息
- 执行下面命令,进入容器内部
sudo docker exec -it b6758761234a /bin/bash
- 在容器内部执行下面命令
这样即可创建一个名为testtopic的topic,分区数为2,副本数为1
bash-5.1# kafka-topics.sh --create --topic testtopic --partitions 2 --zookeeper zookeeper:2181 --replication-factor 1
6.producer向topic发送消息
producer发送消息分为无key型消息和有key型消息,默认是无key型
- 发送无key型消息
bash-5.1# kafka-console-producer.sh --broker-list 192.168.101.128:9092 --topic testtopic
>hello
- 发送有key型消息
下面key为hello,value为world,key与value之间用tab间隔
bash-5.1# kafka-console-producer.sh --broker-list 192.168.101.128:9092 --topic testtopic --property parse.key=true
>hello world
7.consumer从topic读取消息
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.101.128:9092 --topic testtopic --property print.key=true --from-beginning
null hello
null aaa:122
null {"wqwq":123}
hello world
--from-beginning 表示从存在的最早消息开始读取
--property print.key=true 表示输出key和value,如果没有key则为null
kafka在python中的应用
Consumergroup
- 允许consumer group(包含多个consumer,如一个集群同时消费)对一个topic进行消费,不同的consumer group之间独立消费。
- 为了对减小一个consumer group中不同consumer之间的分布式协调开销,指定partition为最小的并行消费单位,即一个group内的consumer只能消费不同的partition。
- 可以在容器内部创建一个组消费者,下面定义为test
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.101.128:9092 --topic testtopic --group test
- 可以在容器内部创建一个非组消费者
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.101.128:9092 --topic testtopic
在python下创建两个文件分别为prod.py和con.py
1.创建生产者prod.py
import json
import traceback
from kafka import KafkaProducer
from kafka.errors import kafka_errors
bootstrap_servers = ['192.168.101.128:9092']
topic = 'testtopic'
producer = KafkaProducer(bootstrap_servers=bootstrap_servers, key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode())
for i in range(0, 3):
future = producer.send(
topic,
key='count_num', # 同一个key值,会被送至同一个分区
value=str(i),
partition=1) # 向分区1发送消息
print("send {}".format(str(i)))
try:
future.get(timeout=10) # 监控是否发送成功
except kafka_errors: # 发送失败抛出kafka_errors
traceback.format_exc()
2.创建消费者con.py
import json
from kafka import KafkaConsumer
bootstrap_servers = ['192.168.101.128:9092']
topic = 'testtopic'
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
group_id='test'
)
for message in consumer:
print("receive, key: {}, value: {}".format(
json.loads(message.key.decode()),
json.loads(message.value.decode())))
3.执行消费者con.py
此时容器内部有两个消费者,再加上con.py就一共有三个消费者,其中有两个是组消费者,组id为test,还有一个非组消费者,再执行prod.py
- 容器中的非组消费者,也消费了这个消息
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.101.128:9092 --topic testtopic --property print.key=true
"count_num" "0"
"count_num" "1"
"count_num" "2"
- con.py的组消费者,消费了这个消息
E:\PycharmProjects\scripts\venv\Scripts\python.exe E:/PycharmProjects/scripts/kafka_test/consumer_kafka.py
receive, key: count_num, value: 0
receive, key: count_num, value: 1
receive, key: count_num, value: 2
- 容器中的组消费者,没有执行
bash-5.1# kafka-console-consumer.sh --bootstrap-server 192.168.101.128:9092 --topic testtopic --group test
综上印证了,前面Consumergroup中说到的,如果没有组id,那么每个topic的消息,会被多个消费者重复消费,类似我在过往的python和rabbitmq中的遇到的问题,消费者端是采用gunicorn开了4个worker,所以当生产者一条消息发出,消费者却执行了四次,当使用了groupid就可以保证一条消息只被这个group中的一个消费者消费,如果需要多个消费者消费,可以创建多个group