kafaka非常好用的消息队列 一堆好处,不废话上代码
1.生产者
package main
import (
"fmt"
"log"
)
func main() {
// 构建 生产者
// 生成 生产者配置文件
config := sarama.NewConfig()
// 设置生产者 消息 回复等级 0 1 all
config.Producer.RequiredAcks = sarama.WaitForAll
// 设置生产者 成功 发送消息 将在什么 通道返回
config.Producer.Return.Successes = true
// 设置生产者 发送的分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 构建 消息
msg := &sarama.ProducerMessage{}
msg.Topic = "aaa"
msg.Value = sarama.StringEncoder("123")
// 连接 kafka
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Print(err)
return
}
defer producer.Close()
// 发送消息
message, offset, err := producer.SendMessage(msg)
if err != nil {
log.Println(err)
return
}
fmt.Println(message, " ", offset)
}
2.消费者
packageconsumer
import(
"errors"
"fmt"
"gitlab.aiforward.cn/inf/golib/app/context"
)
varconsumersarama.Consumer
//消费者回调函数
typeConsumerCallbackfunc(data []byte)
//初始化消费者
funcInitConsumer(hoststring)error{
config:=sarama.NewConfig()
client,err:=sarama.NewClient([]string{host},config)
iferr!=nil{
returnerrors.New("unable to create kafka client"+err.Error())
}
consumer,err=sarama.NewConsumerFromClient(client)
iferr!=nil{
returnerr
}
fmt.Println("InitConsumer Success")
returnerr
}
//消费者循环
funcLoopConsumer(topicstring,callbackConsumerCallback)error{
ctx:=context.GetGinContextWithRequestId()
partitionList,err:=consumer.Partitions(topic)
iferr!=nil||len(partitionList)==0{
fmt.Println(ctx,"SubscribeTopic_Topic partitionList Error[%v]",err)
}
for_,part:=rangepartitionList{
go func(partint32) {
partitionConsumer,err:=consumer.ConsumePartition(topic,part,sarama.OffsetNewest)
iferr!=nil{
fmt.Println(err)
}
deferpartitionConsumer.Close()
for{
msg:= <-partitionConsumer.Messages()
ifcallback!=nil{
callback(msg.Value)
}
}
}(part)
}
returnnil
}
funcClose() {
ifconsumer!=nil{
consumer.Close()
}
}