安装:
1.下载并解压缩Zookeeper
官网地址:http://hadoop.apache.org/zookeeper/releases.html
解压缩步骤此处省略....
2.修改zookeeper的配置文件
把zookeeper对应的conf目录下的zoo-sample.cfg重命名为zoo.cfg,配置工作目录和端口号
# The number of milliseconds of each
ticktickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/dev/zookeeper-3.4.9/data
# the port at which the clients will connect
clientPort=2181
server.id_num1=hostname1:2888:3888
server.id_num2=hostname2:2888:3888
注意:集群模式下分别在datadir目录下创建文件myid,其中的内容为id_num,例如:
echo id_num1 > myid
3.下载并压缩storm
官网地址:http://storm.apache.org/downloads.html
解压缩步骤此处省略.....
4.修改storm配置文件
修改conf/storm.yaml,conf/storm.yaml中的配置选项将覆盖conf/defaults.yaml
1):storm.zookeeper.servers:storm集群中使用的zookeeper集群的地址
storm.zookeeper.servers:
-"host_ip" (此处填写zookeeper集群的主机名或Ip,多个用逗号分隔)
nimbus.host:"host_ip" (此处填下nimbus进程的主机,即主节点ip)
storm.local.dir:"/dest/to/path" (此处填写storm存储目录)
supervisor.slots.ports:
- "host_ip:port" (此处填写从节点的端口号,可随意,当采用单机模式的时候,需要写不同的端口号,具体的个数根据从节点的个数来定)
5.启动zookeeper
注意:集群中的每台机器都要启动,且启动命令一致
进入zookeeper的安装目录
#bin/zkServer.sh start
6.启动storm nimbus
进入storm安装目录
#bin/storm nimbus
7.启动storm supervisor
注意:如果是单机模式,即启动一次即可,如果是集群模式,需要每台都要启动,命令一致
进入storm安装目录
#bin/storm supervisor
8.启动storm ui
进入storm安装目录
#bin/storm ui
然后访问localhost:8080(或者主节点主机名:8080)就会看到storm的基本信息,到此,storm的安装部署已经成功
接下来进入storm的API,首先先要了解storm中的顶层接口IComponent
storm中Spout和Bolt都是其Component(部件的意思),所以storm定义了一个名叫IComponent的接口,全家普如下:
注意:
绿色部分是我们常用的类,红色部分是与事务有关的
BaseComponent是Storm提供的"偷懒"的类,它及其子类,或多或少实现了接口的部分方法,这样我们在使用的时候,不用自己每次都写所有的方法,值得一提的是:像BaseXXX的类,它所实现的方法,都是空的,直接返回null,如果继承这样的类,需要自己重写方法。下面介绍Spout和Bolt组件相关的Api
Spout
首先看一下总体图:
从图中很明显的看出Spout最顶层抽象的是ISpout接口,简单介绍一样接口中的方法:
open():初始化动作,可以在该Spout初始化的时候做一些动作,传递上下文等
close():该Spout关闭之前执行,但不能得到保证一定可以执行.Spout是作为Task运行在Worker中的,在Cluster模式下,supervisor会直接kill -9 worker的进程,这样它就无法执行了.而在本地模式下,如果是发送停止命令,是可以保证close方法的执行的.
activate()和deactivate():一个Spout可以被暂时激活和关闭,这两个方法可以在对应的时刻调用执行
nextTuple():用来发射数据,Spout中最核心的部分,一些具体的需求可以在该方法中实现
ack():一个Tuple会有唯一一个id,当该Tuple被成功处理,会执行该方法
fail():与ack()方法同理,当Tuple处理失败会调用该方法
总结:
通常情况下(shell和事务除外),实现一个Spout,可以直接实现IRichSpout,如果不想写多余的代码,可以继承BaseRichSpout
Bolt
同样,首先看下总体图:
可以看出为什么IBasicBolt没有继承IBolt?
我们先看下IBolt的方法:
我们需要知道的是IBolt继承了java.io.Serializable,我们在nimbus上提交了Topology后,创建出来的Bolt会序列化发送到具体执行的Worker上,Worker在执行该Bolt时,会首先调用prepare方法传入当前执行的上下文
execute(Tuple):接收一个Tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail方法(表示失败)来反馈结果
cleanup():同Ispout的close方法,不能保证其一定被执行
好了,现在可以回答为什么IBasicBolt没有继承IBolt这个问题了,Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中反馈结果了,storm内部会自动反馈结果
总结:
通常情况下实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理反馈结果,可以实现IBasicBolt接口或继承BaseBasicBolt,它实际上是自己做掉了prepare方法和collector.emit.ack(inputTuple)方法.
OK,介绍完了简单的方法,下面写一个简单的Demo,加深一下对Spout和Bolt的理解
简单需求:对名称加后缀并转换成大写
RandomWordSpout.java
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class RandomWordSpout extends BaseRichSpout{
private SpoutOutputCollector collector;
//模拟一些数据
String[] str = {"hello","word","you","how","are"};
//初始化方法,在spout组件实例化时调用一次
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple{
//随机挑选出一个名称
Random random = new Random();
int index = random.nextInt(str.length);
//获取名称
String name = str[index];
//将名称进行封装成tuple,发送消息给下一个组件
collector.emit(new Vaules(name));
}
//声明本spout组件发送出去的tuple中的数据的字段名
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("orignname"));
}
}
UpperBolt.java
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class UpperBolt extends BaseBasicBolt{
//业务处理逻辑
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//先获取到上一个组件传递过来的数据,数据在tuple里面
String godName = tuple.getString(0);
//将名称转换成大写
String godName_upper = godName.toUpperCase();
//将转换完成的商品名发送出去
collector.emit(new Values(godName_upper));
}
//声明该bolt组件要发出去的tuple的字段
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("uppername"));
}
}
SuffixBolt.java
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class SuffixBolt extends BaseBasicBolt{
FileWriter fileWriter = null;
//在bolt组件运行过程中只会被调用一次
@Override
public void prepare(Map stormConf, TopologyContext context) {
try {
fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
//该bolt组件的核心处理逻辑
//每收到一个tuple消息,就会被调用一次
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//先拿到上一个组件发送过来的名称
String upper_name = tuple.getString(0);
String suffix_name = upper_name + "_itisok";
//为上一个组件发送过来的商品名称添加后缀
try {
fileWriter.write(suffix_name);
fileWriter.write("\n");
fileWriter.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
TopoMain.java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
/**
* 组织各个处理组件形成一个完整的处理流程,就是所谓的topology(类似于mapreduce程序中的job)
* 并且将该topology提交给storm集群去运行,topology提交到集群后就将永无休止地运行,除非人为或者异常退出
*/
public class TopoMain {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
//将我们的spout组件设置到topology中去
//parallelism_hint :4 表示用4个excutor来执行这个组件
//setNumTasks(8) 设置的是该组件执行时的并发task数量,也就意味着1个excutor会运行2个task
builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
//将大写转换bolt组件设置到topology,并且指定它接收randomspout组件的消息
//.shuffleGrouping("randomspout")包含两层含义:
//1、upperbolt组件接收的tuple消息一定来自于randomspout组件
//2、randomspout组件和upperbolt组件的大量并发task实例之间收发消息时采用的分组策略是随机分组shuffleGrouping
builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
//将添加后缀的bolt组件设置到topology,并且指定它接收upperbolt组件的消息
builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
//用builder来创建一个topology
StormTopology demotop = builder.createTopology();
//配置一些topology在集群中运行时的参数
Config conf = new Config();
//这里设置的是整个demotop所占用的槽位数,也就是worker的数量
conf.setNumWorkers(4);
conf.setDebug(true);
conf.setNumAckers(0);
//将这个topology提交给storm集群运行
StormSubmitter.submitTopology("demotopo", conf, demotop);
}
}
最后将工程打包,在集群上运行
#storm jar jar_name.jar class_name args0 ....