Shopify/sarama的producer有两种运行模式:
- 同步模式
producer把消息发给kafka之后会等待结果返回。 - 异步模式
producer把消息发给kafka之后不会等待结果返回。
- 同步模式
config := sarama.NewConfig()
config.Producer.Return.Successes = true
client, err := sarama.NewClient([]{"localhost:9092"}, config)
if err != nil {
log.Fatalf("unable to create kafka client: %q", err)
}
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
log.Fatalf("unable to create kafka producer: %q", err)
}
defer producer.Close()
text := fmt.Sprintf("message %08d", i)
partition, offset , err := producer.SendMessage(&sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)})
if err != nil {
log.Fatalf("unable to produce message: %q", err)
}
...
注意同步模式下,下面配置必须置上:
config.Producer.Return.Successes = true
否则运行报错:
2018/12/25 08:08:30 unable to create kafka producer: "kafka:
invalid configuration (Producer.Return.Successes must be true to be used in a SyncProducer)"
- 异步模式
异步模式,顾名思义就是produce一个message之后不等待发送完成返回;这样调用者可以继续做其他的工作。
config := sarama.NewConfig()
// config.Producer.Return.Successes = true
client, err := sarama.NewClient([]{"localhost:9092"}, config)
if err != nil {
log.Fatalf("unable to create kafka client: %q", err)
}
producer, err := sarama.NewAsyncProducerFromClient
if err != nil {
log.Fatalf("unable to create kafka producer: %q", err)
}
defer producer.Close()
text := fmt.Sprintf("message %08d", i)
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
// wait response
select {
//case msg := <-producer.Successes():
// log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
default:
log.Println("Produced message default",)
}
...
关于异步producer有一个地方取药注意的。
- 异步模式produce一个消息后,缺省并不会报告成功状态。
config.Producer.Return.Successes = false
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)
// wait response
select {
case msg := <-producer.Successes():
log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
}
则这段代码会挂住,因为设置没有要求返回成功config.Producer.Return.Successes = false
,那么在select等待的时候producer.Successes()不会返回,producer.Errors()也不会返回(假设没有错误发生),就挂在这儿。当然可以加一个default分支绕过去,就不会挂住了:
select {
case msg := <-producer.Successes():
log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
default:
log.Println("Produced message default")
}
- 如果打开了Return.Successes配置,则上述代码段等同于同步方式
config.Producer.Return.Successes = true
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)
// wait response
select {
case msg := <-producer.Successes():
log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
}
从log可以看到,每发送一条消息,收到一条Return.Successes,类似于:
2018/12/25 08:51:51 Produced message: [message 00002537]
2018/12/25 08:51:51 Produced message successes: [message 00002537]
2018/12/25 08:51:51 Produced message: [message 00002538]
2018/12/25 08:51:51 Produced message successes: [message 00002538]
2018/12/25 08:51:51 Produced message: [message 00002539]
2018/12/25 08:51:51 Produced message successes: [message 00002539]
2018/12/25 08:51:51 Produced message: [message 00002540]
2018/12/25 08:51:51 Produced message successes: [message 00002540]
2018/12/25 08:51:51 Produced message: [message 00002541]
2018/12/25 08:51:51 Produced message successes: [message 00002541]
2018/12/25 08:51:51 Produced message: [message 00002542]
2018/12/25 08:51:51 Produced message successes: [message 00002542]
2018/12/25 08:51:51 Produced message: [message 00002543]
2018/12/25 08:51:51 Produced message successes: [message 00002543]
...
就像是同步produce一样的行为了。
- 如果打开了Return.Successes配置,而又没有producer.Successes()提取,那么Successes()这个chan消息会被写满。
config.Producer.Return.Successes = true
...
log.Printf("Reade to Produced message: [%s]\n",text)
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)
// wait response
select {
//case msg := <-producer.Successes():
// log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-producer.Errors():
log.Println("Produced message failure: ", err)
default:
log.Println("Produced message default",)
}
写满的结果就是不能再写入了,导致后面的Return.Successes消息丢失, 而且producer也会挂住,因为共享的buffer被占满了,大量的Return.Successes没有被消耗掉。
运行一段时间后:
2018/12/25 08:58:24 Reade to Produced message: [message 00000603]
2018/12/25 08:58:24 Produced message: [message 00000603]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000604]
2018/12/25 08:58:24 Produced message: [message 00000604]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000605]
2018/12/25 08:58:24 Produced message: [message 00000605]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000606]
2018/12/25 08:58:24 Produced message: [message 00000606]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000607]
2018/12/25 08:58:24 Produced message: [message 00000607]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Reade to Produced message: [message 00000608]
在produce第00000608个message的时候被挂住了,因为消息缓冲满了;这个缓冲的大小是可配的(可能是这个MaxRequestSize?),但是不管大小是多少,如果没有去提取Success消息最终都会被占满的。
结论就是说配置config.Producer.Return.Successes = true
和操作<-producer.Successes()
必须配套使用;配置成true,那么就要去读取Successes,如果配置成false,则不能去读取Successes。