前言
翻看师傅写的一个strom计步算法又发现了一个新技能。get之后就赶紧来写笔记。没办法,对于LZ这种前学后忘,不长记性的搬砖工人,记笔记、写博客是唯一与遗忘作斗争最有效的办法,不说废话,直奔主题。
在用strom与kafka做整合的时候,KafkaSpout和SpoutConfig这两个类是必不可少的,值得注意的是后者是前者创建的必要参数,也就是说在创建KafkaSpout对象之前,必须先准备一个SpoutConfig对象。前者是kafka消费者与storm的spout数据源的一个整合类,其目的就是让kafka的消费数据直接作为storm的数据源进行数据处理。而后者则是spout的一个配置类,它需要配置kafka的各种信息,包括节点、端口、主题等等。另外还有一些辅助非强制性配置的信息,在业务所需的情况下也有必要配置,例如LZ今天要分享的scheme配置。
在使用KafkaStorm时需要子类实现Scheme,storm-kafka实现了StringScheme,KeyValueStringScheme等等,大家可以用。我们先来看看scheme接口的源码
public interface Scheme extends Serializable {
public List<Object> deserialize(byte[] ser);
public Fields getOutputFields();
}
需要实现反序列化方法和输出fields名称,来看简单StringScheme实现:
public class MyScheme implements Scheme {
public static final String STRING_SCHEME_KEY = "str";
public List<Object> deserialize(byte[] bytes) {
return new Values(deserializeString(bytes));
}
public static String deserializeString(byte[] string) {
try {
return new String(string, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
public Fields getOutputFields() {
return new Fields(STRING_SCHEME_KEY);
}
}
其实就是直接返回了一个String,在Spout往后发射时就一个字段,其名为“str”,如果采用StringScheme时,大家在Bolt中可以用
tuple.getStringByField("str")
这些Scheme主要负责从消息流中解析出所需要的数据。
假设我们现在要把kafka某一个主题的生产数据要获取到作为storm流分析的数据源,我们的生产数据如下
16 91 16777216 0 17 6 7 15 41 46 535 6.158485 1.813451 0.000000 -1068 8496 13572 28 276 0 1597 100 0 1496821304 436028
16 91 16777216 0 17 6 7 15 41 46 785 5.917774 1.716683 0.000000 -1064 8392 13544 109 254 -19 1598 100 0 1496821304 686031
16 91 16777216 0 17 6 7 15 41 47 35 5.090148 1.145671 0.000000 0 0 0 0 0 0 0 0 0 1496821304 935960
16 91 16777216 0 17 6 7 15 41 47 285 6.013670 1.660154 0.000000 -1096 8432 13700 72 314 12 1599 100 0 1496821305 186008
16 91 16777216 0 17 6 7 15 41 47 535 5.637641 1.650586 0.000000 -1080 8468 13468 23 373 -10 1600 100 0 1496821305 436129
以上数据时LZ工作中的部分业务数据,每一行代表一条标签终端数据,以空格分割有25列,每一列代表不同的含义(包括协议类型、球场id、标签id、护腿板id、时间戳、定位精度、加速度、陀螺仪等等),已经提前录入到一个TXT文件中,现在用kafka开启一个生产者模拟数据源实时发送数据(代码查看上一篇),以下主要写storm相关处理逻辑
运行环境
环境还是上一篇storm案例的开发环境
http://www.jianshu.com/p/0b70133ee040
jdk1.7
zookeeper-3.4.5
storm-0.9.2
kafka-2.9
redis-3.2.3
确保kafka数据已发送后再编写storm
storm相关代码
(1):空格分割编写bolt切分
KafkaSpoutMain.java
/**
* @author lvfang
* @create 2017-06-09 13:57
* @desc kafka整合storm 主程序入口
**/
public class KafkaSpoutMain {
// 主题与zk端口(local)
public static final String TOPIC = "htb_position_test";
public static final String ZKINFO = "192.168.90.240:2181";
private static final String HDTAS_SPOUT = "hdtasSpout";
private static final String HDTAS_DATA_BOLT = "dataBolt";
private static final String HDTAS_SPEED_BOLT = "hdtas_speed_bolt";
private static final String HDTAS_AGILE_BOLT = "hdtas_agile_bolt";
private static final String HDTAS_BATTERY_BOLT = "hdtas_battery_bolt";
private static final String HDTAS_DISTANCE_BOLT = "hdtas_distance_bolt";
private static final String HDTAS_HEARTRATE_BOLT = "hdtas_heartrate_bolt";
private static final String HDTAS_POSITION_BOLT = "hdtas_psoition_bolt";
public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
//创建zk主机
ZkHosts zkHosts = new ZkHosts(ZKINFO);
//创建spout
SpoutConfig spoutConfig = new SpoutConfig(zkHosts, TOPIC, "","KafkaSpout");
//整合kafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
//设置storm数据源为kafka整合storm的kafkaSpout
topologyBuilder.setSpout(HDTAS_SPOUT, kafkaSpout, 1);
//流向dataBolt进行空格分割处理(总处理,同时分发给多个bolt)
// topologyBuilder.setBolt(HDTAS_DATA_BOLT, new DataBolt(), 1).shuffleGrouping(HDTAS_SPOUT);
Config config = new Config();
config.setNumWorkers(1);
if (args.length > 0) {
try {
StormSubmitter.submitTopology(args[0], config,topologyBuilder.createTopology());
} catch (Exception e) {}
} else {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("HDTAS", config,topologyBuilder.createTopology());
}
}
}
KafkaSpoutMain 主要写了各个bolt的运行逻辑,我们可以发现这里只有一个bolt就是我们自定义的DataBolt,由于我们对数据不做业务处理,只做切分处理,所以一个bolt就能看到切分效果就够了,以下是DataBolt的实现
DataBolt.java
/**
* @author lvfang
* @create 2017-06-09 13:57
* @desc 总Bolt,对数据进行分割处理
**/
public class DataBolt extends BaseRichBolt {
private OutputCollector collector;
public Map<String,String> map;
/**
* 业务操作,数据处理(这里进行分割发送)
* @param tuple
*/
@Override
public void execute(Tuple tuple) {
String string = new String((byte[]) tuple.getValue(0));
String[] datas = string.split(" ");
System.out.println("接收到消息:"+string);
if(datas.length==25){
this.collector.emit(new Values(datas[0],datas[1],datas[2],datas[3],datas[4],datas[5],datas[6],datas[7],datas[8],datas[9],
datas[10],datas[11],datas[12],datas[13],datas[14],datas[15],datas[16],datas[17],datas[18],datas[9],
datas[20],datas[21],datas[22],datas[23],datas[24]));
}
}
/**
* 初始化方法
* @param map
* @param topologyContext
* @param outputCollector
*/
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
/**
* 指定流向,标注流向字段
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(
new Fields(HdtasInfo.PROTOCOL_TYPE,
HdtasInfo.FIELD_ID,
HdtasInfo.UWB_ID,
HdtasInfo.SIGN_ID,
HdtasInfo.YEAR,
HdtasInfo.MONTH,
HdtasInfo.DAY,
HdtasInfo.HOUR,
HdtasInfo.MINUTE,
HdtasInfo.SECOND,
HdtasInfo.MILLISECOND,
HdtasInfo.X,
HdtasInfo.Y,
HdtasInfo.Z,
HdtasInfo.A_SPEED_X,
HdtasInfo.A_SPEED_Y,
HdtasInfo.A_SPEED_Z,
HdtasInfo.GYROSCOPE_X,
HdtasInfo.GYROSCOPE_Y,
HdtasInfo.GYROSCOPE_Z,
HdtasInfo.HEART_RATE,
HdtasInfo.ELECTRIC,
HdtasInfo.CHARGING_STATUS,
HdtasInfo.SERVER_ACCEPT_TIME_S,
HdtasInfo.SERVER_ACCEPT_TIME_N));
}
}
我们看到以上DataBolt的实现是用string接收到数据后split切分,然后在通过 public void declareOutputFields(OutputFieldsDeclarer declarer)方法定义好各个列名后下发给下一个bolt,这样的方式完全可以实现我们的业务要求,也很好理解。不过唯一的缺点就是在接收数据时要先自己自定义一个总的数据处理bolt,去切分数据,在从总bolt中下发给其他bolt,这样不但在开发效率上有所降低,而且当数据量复杂时还容易出错。
storm提供了一种简单的方法,spout的scheme配置,在数据从kafka接收后先进行逻辑处理,然后在对各bolt进行分发,以下为相关代码
(2):scheme数据处理
HdtasScheme.java
/**
* @author lvfang
* @create 2017-09-25 15:56
* @desc
**/
public class HdtasScheme implements Scheme {
private static final Charset UTF8_CHARSET;
//协议类型
public static final String PROTOCOL_TYPE = "protocol_type";
//场地ID
public static final String FIELD_ID = "field_id";
//主设备ID(终端id)
public static final String UWB_ID = "uwb_id";
//护腿板ID
public static final String SIGN_ID = "sign_id";
// 年 月 日 时 分 秒 毫秒
public static final String YEAR = "year";
public static final String MONTH = "month";
public static final String DAY = "day";
public static final String HOUR = "hour";
public static final String MINUTE = "minute";
public static final String SECOND = "second";
public static final String MILLISECOND = "millisecond";
//定位精度 X Y Z
public static final String X = "x";
public static final String Y = "y";
public static final String Z = "z";
//加速度 X Y Z
public static final String A_SPEED_X = "a_speed_x";
public static final String A_SPEED_Y = "a_speed_y";
public static final String A_SPEED_Z = "a_speed_z";
//陀螺仪 X Y Z
public static final String GYROSCOPE_X = "gyroscope_x";
public static final String GYROSCOPE_Y = "gyroscope_y";
public static final String GYROSCOPE_Z = "gyroscope_z";
//心率
public static final String HEART_RATE = "heart_rate";
//电池电量
public static final String ELECTRIC = "electric";
//电池充电状态 1:充电 0:放电
public static final String CHARGING_STATUS = "charging_status";
//Unix时间戳 秒
public static final String SERVER_ACCEPT_TIME_S = "server_accept_time_s";
//Unix时间戳 纳秒
public static final String SERVER_ACCEPT_TIME_N = "server_accept_time_n";
@Override
public List<Object> deserialize(ByteBuffer ser) {
//流转string
String input = deserializeString(ser);
//空格切分
String[] strs = input.split(" ");
if (strs.length != 25) {//不符合数据格式,错误数据,则不发送
return null;
}
return new Values(strs);//返回数据
}
//序列化string(由于接收的数据是ByteBuffer类型,固要序列化)
private static String deserializeString(ByteBuffer string) {
if (string.hasArray()) {
int base = string.arrayOffset();
return new String(string.array(), base + string.position(), string.remaining());
} else {
return new String(Utils.toByteArray(string), UTF8_CHARSET);
}
}
@Override
public Fields getOutputFields() {
//指定列
return new Fields(PROTOCOL_TYPE, FIELD_ID, UWB_ID, SIGN_ID, YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, MILLISECOND, X, Y, Z, A_SPEED_X, A_SPEED_Y, A_SPEED_Z, GYROSCOPE_X, GYROSCOPE_Y, GYROSCOPE_Z, HEART_RATE, ELECTRIC, CHARGING_STATUS, SERVER_ACCEPT_TIME_S, SERVER_ACCEPT_TIME_N);
}
static {
UTF8_CHARSET = Charset.forName("UTF-8");
}
}
毫无疑问,我们应该先自定义一个scheme,这个scheme就类似于对数据的划分标准以及逻辑,由程序员自己编写,很明显自定义scheme需要实现storm的scheme接口,并重写相关方法,这里我们可以看到重写了public List<Object> deserialize(ByteBuffer ser)方法和 public Fields getOutputFields(),很明显deserialize()方法是进行数据逻辑处理的,我们这里是对数据进行空格分割处理并返回一个Value对象, getOutputFields()方法很明显是定义流向bolt数据各个字段的列名,根据列名称可以获取对应列数据,先来看看scheme的源码
scheme.java
//实现Serializable 接口实现数据的可序列化
public interface Scheme extends Serializable {
//处理数据的逻辑方法
List<Object> deserialize(ByteBuffer var1);
//处理后对数据进行列定义(保证其他bolt就可以从对应列拿到数据)
Fields getOutputFields();
}
根据scheme的接口方法,我们可以看出scheme就是做了一件事,就是数据处理,他的处理不同之处是在于他不用编写bolt,在所有的bolt之前,以相关业务的格式提前设置了SpoutConfig,我们看看他的bolt逻辑怎么写
KafkaSpoutMain.java
/**
* @author lvfang
* @create 2017-06-09 13:57
* @desc kafka整合storm 主程序入口
**/
public class KafkaSpoutMain {
// 主题与zk端口(test)
// public static final String TOPIC = "htb_position_test";
// public static final String ZKINFO = "192.168.1.118:2181/kafka";
// 主题与zk端口(local)
public static final String TOPIC = "htb_position_test";
public static final String ZKINFO = "192.168.90.240:2181";
private static final String HDTAS_SPOUT = "hdtasSpout";
private static final String HDTAS_DATA_BOLT = "dataBolt";
private static final String HDTAS_SPEED_BOLT = "hdtas_speed_bolt";
private static final String HDTAS_AGILE_BOLT = "hdtas_agile_bolt";
private static final String HDTAS_BATTERY_BOLT = "hdtas_battery_bolt";
private static final String HDTAS_DISTANCE_BOLT = "hdtas_distance_bolt";
private static final String HDTAS_HEARTRATE_BOLT = "hdtas_heartrate_bolt";
private static final String HDTAS_POSITION_BOLT = "hdtas_psoition_bolt";
public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
//创建zk主机
ZkHosts zkHosts = new ZkHosts(ZKINFO);
//创建spout
SpoutConfig spoutConfig = new SpoutConfig(zkHosts, TOPIC, "","KafkaSpout");
spoutConfig.scheme = new SchemeAsMultiScheme(new HdtasScheme());//设置自定义scheme
//整合kafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
//设置storm数据源为kafka整合storm的kafkaSpout
topologyBuilder.setSpout(HDTAS_SPOUT, kafkaSpout, 1);
// 用自定义Scheme切分数据,与下边的空格切分一个功能
topologyBuilder.setBolt(HDTAS_DATA_BOLT, new AllDataBolt(), 1).shuffleGrouping(HDTAS_SPOUT);
Config config = new Config();
config.setNumWorkers(1);
if (args.length > 0) {
try {
StormSubmitter.submitTopology(args[0], config,topologyBuilder.createTopology());
} catch (Exception e) {}
} else {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("HDTAS", config,topologyBuilder.createTopology());
}
}
}
bolt接收数据
AllDataBolt.java
/**
* @author lvfang
* @create 2017-06-09 13:57
* @desc 总Bolt,对数据进行分割处理
**/
public class AllDataBolt extends BaseRichBolt {
private OutputCollector collector;
public Map<String,String> map;
/**
* 业务操作,数据处理(这里进行分割发送)
* @param input
*/
@Override
public void execute(Tuple input) {
String x = input.getStringByField(HdtasScheme.X);
String y = input.getStringByField(HdtasScheme.Y);
String z = input.getStringByField(HdtasScheme.Z);
System.out.println("接收到消息:"+x + "-" + y + "-" + z);
}
/**
* 初始化方法
* @param map
* @param topologyContext
* @param outputCollector
*/
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
/**
* 指定流向,标注流向字段
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//不下发数据,所以不实现
}
}