分享

Flume-NG启动过程源码分析(三)

xioaxu790 2014-8-17 15:31:06 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 10954
问题导读
1、什么是是SimpleMaterializedConfiguration的一个实例?
2、如何分析startAllComponents(conf)方法?
3、flume的所有组件均实现自什么 接口?





      本篇分析加载配置文件后各个组件是如何运行的?
  加载完配置文件订阅者Application类会收到订阅信息执行:
  1.   @Subscribe
  2.   public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
  3.     stopAllComponents();
  4.     startAllComponents(conf);
  5.   }
复制代码


  MaterializedConfiguration conf就是getConfiguration()方法获取的配置信息,是SimpleMaterializedConfiguration的一个实例。

  handleConfigurationEvent方法在前面章节(一)中有过大致分析,包括:stopAllComponents()和startAllComponents(conf)。Application中的materializedConfiguration就是MaterializedConfiguration conf,stopAllComponents()方法中的materializedConfiguration是旧的配置信息,需要先停掉旧的组件,然后startAllComponents(conf)将新的配置信息赋给materializedConfiguration并依次启动各个组件。

  1、先看startAllComponents(conf)方法。代码如下:

  1. private void startAllComponents(MaterializedConfiguration materializedConfiguration) {//启动所有组件最基本的三大组件
  2.     logger.info("Starting new configuration:{}", materializedConfiguration);
  3.     this.materializedConfiguration = materializedConfiguration;
  4.     for (Entry<String, Channel> entry :
  5.       materializedConfiguration.getChannels().entrySet()) {
  6.       try{
  7.         logger.info("Starting Channel " + entry.getKey());
  8.         supervisor.supervise(entry.getValue(),
  9.             new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  10.       } catch (Exception e){
  11.         logger.error("Error while starting {}", entry.getValue(), e);
  12.       }
  13.     }
  14.     /*
  15.      * Wait for all channels to start.等待所有channel启动完毕
  16.      */
  17.     for(Channel ch: materializedConfiguration.getChannels().values()){
  18.       while(ch.getLifecycleState() != LifecycleState.START
  19.           && !supervisor.isComponentInErrorState(ch)){
  20.         try {
  21.           logger.info("Waiting for channel: " + ch.getName() +
  22.               " to start. Sleeping for 500 ms");
  23.           Thread.sleep(500);
  24.         } catch (InterruptedException e) {
  25.           logger.error("Interrupted while waiting for channel to start.", e);
  26.           Throwables.propagate(e);
  27.         }
  28.       }
  29.     }
  30.     for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
  31.         .entrySet()) {        //启动所有sink
  32.       try{
  33.         logger.info("Starting Sink " + entry.getKey());
  34.         supervisor.supervise(entry.getValue(),
  35.           new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  36.       } catch (Exception e) {
  37.         logger.error("Error while starting {}", entry.getValue(), e);
  38.       }
  39.     }
  40.     for (Entry<String, SourceRunner> entry : materializedConfiguration
  41.         .getSourceRunners().entrySet()) {//启动所有source
  42.       try{
  43.         logger.info("Starting Source " + entry.getKey());
  44.         supervisor.supervise(entry.getValue(),
  45.           new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  46.       } catch (Exception e) {
  47.         logger.error("Error while starting {}", entry.getValue(), e);
  48.       }
  49.     }
  50.     this.loadMonitoring();
  51.   }
复制代码


  三大组件都是通过supervisor.supervise(entry.getValue(),new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START)启动的,其中,channel启动之后还要待所有的channel完全启动完毕之后才可再去启动sink和source。如果channel没有启动完毕就去启动另外俩组件,会出现错误,以为一旦sink或者source建立完毕就会立即与channel通信获取数据。稍后会分别分析sink和source的启动。

  supervisor是LifecycleSupervisor的一个对象,该类的构造方法会构造一个有10个线程,上限是20的线程池供各大组件使用。构造方法如下:

  1. public LifecycleSupervisor() {
  2.     lifecycleState = LifecycleState.IDLE;
  3.     supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();//存储所有历史上的组件及其监控信息
  4.     monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
  5.     monitorService = new ScheduledThreadPoolExecutor(10,
  6.         new ThreadFactoryBuilder().setNameFormat(
  7.             "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
  8.             .build());
  9.     monitorService.setMaximumPoolSize(20);
  10.     monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
  11.     purger = new Purger();
  12.     needToPurge = false;
  13.   }
复制代码


  supervise(LifecycleAware lifecycleAware,SupervisorPolicy policy, LifecycleState desiredState)方法则是具体执行启动各个组件的方法。flume的所有组件均实现自LifecycleAware 接口,如图:,这个接口就三个方法getLifecycleState(返回组件运行状态)、start(组件启动)、stop(停止组件)。supervise方法代码如下:
  1. public synchronized void supervise(LifecycleAware lifecycleAware,
  2.       SupervisorPolicy policy, LifecycleState desiredState) {
  3.   //检查线程池状态
  4.     if(this.monitorService.isShutdown()
  5.         || this.monitorService.isTerminated()
  6.         || this.monitorService.isTerminating()){
  7.       throw new FlumeException("Supervise called on " + lifecycleAware + " " +
  8.           "after shutdown has been initiated. " + lifecycleAware + " will not" +
  9.           " be started");
  10.     }
  11.   //如果该组件已经在监控,则拒绝二次监控
  12.     Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware),
  13.         "Refusing to supervise " + lifecycleAware + " more than once");
  14.     if (logger.isDebugEnabled()) {
  15.       logger.debug("Supervising service:{} policy:{} desiredState:{}",
  16.           new Object[] { lifecycleAware, policy, desiredState });
  17.     }
  18.   //新的组件
  19.     Supervisoree process = new Supervisoree();
  20.     process.status = new Status();
  21.     process.policy = policy;
  22.     process.status.desiredState = desiredState;
  23.     process.status.error = false;
  24.     MonitorRunnable monitorRunnable = new MonitorRunnable();
  25.     monitorRunnable.lifecycleAware = lifecycleAware;//组件
  26.     monitorRunnable.supervisoree = process;
  27.     monitorRunnable.monitorService = monitorService;
  28.     supervisedProcesses.put(lifecycleAware, process);
  29.     //创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。如果任务的任一执行遇到异常,就会取消后续执行。
  30.     ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
  31.         monitorRunnable, 0, 3, TimeUnit.SECONDS);  //启动MonitorRunnable,结束之后3秒再重新启动,可以用于重试
  32.     monitorFutures.put(lifecycleAware, future);
  33.   }
复制代码




  该方法首先monitorService是否是正常运行状态;然后构造Supervisoree process = new Supervisoree(),进行赋值并构造一个监控进程MonitorRunnable,放入线程池去执行。

  MonitorRunnable.run()方法:

  1. public void run() {
  2.       logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
  3.           supervisoree);
  4.       long now = System.currentTimeMillis();//获取现在的时间戳
  5.       try {
  6.         if (supervisoree.status.firstSeen == null) {
  7.           logger.debug("first time seeing {}", lifecycleAware);
  8.       //如果这个组件是是初次受监控
  9.           supervisoree.status.firstSeen = now;
  10.         }
  11.      //如果这个组件已经监控过
  12.         supervisoree.status.lastSeen = now;
  13.         synchronized (lifecycleAware) {//锁住组件
  14.           if (supervisoree.status.discard) {//该组件已经停止监控
  15.             // Unsupervise has already been called on this.
  16.             logger.info("Component has already been stopped {}", lifecycleAware);
  17.             return;//直接返回
  18.           } else if (supervisoree.status.error) {//该组件是错误状态
  19.             logger.info("Component {} is in error state, and Flume will not"
  20.                 + "attempt to change its state", lifecycleAware);
  21.             return;//直接返回
  22.           }
  23.           supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();//获取组件最新状态,没运行start()方法之前是LifecycleState.IDLE状态
  24.           if (!lifecycleAware.getLifecycleState().equals(
  25.               supervisoree.status.desiredState)) {//该组件最新状态和期望的状态不一致
  26.             logger.debug("Want to transition {} from {} to {} (failures:{})",
  27.                 new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
  28.                     supervisoree.status.desiredState,
  29.                     supervisoree.status.failures });
  30.             switch (supervisoree.status.desiredState) {//根据状态执行相应的操作
  31.               case START:
  32.                 try {
  33.                   lifecycleAware.start();   //启动组件,同时其状态也会变为LifecycleState.START
  34.                 } catch (Throwable e) {
  35.                   logger.error("Unable to start " + lifecycleAware
  36.                       + " - Exception follows.", e);
  37.                   if (e instanceof Error) {
  38.                     // This component can never recover, shut it down.
  39.                     supervisoree.status.desiredState = LifecycleState.STOP;
  40.                     try {
  41.                       lifecycleAware.stop();
  42.                       logger.warn("Component {} stopped, since it could not be"
  43.                           + "successfully started due to missing dependencies",
  44.                           lifecycleAware);
  45.                     } catch (Throwable e1) {
  46.                       logger.error("Unsuccessful attempt to "
  47.                           + "shutdown component: {} due to missing dependencies."
  48.                           + " Please shutdown the agent"
  49.                           + "or disable this component, or the agent will be"
  50.                           + "in an undefined state.", e1);
  51.                       supervisoree.status.error = true;
  52.                       if (e1 instanceof Error) {
  53.                         throw (Error) e1;
  54.                       }
  55.                       // Set the state to stop, so that the conf poller can
  56.                       // proceed.
  57.                     }
  58.                   }
  59.                   supervisoree.status.failures++;//启动错误失败次数+1
  60.                 }
  61.                 break;
  62.               case STOP:
  63.                 try {
  64.                   lifecycleAware.stop();    //停止组件
  65.                 } catch (Throwable e) {
  66.                   logger.error("Unable to stop " + lifecycleAware
  67.                       + " - Exception follows.", e);
  68.                   if (e instanceof Error) {
  69.                     throw (Error) e;
  70.                   }
  71.                   supervisoree.status.failures++;  //组件停止错误,错误次数+1
  72.                 }
  73.                 break;
  74.               default:
  75.                 logger.warn("I refuse to acknowledge {} as a desired state",
  76.                     supervisoree.status.desiredState);
  77.             }
  78.        //两种SupervisorPolicy(AlwaysRestartPolicy和OnceOnlyPolicy)后者还未使用过,前者表示可以重新启动的组件,后者表示只能运行一次的组件
  79.             if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
  80.               logger.error(
  81.                   "Policy {} of {} has been violated - supervisor should exit!",
  82.                   supervisoree.policy, lifecycleAware);
  83.             }
  84.           }
  85.         }
  86.       } catch(Throwable t) {
  87.         logger.error("Unexpected error", t);
  88.       }
  89.       logger.debug("Status check complete");
  90.     }
复制代码



  上面的 lifecycleAware.stop()和lifecycleAware.start()就是执行的sink、source、channel等的对应方法。

  这里的start需要注意如果是channel则是直接执行start方法;如果是sink或者PollableSource的实现类,则会在start()方法中启动一个线程来循环的调用process()方法来从channel拿数据(sink)或者向channel送数据(source);如果是EventDrivenSource的实现类,则没有process()方法,通过执行start()来执行想channel中送数据的操作(可以在此添加线程来实现相应的逻辑)。

  2、stopAllComponents()方法。顾名思义,就是停止所有组件的方法。该方法代码如下:

  1. private void stopAllComponents() {
  2.     if (this.materializedConfiguration != null) {
  3.       logger.info("Shutting down configuration: {}", this.materializedConfiguration);
  4.       for (Entry<String, SourceRunner> entry : this.materializedConfiguration
  5.           .getSourceRunners().entrySet()) {
  6.         try{
  7.           logger.info("Stopping Source " + entry.getKey());
  8.           supervisor.unsupervise(entry.getValue());
  9.         } catch (Exception e){
  10.           logger.error("Error while stopping {}", entry.getValue(), e);
  11.         }
  12.       }
  13.       for (Entry<String, SinkRunner> entry :
  14.         this.materializedConfiguration.getSinkRunners().entrySet()) {
  15.         try{
  16.           logger.info("Stopping Sink " + entry.getKey());
  17.           supervisor.unsupervise(entry.getValue());
  18.         } catch (Exception e){
  19.           logger.error("Error while stopping {}", entry.getValue(), e);
  20.         }
  21.       }
  22.       for (Entry<String, Channel> entry :
  23.         this.materializedConfiguration.getChannels().entrySet()) {
  24.         try{
  25.           logger.info("Stopping Channel " + entry.getKey());
  26.           supervisor.unsupervise(entry.getValue());
  27.         } catch (Exception e){
  28.           logger.error("Error while stopping {}", entry.getValue(), e);
  29.         }
  30.       }
  31.     }
  32.     if(monitorServer != null) {
  33.       monitorServer.stop();
  34.     }
  35.   }
复制代码




  首先,需要注意的是,stopAllComponents()放在startAllComponents(MaterializedConfiguration materializedConfiguration)方法之前的原因,由于配置文件的动态加载这一特性的存在,使得每次加载之前都要先把旧的组件停掉,然后才能去加载最新配置文件中的配置;

  其次,首次执行stopAllComponents()时,由于配置文件尚未赋值,所以并不会执行停止所有组件的操作以及停止monitorServer。再次加载时会依照顺序依次停止对source、sink以及channel的监控,通过supervisor.unsupervise(entry.getValue())停止对其的监控,然后停止monitorServer。supervisor.unsupervise方法如下:
  1. public synchronized void unsupervise(LifecycleAware lifecycleAware) {
  2.     Preconditions.checkState(supervisedProcesses.containsKey(lifecycleAware),
  3.         "Unaware of " + lifecycleAware + " - can not unsupervise");
  4.     logger.debug("Unsupervising service:{}", lifecycleAware);
  5.     synchronized (lifecycleAware) {
  6.     Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
  7.     supervisoree.status.discard = true;
  8.       this.setDesiredState(lifecycleAware, LifecycleState.STOP);
  9.       logger.info("Stopping component: {}", lifecycleAware);
  10.       lifecycleAware.stop();
  11.     }
  12.     supervisedProcesses.remove(lifecycleAware);
  13.     //We need to do this because a reconfiguration simply unsupervises old
  14.     //components and supervises new ones.
  15.     monitorFutures.get(lifecycleAware).cancel(false);
  16.     //purges are expensive, so it is done only once every 2 hours.
  17.     needToPurge = true;
  18.     monitorFutures.remove(lifecycleAware);
  19.   }
复制代码


  该方法首先会检查正在运行的组件当中是否有此组件supervisedProcesses.containsKey(lifecycleAware);如果存在,则对此组件标记为已取消监控supervisoree.status.discard = true;将状态设置为STOP,并停止组件lifecycleAware.stop();然后从删除此组件的监控记录,包括从记录正在处于监控的组件的结构supervisedProcesses以及记录组件及其对应的运行线程的结构monitorFutures中删除相应的组件信息,并且needToPurge = true会使得两小时执行一次的线程池清理操作。

  有一个问题就是,sink和source是如何找到对应的channel的呢??其实前面章节就已经讲解过,分别在AbstractConfigurationProvider.loadSources方法中通过ChannelSelector配置source对应的channel,而在source中通过getChannelProcessor()获取channels,通过channelProcessor.processEventBatch(eventList)将events发送到channel中;而在AbstractConfigurationProvider.loadSinks方法中sink.setChannel(channelComponent.channel)来设置此sink对应的channel,然后在sink的实现类中通过getChannel()获取设置的channel,并使用channel.take()从channel中获取event进行处理。

  以上三节是Flume-NG的启动、配置文件的加载、配置文件的动态加载、组件的执行的整个流程。文中的疏漏之处,请各位指教,我依然会后续继续完善这些内容的。





本文转载自:http://www.cnblogs.com/lxf20061900/p/3679240.html

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条