Data Enrichment
在流式处理作业(特别是实时数仓ETL作业)中,我们的数据流可以视为无界事实表,其中往往缺乏一些维度信息。例如,对于埋点日志流而言,为了减少传输冗余,可能只会带有城市ID、商品ID等,如果要映射到对应的名称,就需要与外部存储中的维度表进行关联。这里的外部存储一般是指适合OLTP场景的数据库,如MySQL、Redis、HBase等。
英文语境里习惯将上述操作称为data enrichment。下图展示出了Trackunit公司的实时IoT处理架构,比较有代表性。注意图中的"Enrich"字样。
实时关联维度数据的思路主要有如下4种。
全量预加载+定时刷新:适用于规模较小的缓慢变化维度(SCD),思路最简单,可以参见笔者之前写的示例。
实时查询+缓存刷新:适用于规模较大的缓慢变化维度(SCD),在数仓维度建模过程中,这种维度最为常见,本文接下来会详细叙述其实现方式。
纯实时查询:适用于快速变化维度(RCD),或者对关联时效性要求极高的场合,需特别注意频繁请求对外部存储的压力。
流式化维度:比较特殊且灵活,将维度表的change log转化为流,从而把静态表的关联转化为双流join。从change log解析出的维度数据可以写入状态存储,起到缓存的作用。之后再提。
上述4种思路并没有绝对的好坏之分,而是需要根据业务特点和需求来取舍。
下面介绍用Flink异步I/O、Vert.x JDBC Client和Guava Cache实现的实时查询+缓存刷新方案。
Flink Async I/O
Flink的异步I/O专门用来解决Flink计算过程中与外部系统的交互问题。在默认情况下,算子向外部系统发出请求后即阻塞,等待结果返回才能发送下一个请求,可能会造成较大的延迟,吞吐量下降。有了异步I/O之后,就可以并发地发出请求和接收响应,延迟大大降低。下图来自官方文档,一看便知。
关于它的细节,看官可以参考之前的《聊聊Flink异步I/O机制的原理》一文,不再废话。
Vert.x JDBC Client
Vert.x是一个由Eclipse基金会开源的跨语言、事件驱动的异步应用程序框架,运行在JVM平台上,底层依赖于Netty。Vert.x的异步应用场景极为广泛,如Web、数据库访问、响应式编程、微服务、MQTT、认证与鉴权、消息队列、事件总线等等,详情可以参见官方文档。
本文采用的维度表数据源是MySQL,而Java原生的JDBC机制是同步的,要与Flink异步I/O一同使用的话,按传统方式需要自己创建连接池、线程池并实现异步化。我们引入Vert.x JDBC Client模块来简化之,先加入依赖项。
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
<version>3.8.5</version>
</dependency>
通过VertxOptions指定事件循环线程池和工作线程池的大小,然后指定JDBC连接的各项参数(注意c3p0的连接池大小max_pool_size
),并创建异步的SQL客户端实例。
Properties dbProps = ParameterUtil.getFromResourceFile("mysql.properties");
Vertx vertx = Vertx.vertx(
new VertxOptions()
.setWorkerPoolSize(10)
.setEventLoopPoolSize(5)
);
JsonObject config = new JsonObject()
.put("url", dbProps.getProperty("mysql.sht.url"))
.put("driver_class", "com.mysql.jdbc.Driver")
.put("max_pool_size", 20)
.put("user", dbProps.getProperty("mysql.sht.user"))
.put("password", dbProps.getProperty("mysql.sht.pass"));
SQLClient sqlClient = JDBCClient.createShared(vertx, config);
按如下的异步风格获取连接、执行查询、处理查询结果,并关闭连接。借助Lambda表达式,可以将回调写得相对优雅一些。
sqlClient.getConnection(connResult -> {
if (connResult.failed()) {
LOGGER.error("Cannot get MySQL connection via Vertx JDBC client ", connResult.cause());
return;
}
SQLConnection conn = connResult.result();
String sql = "/* SQL statement here */";
conn.query(sql, queryResult -> {
if (queryResult.failed()) {
LOGGER.error("Error executing SQL query: {}", sql, queryResult.cause());
return;
}
ResultSet resultSet = queryResult.result();
for (JsonObject row : resultSet.getRows()) {
// handle result here...
}
conn.close();
});
});
千万别忘记在处理结束后调用SQLConnection.close()方法,否则连接池会被很快耗尽。
Guava Cache
显而易见,data enrichment过程中对维度数据的访问是非常频繁的,并且维度表往往也比较大,全量加载的成本可能不低。为了避免对维度数据库造成压力,并且同时加快关联的速度,在维度不太经常变动、对精确度要求不很高的情况下,就可以用缓存暂时将一部分维度数据保留在内存中,并设定合理的过期策略。缓存是典型的空间换时间思想的体现。
Google Guava专门提供了集中式、线程安全的Cache组件满足这类需求,我们可以将它近似理解成带有缓存特性的ConcurrentMap。按以下方法创建一个维度缓存。
Cache<String, String> dimCache = CacheBuilder.newBuilder()
.initialCapacity(10_000)
.maximumSize(20_000)
.expireAfterAccess(1, TimeUnit.HOURS)
.build();
initialCapacity()方法和maximumSize()方法分别指定该缓存的初始容量和最大容量,推荐对它们有一个预估。Guava Cache的过期/刷新策略有3种,根据需求选用即可:
- expireAfterWrite():指定数据被写入缓存之后多久过期;
- expireAfterAccess():指定数据多久没有被访问过之后过期;
- refreshAfterWrite():指定数据被写入缓存之后多久刷新其值(不删除)。
简单的用法如下。
String key = /* ... */;
String value = dimCache.getIfPresent(key);
if (value == null) {
value = getFromDatabase(key);
dimCache.put(key, value);
}
也可以直接用get()方法一步实现“若无则计算”(compute-if-absent)的逻辑,第二个参数是一个Callable。
String value = dimCache.get(key, () -> {
return getFromDatabase(key);
});
关于它的详细用法(比如带自动加载的LoadingCache、基于弱/软引用的清除策略等),可以参见GitHub上的Wiki页。
Integration
扯了这么多,把三者结合起来写个示例吧。下面的AsyncFunction实现了从MySQL异步加载城市名、商品名和分类名3个维度的数据。AnalyticsAccessLogRecord是事件的POJO类。
public static final class MySQLDimensionAsyncFunc
extends RichAsyncFunction<AnalyticsAccessLogRecord, AnalyticsAccessLogRecord> {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDimensionAsyncFunc.class);
private transient SQLClient sqlClient;
private transient volatile Cache<String, String> dimCache;
@Override
public void open(Configuration parameters) throws Exception {
Properties dbProps = ParameterUtil.getFromResourceFile("mysql.properties");
Vertx vertx = Vertx.vertx(
new VertxOptions()
.setWorkerPoolSize(10)
.setEventLoopPoolSize(5)
);
JsonObject config = new JsonObject()
.put("url", dbProps.getProperty("mysql.sht.url"))
.put("driver_class", "com.mysql.jdbc.Driver")
.put("max_pool_size", 20)
.put("user", dbProps.getProperty("mysql.sht.user"))
.put("password", dbProps.getProperty("mysql.sht.pass"));
sqlClient = JDBCClient.createShared(vertx, config);
dimCache = CacheBuilder.newBuilder()
.initialCapacity(10_000)
.maximumSize(20_000)
.expireAfterAccess(1, TimeUnit.HOURS)
.build();
}
@Override
public void close() throws Exception {
sqlClient.close();
dimCache.invalidateAll();
}
@Override
public void asyncInvoke(AnalyticsAccessLogRecord record, ResultFuture<AnalyticsAccessLogRecord> resultFuture) throws Exception {
boolean needEnriching = false;
long siteId = record.getSiteId();
long categoryId = record.getCategoryId();
long merchandiseId = record.getMerchandiseId();
String siteCacheKey = "s" + siteId;
String categoryCacheKey = "c" + categoryId;
String merchandiseCacheKey = "m" + merchandiseId;
List<String> selectSql = new ArrayList<>();
if (siteId >= 0) {
String name = dimCache.getIfPresent(siteCacheKey);
if (name == null) {
selectSql.add("SELECT 's' AS t,name AS n FROM xx_db_new.site WHERE id = " + siteId);
needEnriching = true;
} else {
record.setSiteName(name);
}
}
if (categoryId >= 0) {
String name = dimCache.getIfPresent(categoryCacheKey);
if (name == null) {
selectSql.add("SELECT 'c' AS t,name AS n FROM xx_db_new.category WHERE id = " + categoryId);
needEnriching = true;
} else {
record.setCategoryName(name);
}
}
if (merchandiseId >= 0) {
String name = dimCache.getIfPresent(merchandiseCacheKey);
if (name == null) {
selectSql.add("SELECT 'm' AS t,title AS n FROM xx_db_new.merchandise WHERE id = " + merchandiseId);
needEnriching = true;
} else {
record.setMerchandiseName(name);
}
}
if (needEnriching) {
sqlClient.getConnection(connResult -> {
if (connResult.failed()) {
LOGGER.error("Cannot get MySQL connection via Vertx JDBC client ", connResult.cause());
return;
}
SQLConnection conn = connResult.result();
String sql = StringUtils.join(selectSql, " UNION ALL ");
conn.query(sql, queryResult -> {
if (queryResult.failed()) {
LOGGER.error("Error executing SQL query: {}", sql, queryResult.cause());
return;
}
ResultSet resultSet = queryResult.result();
for (JsonObject row : resultSet.getRows()) {
String tag = row.getString("t");
String name = row.getString("n");
switch (tag) {
case "s":
record.setSiteName(name);
dimCache.put(siteCacheKey, name);
break;
case "c":
record.setCategoryName(name);
dimCache.put(categoryCacheKey, name);
break;
case "m":
record.setMerchandiseName(name);
dimCache.put(merchandiseCacheKey, name);
break;
default: break;
}
}
resultFuture.complete(Collections.singletonList(record));
conn.close();
});
});
} else {
resultFuture.complete(Collections.singletonList(record));
}
}
@Override
public void timeout(AnalyticsAccessLogRecord record, ResultFuture<AnalyticsAccessLogRecord> resultFuture) throws Exception {
LOGGER.warn("Async operation timed out with record: {}", record.toString());
resultFuture.complete(Collections.singletonList(record));
}
}
最后通过AsyncDataStream.(un)orderedWait()方法调用之,注意设定超时时间与异步请求的数量限制。
DataStream<AnalyticsAccessLogRecord> recordStream = /* ... */;
DataStream<AnalyticsAccessLogRecord> enrichedRecordStream = AsyncDataStream.unorderedWait(
recordStream,
new MySQLDimensionAsyncFunc(),
3, TimeUnit.SECONDS,
100
).name("async_dimension_enrich").uid("async_dimension_enrich");
大功告成。
民那晚安晚安。