前言
工作当中我们往往会遇到这样的需求场景:
某张表中不同的统计指标,需要从多个不同的 kafka 数据源当中统计。这种做法,我们很容易想到把多个 kafka source 统计到的指标 union all 后,再分别统计,但是这样会存在一个问题,如果指标非常多的情况下,会有大量的 0 值填充,不仅会重复写更多的代码,还会让代码的可读性降低。
烦人的 0 填充代码
试想我们有两个业务:寄件下单和快递揽收,分别对应了两个 kafka 数据源,这时我们需要统计当天的下单量和快递揽收量,并且入库到同一张表当中,这个需求非常简单,分别统计两个数据源对应的指标,然后 union all 起来再 sum,代码如下:
SET 'parallelism.default' = '1';
CREATE TABLE order_datagen (
order_count INT,
proctime as proctime()
) WITH (
'connector' = 'datagen',
'rows-per-second'='5',
'fields.order_count.kind'='random',
'fields.order_count.min'='1',
'fields.order_count.max'='5'
);
CREATE TABLE collect_datagen (
collect_count INT,
proctime as proctime()
) WITH (
'connector' = 'datagen',
'rows-per-second'='5',
'fields.collect_count.kind'='random',
'fields.collect_count.min'='1',
'fields.collect_count.max'='5'
);
CREATE TABLE print (
sta_date STRING,
order_count INT,
collect_count INT
) WITH (
'connector' = 'print'
);
INSERT INTO print
SELECT
sta_date,
sum(order_count),
sum(collect_count)
FROM (
SELECT
FROM_UNIXTIME(CAST(proctime AS BIGINT), 'yyyyMMdd') as sta_date,
order_count,
0 as collect_count
FROM order_datagen
UNION ALL
SELECT
FROM_UNIXTIME(CAST(proctime AS BIGINT), 'yyyyMMdd') as sta_date,
0 as order_count,
collect_count
FROM collect_datagen
)
GROUP BY sta_date;
这段 sql 代码很容易想到,没什么难度,运行也没什么问题。但是它有个非常让人讨厌的“毛病”—— 0填充。也就是其它数据源不能统计的指标,需要用0去填充它,否则 union all 的时候字段对应不上;这个在指标多的情况下,非常让人头疼。那有没有什么办法解决呢?答案当然有,我们接着往下看。
使用 MAP 类型消除 0 填充
在前面的代码中,我们统计 order_count
的时候,需要把 collect_count
默认设置成 0;现在我们只统计 order_count
,再把它放到一个 MAP 字段里,同理,collect_count
也做相同的处理,union all 的时候,我们只对齐 map 字段就可以了,优化后代码如下:
...
INSERT INTO print
SELECT
sta_date,
sum(init_map['order']),
sum(init_map['collect'])
FROM (
SELECT
FROM_UNIXTIME(CAST(proctime AS BIGINT), 'yyyyMMdd') as sta_date,
MAP['order', order_count] AS init_map
FROM order_datagen
UNION ALL
SELECT
FROM_UNIXTIME(CAST(proctime AS BIGINT), 'yyyyMMdd') as sta_date,
MAP['collect', collect_count] AS init_map
FROM collect_datagen
)
GROUP BY sta_date;
上面代码是不是简洁多了,在指标多的时候,非常方便,代码也容易阅读。
今天的文章很短,但非常实用,希望还在被 union all 折磨的小伙伴们,早日脱离苦海~~
PS: 首先想到这个方法也是一个偶然的机会,是我的一个其它组的同事,看了我们的 flink sql 代码后,问我为什么 union all 中有那么多无用的 0 填充,我就问他有没有什么好的办法,他说 flink sql 支持 map 的话,可以用 map 试试;果然试了 map 后,优化效果非常明显,感谢这位小伙伴。_
THE END.