ClickHouse数据导入

一 概述

目前Kafka数据导入ClickHouse的常用方案有两种,一种是通过ClickHouse内置的Kafka表引擎实现,另一种是借助数据流组件,如Logstash。

以下会分别介绍这两种方案。

二 数据导入方案

方案一 Kafka表引擎

方案介绍

Kafka表引擎基于librdkafka库实现与Kafka的通信,但它只充当一个数据管道的角色,负责拉取Kafka中的数据;所以还需要一张物化视图将Kafka引擎表中的数据实时同步到本地MergeTree系列表中。

为了提高性能,接受的消息被分组为 maxinsertblocksize 大小(由kafkamax_block_size参数空值,默认值为65536)的块。如果未在 streamflushinterval_ms 毫秒(默认500 ms)内形成块,则不关心块的完整性,都会将数据刷新到表中。


方案一.png

相关配置参数:
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

<kafka>
  <debug>cgrp</debug>
  <auto_offset_reset>smallest</auto_offset_reset>
</kafka>
 
<kafka_logs>
  <retry_backoff_ms>250</retry_backoff_ms>
  <fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>

使用示例

1)部署Kafka

# 部署ZK(略)

# 部署Kafka
docker run -d --name=kafka21 --hostname=node21 --network=host -e KAFKA_ADVERTISED_HOST_NAME=node21 -e KAFKA_ZOOKEEPER_CONNECT=node21:2181,node22:2181,node23:2181 wurstmeister/kafka
 
# 创建Topic
./bin/kafka-topics.sh --zookeeper node21:2181 --create --topic test --partitions 1 --replication-factor 1
 
# 测试
./bin/kafka-console-producer.sh --broker-list node21:9092 --topic test
>{"id":1,"name":"tom"}
>{"id":2,"name":"jerry"}
 
./bin/kafka-console-consumer.sh --bootstrap-server node21:9092 --topic test --from-beginning
{"id":1,"name":"tom"}
{"id":2,"name":"jerry"}

2)创建Kafka引擎表

CREATE TABLE kafka_queue
(
  warehouse_id Int64,
  product_id  Int64,
  product_name String
)
ENGINE = Kafka()
SETTINGS
  kafka_broker_list='node21:9092',
  kafka_topic_list='test',
  kafka_group_name='test',
  kafka_format='JSONEachRow',
  kafka_skip_broken_messages=100

必选参数:

  • kafka_broker_list:brokers列表
  • kafka_topic_list:Topic列表
  • kafka_group_name:消费组名称
  • kafka_format:消息格式

可选参数:

  • kafka_row_delimiter:每个消息体(记录)之间的分隔符,默认为‘‘\0’’
  • kafka_schema:对应Kafka的schema参数
  • kafka_num_consumers:消费者数量(即线程数),默认值为1
  • kafka_skip_broken_messages:数据解析失败时,允许跳过的失败的数据行数,默认值为0
  • kafka_max_block_size:每次可发送的最大数据量,默认值等于max_block_size大小
  • kafka_commit_every_batch:执行kafka commit的频率,默认值为0,即当一整个Block数据块完全写入数据表后才执行commit;如果设置为1,则每写完一个Batch批次的数据就会执行一次commit(一次Block写入操作由多次Batch写入操作组成)

3)创建数据表
使用已有的数据表,以下只给出了分布表的创建语句。

CREATE TABLE warehouse_dist ON CLUSTER cluster_1
(
    warehouse_id Int64,
    product_id Int64,
    product_name String
)
ENGINE = Distributed(cluster_1, default, warehouse_local, warehouse_id)

4)创建物化视图

CREATE MATERIALIZED VIEW kafka_view TO warehouse_dist AS SELECT * FROM kafka_queue

方案二 Logstash

方案介绍

与Elasticsearch写入类似,通过Logstash的ClickHouse插件,订阅Kafka中的数据并写入CH中。其中,ClickHouse插件调用HTTP接口完成数据写入。


方案二.png

使用示例

1)部署Logstash
部署Logstash,并安装ClickHouse插件:

bin/logstash-plugin install logstash-output-clickhouse

2)创建Logstash配置文件

input {
  stdin {
      codec => "json"
  }
}
 
output {
  stdout {
    codec => "json"
  }
  clickhouse {
    http_hosts => ["http://192.168.167.21:8123"]
    table => "warehouse_dist"
    request_tolerance => 1
    flush_size => 1000
    pool_max => 1000
  }
}
 
filter {
  mutate {
    remove_field => [ "@timestamp", "host", "@version" ]
  }
}

相关参数:

  • save_on_failure:发送失败是否保存数据,默认值为true
  • save_dir:发送失败的数据的存储路径,默认为/tmp
  • automatic_retries:连接的重试次数,默认值为1
  • request_tolerance:发送失败(响应码不是200)的重试次数,默认值为5
  • backoff_time:下一次重试连接或发送请求的时间,默认值为3s
  • flush_size:每次批量发送的数据大小,默认值为50
  • idle_flush_time:每次数据批量发送的时间间隔

3)启动Logstash

./bin/logstash -f config/logstash.conf

三 总结

Kafka引擎表和Logstash都是常见的数据导入方式,

  • Logstash可以将Kafka和ClickHouse解耦,与Elasticsearch数据的导入方式保持一直
  • Kafka引擎表是CH官方提供的数据导入方式,可靠性上会更好;Logstash的ClickHouse插件为第三方贡献,且已经一年多没有更新了
  • Kafka引擎表是基于librdkafka实现的,有更丰富的配置参数

参考:《ClickHouse原理解析与应用实践》

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335

推荐阅读更多精彩内容