一、概述
上一篇实战中我们已经使用ogg实现了mysql数据以json的格式同步到了kafka里边去了,也就是说我们的源端的埋点的数据已经处理好咯;那么接下来我们就可以使用 Flink 开始对数据源进行处理计算,当然这里值得一提的是:ogg 同步过来的json数据格式是嵌套型的,而且我们的数据不是想普通的网站日志那么简单,因为的源数据是从数据库中过来的-会涉及到增、删、改,因此我们要对刚从源库中过来的数据进行简单的 ETL 处理。废话不多说,先看下数据格式是长啥样的:
# 添加
{"table":"bms_st.employees","op_type":"I","op_ts":"2012-04-12 14:23:13.177344","current_ts":"2012-04-12T14:23:19.796000","pos":"00000000010000036968","after":{"EID":"101","ENAME":"changyin","ESAL":6666.66}}
{"table":"bms_st.employees","op_type":"I","op_ts":"2012-04-12 14:23:13.177344","current_ts":"2012-04-12T14:23:19.797000","pos":"00000000010000037147","after":{"EID":"102","ENAME":"siling","ESAL":1234.12}}
# 修改
{"table":"bms_st.employees","op_type":"U","op_ts":"2012-04-12 14:24:37.789810","current_ts":"2012-04-12T14:24:44.247000","pos":"00000000010000037501","before":{"EID":"102","ENAME":"siling","ESAL":1234.12},"after":{"EID":"102","ENAME":"sunsiling","ESAL":1000.00}}
# 删除
{"table":"bms_st.employees","op_type":"D","op_ts":"2012-04-12 14:24:37.789810","current_ts":"2012-04-12T14:24:44.248000","pos":"00000000010000037636","before":{"EID":"101","ENAME":"changyin","ESAL":6666.66}}
从数据格式中可以看得出:op_type 是我们对数据源的增删改的标志,真正的数据是在 after 或者 before 的值里边的。接下来我们将用 Flink 对这些数据进行 ETL处理 并发往 kafka 供下一层数仓计算使用:
二、项目结构
mmain: 程序入口
utils:工具类
entity:实体类
commonbase:抽象父类
achieve:实现类
三、项目的实现
3.1 静态的资源文件,用于配置信息 application.properties:
# source kafka config
PJbtServers1: cdh101:9092,cdh102:9092,cdh103:9092
PJgroupId1: test
PJoffsetReset1: latest
PJtopicStr1: piaoju-topic
# sink kafka config
pj-BtServers2: cdh101:9092,cdh102:9092,cdh103:9092
pj-ZkStr2: cdh101:2181,cdh102:2181,cdh102:2181
pj-GroupId2: test
pj-OffsetReset2: latest
pj-TopicStr2: piaoju-to-kafka-topic
# ---------------------------------------------------------------------------------------------------------
# 员工日增薪资
employee_tb_name: bms_st.employees
employee_job_name: EmployeeSource
#employee_create_table: employee_money
#employee_row_col: tb_name VARCHAR, op_type VARCHAR, ts VARCHAR, eId VARCHAR, eName VARCHAR, eSal VARCHAR
3.2 在 utils目录 下创建获取以上文件信息值的类 LoadPropertiesFile.java:
import java.io.InputStream;
import java.util.Properties;
/**
* @author feiniu
* @create 2020-03-26 9:37
*/
public class LoadPropertiesFile {
public static String getPropertyFileValues(String proKey){
String proStr = "";
try {
//读取配置文件
InputStream is = LoadPropertiesFile.class.getClassLoader().getResourceAsStream("application.properties");
Properties properties = new Properties();
properties.load(is);
proStr = properties.getProperty(proKey);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
return proStr;
}
}
3.3 commonbase 目录下创建抽象类 对接kafka的数据,并解析关键字段,代码架构如下:
package com.nfdwsyy.commonbase;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.nfdwsyy.utils.LoadPropertiesFile;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.text.ParseException;
import java.util.Properties;
/**
* @author feiniu
* @create 2020-04-03 20:47
*/
public abstract class SourceCommonBase {
public void getDataStream(String jobName) throws Exception {
// 1. 环境的设置
// 2.资源配置文件信息的获取
// 3.消费者接收数据并做json的简要解析
// 4.抽象方法的设置
}
- 环境的设置:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启 Checkpoint,每 1000毫秒进行一次 Checkpoint
env.enableCheckpointing(1000);
// Checkpoint 语义设置为 EXACTLY_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// CheckPoint 的超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间,只允许 有 1 个 Checkpoint 在发生
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 两次 Checkpoint 之间的最小时间间隔为 500 毫秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 当 Flink 任务取消时,保留外部保存的 CheckPoint 信息
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 当有较新的 Savepoint 时,作业也会从 Checkpoint 处恢复
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
// 作业最多允许 Checkpoint 失败 1 次(flink 1.9 开始支持)
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
2.资源配置文件信息的获取:
// 获取资源配置文件信息
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", LoadPropertiesFile.getPropertyFileValues("PJbtServers1"));
properties.setProperty("group.id", LoadPropertiesFile.getPropertyFileValues("PJgroupId1"));
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", LoadPropertiesFile.getPropertyFileValues("PJoffsetReset1")); //value 反序列化
3.消费者接收数据并做json的简要解析:
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
LoadPropertiesFile.getPropertyFileValues("PJtopicStr1"),
new SimpleStringSchema(),
properties);
DataStream<String> stream = env.addSource(myConsumer).setParallelism(1);
// prase json
DataStream<String> mStream = stream.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
JSONObject jsonObject = JSON.parseObject(s);
String table = jsonObject.getString("table");
String op_type = jsonObject.getString("op_type");
String op_ts = jsonObject.getString("op_ts");
String before = jsonObject.getString("before");
String after = jsonObject.getString("after");
String resultStr = parseSourceKafkaJson(table,op_type,op_ts,before,after);
return resultStr;
}
});
// let chirld etl to kafka
sendToSinkKafka(mStream);
env.execute(jobName);
4.抽象方法的设置:
// let chirld class do it
public abstract String parseSourceKafkaJson(String table, String op_type, String op_ts, String before, String after) throws ParseException;
// sink to kafka
public abstract void sendToSinkKafka(DataStream<String> mStream);
3.4 achieve下创建实现类,用于对数据进行 ETL 处理,类的架构设计如下:
package com.nfdwsyy.achieve;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.nfdwsyy.commonbase.SourceCommonBase;
import com.nfdwsyy.entity.Employee;
import com.nfdwsyy.utils.LoadPropertiesFile;
import com.nfdwsyy.utils.MySinkKafka;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.shaded.netty4.io.netty.util.internal.StringUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.io.Serializable;
import java.text.ParseException;
/**
* @author feiniu
* @create 2020-07-23 10:12
*/
public class EmpSourceAchi extends SourceCommonBase implements Serializable {
@Override
public String parseSourceKafkaJson(String table, String op_type, String op_ts, String before, String after) throws ParseException {
// 1.数据的 ETL 处理 (这里根据实际情况而定)
}
@Override
public void sendToSinkKafka(DataStream<String> mStream) {
// 2.将处理完之后的数据发往 kafka 队列 供下游计算使用
}
// 3. 调用父类的处理方法,供主类调用
}
1.数据的 ETL 处理:
String eId = "";
String eName = "";
double eSal = 0;
double after_money = 0;
double before_money = 0;
JSONObject jObjBefore = JSON.parseObject(before);
JSONObject jObjAfter = JSON.parseObject(after);
System.out.println("在 parseSourceKafkaJson 方法中,table -> "+ table +" , op_type -> "+ op_type +" , op_ts -> "+ op_ts +" , before -> "+ before + " , after -> "+ after);
String tb_name = LoadPropertiesFile.getPropertyFileValues("employee_tb_name");
Employee employee = null;
if (StringUtil.isNullOrEmpty(op_type) || StringUtil.isNullOrEmpty(table)){
System.out.println("获取的类型为空哦-> "+ op_type);
}else if (table.equals(tb_name)){
switch (op_type){
case "I":
eId = jObjAfter.getString("EID");
eName = jObjAfter.getString("ENAME");
eSal = Double.parseDouble(jObjAfter.getString("ESAL"));
break;
case "U":
eId = jObjAfter.getString("EID");
eName = jObjAfter.getString("ENAME");
after_money = Double.valueOf(jObjAfter.getString("ESAL"));
before_money = Double.valueOf(jObjBefore.getString("ESAL"));
eSal = after_money - before_money;
break;
case "D":
eId = jObjBefore.getString("EID");
eName = jObjBefore.getString("ENAME");
eSal = Double.parseDouble("-"+ jObjBefore.getString("ESAL"));
break;
}
employee = new Employee(tb_name, op_type, op_ts, eId, eName, eSal);
}
// the entity must have tb_name
return JSONObject.toJSONString(employee);
2.将处理完之后的数据发往 kafka :
DataStream<String> mS = mStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
if (StringUtil.isNullOrEmpty(s)){
return false;
} else {
return true;
}
}
});
String broker_list = LoadPropertiesFile.getPropertyFileValues("pj-BtServers2");
String topic = LoadPropertiesFile.getPropertyFileValues("pj-TopicStr2");
String groupId = LoadPropertiesFile.getPropertyFileValues("pj-GroupId2");
String offsetReset = LoadPropertiesFile.getPropertyFileValues("pj-OffsetReset2");
// the entity must have tb_name
String tb_name = LoadPropertiesFile.getPropertyFileValues("employee_tb_name");
// 发往 Kafka 的自定义类
mS.addSink(new MySinkKafka(broker_list, topic, groupId, offsetReset, tb_name)).name("employee_tb_name");
- 调用父类的处理方法,供主类调用 :
// transfer the parent method
public void successKafka2KafkaMethod(){
try {
String jobName = LoadPropertiesFile.getPropertyFileValues("employee_job_name");
getDataStream(jobName +" Source");
} catch (Exception e) {
e.printStackTrace();
}
}
到这里整体上算是弄完了,但是要注意的一点是数据发往 kafka 的类是需要我们去自定义的,接下来我们再去创建一个数据发往 kafka 的工具类:
package com.nfdwsyy.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* @author feiniu
* @create 2020-04-04 10:29
*/
public class MySinkKafka extends RichSinkFunction<String> {
private Properties props = null;
private KafkaProducer producer = null;
private ProducerRecord record = null;
private String broker_list;
private String topic;
private String groupId;
private String offsetReset;
private String sourceTbName;
public MySinkKafka(String broker_list, String topic, String groupId, String offsetReset, String sourceTbName) {
this.broker_list = broker_list;
this.topic = topic;
this.groupId = groupId;
this.offsetReset = offsetReset;
this.sourceTbName = sourceTbName;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
props = new Properties();
props.put("bootstrap.servers", broker_list);
props.put("group.id", groupId);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化
props.put("auto.offset.reset", offsetReset); //value 反序列化
producer = new KafkaProducer<String, String>(props);
}
@Override
public void invoke(String value, Context context) {
if(value.equals("") || value.equals("null")) {
System.out.println("Sink 中 invoke 方法过来的字符串值-> "+ value);
} else {
JSONObject jObjNew = JSON.parseObject(value);
String tb_name = jObjNew.getString("tb_name");
System.out.println("表明对比 -> " + tb_name + " --- " + sourceTbName);
if (tb_name.equals(sourceTbName)) {
record = new ProducerRecord<String, String>(topic, null, null, value);
producer.send(record);
System.out.println("发送数据: " + value);
producer.flush();
}
}
}
@Override
public void close() throws Exception {
super.close();
}
}
- 创建主类,调用ETL方法:
package com.nfdwsyy.mmain;
import com.nfdwsyy.achieve.EmpSourceAchi;
/**
* @author feiniu
* @create 2020-07-23 10:55
*/
public class EmployeeMain01 {
public static void main(String[] args){
EmpSourceAchi empAchi = new EmpSourceAchi();
empAchi.successKafka2KafkaMethod();
}
}
好了,全部代码都写完了,接下来我们可以去测试使用咯。
四、本地测试 并 打包部署上 yarn
4.1 本地测试
运行程序之后对数据库的源表进行增删改,即可在控制台看到发往kafka的数据,这里不做本地测试。
4.2 部署上 yarn 服务器
打包并上传至服务器的指定目录,然后执行如下命令部署应用:
bin/flink run -m yarn-cluster -ynm oggsyncflinkjob -d -c com.nfdwsyy.mmain.EmployeeMain01 /opt/mycdhflink/myjar/Kafka2FlinkETL2Kafka.jar
这时候我们可以在页面上部署情况了:接下来我们再启动接收ETL之后的消费者:
bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic piaoju-to-kafka-topic --from-beginning
源库中对表数据操作:从处理结果的数据看来,其实它已经变成是一个处理过增删改操作之后最简单的 json串了,那么至于如果对这些处理过后的数据进行计算如聚合等那都是小菜一碟了;原创不易,转载必须注明出处;欲知如何计算,请看下回分晓,哈哈哈哈。。。