1、动态调整线程数:
storm rebalance topotest -w 10 -n 1 -e mybolt=2 -e myspout=1
可以提高任务平行度 ,从而提高CPU核利用率。
2、两个重要的分组策略:
shuffleGrouping
fieldsGrouping
运用:词频统计,每个bolt开启三个线程
3、storm的drpc的运用:
说明:内置的spout发射的数据:0表示id号,1表示数据
自定义bolt发射数据通关过collector,发射数据有要求:第一个位置必须是id号(因为最后一个bolt是内置bolt,有规定),后面的参数是我们真正要发射的数据
出错:必须明确定义下一个bolt字段,即declare方法必须定义。
1)本地:直接在eclipse中运行。
2)远程:drpc host:在集群中配置drpc server主机地址,drpc port:默认是3773 ,并使用命令storm drpc开启drpc服务,之后去web页面访问。
4、storm的消息可靠性的实现
ack线程和fail
5、storm的消息可靠性的原理
6、storm与Trident
代码
第一个案例:文本行数统计
spout的任务:从kafka中接收文本数据,一条一条发射给bolt
bolt的任务:每接收一条数据,统计次数+1
这里指定bolt为两个线程,因此每个线程统计 了一半的数据,最终的数据是两个线程统计结果之和。
第二个案例:词频统计
spout的任务:从kafka中接收文本数据,文本格式不一定,采取shuffleGrouping的策略一条一条发射给splitbolt
splitbolt的任务:将一行文本拆分成一个个单词,采取fieldsGrouping策略将数据分发给每一个countbolt线程,保证相同单词一定在同一个线程
countbolt的任务:统计每个单词的出现次数(提示:用hashMap),最终将结果打印在控制台。
注意:countbolt中的一个execute方法每次只会接收一条数据,因此hashmap集合必须定义在外面;
在execute方法中统计一个单词出现的次数,通过tuple获取单词,然后判断这个单词在map集合中是否出现过,没有出现,则count复制为0;然后将这个单词和它的count扔进map集合里,并进行累加。
如果要提交到集群上面,spout获取文件的路径必须是Linux系统路径
第三个案例:客户端传递参数spark,通过stormDRPC输出hello spark
bolt的任务:继承BaseRichBolt,execute方法中写需求
main方法:DRPC的实例和代理类等等。
第四个案例:storm第二天作业:统计每天的pv和UV,在此基础上实现消息的可靠性
思路和wordcount类似,关键是谁是Word,因为统计的是uv,因此最后一个字段是Word,统计不同Word的个数
每天pv:在datebolt中按照fieldsGrouping分离数据,这个field字段是日期字段,将日期相同的分到一组,然后在pvbolt中统计这个字段出现的次数,即pv,可以把这个结果写出到一个pv统计文件中;
每天uv,在每天pv的基础上统计uv,这一次的Word变成了最后一个字段,统计不同单词出现的次数,用Treeset,最后遍历set集合统计次数。