分享

各位好,请教个flumeng问题

2278 发表于 2014-6-30 23:34:22 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 12 18971
我想用flumeng采集日志后发往jafka 然后java通过API 取出,请问怎么整合哦》? 可否在JAVA直接从flumeng中取出数据

已有(12)人评论

跳转到指定楼层
hyj 发表于 2014-7-1 00:03:38


我们从基本入手:
agent1表示代理名称
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1


#配置source1
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/usr/aboutyunlog
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader = false

#配置sink1
agent1.sinks.sink1.type=hdfs
agent1.sinks.sink1.hdfs.path=hdfs://master:8020/aboutyunlog
agent1.sinks.sink1.hdfs.fileType=DataStream
agent1.sinks.sink1.hdfs.writeFormat=TEXT
agent1.sinks.sink1.hdfs.rollInterval=4
agent1.sinks.sink1.channel=channel1


#配置channel1
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/usr/aboutyun_tmp123
agent1.channels.channel1.dataDirs=/usr/aboutyun_tmp
flume程序给了我们自定义的内容,这些内容,可以通过配置文件,上面三个的作用,我们在来看一下:

Source:完成对日志数据的收集,分成transtion 和 event 打入到channel之中。  
Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。  
Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。  

flume给了我们自定义sink的接口,比如Flume(ng) 自定义sink实现和属性注入,你所说的从flume中去数据是指自定义sink吧。如果你觉得模糊的话,咱们在看下面内容。

我们在看下面内容:

source可以接收外部源发送过来的数据。不同的source,可以接受不同的数据格式。比如有目录池(spooling directory)数据源,可以监控指定文件夹中的新文件变化,如果目录中有文件产生,就会立刻读取其内容。

channel是一个存储地,接收source的输出,直到有sink消费掉channel中的数据。channel中的数据直到进入到下一个channel中或者进入终端才会被删除。当sink写入失败后,可以自动重启,不会造成数据丢失,因此很可靠。

sink会消费channel中的数据,然后送给外部源或者其他source。如数据可以写入到HDFS或者HBase中。

所以从上面我们看到你
source也就是数据源,其实这里就是一个目录,channel的作用是缓存,sink就是不断从chanel去数据,所以能够使用Java  api写数据的,能够自定义的就只有sink了。

如果你是想自定义其它的话,可能性不大。你也可以自己通过Java api用来传输数据。













回复

使用道具 举报

2278 发表于 2014-7-1 00:11:28
hyj 发表于 2014-7-1 00:03
我们从基本入手:
flume程序给了我们自定义的内容,这些内容,可以通过配置文件,上面三个的作用,我们 ...

首先感谢 大晚上的还回帖,还有几个地方不明白的是,flumeng没提供java的API 。他的sink 输出只有几种 如hdfs   text  、控制台,我java程序该如何取呢,我想通过传到jafka里来实现 可行否? 参考 http://www.aboutyun.com/thread-6855-1-1.html
回复

使用道具 举报

2278 发表于 2014-7-1 00:15:05
hyj 发表于 2014-7-1 00:03
我们从基本入手:
flume程序给了我们自定义的内容,这些内容,可以通过配置文件,上面三个的作用,我们 ...

我查看了下同事的,,那个type  好像是他自定义的一个类,我想跟flume怎么关联起来的?

1.jpg

回复

使用道具 举报

hyj 发表于 2014-7-1 00:55:34
本帖最后由 hyj 于 2014-7-1 00:57 编辑
首先第一步你有这个类:
开发完毕,然后打包jar包,将jar包加入到flume中,有两种方法:


方法一
进入vi /etc/profile 在文件中加入 export FLUME_CLASSPATH = XXX/XX/XXX.jar
在CALSSPATH 的尾部加入:$FLUME_CLASSPATH , 然后保存退出后,在命令行执行sh /etc/profile ,通过在命令行输入 flume classpath 查看插件jar包是否在其中,在其中表示配置完成(注:开发插件时引入的jar包必须都能够在flume classpath下找到,如果找不到的话像加入插件包的方式加入)


方法二:

简单的方法是将开发的jar包和需要引入的jar包直接复制到/usr/lib/flume/lib
修改flume-site.xml文件 如下所示:

  1. <property>
  2. <name>flume.plugin.classes</name>   
  3. <value>插件类得全路径名(如:hello.HelloWorldSink) </value>
  4.     <description>Comma separated list of plugin classes</description>
  5.   </property>
复制代码



将flume-site.xml文件放到master 和collector的/etc/flume/conf 下
启动集群配置即可




回复

使用道具 举报

hyj 发表于 2014-7-1 00:57:23
这里在补充一下插件开发的步骤:


flume插件开发一般分如下步骤:
1 编写插件类(Sink/Source/Decorator)相应的继承flume的(EventSink.Base/ EventSource.Base/EventSinkDecorator<S>)
Source必须实现四个方法:

  1. void open() throws IOException
  2.         Event next() throws IOException
  3. void close() throws IOException
  4. ReportEvent getReport()
复制代码




  Sink和Decorator必须实现四个方法:
  1. void open() throws IOException
  2.         void append(Event e) throws IOException
  3.         void close() throws IOException
  4.         ReportEvent getReport()
复制代码


主要介绍sink插件的开发
append(Event e)方法中的event参数是flume框架自带的类,日志文件的一行记录会在flume框架中封装成一个event,在append方法中对event的处理就可以当做是处理日志文件的每一行,Event包含的六个属性是:
               
        Unix timestamp  时间戳
                                Nanosecond timestamp
                                Priority
        Source host 主机地址(这个地址是产生日志的文件地址)
                                Body 日志文件的一行
        Metadata table with an arbitrary number of attribute value pairs.


在sink的插件类中一定要是加入下面的代码:
public static List<Pair<String, SinkBuilder>> getSinkBuilders() {
                List<Pair<String, SinkBuilder>> builders = new ArrayList<Pair<String, SinkBuilder>>();
                builders.add(new Pair<String, SinkBuilder>("HBaseEventSinkV1",
                                builder()));
                return builders;
        }


因为flume的SinkFactoryImpl中需要通过这份方法将写的插件类注册到flume,如果不写会报错,"HBaseEventSinkV1" 就是注册名
以开发collector的插件类为例,完成插件类的开发之后,将插件类打包分别放到master和collector主机上(注:多个master和多个collector的情况下要都放置,最好目录都一样,便于配置flume classpath)。




回复

使用道具 举报

hyj 发表于 2014-7-1 01:01:30
根据上面的方法,首先你可以自定义个sink,然后把这个类放到flume中,放到flume中后,你在配置文件中,配置自己自定义的type
agent1.sinks.sink1.type=“自定义

这样他应该就能找到你的

你可以自定义到Kafka
回复

使用道具 举报

2278 发表于 2014-7-1 09:33:19
hyj 发表于 2014-7-1 01:01
根据上面的方法,首先你可以自定义个sink,然后把这个类放到flume中,放到flume中后,你在配置文件中,配置 ...

   flume  ng 没有flume-site.xml把? 我同事的好像在安装目录下新创建了一个文件夹 plugins.d  然后在把相关jar 和拓展打成的jar放里面。。不用配置了吗??
回复

使用道具 举报

hyj 发表于 2014-7-1 11:13:55
2278 发表于 2014-7-1 09:33
flume  ng 没有flume-site.xml把? 我同事的好像在安装目录下新创建了一个文件夹 plugins.d  然后在把 ...
你可以尝试下,其ng的可能有点区别,不过基本原理应该都差不多
回复

使用道具 举报

2278 发表于 2014-7-1 11:14:59
hyj 发表于 2014-7-1 11:13
你可以尝试下,其ng的可能有点区别,不过基本原理应该都差不多

好的,谢谢,
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条