Preface
This article takes a look at Storm's WindowedBolt.
Example
@Test
public void testSlidingTupleTsTopology() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("integer", new RandomIntegerSpout(), 1);
BaseWindowedBolt baseWindowedBolt = new SlidingWindowSumBolt()
// windowLength, slidingInterval
.withWindow(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
// withTimestampField names the tuple field whose value is used as the tuple's timestamp
.withTimestampField("timestamp")
// watermark = (minimum, across input streams, of each stream's latest tuple timestamp) - lag;
// withWatermarkInterval sets how often the watermark is recomputed (default: 1 second)
// when a watermark event fires, the windows up to that watermark timestamp are evaluated
.withWatermarkInterval(new BaseWindowedBolt.Duration(1, TimeUnit.SECONDS))
// withLag allows for out-of-order data: a tuple whose timestamp is less than lastWaterMarkTs
// (the largest watermark computed so far) is treated as late and is not added to the window
.withLag(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS));
builder.setBolt("slidingSum", baseWindowedBolt, 1).shuffleGrouping("integer");
builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("slidingSum");
SubmitHelper.submitRemote("slideWindowTopology",builder.createTopology());
}
The example configures the bolt through withWindow, withTimestampField, withWatermarkInterval and withLag.
SlidingWindowSumBolt
public class SlidingWindowSumBolt extends BaseWindowedBolt {
private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowSumBolt.class);
private int sum = 0;
private OutputCollector collector;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(TupleWindow inputWindow) {
/*
* The inputWindow gives a view of
* (a) all the events in the window
* (b) events that expired since last activation of the window
* (c) events that newly arrived since last activation of the window
*/
List<Tuple> tuplesInWindow = inputWindow.get();
List<Tuple> newTuples = inputWindow.getNew();
List<Tuple> expiredTuples = inputWindow.getExpired();
LOG.debug("Events in current window: " + tuplesInWindow.size());
/*
* Instead of iterating over all the tuples in the window to compute
* the sum, the values for the new events are added and old events are
* subtracted. Similar optimizations might be possible in other
* windowing computations.
*/
for (Tuple tuple : newTuples) {
sum += (int) tuple.getValue(0);
}
for (Tuple tuple : expiredTuples) {
sum -= (int) tuple.getValue(0);
}
collector.emit(new Values(sum));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sum"));
}
}
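- TupleWindow exposes three views: all tuples currently in the window (get), the tuples that arrived since the last window activation (getNew), and the tuples that expired since the last activation (getExpired)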
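The incremental update used by SlidingWindowSumBolt (add the new values, subtract the expired ones) can be tried out without a cluster. The following is a minimal plain-Java model of a count-based window, not Storm code: the class SlidingSumSketch and its methods are hypothetical names introduced here, and the firing rule (fire once slidingInterval new values have arrived) is a simplifying assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-Java model of a count-based sliding window sum; a sketch, not Storm code. */
final class SlidingSumSketch {
    private final int windowLength;
    private final int slidingInterval;
    private final Deque<Integer> window = new ArrayDeque<>();
    // Values that arrived / expired since the window last fired,
    // playing the role of getNew() and getExpired().
    private final List<Integer> newSinceLast = new ArrayList<>();
    private final List<Integer> expiredSinceLast = new ArrayList<>();
    private int sum = 0;

    SlidingSumSketch(int windowLength, int slidingInterval) {
        this.windowLength = windowLength;
        this.slidingInterval = slidingInterval;
    }

    /** Adds a value; returns the running sum if the window fired, otherwise null. */
    Integer add(int value) {
        window.addLast(value);
        newSinceLast.add(value);
        if (window.size() > windowLength) {
            expiredSinceLast.add(window.removeFirst());
        }
        if (newSinceLast.size() < slidingInterval) {
            return null;
        }
        // Incremental update: add what is new, subtract what expired.
        for (int v : newSinceLast) {
            sum += v;
        }
        for (int v : expiredSinceLast) {
            sum -= v;
        }
        newSinceLast.clear();
        expiredSinceLast.clear();
        return sum;
    }

    /** Recomputes the sum from scratch, for comparison with the incremental result. */
    int fullSum() {
        int total = 0;
        for (int v : window) {
            total += v;
        }
        return total;
    }
}
```

Each time add returns a non-null value it should match fullSum(), which is the property the bolt relies on when it avoids iterating over the whole window.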
WindowedBolt
IWindowedBolt
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IWindowedBolt.java
/**
* A bolt abstraction for supporting time and count based sliding & tumbling windows.
*/
public interface IWindowedBolt extends IComponent {
/**
* This is similar to the {@link org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)} except that while emitting,
* the tuples are automatically anchored to the tuples in the inputWindow.
*/
void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector);
/**
* Process the tuple window and optionally emit new tuples based on the tuples in the input window.
*/
void execute(TupleWindow inputWindow);
void cleanup();
/**
* Return a {@link TimestampExtractor} for extracting timestamps from a tuple for event time based processing, or null for processing
* time.
*
* @return the timestamp extractor
*/
TimestampExtractor getTimestampExtractor();
}
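- IWindowedBolt is stateless, i.e. the window's data is kept only in memory
- The IWindowedBolt interface has an abstract implementation, BaseWindowedBolt, whose subclasses include BaseStatefulWindowedBolt and JoinBolt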
IStatefulWindowedBolt
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IStatefulWindowedBolt.java
/**
* A windowed bolt abstraction for supporting windowing operation with state.
*/
public interface IStatefulWindowedBolt<T extends State> extends IStatefulComponent<T>, IWindowedBolt {
/**
* If the stateful windowed bolt should have its windows persisted in state and maintain a subset of events in memory.
* <p>
* The default is to keep all the window events in memory.
* </p>
*
* @return true if the windows should be persisted
*/
default boolean isPersistent() {
return false;
}
/**
* The maximum number of window events to keep in memory.
*/
default long maxEventsInMemory() {
return 1_000_000L; // default
}
}
- In version 1.2.2 IStatefulWindowedBolt declared no methods of its own; version 2.0.0 adds two default methods, isPersistent and maxEventsInMemory
- isPersistent determines whether a PersistentWindowedBoltExecutor or a StatefulWindowedBoltExecutor is created
- maxEventsInMemory determines how many events WindowState keeps in memory; the rest are moved to a KeyValueState (HBaseKeyValueState, InMemoryKeyValueState, RedisKeyValueState)
- The IStatefulWindowedBolt interface has an abstract implementation, BaseStatefulWindowedBolt
withWindow与withTumblingWindow
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java
/**
* Tuple count based sliding window configuration.
*
* @param windowLength the number of tuples in the window
* @param slidingInterval the number of tuples after which the window slides
*/
public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
}
/**
* Time duration based sliding window configuration.
*
* @param windowLength the time duration of the window
* @param slidingInterval the time duration after which the window slides
*/
public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
}
/**
* A time duration based tumbling window.
*
* @param duration the time duration after which the window tumbles
*/
public BaseWindowedBolt withTumblingWindow(Duration duration) {
return withWindowLength(duration).withSlidingInterval(duration);
}
/**
* A count based tumbling window.
*
* @param count the number of tuples after which the window tumbles
*/
public BaseWindowedBolt withTumblingWindow(Count count) {
return withWindowLength(count).withSlidingInterval(count);
}
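- The abstract class BaseWindowedBolt defines several withWindow overloads. They set the windowLength and slidingInterval parameters, each of which can be expressed in one of two dimensions: Duration or Count
- withWindow configures a sliding window, while withTumblingWindow configures a tumbling window
- As the method bodies show, withTumblingWindow passes the same value for windowLength and slidingInterval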
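To make the relationship between the two window types concrete, here is a small plain-Java sketch of count-based windows. It is a simplified model rather than Storm's implementation: the class CountWindowSketch and its methods are hypothetical, and it assumes a window fires after every slidingInterval items and then contains the most recent windowLength items.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of count-based windows; a sketch, not Storm code. */
final class CountWindowSketch {

    /** Emits the last windowLength items each time slidingInterval new items have arrived. */
    static List<List<Integer>> windows(List<Integer> data, int windowLength, int slidingInterval) {
        List<List<Integer>> out = new ArrayList<>();
        for (int end = slidingInterval; end <= data.size(); end += slidingInterval) {
            int start = Math.max(0, end - windowLength);
            out.add(new ArrayList<>(data.subList(start, end)));
        }
        return out;
    }

    /** A tumbling window is the special case where length and interval are equal. */
    static List<List<Integer>> tumbling(List<Integer> data, int count) {
        return windows(data, count, count);
    }
}
```

With the input 1..6, windows(data, 3, 2) yields overlapping windows such as [2, 3, 4] and [4, 5, 6], while tumbling(data, 3) yields the disjoint windows [1, 2, 3] and [4, 5, 6].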
WindowedBoltExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
public class WindowedBoltExecutor implements IRichBolt {
public static final String LATE_TUPLE_FIELD = "late_tuple";
private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
private static final int DEFAULT_MAX_LAG_MS = 0; // no lag
//......
private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf,
TopologyContext context, Collection<Event<Tuple>> queue, boolean stateful) {
WindowManager<Tuple> manager = stateful ?
new StatefulWindowManager<>(lifecycleListener, queue)
: new WindowManager<>(lifecycleListener, queue);
Count windowLengthCount = null;
Duration slidingIntervalDuration = null;
Duration windowLengthDuration = null;
Count slidingIntervalCount = null;
// window length
if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
windowLengthCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
} else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
windowLengthDuration = new Duration(
((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
TimeUnit.MILLISECONDS);
}
// sliding interval
if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
slidingIntervalCount = new Count(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
} else if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
slidingIntervalDuration =
new Duration(((Number) topoConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
} else {
// default is a sliding window of count 1
slidingIntervalCount = new Count(1);
}
// tuple ts
if (timestampExtractor != null) {
// late tuple stream
lateTupleStream = (String) topoConf.get(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
if (lateTupleStream != null) {
if (!context.getThisStreams().contains(lateTupleStream)) {
throw new IllegalArgumentException(
"Stream for late tuples must be defined with the builder method withLateTupleStream");
}
}
// max lag
if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
maxLagMs = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
} else {
maxLagMs = DEFAULT_MAX_LAG_MS;
}
// watermark interval
int watermarkInterval;
if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)) {
watermarkInterval = ((Number) topoConf.get(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue();
} else {
watermarkInterval = DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
}
waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, watermarkInterval,
maxLagMs, getComponentStreams(context));
} else {
if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
}
}
// validate
validate(topoConf, windowLengthCount, windowLengthDuration,
slidingIntervalCount, slidingIntervalDuration);
evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
manager, evictionPolicy);
manager.setEvictionPolicy(evictionPolicy);
manager.setTriggerPolicy(triggerPolicy);
return manager;
}
@Override
public void execute(Tuple input) {
if (isTupleTs()) {
long ts = timestampExtractor.extractTimestamp(input);
if (waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
windowManager.add(input, ts);
} else {
if (lateTupleStream != null) {
windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
} else {
LOG.info("Received a late tuple {} with ts {}. This will not be processed.", input, ts);
}
windowedOutputCollector.ack(input);
}
} else {
windowManager.add(input);
}
}
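- When a timestamp extractor is configured, initWindowManager reads the max lag (maxLagMs), which defaults to 0, i.e. no lag, and passes it to the WaterMarkEventGenerator it creates
- If waterMarkEventGenerator.track returns false and Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM is not configured, the tuple is only logged, using the format "Received a late tuple {} with ts {}. This will not be processed."; either way the late tuple is acked and never added to the window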
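The two behaviours described above, falling back to a default when a key is absent and deciding where a tuple goes, can be sketched in plain Java. This is an illustration only: WindowConfigSketch, intOrDefault, Route and route are hypothetical names, and the two key strings are stand-ins invented for the sketch (real code would use the Config.TOPOLOGY_BOLTS_* constants).

```java
import java.util.Map;

/** Models the default handling and late-tuple routing of WindowedBoltExecutor; a sketch only. */
final class WindowConfigSketch {
    // Stand-in keys for this sketch; not the real Storm configuration key strings.
    static final String MAX_LAG_MS = "sketch.max.lag.ms";
    static final String WATERMARK_INTERVAL_MS = "sketch.watermark.interval.ms";

    // Same default values as the constants shown in WindowedBoltExecutor.
    static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
    static final int DEFAULT_MAX_LAG_MS = 0; // no lag

    enum Route { WINDOW, LATE_STREAM, LOG_ONLY }

    /** Reads a numeric setting, falling back to the default when the key is absent. */
    static int intOrDefault(Map<String, Object> conf, String key, int defaultValue) {
        return conf.containsKey(key) ? ((Number) conf.get(key)).intValue() : defaultValue;
    }

    /**
     * Mirrors the branch in execute(Tuple): an on-time tuple goes to the window,
     * a late one goes to the late stream if configured, otherwise it is only logged.
     */
    static Route route(long ts, long lastWaterMarkTs, String lateTupleStream) {
        if (ts >= lastWaterMarkTs) {
            return Route.WINDOW;
        }
        return lateTupleStream != null ? Route.LATE_STREAM : Route.LOG_ONLY;
    }
}
```

Casting through Number, as the executor does, lets the same lookup accept an Integer or a Long value from the configuration map.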
WaterMarkEventGenerator
storm-2.0.0/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java
public class WaterMarkEventGenerator<T> implements Runnable {
/**
* Creates a new WatermarkEventGenerator.
*
* @param windowManager The window manager this generator will submit watermark events to
* @param intervalMs The generator will check if it should generate a watermark event with this interval
* @param eventTsLagMs The max allowed lag behind the last watermark event before an event is considered late
* @param inputStreams The input streams this generator is expected to handle
*/
public WaterMarkEventGenerator(WindowManager<T> windowManager, int intervalMs,
int eventTsLagMs, Set<GlobalStreamId> inputStreams) {
this.windowManager = windowManager;
streamToTs = new ConcurrentHashMap<>();
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("watermark-event-generator-%d")
.setDaemon(true)
.build();
executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
this.interval = intervalMs;
this.eventTsLag = eventTsLagMs;
this.inputStreams = inputStreams;
}
public void start() {
this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
}
//......
/**
* Tracks the timestamp of the event in the stream, returns true if the event can be considered for processing or false if its a late
* event.
*/
public boolean track(GlobalStreamId stream, long ts) {
Long currentVal = streamToTs.get(stream);
if (currentVal == null || ts > currentVal) {
streamToTs.put(stream, ts);
}
checkFailures();
return ts >= lastWaterMarkTs;
}
@Override
public void run() {
try {
long waterMarkTs = computeWaterMarkTs();
if (waterMarkTs > lastWaterMarkTs) {
this.windowManager.add(new WaterMarkEvent<>(waterMarkTs));
lastWaterMarkTs = waterMarkTs;
}
} catch (Throwable th) {
LOG.error("Failed while processing watermark event ", th);
throw th;
}
}
/**
* Computes the min ts across all streams.
*/
private long computeWaterMarkTs() {
long ts = 0;
// only if some data has arrived on each input stream
if (streamToTs.size() >= inputStreams.size()) {
ts = Long.MAX_VALUE;
for (Map.Entry<GlobalStreamId, Long> entry : streamToTs.entrySet()) {
ts = Math.min(ts, entry.getValue());
}
}
return ts - eventTsLag;
}
}
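- The track method records the tuple's timestamp as the stream's latest if it is newer, then compares it with lastWaterMarkTs to decide whether the tuple should be processed: it returns true only when ts >= lastWaterMarkTs
- lastWaterMarkTs is updated in the run method of WaterMarkEventGenerator. computeWaterMarkTs takes the minimum over streamToTs (the latest timestamp seen on each input stream) and subtracts eventTsLag to obtain waterMarkTs; until every input stream has delivered data, the minimum is treated as 0
- lastWaterMarkTs is updated only when waterMarkTs is greater than it. In other words, run keeps recomputing waterMarkTs and ensures that lastWaterMarkTs holds the largest value seen so far, so the watermark never moves backwards
- In start, WaterMarkEventGenerator schedules a periodic task whose period is watermarkInterval, so run executes once every watermarkInterval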
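The watermark bookkeeping can be followed step by step with a stripped-down model. The sketch below keeps the logic of track, run and computeWaterMarkTs from the listing but makes several assumptions for the sake of a self-contained example: streams are identified by plain strings instead of GlobalStreamId, the scheduled executor is replaced by a manual tick() call, and no WaterMarkEvent is sent to a window manager. WatermarkSketch and tick are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Single-threaded model of the watermark logic; a sketch, not Storm code. */
final class WatermarkSketch {
    private final Map<String, Long> streamToTs = new HashMap<>();
    private final int expectedStreams;
    private final long eventTsLag;
    private long lastWaterMarkTs = 0;

    WatermarkSketch(int expectedStreams, long eventTsLagMs) {
        this.expectedStreams = expectedStreams;
        this.eventTsLag = eventTsLagMs;
    }

    /** Records the stream's latest timestamp; returns false if the event is late. */
    boolean track(String stream, long ts) {
        Long current = streamToTs.get(stream);
        if (current == null || ts > current) {
            streamToTs.put(stream, ts);
        }
        return ts >= lastWaterMarkTs;
    }

    /** Stands in for one execution of run(); returns the watermark after the update. */
    long tick() {
        long ts = 0;
        // Only use the per-stream minimum once every input stream has delivered data.
        if (streamToTs.size() >= expectedStreams) {
            ts = Long.MAX_VALUE;
            for (long latest : streamToTs.values()) {
                ts = Math.min(ts, latest);
            }
        }
        long waterMarkTs = ts - eventTsLag;
        // The watermark only ever advances.
        if (waterMarkTs > lastWaterMarkTs) {
            lastWaterMarkTs = waterMarkTs;
        }
        return lastWaterMarkTs;
    }
}
```

For instance, with two streams and a lag of 5000 ms, latest timestamps of 20000 and 12000 give a watermark of 12000 - 5000 = 7000; a subsequent tuple with timestamp 6999 is then reported as late, while one with timestamp 7000 is accepted.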
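Summary
- Storm's windowed bolts come in two flavours, IWindowedBolt and IStatefulWindowedBolt: the former is stateless, the latter stateful
- A window has two key parameters, windowLength and slidingInterval, and each can be expressed in one of two dimensions: Duration or Count
- BaseWindowedBolt's withTumblingWindow sets windowLength and slidingInterval to the same value; a tumbling window is therefore a special case of a sliding window in which the two parameters are equal, so the windows do not overlap
- WaterMarkEventGenerator schedules a task that, every watermarkInterval, computes waterMarkTs (the minimum, across input streams, of each stream's latest tuple timestamp, minus the lag) and updates lastWaterMarkTs if the new value is larger
- WaterMarkEventGenerator.track decides whether a tuple should be processed: if the tuple's timestamp is less than lastWaterMarkTs it returns false, in which case the tuple is emitted to the stream named by Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM if one is configured, and otherwise only logged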