分享

Kafka编程实例

hyj 2014-9-26 22:44:25 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 15107
本帖最后由 hyj 于 2014-9-26 23:08 编辑
问题导读

1.Kafka中Producer是什么?
2.如何实现Producer和Consumer应用程序?








编程
    Producer是一个应用程序,它创建消息并将消息发送到Kafka broker中。这些Producer在本质上各不相同,比如前端应用程序、后端服务、代理服务、面向遗留系统的适配器,以及面向Hadoop的Producer。这些不同的Producer可以使用不同的语言实现,比如Java、C和Python。


下面将详细介绍如何编写一个简单的Producer和Consumer应用程序。
发送简单消息给Kafka broker,Producer端编写类ClusterProducer。
  1. public classClusterProducer extends Thread {
  2.     private static final Log log =LogFactory.getLog(ClusterProducer.class);
  3.     public void sendData() {
  4.         Random rnd = new Random();
  5.         Properties props =PropertiesParser.getProperties(PropertiesSettings.PRODUCER_FILE_NAME);
  6.         if (props == null) {
  7.             log.error("can't loadspecified file " + PropertiesSettings.PRODUCER_FILE_NAME);
  8.            return;
  9.         }
  10.         //set the producer configurationproperties
  11.         ProducerConfig config = newProducerConfig(props);
  12.         Producer<String, String> producer= new Producer<String, String>(config);
  13.         //Send the data
  14.         int count = 1;
  15.         KeyedMessage<String, String>data;
  16.         while (count < 100) {
  17.             String sign = "*";
  18.             String ip = "192.168.2."+ rnd.nextInt(255);
  19.             StringBuffer sb = newStringBuffer();
  20.             for (int i = 0; i < count; i++){
  21.                 sb.append(sign);
  22.             }
  23.             log.info("set data:" +sb);
  24.             try {
  25.                 Thread.sleep(10);
  26.             } catch (InterruptedException e) {
  27.                 e.printStackTrace();
  28.             }
  29.             data = new KeyedMessage<String,String>(PropertiesSettings.TOPIC_NAME, ip, sb.toString());
  30.             producer.send(data);
  31.             count++;
  32.         }
  33.         producer.close();
  34.     }
  35.     public void run() {
  36.         sendData();
  37.     }
  38.     public static void main(String[] args) {
  39.         new ClusterProducer().sendData();
  40.     }
  41. }
复制代码

定义Consumer获取端,获取对应topic的数据:
  1. public class Consumerextends Thread {
  2.     private static final Log log =LogFactory.getLog(Consumer.class);
  3.     private final ConsumerConnector consumer;
  4.     private final String topic;
  5.     public Consumer(String topic) {
  6.         consumer =kafka.consumer.Consumer.createJavaConsumerConnector(
  7.                 createConsumerConfig());
  8.         this.topic = topic;
  9.     }
  10.     private static ConsumerConfigcreateConsumerConfig() {
  11.         Properties props = new Properties();
  12.        props.put("zookeeper.connect", KafkaProperties.zkConnect);
  13.         props.put("group.id",KafkaProperties.groupId);
  14.        props.put("zookeeper.session.timeout.ms", "400");
  15.        props.put("zookeeper.sync.time.ms", "200");
  16.        props.put("auto.commit.interval.ms", "1000");
  17.         return new ConsumerConfig(props);
  18.     }
  19.     public void run() {
  20.         Map<String, Integer>topicCountMap = new HashMap<String, Integer>();
  21.         topicCountMap.put(topic, newInteger(1));
  22.         Map<String,List<KafkaStream<byte[], byte[]>>> consumerMap =consumer.createMessageStreams(topicCountMap);
  23.         KafkaStream<byte[], byte[]>stream = consumerMap.get(topic).get(0);
  24.         ConsumerIterator<byte[], byte[]>it = stream.iterator();
  25.         while (it.hasNext()) {
  26.             log.info("+message: " +new String(it.next().message()));
  27.         }
  28.     }
  29.     public static void main(String[] args) {
  30.         Consumer client = new Consumer("cluster_statistics_topic");
  31.         client.
复制代码

   辅助类:
  1. public interface PropertiesSettings {
  2.     final static String CONSUMER_FILE_NAME = "consumer.properties";
  3.     final static String PRODUCER_FILE_NAME = "producer.properties";
  4.     final static String TOPIC_NAME = "cluster_statistics_topic";
  5.     final static String TOPIC_A = "cluster_statistics_topic_A";
  6. }
复制代码
  1. package com.kafka.utils;
  2. import org.apache.commons.logging.Log;
  3. import org.apache.commons.logging.LogFactory;
  4. import java.io.IOException;
  5. import java.io.InputStream;
  6. import java.util.Properties;
  7. /**
  8. * @author JohnLiu
  9. * @version 0.1.0
  10. * @date 2014/8/27
  11. */
  12. public class PropertiesParser {
  13.     private static final Log log = LogFactory.getLog(PropertiesParser.class);
  14.     /* properties file type */
  15.     Properties props = null;
  16.     /* constructor method*/
  17.     public PropertiesParser(Properties props) {
  18.         this.props = props;
  19.     }
  20.     /**
  21.      * Get the trimmed String value of the property with the given
  22.      * <code>name</code>.  If the value the empty String (after
  23.      * trimming), then it returns null.
  24.      */
  25.     public String getStringProperty(String name) {
  26.         return getStringProperty(name, null);
  27.     }
  28.     /**
  29.      * Get the trimmed String value of the property with the given
  30.      * <code>name</code> or the given default value if the value is
  31.      * null or empty after trimming.
  32.      */
  33.     public String getStringProperty(String name, String def) {
  34.         String val = props.getProperty(name, def);
  35.         if (val == null) {
  36.             return def;
  37.         }
  38.         val = val.trim();
  39.         return (val.length() == 0) ? def : val;
  40.     }
  41.     private Properties loadPropertiesFile() {
  42.         Properties props = new Properties();
  43.         InputStream in;
  44.         ClassLoader cl = getClass().getClassLoader();
  45.         if (cl == null)
  46.             cl = findClassloader();
  47.         if (cl == null)
  48.             try {
  49.                 throw new ProcessingException("Unable to find a class loader on the current thread or class.");
  50.             } catch (ProcessingException e) {
  51.                 e.printStackTrace();
  52.             }
  53.         in = cl.getResourceAsStream(PropertiesSettings.CONSUMER_FILE_NAME);
  54.         try {
  55.             props.load(in);
  56.         } catch (IOException ioe) {
  57.             log.error("can't load " + PropertiesSettings.CONSUMER_FILE_NAME, ioe);
  58.         }
  59.         return props;
  60.     }
  61.     private ClassLoader findClassloader() {
  62.         // work-around set context loader for windows-service started jvms (QUARTZ-748)
  63.         if (Thread.currentThread().getContextClassLoader() == null && getClass().getClassLoader() != null) {
  64.             Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
  65.         }
  66.         return Thread.currentThread().getContextClassLoader();
  67.     }
  68.     public static Properties getProperties(final String fileName) {
  69.         Properties props = new Properties();
  70.         InputStream in = Thread.currentThread().getContextClassLoader()
  71.                 .getResourceAsStream(fileName);
  72.         try {
  73.             props.load(in);
  74.         } catch (IOException ioe) {
  75.             log.error("can't load " + fileName, ioe);
  76.         }
  77.         return props;
  78.     }
  79. }
复制代码

  配置参数文件consumer.properties:
  1. zookeeper.connect=bigdata09:2181,bigdata08:2181,bigdata07:2181
  2. group.id=cluster_group
  3. zookeeper.session.timeout.ms=400
  4. zookeeper.sync.time.ms=200
  5. auto.commit.interval.ms=1000
复制代码

  配置参数文件producer.properties:
  1. metadata.broker.list=bigdata09:9092,bigdata08:9092,bigdata07:9092
  2. serializer.class=kafka.serializer.StringEncoder
  3. #partitioner.class=com.kafka.producer.SimplePartitioner
  4. request.required.acks=1
复制代码

分别执行上面的代码,可以发送或者得到对应topic信息。


Kafka编程中,scala-compiler.jar、scala-library.jar在哪个文件夹下?
这两个是Scala的编译库和运行时库(Kafka是用sbt管理依赖的),所以建议你使用sbt,它会自动下载所有依赖。

plc 编程实例




已有(1)人评论

跳转到指定楼层
buildhappy 发表于 2014-9-27 10:11:02
学习 学习   
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条