分享

Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动

坎蒂丝_Swan 发表于 2014-12-13 19:06:31 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 11465
本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:43 编辑

问题导读
问题1:类SingletonAlarmService的初始化操作完成了哪些内容?
问题2:PartitionedAlarmService类初始化操作完成了哪些内容?









ceilometer-alarm-evaluator服务的初始化和启动



    本篇博客将解析服务组件ceilometer-alarm-evaluator的初始化和启动操作,这个服务组件即ceilometer报警服务;来看方法/ceilometer/cli.py----def alarm_evaluator,这个方法即实现了ceilometer-alarm-evaluator服务的初始化和启动操作。


  1. def alarm_evaluator():  
  2.     """
  3.     加载并启动SingletonAlarmService服务(单例报警服务);
  4.     报警服务系统:SingletonAlarmService和PartitionedAlarmService;
  5.     默认报警服务:ceilometer.alarm.service.SingletonAlarmService;
  6.     如果要应用分布式报警系统,则需要在这里修改配置文件中的参数;
  7.     """  
  8.     service.prepare_service()  
  9.       
  10.     """
  11.     launch:加载并启动SingletonAlarmService服务,最终调用了服务的start方法实现服务的启动;
  12.     配置文件定义了默认报警服务:ceilometer.alarm.service.SingletonAlarmService
  13.     SingletonAlarmService:单例的报警服务;
  14.     """  
  15.     eval_service = importutils.import_object(cfg.CONF.alarm.evaluation_service)  
  16.     os_service.launch(eval_service).wait(<span style="font-size:18px;"><span style="font-family:KaiTi_GB2312;">)</span></span>  
复制代码

方法小结:
    1.报警服务系统:
      SingletonAlarmService和PartitionedAlarmService;


    2.默认报警服务:
      ceilometer.alarm.service.SingletonAlarmService;


    3.如果要应用分布式报警系统,则需要在这里修改配置文件中的参数;


1 单例报警服务SingletonAlarmService的初始化和启动操作


1.1 SingletonAlarmService类初始化操作


    类SingletonAlarmService的初始化操作主要完成了两部分内容:

    * 加载命名空间ceilometer.alarm.evaluator中的所有插件;
      ceilometer.alarm.evaluator =
          threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
          combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
      即描述了报警器状态的评估判定的两种模式:联合报警器状态评估和单一报警器状态评估;

  * 建立线程池,用于后续报警器服务中若干操作的运行;

class SingletonAlarmService----def __init__
  1. class SingletonAlarmService(AlarmService, os_service.Service):  
  2.     def __init__(self):  
  3.         super(SingletonAlarmService, self).__init__()  
  4.         """
  5.         _load_evaluators:
  6.         这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件;
  7.         namespace = ceilometer.alarm.evaluator
  8.             ceilometer.alarm.evaluator =
  9.             threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
  10.             combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
  11.         """  
  12.         self._load_evaluators()  
  13.         self.api_client = None  
复制代码

class Service----def __init__

  1. class Service(object):  
  2.     def __init__(self, threads=1000):  
  3.         self.tg = threadgroup.ThreadGroup(threads)  
  4.         # signal that the service is done shutting itself down:  
  5.         self._done = event.Event()  
复制代码

class AlarmService----def _load_evaluators
  1. class AlarmService(object):  
  2.     EXTENSIONS_NAMESPACE = "ceilometer.alarm.evaluator"  
  3.   
  4.     def _load_evaluators(self):  
  5.         """
  6.         这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件;
  7.         namespace = ceilometer.alarm.evaluator
  8.         ceilometer.alarm.evaluator =
  9.         threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
  10.         combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
  11.         """  
  12.         self.evaluators = extension.ExtensionManager(  
  13.         namespace=self.EXTENSIONS_NAMESPACE,  
  14.         invoke_on_load=True,  
  15.         invoke_args=(rpc_alarm.RPCAlarmNotifier(),)  
  16.         )  
  17.         # 这里得到的就是上面所加载的命名空间ceilometer.alarm.evaluator中的所有插件;  
  18.         self.supported_evaluators = [ext.name for ext in  
  19.                                      self.evaluators.extensions]  
复制代码

1.2 SingletonAlarmService类启动操作



    类的启动操作实现了单例报警器服务SingletonAlarmService的启动操作;


    按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms,方法self._evaluate_assigned_alarms实现获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;

class SingletonAlarmService----def start

  1. def start(self):  
  2.     """
  3.     单例报警器服务SingletonAlarmService的启动操作;
  4.     按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
  5.     获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
  6.     """  
  7.     super(SingletonAlarmService, self).start()  
  8.     if self.evaluators:  
  9.         """
  10.         评估周期60s;
  11.         """  
  12.         interval = cfg.CONF.alarm.evaluation_interval  
  13.               
  14.         # add_timer:按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;  
  15.         # _evaluate_assigned_alarms:  
  16.         # 先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;  
  17.         # 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:  
  18.         # 单一报警器状态的评估判定;  
  19.         # 联合报警器状态的评估判定;  
  20.         self.tg.add_timer(  
  21.             interval,  
  22.             self._evaluate_assigned_alarms,  
  23.             0)  
  24.     # Add a dummy thread to have wait() working  
  25.     self.tg.add_timer(604800, lambda: None)  
复制代码

class AlarmService----def _evaluate_assigned_alarms

    这个方法实现获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;


    1.获取当前部分的所有报警器;


    2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;


  1. def _evaluate_assigned_alarms(self):  
  2.     """
  3.     获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
  4.     1.获取当前部分的所有报警器;
  5.     2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),
  6.       来实现单一报警器模式或者联合报警器模式的评估判定;
  7.     """  
  8.     try:  
  9.         # 获取所有报警器列表;  
  10.         # 对于单例报警器:通过指定客户端通过http协议发送GET请求获取分配给当前部分的报警器;  
  11.         alarms = self._assigned_alarms()  
  12.         LOG.info(_('initiating evaluation cycle on %d alarms') %  
  13.                  len(alarms))  
  14.               
  15.         # 遍历所有的报警器;  
  16.         # 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:  
  17.         # 单一报警器状态的评估判定;  
  18.         # 联合报警器状态的评估判定;  
  19.         for alarm in alarms:  
  20.             self._evaluate_alarm(alarm)  
  21.     except Exception:  
  22.         LOG.exception(_('alarm evaluation cycle failed'))  
复制代码


2 分布式报警服务PartitionedAlarmService的初始化和启动操作

2.1 PartitionedAlarmService类初始化操作

    PartitionedAlarmService类初始化操作和SingletonAlarmService类初始化操作内容大致是相同的,同样主要完成了两部分内容:

    * 加载命名空间ceilometer.alarm.evaluator中的所有插件;
      ceilometer.alarm.evaluator =
          threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
          combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
      即描述了报警器状态的评估判定的两种模式:联合报警器状态评估和单一报警器状态评估;

    * 建立线程池,用于后续报警器服务中若干操作的运行;

    * 初始化分布式报警协议实现类PartitionCoordinator;

class PartitionedAlarmService----def __init__

  1. class PartitionedAlarmService(AlarmService, rpc_service.Service):  
  2.     """
  3.     分布式报警器系统服务;
  4.     """  
  5.     def __init__(self):  
  6.         super(PartitionedAlarmService, self).__init__(  
  7.             cfg.CONF.host,  
  8.             cfg.CONF.alarm.partition_rpc_topic,  
  9.             self  
  10.         )  
  11.         """
  12.         加载命名空间ceilometer.alarm.evaluator中的所有插件;
  13.         namespace = ceilometer.alarm.evaluator
  14.             ceilometer.alarm.evaluator =
  15.             threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
  16.             combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
  17.         """  
  18.         self._load_evaluators()  
  19.         self.api_client = None  
  20.         self.partition_coordinator = coordination.PartitionCoordinator()  
复制代码

class Service----def __init__

  1. class Service(object):  
  2.     def __init__(self, threads=1000):  
  3.         self.tg = threadgroup.ThreadGroup(threads)  
  4.         # signal that the service is done shutting itself down:  
  5.         self._done = event.Event()  
复制代码

class AlarmService----def _load_evaluators

  1. class AlarmService(object):  
  2.     EXTENSIONS_NAMESPACE = "ceilometer.alarm.evaluator"  
  3.   
  4.     def _load_evaluators(self):  
  5.         """
  6.         这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件;
  7.         namespace = ceilometer.alarm.evaluator
  8.         ceilometer.alarm.evaluator =
  9.         threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
  10.         combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
  11.         """  
  12.         self.evaluators = extension.ExtensionManager(  
  13.         namespace=self.EXTENSIONS_NAMESPACE,  
  14.         invoke_on_load=True,  
  15.         invoke_args=(rpc_alarm.RPCAlarmNotifier(),)  
  16.         )  
  17.         # 这里得到的就是上面所加载的命名空间ceilometer.alarm.evaluator中的所有插件;  
  18.         self.supported_evaluators = [ext.name for ext in  
  19.                                      self.evaluators.extensions]  
复制代码

2.2 PartitionedAlarmService类启动操作

分布式报警器系统服务分布式报警器系统服务的启动和运行,按照一定的时间间隔周期性的执行以下操作:


1.实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息);


2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
  如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
  情况1:所有报警器都要实现重新分配操作;
  情况2:只有新建立的报警器需要实现分配操作;


3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
  针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
  单一报警器模式或者联合报警器模式的评估判定;

class PartitionedAlarmService----def start

  1. def start(self):  
  2.     """
  3.     分布式报警器系统服务分布式报警器系统服务的启动和运行;
  4.     按照一定的时间间隔周期性的执行以下操作:
  5.     1.实现广播当前partition的存在性的存在性到所有的partition
  6.     (包括uuid和优先级信息);
  7.     2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
  8.       如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
  9.       情况1:所有报警器都要实现重新分配操作;
  10.       情况2:只有新建立的报警器需要实现分配操作;
  11.     3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
  12.       针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
  13.       单一报警器模式或者联合报警器模式的评估判定;
  14.     """  
  15.     # 启动PartitionedAlarmService服务;  
  16.     super(PartitionedAlarmService, self).start()  
  17.     if self.evaluators:  
  18.         """
  19.         报警评估周期60s;
  20.         """  
  21.         eval_interval = cfg.CONF.alarm.evaluation_interval  
  22.               
  23.         """
  24.         self.tg = threadgroup.ThreadGroup(1000)
  25.         按照一定时间间隔实现循环执行方法self.partition_coordinator.report_presence;
  26.         通过方法fanout_cast实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息);
  27.         """  
  28.         self.tg.add_timer(  
  29.             eval_interval / 4, # 15s  
  30.             self.partition_coordinator.report_presence,  
  31.             0)  
  32.               
  33.         """
  34.         按照一定时间间隔实现循环执行方法self.partition_coordinator.check_mastership;
  35.         self.partition_coordinator.check_mastership:
  36.         实现定期检测主控权角色;
  37.         确定当前的partition是否是主控角色;
  38.         如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
  39.         """  
  40.         self.tg.add_timer(  
  41.             eval_interval / 2, # 30s  
  42.             self.partition_coordinator.check_mastership,  
  43.             eval_interval,     # 60s  
  44.             # _client:构建或重新使用一个经过验证的API客户端;  
  45.             *[eval_interval, self._client])  
  46.               
  47.         """
  48.         add_timer:按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
  49.         self._evaluate_assigned_alarms:
  50.         先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
  51.         针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
  52.         单一报警器状态的评估判定;
  53.         联合报警器状态的评估判定;
  54.         """  
  55.         self.tg.add_timer(  
  56.             eval_interval,     # 60s  
  57.             # 执行报警器的评估操作;  
  58.             self._evaluate_assigned_alarms,  
  59.             eval_interval)  
  60.          
  61.     # Add a dummy thread to have wait() working  
  62.     self.tg.add_timer(604800, lambda: None)  
复制代码

class Service(service.Service)----def start

    因为这里实现的是分布式报警系统,涉及到节点服务间的消息通信,所以这里应用RPC消息队列实现服务间的通信,进行相关的初始化操作;

  1. class Service(service.Service):  
  2.     def start(self):  
  3.         """
  4.         为RPC通信建立到信息总线的连接;
  5.         1.建立指定类型的消息消费者;        
  6.         2.执行方法initialize_service_hook;
  7.         3.启动协程实现等待并消费处理队列中的消息;
  8.         """  
  9.         super(Service, self).start()  
  10.   
  11.         """
  12.         为RPC通信建立到信息总线的连接;
  13.         建立一个新的连接,或者从连接池中获取一个;
  14.         """  
  15.         self.conn = rpc.create_connection(new=True)  
  16.         LOG.debug(_("Creating Consumer connection for Service %s") %  
  17.                   self.topic)  
  18.   
  19.         """
  20.         RpcDispatcher:RPC消息调度类;
  21.         """  
  22.         dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],  
  23.                                                   self.serializer)  
  24.   
  25.         # Share this same connection for these Consumers  
  26.         """
  27.         create_consumer:建立指定类型的消息消费者(fanout or topic);
  28.         1.创建以服务的topic为路由键的消费者;
  29.         2.创建以服务的topic和本机名为路由键的消费者
  30.           (基于topic&host,可用来接收定向消息);
  31.         3.fanout直接投递消息,不进行匹配,速度最快
  32.           (fanout类型,可用于接收广播消息);
  33.         """  
  34.         self.conn.create_consumer(self.topic, dispatcher, fanout=False)  
  35.         node_topic = '%s.%s' % (self.topic, self.host)  
  36.         self.conn.create_consumer(node_topic, dispatcher, fanout=False)  
  37.         self.conn.create_consumer(self.topic, dispatcher, fanout=True)  
  38.   
  39.         # Hook to allow the manager to do other initializations after  
  40.         # the rpc connection is created.  
  41.         """
  42.         在消息消费进程启动前,必须先声明消费者;
  43.         建立一个'topic'类型的消息消费者;
  44.         根据消费者类(TopicConsumer)和消息队列名称
  45.         (pool_name:  ceilometer.collector.metering)
  46.         以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
  47.         """  
  48.         if callable(getattr(self.manager, 'initialize_service_hook', None)):  
  49.             self.manager.initialize_service_hook(self)  
  50.   
  51.         """
  52.         启动消费者线程;
  53.         consume_in_thread用evelent.spawn创建一个协程一直运行;
  54.         等待消息,在有消费到来时会创建新的协程运行远程调用的函数;
  55.         启动协程实现等待并消费处理队列中的消息;
  56.         """  
  57.         self.conn.consume_in_thread()
复制代码


class PartitionedAlarmService----def initialize_service_hook


  1. class PartitionedAlarmService(AlarmService, rpc_service.Service):  
  2.     def initialize_service_hook(self, service):  
  3.         LOG.debug(_('initialize_service_hooks'))  
  4.          
  5.         # create_worker:建立一个'topic'类型的消息消费者;  
  6.         # 指定主题topic(alarm_partition_coordination)和队列名称pool_name(ceilometer.alarm.alarm_partition_coordination);  
  7.         self.conn.create_worker(  
  8.             # alarm_partition_coordination  
  9.             cfg.CONF.alarm.partition_rpc_topic,  
  10.             rpc_dispatcher.RpcDispatcher([self]),  
  11.             # ceilometer.alarm.alarm_partition_coordination  
  12.             'ceilometer.alarm.' + cfg.CONF.alarm.partition_rpc_topic,  
  13.         )  
复制代码
至此,ceilometer-alarm-evaluator服务的初始化和启动操作分析完成。


附录:python的多重父类继承


    在上面的分析中,我们看到在服务初始化和启动的过程中,若干方法都是多重父类继承,这里需要注意的是父类方法的搜索顺序;实际上python经典类的父类方法搜索顺序是深度优先,而python新式类的父类方法搜索顺序是广度优先;

    我写了一个小小的测试程序,来看新式类的父类方法搜索顺序:

  1. class A(object):  
  2.     def start(self):  
  3.         print 'A-start'  
  4.   
  5. class B0(A):  
  6.     def start(self):  
  7.         super(B0,self).start()  
  8.         print 'B0-start'  
  9.   
  10. class B1(A):  
  11.     def start(self):  
  12.         super(B1,self).start()  
  13.         print 'B1-start'  
  14.   
  15. class C(B0,B1):  
  16.     def start(self):  
  17.         super(C,self).start()  
  18.         print 'C-start'  
  19.   
  20. TEST = C()  
  21. TEST.start()  
  22.   
  23. 输出为:  
  24. A-start  
  25. B1-start  
  26. B0-start  
  27. C-start  
复制代码







Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动




欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条