分享

Flume 1.7 源码分析

本帖最后由 desehawk 于 2017-2-27 17:34 编辑

问题导读
1.如何配置maven获取源码?
2.本文认为flume启动包含哪两个步骤?
3.源码如何实现获取启动配置的?






1源码编译

1 说明

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,目前已经是Apache的一个子项目。Flume是一个专用工具被设计为旨在往HDFS、HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。本文将详细分析Flume核心模块的源码实现。

2 下载、编译

2.1 源码检出

Flume源码的Git地址是:https://git-wip-us.apache.org/repos/asf/flume.git。本文采用的分支是flume-1.7。

2.2 源码编译

Flume采用Maven进行构建。本文采用IDEA作为编译、调试工具。

2.2.1 Maven依赖下载

配置阿里云的Maven服务器,速度较快,并可完成绝大多数包的下载。

[mw_shl_code=bash,true]<mirror>  
    <id>nexus-aliyun</id>
    <mirrorOf>*</mirrorOf>
    <name>Nexus aliyun</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>[/mw_shl_code]

使用阿里云Maven服务器,仍然有一些包无法解决,例如FlumeAuth模块的hadoop-minikdc和SolrSink的kite-morphlines-all等。由于这部分缺失的包,基本都是属于flume的插件模块之中,可直接关闭这些插件模块,不会影响主程序的执行和源码阅读。同时,还可移除其他的你不需要使用的source和sink和channel插件模块,可加快编译速度。
导入好的项目如下图所示:
1.png
其中比较重要的就是上图圈起来的模块了,其中flume-ng-core存放了最核心部分的代码,包含基础的Source、Channel、Sink等;flume-ng-node则是存放了程序启动的代码(入口函数)。
其他可能会用到的模块就是flume-ng-sources、flume-ng-channels、flume-ng-sinks,这3个模块存放了非必须的flume组件(flume-ng-core中未包含的),里面有些组件也是很常用的。
2.2.2 编译打包
使用mvn clean install -Dmaven.test.skip=true进行打包,看到BUILD SUCCESS则代表编译成功,可进行下一步。
本步骤一定要有,不然debug的时候会报某些类找不到,这些类大多数是由avro定义的文件编译后才会生成的class文件。
2.2.3 简单测试
在TestMemoryChannel类右键选择Run ‘TestMemoryChannel’运行JUnit单元测试。检查是否有报错。


2整体架构

Flume有三大组件:Source、Channel、Sink。
flume架构.png
  • Source就是数据来源,例如Web Server产生日志后,可使用ExecSource执行tail -F命令后不断监听日志文件新生成的数据,然后传给Channel。
  • Channel就是一个缓存队列,由于读取数据和写入数据的速度可能不匹配,假如用同步完成的方式可能效率低下,所以Source把数据写到Channel这个队列里面,Sink再用另外的线程去读取。
  • Sink就是最终的存储,例如可以是HDFS或LOG文件输出等,Sink负责去Channel里面读取数据,并存储。
在程序启动时,会启动所有的SourceRunner、Channel、SinkRunner。其中Channel的启动,没做什么特别的事情,就是初始化一下状态、创建一下计数器,算做一个被动的角色。比较重要的是SourceRunner和SinkRunner。
  • SourceRunner会调用Source的start方法。以ExecSource为例,其start方法就是启动一个线程,去不断获取标准输出流写入一个列表(eventList),同时再启动一个线程去定期批量地把列表中的数据往Channel发,如下图所示。
  • SinkRunner则是不断循环调用SinkProcess的process的方法,SinkProcess有几种类型,用于决定选择哪个Sink进行存储(Sink可以有多个),选择了Sink后,调用其process方法。Sink的process方法,主要做的就是去Channel中读取数据,并写入对应的存储,如下图所示。
flume图.png


3 程序入口

启动Flume的过程可以简单分为2个步骤:
1. 获取相关配置文件(一般来说就是flume-conf.properties)。
2. 启动各组件。不特别说明,本文中的组件是指实现了LifecycleAware接口的类的对象,一般就是Source、Channel、Sink这3种对象。

3.1 获取启动配置

3.1.1 Main函数

启动Flume的Main函数在flume-ng-node模块的org.apache.flume.node.Application。该函数的功能可以简单划分为以下三个步骤:
1. 使用commons.cli类获取命令行参数(就是启动时传入的参数)
2. 根据启动参数确定的读取配置的方式。读取配置的方式总共有4种,分别根据配置是保存在zookeeper上还是本地properties文件、以及是否reload(自动重载配置文件)分为4种方式。
3. 根据相应的配置启动程序,并注册关闭钩子。
接下来以properties文件、不重载的方式为例,主要的代码如下:

[mw_shl_code=java,true]PropertiesFileConfigurationProvider configurationProvider =
    new PropertiesFileConfigurationProvider(agentName, configurationFile);
//创建Application对象,包含初始化组件列表(components),初始化LifecycleSupervisor。
application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());
//start方法用于检查所有组件是否是启动状态,如果不是则启动该组件。
application.start();
//监听程序关闭事件,用于当程序被kill后能够执行一些清理工作。
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
  public void run() {
    appReference.stop();
  }
});[/mw_shl_code]

上面的代码,有两处比较关键:

configurationProvider.getConfiguration()会返回一个MaterializedConfiguration类型的对象,用于从文件形式的配置转为物化的配置,即包含实际的channel、sinkRunner等对象的实例,在“物化配置”一节分析。
handleConfigurationEvent用于停止所有components,并使用新的配置进行启动,在“使用新配置重启”一节分析。

4.1.2 物化配置

configurationProvider.getConfiguration()方法主要做了以下两件事:
1. 读取配置文件(flume-conf.properties),保存在AgentConfiguration对象中。

[mw_shl_code=java,true]public static class AgentConfiguration {
  private final String agentName;
  private String sources;
  private String sinks;
  private String channels;
  private String sinkgroups;
  private final Map<String, ComponentConfiguration> sourceConfigMap;
  private final Map<String, ComponentConfiguration> sinkConfigMap;
  private final Map<String, ComponentConfiguration> channelConfigMap;
  private final Map<String, ComponentConfiguration> sinkgroupConfigMap;
  private Map<String, Context> sourceContextMap;
  private Map<String, Context> sinkContextMap;
  private Map<String, Context> channelContextMap;
  private Map<String, Context> sinkGroupContextMap;
  private Set<String> sinkSet;
  private Set<String> sourceSet;
  private Set<String> channelSet;
  private Set<String> sinkgroupSet;
}[/mw_shl_code]

到这个步骤还仅仅是做好了分类的文本形式的配置项。
2. 创建出配置中的各组件实例,并添加到MaterializedConfiguration实例中。

[mw_shl_code=java,true]public interface MaterializedConfiguration {
  public void addSourceRunner(String name, SourceRunner sourceRunner);
  public void addSinkRunner(String name, SinkRunner sinkRunner);
  public void addChannel(String name, Channel channel);
  public ImmutableMap<String, SourceRunner> getSourceRunners();
  public ImmutableMap<String, SinkRunner> getSinkRunners();
  public ImmutableMap<String, Channel> getChannels();
}[/mw_shl_code]

在这个实例中,可以获取配置文件中配置的所有的source、channel、sink,并且是“物化”的,即可以直接取得相关组件的实例。

4.2 启动所有组件

4.2.1 使用新配置重启

有了上面的MaterializedConfiguration实例,我们就可以启动组件了。
在handleConfigurationEvent方法中,首先会停止所有组件,然后再启动所有组件。

[mw_shl_code=java,true]stopAllComponents();
startAllComponents(conf); //这里的conf就是上节的MaterializedConfiguration。[/mw_shl_code]
在startAllComponents方法中,会遍历组件列表(SourceRunners、SinkRunners、Channels),分别调用supervise方法。以Channel为例:
[mw_shl_code=java,true]for (Entry<String, Channel> entry :
    materializedConfiguration.getChannels().entrySet()) {
  try {
    logger.info("Starting Channel " + entry.getKey());
    supervisor.supervise(entry.getValue(),
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  } catch (Exception e) {
    logger.error("Error while starting {}", entry.getValue(), e);
  }
}[/mw_shl_code]


这个supervise方法简单来说,就是将相应组件的状态转化为期望的状态。例如上面代码中的LifecycleState.START就是期望的状态。

4.2.2 LifecycleSupervisor

上节的supervisor是一个LifecycleSupervisor对象。前面有说到,在创建Application的时候初始化了一个LifecycleSupervisor对象,就是这里的supervisor。这个对象,我理解为各组件生命周期的管理者,用于实时监控所有组件的状态,如果不是期望的状态(desiredState),则进行状态转换。

上节的代码中调用了supervisor.supervise方法,接下来分析一下supervise这个方法:

[mw_shl_code=java,true]public synchronized void supervise(LifecycleAware lifecycleAware,
    SupervisorPolicy policy, LifecycleState desiredState) {
  //省略状态检查的代码
Supervisoree process = new Supervisoree();
  process.status = new Status();
  process.policy = policy;
  process.status.desiredState = desiredState;
  process.status.error = false;
  MonitorRunnable monitorRunnable = new MonitorRunnable();
  monitorRunnable.lifecycleAware = lifecycleAware;
  monitorRunnable.supervisoree = process;
  monitorRunnable.monitorService = monitorService;
  supervisedProcesses.put(lifecycleAware, process);
  ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
      monitorRunnable, 0, 3, TimeUnit.SECONDS);
  monitorFutures.put(lifecycleAware, future);
}[/mw_shl_code]

由于所有的组件都实现了LifecycleAware接口,所以这里的supervise方法传入的是LifecycleAware接口的对象。

可以看到创建了一个Supervisoree对象,顾名思义,就是被监控的的对象,该对象有以下几种状态:IDLE, START, STOP, ERROR。
scheduleWithFixedDelay每隔3秒触发一次监控任务(monitorRunnable)。

4.2.3 MonitorRunnable

在MonitorRunnable中主要是检查组件的状态,并实现从lifecycleState到desiredState的转变。

[mw_shl_code=java,true]switch (supervisoree.status.desiredState) {
  case START:
    try {
      lifecycleAware.start();
    } catch (Throwable e) {省略}
    break;
  case STOP:
    try {
      lifecycleAware.stop();
    } catch (Throwable e) {省略}
    break;
  default:
    logger.warn("I refuse to acknowledge {} as a desired state", supervisoree.status.desiredState);
}[/mw_shl_code]
到这里为止,可以看到监控的进程,调用了组件自己的start和stop方法来启动、停止。前面有提到有3种类型的组件,SourceRunner、Channel、SinkRunner,而Channel的start只做了初始化计数器,没什么实质内容,所以接下来从SourceRunner的启动(从Source写数据到Channel)和SinkRunner的启动(从Channel获取数据写入Sink)来展开说明。


作者:Lnho
来自:csdn

已有(3)人评论

跳转到指定楼层
qq642169746 发表于 2017-2-28 09:27:35
这个好,写的很不错,需要学习
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条