生产实践 | 基于 Flink 的短视频生产消费监控

本文详细介绍了实时监控类指标的数据流转链路以及技术方案,大多数的实时监控类指标都可按照本文中的几种方案实现。

短视频生产消费监控

短视频带来了全新的传播场域和节目形态,小屏幕、快节奏成为行业潮流的同时,也催生了新的用户消费习惯,为创作者和商户带来收益。而多元化的短视频也可以为品牌方提供营销机遇。

其中对于垂类生态短视频的生产消费热点的监控分析目前成为了实时数据处理很常见的一个应用场景,比如对某个圈定的垂类生态下的视频生产或者视频消费进行监控,对热点视频生成对应的优化推荐策略,促进热点视频的生产或者消费,构建整个生产消费数据链路的闭环,从而提高创作者收益以及消费者留存。

本文将完整分析垂类生态短视频生产消费数据的整条链路流转方式,并基于 Flink 提供几种对于垂类视频生产消费监控的方案设计。通过本文,你可以了解到:

  • 垂类生态短视频生产消费数据链路闭环

  • 实时监控短视频生产消费的方案设计

  • 不同监控量级场景下的代码实现

  • flink 学习资料

项目简介

垂类生态短视频生产消费数据链路流转架构图如下,此数据流转图也适用于其他场景:

链路

在上述场景中,用户生产和消费短视频,从而客户端、服务端以及数据库会产生相应的行为操作日志,这些日志会通过日志抽取中间件抽取到消息队列中,我们目前的场景中是使用 Kafka 作为消息队列;然后使用 flink 对垂类生态中的视频进行生产或消费监控(内容生产通常是圈定垂类作者 id 池,内容消费通常是圈定垂类视频 id 池),最后将实时聚合数据产出到下游;下游可以以数据服务,实时看板的方式展现,运营同学或者自动化工具最终会帮助我们分析当前垂类下的生产或者消费热点,从而生成推荐策略。

方案设计

架构

其中数据源如下:

  • Kafka 为全量内容生产和内容消费的日志。
  • Rpc/Http/Mysql/配置中心/Redis/HBase 为需要监控的垂类生态内容 id 池(内容生产则为作者 id 池,内容消费则为视频 id 池),其主要是提供给运营同学动态配置需要监控的 id 范围,其可以在 flink 中进行实时查询,解析运营同学想要的监控指标范围,以及监控的指标和计算方式,然后加工数据产出,可以支持随时配置,实时数据随时计算产出。

其中数据汇为聚类好的内容生产或者消费热点话题或者事件指标:

  • Redis/HBase 主要是以低延迟(Redis 5ms p99,HBase 100ms p99,不同公司的服务能力不同)并且高 QPS 提供数据服务,给 Server 端或者线上用户提供低延迟的数据查询。
  • Druid/Mysql 可以做为 OLAP 引擎为 BI 分析提供灵活的上卷下钻聚合分析能力,供运营同学配置可视化图表使用。
  • Kafka 可以以流式数据产出,从而提供给下游继续消费或者进行特征提取。

废话不多说,我们直接上方案和代码,下述几种方案按照监控 id 范围量级区分,不同的量级对应着不同的方案,其中的代码示例为 ProcessWindowFunction,也可以使用 AggregateFunction 代替,其中主要监控逻辑都相同。

方案 1

适合监控 id 数据量小的场景(几千 id),其实现方式是在 flink 任务初始化时将需要监控的 id 池或动态配置中心的 id 池加载到内存当中,之后只需要在内存中判断内容生产或者消费数据是否在这个监控池当中。

ProcessWindowFunction p = new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {
    
    // 配置中心动态 id 池
    private Config<Set<Long>> needMonitoredIdsConfig;

    @Override
    public void open(Configuration parameters) throws Exception {
        this.needMonitoredIdsConfig = ConfigBuilder
                .buildSet("needMonitoredIds", Long.class);
    }

    @Override
    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
        Set<Long> needMonitoredIds = needMonitoredIdsConfig.get();
        /**
         * 判断 commonModel 中的 id 是否在 needMonitoredIds 池中
         */
    }
}

监控的 id 池可以按照固定或者可配置从而分出两种获取方式:第一种是在 flink 任务开始时就全部加载进内存中,这种方式适合监控 id 池不变的情况;第二种是使用动态配置中心,每次都从配置中心访问到最新的监控 id 池,其可以满足动态配置或者更改 id 池的需求,并且这种实现方式通常可以实时感知到配置更改,几乎无延迟。

方案 2

适合监控 id 数据量适中(几十万 id),监控数据范围会不定时发生变动的场景。其实现方式是在 flink 算子中定时访问接口获取最新的监控 id 池,以获取最新监控数据范围。

ProcessWindowFunction p = new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {

    private long lastRefreshTimestamp;

    private Set<Long> needMonitoredIds;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.refreshNeedMonitoredIds(System.currentTimeMillis());
    }

    @Override
    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
        long windowStart = context.window().getStart();
        this.refreshNeedMonitoredIds(windowStart);
        /**
         * 判断 commonModel 中的 id 是否在 needMonitoredIds 池中
         */
    }

    public void refreshNeedMonitoredIds(long windowStart) {
        // 每隔 10 秒访问一次
        if (windowStart - this.lastRefreshTimestamp >= 10000L) {
            this.lastRefreshTimestamp = windowStart;
            this.needMonitoredIds = Rpc.get(...)
        }
    }
}

根据上述代码实现方式,按照时间间隔的方式刷新 id 池,其缺点在于不能实时感知监控 id 池的变化,所以刷新时间可能会和需求场景强耦合(如果 id 池会频繁更新,那么就需要缩小刷新时间间隔)。也可根据需求场景在每个窗口开始前刷新 id 池,这样可保证每个窗口中的 id 池中的数据一直保持更新。

方案 3

方案 3 对方案 2 的一个优化(几十万 id,我们生产环境中最常用的)。其实现方式是在 flink 中使用 broadcast 算子定时访问监控 id 池,并将 id 池以广播的形式下发给下游参与计算的各个算子。其优化点在于:比如任务的并行度为 500,每 1s 访问一次,采用方案 2 则访问监控 id 池接口的 QPS 为 500,在使用 broadcast 算子之后,其访问 QPS 可以减少到 1,可以大大减少对接口的访问量,减轻接口压力。

public class Example {

    @Slf4j
    static class NeedMonitorIdsSource implements SourceFunction<Map<Long, Set<Long>>> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Map<Long, Set<Long>>> sourceContext) throws Exception {
            while (!this.isCancel) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    Set<Long> needMonitorIds = Rpc.get(...);
                    // 可以和上一次访问的数据做比较查看是否有变化,如果有变化,才发送出去
                    if (CollectionUtils.isNotEmpty(needMonitorIds)) {
                        sourceContext.collect(new HashMap<Long, Set<Long>>() {{
                            put(0L, needMonitorIds);
                        }});
                    }
                } catch (Throwable e) {
                    // 防止接口访问失败导致的错误导致 flink job 挂掉
                    log.error("need monitor ids error", e);
                }
            }
        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }
    }

    public static void main(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        InputParams inputParams = new InputParams(parameterTool);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        final MapStateDescriptor<Long, Set<Long>> broadcastMapStateDescriptor = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.LONG_TYPE_INFO,
                TypeInformation.of(new TypeHint<Set<Long>>() {
                }));

        /********************* kafka source *********************/
        BroadcastStream<Map<Long, Set<Long>>> broadcastStream = env
                .addSource(new NeedMonitorIdsSource()) // redis photoId 数据广播
                .setParallelism(1)
                .broadcast(broadcastMapStateDescriptor);

        DataStream<CommonModel> logSourceDataStream = SourceFactory.getSourceDataStream(...);

        /********************* dag *********************/
        DataStream<CommonModel> resultDataStream = logSourceDataStream
                .keyBy(KeySelectorFactory.getStringKeySelector(CommonModel::getKeyField))
                .connect(broadcastStream)
                .process(new KeyedBroadcastProcessFunction<String, CommonModel, Map<Long, Set<Long>>, CommonModel>() {

                    private Set<Long> needMonitoredIds;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        this.needMonitoredIds = Rpc.get(...)
                    }

                    @Override
                    public void processElement(CommonModel commonModel, ReadOnlyContext readOnlyContext, Collector<CommonModel> collector) throws Exception {
                        // 判断 commonModel 中的 id 是否在 needMonitoredIds 池中
                    }

                    @Override
                    public void processBroadcastElement(Map<Long, Set<Long>> longSetMap, Context context, Collector<CommonModel> collector) throws Exception {
                        // 需要监控的字段
                        Set<Long> needMonitorIds = longSetMap.get(0L);
                        if (CollectionUtils.isNotEmpty(needMonitorIds)) {
                            this.needMonitoredIds = needMonitorIds;
                        }
                    }
                });

        /********************* kafka sink *********************/
        SinkFactory.setSinkDataStream(...);
        
        env.execute(inputParams.jobName);
    }

}

方案 4

适合于超大监控范围的数据(几百万,我们自己的生产实践中使用扩量到 500 万)。其原理是将监控范围接口按照 id 按照一定规则分桶。flink 消费到日志数据后将 id 按照 监控范围接口 id 相同的分桶方法进行分桶 keyBy,这样在下游算子中每个算子中就可以按照桶名称,从接口中拿到对应桶的监控 id 数据,这样 flink 中并行的每个算子只需要获取到自己对应的桶的数据,可以大大减少请求的压力。

public class Example {

    public static void main(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        InputParams inputParams = new InputParams(parameterTool);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        final MapStateDescriptor<Long, Set<Long>> broadcastMapStateDescriptor = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.LONG_TYPE_INFO,
                TypeInformation.of(new TypeHint<Set<Long>>() {
                }));

        /********************* kafka source *********************/

        DataStream<CommonModel> logSourceDataStream = SourceFactory.getSourceDataStream(...);

        /********************* dag *********************/
        DataStream<CommonModel> resultDataStream = logSourceDataStream
                .keyBy(KeySelectorFactory.getLongKeySelector(CommonModel::getKeyField))
                .timeWindow(Time.seconds(inputParams.accTimeWindowSeconds))
                .process(new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {

                    private long lastRefreshTimestamp;

                    private Set<Long> oneBucketNeedMonitoredIds;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                    }

                    @Override
                    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
                        long windowStart = context.window().getStart();
                        this.refreshNeedMonitoredIds(windowStart, bucket);
                        /**
                         * 判断 commonModel 中的 id 是否在 needMonitoredIds 池中
                         */
                    }

                    public void refreshNeedMonitoredIds(long windowStart, long bucket) {
                        // 每隔 10 秒访问一次
                        if (windowStart - this.lastRefreshTimestamp >= 10000L) {
                            this.lastRefreshTimestamp = windowStart;
                            this.oneBucketNeedMonitoredIds = Rpc.get(bucket, ...)
                        }
                    }
                });

        /********************* kafka sink *********************/
        SinkFactory.setSinkDataStream(...);

        env.execute(inputParams.jobName);
    }
}

总结

本文首先介绍了,在短视频领域中,短视频生产消费数据链路的整个闭环,并且其数据链路闭环一般情况下也适用于其他场景;以及对应的实时监控方案的设计和不同场景下的代码实现,包括:

  • 垂类生态短视频生产消费数据链路闭环:用户操作行为日志的流转,日志上传,实时计算,以及流转到 BI,数据服务,最后数据赋能的整个流程

  • 实时监控方案设计:监控类实时计算流程中各类数据源,数据汇的选型

  • 监控 id 池在不同量级场景下具体代码实现

学习资料

flink

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343