Flink维表关联系列1-Async IO

最近看了公众号[Flink 实战剖析]的部分文章,觉得其中维表关联系列的文章总结得挺全面,因此做一次搬运工,并进行一些总结。

1 维表服务与Flink异步IO

1.1 维表服务

维度或者是维表概念熟知应该从数据仓库维度建模开始了解的,区别于事实表业务真实发生的数据,通常用来表示业务属性,比如订单业务中,商品属性、商家属性都可以称之为维度表。

所以说,某张表可能整体并不是维度表,但是某些字段表示的是属性,这些属性可看做是维度表。

在flink流处理实时分析或者数仓中,同样会使用维表来完成一些数据过滤或者字段补齐的操作,但是我们所需要的维度数据通常存储在Mysql/Redis/Hbase/Es这样的外部数据库中,并且可能是会随时变动的,根据业务要求数据的时效性,需要不同程度的感知维表数据的变化,在实际使用中常常会有以下几种方案可供选择:

<1> 在维度数据量比较小并且业务要求的时效性不高,可以定时全量加载维度数据到内存中,直接从内存中查询维度数据;
比如专柜所在门店的门店信息表等等

<2> 在维度数据量比较大并且业务要求的时效性不高,这时候全量加载就会撑爆内存,可以使用LRU的缓存策略,当缓存的维度数据达到一定大小,采用淘汰最近最少使用的数据,同时还可以设置数据的过期时间;

<3> 业务要求数据时效性比较高,那么就需要flink实时查询,这个时候需要注意外部存储所能承受的QPS;

<4> 最后一种方案直接将维度数据发送到kafka中,flink任务消费kafka的维度数据,然后使用广播方式将维度数据广播到每一个处理task中,这种方式同样要求数据量比较小。


1.2 异步IO

一般来说,在流计算中,如果以事件流为主,关联一些维度信息,就需要根据每个事件中的关键信息去数据库执行一次查询。正常的思路就是通过mapFunction以阻塞的方式查询数据库,等待数据结果返回,然后执行下一个步骤。如果数据库查询时间很长,那么有可能会阻塞流计算的整体流程。

需要注意的是:使用Async I/O的前提是需要一个支持异步请求的客户端,没有异步请求客户端的话也可以将同步客户端丢到线程池中执行作为异步客户端。

1.2.1 基于线程池实现基于mysql的jdbc查询异步化

公司同事封装的异步查询JDBC数据库的抽象类:

public abstract class JDBCAsyncFunction<IN, OUT> extends RichAsyncFunction<IN, OUT> {

    private static final int DEFAULT_CAPACITY = 20;

    /**
     * 数据源配置文件路径
     * */
    private String _path;

    /**
     * 队列大小
     * */
    private int _capacity;

    /**
     * 数据源
     * */
    private HikariDataSource _datasource;

    /**
     * 线程池
     * */
    private ExecutorService _executor;

    public JDBCAsyncFunction(String path) {
        this(path, DEFAULT_CAPACITY);
    }

    public JDBCAsyncFunction(String path, int capacity) {
        this._path = path;
        this._capacity = capacity;
    }

    protected <V> Future<V> asyncCall(Callable<V> call) {
        return this._executor.submit(call);
    }

    protected QueryRunner createQueryRunner() {
        return new QueryRunner(this._datasource);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // 初始化 JDBC 连接池
        if (this._datasource == null) {
            HikariConfig config = new HikariConfig(this._path);
            config.setMaximumPoolSize(this._capacity);

            this._datasource = new HikariDataSource(config);
        }

        // 初始化线程池
        if (this._executor == null) {
            this._executor = Executors.newFixedThreadPool(this._capacity);
        }
    }

    @Override
    public void close() throws Exception {
        if (this._executor != null) {
            this._executor.shutdown();
            this._executor = null;
        }

        if (this._datasource != null) {
            this._datasource.close();
            this._datasource = null;
        }

        super.close();
    }

}

可以基于此进行异步查询mysql数据库。另外,会发现封装的这个抽象类,用到了线程池和数据库连接池。因为mysql本身并不支持异步数据库操作。

1.2.2 基于异步JDBC组件Vertx实现的mysql查询异步化

<1> 引入Vertx相关依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.13</version>
</dependency>
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-jdbc-client</artifactId>
    <version>3.8.3</version>
</dependency>
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-core</artifactId>
    <version>3.8.3</version>
</dependency>

<2> 在open中通过Vertx创建SQLClient,其内部维护自己的异步请求服务;在asyncInvoke中调用获取connection,执行查询,并释放连接;然后在close方法中关闭client。

public class JDBCAsyncFunction extends RichAsyncFunction<Click, Store> {

    private SQLClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        Vertx vertx = Vertx.vertx(new VertxOptions()
                .setWorkerPoolSize(10)
                .setEventLoopPoolSize(10));

        JsonObject config = new JsonObject()
                .put("url", "jdbc:mysql://xx:3306/base")
                .put("driver_class", "com.mysql.cj.jdbc.Driver")
                .put("max_pool_size", 10)
                .put("user", "x")
                .put("password", "x");

        client = JDBCClient.createShared(vertx, config);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(Click input, ResultFuture<Store> resultFuture) throws Exception {
        client.getConnection(conn -> {
            if (conn.failed()) {
                return;
            }

            final SQLConnection connection = conn.result();
            connection.query("select id, name from t where id = " + input.getId(), res2 -> {
                ResultSet rs = new ResultSet();
                if (res2.succeeded()) {
                    rs = res2.result();
                }

                List<Store> stores = new ArrayList<>();
                for (JsonObject json : rs.getRows()) {
                    Store s = new Store();
                    s.setId(json.getInteger("id"));
                    s.setName(json.getString("name"));
                    stores.add(s);
                }
                connection.close();
                resultFuture.complete(stores);
            });
        });
    }
}

拓展知识点:
<1> 线程池的原理:

其实线程池的原理很简单,类似于操作系统中的缓冲区的概念,它的流程如下:先启动若干数量的线程,并让这些线程都处于睡眠状态,

当客户端有一个新请求时,就会唤醒线程池中的某一个睡眠线程,让它来处理客户端的这个请求,当处理完这个请求后,线程又处于睡眠状态。

可能你也许会问:为什么要搞得这么麻烦,如果每当客户端有新的请求时,我就创建一个新的线程不就完了?

这也许是个不错的方法,因为它能使得你编写代码相对容易一些,但你却忽略了一个重要的问题?

那就是性能!

高峰期每秒的客户端请求并发数超过100,如果为每个客户端请求创建一个新线程的话,那耗费的CPU时间和内存将是惊人的,如果采用一个拥有200个线程的线程池,

那将会节约大量的系统资源,使得更多的CPU时间和内存用来处理实际的商业应用,而不是频繁的线程创建与销毁。

<2> 数据库连接池的原理:

数据库连接是一种关键的有限的昂贵的资源,这一点在多用户的网页应用程序中体现得尤为突出。

一个数据库连接对象均对应一个物理数据库连接,每次操作都打开一个物理连接,使用完都关闭连接,这样造成系统的性能低下。 数据库连接池的解决方案是在应用程序启动时建立足够的数据库连接,并讲这些连接组成一个连接池(简单说:在一个“池”里放了好多半成品的数据库联接对象),由应用程序动态地对池中的连接进行申请、使用和释放。对于多于连接池中连接数的并发请求,应该在请求队列中排队等待。并且应用程序可以根据池中连接的使用率,动态增加或减少池中的连接数。

连接池技术尽可能多地重用了消耗内存地资源,大大节省了内存,提高了服务器地服务效率,能够支持更多的客户服务。通过使用连接池,将大大提高程序运行效率,同时,我们可以通过其自身的管理机制来监视数据库连接的数量、使用情况等。

1)最小连接数是连接池一直保持的数据库连接,所以如果应用程序对数据库连接的使用量不大,将会有大量的数据库连接资源被浪费;

2)最大连接数是连接池能申请的最大连接数,如果数据库连接请求超过此数,后面的数据库连接请求将被加入到等待队列中,这会影响之后的数据库操作。


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342