RabbitMQ

依赖包安装

yuminstall ncurses-devel unixODBC unixODBC-devel

erlang环境

wget

http://erlang.org/download/otp_src_18.1.tar.gz

tar -zxvf otp_src_18.

1.tar.gz

cd otp_src_18.

1

./configure --prefix=/usr/local/erlang

make

make install

# 配置erlang环境变量

vim /etc/profile

# 增加内容:

export PATH=

"

$PATH:/usr/local/erlang/bin"

# 保存退出,并刷新变量

source /etc/profile

# 测试erlang是否安装成功

# 安装完成以后,执行erl看是否能打开eshell,用’halt().’退出,注意后面的点号,那是erlang的结束符。

[root

@localhost src]

# erl

Erlang/OTP

17 [erts-

6.

1] [source] [

64-bit] [async-threads:

10] [hipe] [kernel-

poll:

false]

Eshell V6.

1 (abort with

^G)

2>

9+

3.

12

3> halt().

安装rabbitmq依赖文件,安装rabbitmq

# 安装rabbitmq依赖包

yum

install xmlto

# 安装rabbitmq服务端

wgethttp://www.rabbitmq.com/releases/rabbitmq-

server/v3

.5

.7/rabbitmq-

server-

3.5

.7.tar.gz

tar zxvf rabbitmq-

server-

3.5

.7.tar.gz

cd rabbitmq-

server-

3.5

.7/

make

make

install TARGET_DIR=/usr/

local/rabbitmq SBIN_DIR=/usr/

local/rabbitmq/sbin MAN_DIR=/usr/

local/rabbitmq/man DOC_INSTALL_DIR=/usr/

local/rabbitmq/doc

# 配置

hosts

vim /etc/

hosts

# 增加一行内容

# 当前IP地址 绑定HOSTNAME名称(vim /etc/sysconfig/network)

192.168

.2

.208 localhost.localdomain

# 这种会提示错误(Warning: PID file

not written; -detached was passed.)

/usr/local/rabbitmq/sbin/rabbitmq-server -detached 启动rabbitmq

/usr/local/rabbitmq/sbin/rabbitmqctl status 查看状态

/usr/local/rabbitmq/sbin/rabbitmqctl

stop 关闭rabbitmq

# 目前我自己使用

/usr/

local/rabbitmq/sbin/rabbitmq-

server

start & 启动rabbitmq

/usr/

local/rabbitmq/sbin/rabbitmqctl

status 查看状态

/usr/local/rabbitmq/sbin/rabbitmqctlstop 关闭rabbitmq

启用管理插件

mkdir /etc/rabbitmq

/usr/

local/rabbitmq/sbin/rabbitmq-plugins list 查看插件列表

/usr/

local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management (启用插件)

/usr/

local/rabbitmq/sbin/rabbitmq-plugins disable rabbitmq_management (禁用插件)

# 重启rabbitmq

# 访问http://127.0.0.1:15672/

# 如果有iptables

vim /etc/sysconfig/iptables

# 增加一下内容

-A INPUT -

m

state --

state NEW -

m tcp -p tcp --dport

15672 -j ACCEPT

# 重启动iptable

service iptables restart

开机自启动配置

#!/bin/sh

#start rabbitMq

sudo /usr/local/rabbitmq/sbin/rabbitmq-server & > /usr/local/rabbitmq/logs/rabbitmq.log2>&1

RabbitMQ PHP扩展安装

# 安装rabbitmq-c依赖包

yum install libtool autoconf

# 安装rabbitmq-c ( 最好下载 0.5的,0.6安装可能会报错)

# 版本下载:https://github.com/alanxz/rabbitmq-c/releases/tag/v0.5.0

wgethttps://github.com/alanxz/rabbitmq-c/releases/download/v0.

5.0/rabbitmq-c-

0.5.

0.tar.gz

tar -zxvf v0.

5.0

cd rabbitmq-c-

0.5.

0/

autoreconf -i

./configure --prefix=/usr/

local/rabbitmq-c

make

make install

# 安装PHP扩展 amqp

wgethttp://pecl.php.net/get/amqp-

1.6.

1.tgz

tar zxvf amqp-

1.6.

1.tgz

cd amqp-

1.6.

1

/usr/

local/php/bin/phpize

./configure --with-php-config=/usr/

local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/

local/rabbitmq-c

make

make install

# 编辑php.ini文件,增加amqp扩展支持

vim /usr/

local/php/etc/php.ini

# 增加下面内容

; rabbitmq扩展支持

extension=amqp.so

# 重启php-fpm

/etc/init.d/php-fpm restart

验证是否成功 phpinfo()查看下是否支持amqp扩展

相关配置

hostname mq

// 设置hostname名称

vim /etc/sysconfig/network

// 设置hostname

vim /etc/hosts

// 编辑hosts

./rabbitmqctl add_user admin admin

// 添加用户

./rabbitmqctl set_user_tags admin administrator

// 添加admin 到 administrator分组

./rabbitmqctl set_permissions -p / admin"*.""*.""*."// 添加权限

创建配置文件

#在/usr/rabbitmq/sbin/rabbitmq-defaults 查看config文件路径

# 创建配置文件

touch/usr/rabbitmq/sbin

#vm_memory_high_watermark 内存低水位线,若低于该水位线,则开启流控机制,阻止所有请求,默认值是0.4,即内存总量的40%,

#vm_memory_high_watermark_paging_ratio 内存低水位线的多少百分比开始通过写入磁盘文件来释放内存

vi /usr/rabbitmq/sbin/rabbitmq.config 输入

[

{rabbit, [{vm_memory_high_watermark_paging_ratio,

0.75},

{vm_memory_high_watermark,

0.7}]}

].

创建环境文件

touch /etc/rabbitmq/rabbitmq-env.conf

#输入

RABBITMQ_NODENAME=FZTEC-

240088 节点名称

RABBITMQ_NODE_IP_ADDRESS=

127.0.0.1 监听IP

RABBITMQ_NODE_PORT=

5672 监听端口

RABBITMQ_LOG_BASE=/data/rabbitmq/log 日志目录

RABBITMQ_PLUGINS_DIR=/data/rabbitmq/plugins 插件目录

RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia 后端存储目录

操作命令

查看exchange信息

/usr/rabbitmq/sbin/rabbitmqctl list_exchanges name type durable auto_delete

arguments

查看队列信息

/usr/rabbitmq/sbin/rabbitmqctl list_queues name durable auto_delete messages consumers me

查看绑定信息

/usr/rabbitmq/sbin/rabbitmqctl list_bindings

查看连接信息

/usr/rabbitmq/sbin/rabbitmqctl list_connections

php的server端脚本

<?php

$routingkey=

'key';

//设置你的连接

$conn_args = array(

'host' =>

'localhost',

'port' =>

'5672',

'login' =>

'guest',

'password' =>

'guest');

$conn = new AMQPConnection(

$conn_args);

if (

$conn->connect()) {

echo

"Established a connection to the broker \n";

}

else {

echo

"Cannot connect to the broker \n ";

}

//你的消息

$message = json_encode(array(

'Hello World3!',

'php3',

'c++3:'));

//创建channel

$channel = new AMQPChannel(

$conn);

//创建exchange

$ex = new AMQPExchange(

$channel);

$ex->

setName(

'exchange');//创建名字

$ex->

setType(AMQP_EX_TYPE_DIRECT);

$ex->

setFlags(AMQP_DURABLE);

//

$ex->

setFlags(AMQP_AUTODELETE);

//

echo

"exchange status:".

$ex->

declare();

echo

"exchange status:".

$ex->

declareExchange();

echo

"\n";

for(

$i=

0;

$i<

100;

$i++){

if(

$routingkey==

'key2'){

$routingkey=

'key';

}

else{

$routingkey=

'key2';

}

$ex->publish(

$message,

$routingkey);

}

/*

$ex->publish(

$message,

$routingkey);

创建队列

$q = new AMQPQueue(

$channel);

设置队列名字 如果不存在则添加

$q->

setName(

'queue');

$q->

setFlags(AMQP_DURABLE | AMQP_AUTODELETE);

echo

"queue status: ".

$q->

declare();

echo

"\n";

echo

'queue bind: '.

$q->

bind(

'exchange',

'route.key');

将你的队列绑定到routingKey

echo

"\n";

$channel->startTransaction();

echo

"send: ".

$ex->publish(

$message,

'route.key'); //将你的消息通过制定routingKey发送

$channel->commitTransaction();

$conn->disconnect();

*/

php客户端脚本

<?php

$bindingkey=

'key2';

//连接RabbitMQ

$conn_args =

array(

'host'=>

'127.0.0.1' ,

'port'=>

'5672',

'login'=>

'guest' ,

'password'=>

'guest',

'vhost' =>

'/');

$conn =

new AMQPConnection(

$conn_args);

$conn->connect();

//设置queue名称,使用exchange,绑定routingkey

$channel =

new AMQPChannel(

$conn);

$q =

new AMQPQueue(

$channel);

$q->setName(

'queue2');

$q->setFlags(AMQP_DURABLE);

$q->declare();

$q->bind(

'exchange',

$bindingkey);

//消息获取

$messages =

$q->get(AMQP_AUTOACK) ;

if (

$messages){

var_dump(json_decode(

$messages->getBody(),

true ));

}

$conn->disconnect();

?>

翻译了部分mq常量设置,不正确的地方,大家以试验为准

/**

* Passing in this constant as a flag will forcefully disable all other flags.

* Use this if you want to temporarily disable the amqp.auto_ack ini setting.

* 传递这个参数作为标志将完全禁用其他标志,如果你想临时禁用amqp.auto_ack设置起效

*/

define(

'AMQP_NOPARAM',

0);

/**

* Durable exchanges and queues will survive a broker restart, complete with all of their data.

* 持久化交换机和队列,当代理重启动后依然存在,并包括它们中的完整数据

*/

define(

'AMQP_DURABLE',

2);

/**

* Passive exchanges and queues will not be redeclared, but the broker will throw an error if the exchange or queue does not exist.

* 被动模式的交换机和队列不能被重新定义,但是如果交换机和队列不存在,代理将扔出一个错误提示

*/

define(

'AMQP_PASSIVE',

4);

/**

* Valid for queues only, this flag indicates that only one client can be listening to and consuming from this queue.

* 仅对队列有效,这个人标志定义队列仅允许一个客户端连接并且从其消费消息

*/

define(

'AMQP_EXCLUSIVE',

8);

/**

* For exchanges, the auto delete flag indicates that the exchange will be deleted as soon as no more queues are bound

* to it. If no queues were ever bound the exchange, the exchange will never be deleted. For queues, the auto delete

* flag indicates that the queue will be deleted as soon as there are no more listeners subscribed to it. If no

* subscription has ever been active, the queue will never be deleted. Note: Exclusive queues will always be

* automatically deleted with the client disconnects.

* 对交换机而言,自动删除标志表示交换机将在没有队列绑定的情况下被自动删除,如果从没有队列和其绑定过,这个交换机将不会被删除.

* 对队列而言,自动删除标志表示如果没有消费者和你绑定的话将被自动删除,如果从没有消费者和其绑定,将不被删除,独占队列在客户断

* 开连接的时候将总是会被删除

*/

define(

'AMQP_AUTODELETE',

16);

/**

* Clients are not allowed to make specific queue bindings to exchanges defined with this flag.

* 这个标志标识不允许自定义队列绑定到交换机上

*/

define(

'AMQP_INTERNAL',

32);

/**

* When passed to the consume method for a clustered environment, do not consume from the local node.

* 在集群环境消费方法中传递这个参数,表示将不会从本地站点消费消息

*/

define(

'AMQP_NOLOCAL',

64);

/**

* When passed to the {@link AMQPQueue::get()} and {@link AMQPQueue::get()} methods as a flag,

* the messages will be immediately marked as acknowledged by the server upon delivery.

* 当在队列get方法中作为标志传递这个参数的时候,消息将在被服务器输出之前标志为acknowledged (已收到)

*/

define(

'AMQP_AUTOACK',

128);

/**

* Passed on queue creation, this flag indicates that the queue should be deleted if it becomes empty.

* 在队列建立时候传递这个参数,这个标志表示队列将在为空的时候被删除

*/

define(

'AMQP_IFEMPTY',

256);

/**

* Passed on queue or exchange creation, this flag indicates that the queue or exchange should be

* deleted when no clients are connected to the given queue or exchange.

* 在交换机或者队列建立的时候传递这个参数,这个标志表示没有客户端连接的时候,交换机或者队列将被删除

*/

define(

'AMQP_IFUNUSED',

512);

/**

* When publishing a message, the message must be routed to a valid queue. If it is not, an error will be returned.

* 当发布消息的时候,消息必须被正确路由到一个有效的队列,否则将返回一个错误

*/

define(

'AMQP_MANDATORY',

1024);

/**

* When publishing a message, mark this message for immediate processing by the broker. (High priority message.)

* 当发布消息时候,这个消息将被立即处理.

*/

define(

'AMQP_IMMEDIATE',

2048);

/**

* If set during a call to {@link AMQPQueue::ack()}, the delivery tag is treated as "up to and including", so that multiple

* messages can be acknowledged with a single method. If set to zero, the delivery tag refers to a single message.

* If the AMQP_MULTIPLE flag is set, and the delivery tag is zero, this indicates acknowledgement of all outstanding

* messages.

* 当在调用AMQPQueue::ack时候设置这个标志,传递标签将被视为最大包含数量,以便通过单个方法标示多个消息为已收到,如果设置为0

* 传递标签指向单个消息,如果设置了AMQP_MULTIPLE,并且传递标签是0,将所有未完成消息标示为已收到

*/

define(

'AMQP_MULTIPLE',

4096);

/**

* If set during a call to {@link AMQPExchange::bind()}, the server will not respond to the method.The client should not wait

* for a reply method. If the server could not complete the method it will raise a channel or connection exception.

* 当在调用AMQPExchange::bind()方法的时候,服务器将不响应请求,客户端将不应该等待响应,如果服务器无法完成该方法,将会抛出一个异常

*/

define(

'AMQP_NOWAIT',

8192);

/**

* If set during a call to {@link AMQPQueue::nack()}, the message will be placed back to the queue.

* 如果在调用AMQPQueue::nack方法时候设置,消息将会被传递回队列

*/

define(

'AMQP_REQUEUE',

16384);

/**

* A direct exchange type.

* direct类型交换机

*/

define(

'AMQP_EX_TYPE_DIRECT',

'direct');

/**

* A fanout exchange type.

* fanout类型交换机

*/

define(

'AMQP_EX_TYPE_FANOUT',

'fanout');

/**

* A topic exchange type.

* topic类型交换机

*/

define(

'AMQP_EX_TYPE_TOPIC',

'topic');

/**

* A header exchange type.

* header类型交换机

*/

define(

'AMQP_EX_TYPE_HEADERS',

'headers');

/**

* socket连接超时设置

*/

define('AMQP_OS_SOCKET_TIMEOUT_ERRNO',536870947);

php实例:

概要

我们先大致了解一下rabbitmq,简单的说就是一个生产者-消费者模式的消息队列,支持消息持久化。同时需要了解几个名词,以及这几个名词之间的联系

生产者(producer)

信道(channel)

消息交换机(exchange)

消息队列(queue)

消费者(consumer)

路由关键词

工作流程

生产者产生的消息通过信道投递到某个消息交换机上,投递过程中指定了一个路由关键字,消息交换机将这条消息投递到不同的消息队列中的时候,依据路由关键字,该消息可能会被投递到某一个或者某几个符合路由规则的消息队列中,消费者从消息队列中取出消息进行后一步处理。

分发机制

当多个消费者同时在消费同一个消息队列的时候,rabbitmq会顺序分发队列中message,当每个message收到ack,就会将这条消息从消息队列中删除,这种分发的机制叫做round-robin 

这里不过多讨论rabbitmq的消息分发机制,有兴趣可以参考这个衔接 

RabbitMQ消息队列(三):任务分发机制

Demo

$conn_args=array(

'host'=>'127.0.0.1',//rabbitmq 服务器host

'port'=>5672,//rabbitmq 服务器端口

'login'=>'guest',//登录用户

'password'=>'guest',//登录密码

'vhost'=>'/'//虚拟主机

);

$e_name='e_demo';

$q_name='q_demo';

$k_route='key_1';

$conn=newAMQPConnection($conn_args);

if(!$conn->connect()){

die('Cannot connect to the broker');

}

$channel=newAMQPChannel($conn);

$ex=newAMQPExchange($channel);

$ex->setName($e_name);

$ex->setType(AMQP_EX_TYPE_DIRECT);

$ex->setFlags(AMQP_DURABLE);

$status=$ex->declareExchange();//声明一个新交换机,如果这个交换机已经存在了,就不需要再调用declareExchange()方法了.$q=newAMQPQueue($channel);

$q->setName($q_name);

$status=$q->declareQueue();//同理如果该队列已经存在不用再调用这个方法了。$ex->publish($msg,$k_route);

以上代码就构造了一个生产者,并投递了一条消息到rabbitmq中。

$conn_args=array(

'host'=>'127.0.0.1',

'port'=>5672,

'login'=>'guest',

'password'=>'guest',

'vhost'=>'/'

);

$e_name='e_demo';

$q_name='q_demo';

$k_route='key_1';

$conn=newAMQPConnection($conn_args);

if(!$conn->connect()){

die('Cannot connect to the broker');

}

$channel=newAMQPChannel($conn);

$ex=newAMQPExchange($channel);

$ex->setName($e_name);

$ex->setType(AMQP_EX_TYPE_DIRECT);

$ex->setFlags(AMQP_DURABLE);

$q=newAMQPQueue($channel);

var_dump($q);

$q->setName($q_name);

$q->bind($e_name,$k_route);

$arr=$q->get();

var_dump($arr);

$res=$q->ack($arr->getDeliveryTag());

$msg=$arr->getBody();

var_dump($msg);

以上代码构建了一个消费者,并从消息队列中拿出一条消息,并把该消息从队列中移除。

补充

补充说明一下rabbitmq的使用命令 

rabbitmq-server start是启动rabbitmq服务。 

主要的管理rabbitmq使用的是rabbitctl命令

rabbitmqctl start_app 启动rabbitmq

rabbitmqctl stop_app 关闭rabbitmq

rabbitmqctl reset 重置rabbitmq队列

rabbitmqctl list_queues 查看rabbitmq中队列

rabbitmqctl list_exchanges 查看rabbitmq中的交换机

RabbitMQ常用命令:

查看所有队列信息

# rabbitmqctl list_queues

关闭应用

# rabbitmqctl stop_app

启动应用,和上述关闭命令配合使用,达到清空队列的目的

# rabbitmqctl start_app

清除所有队列

# rabbitmqctl reset

更多用法及参数,可以执行如下命令查看

# rabbitmqctl

(1)首先关闭rabbitmq: rabbitmqctl stop_app

(2)还原: rabbitmqctl reset

(3)启动: rabbitmqctl start_app

(4)添加用户: rabbitmqctl add_user root root

(5)设置权限:rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

(6)查看用户: rabbitmqctl list_users

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343