PHP Swoole之mysql数据库连接池的实现

最近一直在学习研究swoole,发现这个框架真是PHP的神作,swoole + php7 性能可以说是能与java媲美了,但是JAVA在处理高并发时还是比PHP更有优势,最近一直在想PHP是不是能像JAVA一样实现数据库连接池。

一般的fpm PHP应用程序都是使用的数据库短连接,每个php-fpm的请求都会新建一个mysql数据库连接,那么在并发数很高的情况mysql的连接数很快就会用完,从而导致导致MySQL服务器崩溃。即使使用了mysql pconnet 但是又会导致很多无用的sleep连接。如何才能有效的避免这个问题 让PHP也能开发像java一样的高并发应用呢。

连接池是可以有效降低MySQL-Server负载的。原理是 连接池使用一个共享资源的模式,如并发100个请求,实际上并不是每个请求的所有时间都在执行SQL查询。这样100个请求,共享20个MySQL连接就可以满足需求了。当一个请求操作完数据库后,开始进入模板渲染等其它逻辑流程,这时就会释放数据库连接给其他的请求使用。

连接池仅在超大型应用中才有价值。普通的应用采用MySQL长连接方案,每个php-fpm创建一个MySQL连接,每台机器开启100个php-fpm进程。如果有10台机器,每台机器并发的请求为100。实际上只需要创建1000个MySQL连接就能满足需求,数据库的压力并不大。即使有100台机器,硬件配置好的存储服务器依然可以承受。

达到数百或者数千台应用服务器时,MySQL服务器就需要维持十万级的连接。这时数据库的压力就会非常大了。连接池技术就可以派上用场了,可以大大降低数据库连接数。

下面是我基于swoole扩展实现的一个mysql数据库连接池类,其基本原理是使用mysql长连接,使用空闲队列分配数据库连接,当没有空闲资源时生成新的数据库连接,设置一个最大连接数,和一个最小连接数,当超过设置的最大连接数,不予申请新的数据库连接,当sleep的连接数超过最小连接数时,释放多余的mysql连接。并且程序中也实现了mysql长连接的断线重连,下面给出连接池程序源码欢迎同学们吐槽试用,报bug。

<?php
namespace frame\base;

use frame\log\Log;

class MysqlPool
{
    const MAX_CONN = 100;
    const TIME_OUT = 1800;
    const MIN_CONN = 10; //最小连接
    const RECOVERY_TIME_INTERVAL = 30000; //定时回收毫秒

    public static $working_pool; //工作连接池
    public static $free_queue; //空闲连接资源队列
    public static $close_queue; //已关闭连接资源队列
    public static $config;
    public static $timer_start = false;

    /**
     * [init 连接池初始化 支持多个数据库连接池]
     * @param  $connkey 连接关键字,可以实现多个不同的数据库连接
     * @param  $argv [description] 配置参数
     */
    public static function init($connkey, $argv)
    {
        if (empty(self::$config[$connkey]['is_init'])) {
            Log::debug(__METHOD__ . " init start ");
            self::$config[$connkey]['max'] = $argv['max'];
            self::$config[$connkey]['min'] = $argv['min'];
            self::$config[$connkey]['is_init'] = true;
            self::$working_pool[$connkey] = array();
            self::$free_queue[$connkey] = new \SplQueue();
            self::$close_queue[$connkey] = new \SplQueue();
        }

    }

    /**
     * 开启定时任务
     * @param $connkey
     * @param $argv
     */
    public static function start($connkey, $argv)
    {

        if (!self::$timer_start) {
            Log::debug(__METHOD__ . " schedule ");
            //定时更新过期资源
            //self::schedule($connkey, $argv);
            //定时回收数据库连接资源
            self::recovery($connkey, $argv);
            self::$timer_start = true;
        }
    }

    /**
     * 获取连接资源
     * @param $connkey
     * @param $argv
     * @return array
     */
    public static function getResource($connkey, $argv)
    {
        if(empty($argv['max'])) $argv['max'] = self::MAX_CONN;
        if(empty($argv['min'])) $argv['min'] = self::MIN_CONN;
        if(empty($argv['timeout'])) $argv['timeout'] = self::TIME_OUT;

        self::init($connkey, $argv);
        self::start($connkey, $argv);

        if (!self::$free_queue[$connkey]->isEmpty()) {
            //现有资源可处于空闲状态
            $key = self::$free_queue[$connkey]->dequeue();
            Log::debug(__METHOD__ . " free queue  key == $key ", __CLASS__);

            return array(
                'r' => 0,
                'key' => $key,
                'data' => self::update($connkey, $key, $argv),
            );
        } elseif (count(self::$working_pool[$connkey]) < self::$config[$connkey]['max']) {
            Log::debug(__METHOD__ . " below max, current count:" . count(self::$working_pool[$connkey]), __CLASS__);
            if(self::$close_queue[$connkey]->isEmpty()) {
                $key = count(self::$working_pool[$connkey]);
            }
            else {
                $key = self::$close_queue[$connkey]->dequeue();
            }

            //当前池可以再添加资源用于分配
            $resource = self::product($connkey, $argv);
            //product失败
            if(!$resource) {
                Log::info('product resource error:' . $connkey . $key);
                return array('r' => 1);
            }

            self::$working_pool[$connkey][$key] = $resource;

            return array(
                'r' => 0,
                'key' => $key,
                'data' => self::$working_pool[$connkey][$key]['obj'],
            );
        } else {
            Log::error(__METHOD__ . " no resource can apply ", __CLASS__);
            return array('r' => 1);
        }

    }

    /**
     * [freeResource 释放资源]
     * @param $connkey
     * @param $key
     */
    public static function freeResource($connkey, $key)
    {
        Log::debug(__METHOD__ . " key == $key", __CLASS__);
        self::$free_queue[$connkey]->enqueue($key);
        self::$working_pool[$connkey][$key]['status'] = 0;
    }

    /**
     * [schedule 定时调度 释放过期资源]
     * @param $connkey
     * @param $argv
     */
    public static function schedule($connkey, $argv)
    {
        Log::debug(__METHOD__ . ' schedule start:' . $argv['timeout'] . 's');
        swoole_timer_tick($argv['timeout'] * 1000, function() use($argv) {
            Log::debug('schedule timer tick start');
            foreach (self::$working_pool as $connkey => $pool_data) {
                foreach ($pool_data as $key => $data) {
                    //当前连接已超时
                    if($data['status'] != 0 && $data['lifetime'] < microtime(true)) {
                        //释放资源
                        self::freeResource($connkey, $key);
                    }
                }
            }
        });
    }

    /**
     * 定时回收多余空闲连接资源
     * @param string $connkey
     * @param array $argv
     */
    public static function recovery($connkey, $argv)
    {
        Log::debug(__METHOD__ . ' recovery start:' . self::RECOVERY_TIME_INTERVAL);
        swoole_timer_tick(self::RECOVERY_TIME_INTERVAL, function() use($argv) {
            Log::debug('recovery timer tick start');
            foreach (self::$free_queue as $connkey => $queue) {
                if($queue->isEmpty() || $queue->count() <= $argv['min'])
                    continue;

                //空闲资源超过最小连接,关闭多余的数据库连接
                for($i = $argv['min']; $i < $queue->count();) {
                    $key = $queue->dequeue();
                    //关闭数据库连接
                    self::$working_pool[$connkey][$key]['obj']->close();
                    self::$close_queue[$connkey]->enqueue($key);
                    unset(self::$working_pool[$connkey][$key]);
                    Log::debug(__METHOD__ . ' key' . $key . ' queue count:' . $queue->count() . ' connect number:' . count(self::$working_pool[$connkey]));
                }
            }
        });
    }

    /**
     * [product 生产资源]
     * @param $connkey
     * @param $argv
     * @return mixed
     */
    private static function product($connkey, $argv)
    {
        //防止并发出现已超过连接数
        if(count(self::$working_pool[$connkey]) >= self::$config[$connkey]['max'])
            return false;

        $resource = $argv['db']->connect($argv['config']);
        if(!$resource) return false;
        return array(
            'obj' => $resource,  //实例
            'lifetime' => microtime(true) + ((float) $argv['timeout']),   //生命期
            'status' => 1, //状态 1 在用 0 空闲
        );
    }

    /**
     * [update 更新资源]
     * @param string $connkey
     * @param string $key
     * @param array $argv
     * @return array
     */
    private static function update($connkey, $key, $argv)
    {
        self::$working_pool[$connkey][$key]['status'] = 1;
        self::$working_pool[$connkey][$key]['lifetime'] = microtime(true) + ((float) $argv['timeout']);
        return self::$working_pool[$connkey][$key]['obj'];
    }

    /**
     * 更新数据库连接
     * @param string $connkey
     * @param string $key
     * @param array $argv
     * @return array
     */
    public static function updateConnect($connkey, $key, $argv)
    {
        //更新资源
        $argv['db']->close();
        $resource = $argv['db']->connect($argv['config']);
        self::$working_pool[$connkey][$key]['obj'] = $resource;
        self::$working_pool[$connkey][$key]['lifetime'] = microtime(true) + ((float) $argv['timeout']);
        Log::info('更新working pool key:' . $connkey . $key);
        return $resource;
    }
}

更完整的框架源码请查阅我的github项目地址 https://github.com/jack15083/sample-swoole

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容

  • 更改ip和dnsVi /etc/sysconfig/network-scripts/ifcfg-eth0vi /...
    Xwei_阅读 1,798评论 0 3
  • 1. Nginx的模块与工作原理 Nginx由内核和模块组成,其中,内核的设计非常微小和简洁,完成的工作也非常简单...
    rosekissyou阅读 10,184评论 5 124
  • 1.LAMP介绍  LAM(M)P:L: linuxA: apache (httpd)M: mysql, mar...
    尛尛大尹阅读 1,037评论 0 1
  • 当我编写一个类是,其实就是在描述对象的属性和行为,而并没有产生实质上的对象,只有通过new关键字才会产生出对象,这...
    大晴天小阳光阅读 242评论 0 0
  • 精神病级别级别的梦。比利维坦口味更重……身体深层的恐惧 不断的去约会的梦。乳头在感受男人和婴儿之间的转化。从女人到...
    白鲲夏猫阅读 339评论 0 0