初始
# 下载nsq
[root@localhost ~]# wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz
# 解压
[root@localhost ~]# tar zxvf nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz -C /usr/local/
# 重命名
[root@localhost ~]# mv /usr/local/nsq-1.0.0-compat.linux-amd64.go1.8 /usr/local/nsq-1.0.0
# 添加到环境变量
[root@localhost ~]# echo "export PATH=\$PATH:/usr/local/nsq-1.0.0/bin" >> /etc/profile
# 重新加载环境变量
[root@localhost ~]# source /etc/profile
# 启动nsq
# nsqlookupd 监听 4160、4161 端口
[root@localhost ~]# nsqlookupd
# nsqd 监听 4150、4151 端口
[root@localhost ~]# nsqd --lookupd-tcp-address=127.0.0.1:4160
# nsqadmin 监听 4171 端口
[root@localhost ~]# nsqadmin --lookupd-http-address=127.0.0.1:4161
# 校验安装是否正确, 需要打开浏览器来访问
# nsqadmin管理窗口: http://127.0.0.1:4171
# 安装python版本的nsq组件(pynsq)
[root@localhost ~]# pip install pynsq
备注: 我的环境是CentOS 7, Python 2.7.13
术语
主题(topic)
在NSQ中,消息是需要被明确归类的,这有利于对程序和数据以及行为的管理,topic就是归类的管道。
NSQ不需要明确落实topic创建行为(只要提供正确的数据发布指令,它就会自动创建topic),同时它也提供手动创建topic的接口。但是当你的只有主题没有没有频道(channel)时,不论数据推送都少,都不会被NSQ记录(也就是说这些数据是无效的数据),所以在使用NSQ的时候,还是要先创建主题和频道(channel)。言归正传,下面提供了几个例子分别记录<仅提供数据发布指令, nsq就会自动创建topic>和<手动创建topic>。
-
仅提供数据发布指令(nsq会自动创建topic)
restful api via curl版本(二选一)[root@localhost ~]# curl -XPOST 'http://127.0.0.1:4151/pub?topic=learning_nsq&channel=curl' -d "hlo world\!"
tcp protocol via python版本(二选一)
在最后的附录中补充了两个代码简短,但稍微难理解的代码版本[root@localhost ~]# vim producer.py # -.- coding:utf-8 -.- from __future__ import print_function import nsq import tornado from functools import partial def producer(topic, channel, callback=None): # 当writer连接nsqd成功后, 再推送数据 if not writer.conns: return io_loop.add_callback( partial(producer, *(topic, channel, callback)) ) # 推送数据 writer.pub(topic, channel, callback) def on_finish(conn, status_code): # 打印操作返回的结果 print(conn, status_code) # 退出程序 io_loop.stop() if __name__ == '__main__': # 实例化Writer writer = nsq.Writer(['127.0.0.1:4150']) # 实例化IOLoop io_loop = tornado.ioloop.IOLoop.instance() # 将producer函数加入到io_loop的callbacks列表中 io_loop.add_callback( partial(producer, *('learning_nsq', 'python', on_finish)) ) # 启动IOLoop, 它会在内部执行callbacks列表中的函数 io_loop.start() # 运行结果 [root@localhost ~]# python producer.py OK
-
手动创建一个topic
curl版本(二选一)[root@localhost ~]# curl -XPOST 'http://127.0.0.1:4151/topic/create?topic=curl_topic'
nsq的 tcp protocol 不支持创建topic
所以pynsq组件代码层面就没有create_topic方法,如果硬要通过python程序来创建,那就用requests或AsyncHTTPClient来完成
requests[root@localhost ~]# pip install requests [root@localhost ~]# vim create_topic_via_requests.py import requests if __name__ == '__main__': resp = requests.post( 'http://127.0.0.1:4151/topic/create?topic=python_requests_topic', data='using python to learn nsq', ) print(resp, resp.content) # 运行程序 [root@localhost ~]# python create_topic_via_requests.py (<Response [200]>, '')
AsyncHTTPClient
from tornado.httpclient import AsyncHTTPClient import tornado.gen import tornado.ioloop @tornado.gen.coroutine def create_topic(): client = AsyncHTTPClient() resp = yield client.fetch( request=('http://127.0.0.1:4151' # host '/topic/create' # uri '?topic=python_AsyncHTTPClient_topic' # parameters ), body='using python to learn nsq', # body method="POST" ) print(resp) raise tornado.gen.Return(resp) if __name__ == '__main__': io_loop = tornado.ioloop.IOLoop.instance() io_loop.run_sync(create_topic)
总结
完成主体的创建之后,可以通过访问nsqadmin页面(http://127.0.0.1:4171)来查看主题是否已经创建成功,如果创建没问题的话,在主页(Stream模块)就可以看到topic的名称列表。
通过点击列表中的某个topic,将会进入到这个topic的详情页面。
频道(channel)
正如上面所说,通过推送一条有效消息,NSQ就会自动创建topic,但是打开了nsqadmin界面后会发现两个问题,第一个问题是message为0;第二个问题是页面有警告提示(当前topic中还没有创建channel,只有当channel存在时推送过来的消息才会被加入到队列中等待消费)。
curl创建channel
curl -X POST 'http://127.0.0.1:4151/channel/create?topic=curl_topic&channel=curl_channel'
python创建channel
requests
import requests
if __name__ == '__main__':
resp = requests.post(
('http://127.0.0.1:4151'
'/channel/create'
'?topic=python_topic&channel=python_channel'),
)
print(resp, resp.content)
AsyncHTTPClient
from tornado.httpclient import AsyncHTTPClient
import tornado.gen
import tornado.ioloop
@tornado.gen.coroutine
def create_topic():
client = AsyncHTTPClient()
resp = yield client.fetch(
request=('http://127.0.0.1:4151'
'/channel/create'
'?topic=python_AsyncHTTPClient_topic'
'&channel=python_channel'
),
body='',
method="POST"
)
print(resp)
raise tornado.gen.Return(resp)
if __name__ == '__main__':
io_loop = tornado.ioloop.IOLoop.instance()
io_loop.run_sync(create_topic)
单点故障(SPOF全称: Single Point Of Failure)
NSQ并没有采用复制的特性,而是利用nsqlookup
来充当指令传输的枢纽,基于多个nsqd
节点 + topic + channel + 持久化来解决单点故障问题。
附录
更简短的消息推送代码
-
import nsq import tornado.ioloop import time def pub_message(): writer.pub('test', time.strftime('%H:%M:%S'), finish_pub) def finish_pub(conn, data): print(data) writer = nsq.Writer(['127.0.0.1:4150']) tornado.ioloop.PeriodicCallback(pub_message, 1000).start() nsq.run()
-
import nsq import tornado writer = nsq.Writer(['127.0.0.1:4150']) @tornado.gen.coroutine def do_pub(): yield tornado.gen.sleep(1) writer.pub("test_topic", "hello world") yield tornado.gen.sleep(1) tornado.ioloop.IOLoop.instance().run_sync(do_pub) tornado.ioloop.IOLoop.instance().run_sync(do_pub)