产品正式上线有一段时间了,用户量越来越多,对产品进行深入的商业分析这件事逐渐被提上了日程。此时,就需要搭建一套系统能够实现对数据的采集,处理和展示。
系统架构
做了一些调研,最后决定用ElasticStack架构,也就是 Kafka+ELK 的架构。架构实现参照文章 【 ES 私房菜 】系统运维数据分析平台架构
系统主要架构如图所示:
具体实现为:
1. Maxwell 链接数据库,读取Mysql数据库中的BinLog文件,并将内容转化为json格式发送给 Kafka。
2. Kafka接收消息并实现转发
3. Logstash 从Kafka 读取转发的消息,并经过一定的数据处理后存入Elastcisearch
4. Elasticsearch按照一定的数据结构存储数据
5. Kibana展示Elastcisearch存储的数据
组件搭建
Maxwell
Maxwell 能够读取MySQL 数据库的 binlogs,并将binlog中的更新以JSON的格式转发给Kafka,Kinesis,RabbitMQ, Google Cloud Pub/Sub 或者Redis 等第三方数据存储或转发工具。
数据库配置
在数据库连接Maxwell之前,需要先对数据库做一些配置。
1. 配置MySQL的binlogs 格式为row
$ vi my.cnf
[mysqld]
server-id=1
log-bin=master
binlog_format=row
2. MySQL数据库新建一个用户 maxwell,并为其配置相关权限
mysql> GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'XXXXXX';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
Maxwell docker搭建
1. 调试maxwell和MySQL的连接,可将producer定义为输出
docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='XXXXXX' --host='mysql_host' --producer=stdout
若连接正常,则一旦MySQL中的binlog文件有更新,maxwell会接收文件更新,并打印输出。
2. Maxwell连接kafka,将mysql消息发送到kafka
搭建好kafka后(后文会介绍如何docker搭建kafka),maxwell可以将producer设置为kafka,此时maxwell能够将MySQL中的数据更新发送给kafka。
docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='XXXXXX' --host='mysql_host' --producer=kafka --kafka.bootstrap.servers=kafka_host:9092 --log_level=debug
配置 log_level 的环境变量值为debug可以查看maxwell运行时更加详细的log输出。不配置次环境变量时默认为 info。
若一切正常,kafka 的 maxwell 的topic的consumer能够收到MySQL的binlog数据更新,格式为json。
以上为maxwell 的docker搭建和配置。后文会继续介绍 kafka, elk等的docker搭建。