依赖包安装
yuminstall ncurses-devel unixODBC unixODBC-devel
erlang环境
wget
http://erlang.org/download/otp_src_18.1.tar.gz
tar -zxvf otp_src_18.
1.tar.gz
cd otp_src_18.
1
./configure --prefix=/usr/local/erlang
make
make install
# 配置erlang环境变量
vim /etc/profile
# 增加内容:
export PATH=
"
$PATH:/usr/local/erlang/bin"
# 保存退出,并刷新变量
source /etc/profile
# 测试erlang是否安装成功
# 安装完成以后,执行erl看是否能打开eshell,用’halt().’退出,注意后面的点号,那是erlang的结束符。
[root
@localhost src]
# erl
Erlang/OTP
17 [erts-
6.
1] [source] [
64-bit] [async-threads:
10] [hipe] [kernel-
poll:
false]
Eshell V6.
1 (abort with
^G)
2>
9+
3.
12
3> halt().
安装rabbitmq依赖文件,安装rabbitmq
# 安装rabbitmq依赖包
yum
install xmlto
# 安装rabbitmq服务端
wgethttp://www.rabbitmq.com/releases/rabbitmq-
server/v3
.5
.7/rabbitmq-
server-
3.5
.7.tar.gz
tar zxvf rabbitmq-
server-
3.5
.7.tar.gz
cd rabbitmq-
server-
3.5
.7/
make
make
install TARGET_DIR=/usr/
local/rabbitmq SBIN_DIR=/usr/
local/rabbitmq/sbin MAN_DIR=/usr/
local/rabbitmq/man DOC_INSTALL_DIR=/usr/
local/rabbitmq/doc
# 配置
hosts
vim /etc/
hosts
# 增加一行内容
# 当前IP地址 绑定HOSTNAME名称(vim /etc/sysconfig/network)
192.168
.2
.208 localhost.localdomain
# 这种会提示错误(Warning: PID file
not written; -detached was passed.)
/usr/local/rabbitmq/sbin/rabbitmq-server -detached 启动rabbitmq
/usr/local/rabbitmq/sbin/rabbitmqctl status 查看状态
/usr/local/rabbitmq/sbin/rabbitmqctl
stop 关闭rabbitmq
# 目前我自己使用
/usr/
local/rabbitmq/sbin/rabbitmq-
server
start & 启动rabbitmq
/usr/
local/rabbitmq/sbin/rabbitmqctl
status 查看状态
/usr/local/rabbitmq/sbin/rabbitmqctlstop 关闭rabbitmq
启用管理插件
mkdir /etc/rabbitmq
/usr/
local/rabbitmq/sbin/rabbitmq-plugins list 查看插件列表
/usr/
local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management (启用插件)
/usr/
local/rabbitmq/sbin/rabbitmq-plugins disable rabbitmq_management (禁用插件)
# 重启rabbitmq
# 如果有iptables
vim /etc/sysconfig/iptables
# 增加一下内容
-A INPUT -
m
state --
state NEW -
m tcp -p tcp --dport
15672 -j ACCEPT
# 重启动iptable
service iptables restart
开机自启动配置
#!/bin/sh
#start rabbitMq
sudo /usr/local/rabbitmq/sbin/rabbitmq-server & > /usr/local/rabbitmq/logs/rabbitmq.log2>&1
RabbitMQ PHP扩展安装
# 安装rabbitmq-c依赖包
yum install libtool autoconf
# 安装rabbitmq-c ( 最好下载 0.5的,0.6安装可能会报错)
# 版本下载:https://github.com/alanxz/rabbitmq-c/releases/tag/v0.5.0
wgethttps://github.com/alanxz/rabbitmq-c/releases/download/v0.
5.0/rabbitmq-c-
0.5.
0.tar.gz
tar -zxvf v0.
5.0
cd rabbitmq-c-
0.5.
0/
autoreconf -i
./configure --prefix=/usr/
local/rabbitmq-c
make
make install
# 安装PHP扩展 amqp
wgethttp://pecl.php.net/get/amqp-
1.6.
1.tgz
tar zxvf amqp-
1.6.
1.tgz
cd amqp-
1.6.
1
/usr/
local/php/bin/phpize
./configure --with-php-config=/usr/
local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/
local/rabbitmq-c
make
make install
# 编辑php.ini文件,增加amqp扩展支持
vim /usr/
local/php/etc/php.ini
# 增加下面内容
; rabbitmq扩展支持
extension=amqp.so
# 重启php-fpm
/etc/init.d/php-fpm restart
验证是否成功 phpinfo()查看下是否支持amqp扩展
相关配置
hostname mq
// 设置hostname名称
vim /etc/sysconfig/network
// 设置hostname
vim /etc/hosts
// 编辑hosts
./rabbitmqctl add_user admin admin
// 添加用户
./rabbitmqctl set_user_tags admin administrator
// 添加admin 到 administrator分组
./rabbitmqctl set_permissions -p / admin"*.""*.""*."// 添加权限
创建配置文件
#在/usr/rabbitmq/sbin/rabbitmq-defaults 查看config文件路径
# 创建配置文件
touch/usr/rabbitmq/sbin
#vm_memory_high_watermark 内存低水位线,若低于该水位线,则开启流控机制,阻止所有请求,默认值是0.4,即内存总量的40%,
#vm_memory_high_watermark_paging_ratio 内存低水位线的多少百分比开始通过写入磁盘文件来释放内存
vi /usr/rabbitmq/sbin/rabbitmq.config 输入
[
{rabbit, [{vm_memory_high_watermark_paging_ratio,
0.75},
{vm_memory_high_watermark,
0.7}]}
].
创建环境文件
touch /etc/rabbitmq/rabbitmq-env.conf
#输入
RABBITMQ_NODENAME=FZTEC-
240088 节点名称
RABBITMQ_NODE_IP_ADDRESS=
127.0.0.1 监听IP
RABBITMQ_NODE_PORT=
5672 监听端口
RABBITMQ_LOG_BASE=/data/rabbitmq/log 日志目录
RABBITMQ_PLUGINS_DIR=/data/rabbitmq/plugins 插件目录
RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia 后端存储目录
操作命令
查看exchange信息
/usr/rabbitmq/sbin/rabbitmqctl list_exchanges name type durable auto_delete
arguments
查看队列信息
/usr/rabbitmq/sbin/rabbitmqctl list_queues name durable auto_delete messages consumers me
查看绑定信息
/usr/rabbitmq/sbin/rabbitmqctl list_bindings
查看连接信息
/usr/rabbitmq/sbin/rabbitmqctl list_connections
php的server端脚本
<?php
$routingkey=
'key';
//设置你的连接
$conn_args = array(
'host' =>
'localhost',
'port' =>
'5672',
'login' =>
'guest',
'password' =>
'guest');
$conn = new AMQPConnection(
$conn_args);
if (
$conn->connect()) {
echo
"Established a connection to the broker \n";
}
else {
echo
"Cannot connect to the broker \n ";
}
//你的消息
$message = json_encode(array(
'Hello World3!',
'php3',
'c++3:'));
//创建channel
$channel = new AMQPChannel(
$conn);
//创建exchange
$ex = new AMQPExchange(
$channel);
$ex->
setName(
'exchange');//创建名字
$ex->
setType(AMQP_EX_TYPE_DIRECT);
$ex->
setFlags(AMQP_DURABLE);
//
$ex->
setFlags(AMQP_AUTODELETE);
//
echo
"exchange status:".
$ex->
declare();
echo
"exchange status:".
$ex->
declareExchange();
echo
"\n";
for(
$i=
0;
$i<
100;
$i++){
if(
$routingkey==
'key2'){
$routingkey=
'key';
}
else{
$routingkey=
'key2';
}
$ex->publish(
$message,
$routingkey);
}
/*
$ex->publish(
$message,
$routingkey);
创建队列
$q = new AMQPQueue(
$channel);
设置队列名字 如果不存在则添加
$q->
setName(
'queue');
$q->
setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
echo
"queue status: ".
$q->
declare();
echo
"\n";
echo
'queue bind: '.
$q->
bind(
'exchange',
'route.key');
将你的队列绑定到routingKey
echo
"\n";
$channel->startTransaction();
echo
"send: ".
$ex->publish(
$message,
'route.key'); //将你的消息通过制定routingKey发送
$channel->commitTransaction();
$conn->disconnect();
*/
php客户端脚本
<?php
$bindingkey=
'key2';
//连接RabbitMQ
$conn_args =
array(
'host'=>
'127.0.0.1' ,
'port'=>
'5672',
'login'=>
'guest' ,
'password'=>
'guest',
'vhost' =>
'/');
$conn =
new AMQPConnection(
$conn_args);
$conn->connect();
//设置queue名称,使用exchange,绑定routingkey
$channel =
new AMQPChannel(
$conn);
$q =
new AMQPQueue(
$channel);
$q->setName(
'queue2');
$q->setFlags(AMQP_DURABLE);
$q->declare();
$q->bind(
'exchange',
$bindingkey);
//消息获取
$messages =
$q->get(AMQP_AUTOACK) ;
if (
$messages){
var_dump(json_decode(
$messages->getBody(),
true ));
}
$conn->disconnect();
?>
翻译了部分mq常量设置,不正确的地方,大家以试验为准
/**
* Passing in this constant as a flag will forcefully disable all other flags.
* Use this if you want to temporarily disable the amqp.auto_ack ini setting.
* 传递这个参数作为标志将完全禁用其他标志,如果你想临时禁用amqp.auto_ack设置起效
*/
define(
'AMQP_NOPARAM',
0);
/**
* Durable exchanges and queues will survive a broker restart, complete with all of their data.
* 持久化交换机和队列,当代理重启动后依然存在,并包括它们中的完整数据
*/
define(
'AMQP_DURABLE',
2);
/**
* Passive exchanges and queues will not be redeclared, but the broker will throw an error if the exchange or queue does not exist.
* 被动模式的交换机和队列不能被重新定义,但是如果交换机和队列不存在,代理将扔出一个错误提示
*/
define(
'AMQP_PASSIVE',
4);
/**
* Valid for queues only, this flag indicates that only one client can be listening to and consuming from this queue.
* 仅对队列有效,这个人标志定义队列仅允许一个客户端连接并且从其消费消息
*/
define(
'AMQP_EXCLUSIVE',
8);
/**
* For exchanges, the auto delete flag indicates that the exchange will be deleted as soon as no more queues are bound
* to it. If no queues were ever bound the exchange, the exchange will never be deleted. For queues, the auto delete
* flag indicates that the queue will be deleted as soon as there are no more listeners subscribed to it. If no
* subscription has ever been active, the queue will never be deleted. Note: Exclusive queues will always be
* automatically deleted with the client disconnects.
* 对交换机而言,自动删除标志表示交换机将在没有队列绑定的情况下被自动删除,如果从没有队列和其绑定过,这个交换机将不会被删除.
* 对队列而言,自动删除标志表示如果没有消费者和你绑定的话将被自动删除,如果从没有消费者和其绑定,将不被删除,独占队列在客户断
* 开连接的时候将总是会被删除
*/
define(
'AMQP_AUTODELETE',
16);
/**
* Clients are not allowed to make specific queue bindings to exchanges defined with this flag.
* 这个标志标识不允许自定义队列绑定到交换机上
*/
define(
'AMQP_INTERNAL',
32);
/**
* When passed to the consume method for a clustered environment, do not consume from the local node.
* 在集群环境消费方法中传递这个参数,表示将不会从本地站点消费消息
*/
define(
'AMQP_NOLOCAL',
64);
/**
* When passed to the {@link AMQPQueue::get()} and {@link AMQPQueue::get()} methods as a flag,
* the messages will be immediately marked as acknowledged by the server upon delivery.
* 当在队列get方法中作为标志传递这个参数的时候,消息将在被服务器输出之前标志为acknowledged (已收到)
*/
define(
'AMQP_AUTOACK',
128);
/**
* Passed on queue creation, this flag indicates that the queue should be deleted if it becomes empty.
* 在队列建立时候传递这个参数,这个标志表示队列将在为空的时候被删除
*/
define(
'AMQP_IFEMPTY',
256);
/**
* Passed on queue or exchange creation, this flag indicates that the queue or exchange should be
* deleted when no clients are connected to the given queue or exchange.
* 在交换机或者队列建立的时候传递这个参数,这个标志表示没有客户端连接的时候,交换机或者队列将被删除
*/
define(
'AMQP_IFUNUSED',
512);
/**
* When publishing a message, the message must be routed to a valid queue. If it is not, an error will be returned.
* 当发布消息的时候,消息必须被正确路由到一个有效的队列,否则将返回一个错误
*/
define(
'AMQP_MANDATORY',
1024);
/**
* When publishing a message, mark this message for immediate processing by the broker. (High priority message.)
* 当发布消息时候,这个消息将被立即处理.
*/
define(
'AMQP_IMMEDIATE',
2048);
/**
* If set during a call to {@link AMQPQueue::ack()}, the delivery tag is treated as "up to and including", so that multiple
* messages can be acknowledged with a single method. If set to zero, the delivery tag refers to a single message.
* If the AMQP_MULTIPLE flag is set, and the delivery tag is zero, this indicates acknowledgement of all outstanding
* messages.
* 当在调用AMQPQueue::ack时候设置这个标志,传递标签将被视为最大包含数量,以便通过单个方法标示多个消息为已收到,如果设置为0
* 传递标签指向单个消息,如果设置了AMQP_MULTIPLE,并且传递标签是0,将所有未完成消息标示为已收到
*/
define(
'AMQP_MULTIPLE',
4096);
/**
* If set during a call to {@link AMQPExchange::bind()}, the server will not respond to the method.The client should not wait
* for a reply method. If the server could not complete the method it will raise a channel or connection exception.
* 当在调用AMQPExchange::bind()方法的时候,服务器将不响应请求,客户端将不应该等待响应,如果服务器无法完成该方法,将会抛出一个异常
*/
define(
'AMQP_NOWAIT',
8192);
/**
* If set during a call to {@link AMQPQueue::nack()}, the message will be placed back to the queue.
* 如果在调用AMQPQueue::nack方法时候设置,消息将会被传递回队列
*/
define(
'AMQP_REQUEUE',
16384);
/**
* A direct exchange type.
* direct类型交换机
*/
define(
'AMQP_EX_TYPE_DIRECT',
'direct');
/**
* A fanout exchange type.
* fanout类型交换机
*/
define(
'AMQP_EX_TYPE_FANOUT',
'fanout');
/**
* A topic exchange type.
* topic类型交换机
*/
define(
'AMQP_EX_TYPE_TOPIC',
'topic');
/**
* A header exchange type.
* header类型交换机
*/
define(
'AMQP_EX_TYPE_HEADERS',
'headers');
/**
* socket连接超时设置
*/
define('AMQP_OS_SOCKET_TIMEOUT_ERRNO',536870947);
php实例:
概要
我们先大致了解一下rabbitmq,简单的说就是一个生产者-消费者模式的消息队列,支持消息持久化。同时需要了解几个名词,以及这几个名词之间的联系
生产者(producer)
信道(channel)
消息交换机(exchange)
消息队列(queue)
消费者(consumer)
路由关键词
工作流程
生产者产生的消息通过信道投递到某个消息交换机上,投递过程中指定了一个路由关键字,消息交换机将这条消息投递到不同的消息队列中的时候,依据路由关键字,该消息可能会被投递到某一个或者某几个符合路由规则的消息队列中,消费者从消息队列中取出消息进行后一步处理。
分发机制
当多个消费者同时在消费同一个消息队列的时候,rabbitmq会顺序分发队列中message,当每个message收到ack,就会将这条消息从消息队列中删除,这种分发的机制叫做round-robin
这里不过多讨论rabbitmq的消息分发机制,有兴趣可以参考这个衔接
Demo
$conn_args=array(
'host'=>'127.0.0.1',//rabbitmq 服务器host
'port'=>5672,//rabbitmq 服务器端口
'login'=>'guest',//登录用户
'password'=>'guest',//登录密码
'vhost'=>'/'//虚拟主机
);
$e_name='e_demo';
$q_name='q_demo';
$k_route='key_1';
$conn=newAMQPConnection($conn_args);
if(!$conn->connect()){
die('Cannot connect to the broker');
}
$channel=newAMQPChannel($conn);
$ex=newAMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$status=$ex->declareExchange();//声明一个新交换机,如果这个交换机已经存在了,就不需要再调用declareExchange()方法了.$q=newAMQPQueue($channel);
$q->setName($q_name);
$status=$q->declareQueue();//同理如果该队列已经存在不用再调用这个方法了。$ex->publish($msg,$k_route);
以上代码就构造了一个生产者,并投递了一条消息到rabbitmq中。
$conn_args=array(
'host'=>'127.0.0.1',
'port'=>5672,
'login'=>'guest',
'password'=>'guest',
'vhost'=>'/'
);
$e_name='e_demo';
$q_name='q_demo';
$k_route='key_1';
$conn=newAMQPConnection($conn_args);
if(!$conn->connect()){
die('Cannot connect to the broker');
}
$channel=newAMQPChannel($conn);
$ex=newAMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$q=newAMQPQueue($channel);
var_dump($q);
$q->setName($q_name);
$q->bind($e_name,$k_route);
$arr=$q->get();
var_dump($arr);
$res=$q->ack($arr->getDeliveryTag());
$msg=$arr->getBody();
var_dump($msg);
以上代码构建了一个消费者,并从消息队列中拿出一条消息,并把该消息从队列中移除。
补充
补充说明一下rabbitmq的使用命令
rabbitmq-server start是启动rabbitmq服务。
主要的管理rabbitmq使用的是rabbitctl命令
rabbitmqctl start_app 启动rabbitmq
rabbitmqctl stop_app 关闭rabbitmq
rabbitmqctl reset 重置rabbitmq队列
rabbitmqctl list_queues 查看rabbitmq中队列
rabbitmqctl list_exchanges 查看rabbitmq中的交换机
RabbitMQ常用命令:
查看所有队列信息
# rabbitmqctl list_queues
关闭应用
# rabbitmqctl stop_app
启动应用,和上述关闭命令配合使用,达到清空队列的目的
# rabbitmqctl start_app
清除所有队列
# rabbitmqctl reset
更多用法及参数,可以执行如下命令查看
# rabbitmqctl
(1)首先关闭rabbitmq: rabbitmqctl stop_app
(2)还原: rabbitmqctl reset
(3)启动: rabbitmqctl start_app
(4)添加用户: rabbitmqctl add_user root root
(5)设置权限:rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
(6)查看用户: rabbitmqctl list_users