Apache pulsar producer接口详解

producer.png

send(消息发送)

生产者以同步或者异步的方式将消息发送给broker,pulsar提供以下三种消息发送的接口:

  • sned
    每发送一条消息时,producer将会等待来自broker的对于这条消息的ack。如果producer没有收到该消息的ack,就会认为这条消息发送失败。
  • send async
    生产者将消息放到一个阻塞队列中后,无需等待立即返回。之后client在后台将阻塞队列中的消息发送给broker。如果该阻塞队列已满时,继续向该阻塞队列发送消息会抛出异常(PulsarClientException.ProducerQueueIsFullError)。阻塞队列的最大大小可以进行配置。
  • send timeout
    sendTimeout(int sendTimeout, TimeUnit unit)允许在发送的时候设置一个timeout,如果超过这个时间消息还没有被确认,将会抛出一个错误。
    注意:如果我们将timeout的时间设置为 0,例如: setTimeout(0,TimeUnit.SECONDS) ,意味着timeout的时间是无限大。默认时间是:30s。这个在删除pulsar中重复数据的时候有很好的应用场景,因为这个时候client端处于永远重试发送消息的状态,没有错误会返回给应用程序。

close

关闭一个producer,提供以下两种方式:

  • close
  • close async

intercept

拦截器接口主要提供以下三个方法:

  • close
    关闭interceptor,主要用于执行一些资源清理工作

  • before send
    用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算

  • on send ack
    该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率

compression

为了降低网络带宽,消息发送的时候,会对消息进行压缩,pulsar提供了如下三种消息压缩的方式:

  • NONE
  • LZ4
  • ZLIB
  • ZSTD

scheme

  • java string hash
  • murmur3_32hash

producer name

给生产者指定一个名字,如果没有手动指定,系统将会为该producer生成一个全局唯一的名字。
注意:在指定名称时,用户需要确保对于给定的topic,生产者名称在所有Pulsar的集群中是唯一的。 broker将强制执行只有一个给定名称的producer可以在topic上进行publish。

crypto

crypto key reader
  • get public key
  • get private key
add encryption key

添加producer用来加密数据密钥的public key.
当producer创建时,Pulsar客户端会检查是否有密钥添加到encryptionKeys中。 如果找到要添加的key,则针对每个key调用回调函数getKey(String keyName)来获取key的值。 应用程序应实现这个回调并返回pkcs8格式的密钥。 如果启用了压缩功能,则压缩后会对消息进行加密。 如果启用了批处理消息传递,则批处理消息也将被加密。

crypto failure action
  • FAIL

    如果加密操作失败,FAIL是失败时,发送的默认选项

  • SEND

    忽略加密失败并继续发送未加密的消息

flush

  • flush
  • flush async

topic

指定该producer将要把消息publish到哪一个topic

other option

block if queue full

当输出(outgoing)队列已满时,是否停止相应的操作

max pending msg

设置包含待处理消息的队列的最大大小,以便从broker接收确认。
当队列已满时,默认的,所有的调用都会失败,除非blockIfQueueFull设置为true。可以使用blockIfQueueFull来改变这个行为。

max pending msg across partitions

设置所有分区中的最大挂起消息数,此设置将用于降低每个分区的最大挂起消息

initial sequenceId

为producer生产的消息设置最开始的sequenceID,如果没有额外指定,那么接下来生产的第一条消息的sequenceID就是initialSequenceId + 1,第二条消息的sequenceID依次递增。

clone

基于当前的producer,copy一个producer出来。比如我们需要创建多个producer,并且这些producer有一些相同的属性可以复用,那么我们就可以基于这些相同的属性进行copy,然后对copy后的producer在做定制化的配置

get last sequenceID

获取producer publish的最后一个sequenceID。
如果系统中有两个名字一样的producer(原则上,这是不被允许的),该函数将返回在上一个producer发布的最后一条消息的sequenceID,如果没有消息被publish,则返回-1。

msg routing mode

  • single partition
  • round robin partition
  • custom partition

batch

  • max messages
  • max publish delay
  • enable batch
    是否启用batch的功能,启用batch功能之后,producer会累计到batch指定的大小进行flush,当然我们也可以提前触发flush操作,这个时候不管batch中有多少数据都会flush下去,同时清空batch。

property

  • property
    设置单个属性
  • properties
    设置多个属性,可以包装到map中

auto update partitions

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342

推荐阅读更多精彩内容

  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,424评论 0 34
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,291评论 1 15
  • 大致可以通过上述情况进行排除 1.kafka服务器问题 查看日志是否有报错,网络访问问题等。 2. kafka p...
    生活的探路者阅读 7,567评论 0 10
  • 4. 设计思想 4.1 动机 我们设计的 Kafka 能够作为一个统一的平台来处理大公司可能拥有的所有实时数据馈送...
    疯狂的橙阅读 1,066评论 1 4
  • 一、入门1、简介Kafka is a distributed,partitioned,replicated com...
    HxLiang阅读 3,339评论 0 9