【rocketmq-client-python】学习笔记

rocketmq-python 是一个基于 rocketmq-client-cpp 封装的 RocketMQ Python 客户端。

rocketmq-client-python安装

目前rocketmq库只支持linux和mac。

rocketmq-client-python 的安装:

pip install rocketmq

安装太慢?国内源安装:

pip install rocketmq -i https://pypi.tuna.tsinghua.edu.cn/simple


示例代码:

Producer

from rocketmq.client import Producer, Message

producer = Producer('PID-XXX')

producer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')#rocketmq队列接口地址(服务器ip:port)

# For ip and port name server address, use `set_namesrv_addr` method, for example:

# producer.set_namesrv_addr('127.0.0.1:9887')

producer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')#可以不使用

producer.start()

msg_body = {"id":"test_id","name":"test_name","message":"test_message"}

ss = json.dumps(msg_body).encode('utf-8')

msg = Message('YOUR-TOPIC') #topic名称

msg.set_keys('XXX')#每个消息在业务层面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

msg.set_tags('XXX')#一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤。

msg.set_body(ss)

ret = producer.send_sync(msg)

print(ret.status, ret.msg_id, ret.offset)

producer.shutdown()


其中:

设置ip:port的位置:producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx')

当只有单一服务器时,格式是上面这个;

当有多个服务器地址(集群模式)时,可以使用:producer.set_namesrv_addr("xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx")

如果使用pandas数据,pandas数据可以直接转换

df.to_json(orient='records').encode('utf-8'),然后放入body中发送。

不同应用的多个Topic使用同一个namesrv_addr时数据传输会发生冲突

解决方案:每一个Topic对应一个 “PID-XXX”


PushConsumer

import time

from rocketmq.client import PushConsumer

def callback(msg):

    print(msg.id, msg.body)

consumer = PushConsumer('CID_XXX')

consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')

# For ip and port name server address, use `set_namesrv_addr` method, for example:

# consumer.set_namesrv_addr('127.0.0.1:9887')

consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')

consumer.subscribe('YOUR-TOPIC', callback)

consumer.start()

while True:

    time.sleep(3600)

consumer.shutdown()


PullConsumer

from rocketmq.client import PullConsumer

consumer = PullConsumer('CID_XXX')

consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')

# For ip and port name server address, use `set_namesrv_addr` method, for example:

# consumer.set_namesrv_addr('127.0.0.1:9887')

consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')

consumer.start()

for msg in consumer.pull('YOUR-TOPIC'):

    print(msg.id, msg.body)

consumer.shutdown()


控制日志的输出频率

from rocketmq.client import dll

dll.SetPushConsumerLogLevel(namesrv_addr.encode('utf-8'), 1)


ffi.py

class _CLogLevel(CtypesEnum):

    FATAL = 1

    ERROR = 2

    WARN = 3

    INFO = 4

    DEBUG = 5

    TRACE = 6

    LEVEL_NUM = 7

log4j定义了8个级别的log(除去OFF和ALL,可以说分为6个级别),优先级从高到低依次为:OFF、FATAL、ERROR、WARN、INFO、DEBUG、TRACE、 ALL。

ALL 最低等级的,用于打开所有日志记录。

TRACE designates finer-grained informational events than the DEBUG.Since:1.2.12,很低的日志级别,一般不会使用。

DEBUG 指出细粒度信息事件对调试应用程序是非常有帮助的,主要用于开发过程中打印一些运行信息。

INFO 消息在粗粒度级别上突出强调应用程序的运行过程。打印一些你感兴趣的或者重要的信息,这个可以用于生产环境中输出程序运行的一些重要信息,但是不能滥用,避免打印过多的日志。

WARN 表明会出现潜在错误的情形,有些信息不是错误信息,但是也要给程序员的一些提示。

ERROR 指出虽然发生错误事件,但仍然不影响系统的继续运行。打印错误和异常信息,如果不想输出太多的日志,可以使用这个级别。

FATAL 指出每个严重的错误事件将会导致应用程序的退出。这个级别比较高了。重大错误,这种级别你可以直接停止程序了。

应用案例

PushConsumer

import json

from rocketmq.client import PushConsumer, dll

import traceback

import logging

class RocketMQ():

    def __init__(self):

        logging.basicConfig(level=logging.CRITICAL, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

        self.logger = logging.getLogger(__name__)   

        self.consumer = PushConsumer("PID-XXX")

        self.consumer.set_namesrv_addr("XX.XX.XX.XX:XXXX")

        self.topic_name = "xxx"

        #减少日志输出

        dll.SetPushConsumerLogLevel(namesrv_addr.encode('utf-8'), 1)

    def callback(self,msg):

        test_body = json.loads(msg.body)

        try:

            self.my_func(test_body)

            return PushConsumer

        except Exception as e:

            print('>>>>>>>>>>allback msg:\n{}'.format(es_body))

            print('>>>>>>>>>>callback error:\n{}'.format(e))

            return PushConsumer


    def onMessage(self):

        self.consumer.subscribe(self.topic_name, self.callback)

        self.consumer.start()

        while True:

            time.sleep(2)

        self.consumer.shutdown()

    def my_func(test_body):

        print(test_body)

if __name__ == '__main__':

    mq = RocketMQ()

    mq.onMessage()


Producer

from rocketmq.client import Producer, Message

import json

producer = Producer("PID-XXX")

producer.set_namesrv_addr('XX.XX.XX.XX:XXXX')

producer.start()

topic_name = "xxx"

key_name = "abc"

tags = "123"

msg_body = {

    "key_1":value_1,

    "key_2":value_2

}

ss = json.dumps(msg_body).encode('utf-8')

msg = Message(topic_name)

msg.set_keys(key_name)

msg.set_tags(tags)

msg.set_body(ss)

ret = producer.send_sync(msg)

print(ret.status, ret.msg_id, ret.offset)

producer.shutdown()


PullConsumer

from rocketmq.client import PullConsumer

consumer = PullConsumer("PID-XXX")

consumer.set_namesrv_addr("XX.XX.XX.XX:XXXX")

consumer.start()

while True:

    topic_name = "xxx"

    for msg in consumer.pull(topic_name):print(msg.id, msg.body)


Topic

Topic创建的核心步骤如下

1、mqadmin向broker发起创建Topic的命令。

2、broker生成Topic对应的topicConfig配置保存在broker的TopicConfigManager中。

3、broker向所有的namesrv上报topicConfig信息。

4、namesrv的RouteInfoManager的topicQueueTable保存topic的QueueData信息。

5、broker会通过定时任务定期向namesrv发送心跳信息更新topic配置。


usage: mqadmin updateTopic -b <arg> | -c <arg>  [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>] -t

      <arg> [-u <arg>] [-w <arg>]

-b,--brokerAddr <arg>      create topic to which broker

-c,--clusterName <arg>      create topic to which cluster

-h,--help                  Print help

-n,--namesrvAddr <arg>      Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876

-o,--order <arg>            set topic's order(true|false)

-p,--perm <arg>            set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]

-r,--readQueueNums <arg>    set read queue nums

-s,--hasUnitSub <arg>      has unit sub (true|false)

-t,--topic <arg>            topic name

-u,--unit <arg>            is unit topic (true|false)

-w,--writeQueueNums <arg>  set write queue nums


通过 --brokerAddr在指定的broker创建topic。

通过 --clusterName在整个集群创建topic。

通过 --namesrvAddr指定namesrv地址。

通过 --topic来指定topic名称。

通过 --perm来指定Topic的权限管理。

在rocketmq中添加新的Topic

sh mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderTopic

创建Topic时报错解决方案

Java HotSpot™ 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0

Java HotSpot™ 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0

org.apache.rocketmq.tools.command.SubCommandException: UpdateTopicSubCommand command failed

at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:181)

at org.apache.rocketmq.tools.command.MQAdminStartup.main0(MQAdminStartup.java:135)

at org.apache.rocketmq.tools.command.MQAdminStartup.main(MQAdminStartup.java:86)

Caused by: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=[10015:signature-failed] unable to calculate a request signature. error=Algorithm HmacSHA1 not available

at org.apache.rocketmq.acl.common.AclSigner.signAndBase64Encode(AclSigner.java:84)

at org.apache.rocketmq.acl.common.AclSigner.calSignature(AclSigner.java:73)

at org.apache.rocketmq.acl.common.AclSigner.calSignature(AclSigner.java:68)

at org.apache.rocketmq.acl.common.AclUtils.calSignature(AclUtils.java:58)

at org.apache.rocketmq.acl.common.AclClientRPCHook.doBeforeRequest(AclClientRPCHook.java:44)

at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.doBeforeRpcHooks(NettyRemotingAbstract.java:172)

at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:370)

at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerClusterInfo(MQClientAPIImpl.java:1180)

at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.examineBrokerClusterInfo(DefaultMQAdminExtImpl.java:275)

at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.examineBrokerClusterInfo(DefaultMQAdminExt.java:222)

at org.apache.rocketmq.tools.command.CommandUtil.fetchMasterAddrByClusterName(CommandUtil.java:83)

at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:154)

… 2 more

Caused by: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=Algorithm HmacSHA1 not available

at org.apache.rocketmq.acl.common.AclSigner.sign(AclSigner.java:63)

at org.apache.rocketmq.acl.common.AclSigner.signAndBase64Encode(AclSigner.java:79)

… 13 more

Caused by: java.security.NoSuchAlgorithmException: Algorithm HmacSHA1 not available

at javax.crypto.Mac.getInstance(Mac.java:181)

at org.apache.rocketmq.acl.common.AclSigner.sign(AclSigner.java:57)

… 14 more


解决办法是:

1.进入rocketmq的bin目录下:/var/www/rocketmq/rocketmq-all-4.4.0-bin-release/bin,

/var/www/rocketmq是我自己的安装路径。

2.用vim tools.sh打开tools.sh.在JAVA_OPT配置中,在-Djava.ext.dirs这一行的后面添加ext的路径,原配置如下

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m"

JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext"

JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

添加ext文件的绝对路径,添加后重新执行命令即可

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m"

JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:/usr/java/jdk1.8.0_65/jre/lib/ext"

JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

删除Topic

deleteTopic -n localhost:9876 -c DefaultCluster -t orderTopic

rocketmq查看命令

首先进入 RocketMQ 工程,进入/RocketMQ/bin  在该目录下有个 mqadmin 脚本 .

查看帮助:  在 mqadmin 下可以查看有哪些命令 

a: 查看具体命令的使用 : sh mqadmin   

b: sh mqadmin help 命令名称 

例如,查看 updateTopic 的使用

sh mqadmin help updateTopic

2. 关闭nameserver和所有的broker:

  进入到bin下:

sh mqshutdown namesrv

sh mqshutdown broker

3. 查看所有消费组group:

sh mqadmin consumerProgress -n 192.168.1.23:9876

4. 查看指定消费组下的所有topic数据堆积情况:

sh mqadmin consumerProgress -n 192.168.1.23:9876 -g warning-group

5. 查看所有topic :

sh mqadmin topicList -n 192.168.1.23:9876

6. 查看topic信息列表详情统计

sh mqadmin topicstatus -n 192.168.1.23:9876 -t topicWarning

7.  新增topic

sh mqadmin updateTopic –n 192.168.1.23:9876 –c DefaultCluster –t topicWarning

8. 删除topic

  sh mqadmin deleteTopic –n 192.168.1.23:9876 –c DefaultCluster –t topicWarning

9、查询集群消息

sh mqadmin  clusterList -n 192.168.1.23:9876

Reference

https://github.com/apache/rocketmq-client-python

https://www.oschina.net/p/rocketmq-python

https://www.cnblogs.com/qi-yuan-008/p/14022378.html

https://blog.csdn.net/shiyong1949/article/details/52643711

https://www.jianshu.com/p/b84190af20a8

https://www.cnblogs.com/gmq-sh/p/6232633.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342