storm笔记:storm基本概念

本文主要介绍storm中的基本概念,从基础上了解strom的体系结构,便于后续编程过程中作为基础指导。主要的概念包括:

  1. topology(拓扑)
  2. stream(数据流)
  3. spout(水龙头、数据源)
  4. bolt(螺栓,数据筛选处理)
  5. stream group(数据流分组)
  6. reliability(可靠性)
  7. task(任务)
  8. worker(执行者)

因为上述概念中除了可靠性reliability翻译起来比较合适,其他几个词实在找不到合适的对应词语,就直接使用原词。
另外一点需要注意的是,本文使用的storm-core版本是0.10.0,包路径为backtype.storm。因为阿里巴巴开源了jstorm,据说strom2.0之后使用jstorm作为master主干,从github上可以看到包路径修改为了org.apache.storm,如果发现有包路径错误的地方,请对应修改。

topology

Storm实时运行应用包逻辑上成为一个topology,一个Storm的topology相当于MapReduce的job。关键的不同是MapReduce的job有明确的起始和结束,而Storm的topology会一直运行下去(除非进程被杀死或取消部署)。一个topology是有多个spout、bolt通过数据流分组连接起来的图结构。

storm topology

本地调试

本地调试模拟了集群模式运行方式,对于开发和调试topology很有用。而且本地模式下运行topology与集群模式下类似,只是使用backtype.storm.LocalCluster来模拟集群状态。使用backtype.storm.LocalCluster#submitTopology方法提交topology,定义topology唯一名字、topology的配置(使用的是backtype.storm.Config对象)、以及topology对象(通过backtype.storm.topology.TopologyBuilder#createTopology方法创建)。通过backtype.storm.LocalCluster#killTopology杀掉指定topology,通过backtype.storm.LocalCluster#shutdown停止运行的本地集群模式。比如:

LocalCluster cluster = new LocalCluster();
cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, builder.createTopology());
Utils.sleep(100000);
cluster.killTopology(DEFAULT_TOPOLOGY_NAME);
cluster.shutdown();

本地模式常用的配置如下:

  1. Config.TOPOLOGY_MAX_TASK_PARALLELISM:这个配置项主要用来设置每个组件线程数的上限。在生产环境中,每个topology中有很多并行线程,但是在本地调试过程中,没有必要存在这么多并行线程,可以通过这个配置来进行设置。
  2. Config.TOPOLOGY_DEBUG:设置为true,Storm将记录每个tuple提交后的日志信息,对于调试程序很有用。

集群模式运行

集群模式下运行topology与本地模式下类似,具体步骤如下:

  1. 定义topology(java下使用backtype.storm.topology.TopologyBuilder#createTopology创建)
  2. 通过backtype.storm.StormSubmitter#submitTopology提交topology到集群。StormSubmitter需要的参数与LocalCluster`的参数一致:topology名、topology配置、topology对象。比如:
Config conf = new Config();
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);
  1. 将自己的代码与依赖的代码打成jar包(除了storm自己的代码,storm自己的代码已经在classpath下了)。
    如果使用的是Mava,可以使用Maven Assembly Plugin打包,在pom.xml中加入如下代码:
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>  
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <mainClass>com.path.to.main.Class</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>
  1. 使用storm客户端将topology提交到集群,需要指定jar包路径、类名、以及提交到main方法的参数列表:
./bin/storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3
  1. 可以使用storm kill命令停止一个topology:
./bin/storm kill topologyName

数据流

数据流是Storm核心定义的抽象概念,由无限制的tuple组成的序列,tuple包含一个或多个键值对列表,可以包含java自带的类型或者自定义的可序列化的类型。

每个数据流可以在定义时通过backtype.storm.topology.OutputFieldsDeclarer的declareStream方法指定id。默认的id是“default”(直接使用declare将使用默认id)。

在上面的topology图中,每个蓝色、绿色、红色的条带是一个数据流,每个数据流内部由tuple组成。

spout

spout是topology中数据流的数据入口,充当数据采集器功能,通常spout从外部数据源读取数据,将数据转化为tuple,然后将它们发送到topology中。spout可以是可靠的或不可靠的。可靠的spout能够保证在storm处理tuple出现异常情况下,能够重新发送该tuple,而不可靠的spout不再处理已发送的tuple。

spout通过backtype.storm.topology.OutputFieldsDeclarerdeclareStream方法定义数据流,通过backtype.storm.spout.SpoutOutputCollectoremit方法发送tream。

backtype.storm.spout.ISpout#nextTuple方法是spout的主要方法,可以发送用于发送新的tuple,或直接return(不需要发送新的tuple时,可以直接return)。

当Storm检测到由某一spout发送的tuple成功处理后,将调用backtype.storm.spout.ISpout#ack方法;当调用失败,将调用backtype.storm.spout.ISpout#fail方法。具体可以查看后面的可靠性

bolt

在topology中所有操作都是在bolt中执行的,它可以进行过滤、计算、连接、聚合、数据库读写,以及其他操作。可以将一个或多个spout作为输入,对数据进行运算后,选择性的输出一个或多个数据流。一个bolt可以做一些简单的数据变换,复杂的数据处理需要多个步骤或多个bolt。

bolt可以订阅一个或多个spout或bolt的数据,通过backtype.storm.topology.OutputFieldsDeclarer#declareStream方法定义输出的数据流,通过backtype.storm.topology.BasicOutputCollector#emit方法提交数据。

bolt通过backtype.storm.topology.InputDeclarer类的shuffleGrouping方法指定需要订阅的数据流,比如:declarer.shuffleGrouping("1", "stream_id"),同时InputDeclarer也提供了接收所有数据流的语法糖,比如:declarer.shuffleGrouping("1"),相当于declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)。这个地方有点乱,简单的说,bolt B前面有一个spout A或bolt A,从A中发送一个id为a_id的数据流,如果B向只订阅id为a_id的数据流,就使用第一个方法,如果可以接收所有id类型的数据流,就用第二个方法。

该类型中主要执行的方法是cn.howardliu.demo.storm.kafka.wordCount.SentenceBolt#execute,用来获取新的tuple,并进行处理。同样使用backtype.storm.topology.BasicOutputCollector#emit方法发送新的tuple。bolt可以调用backtype.storm.task.OutputCollector#ack方法来通知Storm该tuple已经处理完成。

数据流分组

定义topology的很重要的一部分就是定义数据流数据流应该发送到那些bolt中。数据流分组就是将数据流进行分组,按需要进入不同的bolt中。可以使用Storm提供的分组规则,也可以实现backtype.storm.grouping.CustomStreamGrouping自定义分组规则。Storm定义了8种内置的数据流分组方法:

  1. Shuffle grouping(随机分组):随机分发tuple给bolt的各个task,每个bolt实例接收到相同数量的tuple;
  2. Fields grouping(按字段分组):根据指定字段的值进行分组。比如,一个数据流按照"user-id"分组,所有具有相同"user-id"的tuple将被路由到同一bolt的task中,不同"user-id"可能路由到不同bolt的task中;
  3. Partial Key grouping(部分key分组):数据流根据field进行分组,类似于按字段分组,但是将在两个下游bolt之间进行均衡负载,当资源发生倾斜的时候能够更有效率的使用资源。The Power of Both Choices: Practical Load
    Balancing for Distributed Stream Processing Engines
    提供了更加详细的说明;
  4. All grouping(全复制分组):将所有tuple复制后分发给所有bolt的task。小心使用。
  5. Global grouping(全局分组):将所有的tuple路由到唯一一个task上。Storm按照最小的task ID来选取接收数据的task;(注意,当时用全局分组是,设置bolt的task并发是没有意义的,因为所有tuple都转发到一个task上。同时需要注意的是,所有tuple转发到一个jvm实例上,可能会引起storm集群某个jvm或服务器出现性能瓶颈或崩溃)
  6. None grouping(不分组):这种分组方式指明不需要关心分组方式。实际上,不分组功能与随机分组相同。预留功能。
  7. Direct grouping(指向型分组):数据源会调用emitDirect来判断一个tuple应该由哪个storm组件接收,只能在声明了指向型的数据流上使用。
  8. Local or shuffle grouping(本地或随机分组):当同一个worker进程中有目标bolt,将把数据发送到这些bolt中。否则,功能将与随机分组相同。该方法取决与topology的并发度,本地或随机分组可以减少网络传输,降低IO,提高topology性能。

可靠行

storm可以保证每一个spout发出的tuple能够被完整处理,通过跟踪tuple树上的每个tuple,检查是否被成功处理。每个topology有一个超时时间,如果storm检查到某个tuple已经超时,将重新发送该tuple。为了使用这种特性,需要定义tuple的起点,以及tuple被成功处理。更多内容查看Guaranteeing message processing

task

task是spout和bolt的实例,他们的nextTuple()和execute()方法会被executors线程调用执行。根据数据流分组来确定如何从某个task中的tuple发送到其他的task。

worker

topology运行在一个或多个worker进程上,worker是jvm虚拟机,运行topology所有task的一部分。比如,topology的并发是300,有50个worker,那每个worker就有6个task。Storm会平衡所有worker的task数量。通过Config.TOPOLOGY_WORKERS来设置topology的worker数量。


个人主页: http://www.howardliu.cn

个人博文: storm笔记:storm基本概念

CSDN主页: http://blog.csdn.net/liuxinghao

CSDN博文: storm笔记:storm基本概念

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,378评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,356评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,702评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,259评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,263评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,036评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,349评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,979评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,469评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,938评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,059评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,703评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,257评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,262评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,501评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,792评论 2 345

推荐阅读更多精彩内容

  • Storm入门系列之一:storm核心概念及特性 本文的将介绍一些 storm 入门的基础知识,包括 storm ...
    zhaif阅读 3,077评论 0 17
  • 什么是实时流计算? 主要的处理模式可以分为:流处理,批处理 流处理是直接处理,有时也分为在线,离线,近线(st...
    Bloo_m阅读 5,050评论 1 1
  • 一. wordCount Topology开发: 1.spout数据收集器(SentenceSpout类): 有...
    奉先阅读 1,179评论 0 0
  • Storm 系统中包含以下几个基本概念:拓扑(Topologies)流(Streams)数据源(Spouts)数据...
    发光的鱼阅读 825评论 0 0
  • 清凌绝对是个有主见的姑娘。她从小就有自己的想法,吃还是不吃,买还是不买。她想要的东西,任凭踏遍千山万水也要寻到,就...
    娑乔阅读 457评论 0 0