发现问题
最近接手新项目,使用Maxwell
解析MySQL
的Binlog,发送到Kafka
进行处理。测试的时候发现一个问题,就是Kafka
的数据严重倾斜,8个partition,全部写到同一个partition中,另外7个partition没有任何meaasge。
Kafka数据倾斜的问题一般是由于生产者使用的Partition
接口实现类对分区处理的问题,一般是对key做hash之后,对分区数取模。当出现数据倾斜时,小量任务耗时远高于其它任务,从而使得整体耗时过大,未能充分发挥分布式系统的并行计算优势(参考Apache Kafka 0.10 技术内幕:数据倾斜详解)。
而使用Maxwell
解析MySQL
的Binlog发送到Kafka
的时候,生产者是Maxwell
,那么数据倾斜的问题明细就是Maxwell
引起的了。
排查
在Maxwell
官网查文档得知,在Maxwell
没有配置的情况下,默认使用数据库名database作为计算分区的key,并使用Java默认的hashcode算法进行计算,项目中maxwell
的binlog就是单个database,所以造成数据倾斜。
官方文档
A binlog event's partition is determined by the selected hash function and hash string as follows
HASH_FUNCTION(HASH_STRING) % TOPIC.NUMBER_OF_PARTITIONS
The HASH_FUNCTION is either java's *hashCode* or *murmurhash3*. The default HASH_FUNCTION is *hashCode*. Murmurhash3 may be set with the
kafka_partition_hashoption. The seed value for the murmurhash function is hardcoded to 25342 in the MaxwellKafkaPartitioner class.
The HASH_STRING may be (*database*, *table*, *primary_key*, *column*). The default HASH_STRING is the *database*. The partitioning field can be configured using the
producer_partition_byoption.
Maxwell will discover the number of partitions in its kafka topic upon boot. This means that you should pre-create your kafka topics, and with at least as many partitions as you have logical databases:
bin/kafka-topics.sh --zookeeper ZK_HOST:2181 --create \
--topic maxwell --partitions 20 --replication-factor 2
出处文档
http://maxwells-daemon.io/producers/#kafka-partitioning。
修改Maxwell
的配置文件config.properties中加入对应参数即可,这里我选择了primary_key作为分区key,同时选用murmurhash3
哈希算法,以获得更好的效率和分布:
# tl;dr config
log_level=info
producer=kafka
kafka.bootstrap.servers=server1:port1
kafka_topic=topic_name
#######修改partition_by,解决kafka数据倾斜######
kafka_partition_hash=murmur3
producer_partition_by=primary_key
# mysql login info
host=******
user=maxwell
password=*****
修改配置后,重启Maxwell
,观察Offset的变化,隔一段时间之后,各partition的Offset的增量基本一致,问题解决!