Spark Streaming/Flink广播实现作业配置动态更新

前言

在实时计算作业中,往往需要动态改变一些配置,举几个栗子:

  • 实时日志ETL服务,需要在日志的格式、字段发生变化时保证正常解析;
  • 实时NLP服务,需要及时识别新添加的领域词与停用词;
  • 实时风控服务,需要根据业务情况调整触发警告的规则。

那么问题来了:配置每次变化都得手动修改代码,再重启作业吗?答案显然是否定的,毕竟实时任务的终极目标就是7 x 24无间断运行。Spark Streaming和Flink的广播机制都能做到这点,本文分别来简单说明一下。

Spark Streaming的场合

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html

很久之前我在这里详细分析了Spark Core内部的广播机制。但广播变量(broadcast variable)的设计初衷是简单地作为只读缓存,在Driver与Executor间共享数据,Spark文档中的原话如下:

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.

也就是说原生并未支持广播变量的更新,所以我们得自己稍微hack一下。直接贴代码吧。

public class BroadcastStringPeriodicUpdater {
  private static final int PERIOD = 60 * 1000;
  private static volatile BroadcastStringPeriodicUpdater instance;

  private Broadcast<String> broadcast;
  private long lastUpdate = 0L;

  private BroadcastStringPeriodicUpdater() {}

  public static BroadcastStringPeriodicUpdater getInstance() {
    if (instance == null) {
      synchronized (BroadcastStringPeriodicUpdater.class) {
        if (instance == null) {
          instance = new BroadcastStringPeriodicUpdater();
        }
      }
    }
    return instance;
  }

  public String updateAndGet(SparkContext sc) {
    long now = System.currentTimeMillis();
    long offset = now - lastUpdate;
    if (offset > PERIOD || broadcast == null) {
      if (broadcast != null) {
        broadcast.unpersist();
      }
      lastUpdate = now;
      String value = fetchBroadcastValue();
      broadcast = JavaSparkContext.fromSparkContext(sc).broadcast(value);
    }
    return broadcast.getValue();
  }

  private String fetchBroadcastValue() {
    // 在这里获取新的广播变量值
  }
}

这段代码将字符串型广播变量的更新包装成了一个单例类,更新周期是60秒。在Streaming主程序中,就可以这样使用了:

  dStream.transform(rdd -> {
    // 更新并获取广播变量的值
    String broadcastValue = BroadcastStringPeriodicUpdater.getInstance().updateAndGet(rdd.context());
    rdd.mapPartitions(records -> {
      // ...
    });
  });

这种方法基本上解决了问题,但不是十全十美的,因为广播数据的更新始终是周期性的,并且周期不能太短(得考虑外部存储的压力),从根本上讲还是受Spark Streaming微批次的设计理念限制的。接下来看看Flink是怎样做的。

Flink的场合

Flink中也有与Spark类似的广播变量,用法也几乎相同。但是Flink在1.5版本引入了更加灵活的广播状态(broadcast state),可以视为operator state的一种特殊情况。它能够将一个流中的数据(通常是较少量的数据)广播到下游算子的所有并发实例中,实现真正的低延迟动态更新。

下图来自Data Artisans(被阿里收购了的Flink母公司)的PPT,其中流A是普通的数据流,流B就是含有配置信息的广播流(broadcast stream),也可以叫控制流(control stream)。流A的数据按照keyBy()算子的规则发往下游,而流B的数据会广播,最后再将这两个流的数据连接到一起进行处理。

https://www.slideshare.net/AljoschaKrettek/advanced-flink-training-design-patterns-for-streaming-applications

既然它的名字叫“广播状态”,那么就一定要有与它对应的状态描述符StateDescriptor。Flink直接使用了MapStateDescriptor作为广播的状态描述符,方便存储多种不同的广播数据。示例:

    MapStateDescriptor<String, String> broadcastStateDesc = new MapStateDescriptor<>(
      "broadcast-state-desc",
      String.class,       // 广播数据的key类型
      String.class        // 广播数据的value类型
    );

接下来在控制流controlStream上调用broadcast()方法,将它转换成广播流BroadcastStream。controlStream的产生方法与正常数据流没什么不同,一般是从消息队列的某个特定topic读取。

BroadcastStream<String> broadcastStream = controlStream
  .setParallelism(1)
  .broadcast(broadcastStateDesc);

然后在DataStream上调用connect()方法,将它与广播流连接起来,生成BroadcastConnectedStream。

BroadcastConnectedStream<String, String> connectedStream = sourceStream.connect(broadcastStream);

最后就要调用process()方法对连接起来的流进行处理了。如果DataStream是一个普通的流,需要定义BroadcastProcessFunction,反之,如果该DataStream是一个KeyedStream,就需要定义KeyedBroadcastProcessFunction。并且与之前我们常见的ProcessFunction不同的是,它们都多了一个专门处理广播数据的方法processBroadcastElement()。类图如下所示。

下面给出一个说明性的代码示例。

    connectedStream.process(new BroadcastProcessFunction<String, String, String>() {
      private static final long serialVersionUID = 1L;

      @Override
      public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastStateDesc);
        for (Entry<String, String> entry : state.immutableEntries()) {
          String bKey = entry.getKey();
          String bValue = entry.getValue();
          // 根据广播数据进行原数据流的各种处理
        }
        out.collect(value);
      }

      @Override
      public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
        BroadcastState<String, String> state = ctx.getBroadcastState(broadcastStateDesc);
        // 如果需要的话,对广播数据进行转换,最后写入状态
        state.put("some_key", value);
      }
    });

可见,BroadcastProcessFunction的行为与CoFlatMapFunction、CoProcessFunction非常相像。其基本思路是processBroadcastElement()方法从广播流中获取数据,进行必要的转换之后将其以键值对形式写入BroadcastState。而processElement()方法从BroadcastState获取广播数据,再将其与原流中的数据结合处理。也就是说,BroadcastState起到了两个流之间的桥梁作用。

最后还有一点需要注意,processElement()方法获取的Context实例是ReadOnlyContext,且BroadcastState实例是ReadOnlyBroadcastState,说明只有在广播流一侧才能修改BroadcastState,而数据流一侧只能读取BroadcastState。这提供了非常重要的一致性保证:假如数据流一侧也能修改BroadcastState的话,不同的operator实例有可能产生截然不同的结果,对下游处理造成困扰。

The End

今天实在是很累,洗洗睡了,晚安吧各位。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容