Storm trident聚合运算之persistentAggregate
persistentAggregate可以看成是对源源不断发送过来数据流做一个总的聚合,每个批次的聚合值只是一个中间状态,通过与trident新提出的state概念结合,实现中间状态的持久化,同时支持事务性。persistentAggregate不能使用Aggregator<T>,只能使用CombinerAggregator<T>或者ReducerAggregator<T>。聚合运算完成后将运算结果emit给后续的Bolt处理
比如:
TridentTopology topology = new TridentTopology();
topology.newStream("filter", new WordSpout()).parallelismHint(1).shuffle()
.each(new Fields("field1", "field2"), new WordFilter()).parallelismHint(4).shuffle()
.each(new Fields("field1", "field2"), new WordFunction(), new Fields("field3")).parallelismHint(4).shuffle()
.persistentAggregate(new MyState.MyFactory(), new Fields("field3"), new Doing(), new Fields("field4")).newValuesStream().shuffle()
.each(new Fields("field4"), new CountFilter(), new Fields("field5")).parallelismHint(4);
具体介绍一下persistentAggregate的定义。
public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
}
重点看看persistentAggregate方法所需的参数。
StateFactory stateFactory,要求一个实现org.apache.storm.trident.state.StateFactory接口的对象。该接口比较简单,只有一个方法
State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions);
Map conf:启动topology时的配置信息。
IMetricsContext metrics:监控指标统计对象
int partitionIndex:分区下标
int numPartitions:分区数
返回类型要求一个实现org.apache.storm.trident.state.State接口对象。该接口有2个方法
public interface State {
void beginCommit(Long txid); // can be null for things like partitionPersist occuring off a DRPC stream
void commit(Long txid);
}
主要用来支持事务性。
persistentAggregate方法的第二个参数很简单,一个org.apache.storm.tuple.Fields对象,用来指定前一步操作emit出的Fields。
persistentAggregate方法的第三个参数用来指定具体执行聚合运算的对象,也就要求实现CombinerAggregator<T>或者ReducerAggregator<T>的对象。
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;
public class Doing implements CombinerAggregator<Long>{
@Override
public Long init(TridentTuple tuple) {
return 1L;
}
@Override
public Long combine(Long val1, Long val2) {
return val1 * val2;
}
@Override
public Long zero() {
return 0L;
}
}
init:每条tuple调用1次,对tuple做预处理。
combine:每条tuple调用1次,和之前的聚合值(val1)做运算。如果是第一条tuple则和zero返回的值做运算。
zero:当没有数据流时的处理逻辑。
在每个batch结束时将最后运算得到的结果emi出去t。
persistentAggregate方法的第四个参数指定聚合运算完成后,emit给下一个bolt的Fields。
注意:实现State接口的对象还要实现Snapshottable接口。
package org.apache.storm.trident.state.snapshot;
import org.apache.storm.trident.state.ValueUpdater;
// used by Stream#persistentAggregate
public interface Snapshottable<T> extends ReadOnlySnapshottable<T> {
T update(ValueUpdater updater);
void set(T o);
}
通过调用update方法的参数ValueUpdater updater,最终会调用CombinerAggregator接口实现对象的combine方法。
import java.util.Map;
import org.apache.storm.shade.com.google.common.collect.Maps;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.ValueUpdater;
import org.apache.storm.trident.state.snapshot.Snapshottable;
public class MyState implements State,Snapshottable<Long> {
private Map<String,Long> result = Maps.newConcurrentMap();
@Override
public void beginCommit(Long txid) {
}
@Override
public void commit(Long txid) {
}
@Override
public Long get() {
return result.get("key");
}
@Override
public Long update(ValueUpdater updater) {
Long l = (Long) updater.update(get());
set(l);
return l;
}
@Override
public void set(Long o) {
result.put("key", o);
}
public static class MyFactory implements StateFactory {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new MyState();
}
}
}
假如spout发射3个batch,每个batch4条数据,则调用顺序如下:
第一个batch
CombinerAggregator.zero //当前数据流中没有数据,执行zero方法
CombinerAggregator.init //对当前batch中第一条数据执行初始化操作
CombinerAggregator.combine //对当前batch中第一条数据,执行combine方法,完成聚合运算。用之前步骤中zero的结果与前一步init的结果进行计算
CombinerAggregator.init //对当前batch中第二条数据执行初始化操作
CombinerAggregator.combine //对当前batch中第二条数据执行聚合运算。用前一步init的结果与前一次combine的结果进行计算。
CombinerAggregator.init //对当前batch中第三条数据执行初始化操作
CombinerAggregator.combine //对当前batch中第三条数据执行聚合运算。用前一步init的结果与前一次combine的结果进行计算。
CombinerAggregator.init //对当前batch中第四条数据执行初始化操作
CombinerAggregator.combine //对当前batch中第四条数据执行聚合运算。用前一步init的结果与前一次combine的结果进行计算。
CombinerAggregator.zero //当前数据流中没有数据,执行zero方法
CombinerAggregator.combine //用前一步zero方法的结果与前一次combine的结果进行计算。
State.update //回调State接口实现对象,将前一batch聚合结果与当前batch聚合结果进行计算,需要在State接口实现对象的update方法中保存最近全局聚合运算结果。
第二个batch
CombinerAggregator.zero
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.zero
CombinerAggregator.combine
State.update
第三个batch
CombinerAggregator.zero
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.zero
CombinerAggregator.combine
State.update
更多的batch或每个batch有更多的数据时,规则相同