kafka =>SparkStreaming=>kudu集成kerberos

本文档主要介绍在cdh集成kerberos情况下,sparkstreaming怎么消费kafka数据,并存储在kudu里面
  • 假设kafka集成kerberos
  • 假设kudu集成kerberos
  • 假设用非root用户操作
  • spark基于yarn-cluster模式
代码编写,这里只介绍关键代码
  • 主类,以下代码仅供参考
package deng.yb.sparkStreaming;

import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kudu.spark.kudu.KuduContext;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import deng.yb.sparkStreaming.kafka.KafkaTools;
import deng.yb.sparkStreaming.utils.NginxInfo;
import deng.yb.sparkStreaming.utils.SpringContextUtil;

/**
 * Hello world!
 *
 */
@SuppressWarnings("unchecked")
public class EApp {
    private static final Logger logger = Logger.getLogger(App.class);
    private static final String BEAN_CONF = "classpath:spring/spring-bean.xml";
    private static Map<String, String> conf = new HashMap<String, String>();

    /**
     * epp接口-request
     */
    private static final String EPP_REQUEST = "POST /api/sky_server_data_app/track/user_time HTTP/1.1";

    /**
     * app接口-request
     */
    private static final String APP_REQUEST = "POST /api/sky_server_data_app/code/app_code HTTP/1.1";

    /**
     * 在spring 配置的参数id
     */
    private static final String CONFIG = "commonConfig";

    /**
     * 以下配置参数皆为配置key spark模式
     */
    private static final String MASTER = "master";

    /**
     * spark-appName
     */
    private static final String APP_NAME = "appName";

    /**
     * 自定义字段
     */
    private static final String COLUMNS = "columns";

    /**
     * topic
     */
    private static final String TOPIC = "topic";

    /**
     * 表名
     */
    private static final String TABLE = "tables";

    static {
        String[] confs = new String[] { BEAN_CONF };
        // 把actx设置进去,后续可以共用
        SpringContextUtil
                .setApplicationContext(new ClassPathXmlApplicationContext(confs));
        conf = (Map<String, String>) SpringContextUtil.getBean(CONFIG);
    }

    public static void main(String args[]) {
    
        try {
            
            SparkSession spark = SparkSession.builder()
                    .appName(conf.get(APP_NAME)).master(conf.get(MASTER))
                    .getOrCreate();

            Map<String, Object> confMap = KafkaTools.kafkaConf(conf);

            String[] topicArr = conf.get(TOPIC).split(",");
            Collection<String> topics = Arrays.asList(topicArr);

            StreamingContext sc = new StreamingContext(spark.sparkContext(),
                    Durations.milliseconds(5000));
            JavaStreamingContext jssc = new JavaStreamingContext(sc);

            JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
                    .createDirectStream(jssc, LocationStrategies
                            .PreferConsistent(), ConsumerStrategies
                            .<String, String> Subscribe(topics, confMap));
            
            jssc.sparkContext().setLogLevel("ERROR");
            stream.context().sparkContext().setLogLevel("ERROR");
            // nginx日志对应字段
            String[] columns = conf.get(COLUMNS).split(",");
            Map<String, String> colimnsMap = new LinkedHashMap<String, String>();

            // 把字段和类型映射
            String[] temp;
            for (String column : columns) {
                temp = column.split(":");
                colimnsMap.put(temp[0], temp[1]);
            }

            // 表名
            String[] tables = conf.get(TABLE).split(",");

            // epp表额外的字段
            String[] eppExtColumns = { "app_name", "end", "portal_user_id",
                    "resource", "start", "username", "app_id" };

            KuduContext kudu = new KuduContext(conf.get("kudu.instances"),
                    sc.sparkContext());


            // dstream transform
            // 第一层封装
            // 第二层切分
            // 第三层转换
            JavaDStream<LinkedHashMap<String,String>> linkMap = stream.map(record -> {
                logger.info("消息进来:" + record.value());
                LinkedHashMap<String,String> json = new LinkedHashMap<String, String>();
                String[] messages = record.value().split(",");
                int length = colimnsMap.size();
                
                int i = 0;
                for (Map.Entry<String, String> entry : colimnsMap
                        .entrySet()) {
                    if (i < length) {
                        json.put(entry.getKey(), messages[i]);
                    }

                    i += 1;
                }
                
                
                // 处理http_version字段
                String httpVersion;
                if (json.containsKey("http_version")
                        && (httpVersion = json.get("http_version")) != null) {
                    String[] httpVersionArry = httpVersion.split("_");
                    if (httpVersionArry != null
                            && httpVersionArry.length > 1) {
                        json.put("portal_name", httpVersionArry[0]);
                        json.put("channel", httpVersionArry[1]);
                        json.put("version", httpVersionArry[2]);
                    }
                }
                
                logger.info("封装完数据格式:"+json.toString());
                return json;
                
            }).cache();
            
            //EPP表
            linkMap.flatMap(new FlatMapFunction<LinkedHashMap<String,String>, JSONObject>(){
                @Override
                public Iterator<JSONObject> call(LinkedHashMap<String,String> json) throws Exception {
                    // TODO Auto-generated method stub
                    ArrayList<JSONObject> jsonArray = new ArrayList<JSONObject>();
                    String request = json.get("request");
                    if (request.indexOf(EPP_REQUEST) > -1) {
                        logger.info("消息拆分:" + json.toString());
                        // 这个进epp表
                        String requestBody = URLDecoder.decode(
                                json.get("app_id"), "utf-8");
                        String[] strArr;
                        JSONArray array = JSONObject.parseArray((strArr = requestBody
                                .split("=")).length > 1 ? strArr[1]
                                : strArr[0]);
                        // 根据appid拆分
                        for (int j = 0; j < array.size(); j++) {
                            JSONObject obj = array.getJSONObject(j);
                            JSONObject newJson = new JSONObject(
                                    new LinkedHashMap<String,Object>());

                            // 把原来的属性加上
                            for (String oldColumn : json.keySet()) {
                                newJson.put(oldColumn,
                                        json.get(oldColumn));
                            }
                            
                            for (String extColumn : eppExtColumns) {
                                newJson.put(extColumn,
                                        obj.get(extColumn));
                            }
                            // kudu表一定要有主键
                            newJson.put("id", UUID.randomUUID()
                                    .toString().replace("-", ""));
                            
                            logger.info("生成EPP主键:"+newJson.getString("id"));
                            jsonArray.add(newJson);
                        }
                        
                        return jsonArray.iterator();
                        
                    }
                    
                    return new ArrayList().iterator();
                
                }
                
            })
            
            .map(eppRowMap -> {
                logger.info("消息转换为epprow:" + eppRowMap.toString());
                List<Object> objArry = new ArrayList<Object>();
                eppRowMap.forEach((key, value) -> {
                    objArry.add(NginxInfo.valueTranForm(key, value));
                });
                return RowFactory.create(objArry.toArray());
            })
            
            .foreachRDD(eppRdd -> {
                    Dataset<Row> rows = spark.createDataFrame(
                            eppRdd,
                            DataTypes
                                    .createStructType(NginxInfo
                                            .getStructFieldList("EPP")));
                    kudu.insertRows(rows,
                            tables[0]);
            });
            
                
                jssc.start();
                jssc.awaitTermination();
                logger.info("完成!");

        } catch (Exception e) {
            logger.error("处理消息错误2!", e);
        }
    }

    private StructType contructStructType() {
        List<StructField> structFields = new ArrayList<StructField>();

        return null;
    }
}


  • KafkaTools类,主要获取kafka配置,代码仅供参考
public static Map<String, Object> kafkaConf(Map<String, String> conf) {

        if (conf == null) {
            return null;
        }

        // kafka配置
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        kafkaParams.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

        //kafka集成kerberos后的security.inter.broker.protocol
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.kerberos.service.name", "kafka");

        kafkaParams.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                conf.get("bootStrapServers"));

        return kafkaParams;
    }
注意到,集成kerberos后,身份验证的代码并没有在项目写太多,只有kafka客户端配置加上kafkaParams.put("security.protocol", "SASL_PLAINTEXT")而已
  • 身份验证的操作分别交给spark-submit处理和调度器linux crontab 处理
  • 假设我用的是wms这个账号去跑任务
  • 新建kafka_client_jaas.conf文件
cd /usr/wms/sparkstreaming/

#该文件给kafka身份验证用
[wms@node1 sparkstreaming]$ vi kafka_client_jaas.conf 
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   storeKey=true
   useTicketCache=false
   serviceName="kafka"
   keyTab="./wms.keytab"
   principal="wms@W.COM";
};

#把wms.keytab也放在相应目录下,此时目录机构应该是如此
-rwxr-xr-x 1 root root 352 Jul 16 09:48 wms.keytab
[wms@node1 sparkstreaming]$ ll
总用量 114172
#conf.properties文件是spark应用的配置文件
-rwxr-xr-x 1 wms wms       897 7月  16 09:45 conf.properties
-rwxr-xr-x 1 wms wms       221 7月  16 09:45 kafka_client_jaas.conf
-rwxr-xr-x 1 wms wms       352 7月  16 09:45 wms.keytab


#scp到其他目录
scp /usr/wms/sparkstreaming/* root@bi-slave1:/usr/wms/sparkstreaming/
scp /usr/wms/sparkstreaming/* root@bi-slave2:/usr/wms/sparkstreaming/
scp /usr/wms/sparkstreaming/* root@bi-slave3:/usr/wms/sparkstreaming/
  • spark启动前,先初始化driver和executor是节点票据
#该操作主要是为了保证executor节点执行kudu操作前有权限
#这里我们写了一个批处理脚本,能在所有节点执行某个命令
#我们用linux调度工具,到点初始化wms用户票据,防止票据失效
#在root权限下操作
exit
[root@node1 sparkstreaming]# crontab -e
#每五分钟,在每台机器初始化wms用户票据,防止失效
*/5 * * * * ./doCommand.sh "su wms -c 'kinit -kt /usr/wms/sparkstreaming/wms.keytab wms@W.COM'" > /usr/wms/sparkstreaming/lastupdate
  • spark-submit
# 注意需要在配置文件目录下执行spark2-submit命令
# driver节点需要配置kafka的security.auth.login.config信息
# executor节点需要配置kafka的security.auth.login.config信息
# driver根据绝对路径读取配置
# executor根据相对路径读取配置
# 通过files配置把kafka_client_jaas.conf,wms.keytab发到executor节点
 spark2-submit  --driver-java-options=-Djava.security.auth.login.config=/etc/wonhighconf/bi/bi-sparkstreaming/kafka_client_jaas.conf  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --files kafka_client_jaas.conf,wms.keytab --master yarn --deploy-mode cluster  --class deng.yb.sparkStreaming.App /usr/wms/sparkstreaming/sparkStreaming-0.0.1-SNAPSHOT.jar
  • spark启动后,进入yarn查看spark日志
kudu入库日志.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容