1.首先引入
github.com/apache/pulsar-client-go/pulsar
2.启动pulsar的client
/*启动一个pulsar client*/
defer func() {
ifr:=recover();r!=nil{
logger.Fatalf(ctx,"pulsar is down stack[%+v] recover[%+v]",string(debug.Stack()),r)
}
}()
varerrerror
SunacClient,err=pulsar.NewClient(pulsar.ClientOptions{
URL:conf.PulsarSetting.Url,
})
iferr!=nil{
logger.Errorf(ctx,"setup pulsar failed to get new client err[%v]",err)
return
}
3.生产者
funcSendMessage(ctx*gin.Context,clientpulsar.Client,topicstring,msg []byte) {
/*
提供生产者方法
*/
/*
(1)初始化一个producer设置好主题
*/
producer,err:=client.CreateProducer(pulsar.ProducerOptions{
Topic:topic,
})
/*
*/
iferr!=nil{
logger.Errorf(ctx,"SendMessage Error[%v]",err)
}
/*
*/
//(2)把消息结构体发给pusar
_,err=producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload:msg,
EventTime:time.Now(),
})
logger.Infof(ctx,"Pulsar_Published_SendMessage [%v]",string(msg))
deferproducer.Close()
iferr!=nil{
logger.Errorf(ctx,"Failed to publish message",err)
}
}
4.消费者
funcnewConsumer(ctx*gin.Context,clientpulsar.Client,topicstring) {
//创建消费者
defer func() {
ifr:=recover();r!=nil{
logger.Fatalf(ctx,"Pulsar is down [stack=%+v] [recover=%+v]",string(debug.Stack()),r)
}
}()
consumer,err:=client.Subscribe(pulsar.ConsumerOptions{
Topics:[]string{topic},
SubscriptionName:conf.RcSeverSetting.ServiceName,
Type:pulsar.Failover,
})
iferr!=nil{
logger.Errorf(ctx,"NewConsumer failed to start pulsar consumer,err[%v]",err)
return
}
deferconsumer.Close()
//循环的获取pulsar的消息 e
for{
msg,err:=consumer.Receive(context.Background())
iferr!=nil{
logger.Error(ctx,"newConsumer failed to receive message,err[%v],topic[%s]",err,msg.Topic())
}
//根据消息的类型 对消息进行处理
logger.Infof(ctx,"newConsumer RCPULSARFACEADD topic[%s] msg[%v]",msg.Topic(),string(msg.Payload()))
switchmsg.Topic() {
casecommon.RCPULSARFACEADD://人脸添加或更新
logger.Infof(ctx,"newConsumer RCPULSARFACEADD topic[%s]",msg.Topic())
casecommon.RCPULSARPASSRECORDADD://通行记录添加
logger.Infof(ctx,"newConsumer RCPULSARPASSRECORDADD topic[%s]",msg.Topic())
default:
logger.Errorf(ctx,"pulsar consumer got an wrong topic,message[%v]",msg)
}
iferr!=nil{
logger.Errorf(ctx,"newConsumer handle got error,msg will nack,topic=[%s],err=[%v]",topic,err)
consumer.Nack(msg)//When a message is "negatively acked" it will be marked for redelivery after some fixed delay
continue
}
consumer.Ack(msg)
}
}