Trident是什么
Trident是Storm上的高层次抽象,它能够在提供高吞吐量的能力同时(每秒几百万消息),也提供了有状态的流式处理和低延迟分布式查询的能力。它类似Pig这种高级批处理工具,Trident提供了joins、aggregations、grouping、functions以及filters等功能函数。
Trident主要提供了以下功能:
- 常见的流分析操作,比如join、aggregation等。具体就是Trident提供的API操作。
- 一次性处理语义(exactly-once)。
- 事务数据存储(transaction)。
Trident核心数据模型是一系列批处理(Batch)的流,也就是说虽然Storm Trident处理的是Stream,但是处理过程中Trident将Stream分隔成Batch来进行处理。
所以Stream会被切分成一个个Batch分布到集群中,所有应用在Stream上的函数都会具体应用到每个节点的Batch上中,来实现并行计算。
为什么使用Trident
Storm Topology适合一些无统计、不需要Transaction(事务)的应用,比如过滤、清洗数据等场景。Topology在开启Ack的情况下,能够保证数据不丢失但可能重复。
而Trident适合需要严格不丢不重复消息的场景,比如交易额统计。Trident通过事务来实现eactly-once,保证数据不丢不重复。但同时,使用Trident会使其性能有所下降。
Triden API
Trident API可以分为Spout操作和Bolt操作,对于Bolt操作提供常见的流数据分析操作。
Bolt Trident提供了五种类型的操作:
- 本地分区操作(Partition-local operations),操作应用到本地每个分区上,这部分操作不会产生网络传输。
- 重分区操作,对数据流进行重新分区,但是不会改变数据内容,这部分操作会有网络传输。
- 聚合操作,这部分操作会有网络传输。
- 流分组操作。
- 合并(meger)和连接(join)操作。
Trident Spout
Trident与Storm topology一样也是使用Spout作为Trident拓扑的数据源。Trident Spout提供了更复杂的API,因为它既可以获取事务数据源,也可以获取非事务数据源。
对于非事务的Spout,可以使用普通的Storm IRichSpout接口:
TridentTopology topology = new TridentTopology();
topology.newStream("myspoutid",new RichSpout());
Trident拓扑中,所有Spout都需要指定一个唯一的流的标识,比如这里的“myspoutid”(整个集群级别的唯一标识)。Trident使用该唯一标识存储Spout元数据,比如txId(事务ID)以及其它Spout相关的信息。
可以通过如下配置,来配置Zookeeper保存Spout的元数据。
transactional.zookeeper.servers:Zookeeper主机列表
transactional.zookeeper.port:Zookeeper集群端口
transactional.zookeeper.root:在Zookeeper存储元数据的根目录
下面是Trident Spout一些类型:
- ITridentSpout:最通用API接口,可以支持事务和不透明事务语义。一般会用这个API分区的特性,而不是直接使用该接口。
- IBatchSpout:非事务Spout,每次发射一个Batch的元组。
- IPartitionedTridentSpout:事务Spout,从分区数据源读取数据,比如Kafka集群。
- IOpaquePartitionedTridentSpout:不透明事务Spout,从分区数据源中读取数据。
本地分区操作
本地分区操作不会产生网络传输,并且会独立的应用到batch的每个分区上。
函数操作(Functions)
函数用于接受一个tuple,并且指定接收这个tuple的哪些field,它会发射(emit)0个或多个tuple。输出的tuple feild会被追加到原始tuple的后面,如果不输出tuple就意味着这个tuple被过滤掉了。比如下面的实例:
class MyFunction extends BaseFunction {
/**
* 在每个元组上面执行该逻辑函数,并且发射0个或多个元组
*
* @param tuple 传入的元组
* @param collector 用于发射元组的收集器实例
*/
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
//tuple.getInteger(0)接收第一个Field
for(int i=0; i< tuple.getInteger(0); i++) {
collector.emit(new Values(i));
}
}
}
假设我们有一个“mystream”流,其中每个tuple包含以下filed ['a','b','c'],比如有以下元组。
[1,2,3]
[2,1,2]
[10,0,1]
我们将每个元组都经过以下MyFunction操作:
//将每个tuple的字段"b"应用于MyFunction,并且产生新字段"d"追加到原tuple的字段中
mystream.each(new Fields("b"),new MyFunction(),new Fields('d'))
得到数据为:
//[1,2,3]的emit,其中0和1是新字段“d”
[1,2,3,0]
[1,2,3,1]
//[2,1,2]的emit,其中0是新字段“d”
[2,1,2,0]
//[10,0,1]不满足需求,过滤掉了
过滤操作(Filters)
接收一个tuple,并决定这个tuple是否应该被保留。比如我们有以下Filter操作:
class MyFilter extends BaseFilter {
/**
* 确定是否应该从流中过滤元组
*
* @param tuple 被评估的元组
* @return 返回"false"则该元组被抛弃,返回"true"则该元组被保留
*/
@Override
public boolean isKeep(TridentTuple tuple) {
//每个tuple中的第一个Field
return tuple.getInteger(0) > 1;
}
}
同样以Function中的实例进行操作:
mystream.filter(new MyFilter())。
输出数据:
#[1,2,3]中的第一个字段不大于1,所以被过滤掉
[2,1,2]
[10,0,1]
map和flatMap操作
map接收一个tuple,将其作用在map函数上,并且返回经map函数处理过的tuple字段值。
比如下面的实例:
class UpperMap implements MapFunction {
/**
* 流中的每个trident元组调用
*
* @param input 接受trident tuple
* @return 返回转换之后的值
*/
@Override
public Values execute(TridentTuple input) {
//只返回原tuple中第一个Filed的大写字符串(其它filed被丢弃了)
return new Values(input.getString(0).toUpperCase());
}
}
flatMap类似map,但是它会分两步执行:执行flat将所有元素展开,然后每个元素使用map函数。比如[[1,2],3,4]经过flat操作后得到元素集合为[1,2,3,4]。比如我们有以下实例:
class SplitFlatMap implements FlatMapFunction {
/**
* 流中的每个trident元组调用
*
* @param input 接收的trident tuple
* @return 一个可迭代的结果集
*/
@Override
public Iterable<Values> execute(TridentTuple input) {
List<Values> resultValues = new ArrayList<>();
//获取一个Filed并将其以空格作为切割
for(String word : input.getString(0).split(" ")){
resultValues.add(new Values(word));
}
return resultValues;
}
}
我们通过上面的flatMap和Map就可以得到一个流的所有大写词组流了。
mystream.flatMap(new SplitFlatMap()).map(new UpperMap())。
通常我们也可以将map或flatMap的输出结果命名一个新字段:
mystream.flatMap(new SplitFlatMap(),new Fields("word"))
peek操作
peek操作一般用来debug,比如查看上一步的操作结果。假如我们有以下peek操作。
class PrintPeek implements Consumer {
/**
* 对于输入的每个trident元组应用以下操作
*
* @param input 接收的trident 元组
*/
@Override
public void accept(TridentTuple input) {
System.out.println(input.getString(0));
}
}
以下处理操作,能把转换大写之后的tuple打印打出来:
mystream.flatMap(new SplitFlatMap()).map(new UpperMap()).peek(new PrintPeek());
min和minBy操作
返回一批(Batch)元组中的每个分区的最小值。
比如一批(Batch)元组有以下三个partition,它们对应的Field为['device-id','count']。
Partiton 0:
[213,15]
[125,21]
[100,10]
Partition 1:
[123,20]
[215,32]
[183,25]
针对以上数据统计count最小的device-id:
mystream.minBy(new Fields("count"));
返回结果:
Partition 0:
[100,10]
Partition 1:
[123,20]
除了以上使用方式,我们还可以通过传入比较器来使用min和minBy:
public <T> Stream minBy(String inputFieldName, Comparator<T> comparator)
public Stream min(Comparator<TridentTuple> comparator)
max和maxBy操作
max和maxBy操作同min/minBy操作,只不过返回最大值。
mystream.maxBy(new Fields("count"));
上面实例输出结果为:
Partition 0:
[125,21]
Partition 1:
[215,32]
max和maxBy也提供了自定义比较器的方法:
public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator)
public Stream max(Comparator<TridentTuple> comparator)
窗口操作(Window)
Trident流能够处理具有相同窗口的元素,对它们进行聚合操作,然后将聚合结果向下发送。Storm支持两种窗口操作:翻滚窗口(Tumbing window)和滑动窗口(Sliding window)。
Tumbing window
元组根据处理时间或计数分组到一个窗口中,任何元组只属于其中一个窗口。
//返回一个元组流的聚合结果,它是滚动窗口内每windowCount个数的聚合结果
public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
//返回一个元组流的聚合结果,这些元组是一个窗口的聚合结果,该窗口在windowDuration的持续时间内滚动
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
Sliding window
元组在每个滑动间隔的窗口内分组,一个元组可能属于多个窗口。
//返回一个元组流的聚合结果,它是滑动窗口每windowCount个元组树的聚合结果,并在slideCount之后滑动窗口
public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
//返回一个元组流的聚合结果,该窗口在slidingInterval持续滑动,并在windowDuration处完成一个窗口
public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
Common window
除了上面提供的滚动窗口api和滑动窗口api,Trident还提供了公用窗口api,通过windowConfig可以支持任意窗口。
public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields)
Trident window api需要使用WindowsStoreFactor存储接收到的元组和聚合值。目前,Trident提供了HBaseWindowsStoreFactor的HBase实现。
partitionAggregate操作
partitionAggregate对一批(batch)元组的每个分区进行聚合,与前面Function在元组后面追加不同Field不同,partitionAggregate会使用发射出去的元组替换接收进来的元组。比如以下实例:
比如有以下数据,对应的Field分别为["a","b"]:
Partition 0:
["a":1]
["b":2]
Partition 1:
["c":2]
["d":2]
使用partitionAggregate进行求和:
mystream.partitionAggregate(new Fields("b"),new Sum(),new Fields("sum"))
经过partitionAggregate函数之后的结果为:
Partition 0:
["sum":3]
Partition 1:
["sum":4]
Trident API提供了三个聚合器接口:CombinerAggregator、ReducerAggregator和Aggregator。
CombinerAggregator操作
CombinerAggregator只返回单个tuple,并且这个tuple只包含一个Field。每个元组首先都经过init函数进行预处理,然后在执行combine函数来计算接受到的tuple,直到最后一个tuple到达。如果分区内没有tuple,则会通过zero函数发射结果。
public interface CombinerAggregator<T> extends Serializable {
T init(TridentTuple tuple);
T combine(T val1, T val2);
T zero();
}
比如以下实例:
class Count implements CombinerAggregator<Long> {
@Override
public Long init(TridentTuple tuple) {
//计数,每个tuple代表一个数
return 1L;
}
@Override
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
@Override
public Long zero() {
return 0L;
}
}
ReducerAggregator操作
ReducerAggregator通过init方法提供一个初始值,然后每个输入的tuple迭代这个值,最后产生一个唯一的tuple输出。
public interface ReducerAggregator<T> extends Serializable {
T init();
T reduce(T curr, TridentTuple tuple);
}
比如同样使用RecuerAggregator来实现计数器:
class Count implements ReducerAggregator<Long> {
@Override
public Long init() {
return 0L;
}
@Override
public Long reduce(Long curr, TridentTuple tuple) {
return curr + 1;
}
}
Aggregator
执行聚合操作最通用的接口就是Aggregator了,它能够发射任意数量的元组,每个元组可以包含任意数量的字段。
public interface Aggregator<T> extends Operation {
T init(Object batchId, TridentCollector collector);
void aggregate(T state, TridentTuple tuple, TridentCollector collector);
void complete(T state, TridentCollector collector);
}
它的执行流程是:
- 在处理Batch之前调用init方法,它返回一个聚合的状态值,传递给aggregate和complete方法。
- 为批处理分区中的每个tuple调用aggregate方法,此方法可以更新状态值,也可以发射元组。
- 当aggregator处理完Batch分区的所有元组后调用complete方法。
使用Aggregator来实现计数器:
class CountAgg extends BaseAggregator<CountAgg.CountState> {
class CountState{
long count = 0;
}
@Override
public CountState init(Object batchId, TridentCollector collector) {
return new CountState();
}
@Override
public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
state.count += 1;
}
@Override
public void complete(CountState state, TridentCollector collector) {
collector.emit(new Values(state.count));
}
}
状态查询(stateQuery)和分区持久化(partitionPersist)
stateQuery用于查询状态源,partitionPersist用于更新状态源。具体使用方式可查看:http://storm.apache.org/releases/1.2.2/Trident-state.html
投影(projection)操作
projection操作用于只保留指定的字段,比如元组有字段["a","b","c","d"],通过以下投影操作,输出流只会包含["c","d"]。
mystream.projection(new Fields("c","d"));
重分区操作
Repartition操作运行一个函数来改变元组在任务之间的分布,调整分区数也可能会导致Repartition操作。重分区操作会引发网络传输。下面是重分区的相关函数:
- shuffle:使用随机算法来均衡tuple到每个分区。
- broadcast:每个tuple被广播到所有分区上,使用DRPC时使用这种方法比较多,比如每个分区上做stateQuery。
- global:所有tuple都发送到一个分区上,这个分区用来处理stream。
- batchGlobal:一个batch中的所有tuple会发送到一个分区中,不同batch的元组会被发送到不同分区上。
- partition:通过一个自定义的分区函数来进行分区,这个自定义函数需要实现
org.apache.storm.grouping.CustomStreamGrouping
。
聚合操作
Trident提供了aggregate和persistentAggregate方法,aggregate运行在每个batch中,而persistentAggregate将聚合所有Batch,并将结果保存在一个状态源上。
我们前面讲的aggregate、CombinerAggregator和ReducerAggregator运行在patitionAggregation上是本地分区操作。如果直接作用于流上,则是对全局进行聚合。
在对全局流进行聚合时,Aggregator和ReducerAggregator会首先重分区到一个单分区,然后在该分区上执行聚合函数。而CombinerAggregator则会首先聚合每个分区,然后重分区到单个分区,在网络传输中完成聚合操作。所以我们应该尽量用CombinerAggregator,因为它更加高效。
mystream.aggregate(new Count(),new Fields("count"));
流分组操作
groupBy操作会重新分区流,对指定字段执行partitionBy操作,指定字段相同的元组被划分到相同的分区。goupBy操作如下图:
如果在流分组中运行聚合器,聚合会在每个group中运行,而不是对整个Batch操作。
合并和连接
Trident可以允许我们将不同流组合在一起,通过TridentTopology.merge()方法操作。
//合并流会以第一个流的输出字段来命名
topology.mege(stream1,stream2,stream3);
另一种合并流的方式是连接,类似于SQL那样的连接,要求输入是有限的。所以Trident的join只适用于来自Spout的每个小Bath之间。
比如有一个流包含["key1","val1","val2"],另一个流包含["key2","val1","val2"],通过以下连接操作:
//Trident需要join之后的流重新命名,因为输入流可能存在重复 字段。
mystream.join(stream1,new Fields("key1"),stream2,new Fields("key2"),new Fields("key","a","b","c","d"))