Java连接Kafka Kerberos

Java连接Kafka Kerberos

平台:Ambari hdp 2.6.2.0
开启keberos

1.配置kafka_client_jaas.conf

  • 注意keyTabprincipal两个配置项
KafkaServer {  
    com.sun.security.auth.module.Krb5LoginModule required  
    useKeyTab=true 
    keyTab="/etc/security/keytabs/kafka.service.keytab"  
    storeKey=true  
    useTicketCache=false  
    principal="kafka/yamb2@EXAMPLE.COM"; 
};
KafkaClient {  
    com.sun.security.auth.module.Krb5LoginModule required  
    useKeyTab=true  
    keyTab="/etc/security/keytabs/kafka.service.keytab"  
    storeKey=true  
    useTicketCache=false  
    principal="kafka/yamb2@EXAMPLE.COM"; 
};
Client {  
    com.sun.security.auth.module.Krb5LoginModule required  
    useKeyTab=true  
    storeKey=true 
    useTicketCache=false  
    keyTab="/etc/security/keytabs/kafka.service.keytab"  
    principal="kafka/yamb2@EXAMPLE.COM"; 
};

2.kafka Producer Java Demo

  • 在kafka中创建一个topic:cw_test2019042301
  • kafka_client_jaas.conf为上一步配置的
  • krb5.conf为集群上的配置文件。默认目录为/etc/krb5.conf
package com.caiw.nuwapi;

import java.util.Date;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * @Author: caiwei
 * @Description:
 * @Date: create in 2019/4/12 14:54
 */
public class TestProducer {

    public static void main(String... args) throws InterruptedException {
        String topic = "cw_test2019042301";

        System.setProperty("java.security.auth.login.config","D:\\tmp\\161hdp\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf","D:\\tmp\\161hdp\\krb5.conf");
//        System.setProperty("security.auth.useSubjectCredsOnly","false");

        Properties props = new Properties();
        props.put("bootstrap.servers", "yamb2:6667,yamb3:6667,yamb4:6667");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");
        props.put("sasl.kerberos.service.name","kafka");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10000; i++){
            String s = UUID.randomUUID().toString() +" " + i + " Test Date: " + new Date();
            System.out.println(s);
            producer.send(new ProducerRecord<>(topic,s ));
            Thread.sleep(1000);
        }
    }
}

3.kafka Consumer Java Demo

  • 在kafka中创建一个topic:cw_test2019042301
  • kafka_client_jaas.conf为上一步配置的
  • krb5.conf为集群上的配置文件。默认目录为/etc/krb5.conf
package com.caiw.nuwapi;

import org.apache.commons.collections.map.HashedMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
 * @Author: caiwei
 * @Description:
 * @Date: create in 2019/4/12 14:54
 */
public class TestConsumer {
    private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashedMap();
    int count = 0;

    public static void main(String[] args) {
        System.setProperty("java.security.auth.login.config","D:\\tmp\\161hdp\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf","D:\\tmp\\161hdp\\krb5.conf");
        Properties props = new Properties();
        props.put("group.id", "test_2019042301");// 指定消费者组
        props.put("enable.auto.commit", "true");// 关闭自动提交
        //props.put("auto.commit.interval.ms", "1000");// 自动提交的时间间隔
        // 反序列化消息主键
        props.put("auto.offset.reset", "earliest"); // 缓冲大小
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 反序列化消费记录
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        props.put("log4j.appender.kafkaAppender.Threshold", "ERROR");

//        props.put("bootstrap.servers", "yamb2:6667,yamb3:6667,yamb4:6667");
        props.put("bootstrap.servers", "192.168.23.162:6667,192.168.23.163:6667,192.168.23.164:6667");
//        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");
        props.put("sasl.kerberos.service.name","kafka");

        // 创建一个消费者实例对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅消费主题集合
        consumer.subscribe(Collections.singletonList("cw_test2019042301"));
        // 实时消费标识
        boolean flag = true;
        while (flag) {
            // 获取主题消息数据
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records){
                // 循环打印消息记录
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
                //处理消息
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                //解析消息将消息存储到Hbase上的表中;
                // consumer.commitSync(currentOffsets);
                //手动提交偏移量
            }


        }
        // 出现异常关闭消费者对象
//      consumer.commitAsync();
//      consumer.commitSync();
        consumer.close();
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335

推荐阅读更多精彩内容