一、根目录:需要配置四项,其中context可选
{"type":"index_hadoop",
"spec"{},
"context":{},}
(1)type:表示上传的方式,本地文件上传使用index,hdfs文件上传使用index_hadoop。
(2)context(可选):用于配置一些运行参数,比如可以设置上传csv时候是否包含表头行
(3)dataSource:数据源名称,用于设置上传数据之后的表名称。
(4)spec:用于设置数据的具体配置以及转换方式,重点介绍
二、spec目录
包含了三个字段:
1、dataSchema:用于指定上传数据的格式
2、ioConfig:用于指定数据的来源以及存储位置
3、tuningConfig(可选):用于指定如何协调各种不同的参数
1、dataSchema
(1)dataSource: 数据源名称,用于设置上传数据之后的表名称。
(2)parser:用于指定数据怎么被转化,转化为什么格式
(3)granularitySpec:指定如何划分segment以及数据的时间范围
(4)metricsSpec:包含了一系列的aggregators转换
(1)“dataSource”=“tableName”
(2)parser
(2.1)type:指定常规数据的格式,默认为string,如果保存hdfs上那么指定为hadoopyString,注意hadoop后边有个y;
(2.2)parseSpec:用于指定数据转换格式
(2.1)type:默认为string,如果保存hdfs上那么指定为hadoopString;
(2.2)parseSpec包含以下格式
(2.2.1)format:指定上传文件格式
(2.2.2)timestampSpec:指定时间戳序列
(2.2.3)columns(可选):csv格式特有,用于指定源数据中的列名
(2.2.4)dimensionsSpec:配置维度数据,也就是将要在druid数据表中展现的列
(2.2.1)format:可以为csv,json,tsv,javascript、timeAndDims等
(2.2.2)timestampSpec:包含column以及format两个参数,column必选,用于指定时间戳对应的列。format用于指定时间格式,可以使用iso、millis、posix、zuto,默认为auto。
"timestampSpec":{"format":"auto","column":"start_time"}
(2.2.3)columns:用于配置源文件中应该包含的所有列的列名
"columns":["columns","column2","column3"]
(2.2.4)dimensionsSpec用于指定维度列。dimensions用于指定所有的列;dimensionExclusion可选,用于指定不需要的数据,默认为空;spatialDimensions可选,用于指定列的空间限制,默认为空。
"dimensionsSpec":{
"dimensions":["page","language",
{"type":"long","name":"countryNum" } ] }
如果字段为String,那么直接列出,如果为其他类型如long/double等,需要使用{"type":"long","name":"字段名称" }单独列出,注意:配置时间戳的列不需要在dimensions中列出。
(3)granularitySpec包含如下五个维度:
(3.1)type:用来指定粒度类型的使用,默认为type=uniform,建议设置为uniform或arbitrary(尝试创建大小相等的字段)
(3.2)segmentGranularity(可选):指定每个segment包含的时间戳的范围
(3.3)queryGranularity(可选):允许查询的时间粒度
(3.4)rollup(可选):是否使用预计算算法
(3.5)intervals:用于指定上传时间限制时间段
(3.1)type:默认为type=uniform,建议设置为uniform或arbitrary(尝试创建大小相等的字段)
(3.2)segmentGranularity:默认为day,用来确定每个segment包含的时间戳的范围,可以为 "SECOND" ... "MINUTE" ... "HOUR" ... "DAY" ... "DOW" ... "DOY" ... "WEEK" ... "MONTH" ... "QUARTER" ... "YEAR" ... "EPOCH" ... "DECADE" ... "CENTURY" ... "MILLENNIUM" ... 等。
(3.3)queryGranularity:默认为None,允许查询的时间粒度,单位与segmentGranularity相同,如果为None那么允许以任意时间粒度进行查询。
(3.4)rollup:是否使用预计算算法,默认为true,推荐true,比较快。
(3.5)intervals:使用时间段来将数据源进行限制。只有时间段内的数据可以上传。
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : { "type" : "none"},
"rollup" : "true",
"intervals" : [ "2017-11-15T00:00:00.000Z/2017-11-18T00:00:00.000Z" ]}}
(4)metricsSpec:包含了一系列的aggregators转换
"metricsSpec":[{"type":"count","name":"count"},{"type":"doubleSum","name":"added","fieldName":"added"},{"type":"doubleSum","name":"deleted","fieldName":"deleted"},{"type":"doubleSum","name":"delta","fieldName":"delta"}]
type可以为:count、longSum、doubleSum、doubleMin\doubleMax、longMin\longMax、doubleFirst\doubleLast、longFirst\longLast
除count外其他都需要指定name和fieldName两个参数,name表示最后输出的,也就是在表中体现的名称,而fieldName则代表源数据中的列名。
详细用法:metricsSpec-Aggregations官网详细解释
3、ioConfig:用于指定数据的来源以及存储位置
(1)type:用于指定源数据获取方式
(2)inputSpec:指定源数据路径
(1)type:保存为本地使用“local”,保存为hdfs使用“hadoop”
(2)inputSpec:包含type,以及paths路径
(2.1)
"ioConfig" : {"type" : "hadoop",
"inputSpec" : {"type" : "static",
"paths" : "hdfs://master:9000/user/root/.."}
支持批量上传数据,路径指定到文件夹即可。
(2.2)当设置type为“granularity”时候,则需要根据时间戳使用路径格式将数据导入目录中。
"ioConfig" : {"type" : "hadoop",
"inputSpec" : {"type" : "granularity",
.."}
4、tuningConfig(可选):用于指定如何协调各种不同的参数
(1)type:指定数据存放方式
(2)paritionSpec:用于指定数据的segment的分区方式以及大小,默认为hashed
(3)jobProperties:在添加mapreduce作业时候的一些配置,key:value表示
(4)workingPath:用于指示数据中间落地的路径(mapreduce中间结果),默认为'/tmp/druid-indexing'
(5)version:创建更加详细的版本,这将忽略hadoopindextask,除非将useExplicitVersion设置为true,默认为日期时间的索引为开始
(6)maxRowsInMemory:指定聚合之后的数据行数,默认75000
(7)leaveInermediate:作业完成后是否留下workingPath的中间文件,默认false。
(8)cleanupOnFailure:作业失败时是否清除中间文件,只有在leavelnermediate为true时候生效默认true。
(9)overwriteFiles:在执行index导入数据时候是否覆盖已经存在的文件。默认false
(10)ignoreInvalidRows:忽略发现有问题的行,默认false
(11)combineText:是否在CombineTextInputFormat阶段将多个文件合并到一个文件split中,这可以在处理大量小文件时候加速hadoop作业。
(12)useCombiner:如果可能的话,是否在mapper阶段中合并行,默认false
(13)indexSpec:调整数据的索引方式
(14)buildV9Directly:是否直接构建V9索引,而不是先构建V8 index再转换为V9 index。
(15)numBackgroundPersistThreads:是否增加后台持久化线程的数量,这会导致内存以及cpu的负荷增加,提高效率,默认为0(使用当前的持久化线程),建议用1
(16)forceExtendableShardSpecs:强制使用可扩展的ShardSpecs,该功能可以与kafka index扩展服务一起使用,默认false
(17)useExplicitVersion:是否强制使用HasoopIndexTask版本,默认false
(1)type:hadoop
(2)partitionSpec
Segment会给予时间戳进行分区,并根据其他类型进一步分区,druid支持hashed(基于每行所有维度的哈希值)和dimension(基于单个维度的范围)来进行分区。为了让druid在重查询负载下运行良好,对于段文件大小的推荐在300Mb到700mb之间,可以使用partitionsSpec来调整大小。
(2.1)hashed分区
hashed分区首先会选择多个Segment,然后根据每行数据所有列的哈希值对这些Segment进行分区,Segment的数量是输入数据集的基数以及目标分区大小自动确定的。
type:要分区的类型,hashed
targetPartitionSize:包含在分区中的目标行数,应该在500M-1G
maxPartitionSize:分区中包括的最大行数,默认为比targetPartitionSize大50%。
numShards:直接指定分区的数量而不是分区的大小,可以跳过自动选择多个分区的必要步骤,录入数据将会更快。此项与targetPartitionSize只需要填入一项即可。
partitionDemensions(可选),只能配合numShards一起使用,指定需要分区的维度,为空则选择所有维度列,当targetPartitionSize设置时候将自动忽略。
"partitionsSpec": {"type":"hashed","targetPartitionSize":5000000,"maxPartitionSize":7500000,"assumeGrouped":false,"numShards": -1,"partitionDimensions": [ ]}
(2.2)Only-dimension单维度分区
选择作为分区指标的维度列,然后将该维度分隔成连续的不同的分区,每个分区都会包含该维度值在该范围内的所有行。默认情况下使用的维度都是自动指定的。
type:要分区的类型,dimension
targetPartitionSize(必须):包含在分区中的目标行数,应该在500M-1G
maxPartitionSize(可选):分区中包括的最大行数,默认为比targetPartitionSize大50%。
partitionDemension(可选),要分区的维度,为空时自动选择
assumeGrouped(可选):如果数据源已经按照时间和维度分组了,该选项将会提高加载数据的速度,但是如果没有那么会选择次优分区。
(3)jobProperties:我们的集群因为一些问题需要对虚拟机进行一些调参
"tuningConfig" : {"type" : "hadoop",
"partitionsSpec" : {"type" : "hashed","targetPartitionSize" : 5000000},
"jobProperties" : {"mapreduce.map.log.level" : "DEBUG","mapreduce.reduce.log.level" : "DEBUG","mapreduce.reduce.java.opts" : "-Xmx1024m","mapreduce.map.java.opts" : "-Xmx1024m"}}}
(13)IndexSpec:
三、示例
curl -X 'POST' -H 'Content-Type:application/json' -d @index.json localhost:8090/druid/indexer/v1/task
其中的index.json便是需要配置的index文件
本地导入csv格式数据的 task文件示例
看不明白的稍微加工一下:使用json在线解析层次更加清晰
{"type":"index","spec":{"dataSchema":{"dataSource":"wikipedia","parser":{"type":"string","parseSpec":{"format":"csv","timestampSpec":{"column":"timestamp"},"columns":["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"],"dimensionsSpec":{"dimensions":["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]}}},"metricsSpec":[{"type":"count","name":"count"},{"type":"doubleSum","name":"added","fieldName":"added"},{"type":"doubleSum","name":"deleted","fieldName":"deleted"},{"type":"doubleSum","name":"delta","fieldName":"delta"}],"granularitySpec":{"type":"uniform","segmentGranularity":"DAY","queryGranularity":"NONE","intervals":["2013-08-31/2013-09-01"]}},"ioConfig":{"type":"index","firehose":{"type":"local","baseDir":"./","filter":"wikipedia_data.csv"}},"tuningConfig":{"type":"index","targetPartitionSize":0,"rowFlushBoundary":0}}}
导入hdfs中的csv格式文件
{"type":"index","spec":{"dataSchema":{"dataSource":"wikipedia","parser":{"type":"string","parseSpec":{"format":"csv","timestampSpec":{"column":"timestamp"},"columns":["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"],"dimensionsSpec":{"dimensions":["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]}}},"metricsSpec":[{"type":"count","name":"count"},{"type":"doubleSum","name":"added","fieldName":"added"},{"type":"doubleSum","name":"deleted","fieldName":"deleted"},{"type":"doubleSum","name":"delta","fieldName":"delta"}],"granularitySpec":{"type":"uniform","segmentGranularity":"DAY","queryGranularity":"NONE","intervals":["2013-08-31/2013-09-01"]}},"ioConfig":{"type":"hadoop","inputSpec":{"type":"static","paths":"hdfs://vm1.cci/tmp/druid/datasource/wikipedia_data.csv"}},"tuningConfig":{"type":"hadoop"}}}
官网传送门: