MySQL与Redis的数据同步方案

之前的文章有讲过MySQL到Elasticsearch的多种数据同步方案(多种MySQL与Elasticsearch的数据同步解决方案),今天再来讲下MySQL到Redis的几种数据同步方案。

第一种方案:使用 canal 工具

在之前MySQL同步ES的文章中有简单提过这款工具,但是因为没有用过所以没有详细讲,今天就使用这款工具来实现MySQL到Redis的数据同步(同理,同步到ES、MySQL等也是一样的操作)

canal 是阿里巴巴开源的一款提供增量数据订阅和消费的工具,应用场景有:数据库镜像、数据库实时备份、索引构建和实时维护、业务 cache 刷新、带业务逻辑的增量数据处理等。

原理就与MySQL主从复制相似,canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议,从binlog日志中获取数据。

  1. MySQL配置

    MySQL需要开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    

    授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    
  2. 下载 canal

    下载地址,选择自己需要的版本:https://github.com/alibaba/canal/releases

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
    mkdir /usr/local/canal
    tar -zxvf canal.deployer-1.1.5.tar.gz -C /usr/local/canal/
    rm -rf canal.deployer-1.1.5.tar.gz
    cd /usr/local/canal/
    
  3. 修改配置

    vi conf/example/instance.properties

    ## mysql serverId 不能与mysql的server-id一致
    canal.instance.mysql.slaveId = 1234
    #position info,需要改成自己的数据库信息
    canal.instance.master.address = 127.0.0.1:3306 
    canal.instance.master.journal.name = 
    canal.instance.master.position = 
    canal.instance.master.timestamp = 
    #canal.instance.standby.address = 
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position = 
    #canal.instance.standby.timestamp = 
    #username/password,需要改成自己的数据库信息
    canal.instance.dbUsername = canal  
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8
    #table regex
    canal.instance.filter.regex = .\*\\\\..\*
    
  4. 安装java的JDK

    # 查看可安装jdk版本
    yum search java | grep -i --color JDK
    # 选择某一版本进行安装
    yum install java-1.8.0-openjdk.x86_64
    # 安装完成后确认JDK安装完毕,如果输出了版本号,证明安装正确
    java -version
    

    PS:这里有个坑,要安装 JDK8,我用的 JDK11的环境,发现启动不了canal,报错 Error: Could not create the Java Virtual Machine. ,切换成 JDK8 就好了

  5. 启动

    sh bin/startup.sh
    
  6. 检查

    查看 server 日志 和 instance 的日志,有正确的内容输出证明启动成功

    vi logs/canal/canal.log
    vi logs/example/example.log
    

    或者使用 ps -ef | grep canal 查看canal进程

  7. 关闭命令

    sh bin/stop.sh
    
  8. 安装 canal-php

    canal 提供了多语言的客户端,可采用不同语言实现不同的消费逻辑,我用的PHP客户端,它的详细介绍以及其他语言客户端看文档:https://github.com/alibaba/canal/wiki#%E5%A4%9A%E8%AF%AD%E8%A8%80

    composer require xingwenge/canal_php
    

    我这通过创建一个命令来测试

    php artisan make:command canal
    

    内容

    <?php
    
    namespace App\Console\Commands;
    
    use App\Services\CanalToRedisService;
    use Illuminate\Console\Command;
    use xingwenge\canal_php\CanalClient;
    use xingwenge\canal_php\CanalConnectorFactory;
    use xingwenge\canal_php\Fmt;
    
    class canal extends Command
    {
        /**
         * The name and signature of the console command.
         *
         * @var string
         */
        protected $signature = 'canal';
    
        /**
         * The console command description.
         *
         * @var string
         */
        protected $description = 'canal客户端';
    
        /**
         * Create a new command instance.
         *
         * @return void
         */
        public function __construct()
        {
            parent::__construct();
        }
    
        /**
         *
         */
        public function handle()
        {
            try {
                // 创建客户端,默认使用 socket 来通信
                $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
                // 这个是使用 swoole
                //$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
    
                $client->connect("127.0.0.1", 11111);
                $client->checkValid();
                // $client->subscribe("1001", "example", ".*\\..*");
                // 设置过滤,指定要同步的表,上边那种方式是不限制
                $client->subscribe("1001", "example", "lmrs.lmrs_shops");
    
                while (true) {
                    $message = $client->get(100);
                    if ($entries = $message->getEntries()) {
                        foreach ($entries as $entry) {
                            // Fmt::println($entry);
                            // 在这里进行具体业务的逻辑处理,比如同步数据到 redis,es,mysql等
                            CanalToRedisService::println($entry);
                        }
                    }
                    sleep(1);
                }
    
                $client->disConnect();
            } catch (\Exception $e) {
                echo $e->getMessage(), PHP_EOL;
            }
        }
    }
    

    这里我创建了个service来处理同步到Redis的逻辑,内容如下

    <?php
    
    namespace App\Services;
    
    
    use Com\Alibaba\Otter\Canal\Protocol\Column;
    use Com\Alibaba\Otter\Canal\Protocol\Entry;
    use Com\Alibaba\Otter\Canal\Protocol\EntryType;
    use Com\Alibaba\Otter\Canal\Protocol\EventType;
    use Com\Alibaba\Otter\Canal\Protocol\RowChange;
    use Com\Alibaba\Otter\Canal\Protocol\RowData;
    
    class CanalToRedisService
    {
        /**
         * @param Entry $entry
         * @throws \Exception
         */
        public static function println($entry)
        {
            switch ($entry->getEntryType()) {
                case EntryType::TRANSACTIONBEGIN:
                case EntryType::TRANSACTIONEND:
                    return;
                    break;
            }
    
            $rowChange = new RowChange();
            $rowChange->mergeFromString($entry->getStoreValue());
            $evenType = $rowChange->getEventType();
            $header = $entry->getHeader();
            $table = $header->getSchemaName().'_'.$header->getTableName();
    
            /** @var RowData $rowData */
            foreach ($rowChange->getRowDatas() as $rowData) {
                switch ($evenType) {
                    case EventType::DELETE: // 删除
                        self::delete($table, self::ptColumn($rowData->getBeforeColumns()));
                        break;
                    case EventType::INSERT: // 新增
                        self::insert($table, self::ptColumn($rowData->getAfterColumns()));
                        break;
                    default: // 更新
                        self::update($table, self::ptColumn($rowData->getBeforeColumns()), self::ptColumn($rowData->getAfterColumns()));
                        break;
                }
            }
        }
    
        /**
         * 将数据表的字段名和值组装成数组
         * @param $columns
         * @return array
         */
        private static function ptColumn($columns) {
            $argv = [];
            foreach ($columns as $value) {
                $argv[$value->getName()] = $value->getValue();
            }
         // dump($argv);
            return $argv;
        }
    
        /**
         * 新增操作
         * 可以根据表名进行判断具体的业务操作
         * @param string $table 数据表名
         * @param array $data 数据
         */
        private static function insert($table, $data)
        {
            app('redis')->set("shop::".$data['id'], serialize($data));
        }
    
        /**
         * 删除操作
         * 业务处理很简单,这里就不写了,自己完善
        *  @param string $table 数据表名
         * @param array $data 数据
         */
        private static function delete($table, $data)
        {
            //
        }
    
        /**
         * 更新操作
         * 业务处理很简单,这里就不写了,自己完善
         * @param string $table 数据表名
         * @param array $befor_data 更改前的数据
         * @param array $after_data 更改后的数据
         */
        private static function update($table, $befor_data, $after_data)
        {
         //
        }
    }
    
  9. 测试

    可以在service里多dump一些参数,运行 php artisan canal 查看输出,新增MySQL数据,查看Redis是否有变化

  10. canal 还可以结合消息中间件来实现更高效的数据同步,比如:Kafka/RocketMQ 。使用文档:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

第二种方案:使用 RabbitMQ 消息队列

RabbitMQ 是一个由erlang语言编写的、开源的、在AMQP基础上完整的、可复用的企业消息系统。支持多种语言,包括java、Python、ruby、PHP、C/C++等。

RabbitMQ 的核心概念:

  • 生产者(Producer):发送消息的应用
  • 消费者(Consumer):接收消息的应用
  • 队列(Queue):存储消息的缓存
  • 消息(Message):由生产者通过RabbitMQ发送给消费者的信息
  • 连接(Connection):连接RabbitMQ和应用服务器的TCP连接
  • 通道(Channel):连接里的一个虚拟通道。当你通过消息队列发送或者接收消息时,这个操作都是通过通道进行的
  • 交换机(Exchange):交换机负责从生产者那里接收消息,并根据交换类型分发到对应的消息列队里。要实现消息的接收,一个队列必须到绑定一个交换机
  • 绑定(Binding):绑定是队列和交换机的一个关联连接
  • 路由键(Routing Key):路由键是供交换机查看并根据键来决定如何分发消息到列队的一个键。路由键可以说是消息的目的地址

RabbitMQ 的工作模式:

  • 简单队列
  • 工作队列
  • 发布订阅模式
  • 路由模式
  • 主题模式

理论介绍完毕,接下来进入实操

  1. 安装 RabbitMQ

    手动编译安装 RabbitMQ 很麻烦,还得先安装 erlang 环境,所以这里我就直接使用docker安装了。附上erlang和RabbitMQ的下载地址,之后有时间再去尝试手动安装

    erlang:https://www.erlang.org/downloads

    RabbitMQ:https://github.com/rabbitmq/rabbitmq-server/releases/

    拉取docker镜像

    docker pull rabbitmq
    

    构建容器

    docker run -d -p 5672:5672 -p 15672:15672 --hostname my-rabbit -v /docker/rabbitmq:/var/lib/rabbitmq --privileged=true --name rabbitmq rabbitmq
    
  2. 进入RabbitMQ 容器安装可视化界面:rabbitmq_management

    docker exec -it rabbitmq bash
    
    rabbitmq-plugins enable rabbitmq_management
    

    在浏览器访问 ip:15672 打开可视化界面,账号密码默认都是:guest

  3. 安装扩展

    PHP调用RabbitMQ需要amqp的扩展,下载地址:https://pecl.php.net/package/amqp

    wget https://pecl.php.net/get/amqp-1.10.2.tgz
    tar -zxvf amqp-1.10.2.tgz
    cd amqp-1.10.2
    phpize
    ./configure --with-php-config=/usr/local/bin/php-config
    

    到这里报了一个错,configure: error: librabbitmq not found 意思是还缺少个rabbitmq-c

    接着下载,地址:https://github.com/alanxz/rabbitmq-c/releases

    wget https://github.com/alanxz/rabbitmq-c/archive/refs/tags/v0.11.0.tar.gz
    tar -zxvf v0.11.0.tar.gz
    cd rabbitmq-c-0.11.0/
    yum -y install cmake
    cmake . -DCMAKE_INSTALL_PREFIX=/usr/local/rabbitmq-c
    make && make install
    

    重新编译amqp

    cd amqp-1.10.2
    ./configure --with-php-config=/usr/local/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c
    make && make install
    

    在 php.ini 中加入 extension=amqp.so ,然后重启php

  4. rabbimq在laravel中使用

    安装组件

    composer require vladimir-yuldashev/laravel-queue-rabbitmq "10.X" --ignore-platform-reqs
    

    在 config/queue.php文件的 connections 中加入配置

    'rabbitmq' => [
    
       'driver' => 'rabbitmq',
       'queue' => env('RABBITMQ_QUEUE', 'default'),
       'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,
    
       'hosts' => [
           [
               'host' => env('RABBITMQ_HOST', '127.0.0.1'),
               'port' => env('RABBITMQ_PORT', 5672),
               'user' => env('RABBITMQ_USER', 'guest'),
               'password' => env('RABBITMQ_PASSWORD', 'guest'),
               'vhost' => env('RABBITMQ_VHOST', '/'),
           ],
       ],
    
       'options' => [
           'ssl_options' => [
               'cafile' => env('RABBITMQ_SSL_CAFILE', null),
               'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
               'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
               'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
               'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
           ],
           'queue' => [
               'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class,
           ],
       ],
    
       /*
        * Set to "horizon" if you wish to use Laravel Horizon.
        */
       'worker' => env('RABBITMQ_WORKER', 'default'),
        
    ],
    

    在 .env 文件中加入配置

    # 将默认的 sync 改为 rabbitmq                            
    QUEUE_CONNECTION=rabbitmq
    # mq的ip地址      
    RABBITMQ_HOST=172.17.0.10
    # mq的端口 
    RABBITMQ_PORT=5672
    # mq的账号 
    RABBITMQ_USER=guest
    # mq的密码
    RABBITMQ_PASSWORD=guest
    # 默认的虚拟主机 
    RABBITMQ_VHOST=my_vhost
    # 默认队列名称 
    RABBITMQ_QUEUE=lmrs
    
  5. 创建 service

    <?php
    /**
     * Created by PhpStorm
     * User: Ricky Wong
     * Date: 2021/8/5
     * Time: 0:49
     */
    
    namespace App\Services;
    
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    class RabbitmqService
    {
        public static function getConnect()
        {
            //RABBITMQ 配置项
            $config = [
                'host' => env('RABBITMQ_HOST', '127.0.0.1'),
                'port' => env('RABBITMQ_PORT', 5672),
                'user' => env('RABBITMQ_USER', 'guest'),
                'password' => env('RABBITMQ_PASSWORD', 'guest'),
                'vhost' => env('RABBITMQ_VHOST', '/'),
            ];
            return new AMQPStreamConnection($config["host"],$config["port"],$config["user"],$config["password"],$config["vhost"]);
        }
    
        /**
         * 生产者
         * @param $queue
         * @param $messageBody
         * @param string $exchange
         */
        public static function push($queue,$messageBody,$exchange='router')
        {
            //获取连接
            $connection = self::getConnect();
            //构建通道
            $channel = $connection->channel();
            //声明一个队列
            $channel->queue_declare($queue,false,true,false,false);
            //指定交换机 以路由模式
            $channel->exchange_declare($exchange,'direct',false,true,false);
            //绑定队列和类型
            $channel->queue_bind($queue,$exchange);
            $message = new AMQPMessage($messageBody,array('content_type' => 'text/plain',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
            //消息推送
            $channel->basic_publish($message,$exchange);
            $channel->close();
            $connection->close();
        }
    
        /**
         * 消费者
         * @param $queue
         * @param $callback
         * @param string $exchange
         */
        public static function pop($queue,$callback,$exchange='router')
        {
            $connection = self::getConnect();
            $channel = $connection->channel();
            //从队列中取出消息
            $message = $channel->basic_get($queue);
            $res = $callback($message->getBody());
            if ($res){
                //ack 验证
                $channel->basic_ack($message->getDeliveryTag());
            }
            $channel->close();
            $connection->close();
        }
    }
    
  6. 创建异步任务

    php artisan make:job SyncToRedis
    

    编辑内容

    <?php
    
    namespace App\Jobs;
    
    use Illuminate\Bus\Queueable;
    use Illuminate\Contracts\Queue\ShouldQueue;
    use Illuminate\Foundation\Bus\Dispatchable;
    use Illuminate\Queue\InteractsWithQueue;
    use Illuminate\Queue\SerializesModels;
    use App\Services\RabbitmqService;
    
    class SyncToRedis implements ShouldQueue
    {
        use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
    
        protected $key;
    
        /**
         * Create a new job instance.
         *
         * @param $data
         */
        public function __construct($data)
        {
            $this->key = 'lmrs::product::info::'.$data->id;
            //写入队列
            RabbitmqService::push('update_queue', $data);
        }
    
        /**
         * Execute the job.
         *
         * @return void
         */
        public function handle()
        {
            // 消费消息
            RabbitmqService::pop('update_queue', function ($message) {
                $product = app('redis')->set($this->key, serialize($message));
                if (!$product){
                    return;
                }
    
                return true;
            });
        }
    
        /**
         * 异常处理
         * @param \Exception $exception
         */
        public function failed(\Exception $exception)
        {
            print_r($exception->getMessage());
        }
    }
    
  7. 在需要同步的地方触发任务

    dispatch(new SyncToRedis(Product::find($request->input("id"))));
    
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容