Storm 提供了一套可靠性机制，用于确保消息能够安全、可靠地到达每一个指定的 bolt 并完成业务处理。
Storm 的可靠性是指：Storm 会告知用户每一个消息单元是否在指定的超时时间（timeout）内被完全处理。所谓“完全处理”，是指该 MessageId 绑定的源 Tuple，以及由该源 Tuple 衍生出的所有 Tuple，都经过了 Topology 中每一个应该到达的 Bolt 的处理。
在消息发送过程中，如果某一环节出现异常，Storm 主要通过 acker 机制进行处理：acker 会通知 spout 该消息处理失败（调用 spout 的 fail() 方法），由 spout 负责重发数据，从而确保数据不会丢失。具体原理请参见“Storm 消息重发机制”：
http://www.tuicool.com/articles/vErmIb
示例代码
【spout】
/**
 * Spout that emits a fixed list of comma-separated subject strings, one per call to
 * {@code nextTuple()}, using each entry's array index as its message id so that Storm
 * reports the outcome of every message through {@code ack}/{@code fail}.
 *
 * <p>On failure the original message is looked up by its id and emitted again with the same id.
 */
public class MessageSpout implements IRichSpout {
    private static final long serialVersionUID = 1L;

    /** Messages to emit; the position of each entry is also its message id. */
    private String[] subjects = {
        "groovy,oeacnbase",
        "openfire,restful",
        "flume,activiti",
        "hadoop,hbase",
        "spark,sqoop"
    };

    /** Position in {@link #subjects} of the next message to emit. */
    private int index = 0;

    private SpoutOutputCollector collector;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Emits the next pending subject, if any, tagged with its index as the message id.
     * Does nothing once every subject has been emitted.
     */
    public void nextTuple() {
        if (index >= subjects.length) {
            return;
        }
        // Passing a message id (second argument) is what enables ack/fail tracking.
        collector.emit(new Values(subjects[index]), index);
        index = index + 1;
    }

    /** Declares a single output field named "subjects". */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("subjects"));
    }

    /** Called by Storm when the message with this id has been fully processed. */
    public void ack(Object msgId) {
        System.out.println("【消息发送成功!!!】(msgId = " + msgId + ")");
    }

    /**
     * Called by Storm when the message with this id failed or timed out; re-emits the same
     * subject under the same message id.
     *
     * <p>NOTE(review): the final "resend succeeded" line is printed as soon as emit() returns;
     * it does not mean the resent message has been processed downstream.
     */
    public void fail(Object msgId) {
        System.out.println("【消息发送失败... ...】(msgId = " + msgId + ")");
        System.out.println("【重发进行中... ...】");
        int position = (Integer) msgId;
        collector.emit(new Values(subjects[position]), msgId);
        System.out.println("【重发成功!!!】");
    }

    public void activate() {
        // no-op
    }

    public void close() {
        // no-op
    }

    public void deactivate() {
        // no-op
    }

    /** No component-specific configuration. */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
【bolt1】按逗号切分
/**
 * Bolt that splits the comma-separated "subjects" field of each input tuple into individual
 * words and emits one tuple per word, anchored to the input tuple.
 *
 * <p>The input tuple is acked after all words are emitted, or failed if any exception occurs,
 * which causes the spout to replay the original message.
 */
public class SpliterBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    /** Guards the (commented-out) one-time failure injection in execute(). */
    private boolean flag = false;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        try {
            String subjects = tuple.getStringByField("subjects");
            // Simulated failure for testing ack/fail (any bolt stage may fail):
            // if(!flag && subjects.equals("flume,activiti")){
            //     flag = true ;
            //     int a = 1/0;
            // }
            emitWords(tuple, subjects);
            collector.ack(tuple); // mark success
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(tuple); // mark failure -> spout's fail() resends
        }
    }

    /** Emits one tuple per comma-separated word in {@code csv}, each anchored to {@code anchor}. */
    private void emitWords(Tuple anchor, String csv) {
        for (String word : csv.split(",")) {
            // Anchoring ties each child tuple to the input so a downstream failure is
            // reported back to the spout.
            collector.emit(anchor, new Values(word));
        }
    }

    /** Declares a single output field named "word". */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void cleanup() {
        // nothing to release
    }

    /** No component-specific configuration. */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
【bolt2】结果输出
/**
 * Terminal bolt that appends each incoming word (field 0) as a CRLF-terminated line to
 * the file opened in {@link #prepare}.
 *
 * <p>A tuple is acked only after its word has been written and flushed. If writing throws,
 * the tuple is failed instead (never both), so the spout replays the originating message.
 */
public class WriterBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;
    private FileWriter writer;
    private OutputCollector collector;
    /** Guards the (commented-out) one-time failure injection in execute(). */
    private boolean flag = false;

    /** Closes the output file opened in prepare(), if it was opened. */
    public void cleanup() {
        if (writer == null) {
            return;
        }
        try {
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            writer = null;
        }
    }

    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        try {
            // Simulated failure for testing ack/fail (any bolt stage may fail):
            // if(!flag && word.equals("hadoop")){
            //     flag = true;
            //     int a = 1/0;
            // }
            // NOTE: if prepare() failed to open the file, writer is null and the resulting
            // NullPointerException is handled below by failing the tuple.
            writer.write(word);
            writer.write("\r\n");
            writer.flush();
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(tuple); // mark failure
            return; // a failed tuple must not also be acked
        }
        // This is the last bolt and declares no output stream, so nothing is emitted here.
        collector.ack(tuple); // mark success
    }

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            // NOTE(review): hard-coded, Windows-specific output path.
            writer = new FileWriter("e://message.txt");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Terminal bolt: declares no output fields. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /** No component-specific configuration. */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
【topology】处理逻辑
/**
 * Local-mode driver that tests Storm's ack (guaranteed message processing) mechanism by
 * wiring MessageSpout -> SpliterBolt -> WriterBolt, running it for ten seconds, and then
 * tearing everything down.
 *
 * @author mis
 */
public class MessageTopology {
    private static final String TOPOLOGY_NAME = "message";
    private static final String SPOUT_ID = "spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String WRITE_BOLT_ID = "write-bolt";
    private static final long RUN_MILLIS = 10000;

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = wire();

        // Run locally with debug output disabled.
        Config config = new Config();
        config.setDebug(false);

        LocalCluster cluster = new LocalCluster();
        System.out.println(cluster);
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());

        Thread.sleep(RUN_MILLIS);

        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }

    /** Builds the spout -> split -> write pipeline, each stage shuffle-grouped on the previous. */
    private static TopologyBuilder wire() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, new MessageSpout());
        builder.setBolt(SPLIT_BOLT_ID, new SpliterBolt()).shuffleGrouping(SPOUT_ID);
        builder.setBolt(WRITE_BOLT_ID, new WriterBolt()).shuffleGrouping(SPLIT_BOLT_ID);
        return builder;
    }
}
运行结果：成功
运行结果：失败（触发 ack 保证机制，消息被重发）