分享

flume与kafka整合高可靠教程1:kafka安装

本帖最后由 pig2 于 2017-7-10 19:41 编辑
问题导读

1.安装kafka是否需要安装zookeeper?
2.kafka安装需要哪些步骤?
3.如何验证kafka是否安装成功?






flume与kafka整合很多人都用到,但是网上却没有一份详细可靠的教程。说的都是些只言片语。这里整理份flume与kafka整合的教程。
flume原先并不兼容kafka。后来兼容添加上去。对于flume及与kafka的相关知识,推荐参考
flume应该思考的问题
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22102
上面只是加深对flume的认识。下面我们开始整合flume与kafka。
思路:
1.安装kafka
2.安装flume,在配置中添加kafka相关配置
这里使用的版本:
kafka:kafka_2.11-0.9.0.1.tgz
flume:apache-flume-1.6.0-bin.tar.gz

kafka的安装

一、安装zookeeper

在master机器进行以下操作。
1. 解压zookeeper
下载zookeeper:http://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
[mw_shl_code=bash,true]tar -zxvf ~/jar/zookeeper-3.4.6.tar.gz -C /data
[/mw_shl_code]
网盘下载
链接: https://pan.baidu.com/s/1i50vzb3 密码: p5s5


2. 配置zookeeper
涉及到的配置文件为[mw_shl_code=bash,true]${ZOOKEEPER_HOME}/conf/zoo.cfg
[/mw_shl_code]
zoo.cfg通过cp zoo_sample.cfg zoo.cfg得到。

[mw_shl_code=bash,true]# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data/zk_data
# the port at which the clients will connect
clientPort=2181

server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888[/mw_shl_code]

这儿解释下格式为server.X=host:port1:port2的意思,X表示当前host所运行的服务的zookeeper服务的id(在接下来填写myid时需要用到),port1表示zookeeper中的follower连接到leader的端口号,port2表示leadership时所用的端口号。注意:需要手动去创建dataDir所配置的/data/zk_data目录(mkdir -p /data/zk_data)更多配置可参考:
zookeeper配置文件详解
http://www.aboutyun.com/thread-13909-1-1.html


3. 填写myid
在zoo.cfg配置文件中的dataDir目录(在这儿是/data/data_zk)下创建myid文件,文件内容为zoo.cfg中master所对应的server.X。

[mw_shl_code=bash,true]echo "1" > /data/zk_data/myid
[/mw_shl_code]
1.jpg



4. 复制到其他节点

[mw_shl_code=bash,true]scp -r /data/zookeeper-3.4.6/ /data/zk_data  aboutyun@slave1:/data
scp -r /data/zookeeper-3.4.6/ /data/zk_data  aboutyun@slave2:/data[/mw_shl_code]

在slave1上,

[mw_shl_code=bash,true]echo “2” > /data/zk_data/myid
[/mw_shl_code]
在slave2上,

[mw_shl_code=bash,true]echo "3" >/data/zk_data/myid
[/mw_shl_code]

5. 添加到环境变量
在master、slave1、slave2上,分别将以下内容添加到~/.bashrc文件中

[mw_shl_code=bash,true]export ZOOKEEPER_HOME=/data/zookeeper-3.4.6
export PATH=$ZOOKEEPER_HOME/bin:$PATH[/mw_shl_code]
然后执行以下命令:source ~/.bashrc


6.  启动验证
在master、slave、slave2上,分别执行zookeeper启动命令。

[mw_shl_code=bash,true]zkServer.sh start
[/mw_shl_code]

由于启动时,每个节点都会试图去连接其它节点,因此先启动的刚开始会连接不上其它的,导致日志中会包含错误信息,在未全启动之前,这个属正常现象。启动完成后,使用jps命令和zkServer.sh status命令经行验证
master机器情况:

slave1机器情况:

slave2机器情况:

说明每个节点都成功启动了QuorumPeerMain进程,并且slave1上的进程为leader,master和slave2上的进程为follower

很多人想知道zookeeper在kafka中的作用,可参考下面文章
kafka在zookeeper中存储结构
http://www.aboutyun.com/forum.php?mod=viewthread&tid=9941



二、安装kafka
在master上进行如下操作:

1. 解压kafka

下载kafka:http://apache.fayea.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz

[mw_shl_code=bash,true]tar -zxvf ~/jar/kafka_2.11-0.9.0.1.tgz -C /data
[/mw_shl_code]

链接: https://pan.baidu.com/s/1sk8CAeT 密码: 8h2r

2. 配置kakfa
涉及到的配置文件为${KAFA_HOME}/config/server.properties
必须要配置的是这三个参数:broker.id、log.dirs、zookeeper.connect
broker.id表示当前broker的id,要求是唯一的非负数。log.dirs表示kafka日志的存放目录。zookeeper.connect表示连接的zookeeper的地址。

[mw_shl_code=bash,true]broker.id=0
log.dirs=/data/kafka-logs
zookeeper.connect=master:2181,slave1:2181,slave2:2181[/mw_shl_code]

注意:需要手动在本地创建/data/kafka-logs目录

3. 复制到其他节点

[mw_shl_code=bash,true]scp  /data/kafka_2.11-0.9.0.1/ /data/kafka-logs/ aboutyun@slave1:/data
scp  /data/kafka_2.11-0.9.0.1/ /data/kafka-logs/ aboutyun@slave2:/data[/mw_shl_code]
在slave1机器上将server.properties配置文件的broker.id值改为1在slave2机器上将server.properties配置文件的broker.id值改为2


4. 添加环境变量
在master、slave1、slave2机器上,分别将以下内容添加到~/.bashrc文件中

[mw_shl_code=bash,true]export KAFKA_HOME=/data/kafka_2.11-0.9.0.1
export PATH=$KAFKA_HOME/bin:$PATH[/mw_shl_code]
然后执行以下命令:source ~/.bashrc


5. 启动验证
在master、slave1、slave2机器上,分别执行kafka启动命令

[mw_shl_code=bash,true]cd $KAFKA_HOME
kafka-server-start.sh -daemon ./config/server.properties[/mw_shl_code]
之后在每台机器上执行jps命令:

如果每台机器上都成功启动了kafka这个进程,说明我们搭建成功。如果发现某台机器上没有kafka这个进程,可以将kafka的启动命令去掉参数-daemon(加上的话表示后台启动),这样可以直接在屏幕上看到错误信息。

三、kakfa使用示例

1. 创建topic

[mw_shl_code=bash,true]#创建一个有3个partition、1个副本的 test topic
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic test --replication-factor 1 --partitions 3[/mw_shl_code]

2. 创建producer

[mw_shl_code=bash,true]kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9094 --topic test[/mw_shl_code]



3. 创建consumer
重新打开一个窗口:

[mw_shl_code=bash,true]kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181  --topic test --from-beginning[/mw_shl_code]

4. 产生消息并接受

在producer的窗口中输入几条测试信息



查看consumer窗口的情况

说明成功消费了的产生的3条信息

下一篇flume与kafka整合高可靠教程2:flume与kafka整合安装
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22173



来自:
about云日志分析项目准备8:Kafka集群安装
http://www.aboutyun.com/forum.php?mod=viewthread&tid=20671






本帖被以下淘专辑推荐:

已有(6)人评论

跳转到指定楼层
Brown羊羊 发表于 2017-7-10 10:40:34
整合的部分没有啊
回复

使用道具 举报

yunge2016 发表于 2017-7-10 13:44:01
是不是还没更新完呢 楼主,想看看具体flume如何采集kafka里的数据呢,具体怎么操作呀
回复

使用道具 举报

pig2 发表于 2017-7-10 19:42:11
Brown羊羊 发表于 2017-7-10 10:40
整合的部分没有啊

flume与kafka整合高可靠教程2:flume与kafka整合安装
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22173


回复

使用道具 举报

微微上翘 发表于 2017-7-17 09:02:20
好东西,学习学习
回复

使用道具 举报

hadoop_to_spark 发表于 2017-9-7 09:40:27
很详细,辛苦楼主了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条