1. 安装依赖库
composer require php-amqplib/php-amqplib
地址:https://github.com/php-amqplib/php-amqplib
2. MQ 生产者.php
include(__DIR__ . '../../public/config.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Producer side of the message queue: publishes a message to an AMQP broker
 * through php-amqplib.
 *
 * Created by PhpStorm.
 * User: pandeng
 * Date: 2017-07-26
 * Time: 21:51
 */
class MessageQueue
{
    /** Name of the direct exchange that messages are published to. */
    const exchange = 'router';

    /** Name of the queue that is bound to the exchange. */
    const queue = 'msgs';

    /**
     * Publish one persistent text message to the exchange.
     *
     * The queue and exchange are (re)declared and bound on every call, so the
     * producer works even when the consumer has not created them yet.
     * HOST, PORT, USER, PASS and VHOST are global constants; presumably they
     * come from the included config.php.
     *
     * @param string $data Message body. The consumer in these notes decodes it
     *                     as JSON and looks for an "id" field.
     *
     * @return string Always "ok" when publishing did not throw.
     */
    public static function pushMessage($data)
    {
        $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
        $channel = null;

        try {
            $channel = $connection->channel();

            // Same declaration arguments as the consumer uses; the broker
            // rejects a redeclaration with different settings.
            $channel->queue_declare(self::queue, false, true, false, false);
            $channel->exchange_declare(self::exchange, 'direct', false, true, false);
            $channel->queue_bind(self::queue, self::exchange);

            $message = new AMQPMessage(
                $data,
                array(
                    'content_type' => 'text/plain',
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                )
            );
            $channel->basic_publish($message, self::exchange);
        } finally {
            // Release the channel and the socket even when declaring or
            // publishing failed, so a failed push does not leak a connection.
            if ($channel !== null) {
                $channel->close();
            }
            $connection->close();
        }

        return "ok";
    }
}
3. 消费者.php
namespace app\index\controller;
include(__DIR__ . '../../../../public/config.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\Controller;
use think\Log;
use think\Request;
use think\Db;
/**
 * Consumer side of the message queue: a controller whose start() action
 * blocks and handles messages from the queue until the consumer is cancelled.
 *
 * write_log() is a helper defined outside this file; its second argument is
 * passed through unchanged (presumably a log level - confirm against the helper).
 */
class MessageConsume extends Controller
{
    /** Name of the direct exchange the queue is bound to. */
    const exchange = 'router';

    /** Name of the queue that is consumed. */
    const queue = 'msgs';

    /** Consumer tag registered with the broker for this consumer. */
    const consumerTag = 'consumer';

    /**
     * Shutdown hook: close the channel and the connection when the script ends.
     *
     * Must stay public because it is invoked through register_shutdown_function().
     *
     * @param object $channel    Channel opened in start().
     * @param object $connection Connection opened in start().
     */
    public function shutdown($channel, $connection)
    {
        $channel->close();
        $connection->close();
        write_log("closed", 3);
    }

    /**
     * Callback invoked for every delivered message.
     *
     * A body equal to the string "quit" cancels the consumer; any other body
     * is expected to be JSON containing an "id" field. Every message is
     * acknowledged, including malformed ones, so bad data is not redelivered.
     *
     * Must stay public because the AMQP channel calls it as a callback.
     *
     * @param object $message Delivered message; the code reads its body and
     *                        its delivery_info (channel, delivery_tag, consumer_tag).
     */
    public function process_message($message)
    {
        $body = $message->body;
        $deliveryInfo = $message->delivery_info;
        $channel = $deliveryInfo['channel'];
        $isQuit = ($body === 'quit');

        if (!$isQuit) {
            $obj = json_decode($body);
            if (!isset($obj->id)) {
                // Double quotes so that \n is emitted as a real line break.
                echo "error data\n";
                write_log("error data:" . $body, 2);
            } else {
                try {
                    // Log the payload itself. Encoding the whole message object
                    // would also try to serialise the channel stored in its
                    // delivery_info instead of just the data.
                    write_log("data:" . $body);
                } catch (\think\Exception $e) {
                    write_log($e->getMessage(), 2);
                    write_log($body, 2);
                } catch (\PDOException $pe) {
                    write_log($pe->getMessage(), 2);
                    write_log($body, 2);
                }
            }
        }

        $channel->basic_ack($deliveryInfo['delivery_tag']);

        // Send a message with the string "quit" to cancel the consumer.
        if ($isQuit) {
            $channel->basic_cancel($deliveryInfo['consumer_tag']);
        }
    }

    /**
     * Start consuming: declare and bind the queue, register the callback and
     * block until the consumer has been cancelled.
     *
     * @return void
     */
    public function start()
    {
        $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
        $channel = $connection->channel();

        // Same declaration arguments as the producer uses.
        $channel->queue_declare(self::queue, false, true, false, false);
        $channel->exchange_declare(self::exchange, 'direct', false, true, false);
        $channel->queue_bind(self::queue, self::exchange);

        $channel->basic_consume(
            self::queue,
            self::consumerTag,
            false,
            false,
            false,
            false,
            array($this, 'process_message')
        );
        register_shutdown_function(array($this, 'shutdown'), $channel, $connection);

        // Log before entering the loop: the loop blocks for the lifetime of
        // the consumer, so logging afterwards would only happen at exit.
        write_log("starting", 3);

        // Runs until basic_cancel() removes the last registered callback.
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }
}
4. 启动消费者(守护进程)
nohup php index.php index/Message_Consume/start &