storm的集群提交方式
StormSubmitter.subnitTopology()方法
问题一、如何把storm的任务提交到集群上面?
使用命令:storm jar jar包路径 类全限定名
问题二:查看任务日志
凡是有supervisor的节点
storm logviewer &
问题三:停止任务:在所有的supervisor
storm kill topotest 或者在storm的ui界面直接kill掉topology
问题四、spout和bolt做的事情差不多,为什么要有spout来获取数据,而不直接让bolt获取并处理数据?
spout
数据源
一般spout就只是获取数据,几乎不对数据进行处理和计算
问题五:并行度
nimbus->zookeeper->supervisors
supervisor->worker(最多四个,具体由配置的端口号决定,一个worker是一个进程)->executor(一个executor是一个线程,一个executor默认对于运行一个task)->task任务(spout,bolt)
1)默认情况下,一个topotest就分配一个worker
2)默认情况下,一台supervisor就会有四个slot,也就是能启动四个worker
3)默认情况下,一个executor线程对应一个task任务,一个spout或者一个bolt就是一个executor线程。
同一个worker里面的executor只会为同一个topology任务服务。
因此一个spout或者一个bolt就是一个线程。
设置executor个数:
多线程和单线程的区别:对CPU的利用率不一样
一个executor里面运行的task的类型要么都是spout,要么都是bolt,不可能出现既有spout又有bolt.
提高storm任务的并行度:
1)增加supervisor
2)增加worker(增加端口号)
3)增加executor(即增加java代码的线程)
在一个cpu core的情况下,两个executor线程对CPU利用率比较好。
我们一个任务提交了以后,我们可以动态的修改这个任务的worker的个数(一般不修改)和executor的个数。
修改bolt的executor的个数为2,这样就可以实现多线程并行运行bolt任务,对CPU利用率高。
默认情况下,一个worker会有一个ack线程(即一个executor,对应一个task);
该ack线程的作用是监控spout和bolt任务。
通过下面这种方式设置ack个数,0表示没有。便于我们观察executor和task的关系。
我们将任务提交到集群之后,然后在集群上运行命令来动态调整executor的个数:
storm rebalance (--help)查看怎么使用?
参数:
topotest:topology任务的标识
-w:wait,等多久生效
-n :number,代表worker个数
-e:executor,代表executor的个数
-e myblot=2 -e myspout=1(通过代码中指定的spout和bolt的名字来 标识)
storm rebalance topotest -w 10 -n 1 -e mybolt=2 -e myspout=1
task的个数在写代码的时候就固定了,不可以动态修改。
线程安全:即多线程出来的计算结果跟单线程出来的计算结果是一致的。
分组策略:指的是上一级将数据传到下一级,以什么样的策略传数据呢?
需求:统计文本的行数。
从kafka里面读取数据
首先开启kafka并创建一个topic
[hadoop@hadoop02 kafka_2.11-0.10.2.1]$ nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
[hadoop@hadoop03 kafka_2.11-0.10.2.1]$ nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
[hadoop@hadoop04 kafka_2.11-0.10.2.1]$ nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
在任意一台服务器,这里选择了Hadoop02上创建一个topic
[hadoop@hadoop02 kafka_2.11-0.10.2.1]$ bin/kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 1 --partitions 1 --topic test
查看
[hadoop@hadoop02 kafka_2.11-0.10.2.1]$ bin/kafka-topics.sh --list --zookeeper hadoop02:2181
写代码生产数据到kafka的test主题(即一个文件)
打印线程数:Thread.currentThread().getName()
如果要求不高,可以这样统计结果:一个线程的结果*线程数
FieldGrouping:相同的单词必然分到同一组
AllGrouping:广播发送, 对于每一个tuple, 所有的Bolts都会收到。两个线程获取到的数据都是全部的,且一模一样。
分组策略在线程数大于1是展现出各自不同的效果。
小练习:单词计数/词频统计
1)进行单词计数 2)开启并行度 3)线程安全
思路:需要两个bolt,第一个bolt采取ShuffleGrouping策略,拆分单词;第二个bolt采取FieldsGrouping策略,相同的单词必然到同一个线程,保证了线程安全。
每个bolt开启三个线程。
注意使用FieldsGrouping时必须指定fields字段
注意最后结果:每一次统计结果都会打印,因为storm是实时统计。
淘宝大屏项目
作业:一份数据,统计每天(日期)的pv(文本数据总数)和uv(用户数)
以京东的网站来解释以下名词:
如何判断京东网站火不火?可以通过以下三个变量来估计。
1)点击一下,会产生一个pv(pageView),pv越,网站火
2)UV(userView),同一个用户无论点击多少次,它的UV就是1,UV值高,网站火。
3)会话:一次会话表示一次有效的访问。会话与浏览器共存亡,但有时间周期,可能六分钟不点击这次会话就会过期。
4)跳出率:只浏览了一个页面的会话个数个数占总的会话数的比率,越小越好。会话的平均时间越长越好。
RPC:官网:storm.apache.org
Storm中有一个Distributed RPC:分布式的RPC
RPC概念:remote procedure call protocol:远程过程(进程)调用(方法)协议
不同进程间的方法调用
好多网站上都会有天气预报,这些数据是从哪里来的?
RPC是一个CS架构:即client/server架构
理解RPC:
1)客户端(进程)去调用服务端(进程)的方法
2)方法的执行在服务端
第一个RPC案例:首先需要RPC的jar包,可以搭建一个maven项目
server服务端按照RPC协议的规范实现方法,并提供main方法给客户端提供一个IP地址和端口号,RPC的协议需要提供一个versionID,必须是long类型;服务端一直处在启动状态等待客户端调用。
client客户端通过服务端的IP地址和端口号,通过RPC协议代理类调用服务端方法,最终方法的执行在服务端。
HadoopRPC:NameNode、DataNode、resourcemanager、NodeManager都是服务端
阅读源码的三种方式:
1)maven项目中会自动下载源码,缺点:不能直接修改源代码
2)直接到官网下载 src源码包
3)在github网站去下载源码,直接在网站看也可以
通过查看源码来证明NameNode是HadoopRPC的服务端,证明在初始化NameNode的时候有如下这样一段代码,那么就可以证明它是hadoopRPC的服务端了。
怎么证明是RPC协议?RPC协议中必定会有versionID号。
快捷键:Ctrl+T、Ctrl+f、Ctrl+o
创建目录mkdir的真正的执行者:
clientProtocol协议实例NameNode==真正创建目录的实例对象。
storm的DRPC的实现
需求:客户端传参spark或者Hadoop,通过drpc输出hello spark,hello Hadoop
内置的spout发射的数据:0表示id号,1表示数据
自定义bolt发射数据通关过collector,发射数据有要求:第一个位置必须是id号(因为最后一个bolt是内置bolt,有规定),后面的参数是我们真正要发射的数据
出错:必须明确定义下一个bolt字段,即declare方法必须定义。
第一个案例是本地local
第二个案例是远程remote
服务端:提交topology任务到集群 注意:createRemoteTopology
客户端:host:drpc server主机名 port(drpc服务端口):3773
storm架构:
主节点nimbus:1)用户提交任务给主节点 2)任务运行失败,重新分配任务
加入nimbus挂了的影响
1)任务是可以照常运行
2)不能提交新的任务
3)任务运行失败不能重新分配任务
从节点supervisor:计算处理数据
nimbus和supervisor都把信息存储到zookeeper。
查看storm信息,在zookeeper,1)监控zookeeper的/storm/supervisors目录,监控事件类型:子节点变化事件。2)监控/storm/storms目录,记录的是topology任务分配信息,监控事件类型:子节点数据变化事件。
注意:spark和storm的进程都是worker,冲突
消息的可靠性:
IRichSpout:
ack方法:数据处理成功以后,会被调用
fail方法:数据处理失败以后,会被调用
数据处理失败之后有两种处理:1)重发几次 2)重发几次依然失败单独处理
实现消息的可靠性:通过ack方法和fail方法
数据发射成功,存到集合,ack方法移除该条数据
数据处理失败,先判断retry次数,重发,将messageID和retry次数存到hashmap集合。
在spout和bolt中回调ack方法。注意是BaseRichSpout和BasicRichBolt
消息可靠性原理:
默认情况下,一个worker有一个ack线程。
一个ack线程维护着 taskid,ack_value(默认情况下是0)
xor异或
一个tuple有一个tupleid,ack_value与每一个tupleid异或最终的结果赋值给ack_value
正常情况下最终的ack_vlaue依然是0.
什么是线程安全?简单来说,就是多线程计算结果跟单线程处理结果一致。
DRPC:本地和远程。
补充知识:Trident(框架)