立即注册 登录
About云-梭伦科技 返回首页

nettman的个人空间 https://www.aboutyun.com/?21 [收藏] [复制] [分享] [RSS]

日志

通过代码实现kerberos 认证的 kafka

已有 979 次阅读2019-1-10 21:03 |系统分类:Kafka

由于不同版本的 Kafka 在 Kerberos 认证上的写法不尽相同,这里汇总几种代码实现方式,供对照参考。

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

/**
 * @author sunzq
 * @since 2017/8/29
 */
public class Application {
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(BOOTSTRAP_SERVERS_CONFIG, "node1:6667,node2:6667,node3:6667,node4:6667");
        props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(GROUP_ID_CONFIG, "test08291103");
//      props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test0829");
        props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // topic name: test9
        consumer.subscribe(Collections.singleton("test9"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
更多参考
https://www.cnblogs.com/kischn/p/7447306.html

4.编写生产消息代码

package com.cloudera;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**

 * Created by fayson on 2017/10/24.

 */

public class MyProducer {

 public static String TOPIC_NAME = "test3";

 public static void main(String[] args){

 System.setProperty("java.security.krb5.conf", "/Volumes/Transcend/keytab/krb5.conf");

 System.setProperty("java.security.auth.login.config", "/Volumes/Transcend/keytab/jaas-cache.conf");

 System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

//       System.setProperty("sun.security.krb5.debug","true");

 Properties props = new Properties();

 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020");

 props.put(ProducerConfig.ACKS_CONFIG, "all");

 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

 props.put("security.protocol", "SASL_PLAINTEXT");

 props.put("sasl.kerberos.service.name", "kafka");

 Producer<String,String> producer = new KafkaProducer<String,String>(props);

 for (int i = 0; i < 10; i++) {

 String key = "key-"+ i;

 String message = "Message-"+ i;

 ProducerRecord record= new ProducerRecord<String, String>(TOPIC_NAME, key, message);

 producer.send(record);

 System.out.println(key + "----"+ message);

 }

 producer.close();

 }

}

5.编写消费消息代码

package com.cloudera;

import org.apache.kafka.clients.consumer.*;

import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;

import java.util.Properties;

/**

 * Created by fayson on 2017/10/24.

 */

public class MyConsumer {

 private static String TOPIC_NAME = "test3";

 public static void main(String[] args){

 System.setProperty("java.security.krb5.conf", "/Volumes/Transcend/keytab/krb5.conf");

 System.setProperty("java.security.auth.login.config", "/Volumes/Transcend/keytab/jaas-cache.conf");

 System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

 Properties props = new Properties();

 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020");

 props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");

 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

 props.put("security.protocol", "SASL_PLAINTEXT");

 props.put("sasl.kerberos.service.name", "kafka");

 KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);

 TopicPartition partition0= new TopicPartition(TOPIC_NAME, 0);

 TopicPartition partition1= new TopicPartition(TOPIC_NAME, 1);

 TopicPartition partition2= new TopicPartition(TOPIC_NAME, 2);

 consumer.assign(Arrays.asList(partition0,partition1, partition2));

 ConsumerRecords<String,String> records = null;

 while (true){

 try {

 Thread.sleep(10000l);

 System.out.println();

 records = consumer.poll(Long.MAX_VALUE);

 for (ConsumerRecord<String, String> record : records) {

 System.out.println("Receivedmessage: (" + record.key() + "," + record.value() + ") at offset " + record.offset());

 }

        } **catch** (**InterruptedException** e){
            e.printStackTrace();
 }

    }
}
}
更多参考
https://cloud.tencent.com/developer/article/1078162

工具类,初始化kerberos环境
package deng.yb.kafka_kerberos.utils;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

public class MyProperties extends Properties{
    private Properties properties;
    
    private static final String JAAS_TEMPLATE =
            "KafkaClient {\n"
            + "com.sun.security.auth.module.Krb5LoginModule required\n" +
              "useKeyTab=true\n" +
              "keyTab=\"%1$s\"\n" +
              "principal=\"%2$s\";\n"
            + "};";
    
    
    public MyProperties(){
        properties = new Properties();
    }
    
    public MyProperties self(){
        return this;
    }
    
    public MyProperties put(String key , String value) {
        if (properties == null) {
            properties = new Properties();
        }
        
        properties.put(key, value);
        return self();
    }
    
    public static MyProperties initKerberos(){
     return new MyProperties()
                .put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer")
                .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                        "org.apache.kafka.common.serialization.StringDeserializer")
                .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                        "org.apache.kafka.common.serialization.StringDeserializer")
                .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
                .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
                .put("security.protocol", "SASL_PLAINTEXT")
                .put("sasl.kerberos.service.name", "kafka");
        
    }
    
    public static MyProperties initProducer(){
         return new MyProperties()
            .put(ProducerConfig.ACKS_CONFIG, "all")
            .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            .put("security.protocol", "SASL_PLAINTEXT")
            .put("sasl.kerberos.service.name", "kafka");
    }
    
    public Properties getProperties() {
        return properties;
    }
    
    //生成jaas.conf临时文件
    public static void configureJAAS(String keyTab, String principal) {
         String content = String.format(JAAS_TEMPLATE, keyTab, principal);
        
         File jaasConf = null;
         PrintWriter writer = null;
         
        try {
             
            jaasConf  = File.createTempFile("jaas", ".conf");
            writer = new PrintWriter(jaasConf);
            
            writer.println(content);
            
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        
        } finally {
            
             if (writer != null) {
                  writer.close();
             }
             
             jaasConf.deleteOnExit();
        }
        
        System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
        
    }
    
}

生产者类
package deng.yb.kafka_kerberos;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import deng.yb.kafka_kerberos.utils.MyProperties;

public class Producer {
    //发送topic
    public static String TOPIC_NAME = "kafak2hdfs";

    public static void main(String[] args) {

        System.setProperty("java.security.krb5.conf",
                Thread.currentThread().getContextClassLoader().getResource("krb5.conf").getPath());
        //初始化jaas.conf文件
        MyProperties.configureJAAS(Thread.currentThread().getContextClassLoader().getResource("wms_dev.keytab").getPath(), "wms_dev@WONHIGH.COM");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

        //System.setProperty("sun.security.krb5.debug","true");
        
        //初始化kerberos环境
        MyProperties props = MyProperties.initProducer();
        
        //kafka brokers地址
        props.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "bi-slave1:9092,bi-slave2:9092,bi-slave3:9092");

        org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<String, String>(
                props.getProperties());

        for (int i = 0; i < 10; i++) {

            String key = "key-" + i;

            String message = "Message-" + i;

            ProducerRecord record = new ProducerRecord<String, String>(
                    TOPIC_NAME, key, message);

            producer.send(record);

            System.out.println(key + "----" + message);

        }

        producer.close();

    }
}
消费者类
package deng.yb.kafka_kerberos;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;

import deng.yb.kafka_kerberos.utils.MyProperties;

public class Comsumer {
    private static String TOPIC_NAME = "kafak2hdfs";

    public static void main(String[] args) {

        System.setProperty("java.security.krb5.conf", Thread.currentThread()
                .getContextClassLoader().getResource("krb5.conf").getPath());
        //初始化jaas.conf文件
        MyProperties.configureJAAS(Thread.currentThread().getContextClassLoader().getResource("wms_dev.keytab").getPath(), "wms_dev@WONHIGH.COM");

        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

        MyProperties props = MyProperties.initKerberos();

        props.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "bi-slave1:9092,bi-slave2:9092,bi-slave3:9092");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
                props.getProperties());
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        /*
         * TopicPartition partition0= new TopicPartition(TOPIC_NAME, 0);
         *
         * TopicPartition partition1= new TopicPartition(TOPIC_NAME, 1);
         *
         * TopicPartition partition2= new TopicPartition(TOPIC_NAME, 2);
         */

        // consumer.assign(Arrays.asList(partition0,partition1, partition2));

        ConsumerRecords<String, String> records = null;
        
        while (true) {
            try {
                Thread.sleep(1000);

                System.out.println();
                records = consumer.poll(Long.MAX_VALUE);

                for (ConsumerRecord<String, String> record : records) {

                    System.out.println("Receivedmessage: (" + record.key()
                            + "," + record.value() + ") at offset "
                            + record.offset());

                }

            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        /*
         * while (true){
         *
         * try {
         *
         * Thread.sleep(10000l);
         *
         * System.out.println();
         *
         * records = consumer.poll(Long.MAX_VALUE);
         *
         * for (ConsumerRecord<String, String> record : records) {
         *
         * System.out.println("Receivedmessage: (" + record.key() + "," +
         * record.value() + ") at offset " + record.offset());
         *
         * }
         *
         * } **catch** (**InterruptedException** e){
         *
         * e.printStackTrace();
         *
         * }
         */

    }
}




路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条