
Kafka and Flume NG Integration in Practice

nettman · posted 2014-05-29 18:00:42
Guiding questions:
1. How does this setup differ from a plain Flume installation?
2. How do the two sides differ in configuration?
3. How do you configure netcat as the source?

1. Flume configuration for the producer side, with netcat as the source and Kafka as the sink:
  hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ cat conf/producer1.properties
  #agent section
  producer.sources = s
  producer.channels = c
  producer.sinks = r
  #source section
  #producer.sources.s.type = seq
  producer.sources.s.type = netcat
  producer.sources.s.bind = localhost
  producer.sources.s.port = 44444
  producer.sources.s.channels = c
  # Each sink's type must be defined
  producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
  producer.sinks.r.metadata.broker.list=127.0.0.1:9092
  producer.sinks.r.partition.key=0
  producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
  producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
  producer.sinks.r.request.required.acks=0
  producer.sinks.r.max.message.size=1000000
  producer.sinks.r.producer.type=sync
  producer.sinks.r.custom.encoding=UTF-8
  producer.sinks.r.custom.topic.name=test
  #Specify the channel the sink should use
  producer.sinks.r.channel = c
  # Each channel's type is defined.
  producer.channels.c.type = memory
  producer.channels.c.capacity = 1000
  hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$
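
Note that org.apache.flume.plugins.KafkaSink is not part of Flume itself; it comes from a third-party plugin (commonly the flumeng-kafka-plugin project) built for Flume 1.4 and Kafka 0.8. On Flume 1.6 or later a Kafka sink ships with Flume, so the plugin is unnecessary there. A minimal sketch of the equivalent sink section, assuming Flume 1.6 and its bundled sink's property names:

  # Hedged sketch, assuming Flume 1.6+: the bundled Kafka sink replaces the
  # third-party org.apache.flume.plugins.KafkaSink used above.
  producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
  producer.sinks.r.brokerList = 127.0.0.1:9092
  producer.sinks.r.topic = test
  producer.sinks.r.requiredAcks = 0
  producer.sinks.r.batchSize = 100
  producer.sinks.r.channel = c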

2. Flume configuration for the consumer side, with Kafka as the source and a logger as the sink:
  hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ cat conf/comsumer1.properties
  #consumer config
  ###########################################
  consumer.sources = s
  consumer.channels = c
  consumer.sinks = r
  #consumer.sources.s.type = seq
  consumer.sources.s.channels = c
  consumer.sinks.r.type = logger
  consumer.sinks.r.channel = c
  consumer.channels.c.type = memory
  consumer.channels.c.capacity = 100
  consumer.sources.s.type = org.apache.flume.plugins.KafkaSource
  consumer.sources.s.zookeeper.connect=127.0.0.1:2181
  consumer.sources.s.group.id=testGroup
  consumer.sources.s.zookeeper.session.timeout.ms=400
  consumer.sources.s.zookeeper.sync.time.ms=200
  consumer.sources.s.auto.commit.interval.ms=1000
  consumer.sources.s.custom.topic.name=test
  consumer.sources.s.custom.thread.per.consumer=4
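
Both org.apache.flume.plugins.KafkaSink and org.apache.flume.plugins.KafkaSource are plugin classes, so the plugin jar and the Kafka client jars it depends on must be on Flume's classpath before either agent will start. A hedged sketch; the jar names below are assumptions and depend on the plugin and Kafka builds you use:

  # Assumed jar names; adjust to the versions you actually have.
  cp flumeng-kafka-plugin.jar ~/bigdata/apache-flume-1.4.0-bin/lib/
  cp kafka_2.9.2-0.8.0.jar scala-library-2.9.2.jar metrics-core-2.2.0.jar \
     zkclient-0.3.jar ~/bigdata/apache-flume-1.4.0-bin/lib/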
3. Start the two agents, each in its own terminal:
  bin/flume-ng agent --conf conf --conf-file conf/producer1.properties --name producer -Dflume.root.logger=INFO,console
  bin/flume-ng agent --conf conf --conf-file conf/comsumer1.properties --name consumer -Dflume.root.logger=INFO,console
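Both agents assume ZooKeeper and a Kafka 0.8.x broker are already running on localhost, and that the topic test exists. If they are not, a minimal sketch using the standard Kafka quickstart scripts, run from the Kafka installation directory (on Kafka 0.8.0 the topic script is kafka-create-topic.sh rather than kafka-topics.sh):

  # Start ZooKeeper and a single broker, each in its own terminal.
  bin/zookeeper-server-start.sh config/zookeeper.properties
  bin/kafka-server-start.sh config/server.properties
  # Create the topic both agents use.
  bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 \
      --replication-factor 1 --partitions 1 --topic test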

4. Now telnet to port 44444 and send a few lines:
  hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ telnet localhost 44444
  Trying ::1...
  Trying 127.0.0.1...
  Connected to localhost.
  Escape character is '^]'.
  1111111111111111
  OK
  kak^Hfkakakkakakakkakkakkaakaknnnm
  OK
  abcdefghijklmnopqrstuvwxyz
  OK
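To confirm the messages actually reached the topic, independently of the Flume consumer agent, you can also watch it with Kafka's console consumer. A sketch using the 0.8.x ZooKeeper-based consumer:

  bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning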
Both agents now log output. On the producer side:
  2014-01-15 20:01:05,047 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:67)] Property metadata.broker.list is overridden to 127.0.0.1:9092
  [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:137)] Send Message to Kafka : [1111111111111111] -- [{ headers:{} body: 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 1111111111111111 }]
  2014-01-15 20:01:35,702 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:67)] Fetching metadata from broker id:0,host:127.0.0.1,port:9092 with correlation id 0 for 1 topic(s) Set(test)
  2014-01-15 20:01:35,704 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:67)] Connected to 127.0.0.1:9092 for producing
  2014-01-15 20:01:35,727 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:67)] Disconnecting from 127.0.0.1:9092
  2014-01-15 20:01:35,767 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:67)] Connected to stormspark:9092 for producing
  [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:137)] Send Message to Kafka : [kafkakakkakakakkakkakkaakaknnnm] -- [{ headers:{} body: 6B 61 6B 08 66 6B 61 6B 61 6B 6B 61 6B 61 6B 61 kak.fkakakkakaka }]
  [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:137)] Send Message to Kafka : [abcdefghijklmnopqrstuvwxyz] -- [{ headers:{} body: 61 62 63 64 65 66 67 68 69 6A 6B 6C 6D 6E 6F 70 abcdefghijklmnop }]
  [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:137)] Send Message to Kafka : [quit] -- [{ headers:{} body: 71 75 69 74 0D                                  quit. }]
And on the consumer side:
  2014-01-15 19:58:02,434 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 32 32 32 32 32 32 32 32 32 32 32 32 32 32 32 32 2222222222222222 }
  2014-01-15 20:01:35,771 (pool-4-thread-1) [INFO - org.apache.flume.plugins.KafkaSource$ConsumerWorker.run(KafkaSource.java:230)] Receive Message [Thread 0: 1111111111111111]
  2014-01-15 20:01:36,487 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 1111111111111111 }
  2014-01-15 20:02:13,784 (pool-4-thread-1) [INFO - org.apache.flume.plugins.KafkaSource$ConsumerWorker.run(KafkaSource.java:230)] Receive Message [Thread 0: kafkakakkakakakkakkakkaakaknnnm]
  2014-01-15 20:02:14,500 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 6B 61 6B 08 66 6B 61 6B 61 6B 6B 61 6B 61 6B 61 kak.fkakakkakaka }
  2014-01-15 20:02:28,960 (pool-4-thread-1) [INFO - org.apache.flume.plugins.KafkaSource$ConsumerWorker.run(KafkaSource.java:230)] Receive Message [Thread 0: abcdefghijklmnopqrstuvwxyz]
  2014-01-15 20:02:29,506 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 61 62 63 64 65 66 67 68 69 6A 6B 6C 6D 6E 6F 70 abcdefghijklmnop }
  2014-01-15 20:03:54,986 (pool-4-thread-1) [INFO - org.apache.flume.plugins.KafkaSource$ConsumerWorker.run(KafkaSource.java:230)] Receive Message [Thread 0: quit]
  2014-01-15 20:03:55,529 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 71 75 69 74 0D                                  quit. }
  ^C2014-01-15 20:09:10,094 (agent-shutdown-hook) [INFO - org.apache.flume.lifecycle.LifecycleSup
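
One caveat: both agents use a memory channel (capacity 1000 and 100 events respectively), so events in flight are lost if an agent dies. For durability you could swap in Flume's file channel instead; a sketch for the producer agent, with assumed directory paths:

  producer.channels.c.type = file
  producer.channels.c.checkpointDir = /home/hadoop/flume/checkpoint
  producer.channels.c.dataDirs = /home/hadoop/flume/data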