本文场景:
服务器A:服务端,目的是要接收外部数据处理后放入消息队列。
服务器B:客户端,向服务器A提供数据。
前面有文章测试了基于HTTP的方案,但都是基于一台服务器自己测试。实际在外部服务器测试时单机请求性能很差,远远达不到处理能力,网路性能制约严重。
原基于HTTP的方案,每次推送都要重新建立连接,消耗过大。所以想到了Socket长连接方案。
扩展:HTTP使用TCP 三次握手建立连接,客户端和服务器需要交换3个包。HTTPS除了 TCP 的三个包,还要加上 SSL握手需要的9个包,一共是12个包。
服务端和客户端都是基于Workerman建立的
// Workerman-WebSocket服务端
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin');
$channel = $connection->channel();
$q_name = 'amqplib_test';
$channel->queue_declare($q_name, false, false, false, false);
use Workerman\Worker;
require_once __DIR__ . '/Workerman/Autoloader.php';
$ws = new Worker('websocket://0.0.0.0:8282');
$ws->onConnect = function($connection) use($channel,$q_name)
{
$connection->onWebSocketConnect = function($connection , $http_header)
{
// 可以在这里判断连接来源
};
$connection->onMessage = function($connection, $data) use($channel,$q_name)
{
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, '', $q_name);
var_dump($data);
// $connection->send('receive success');
};
};
Worker::runAll();
// Workerman-WebSocket客户端
<?php
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/Workerman/Autoloader.php';
$worker = new Worker();
$status = true;
// 进程启动时
$worker->onWorkerStart = function()
{
// 以websocket协议连接远程websocket服务器 ws://IP:端口
$ws_connection = new AsyncTcpConnection("ws://*.*.*.*:8282");
// 连接成功
$ws_connection->onConnect = function($connection)
{
// 调用发送处理
send_msg($connection);
};
// 远程websocket服务器发来消息时
$ws_connection->onMessage = function($connection, $data){
echo "recv: $data\n";
};
// 连接上发生错误时,一般是连接远程websocket服务器失败错误
$ws_connection->onError = function($connection, $code, $msg)
{
echo "error: $msg\n";
};
// 当连接远程websocket服务器的连接断开时
$ws_connection->onClose = function($connection){
echo "connection closed\n";
};
// 设置好以上各种回调后,执行连接操作
$ws_connection->connect();
};
// 定义发送相关操作
function send_msg($connection)
{
// 创建【可发送】状态变量
$connection->Available = true;
$connection->msg = 0;
$do_send = function () use($connection)
{
// 判断当前是否可发送
while($connection->Available)
{
echo $connection->msg.'\n';
$connection->send($connection->msg);
$connection->msg += 1;
}
};
// 发生连接发送缓冲区满事件时设置不可发送消息
$connection->onBufferFull = function($connection)
{
// 标记发送缓冲区满,暂停do_send发送
$connection->Available = false;
};
// 当发送缓冲区数据发送完毕时触发
$connection->onBufferDrain = function($connection)use($do_send)
{
$connection->Available = true;
$do_send();
};
// 执行发送
$do_send();
}
// 执行
Worker::runAll();
之前HTTP方案的测试受制于发送瓶颈太严重,单台请求能力才几百每秒,多服务器请求服务端速率正常倍增。