分享

Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动

坎蒂丝_Swan 发表于 2014-12-14 18:45:04 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 20225
本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:44 编辑

问题导读
问题1:服务ceilometer-agent-compute的初始化操作实现了哪些内容的操作?
问题2:服务ceilometer-agent-compute的启动操作周期性地实现了哪些任务?








ceilometer-agent-compute服务的初始化和启动


    本篇帖子将解析服务组件ceilometer-agent-compute的初始化和启动操作,ceilometer-agent-compute服务组件主要用来收集计算节点上的虚拟机实例的监控数据,在每一个计算节点上都要运行这个服务组件,该agent通过Stevedore管理了一组pollster插件,分别用来获取计算节点上虚拟机的CPU,Disk IO,Network IO,Instance这些信息,这些信息大部分是通过调用Hypervisor的API来获取的,需要定期Poll轮询收集信息。

    来看方法/ceilometer/cli.py----def agent_compute,这个方法即实现了ceilometer-agent-compute服务的初始化和启动操作。

  1. def agent_compute():  
  2.     """
  3.     加载并启动指定的服务compute_manager.AgentManager();
  4.     ceilometer.poll.compute:
  5.     该组件用来收集计算节点上的信息,在每一个计算节点上都要运行一个Compute Agent,
  6.     该Agent通过Stevedore管理了一组pollster插件,分别用来获取虚拟机的CPU,Disk IO,
  7.     Network IO,Instance这些信息,值得一提的是这些信息大部分是通过调用Hypervisor的API
  8.     来获取的,目前,Ceilometer提供了Libvirt的API和Hyper-V的API。
  9.     """  
  10.     service.prepare_service()  
  11.     os_service.launch(compute_manager.AgentManager()).wait()  
复制代码


1 服务ceilometer-agent-compute的初始化操作


服务ceilometer-agent-compute的初始化操作主要实现了以下内容的操作:

(1)根据指定参数获取命名空间ceilometer.poll.compute,获取与ceilometer.poll.compute相匹配的所有插件,并加载;ceilometer.poll.compute所指定的插件描述了如何获取收集所监控的虚拟机实例相关的监控采样数据;


(2)根据指定参数获取命名空间ceilometer.discover,获取与ceilometer.discover相匹配的所有插件,并加载;ceilometer.discover所指定的插件描述了如何发现主机上的监控的虚拟机。
ceilometer.discover = local_instances = ceilometer.compute.discovery:InstanceDiscovery


(3)获取管理员操作的上下文环境类的初始化对象;


(4)建立线程池,用于后续服务中若干操作的运行;

(5)加载命名空间ceilometer.compute.virt,描述了获取虚拟机实例的信息(实例数据,CPU数据,网卡数据和磁盘数据等)的方式;

class AgentManager(agent.AgentManager)----def __init__

  1. class AgentManager(agent.AgentManager):  
  2.     def __init__(self):  
  3.          
  4.         super(AgentManager, self).__init__('compute', ['local_instances'])        
  5.         """
  6.         定义获取虚拟机实例的信息(实例数据,CPU数据,网卡数据和磁盘数据等)的方式;
  7.         加载命名空间ceilometer.compute.virt;
  8.         ceilometer.compute.virt =
  9.             libvirt = ceilometer.compute.virt.libvirt.inspector:LibvirtInspector
  10.             hyperv = ceilometer.compute.virt.hyperv.inspector:HyperVInspector
  11.             vsphere = ceilometer.compute.virt.vmware.inspector:VsphereInspector
  12.         """  
  13.         self._inspector = virt_inspector.get_hypervisor_inspector()  
复制代码

class AgentManager(os_service.Service)----def __init__
  1. class AgentManager(os_service.Service):  
  2.   
  3.     def __init__(self, namespace, default_discovery=[]):  
  4.         super(AgentManager, self).__init__()  
  5.         self.default_discovery = default_discovery  
  6.         """
  7.         加载命名空间ceilometer.poll.compute的所有插件:
  8.         ceilometer.poll.compute =
  9.         disk.read.requests = ceilometer.compute.pollsters.disk:ReadRequestsPollster
  10.         disk.write.requests = ceilometer.compute.pollsters.disk:WriteRequestsPollster
  11.         disk.read.bytes = ceilometer.compute.pollsters.disk:ReadBytesPollster
  12.         disk.write.bytes = ceilometer.compute.pollsters.disk:WriteBytesPollster
  13.         disk.read.requests.rate = ceilometer.compute.pollsters.disk:ReadRequestsRatePollster
  14.         disk.write.requests.rate = ceilometer.compute.pollsters.disk:WriteRequestsRatePollster
  15.         disk.read.bytes.rate = ceilometer.compute.pollsters.disk:ReadBytesRatePollster
  16.         disk.write.bytes.rate = ceilometer.compute.pollsters.disk:WriteBytesRatePollster
  17.         cpu = ceilometer.compute.pollsters.cpu:CPUPollster
  18.         cpu_util = ceilometer.compute.pollsters.cpu:CPUUtilPollster
  19.         network.incoming.bytes = ceilometer.compute.pollsters.net:IncomingBytesPollster
  20.         network.incoming.packets = ceilometer.compute.pollsters.net:IncomingPacketsPollster
  21.         network.outgoing.bytes = ceilometer.compute.pollsters.net:OutgoingBytesPollster
  22.         network.outgoing.packets = ceilometer.compute.pollsters.net:OutgoingPacketsPollster
  23.         network.incoming.bytes.rate = ceilometer.compute.pollsters.net:IncomingBytesRatePollster
  24.         network.outgoing.bytes.rate = ceilometer.compute.pollsters.net:OutgoingBytesRatePollster
  25.         instance = ceilometer.compute.pollsters.instance:InstancePollster
  26.         instance_flavor = ceilometer.compute.pollsters.instance:InstanceFlavorPollster
  27.         memory.usage = ceilometer.compute.pollsters.memory:MemoryUsagePollster
  28.         """  
  29.         self.pollster_manager = self._extensions('poll', namespace)  
  30.   
  31.         """
  32.         加载命名空间所有插件ceilometer.discover:
  33.         ceilometer.discover =
  34.         local_instances = ceilometer.compute.discovery:InstanceDiscovery
  35.         """  
  36.         self.discovery_manager = self._extensions('discover')  
  37.         self.context = context.RequestContext('admin', 'admin', is_admin=True)  
复制代码

class AgentManager(os_service.Service)----def _extensions
  1. def _extensions(category, agent_ns=None):   
  2.     """  
  3.     根据指定参数获取命名空间namespace,  
  4.     获取与namespace相匹配的所有插件,并加载;  
  5.     """   
  6.     namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns else 'ceilometer.%s' % category)   
  7.             
  8.     """  
  9.     获取与namespace相匹配的所有插件,并加载;  
  10.     """   
  11.     return extension.ExtensionManager(   
  12.         namespace=namespace,   
  13.         invoke_on_load=True,   
  14.     )   
复制代码

class Service(object)----def __init__
  1. class Service(object):   
  2.     """Service object for binaries running on hosts."""   
  3.     def __init__(self, threads=1000):   
  4.         self.tg = threadgroup.ThreadGroup(threads)   
  5.    
  6.         # signal that the service is done shutting itself down:   
  7.         self._done = event.Event()  
复制代码


2 服务ceilometer-agent-compute的启动操作


这里服务ceilometer-agent-compute与服务ceilometer-agent-central的启动操作的实现代码是完全一致的,只不过所获取的监控任务不同而已,所调用的采集监控项采样数据的方法不同而已;


服务ceilometer-agent-compute的启动操作周期性地实现以下任务:


(1)遍历任务(通道),获取每个任务指定获取的监控项的采样数据;


(2)针对每个监控项的采样数据,实现发布监控项采样数据样本到消息队列,其中实现采样数据发布的方式有三种,即RPC/UDP/FILE;

    其中,RPC将会发布相关消息到消息队列,后续的collector组件服务将会监听相应的消息队列来获取这些数据信息;UDP将会建立socket建立一个信息通道,实现发送相关消息数据,而后续的collector组件服务将会通过这个信息通道接收相关的消息数据;FILE将会直接保存相关消息数据到指定的日志文件中。

class AgentManager(os_service.Service)----def start

  1. class AgentManager(os_service.Service):   
  2.     def start(self):   
  3.         self.pipeline_manager = pipeline.setup_pipeline()   
  4.         for interval,task in self.setup_polling_tasks().iteritems():   
  5.             self.tg.add_timer(interval,   
  6.                               self.interval_task,   
  7.                               task=task)   
复制代码

class AgentManager(os_service.Service)----def interval_task
  1. def interval_task(task):   
  2.     task.poll_and_publish()  
复制代码

class PollingTask(object)----def poll_and_publish
  1. class AgentManager(os_service.Service):  
  2.     def poll_and_publish(self):  
  3.         """
  4.         创建轮徇的任务;
  5.         任务以一定时间间隔周期性地进行;
  6.          
  7.         遍历任务(通道),获取每个任务指定获取的监控项的采样数据;
  8.         针对每个监控项的采样数据,实现发布监控项采样数据样本到消息队列;
  9.         """  
  10.         """
  11.         通过nova客户端访问nova数据库,获取本地主机上所有的虚拟机实例;
  12.         ""
  13.         agent_resources = self.manager.discover()
  14.         with self.publish_context as publisher:
  15.             cache = {}     
  16.             """  
  17.             遍历任务(通道);  
  18.             获取每个任务指定获取的监控项的采样数据;  
  19.             """
  20.             for pollster in self.pollsters:
  21.                 key = pollster.name
  22.                 LOG.info(_("Polling pollster %s"), key)
  23.                 source_resources = list(self.resources[key].resources)
  24.                 try:
  25.                     """  
  26.                     get_samples:获取nova管理下虚拟机实例的某一监控项的采样数据;  
  27.                     注:这些信息大部分是通过调用Hypervisor的API来获取的,Ceilometer提供了Libvirt的API和Hyper-V的API。  
  28.                     class CPUPollster(plugin.ComputePollster)----def get_samples(self, manager, cache, resources)  
  29.                     class CPUUtilPollster(plugin.ComputePollster)----def get_samples(self, manager, cache, resources)  
  30.                     class _Base(plugin.ComputePollster)----def get_samples(self, manager, cache, resources)  
  31.                     class _DiskRatesPollsterBase(plugin.ComputePollster)----def get_samples(self, manager, cache, resources)  
  32.                     class InstancePollster(plugin.ComputePollster)----def get_samples(manager, cache, resources)  
  33.                     class InstanceFlavorPollster(plugin.ComputePollster)----def get_samples(manager, cache, resources)  
  34.                     class MemoryUsagePollster(plugin.ComputePollster)----def get_samples(self, manager, cache, resources)  
  35.                     class _Base(plugin.ComputePollster)----def get_samples(self, manager, cache, resources)  
  36.                     """
  37.                     samples = list(pollster.obj.get_samples(
  38.                         self.manager,
  39.                         cache,
  40.                         resources=source_resources or agent_resources,
  41.                     ))
  42.                      
  43.                     """  
  44.                     实现发布监控项采样数据样本;  
  45.                     """  
  46.                     publisher(samples)  
  47.                 except Exception as err:  
  48.                     LOG.warning(_(  
  49.                         'Continue after error from %(name)s: %(error)s')  
  50.                         % ({'name': pollster.name, 'error': err}),  
  51.                         exc_info=True)  
复制代码


方法小结:

本方法周期性地执行以下操作:


(1)遍历任务(通道),获取每个任务指定获取的监控项的采样数据;

(2)针对每个监控项的采样数据,实现发布监控项采样数据样本到消息队列;


class PublishContext----def __enter__


在上述代码中,发布监控项采样样本数据是由这个方法实现的,这个方法主要实现了以下内容:

实现发布监控项采样数据样本,其中实现采样数据发布的方式有三种,即RPC/UDP/FILE;


(1)class FilePublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);
实现发布采样数据到一个日志文件;


(2)class RPCPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);


实现通过RPC发布采样数据;
* 从若干采样数据信息samples中获取提取数据形成信息格式meters,为信息的发布或存储做准备;


* 将之前从采样数据中提取的信息meters包装成msg;


* 将匹配的topic,msg添加到本地队列local_queue中,topic默认为metering;


* 实现发布本地队列local_queue中的所有数据信息到队列metering中;


* 其中,消息msg中封装的'method'方法为'record_metering_data',即当消息被消费时,将会执行方法record_metering_data,实现存储到数据存储系统中(数据库);

(3)class UDPPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples)
通过UDP发布采样数据;

  1. class PublishContext(object):   
  2.     def __enter__(self):   
  3.         """  
  4.         实现发布监控项采样数据样本;  
  5.         publish_samples:  
  6.         1.class FilePublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);  
  7.         实现发布采样数据到一个日志文件;  
  8.         2.class RPCPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);  
  9.         通过RPC发布采样数据;  
  10.         * 从若干采样数据信息samples中获取提取数据形成信息格式meters,为信息的发布或存储做准备;  
  11.         * 将之前从采样数据中提取的信息meters包装成msg;  
  12.         * 将匹配的topic,msg添加到本地队列local_queue中,topic默认为metering;  
  13.         * 实现发布本地队列local_queue中的所有数据信息到队列metering中;  
  14.         * 其中,消息msg中封装的'method'方法为'record_metering_data',即当消息被消费时,将会  
  15.           执行方法record_metering_data,实现存储到数据存储系统中(数据库);  
  16.         3.class UDPPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples)  
  17.         通过UDP发布采样数据;  
  18.         """   
  19.         def p(samples):   
  20.             for p in self.pipelines:   
  21.                 p.publish_samples(self.context,   
  22.                                   samples)   
  23.         return p  
复制代码

2.1 实现发布采样数据到一个日志文件
  1. class FilePublisher(publisher.PublisherBase):   
  2.     def publish_samples(self, context, samples):   
  3.         if self.publisher_logger:   
  4.             for sample in samples:   
  5.                 self.publisher_logger.info(sample.as_dict())  
复制代码

2.2 通过RPC发布采样数据(具体见代码注释)
  1. class RPCPublisher(publisher.PublisherBase):   
  2.     def publish_samples(self, context, samples):   
  3.         """  
  4.         通过RPC发布信息;  
  5.         1.从若干采样数据信息samples中获取提取数据形成信息格式meters,为信息的发布或存储做准备;  
  6.         2.将之前从采样数据中提取的信息meters包装成msg;  
  7.         3.将匹配的topic,msg添加到本地队列local_queue中,topic默认为metering;  
  8.         4.实现发布本地队列local_queue中的所有数据信息到队列metering中;  
  9.         5.其中,消息msg中封装的'method'方法为'record_metering_data',即当消息被消费时,将会  
  10.           执行方法record_metering_data,实现存储到数据存储系统中(数据库);  
  11.         """   
  12.    
  13.         # 从若干采样数据信息中获取提取数据形成信息格式,为信息的发布或存储做准备;   
  14.         meters = [   
  15.             # meter_message_from_counter:   
  16.             # 为一个监控采样数据做好准备被发布或存储;   
  17.             # 从一个采样数据信息中获取提取信息形成msg;   
  18.             utils.meter_message_from_counter(   
  19.                 sample,   
  20.                 cfg.CONF.publisher.metering_secret)   
  21.             for sample in samples   
  22.         ]   
  23.    
  24.         # cfg.CONF.publisher_rpc.metering_topic:metering messages所使用的主题,默认为metering;   
  25.         topic = cfg.CONF.publisher_rpc.metering_topic   
  26.             
  27.         # 将之前从采样数据中提取的信息meters包装成msg;   
  28.         msg = {   
  29.             'method': self.target,   
  30.             'version': '1.0',   
  31.             'args': {'data': meters},   
  32.         }   
  33.             
  34.         # 将匹配的topic,msg添加到本地队列local_queue中,topic默认为metering;   
  35.         self.local_queue.append((context, topic, msg))   
  36.    
  37.         if self.per_meter_topic:   
  38.             for meter_name, meter_list in itertools.groupby(   
  39.                     sorted(meters, key=operator.itemgetter('counter_name')),   
  40.                     operator.itemgetter('counter_name')):   
  41.                 msg = {   
  42.                     'method': self.target,   
  43.                     'version': '1.0',   
  44.                     'args': {'data': list(meter_list)},   
  45.                 }   
  46.                 topic_name = topic + '.' + meter_name   
  47.                 LOG.audit(_('Publishing %(m)d samples on %(n)s') % (   
  48.                           {'m': len(msg['args']['data']), 'n': topic_name}))   
  49.                 self.local_queue.append((context, topic_name, msg))   
  50.    
  51.         # 实现发布本地队列local_queue中的所有数据信息;   
  52.         self.flush()   
复制代码
  1. def flush(self):   
  2.     """  
  3.     实现发布本地队列中的所有数据信息;  
  4.     """            
  5.     # 获取本地队列的数据信息;   
  6.     queue = self.local_queue   
  7.     self.local_queue = []   
  8.             
  9.     # 实现循环发布队列queue中的信息数据;   
  10.     self.local_queue = self._process_queue(queue, self.policy) + \self.local_queue   
  11.             
  12.     if self.policy == 'queue':   
  13.         self._check_queue_length()
复制代码
  1. @staticmethod   
  2. def _process_queue(queue, policy):   
  3.     """  
  4.     实现循环发布队列queue中的信息数据;  
  5.     """   
  6.     while queue:   
  7.         # 取出第一位的topic、msg等数据;   
  8.         context, topic, msg = queue[0]   
  9.         try:   
  10.             # 实现远程发布信息,不返回任何值;   
  11.             rpc.cast(context, topic, msg)   
  12.         except (SystemExit, rpc.common.RPCException):   
  13.             samples = sum([len(m['args']['data']) for n, n, m in queue])   
  14.             if policy == 'queue':   
  15.                 LOG.warn(_("Failed to publish %d samples, queue them"),samples)   
  16.                 return queue   
  17.             elif policy == 'drop':   
  18.                 LOG.warn(_("Failed to publish %d samples, dropping them"),samples)   
  19.                 return []   
  20.             # default, occur only if rabbit_max_retries > 0   
  21.             raise   
  22.         else:   
  23.             # 从队列中删除发布后的信息;   
  24.             queue.pop(0)   
  25.     return []   
复制代码

2.3 通过UDP发布采样数据(具体见代码注释)
  1. class UDPPublisher(publisher.PublisherBase):   
  2.     def publish_samples(self, context, samples):   
  3.         """  
  4.         通过UDP协议发送meter信息到服务器端,实现监控信息的发布;  
  5.         """   
  6.         for sample in samples:   
  7.             """  
  8.             为一个监控采样数据做好准备被发布或存储;  
  9.             从一个采样数据信息中获取提取信息形成msg;  
  10.             """   
  11.             msg = utils.meter_message_from_counter(   
  12.                 sample,   
  13.                 cfg.CONF.publisher.metering_secret)   
  14.             host = self.host   
  15.             port = self.port   
  16.             LOG.debug(_("Publishing sample %(msg)s over UDP to "   
  17.                         "%(host)s:%(port)d") % {'msg': msg, 'host': host,'port': port})   
  18.             """  
  19.             通过UDP协议发送meter信息到服务器端,实现监控信息的发布;  
  20.             """   
  21.             try:   
  22.                 self.socket.sendto(msgpack.dumps(msg),(self.host, self.port))   
  23.             except Exception as e:   
  24.                 LOG.warn(_("Unable to send sample over UDP"))   
  25.                 LOG.exception(e)  
复制代码








Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动


欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条