Storm使用初步

Storm hello world project

  1. pom.xml
<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-redis</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.9.0.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  1. RedisBolt.java
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

/**
 * Created by zhouwenchun on 17/3/24.
 */
public class RedisBolt extends BaseRichBolt {
    private OutputCollector _collector;
    private JedisPool pool;
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this._collector = outputCollector;
        this.pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379);
    }

    public void execute(Tuple tuple) {
        String log = tuple.getString(0);
        System.out.println(sdf.format(new Date()));
        System.out.println(log);
        Jedis jedis = this.pool.getResource();
        jedis.set("20151020", log);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}
  1. UserlogTopo.java
public class UserlogTopo {
    private static  String topicName = "test2";
    private static String  zkRoot = "/test/test2";

    public static void main(String[] args) throws  Exception{
        BrokerHosts hosts = new ZkHosts("117.169.77.211:2181");
        SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName, zkRoot, UUID.randomUUID().toString());
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.metricsTimeBucketSizeInSecs = 5;
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout);
        builder.setBolt("UserBolt", new RedisBolt(), 2)//parallelism_hint最后一个参数设置Executor的线程数量
                .setNumTasks(4) //每个组件需要的执行任务数, 默认1个Executor执行1个任务, 现在配置为2个
                .shuffleGrouping("kafkaSpout");

        Config conf = new Config();
        conf.setDebug(false);
        if(args != null && args.length > 0) {
            StormSubmitter.submitTopology("userTopo", conf, builder.createTopology());
        }else{
//            conf.setMaxSpoutPending(100);
//            conf.setMaxTaskParallelism(2); //该选项设置了一个组件最多能够分配的 executor 数(线程数上限)
//            conf.setKryoFactory();
            conf.put(Config.NIMBUS_HOST, "10.0.12.36");
            conf.setNumWorkers(3); //设置workers的进程数量
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("userTopo", conf, builder.createTopology());
        }

    }
}

Storm 的消息可靠性

可靠性API

  • 将原始Tupple和新Tupple一起发送;
  • 调用collector#ack()通知Storm处理完成;
    或者使用更简单的方式是继承BaseBasicBolt会自动完成以上两个操作;

禁用可靠性机制:

  • 将 Config.TOPOLOGY_ACKERS 设置为0
  • 可以通过在 SpoutOutputCollector.emit 方法中省略消息 id 来关闭 spout tuple 的跟踪功能;
  • 可以在发送 tuple 的时候选择发送“非锚定”的(unanchored)tuple。

Storm 拓扑的并行度(parallelism)理解

配置storm的Topo的并行度:

  • work数量(Topo在集群中运行所需的工作进程数), 配置方法: Config#setNumWorkers
  • Executors数量(每个组件需要执行的线程数), 配置方法: TopologyBuilder#setSpout() 或TopologyBuilder#setBolt()
  • Task数量(每个组件需要执行的任务数), 配置方法: ComponentConfigurationDeclare#setNumTasks()

如何修改运行当中Topo的并行度

  • 使用Storm UI
  • 使用命令行 eg: storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
    说明:重新配置拓扑 "mytopology",使得该拓扑拥有 5 个 worker processes,
    另外,配置名为 "blue-spout" 的 spout 使用 3 个 executor,
    配置名为 "yellow-bolt" 的 bolt 使用 10 个 executor。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,596评论 18 139
  • 目录 场景假设 调优步骤和方法 Storm 的部分特性 Storm 并行度 Storm 消息机制 Storm UI...
    mtide阅读 17,030评论 30 60
  • Storm入门系列之一:storm核心概念及特性 本文的将介绍一些 storm 入门的基础知识,包括 storm ...
    zhaif阅读 3,066评论 0 17
  • Date: Nov 17-24, 2017 1. 目的 积累Storm为主的流式大数据处理平台对实时数据处理的相关...
    一只很努力爬树的猫阅读 2,156评论 0 4
  • 此刻被困在机场,不知道飞机几点能起飞。今天的写作主题也没什么想法,就想怎么也要开始写,提笔也许就来灵感了。都说一个...
    Molly郭儿阅读 278评论 0 1