维度数据实时关联的实践(w/ Flink、Vert.x & Guava Cache)

Data Enrichment

在流式处理作业(特别是实时数仓ETL作业)中,我们的数据流可以视为无界事实表,其中往往缺乏一些维度信息。例如,对于埋点日志流而言,为了减少传输冗余,可能只会带有城市ID、商品ID等,如果要映射到对应的名称,就需要与外部存储中的维度表进行关联。这里的外部存储一般是指适合OLTP场景的数据库,如MySQL、Redis、HBase等。

英文语境里习惯将上述操作称为data enrichment。下图展示出了Trackunit公司的实时IoT处理架构,比较有代表性。注意图中的"Enrich"字样。

https://www.ververica.com/blog/trackunit-leverages-flink-industrial-iot

实时关联维度数据的思路主要有如下4种。

  1. 全量预加载+定时刷新:适用于规模较小的缓慢变化维度(SCD),思路最简单,可以参见笔者之前写的示例

  2. 实时查询+缓存刷新:适用于规模较大的缓慢变化维度(SCD),在数仓维度建模过程中,这种维度最为常见,本文接下来会详细叙述其实现方式。

  3. 纯实时查询:适用于快速变化维度(RCD),或者对关联时效性要求极高的场合,需特别注意频繁请求对外部存储的压力。

  4. 流式化维度:比较特殊且灵活,将维度表的change log转化为流,从而把静态表的关联转化为双流join。从change log解析出的维度数据可以写入状态存储,起到缓存的作用。之后再提。

上述4种思路并没有绝对的好坏之分,而是需要根据业务特点和需求来取舍。

下面介绍用Flink异步I/O、Vert.x JDBC Client和Guava Cache实现的实时查询+缓存刷新方案。

Flink Async I/O

Flink的异步I/O专门用来解决Flink计算过程中与外部系统的交互问题。在默认情况下,算子向外部系统发出请求后即阻塞,等待结果返回才能发送下一个请求,可能会造成较大的延迟,吞吐量下降。有了异步I/O之后,就可以并发地发出请求和接收响应,延迟大大降低。下图来自官方文档,一看便知。

关于它的细节,看官可以参考之前的《聊聊Flink异步I/O机制的原理》一文,不再废话。

Vert.x JDBC Client

Vert.x是一个由Eclipse基金会开源的跨语言、事件驱动的异步应用程序框架,运行在JVM平台上,底层依赖于Netty。Vert.x的异步应用场景极为广泛,如Web、数据库访问、响应式编程、微服务、MQTT、认证与鉴权、消息队列、事件总线等等,详情可以参见官方文档

本文采用的维度表数据源是MySQL,而Java原生的JDBC机制是同步的,要与Flink异步I/O一同使用的话,按传统方式需要自己创建连接池、线程池并实现异步化。我们引入Vert.x JDBC Client模块来简化之,先加入依赖项。

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.47</version>
</dependency>

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-jdbc-client</artifactId>
  <version>3.8.5</version>
</dependency>

通过VertxOptions指定事件循环线程池和工作线程池的大小,然后指定JDBC连接的各项参数(注意c3p0的连接池大小max_pool_size),并创建异步的SQL客户端实例。

Properties dbProps = ParameterUtil.getFromResourceFile("mysql.properties");

Vertx vertx = Vertx.vertx(
  new VertxOptions()
    .setWorkerPoolSize(10)
    .setEventLoopPoolSize(5)
);

JsonObject config = new JsonObject()
  .put("url", dbProps.getProperty("mysql.sht.url"))
  .put("driver_class", "com.mysql.jdbc.Driver")
  .put("max_pool_size", 20)
  .put("user", dbProps.getProperty("mysql.sht.user"))
  .put("password", dbProps.getProperty("mysql.sht.pass"));

SQLClient sqlClient = JDBCClient.createShared(vertx, config);

按如下的异步风格获取连接、执行查询、处理查询结果,并关闭连接。借助Lambda表达式,可以将回调写得相对优雅一些。

sqlClient.getConnection(connResult -> {
  if (connResult.failed()) {
    LOGGER.error("Cannot get MySQL connection via Vertx JDBC client ", connResult.cause());
    return;
  }

  SQLConnection conn = connResult.result();
  String sql = "/* SQL statement here */";

  conn.query(sql, queryResult -> {
    if (queryResult.failed()) {
      LOGGER.error("Error executing SQL query: {}", sql, queryResult.cause());
      return;
    }

    ResultSet resultSet = queryResult.result();
    for (JsonObject row : resultSet.getRows()) {
      // handle result here...
    }

    conn.close();
  });
});

千万别忘记在处理结束后调用SQLConnection.close()方法,否则连接池会被很快耗尽。

Guava Cache

显而易见,data enrichment过程中对维度数据的访问是非常频繁的,并且维度表往往也比较大,全量加载的成本可能不低。为了避免对维度数据库造成压力,并且同时加快关联的速度,在维度不太经常变动、对精确度要求不很高的情况下,就可以用缓存暂时将一部分维度数据保留在内存中,并设定合理的过期策略。缓存是典型的空间换时间思想的体现。

Google Guava专门提供了集中式、线程安全的Cache组件满足这类需求,我们可以将它近似理解成带有缓存特性的ConcurrentMap。按以下方法创建一个维度缓存。

Cache<String, String> dimCache = CacheBuilder.newBuilder()
  .initialCapacity(10_000)
  .maximumSize(20_000)
  .expireAfterAccess(1, TimeUnit.HOURS)
  .build();

initialCapacity()方法和maximumSize()方法分别指定该缓存的初始容量和最大容量,推荐对它们有一个预估。Guava Cache的过期/刷新策略有3种,根据需求选用即可:

  • expireAfterWrite():指定数据被写入缓存之后多久过期;
  • expireAfterAccess():指定数据多久没有被访问过之后过期;
  • refreshAfterWrite():指定数据被写入缓存之后多久刷新其值(不删除)。

简单的用法如下。

String key = /* ... */;
String value = dimCache.getIfPresent(key);
if (value == null) {
  value = getFromDatabase(key);
  dimCache.put(key, value);
}

也可以直接用get()方法一步实现“若无则计算”(compute-if-absent)的逻辑,第二个参数是一个Callable。

String value = dimCache.get(key, () -> {
  return getFromDatabase(key);
});

关于它的详细用法(比如带自动加载的LoadingCache、基于弱/软引用的清除策略等),可以参见GitHub上的Wiki页

Integration

扯了这么多,把三者结合起来写个示例吧。下面的AsyncFunction实现了从MySQL异步加载城市名、商品名和分类名3个维度的数据。AnalyticsAccessLogRecord是事件的POJO类。

public static final class MySQLDimensionAsyncFunc
  extends RichAsyncFunction<AnalyticsAccessLogRecord, AnalyticsAccessLogRecord> {
  private static final long serialVersionUID = 1L;
  private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDimensionAsyncFunc.class);

  private transient SQLClient sqlClient;
  private transient volatile Cache<String, String> dimCache;

  @Override
  public void open(Configuration parameters) throws Exception {
    Properties dbProps = ParameterUtil.getFromResourceFile("mysql.properties");

    Vertx vertx = Vertx.vertx(
      new VertxOptions()
        .setWorkerPoolSize(10)
        .setEventLoopPoolSize(5)
    );

    JsonObject config = new JsonObject()
      .put("url", dbProps.getProperty("mysql.sht.url"))
      .put("driver_class", "com.mysql.jdbc.Driver")
      .put("max_pool_size", 20)
      .put("user", dbProps.getProperty("mysql.sht.user"))
      .put("password", dbProps.getProperty("mysql.sht.pass"));

    sqlClient = JDBCClient.createShared(vertx, config);

    dimCache = CacheBuilder.newBuilder()
      .initialCapacity(10_000)
      .maximumSize(20_000)
      .expireAfterAccess(1, TimeUnit.HOURS)
      .build();
  }

  @Override
  public void close() throws Exception {
    sqlClient.close();
    dimCache.invalidateAll();
  }

  @Override
  public void asyncInvoke(AnalyticsAccessLogRecord record, ResultFuture<AnalyticsAccessLogRecord> resultFuture) throws Exception {
    boolean needEnriching = false;
    long siteId = record.getSiteId();
    long categoryId = record.getCategoryId();
    long merchandiseId = record.getMerchandiseId();

    String siteCacheKey = "s" + siteId;
    String categoryCacheKey = "c" + categoryId;
    String merchandiseCacheKey = "m" + merchandiseId;

    List<String> selectSql = new ArrayList<>();

    if (siteId >= 0) {
      String name = dimCache.getIfPresent(siteCacheKey);
      if (name == null) {
        selectSql.add("SELECT 's' AS t,name AS n FROM xx_db_new.site WHERE id = " + siteId);
        needEnriching = true;
      } else {
        record.setSiteName(name);
      }
    }
    if (categoryId >= 0) {
      String name = dimCache.getIfPresent(categoryCacheKey);
      if (name == null) {
        selectSql.add("SELECT 'c' AS t,name AS n FROM xx_db_new.category WHERE id = " + categoryId);
        needEnriching = true;
      } else {
        record.setCategoryName(name);
      }
    }
    if (merchandiseId >= 0) {
      String name = dimCache.getIfPresent(merchandiseCacheKey);
      if (name == null) {
        selectSql.add("SELECT 'm' AS t,title AS n FROM xx_db_new.merchandise WHERE id = " + merchandiseId);
        needEnriching = true;
      } else {
        record.setMerchandiseName(name);
      }
    }
   
    if (needEnriching) {
      sqlClient.getConnection(connResult -> {
        if (connResult.failed()) {
          LOGGER.error("Cannot get MySQL connection via Vertx JDBC client ", connResult.cause());
          return;
        }

        SQLConnection conn = connResult.result();
        String sql = StringUtils.join(selectSql, " UNION ALL ");

        conn.query(sql, queryResult -> {
          if (queryResult.failed()) {
            LOGGER.error("Error executing SQL query: {}", sql, queryResult.cause());
            return;
          }

          ResultSet resultSet = queryResult.result();
          for (JsonObject row : resultSet.getRows()) {
            String tag = row.getString("t");
            String name = row.getString("n");

            switch (tag) {
              case "s":
                record.setSiteName(name);
                dimCache.put(siteCacheKey, name);
                break;
              case "c":
                record.setCategoryName(name);
                dimCache.put(categoryCacheKey, name);
                break;
              case "m":
                record.setMerchandiseName(name);
                dimCache.put(merchandiseCacheKey, name);
                break;
              default: break;
            }
          }

          resultFuture.complete(Collections.singletonList(record));
          conn.close();
        });
      });
    } else {
      resultFuture.complete(Collections.singletonList(record));
    }
  }

  @Override
  public void timeout(AnalyticsAccessLogRecord record, ResultFuture<AnalyticsAccessLogRecord> resultFuture) throws Exception {
    LOGGER.warn("Async operation timed out with record: {}", record.toString());
    resultFuture.complete(Collections.singletonList(record));
  }
}

最后通过AsyncDataStream.(un)orderedWait()方法调用之,注意设定超时时间与异步请求的数量限制。

DataStream<AnalyticsAccessLogRecord> recordStream = /* ... */;
DataStream<AnalyticsAccessLogRecord> enrichedRecordStream = AsyncDataStream.unorderedWait(
  recordStream,
  new MySQLDimensionAsyncFunc(),
  3, TimeUnit.SECONDS,
  100
).name("async_dimension_enrich").uid("async_dimension_enrich");

大功告成。

民那晚安晚安。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容

  • 一、维表join使用场景 维表Join是流与表的关联操作,为了补全流里的额外字段,通常这些待补全的维度字段很少发生...
    data之道阅读 5,892评论 1 8
  • 最近看了公众号[Flink 实战剖析]的部分文章,觉得其中维表关联系列的文章总结得挺全面,因此做一次搬运工,并进行...
    LZhan阅读 2,082评论 0 3
  • 早上四点就醒过来了,看了看时间还早,就又躺下了,想充电,天还很黑,我害怕把三妮姐吵醒,就又放了回去。到了6点15闹...
    愿诸事顺利呀阅读 149评论 0 0
  • 上午,张明观老师打来电话,他说,《柳亚子史料札记三辑》将印好。他询问我的收书地址。我叫他寄到农贸市场伟福店里。 下...
    黄叶村人阅读 264评论 0 2
  • [玫瑰] 尊敬的贾仁玲学长好,真的羡慕您夫妻双方父母都健在,真好,可以让自己有机会去尽孝心,给孩子做榜样。您学习那...
    b176f3a4e693阅读 126评论 0 0