首先spout有三种:这些关系到相同的batchid里面是否包含相同的tuple
事务性:相同
模糊事务性:如果取不到原来的,则拿新的
无事务:不一定
所以只有事务性才能做到一个tuple唯一一次处理
模糊事务性 才能做到 最多处理一次
而无事务,则可能处理多次
————————
State本身就状态的意思,是为了解决把状态存在bolt内存中的问题:一个节点挂了,那么状态就丢了
所以需要存到外存中,所以state就是一个与外存打交道的抽象!!!
而在State这里,
它会提供两个方法基类方法:
beginCommit(Long txid)
commit(Long txid) 这两个方法,每次事务性写一个批次batch的数据的时候,会调用!!!!
来完成事务性操作
如果真正要完成事务型操作,需要写txid或者preval到数据库里,才能真正实现数据的事务
这需要实现上面两个方法,storm已经提供了实现了这两个方法的State,这样写到数据库里,会有额外信息,比如MemcacheState
如果State里面采用了事务型,那么Spout必须采用事务型spout
但是很多State在这两个方法的实现里都是空,所以也就是并没有做相关的数据,只是简单的数据持久化
但是要做到一条数据最多只被处理一次,那么只需要控制spout就可以了,State不需要做什么
和持久化打交道的事,全都放在了State方法里
StateFactory:一般是传入一个options,StateFactory的核心方法是makeState就是如何创建state,这个方法的核心参数: tuple到存储数据的mapper,有多少partition,这是第几个partition
对State可以有query,update方法,来将数据通过State方法,持久化!!
persist的时候,需要调用getStateFactory操作回去StateFactory,然后要有一个updater方法,里面调用State的update方法,来实现数据的持久化
查询的过程,是StateFactory+query方法,这个query方法调用State的query方法
State里的update和query,最好是批量的,这样可以减少和数据库的交互次数!!一般有MultiGet,MultiPut, MultiUpdate等方法
可以研究一下Hbase的State:它是一个kv的,写到hbase里面,有txid的
groupby之后,调用persistentAggregate,传入的是一个Aggregater类对象,persistentAggregate底层是调用一个partitionPersist方法 ,传入一个Updater,这个Updater就调用State的multiUpdate方法了,但是这个multiUpdate方法也要看属于什么样的State
如果是事务型,那么就要把txid存到hbase里了
state不是一个batch或者一个tuple就生成一个,而是而是一个分区一个,里面可以维护几个session,来连接到数据库
statefactory就是为了根据不同条件生成state用的,
一般一个state会对应一个blockid,这是和所在storm bolt partition的index计算得到的
然后在写hive的时候,由于hive表是多分区的,可能有多个state写到同一个分区中,但是不同的state肯定是写不同的文件,所以一个分区,会有多个文件,数量和state数量一直
在写hdfs的时候,不同的state会写不同的part文件,但是一般会有不同的filerotate策略,比如按照时间和文件大小,所以同一个state输出的文件也会分裂