Storm Trident之二事务控制

Storm中的事务控制是门艺术,其中ack机制是精髓,可以参考Storm源码分析 一书,其中有精彩的分析。

在storm开发过程中,相信一直有一个困扰很久的问题:function执行失败,抛出了某个异常对象而导致topology终止运行。这时我们需要根据业务情况从下面2种选择中作出判断。

1.记录本次异常,整个topology继续运行。

2.通知spout重发数据。由于Trident中数据流的最小单位为batch,所以重发数据意味重发失败的整个batch。

第一种情况很好处理
public class WordFunction extends BaseFunction {

    /**
     * 
     */
    private static final long serialVersionUID = 735468688795780833L;

    /**
     * 接收数据流
     * 每次接收batch中一条数据
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        try{
            //    正常业务
        }
        catch (Exception e) {
            //    记录异常日志
        }
        
    }

    private Logger logger = LoggerFactory.getLogger(getClass());
}

通过try...catch捕获异常,并进行相应处理。保证topology继续运行。

第二种情况处理也很简单
public class WordFunction extends BaseFunction {

    /**
     * 
     */
    private static final long serialVersionUID = 735468688795780833L;

    /**
     * 接收数据流
     * 每次接收batch中一条数据
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        try{
            //    正常业务
        }
        catch (Exception e) {
            //    记录异常日志
            throw new FailedException(e.getMessage(), e);
        }
        
    }

    private Logger logger = LoggerFactory.getLogger(getClass());
}

只需要将捕获的异常转换为FailedException即可,FailedException位于org.apache.storm.topology中。

为什么必须是FailedException呢?通过在throw new FailedException(e.getMessage(), e);设置断点跟踪可观察到在Bolt的执行器TridentBoltExecutor类public void execute(Tuple tuple)方法中有代码块

try {
    _bolt.execute(tracked.info, tuple);
    if(tracked.condition.expectedTaskReports==0) {
        success = finishBatch(tracked, tuple);
    }
}
catch(FailedException e) {
    failBatch(tracked, e);
}

TridentBoltExecutor捕获到FailedException后调用了failBatch方法,继续跟踪failBatch方法最终会在事务对象TransactionAttempt上将事务尝试号+1并调用spout的emitBatch方法。

完整demo
public class WordSpout implements ITridentSpout<String> {
    
    /**
     * 
     */
    private static final long serialVersionUID = -954626449213280061L;
    
    private String chars = "abcdefghijklmnopqrstuvwxyz";
    

    /**
     * 协调器
     * 负责保存重放batch元数据,当重放一个batch时,通过协调器中保存的元数据创建batch
     */
    @Override
    public BatchCoordinator<String> getCoordinator(String txStateId,Map conf, TopologyContext context) {
        return new WordCoordinator();
    }

    @Override
    public Emitter<String> getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new WordEmitter();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * 定义发送的所有字段
     */
    @Override
    public Fields getOutputFields() {
        return new Fields("field1","field2");
    }
    
    private class WordCoordinator implements BatchCoordinator<String> {

        @Override
        public String initializeTransaction(long txid, String prevMetadata, String currMetadata) {
            return null;
        }

        @Override
        public void success(long txid) {
            logger.info("success: " + txid);
        }

        @Override
        public boolean isReady(long txid) {
            logger.info("begin {}", txid);
            return Boolean.TRUE;
        }

        @Override
        public void close() {
            
        }
        
    }
    
    /**
     * 发射器
     * 发送数据流
     *
     */
    private class WordEmitter implements Emitter<String> {

        @Override
        public void success(TransactionAttempt tx) {
            logger.info("emitter success " + tx.getId());
        }

        @Override
        public void close() {
        }
        
        /**
         * 每次调用本方法所发送的数据集合被称为batch
         * batch是Trident中发送数据流的最小单元
         */
        @Override
        public void emitBatch(TransactionAttempt tx, String coordinatorMeta, TridentCollector collector) {
            
            logger.info("TransactionId : {},AttemptId : {},currMetadata : {}",tx.getTransactionId(),tx.getAttemptId(),coordinatorMeta);
            
            for(int i=0;i<10;i++){
                List list = Lists.newArrayList();
                list.add("" + chars.charAt((int)(Math.random() * 26)));
                list.add("event2");
                collector.emit(list);
            }
        }
    }

    private Logger logger = LoggerFactory.getLogger("Trident Spout");
}

public class WordFunction extends BaseFunction {

    /**
     * 
     */
    private static final long serialVersionUID = 735468688795780833L;

    /**
     * 接收数据流 每次接收batch中一条数据
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String value = tuple.getValueByField("field1").toString();
        logger.info("funtion value : " + value);
        if (value.charAt(0) > 'h' && value.charAt(0) < 'n') {
            throw new FailedException();
        }
    }

    private Logger logger = LoggerFactory.getLogger(getClass());
}

public class Start {

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        WordSpout spout = new WordSpout();
        WordFunction function = new WordFunction();

        topology.newStream("filter", spout)
                /**
                 * 将spout发送的数据流中哪些字段传入bolt中
                 */
                .each(new Fields("field1"), function, new Fields());

        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("MyStorm", conf, buildTopology());

        Thread.sleep(1000 * 60);
        cluster.shutdown();
    }
}

spout每次发送10个字母,bolt对每次接收到的字母进行判断,如果该字母位于h--n之间则认为事务失败,抛出FailedException。通知spout重发batch

日志可见

16237 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO  Trident Spout - begin 1
16348 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - TransactionId : 1,AttemptId : 0,currMetadata : null
16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : b
16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : v
16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : b
16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : v
16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : n
16361 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : l
16362 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : j
16362 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : e
16365 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : g
16366 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : y
16377 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO  Trident Spout - begin 1
16495 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - TransactionId : 1,AttemptId : 1,currMetadata : null
16497 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : y
16497 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : d
16497 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : z
16497 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : q
16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : g
16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : t
16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : p
16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : z
16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : q
16498 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : h
16506 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - emitter success 1
16506 [Thread-14-$spoutcoord-spout-filter-executor[2 2]] INFO  Trident Spout - success: 1
16567 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO  Trident Spout - begin 2
16684 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - TransactionId : 2,AttemptId : 0,currMetadata : null
16686 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : n
16686 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : r
16686 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : q
16687 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : a
16687 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : k
16687 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : s
16687 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : k
16688 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : c
16688 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : l
16688 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : n
16693 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO  Trident Spout - begin 2
16791 [Thread-22-spout-filter-executor[5 5]] INFO  Trident Spout - TransactionId : 2,AttemptId : 1,currMetadata : null
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : i
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : y
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : y
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : i
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : w
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : m
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : j
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : a
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : w
16793 [Thread-24-b-0-executor[4 4]] INFO  c.s.d.s.WordFunction - funtion value : w

第一次发送batch,AttemptId为0,每次重发则+1。

我们注意到重发的batch与该batch第一次发送时的数据内容不一致。这在实际项目肯定是不允许的。如何保证一个batch是否经历重发数据内容一致?需要使用到BatchCoordinator.initializeTransaction方法所提供的元数据。

batch发送成功后,Emitter.success和BatchCoordinator.success均会被调用。但所处线程不同,Emitter.success与Emitter.emitBatch处于同一线程,而BatchCoordinator.success则处于协调器线程。个人习惯于在Emitter.success中处理发送成功后的逻辑处理。

由于分区事务IPartitionedTridentSpout通常与kafka结合使用且处理方式大同小异,so不再展开。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容