分享

如何将本地目录下txt文件内容发送给Kafka(求代码范例)

harley 发表于 2017-1-8 21:04:37 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 40239
本帖最后由 harley 于 2017-1-8 21:07 编辑

本人刚接触Kafka, 是个小白,求大神帮忙解答,谢谢!

应用场景:
我需要从Oracle中不断把业务数据导入到Kafka, 并写入HBase或Hive

目前想使用的方法:
1. 每15分钟从Oracle中导出数据生成txt文件放于本地目录(非Hadoop节点)
2. 同时将文件内容取出并发送给Kafka
3. 再用Spark Streaming来订阅Kafka中的数据进行处理写入Hive

目前有以下二点问题,求大神解答:
1.以上方法是否可行? 有没有更实时简单的方法(除Oracle golden gate外)
2.如何将文本文件传送给Kafka (求代码范例)?
3.除Spark Streaming外,有无其它实时处理方案?

已有(6)人评论

跳转到指定楼层
J20_果农 发表于 2017-1-8 21:39:45
本帖最后由 J20_果农 于 2017-1-8 21:41 编辑

目前有以下二点问题,求大神解答:
1.以上方法是否可行? 有没有更实时简单的方法(除Oracle golden gate外)
答:为什么这么折腾呢?直接用sqoop 从oracle导入hive就可以了。
2.如何将文本文件传送给Kafka (求代码范例)?
答:写个生产者线程类,读取文件内容,然后发送给kafka,具体代码可以看kafka官方例子
核心代码如下:producer.send(new KeyedMessage<Integer, String>(topic, messageStr));

读写文件的方法:
[mw_shl_code=java,true]        /**
         * 以行为单位读取文件,常用于读面向行的格式化文件
         * @param fileName        文件名
         */
        public static void readFileByLines(String fileName){
                File file = new File(fileName);
                BufferedReader reader = null;
                try {
                        System.out.println("以行为单位读取文件内容,一次读一整行:");
                        reader = new BufferedReader(new FileReader(file));
                        String tempString = null;
                        int line = 1;
                        //一次读入一行,直到读入null为文件结束
                        while ((tempString = reader.readLine()) != null){
                                //显示行号
                                System.out.println("line " + line + ": " + tempString);
                                line++;
                        }
                        reader.close();
                } catch (IOException e) {
                        e.printStackTrace();
                } finally {
                        if (reader != null){
                                try {
                                        reader.close();
                                } catch (IOException e1) {
                                }
                        }
                }
        }[/mw_shl_code]

3.除Spark Streaming外,有无其它实时处理方案?
答:也可以使用storm来处理

回复

使用道具 举报

harley 发表于 2017-1-8 22:40:52
@J20_果农 非常感谢你的答复 基本上解决了我的问题
因为数据量较大,一个小时上千万的数据,之前测试sqoop有卡死的情况,目前是用sql loader导出效率会更高一些.

我需要的最终方案是数据能从Oracle尽量实时的写入HBase或Hive,因刚研究实时数据处理,在网上查了好久,比较乱,还没理清楚

能否再帮忙解答一下,往Kafka写数据只能一行一行的发送吗,不能一个文件一个文件的发送吗
回复

使用道具 举报

einhep 发表于 2017-1-9 08:51:26
harley 发表于 2017-1-8 22:40
@J20_果农 非常感谢你的答复 基本上解决了我的问题
因为数据量较大,一个小时上千万的数据,之前测试sqoop ...

这个不是的


创建topic参数可以设置一个或多个--config "Property(属性)",下面是创建一个topic名称为"my-topic"例子,它设置了2个参数max message size 和 flush rate.
bin/kafka-topics.sh --zookeeper 192.168.2.225:2183/config/mobile/mq/mafka02 --create --topic my-topic --partitions 1   --replication-factor 1
--config max.message.bytes=64000 --config flush.messages=1

可以设置topic的大小和传输率

下面是一个topic模型,更多可以在找找资料




回复

使用道具 举报

harley 发表于 2017-1-10 22:38:34
本帖最后由 harley 于 2017-1-10 22:40 编辑

@einhep 你说的是topic设置,我问的是怎么样多条记录一起发送,而不是一条一条的Send, 例如我将txt中所有记录抓出,一起发送给Kafka,避免数据太多,单条单条发送中间可能丢失数据

在网上有看到这个的说明,但是没有找到MessageSet怎么用的例子:
Kafka系统默认支持MessageSet,把多条Message自动地打成一个Group后发送出去,均摊后拉低了每次通信的RTT。而且在组织MessageSet的同时,还可以把数据重新排序,从爆发流式的随机写入优化成较为平稳的线性写入。

能帮忙说明一下吗,谢谢啦! @einhep @J20_果农
回复

使用道具 举报

lihao 发表于 2018-1-10 09:32:15
J20_果农 发表于 2017-1-8 21:39
目前有以下二点问题,求大神解答:
1.以上方法是否可行? 有没有更实时简单的方法(除Oracle golden gate外 ...

你好,这个读取文件的代码没有返回值,如何向send中传数据啊?
回复

使用道具 举报

lihao 发表于 2018-1-10 09:59:02
lihao 发表于 2018-1-10 09:32
你好,这个读取文件的代码没有返回值,如何向send中传数据啊?

不好意思,已经搞定了,谢谢代码提供者了。。。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条