Storm hello world project
- pom.xml
<dependencies>
<!-- Unit-test library; test scope only, so it is not packaged into the topology jar. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- Storm runtime API (TopologyBuilder, Config, LocalCluster, StormSubmitter, BaseRichBolt, ...).
     Default (compile) scope, so it is bundled into the jar-with-dependencies built in <build>.
     NOTE(review): Storm's cluster deployment docs recommend <scope>provided</scope> when submitting
     to a real cluster, because the workers already have storm-core on their classpath. Compile
     scope is still needed to run LocalCluster from the IDE. Consider a Maven profile that switches
     the scope for cluster builds. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.3</version>
</dependency>
<!-- ZooKeeper-based Kafka spout (KafkaSpout, SpoutConfig, ZkHosts, StringScheme) used by UserlogTopo. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.3</version>
</dependency>
<!-- Storm's Redis integration module. RedisBolt in these notes talks to Jedis directly.
     NOTE(review): confirm whether this dependency is actually used. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>1.0.3</version>
</dependency>
<!-- Redis client (Jedis, JedisPool, JedisPoolConfig) used by RedisBolt. -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>
<!-- Kafka client library. storm-kafka declares Kafka as a provided dependency, so it is not pulled
     in transitively and must be declared explicitly. Presumably the version should match the
     brokers being consumed; confirm. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
</dependency>
<!-- Kafka core artifact built for Scala 2.10, required by storm-kafka.
     The exclusions keep Kafka's own ZooKeeper and log4j / slf4j-log4j12 binding off the classpath,
     to avoid version and logging-binding conflicts with the libraries Storm already ships. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Assembly plugin: runs the "single" goal in the package phase using the built-in
     jar-with-dependencies descriptor. Each "mvn package" therefore also produces a fat jar
     containing all runtime dependencies. Presumably that jar is the one submitted with `storm jar`. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
- RedisBolt.java
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
/**
* Created by zhouwenchun on 17/3/24.
*/
public class RedisBolt extends BaseRichBolt {
private OutputCollector _collector;
private JedisPool pool;
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this._collector = outputCollector;
this.pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379);
}
public void execute(Tuple tuple) {
String log = tuple.getString(0);
System.out.println(sdf.format(new Date()));
System.out.println(log);
Jedis jedis = this.pool.getResource();
jedis.set("20151020", log);
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
- UserlogTopo.java
/**
 * Builds and submits a topology that reads strings from a Kafka topic and writes them to Redis
 * via {@link RedisBolt}.
 *
 * <p>If at least one command-line argument is given, the topology is submitted to the cluster
 * through {@code StormSubmitter}. Otherwise it runs in an in-process {@code LocalCluster}.
 */
public class UserlogTopo {
    /** Kafka topic consumed by the spout. */
    private static final String TOPIC_NAME = "test2";
    /** ZooKeeper root path under which the spout stores its consumed offsets. */
    private static final String ZK_ROOT = "/test/test2";
    /**
     * Spout consumer id; offsets are stored under {@code ZK_ROOT/SPOUT_ID}. A fixed id lets a
     * restarted topology resume from its committed offsets. A random id (as used previously)
     * never finds them, so every restart re-reads from the configured start offset.
     */
    private static final String SPOUT_ID = "userlog-spout";
    /** ZooKeeper connect string from which the spout reads Kafka broker/partition metadata. */
    private static final String KAFKA_ZK_HOSTS = "117.169.77.211:2181";
    /** Name under which the topology is submitted. */
    private static final String TOPOLOGY_NAME = "userTopo";

    public static void main(String[] args) throws Exception {
        BrokerHosts hosts = new ZkHosts(KAFKA_ZK_HOSTS);
        SpoutConfig spoutConfig = new SpoutConfig(hosts, TOPIC_NAME, ZK_ROOT, SPOUT_ID);
        // Emit each Kafka message as a single string field.
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.metricsTimeBucketSizeInSecs = 5;
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout);
        // The last argument (parallelism_hint) sets the number of executor threads for the bolt.
        // setNumTasks(4) sets the total number of task instances. By default each executor runs
        // one task; here the 4 tasks are spread over 2 executors (2 tasks each).
        builder.setBolt("UserBolt", new RedisBolt(), 2)
                .setNumTasks(4)
                .shuffleGrouping("kafkaSpout");

        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
        } else {
            // Other tuning options: conf.setMaxSpoutPending(...), and
            // conf.setMaxTaskParallelism(...), which caps the executors (threads) any one
            // component may get.
            // NOTE(review): the worker count is only set in local mode; cluster submissions use
            // the default. Confirm whether setNumWorkers belongs before the if/else.
            conf.setNumWorkers(3); // number of worker processes
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
        }
    }
}
Storm 的消息可靠性
可靠性API
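- 发送新Tuple时以原始Tuple作为锚点(anchor)一起发送, 例如 collector.emit(input, new Values(...));
- 处理完成后调用 collector#ack(input) 通知Storm该Tuple已处理完成; 处理失败时调用 collector#fail(input) 以触发重发;
更简单的方式是继承 BaseBasicBolt, 它会自动完成以上两个操作(锚定和ack);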
禁用可靠性机制:
- 将 Config.TOPOLOGY_ACKER_EXECUTORS 设置为0 (即 conf.setNumAckers(0)), 此时 spout 发出 tuple 后 Storm 会立即调用其 ack 方法;
- 可以通过在 SpoutOutputCollector.emit 方法中省略消息 id 来关闭 spout tuple 的跟踪功能;
- 可以在发送 tuple 的时候选择发送“非锚定”的(unanchored)tuple。
Storm 拓扑的并行度(parallelism)理解
配置storm的Topo的并行度:
- Worker数量(Topo在集群中运行所需的工作进程数), 配置方法: Config#setNumWorkers
- Executor数量(每个组件需要执行的线程数), 配置方法: TopologyBuilder#setSpout() 或 TopologyBuilder#setBolt() 的 parallelism_hint 参数
- Task数量(每个组件需要执行的任务数), 配置方法: ComponentConfigurationDeclarer#setNumTasks()
如何修改运行当中Topo的并行度
- 使用Storm UI
- 使用命令行 eg: storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
说明:重新配置拓扑 "mytopology",使得该拓扑拥有 5 个 worker processes,
另外,配置名为 "blue-spout" 的 spout 使用 3 个 executor,
配置名为 "yellow-bolt" 的 bolt 使用 10 个 executor。