Storm Trident之一spout和bolt

Trident Spout


Trident Spout特点

  • Trident中,定义Spout的接口为ITridentSpout。
  • Trident Spout必须以批量形式发送tuple。
  • Trident Spout不真正执行数据的发送,而是由ITridentSpout.Emitter负责发送数据。同时引入了协调器的概念,协调器负责管理数据发送的批次和元数据,当事务失败时,调度Emitter根据元数据重新发送数据。协调器接口为ITridentSpout.BatchCoordinator。

maven依赖

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
</dependency>

Trident Spout简单实现

    public class WordSpout implements ITridentSpout<String> {
        
        /**
         * 
         */
        private static final long serialVersionUID = -954626449213280061L;
    
        /**
         * 协调器
         * 负责保存重放batch元数据,当重放一个batch时,通过协调器中保存的元数据创建batch
         */
        @Override
        public BatchCoordinator<String> getCoordinator(String txStateId,Map conf, TopologyContext context) {
            return new WordCoordinator();
        }
    
        @Override
        public Emitter<String> getEmitter(String txStateId, Map conf, TopologyContext context) {
            return new WordEmitter();
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
        /**
         * 定义发送的所有字段
         */
        @Override
        public Fields getOutputFields() {
            return new Fields("field1","field2");
        }
        
        private class WordCoordinator implements BatchCoordinator<String> {
    
            @Override
            public String initializeTransaction(long txid, String prevMetadata, String currMetadata) {
                return null;
            }
    
            @Override
            public void success(long txid) {
                logger.info("success: " + txid);
            }
    
            @Override
            public boolean isReady(long txid) {
                return Boolean.TRUE;
            }
    
            @Override
            public void close() {
                
            }
            
        }
        
        /**
         * 发射器
         * 发送数据流
         *
         */
        private class WordEmitter implements Emitter<String> {
    
            @Override
            public void success(TransactionAttempt tx) {
                logger.info("emitter success " + tx.getId());
            }
    
            @Override
            public void close() {
            }
            
            /**
             * 每次调用本方法所发送的数据集合被称为batch
             * batch是Trident中发送数据流的最小单元
             */
            @Override
            public void emitBatch(TransactionAttempt tx, String coordinatorMeta, TridentCollector collector) {
                for(int i=0;i<10;i++){
                    List list = Lists.newArrayList();
                    list.add("event1");
                    list.add("event2");
                    collector.emit(list);
                }
            }
            
        }
    
        private Logger logger = LoggerFactory.getLogger("Trident Spout");
    }
重点说明

Emitter定义的emitBatch方法。该方法实现了发送哪里数据。该方法每执行一次,发送的所有数据被称为batch。batch中的每条数据被称为tuple。

ITridentSpout.getOutputFields定义了每条tuple有哪些字段,本例中定义了2个字段,字段名为"field1"、"field2"。在Emitter.emitBatch中每条tuple均符合该定义。本例中每次调用emitBatch方法发送的数据内容及格式可以假象如下:

[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]
[event1,event2]

每次调用均发送了10条数据(10个tuple),这10个tuple构成1个batch,tuple均符合ITridentSpout.getOutputFields中定义的字段

Trident bolt


Trident bolt特点

  • Trident中没有bolt接口,而是分为了Filter和Function两类

Trident Function简单实现

public class WordFunction extends BaseFunction {

    /**
     * 
     */
    private static final long serialVersionUID = 735468688795780833L;

    /**
     * 接收数据流
     * 每次接收batch中一条数据
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        logger.info(tuple.getValueByField("field1").toString());
    }

    private Logger logger = LoggerFactory.getLogger("Trident Function");
}
重点说明

BaseFunction已经实现了Function接口。

execute方法用于具体实现接收到tuple后如何处理。每次接收1个tuple。在本例中emitter每次发送1个batch,每个batch有10条数据,则每次发送数据,execute方法均会被调用10次。

在tuple中可以获取数据流中的数据,能够获取的字段受TridentTopology对象的控制。

在execute方法中处理完成后可继续使用TridentCollector对象继续发送数据到下一节点,Function发送数据时只能添加新的字段,不能修改或删除已有的字段

启动TridentTopology


public class Start {

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        WordSpout spout = new WordSpout();
        WordFunction function = new WordFunction();

        topology.newStream("filter", spout)
                /**
                 * 将spout发送的数据流中哪些字段传入bolt中
                 */
                .each(new Fields("field1"), function, new Fields());

        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("MyStorm", conf, buildTopology());

        Thread.sleep(1000 * 60);
        cluster.shutdown();
    }

}
重点说明

topology定义数据流时指定function可以读取哪些字段。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容

  • 一. wordCount Topology开发: 1.spout数据收集器(SentenceSpout类): 有...
    奉先阅读 1,179评论 0 0
  • 这是一个JStorm使用教程,不包含环境搭建教程,直接在公司现有集群上跑任务,关于JStorm集群环境搭建,后续研...
    Coselding阅读 6,291评论 1 9
  • 声明 本文首发于个人技术博客,转载请注明出处,本文链接:http://qifuguang.me/2015/11/2...
    winwill2012阅读 2,178评论 1 15
  • 简介: Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入、有状...
    hello_coke阅读 3,228评论 0 1
  • 现在的社会太匆忙,人情关系太复杂,有些事情只能靠自己解决,无论是工作还是生活。有自己的想法,就应该想各种办法,最终...
    平静的海洋阅读 1,544评论 0 0