Storm中的事务控制是门艺术,其中ack机制是精髓,可以参考Storm源码分析 一书,其中有精彩的分析。
在storm开发过程中,相信一直有一个困扰很久的问题:function执行失败,抛出了某个异常对象而导致topology终止运行。这时我们需要根据业务情况从下面2种选择中作出判断。
1.记录本次异常,整个topology继续运行。
2.通知spout重发数据。由于Trident中数据流的最小单位为batch,所以重发数据意味重发失败的整个batch。
第一种情况很好处理
public class WordFunction extends BaseFunction {
/**
*
*/
private static final long serialVersionUID = 735468688795780833L;
/**
* 接收数据流
* 每次接收batch中一条数据
*/
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
try{
// 正常业务
}
catch (Exception e) {
// 记录异常日志
}
}
private Logger logger = LoggerFactory.getLogger(getClass());
}
通过try...catch捕获异常,并进行相应处理。保证topology继续运行。
第二种情况处理也很简单
public class WordFunction extends BaseFunction {
/**
*
*/
private static final long serialVersionUID = 735468688795780833L;
/**
* 接收数据流
* 每次接收batch中一条数据
*/
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
try{
// 正常业务
}
catch (Exception e) {
// 记录异常日志
throw new FailedException(e.getMessage(), e);
}
}
private Logger logger = LoggerFactory.getLogger(getClass());
}
只需要将捕获的异常转换为FailedException即可,FailedException位于org.apache.storm.topology中。
为什么必须是FailedException呢?通过在throw new FailedException(e.getMessage(), e);设置断点跟踪可观察到在Bolt的执行器TridentBoltExecutor类public void execute(Tuple tuple)方法中有代码块
try {
_bolt.execute(tracked.info, tuple);
if(tracked.condition.expectedTaskReports==0) {
success = finishBatch(tracked, tuple);
}
}
catch(FailedException e) {
failBatch(tracked, e);
}
TridentBoltExecutor捕获到FailedException后调用了failBatch方法,继续跟踪failBatch方法最终会在事务对象TransactionAttempt上将事务尝试号+1并调用spout的emitBatch方法。
完整demo
public class WordSpout implements ITridentSpout<String> {
/**
*
*/
private static final long serialVersionUID = -954626449213280061L;
private String chars = "abcdefghijklmnopqrstuvwxyz";
/**
* 协调器
* 负责保存重放batch元数据,当重放一个batch时,通过协调器中保存的元数据创建batch
*/
@Override
public BatchCoordinator<String> getCoordinator(String txStateId,Map conf, TopologyContext context) {
return new WordCoordinator();
}
@Override
public Emitter<String> getEmitter(String txStateId, Map conf, TopologyContext context) {
return new WordEmitter();
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
/**
* 定义发送的所有字段
*/
@Override
public Fields getOutputFields() {
return new Fields("field1","field2");
}
private class WordCoordinator implements BatchCoordinator<String> {
@Override
public String initializeTransaction(long txid, String prevMetadata, String currMetadata) {
return null;
}
@Override
public void success(long txid) {
logger.info("success: " + txid);
}
@Override
public boolean isReady(long txid) {
logger.info("begin {}", txid);
return Boolean.TRUE;
}
@Override
public void close() {
}
}
/**
* 发射器
* 发送数据流
*
*/
private class WordEmitter implements Emitter<String> {
@Override
public void success(TransactionAttempt tx) {
logger.info("emitter success " + tx.getId());
}
@Override
public void close() {
}
/**
* 每次调用本方法所发送的数据集合被称为batch
* batch是Trident中发送数据流的最小单元
*/
@Override
public void emitBatch(TransactionAttempt tx, String coordinatorMeta, TridentCollector collector) {
logger.info("TransactionId : {},AttemptId : {},currMetadata : {}",tx.getTransactionId(),tx.getAttemptId(),coordinatorMeta);
for(int i=0;i<10;i++){
List list = Lists.newArrayList();
list.add("" + chars.charAt((int)(Math.random() * 26)));
list.add("event2");
collector.emit(list);
}
}
}
private Logger logger = LoggerFactory.getLogger("Trident Spout");
}
public class WordFunction extends BaseFunction {
/**
*
*/
private static final long serialVersionUID = 735468688795780833L;
/**
* 接收数据流 每次接收batch中一条数据
*/
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String value = tuple.getValueByField("field1").toString();
logger.info("funtion value : " + value);
if (value.charAt(0) > 'h' && value.charAt(0) < 'n') {
throw new FailedException();
}
}
private Logger logger = LoggerFactory.getLogger(getClass());
}
public class Start {
public static StormTopology buildTopology() {
TridentTopology topology = new TridentTopology();
WordSpout spout = new WordSpout();
WordFunction function = new WordFunction();
topology.newStream("filter", spout)
/**
* 将spout发送的数据流中哪些字段传入bolt中
*/
.each(new Fields("field1"), function, new Fields());
return topology.build();
}
public static void main(String[] args) throws Exception {
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("MyStorm", conf, buildTopology());
Thread.sleep(1000 * 60);
cluster.shutdown();
}
}
spout每次发送10个字母,bolt对每次接收到的字母进行判断,如果该字母位于h--n之间则认为事务失败,抛出FailedException。通知spout重发batch
日志可见
16237 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO Trident Spout - begin 1
16348 [Thread-22-spout-filter-executor[5 5]] INFO Trident Spout - TransactionId : 1,AttemptId : 0,currMetadata : null
16361 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : b
16361 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : v
16361 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : b
16361 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : v
16361 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : n
16361 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : l
16362 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : j
16362 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : e
16365 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : g
16366 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : y
16377 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO Trident Spout - begin 1
16495 [Thread-22-spout-filter-executor[5 5]] INFO Trident Spout - TransactionId : 1,AttemptId : 1,currMetadata : null
16497 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : y
16497 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : d
16497 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : z
16497 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : q
16498 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : g
16498 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : t
16498 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : p
16498 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : z
16498 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : q
16498 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : h
16506 [Thread-22-spout-filter-executor[5 5]] INFO Trident Spout - emitter success 1
16506 [Thread-14-$spoutcoord-spout-filter-executor[2 2]] INFO Trident Spout - success: 1
16567 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO Trident Spout - begin 2
16684 [Thread-22-spout-filter-executor[5 5]] INFO Trident Spout - TransactionId : 2,AttemptId : 0,currMetadata : null
16686 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : n
16686 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : r
16686 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : q
16687 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : a
16687 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : k
16687 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : s
16687 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : k
16688 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : c
16688 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : l
16688 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : n
16693 [Thread-18-$mastercoord-bg0-executor[1 1]] INFO Trident Spout - begin 2
16791 [Thread-22-spout-filter-executor[5 5]] INFO Trident Spout - TransactionId : 2,AttemptId : 1,currMetadata : null
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : i
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : y
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : y
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : i
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : w
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : m
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : j
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : a
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : w
16793 [Thread-24-b-0-executor[4 4]] INFO c.s.d.s.WordFunction - funtion value : w
第一次发送batch,AttemptId为0,每次重发则+1。