Flink SQL空闲状态保留时间(idle state retention time)实现原理

前言

如果要列举Flink SQL新手有可能犯的错误,笔者认为其中之一就是忘记设置空闲状态保留时间导致状态爆炸。2021年的第一篇技术文,时间很紧张,聊聊这个简单的话题吧。

为什么要设置

如果我们在数据流上进行分组查询,分组处理产生的结果(不仅仅是聚合结果)会作为中间状态存储下来。随着分组key的不断增加,状态自然也会不断膨胀。但是这些状态数据基本都有时效性,不必永久保留。例如,使用Top-N语法进行去重,重复数据的出现一般都位于特定区间内(例如一小时或一天内),过了这段时间之后,对应的状态就不再需要了。Flink SQL提供的idle state retention time特性可以保证当状态中某个key对应的数据未更新的时间达到阈值时,该条状态被自动清理。设置方法是:

stenv.getConfig().setIdleStateRetentionTime(Time.hours(24), Time.hours(36))

注意setIdleStateRetentionTime()方法需要传入两个参数:状态的最小保留时间minRetentionTime和最大保留时间maxRetentionTime(根据实际业务决定),且两者至少相差5分钟。为什么会有这种限制呢?看一下源码就知道了。

如何实现的

idle state retention time特性在底层以o.a.f.table.runtime.functions.CleanupState接口来表示,代码如下。

public interface CleanupState {
    default void registerProcessingCleanupTimer(
            ValueState<Long> cleanupTimeState,
            long currentTime,
            long minRetentionTime,
            long maxRetentionTime,
            TimerService timerService)
            throws Exception {
        // last registered timer
        Long curCleanupTime = cleanupTimeState.value();

        // check if a cleanup timer is registered and
        // that the current cleanup timer won't delete state we need to keep
        if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
            // we need to register a new (later) timer
            long cleanupTime = currentTime + maxRetentionTime;
            // register timer and remember clean-up time
            timerService.registerProcessingTimeTimer(cleanupTime);
            // delete expired timer
            if (curCleanupTime != null) {
                timerService.deleteProcessingTimeTimer(curCleanupTime);
            }
            cleanupTimeState.update(cleanupTime);
        }
    }
}

由上可知,每个key对应的最近状态清理时间会单独维护在ValueState中。如果满足以下两条件之一:

  • ValueState为空(即这个key是第一次出现)
  • 或者当前时间加上minRetentionTime已经超过了最近清理的时间

就用当前时间加上maxRetentionTime注册新的Timer,并将其时间戳存入ValueState,用于触发下一次清理。如果有已经过期了的Timer,则一并删除之。可见,如果minRetentionTime和maxRetentionTime的间隔设置太小,就会比较频繁地产生Timer与更新ValueState,维护Timer的成本会变大(参见之前笔者写的Timer原理文章),所以一般建议设置间隔比较长的清理区间。

CleanupState接口的继承关系如下图所示。

可见支持空闲状态清理的Function很多,但基类都是KeyedProcessFunctionWithCleanupState抽象类。它的源码如下。

public abstract class KeyedProcessFunctionWithCleanupState<K, IN, OUT>
        extends KeyedProcessFunction<K, IN, OUT> implements CleanupState {
    private static final long serialVersionUID = 2084560869233898457L;

    private final long minRetentionTime;
    private final long maxRetentionTime;
    protected final boolean stateCleaningEnabled;

    // holds the latest registered cleanup timer
    private ValueState<Long> cleanupTimeState;

    public KeyedProcessFunctionWithCleanupState(long minRetentionTime, long maxRetentionTime) {
        this.minRetentionTime = minRetentionTime;
        this.maxRetentionTime = maxRetentionTime;
        this.stateCleaningEnabled = minRetentionTime > 1;
    }

    protected void initCleanupTimeState(String stateName) {
        if (stateCleaningEnabled) {
            ValueStateDescriptor<Long> inputCntDescriptor =
                    new ValueStateDescriptor<>(stateName, Types.LONG);
            cleanupTimeState = getRuntimeContext().getState(inputCntDescriptor);
        }
    }

    protected void registerProcessingCleanupTimer(Context ctx, long currentTime) throws Exception {
        if (stateCleaningEnabled) {
            registerProcessingCleanupTimer(
                    cleanupTimeState,
                    currentTime,
                    minRetentionTime,
                    maxRetentionTime,
                    ctx.timerService());
        }
    }

    protected boolean isProcessingTimeTimer(OnTimerContext ctx) {
        return ctx.timeDomain() == TimeDomain.PROCESSING_TIME;
    }

    protected void cleanupState(State... states) {
        for (State state : states) {
            state.clear();
        }
        this.cleanupTimeState.clear();
    }

    protected Boolean needToCleanupState(Long timestamp) throws IOException {
        if (stateCleaningEnabled) {
            Long cleanupTime = cleanupTimeState.value();
            // check that the triggered timer is the last registered processing time timer.
            return timestamp.equals(cleanupTime);
        } else {
            return false;
        }
    }
}

可以发现,空闲状态保留时间目前(1.12版本)仍然只支持processing time语义,并且minRetentionTime只有设为大于0的值才会生效。

KeyedProcessFunctionWithCleanupState只是提供了一些helper方法,具体发挥作用需要到实现类中去找。以计算Top-N的AppendOnlyTopNFunction为例,它的processElement()方法中会对到来的每个元素注册清理Timer:

@Override
public void processElement(RowData input, Context context, Collector<RowData> out) throws Exception {
    long currentTime = context.timerService().currentProcessingTime();
    // register state-cleanup timer
    registerProcessingCleanupTimer(context, currentTime);
    // ......
}

而一旦Timer触发,在onTimer()方法中调用基类的cleanupState()方法来实际清理:

@Override
public void onTimer(
        long timestamp,
        OnTimerContext ctx,
        Collector<RowData> out) throws Exception {
    if (stateCleaningEnabled) {
        // cleanup cache
        kvSortedMap.remove(keyContext.getCurrentKey());
        cleanupState(dataState);
    }
}

空闲状态保留的逻辑并不仅应用在上述Function中。在Table/SQL模块中还有一个内置的触发器StateCleaningCountTrigger,它可以对窗口中的元素进行计数,并按照计数阈值或者空闲状态保留的时间阈值来清理(即FIRE_AND_PURGE)。看官可自行参考对应的源码,不再废话了。

The End

今天号称是帝都21世纪以来最冷的一天,趁早洗洗睡吧。

民那晚安。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容