Getting Started with Kafka

Posted by desehawk on 2015-3-11 00:47:52 in 入门帮助 (Getting Started Help)

Guiding questions
1. How do you install and configure Kafka?
2. How do you create a topic?





Download Kafka
    http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
    In the file name, 2.10 is the Scala version and 0.8.1.1 is the Kafka version.
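
The naming convention described above can be unpacked mechanically. The following is a minimal sketch using plain shell parameter expansion; the variable names are illustrative and are not part of Kafka.

```shell
# File name taken from the download URL above.
archive="kafka_2.10-0.8.1.1.tgz"

base="${archive%.tgz}"            # strip the extension
versions="${base#kafka_}"         # strip the "kafka_" prefix
scala_version="${versions%%-*}"   # text before the first '-'
kafka_version="${versions#*-}"    # text after the first '-'

echo "Scala ${scala_version}, Kafka ${kafka_version}"
```

This only inspects the file name; it does not verify what the archive actually contains.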
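Extract Kafka
Surprisingly, Kafka bundles a Zookeeper package together with scripts for starting and stopping Zookeeper. The bundled version is fairly old (3.3.4). In principle you should not rely on the copy shipped with Kafka, because Zookeeper is a general-purpose distributed configuration and coordination system.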

Configure Kafka
1. Edit the configuration file config/server.properties

host.name and advertised.host.name are commented out by default; uncomment them.

    # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    host.name=localhost

    # Hostname the broker will advertise to producers and consumers. If not set, it uses the
    # value for "host.name" if configured.  Otherwise, it will use the value returned from
    # java.net.InetAddress.getCanonicalHostName().
    advertised.host.name=localhost
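
If you would rather script the edit than make it by hand, something like the following may work. It is a sketch under assumptions: the two commented-out sample lines are a guess at what the shipped file contains, and the script runs on a scratch copy so the real config/server.properties is never touched.

```shell
# Work in a scratch directory instead of editing the real file.
workdir="$(mktemp -d)"
original="$workdir/server.properties"
edited="$workdir/server.properties.edited"

# Assumed sample of the commented-out defaults (not copied from the real file).
cat > "$original" <<'EOF'
#host.name=localhost
#advertised.host.name=<hostname routable by clients>
EOF

# Drop the leading '#' from both keys and set the advertised name to localhost.
sed -e 's/^#host\.name=/host.name=/' \
    -e 's/^#advertised\.host\.name=.*/advertised.host.name=localhost/' \
    "$original" > "$edited"

# Read the results back before cleaning up.
host_line="$(grep '^host\.name=' "$edited")"
advertised_line="$(grep '^advertised\.host\.name=' "$edited")"
echo "$host_line"
echo "$advertised_line"
rm -rf "$workdir"
```

Writing to a second file rather than editing in place avoids the differences between sed implementations and leaves the original available for comparison.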


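2. Configure Zookeeper
    This article uses a separately installed Zookeeper rather than the one bundled with Kafka. So that Kafka knows which Zookeeper address to connect to, its configuration files provide a set of Zookeeper-related parameters.
    Besides a standalone Zookeeper installation, Kafka can also use the Zookeeper included in its own package. To use the bundled Zookeeper, start it from Kafka's bin directory. Conversely, when you use a standalone Zookeeper there is no need to start the one that ships with Kafka. The Zookeeper log entries that appear while Kafka starts come from Kafka acting as a Zookeeper client and establishing its connection to the Zookeeper server.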
  • config/server.properties

    ############################# Zookeeper #############################

    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.

    # 2181 is Zookeeper's clientPort
    zookeeper.connect=localhost:2181

    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=1000000
  • config/producer.properties
    No Zookeeper-related settings

  • config/consumer.properties

    # Zookeeper connection string
    # comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
    zookeeper.connect=127.0.0.1:2181

    # timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=1000000
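
The comments in both files describe zookeeper.connect as comma-separated host:port pairs with an optional chroot suffix. As a rough illustration of that format, the sketch below splits such a string apart in plain shell. The three-server value comes from the comment in the file; the "/kafka" chroot is a hypothetical example, and the variable names are illustrative.

```shell
# Example value; the "/kafka" chroot suffix is hypothetical.
connect="127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/kafka"

# Split off the optional chroot path (everything from the first '/').
case "$connect" in
  */*) servers="${connect%%/*}"; chroot="/${connect#*/}" ;;
  *)   servers="$connect";       chroot="" ;;
esac

# Walk the comma-separated host:port pairs.
server_count=0
old_ifs="$IFS"
IFS=','
for pair in $servers; do
  host="${pair%:*}"
  port="${pair##*:}"
  echo "server: host=$host port=$port"
  server_count=$((server_count + 1))
done
IFS="$old_ifs"

echo "servers=$server_count chroot=${chroot:-<none>}"
```

With the single-server value used in this article (localhost:2181) the same code reports one server and no chroot.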
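Start Zookeeper
1. To match the Zookeeper settings in the Kafka configuration above, Zookeeper should be configured to listen on port 2181.
2. Start Zookeeper. The parameters it is started with are as follows: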


    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    dataDir=/home/hadoop/software/zookeeper-3.4.6/data
    # the port at which the clients will connect
    clientPort=2181
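
Since Kafka's zookeeper.connect and Zookeeper's clientPort have to agree, a quick consistency check can catch a mismatch before anything is started. The sketch below is one possible way to do it, not an official tool: it recreates the relevant values from the two configuration listings above in scratch files, and get_prop is a hypothetical helper. It also converts initLimit and syncLimit, which are counted in ticks, into milliseconds.

```shell
workdir="$(mktemp -d)"

# Values copied from the two configuration listings above.
cat > "$workdir/zoo.cfg" <<'EOF'
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
EOF
echo 'zookeeper.connect=localhost:2181' > "$workdir/server.properties"

# Hypothetical helper: print the value of key $1 from key=value file $2.
get_prop() {
  sed -n "s/^$1=//p" "$2"
}

client_port="$(get_prop clientPort "$workdir/zoo.cfg")"
connect="$(get_prop 'zookeeper\.connect' "$workdir/server.properties")"
connect_port="${connect##*:}"

if [ "$client_port" = "$connect_port" ]; then
  echo "OK: Kafka and Zookeeper both use port $client_port"
else
  echo "Mismatch: clientPort=$client_port, zookeeper.connect uses $connect_port" >&2
fi

# initLimit and syncLimit are expressed in ticks; multiply by tickTime for ms.
tick_ms="$(get_prop tickTime "$workdir/zoo.cfg")"
init_limit_ms=$(( tick_ms * $(get_prop initLimit "$workdir/zoo.cfg") ))
sync_limit_ms=$(( tick_ms * $(get_prop syncLimit "$workdir/zoo.cfg") ))
echo "initLimit=${init_limit_ms} ms, syncLimit=${sync_limit_ms} ms"
rm -rf "$workdir"
```

To run the check against a real installation, point get_prop at the actual zoo.cfg and config/server.properties instead of the scratch copies. The port extraction assumes a single server with no chroot suffix, as in this article.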
Start Kafka
1. Start Kafka

    [hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties
2. Startup log
    [2015-01-11 01:11:12,490] INFO Verifying properties (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,558] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,558] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,558] INFO Property log.dirs is overridden to /tmp/kafka-logs (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,558] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,559] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,559] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,559] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,559] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,559] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,559] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,559] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,560] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,560] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,560] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,560] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
    [2015-01-11 01:11:12,607] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
    [2015-01-11 01:11:12,609] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
    [2015-01-11 01:11:12,640] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:host.name=hadoop.master (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:java.version=1.7.0_67 (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:java.home=/home/hadoop/software/jdk1.7.0_67/jre (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:java.class.path=:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-javadoc.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-scaladoc.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-sources.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/scala-library-2.10.1.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/zkclient-0.3.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:os.version=3.10.0-123.el7.x86_64 (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:user.name=hadoop (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:user.home=/home/hadoop (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,640] INFO Client environment:user.dir=/home/hadoop/software/kafka_2.10-0.8.1.1 (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,641] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7a50a6d4 (org.apache.zookeeper.ZooKeeper)
    [2015-01-11 01:11:12,643] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
    [2015-01-11 01:11:12,706] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
    [2015-01-11 01:11:12,716] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
    [2015-01-11 01:11:12,756] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x14ad79bb13d0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
    [2015-01-11 01:11:12,759] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
    [2015-01-11 01:11:12,919] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
    [2015-01-11 01:11:12,948] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
    [2015-01-11 01:11:12,975] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    [2015-01-11 01:11:13,039] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
    [2015-01-11 01:11:13,063] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
    [2015-01-11 01:11:13,163] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
    [2015-01-11 01:11:13,219] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
    [2015-01-11 01:11:13,367] INFO Registered broker 0 at path /brokers/ids/0 with address hadoop.master:9092. (kafka.utils.ZkUtils$)
    [2015-01-11 01:11:13,379] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
    [2015-01-11 01:11:13,486] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
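
In a log this long, the lines that matter most are the broker registration and the final "started" message. As an illustration, the sketch below scans a saved copy of the output for them. The two sample lines are copied from the startup log above; the idea of saving the output to a file, and the variable names, are assumptions made for this example.

```shell
# Stand-in for a saved copy of the broker's console output.
log_file="$(mktemp)"
cat > "$log_file" <<'EOF'
[2015-01-11 01:11:13,367] INFO Registered broker 0 at path /brokers/ids/0 with address hadoop.master:9092. (kafka.utils.ZkUtils$)
[2015-01-11 01:11:13,379] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
EOF

# Fixed-string search for the message that marks a completed startup.
if grep -qF '], started (kafka.server.KafkaServer)' "$log_file"; then
  broker_started=yes
else
  broker_started=no
fi

# Pull out the host:port the broker registered in Zookeeper.
broker_address="$(sed -n 's/.*Registered broker [0-9]* at path [^ ]* with address \([^ ]*\)\. .*/\1/p' "$log_file")"

echo "started=$broker_started address=$broker_address"
rm -f "$log_file"
```

The registered address is worth checking because it is the address the broker reported to Zookeeper when it registered.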



A simple Kafka test
1. Create a topic


    # Create a topic named test
    [hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    Created topic "test".
    # List the existing topics; here there is only test
    [hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
    test
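
When topic creation is part of a script, it can help to confirm that the topic really appears in the --list output. The following is a minimal sketch of that check. topic_exists is a hypothetical helper, and the list output is simulated with a variable so the snippet runs without a broker.

```shell
# Hypothetical helper: succeed if the topic name in $1 appears as a whole line on stdin.
topic_exists() {
  grep -qxF -- "$1"
}

# Stand-in for the output of the --list command shown above.
list_output="test"

if printf '%s\n' "$list_output" | topic_exists test; then
  echo "topic test exists"
else
  echo "topic test is missing" >&2
fi
```

Against a running cluster you would pipe the output of bin/kafka-topics.sh --list --zookeeper localhost:2181 into topic_exists instead. The -x option makes grep match whole lines only, so a topic named test2 is not mistaken for test.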
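2. Produce messages with the producer
    On startup the producer prints nothing apart from the SLF4J warnings. You can then type the messages to produce directly.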

    [hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


    This is mesage
    This is a test
3. Consume messages with the consumer

    On startup the consumer prints nothing apart from the SLF4J warnings.


    [hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


    This is mesage
    This is a test
4. From this point on, anything typed in the producer terminal shows up in the consumer terminal immediately.




3 comments
落魂草 posted on 2015-3-11 19:33:16

gqx1984 posted on 2015-3-21 17:16:49
Leaving a marker here; I will need this for an upcoming project.

你微笑时很美呀 posted on 2018-8-16 10:32:20
Bookmarked. I am a beginner just starting to learn Kafka.