更详细的可以参考:https://zhuanlan.zhihu.com/p/279784873
https://blog.csdn.net/weixin_35688430/article/details/111292744
生产者:producer
生产者kafka有对应的三方库可以支撑去进行信息发送。
from kafka import KafkaProducer
import json
def kafka_sendMsg(topic,bootstrap_servers,key,msg,header) :
producer = KafkaProducer(value_serializer= lambda v :json.dumps(v).encode('utf-8'),bootstrap_servers=['127.0.0.1:9092'])
消费者:consumer
消费者,同样可以通过topic和服务器名称,去获取对应的数据。
from kafka import KafkaconsumerMsg(topic,bootstrap_servers)
import json
def kafka_sendMsg(topic,bootstrap_servers,key,msg,header) :
consumer= KafkaConsumer(topic,bootstrap_servers=bootstrap_servers)
for msg in consumer:
recv ="%s:%d%d: key=%s value=%s" %(msg.topic,msg.partition,msg.offset,msg.key, msg.value)
使用kafka
def kafka():
topic = "AA1.reaply.info" #kafka的主题名称
bootstrap_servers = "127.0.0.1" #kafka连接
kafkaMsg = kafka_msg_mode.getModel() #字典,获取对应的取值
key = bytes(kafkaMsg["msgkey"]+self.feature.loginUserNo,encoding='utf-8') #kakfa的key
msg = json.loads(topic_str_info) #kafka的value
header =["Headers"]
kafka_sendMsg(topic,bootstrap_servers,key,msg,header) #向kafka中间件发送消息