聊聊storm的WindowedBolt

本文主要研究一下storm的WindowedBolt

实例

    @Test
    public void testSlidingTupleTsTopology() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("integer", new RandomIntegerSpout(), 1);
        BaseWindowedBolt baseWindowedBolt = new SlidingWindowSumBolt()
                //windowLength , slidingInterval
                .withWindow(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
                //通过withTimestampField指定tuple的某个字段作为这个tuple的timestamp
                .withTimestampField("timestamp")
                //输入流中最新的元组时间戳的最小值减去Lag值=watermark,用于指定触发watermark的的interval,默认为1秒
                //当watermark被触发的时候,tuple timestamp比watermark早的window将被计算
                .withWatermarkInterval(new BaseWindowedBolt.Duration(1, TimeUnit.SECONDS))
                //withLag用于处理乱序的数据,当接收到的tuple的timestamp小于等lastWaterMarkTs(`取这批watermark的最大值`),则会被丢弃
                .withLag(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS));
        builder.setBolt("slidingSum", baseWindowedBolt, 1).shuffleGrouping("integer");
        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingSum");
        SubmitHelper.submitRemote("slideWindowTopology",builder.createTopology());
    }
  • 这里主要设置了withWindow、withTimestampField、withWatermarkInterval、withLag

  • SlidingWindowSumBolt

public class SlidingWindowSumBolt extends BaseWindowedBolt {
    private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class);

    private int sum = 0;
    private OutputCollector collector;

    @Override
    public void prepare(Map topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        /*
         * The inputWindow gives a view of
         * (a) all the events in the window
         * (b) events that expired since last activation of the window
         * (c) events that newly arrived since last activation of the window
         */
        List<Tuple> tuplesInWindow = inputWindow.get();
        List<Tuple> newTuples = inputWindow.getNew();
        List<Tuple> expiredTuples = inputWindow.getExpired();

        LOG.debug("Events in current window: " + tuplesInWindow.size());
        /*
         * Instead of iterating over all the tuples in the window to compute
         * the sum, the values for the new events are added and old events are
         * subtracted. Similar optimizations might be possible in other
         * windowing computations.
         */
        for (Tuple tuple : newTuples) {
            sum += (int) tuple.getValue(0);
        }
        for (Tuple tuple : expiredTuples) {
            sum -= (int) tuple.getValue(0);
        }
        collector.emit(new Values(sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sum"));
    }
}
  • TupleWindow可以获取三类值,一类是当前窗口里头的所有数据,一类是上次窗口后新到达的数据,一类是过期的数据

WindowedBolt

IWindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IWindowedBolt.java

/**
 * A bolt abstraction for supporting time and count based sliding & tumbling windows.
 */
public interface IWindowedBolt extends IComponent {
    /**
     * This is similar to the {@link org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)} except that while emitting,
     * the tuples are automatically anchored to the tuples in the inputWindow.
     */
    void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector);

    /**
     * Process the tuple window and optionally emit new tuples based on the tuples in the input window.
     */
    void execute(TupleWindow inputWindow);

    void cleanup();

    /**
     * Return a {@link TimestampExtractor} for extracting timestamps from a tuple for event time based processing, or null for processing
     * time.
     *
     * @return the timestamp extractor
     */
    TimestampExtractor getTimestampExtractor();
}
  • IWindowedBolt是无状态的,也就是window的数据都存在内存中
  • IWindowedBolt接口有个抽象实现类BaseWindowedBolt,其子类有BaseStatefulWindowedBolt、JoinBolt

IStatefulWindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IStatefulWindowedBolt.java

/**
 * A windowed bolt abstraction for supporting windowing operation with state.
 */
public interface IStatefulWindowedBolt<T extends State> extends IStatefulComponent<T>, IWindowedBolt {
    /**
     * If the stateful windowed bolt should have its windows persisted in state and maintain a subset of events in memory.
     * <p>
     * The default is to keep all the window events in memory.
     * </p>
     *
     * @return true if the windows should be persisted
     */
    default boolean isPersistent() {
        return false;
    }

    /**
     * The maximum number of window events to keep in memory.
     */
    default long maxEventsInMemory() {
        return 1_000_000L; // default
    }
}
  • 在1.2.2版本IStatefulWindowedBolt没有定义任何方法,2.0.0版本定义了两个default方法,一个是isPersistent,一个是maxEventsInMemory
  • isPersistent决定创建的是PersistentWindowedBoltExecutor还是StatefulWindowedBoltExecutor
  • maxEventsInMemory决定WindowState保留多少数据在内存,其余的移到KeyValueState(HBaseKeyValueState、InMemoryKeyValueState、RedisKeyValueState)中
  • IStatefulWindowedBolt接口有个抽象实现类BaseStatefulWindowedBolt

withWindow与withTumblingWindow

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java

    /**
     * Tuple count based sliding window configuration.
     *
     * @param windowLength    the number of tuples in the window
     * @param slidingInterval the number of tuples after which the window slides
     */
    public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * Time duration based sliding window configuration.
     *
     * @param windowLength    the time duration of the window
     * @param slidingInterval the time duration after which the window slides
     */
    public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    }

    /**
     * A time duration based tumbling window.
     *
     * @param duration the time duration after which the window tumbles
     */
    public BaseWindowedBolt withTumblingWindow(Duration duration) {
        return withWindowLength(duration).withSlidingInterval(duration);
    }

    /**
     * A count based tumbling window.
     *
     * @param count the number of tuples after which the window tumbles
     */
    public BaseWindowedBolt withTumblingWindow(Count count) {
        return withWindowLength(count).withSlidingInterval(count);
    }
  • BaseWindowedBolt抽象类定义了诸多withWindow方法,该方法主要定义windowLength及slidingIntervals参数,而该参数有两个维度,一个是Duration,一个是Count
  • withWindow即sliding window,而withTumblingWindow则是tumbling window
  • 从方法定义可以看到withTumblingWindow的windowLength及slidingInterval参数值相同

WindowedBoltExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java

public class WindowedBoltExecutor implements IRichBolt {
    public static final String LATE_TUPLE_FIELD = "late_tuple";
    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
    private static final int DEFAULT_MAX_LAG_MS = 0; // no lag

    //......

    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf,
                                                   TopologyContext context, Collection<Event<Tuple>> queue, boolean stateful) {

        WindowManager<Tuple> manager = stateful ?
            new StatefulWindowManager<>(lifecycleListener, queue)
            : new WindowManager<>(lifecycleListener, queue);

        Count windowLengthCount = null;
        Duration slidingIntervalDuration = null;
        Count slidingIntervalCount = null;
        // window length
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
            windowLengthCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
            windowLengthDuration = new Duration(
                ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
                TimeUnit.MILLISECONDS);
        }
        // sliding interval
        if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
            slidingIntervalCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
        } else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
            slidingIntervalDuration =
                new Duration(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
        } else {
            // default is a sliding window of count 1
            slidingIntervalCount = new Count(1);
        }
        // tuple ts
        if (timestampExtractor != null) {
            // late tuple stream
            lateTupleStream = (String) topoConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
            if (lateTupleStream != null) {
                if (!context.getThisStreams().contains(lateTupleStream)) {
                    throw new IllegalArgumentException(
                        "Stream for late tuples must be defined with the builder method withLateTupleStream");
                }
            }
            // max lag
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                maxLagMs = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
            } else {
                maxLagMs = DEFAULT_MAX_LAG_MS;
            }
            // watermark interval
            int watermarkInterval;
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
                watermarkInterval = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
            } else {
                watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
            }
            waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
                                                                    maxLagMs, getComponentStreams(context));
        } else {
            if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
            }
        }
        // validate
        validate(topoConf, windowLengthCount, windowLengthDuration,
                 slidingIntervalCount, slidingIntervalDuration);
        evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
        triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
                                         manager, evictionPolicy);
        manager.setEvictionPolicy(evictionPolicy);
        manager.setTriggerPolicy(triggerPolicy);
        return manager;
    }

    @Override
    public void execute(Tuple input) {
        if (isTupleTs()) {
            long ts = timestampExtractor.extractTimestamp(input);
            if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
                windowManager.add(input, ts);
            } else {
                if (lateTupleStream != null) {
                    windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
                } else {
                    LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
                }
                windowedOutputCollector.ack(input);
            }
        } else {
            windowManager.add(input);
        }
    }
  • initWindowManager会读取maxLags的值,默认为0,即没有lag,之后创建WaterMarkEventGenerator的时候传入了maxLags参数
  • 如果waterMarkEventGenerator.track方法返回false,且没有配置Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM的话,则会打印log,其format为Received a late tuple {} with ts {}. This will not be processed.

WaterMarkEventGenerator

storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java

public class WaterMarkEventGenerator<T> implements Runnable {

    /**
     * Creates a new WatermarkEventGenerator.
     *
     * @param windowManager The window manager this generator will submit watermark events to
     * @param intervalMs    The generator will check if it should generate a watermark event with this interval
     * @param eventTsLagMs  The max allowed lag behind the last watermark event before an event is considered late
     * @param inputStreams  The input streams this generator is expected to handle
     */
    public WaterMarkEventGenerator(WindowManager<T> windowManager, int intervalMs,
                                   int eventTsLagMs, Set<GlobalStreamId> inputStreams) {
        this.windowManager = windowManager;
        streamToTs = new ConcurrentHashMap<>();

        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("watermark-event-generator-%d")
            .setDaemon(true)
            .build();
        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);

        this.interval = intervalMs;
        this.eventTsLag = eventTsLagMs;
        this.inputStreams = inputStreams;
    }

    public void start() {
        this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
    }

    //......

    /**
     * Tracks the timestamp of the event in the stream, returns true if the event can be considered for processing or false if its a late
     * event.
     */
    public boolean track(GlobalStreamId stream, long ts) {
        Long currentVal = streamToTs.get(stream);
        if (currentVal == null || ts > currentVal) {
            streamToTs.put(stream, ts);
        }
        checkFailures();
        return ts >= lastWaterMarkTs;
    }

    @Override
    public void run() {
        try {
            long waterMarkTs = computeWaterMarkTs();
            if (waterMarkTs > lastWaterMarkTs) {
                this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
                lastWaterMarkTs = waterMarkTs;
            }
        } catch (Throwable th) {
            LOG.error("Failed while processing watermark event ", th);
            throw th;
        }
    }

    /**
     * Computes the min ts across all streams.
     */
    private long computeWaterMarkTs() {
        long ts = 0;
        // only if some data has arrived on each input stream
        if (streamToTs.size() >= inputStreams.size()) {
            ts = Long.MAX_VALUE;
            for (Map.Entry<GlobalStreamId, Long> entry : streamToTs.entrySet()) {
                ts = Math.min(ts, entry.getValue());
            }
        }
        return ts - eventTsLag;
    }
}
  • track方法根据tuple的时间戳与lastWaterMarkTs判断,是否需要处理该tuple
  • lastWaterMarkTs在WaterMarkEventGenerator的run方法里头被更新,computeWaterMarkTs方法先计算streamToTs这批tuple的最小时间戳,然后减去eventTsLag,就是waterMarkTs值
  • 如果waterMarkTs大于lastWaterMarkTs,则更新,也就是说WaterMarkEventGenerator的run方法不断计算waterMarkTs,然后保证lastWaterMarkTs取waterMarkTs的最大值
  • WaterMarkEventGenerator在start方法里头触发一个定时调度任务,其时间间隔正是watermarkInterval,也就是run方法每隔watermarkInterval时间被执行一次

小结

  • storm的WindowedBolt分为IWindowedBolt及IStatefulWindowedBolt,一个是无状态的,一个是有状态的
  • window有两个重要的参数,一个是windowLength,一个是slidingInterval,它们有两个维度,一个是Duration,一个是Count
  • BaseWindowedBolt的withTumblingWindow方法设置的windowLength及slidingInterval参数值相同;即tumbling window是一种特殊的sliding window,两个参数值一样,即window不会重叠
  • WaterMarkEventGenerator会触发一个调度任务,每隔watermarkInterval时间计算一下waterMarkTs(输入流中最新的元组时间戳的最小值减去Lag值),然后如果比lastWaterMarkTs值大,则更新lastWaterMarkTs
  • WaterMarkEventGenerator.track方法用于计算该tuple是否应该处理,如果该tuple的timestamp小于lastWaterMarkTs,则返回false,如果有配置Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM则会发送给该stream,没有则打印log

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容