本文档主要介绍在cdh集成kerberos情况下,sparkstreaming怎么消费kafka数据,并存储在kudu里面
- 假设kafka集成kerberos
- 假设kudu集成kerberos
- 假设用非root用户操作
- spark基于yarn-cluster模式
代码编写,这里只介绍关键代码
package deng.yb.sparkStreaming;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kudu.spark.kudu.KuduContext;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import deng.yb.sparkStreaming.kafka.KafkaTools;
import deng.yb.sparkStreaming.utils.NginxInfo;
import deng.yb.sparkStreaming.utils.SpringContextUtil;
/**
 * Spark Streaming job that reads comma-separated nginx log lines from Kafka,
 * expands every EPP "user_time" request into one row per entry of its JSON
 * payload and inserts the resulting rows into a Kudu table.
 *
 * <p>All settings (Spark master, application name, column list, topics,
 * table names and the {@code kudu.instances} value) are read from the Spring
 * bean named {@code commonConfig}.
 */
@SuppressWarnings("unchecked")
public class EApp {
    private static final Logger logger = Logger.getLogger(EApp.class);
    private static final String BEAN_CONF = "classpath:spring/spring-bean.xml";
    private static Map<String, String> conf = new HashMap<String, String>();
    /**
     * Request line of the EPP interface.
     */
    private static final String EPP_REQUEST = "POST /api/sky_server_data_app/track/user_time HTTP/1.1";
    /**
     * Request line of the APP interface.
     */
    private static final String APP_REQUEST = "POST /api/sky_server_data_app/code/app_code HTTP/1.1";
    /**
     * Id of the configuration bean declared in the Spring context.
     */
    private static final String CONFIG = "commonConfig";
    /**
     * The constants below are all configuration keys. This one: Spark master.
     */
    private static final String MASTER = "master";
    /**
     * Spark application name.
     */
    private static final String APP_NAME = "appName";
    /**
     * Custom column list, formatted as {@code name:type,name:type,...}.
     */
    private static final String COLUMNS = "columns";
    /**
     * Comma-separated Kafka topics.
     */
    private static final String TOPIC = "topic";
    /**
     * Comma-separated table names.
     */
    private static final String TABLE = "tables";

    static {
        String[] confs = new String[] { BEAN_CONF };
        // Publish the application context so it can be shared later on.
        SpringContextUtil
                .setApplicationContext(new ClassPathXmlApplicationContext(confs));
        conf = (Map<String, String>) SpringContextUtil.getBean(CONFIG);
    }

    /**
     * Builds the streaming pipeline, starts it and blocks until termination.
     * Any exception raised while setting up or running the pipeline is logged
     * and not rethrown.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String args[]) {
        try {
            SparkSession spark = SparkSession.builder()
                    .appName(conf.get(APP_NAME)).master(conf.get(MASTER))
                    .getOrCreate();
            Map<String, Object> confMap = KafkaTools.kafkaConf(conf);
            String[] topicArr = conf.get(TOPIC).split(",");
            Collection<String> topics = Arrays.asList(topicArr);
            StreamingContext sc = new StreamingContext(spark.sparkContext(),
                    Durations.milliseconds(5000));
            JavaStreamingContext jssc = new JavaStreamingContext(sc);
            JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
                    .createDirectStream(jssc, LocationStrategies
                            .PreferConsistent(), ConsumerStrategies
                            .<String, String> Subscribe(topics, confMap));
            jssc.sparkContext().setLogLevel("ERROR");
            stream.context().sparkContext().setLogLevel("ERROR");

            // Columns of an nginx log line, mapped name -> type in the
            // configured order (the order defines the position in the line).
            Map<String, String> columnTypes = new LinkedHashMap<String, String>();
            for (String column : conf.get(COLUMNS).split(",")) {
                String[] nameAndType = column.split(":");
                columnTypes.put(nameAndType[0], nameAndType[1]);
            }
            // Table names
            String[] tables = conf.get(TABLE).split(",");
            // Extra columns that only exist in the EPP table
            String[] eppExtColumns = { "app_name", "end", "portal_user_id",
                    "resource", "start", "username", "app_id" };
            KuduContext kudu = new KuduContext(conf.get("kudu.instances"),
                    sc.sparkContext());

            // DStream transformation:
            // 1st stage wraps the raw line into a field map,
            // 2nd stage splits it into one JSON object per payload entry,
            // 3rd stage converts each object into a Row.
            JavaDStream<LinkedHashMap<String, String>> linkMap = stream
                    .map(record -> parseLogLine(record.value(), columnTypes))
                    .cache();

            // EPP table
            linkMap.flatMap(new FlatMapFunction<LinkedHashMap<String, String>, JSONObject>() {
                @Override
                public Iterator<JSONObject> call(LinkedHashMap<String, String> json) throws Exception {
                    return expandEppRecord(json, eppExtColumns).iterator();
                }
            })
            .map(eppRowMap -> {
                logger.info("消息转换为epprow:" + eppRowMap.toString());
                List<Object> objArry = new ArrayList<Object>();
                eppRowMap.forEach((key, value) -> {
                    objArry.add(NginxInfo.valueTranForm(key, value));
                });
                return RowFactory.create(objArry.toArray());
            })
            .foreachRDD(eppRdd -> {
                Dataset<Row> rows = spark.createDataFrame(
                        eppRdd,
                        DataTypes.createStructType(NginxInfo
                                .getStructFieldList("EPP")));
                kudu.insertRows(rows, tables[0]);
            });

            jssc.start();
            jssc.awaitTermination();
            logger.info("完成!");
        } catch (Exception e) {
            logger.error("处理消息错误2!", e);
        }
    }

    /**
     * Turns one comma-separated log line into an ordered field map.
     *
     * <p>Values are assigned to the configured columns by position. A column
     * that has no value in the line (short or null line) is stored as
     * {@code null} instead of indexing past the end of the split array, so the
     * map always carries every configured column.
     * NOTE(review): confirm NginxInfo.valueTranForm accepts null values.
     *
     * <p>When {@code http_version} holds at least two {@code _}-separated
     * parts, they are exposed as {@code portal_name}, {@code channel} and
     * {@code version}; {@code version} is {@code null} if the third part is
     * missing.
     *
     * @param line        raw Kafka record value, may be {@code null}
     * @param columnTypes configured columns in line order
     * @return field name to value, in column order
     */
    private static LinkedHashMap<String, String> parseLogLine(String line,
            Map<String, String> columnTypes) {
        logger.info("消息进来:" + line);
        LinkedHashMap<String, String> json = new LinkedHashMap<String, String>();
        String[] messages = line == null ? new String[0] : line.split(",");
        int i = 0;
        for (String column : columnTypes.keySet()) {
            json.put(column, i < messages.length ? messages[i] : null);
            i += 1;
        }
        // Derive portal/channel/version from the http_version field
        String httpVersion = json.get("http_version");
        if (httpVersion != null) {
            String[] parts = httpVersion.split("_");
            if (parts.length > 1) {
                json.put("portal_name", parts[0]);
                json.put("channel", parts[1]);
                json.put("version", parts.length > 2 ? parts[2] : null);
            }
        }
        logger.info("封装完数据格式:"+json.toString());
        return json;
    }

    /**
     * Expands one parsed log record into the JSON objects destined for the
     * EPP table.
     *
     * <p>Only records whose {@code request} field contains
     * {@link #EPP_REQUEST} are expanded. The URL-encoded {@code app_id} field
     * is decoded, an optional {@code key=} prefix is stripped and the rest is
     * parsed as a JSON array; every array entry yields one object made of the
     * original fields, the extra columns taken from the entry and a generated
     * {@code id}.
     *
     * @param json       parsed log record
     * @param extColumns names of the columns copied from each array entry
     * @return the expanded objects; empty when the record is not an EPP
     *         request or carries no payload
     * @throws Exception if the payload cannot be decoded or parsed
     */
    private static List<JSONObject> expandEppRecord(Map<String, String> json,
            String[] extColumns) throws Exception {
        List<JSONObject> expanded = new ArrayList<JSONObject>();
        String request = json.get("request");
        String encodedBody = json.get("app_id");
        if (request == null || !request.contains(EPP_REQUEST)
                || encodedBody == null) {
            return expanded;
        }
        logger.info("消息拆分:" + json.toString());
        // This record goes to the EPP table
        String requestBody = URLDecoder.decode(encodedBody, "utf-8");
        // Limit the split to the first '=' so that '=' characters inside the
        // JSON payload itself do not truncate it.
        String[] strArr = requestBody.split("=", 2);
        JSONArray array = JSONObject
                .parseArray(strArr.length > 1 ? strArr[1] : strArr[0]);
        if (array == null) {
            return expanded;
        }
        // Split by app id: one output object per array entry
        for (int j = 0; j < array.size(); j++) {
            JSONObject obj = array.getJSONObject(j);
            JSONObject newJson = new JSONObject(
                    new LinkedHashMap<String, Object>());
            // Carry over the original attributes
            for (Map.Entry<String, String> field : json.entrySet()) {
                newJson.put(field.getKey(), field.getValue());
            }
            for (String extColumn : extColumns) {
                newJson.put(extColumn, obj.get(extColumn));
            }
            // A Kudu table must have a primary key
            newJson.put("id", UUID.randomUUID().toString().replace("-", ""));
            logger.info("生成EPP主键:"+newJson.getString("id"));
            expanded.add(newJson);
        }
        return expanded;
    }

    /**
     * Placeholder that is not called anywhere in this class; it currently
     * always returns {@code null}.
     */
    private StructType contructStructType() {
        List<StructField> structFields = new ArrayList<StructField>();
        return null;
    }
}
- KafkaTools类,主要获取kafka配置,代码仅供参考
/**
 * Builds the Kafka consumer settings used by the streaming job on a
 * Kerberos-enabled cluster.
 *
 * @param conf application configuration; the broker list is read from the
 *             {@code bootStrapServers} key
 * @return the consumer parameters, or {@code null} when {@code conf} is
 *         {@code null}
 */
public static Map<String, Object> kafkaConf(Map<String, String> conf) {
    if (conf == null) {
        return null;
    }
    // Kafka configuration
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    // NOTE(review): auto-commit can mark offsets as consumed before the batch
    // has been written to the sink; confirm this delivery guarantee is
    // acceptable for this job.
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    kafkaParams.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
    // Must match the broker's security.inter.broker.protocol once Kafka is
    // integrated with Kerberos
    kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
    kafkaParams.put("sasl.kerberos.service.name", "kafka");
    // Consumer-side constant, consistent with the other keys in this map
    // (the original used the producer-side constant for the same setting).
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            conf.get("bootStrapServers"));
    return kafkaParams;
}
注意到,集成kerberos后,身份验证的代码并没有在项目写太多,只有kafka客户端配置加上kafkaParams.put("security.protocol", "SASL_PLAINTEXT")而已
- 身份验证的操作分别交给spark-submit处理和调度器linux crontab 处理
- 假设我用的是wms这个账号去跑任务
- 新建kafka_client_jaas.conf文件
cd /usr/wms/sparkstreaming/
#该文件给kafka身份验证用
[wms@node1 sparkstreaming]$ vi kafka_client_jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
serviceName="kafka"
keyTab="./wms.keytab"
principal="wms@W.COM";
};
#把wms.keytab也放在相应目录下,此时目录结构应该是如此
-rwxr-xr-x 1 root root 352 Jul 16 09:48 wms.keytab
[wms@node1 sparkstreaming]$ ll
总用量 114172
#conf.properties文件是spark应用的配置文件
-rwxr-xr-x 1 wms wms 897 7月 16 09:45 conf.properties
-rwxr-xr-x 1 wms wms 221 7月 16 09:45 kafka_client_jaas.conf
-rwxr-xr-x 1 wms wms 352 7月 16 09:45 wms.keytab
#scp到其他节点的相同目录
scp /usr/wms/sparkstreaming/* root@bi-slave1:/usr/wms/sparkstreaming/
scp /usr/wms/sparkstreaming/* root@bi-slave2:/usr/wms/sparkstreaming/
scp /usr/wms/sparkstreaming/* root@bi-slave3:/usr/wms/sparkstreaming/
- spark启动前,先初始化driver和executor节点的票据
#该操作主要是为了保证executor节点执行kudu操作前有权限
#这里我们写了一个批处理脚本,能在所有节点执行某个命令
#我们用linux调度工具,到点初始化wms用户票据,防止票据失效
#在root权限下操作
exit
[root@node1 sparkstreaming]# crontab -e
#每五分钟,在每台机器初始化wms用户票据,防止失效
*/5 * * * * ./doCommand.sh "su wms -c 'kinit -kt /usr/wms/sparkstreaming/wms.keytab wms@W.COM'" > /usr/wms/sparkstreaming/lastupdate
# 注意需要在配置文件目录下执行spark2-submit命令
# driver节点需要配置kafka的security.auth.login.config信息
# executor节点需要配置kafka的security.auth.login.config信息
# driver根据绝对路径读取配置
# executor根据相对路径读取配置
# 通过files配置把kafka_client_jaas.conf,wms.keytab发到executor节点
spark2-submit --driver-java-options=-Djava.security.auth.login.config=/usr/wms/sparkstreaming/kafka_client_jaas.conf --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --files kafka_client_jaas.conf,wms.keytab --master yarn --deploy-mode cluster --class deng.yb.sparkStreaming.EApp /usr/wms/sparkstreaming/sparkStreaming-0.0.1-SNAPSHOT.jar