分享

OpenStack Nova-scheduler组件的源码解析(2)

tntzbzc 发表于 2014-11-19 00:01:38 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 31783
问题导读
1.host_state.update_from_compute_node(compute)这条语句实现了什么功能?
2.哪一个函数循环实现了为每一个实例获取合适的主机后,返回选择的主机列表?
3._schedule实现有哪三步?





上一篇:
OpenStack Nova-scheduler组件的源码解析(1)

这篇博客中,我会针对建立虚拟机实例的请求,来解析Nova调度器选取最优主机节点的过程。
首先来看方法/nova/scheduler/manager.py----def run_instance:
  1. def run_instance(self, context, request_spec, admin_password,
  2.             injected_files, requested_networks, is_first_time,
  3.             filter_properties):
  4.         """
  5.         在驱动上尝试调用schedule_run_instance;
  6.         当引发异常的时候设置实例vm_state为ERROR;
  7.         context:上下文信息;
  8.         request_spec:请求规范;
  9.         admin_password:admin用户密码;
  10.         injected_files:注入的文件;
  11.         requested_networks:请求的网络信息;
  12.         is_first_time:标志是否是第一次;
  13.         filter_properties:过滤器属性信息;
  14.         """
  15.         
  16.         # 获取要运行的实例UUID值;
  17.         instance_uuids = request_spec['instance_uuids']
  18.         # EventReporter:上下文管理类;
  19.         # 获取EventReporter类的对象;
  20.         with compute_utils.EventReporter(context, conductor_api.LocalAPI(),
  21.                                          'schedule', *instance_uuids):
  22.             
  23.             # schedule_run_instance这个方法在/nova/scheduler/chance.py和/nova/scheduler/filter_scheduler.py中都有定义;
  24.             # 这里具体调用的是哪一个方法,要看driver的定义;
  25.             # 这个类的初始化方法中定义了self.driver = importutils.import_object(scheduler_driver);
  26.             # 也就是说,调用哪一个方法,是由scheduler_driver来具体指定的;
  27.             # 初始化类中具体定义了,如果scheduler_driver没有被定义,则采用配置参数来给它赋值;
  28.             # 也就是scheduler_driver = CONF.scheduler_driver;
  29.             # 而配置参数CONF.scheduler_driver默认的值是nova.scheduler.filter_scheduler.FilterScheduler;
  30.             # 从而可以知道,当没有指明scheduler_driver的值时,默认调用/nova/scheduler/filter_scheduler.py中的schedule_run_instance方法;
  31.             # 这里我会把两个方法都分别跟踪进行分析的;
  32.             
  33.             # /nova/scheduler/filter_scheduler.py中的schedule_run_instance方法完成了以下的操作;
  34.             # 获取要建立实例的数目;
  35.             # 循环每一个实例;
  36.             # 获取分配给这个实例的主机;
  37.             # 拨备创建实例请求的各种资源;
  38.             # 远程发送运行实例的消息到队列;
  39.             # 发布调度运行实例结束这个事件的通知;  
  40.             
  41.             # /nova/scheduler/chance.py中的schedule_run_instance方法完成了以下的操作;
  42.             # 循环遍历每个实例;
  43.             # 为每个实例随机挑选一个主机;
  44.             # 在一个实例上设置给定的属性并更新实例相关的数据;   
  45.             # 远程发送建立实例的消息到队列;
  46.             
  47.             #注:研究一下这两个方法的实现有什么不同;
  48.             #研究这两个方法之后会对很多东西学的明白许多;
  49.             try:
  50.                 # driver = nova.scheduler.filter_scheduler.FilterScheduler
  51.                 return self.driver.schedule_run_instance(context,
  52.                         request_spec, admin_password, injected_files,
  53.                         requested_networks, is_first_time, filter_properties)
  54.             except exception.NoValidHost as ex:
  55.                 # don't re-raise
  56.                 self._set_vm_state_and_notify('run_instance',
  57.                                               {'vm_state': vm_states.ERROR,
  58.                                               'task_state': None},
  59.                                               context, ex, request_spec)
  60.             except Exception as ex:
  61.                 with excutils.save_and_reraise_exception():
  62.                     self._set_vm_state_and_notify('run_instance',
  63.                                                   {'vm_state': vm_states.ERROR,
  64.                                                   'task_state': None},
  65.                                                   context, ex, request_spec)
复制代码




我们来看语句return self.driver.schedule_run_instance(context,request_spec, admin_password, injected_files,requested_networks, is_first_time, filter_properties)
来看这个方法所处的类SchedulerManager的初始化方法:


  1. class SchedulerManager(manager.Manager):
  2.     """
  3.     选择一个主机来运行实例;
  4.     """
  5.     RPC_API_VERSION = '2.6'
  6.     def __init__(self, scheduler_driver=None, *args, **kwargs):
  7.         
  8.         # 如果scheduler_driver没有定义,则赋值CONF.scheduler_driver给scheduler_driver;
  9.         # scheduler_driver:这个配置参数默认值为nova.scheduler.filter_scheduler.FilterScheduler;
  10.         if not scheduler_driver:
  11.             scheduler_driver = CONF.scheduler_driver
  12.         
  13.         # import_object:导入一个类,然后返回这个类的实例对象;
  14.         # 导入scheduler_driver指定的类;
  15.         # 例如scheduler_driver的值为nova.scheduler.filter_scheduler.FilterScheduler;
  16.         # 则导入nova.scheduler.filter_scheduler.FilterScheduler这个类;
  17.         # 这样也就实现了类的动态的导入;
  18.         self.driver = importutils.import_object(scheduler_driver)
  19.         super(SchedulerManager, self).__init__(*args, **kwargs)
复制代码




再来看scheduler_driver的定义:

  1. scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
  2.         default='nova.scheduler.filter_scheduler.FilterScheduler',
  3.         help='Default driver to use for the scheduler')
复制代码




我们可以看到调度器所应用的选取主机方法是动态导入的,默认是应用nova.scheduler.filter_scheduler.FilterScheduler类所定义的基于主机过滤器的调度器方法。
再来看方法schedule_run_instance的代码实现:

  1. def schedule_run_instance(self, context, request_spec,
  2.                               admin_password, injected_files,
  3.                               requested_networks, is_first_time,
  4.                               filter_properties):
  5.         """
  6.         这个方法被从nova.compute.api调用,来提供实例;
  7.         返回创建的实例的列表;
  8.                      
  9.         获取要建立实例的数目;
  10.         循环每一个实例,获取分配给这个实例的主机;
  11.         拨备创建实例请求的各种资源;
  12.         远程发送运行实例的消息到队列;
  13.         发布调度运行实例结束这个事件的通知;   
  14.         """
  15.         # request_spec:请求规范;
  16.         # 这个参数的格式字典化;
  17.         payload = dict(request_spec=request_spec)
  18.         
  19.         # notifier.publisher_id("scheduler"):获取信息发布者ID;
  20.         # 返回值为"scheduler.host";
  21.         # notify:使用指定的驱动程序发布通知;(这个方法需要进一步分析它的具体实现过程)
  22.         notifier.notify(context, notifier.publisher_id("scheduler"),'scheduler.run_instance.start', notifier.INFO, payload)
  23.         # 从request_spec当中获取instance_uuids值;
  24.         instance_uuids = request_spec.pop('instance_uuids')
  25.         # 获取要建立实例的数目;
  26.         num_instances = len(instance_uuids)
  27.         LOG.debug(_("Attempting to build %(num_instances)d instance(s)") % locals())
  28.         # 返回满足要求规格的主机列表;
  29.         # 循环为每一个实例获取合适的主机后,返回选择的主机列表;
  30.         weighed_hosts = self._schedule(context, request_spec, filter_properties, instance_uuids)
  31.         # @@@这个语句没有弄明白,后面要继续看一下;
  32.         filter_properties.pop('context', None)
  33.         # 循环遍历instance_uuids;
  34.         for num, instance_uuid in enumerate(instance_uuids):
  35.             request_spec['instance_properties']['launch_index'] = num
  36.             try:
  37.                 # 获取分配给这个实例的主机;
  38.                 try:
  39.                     weighed_host = weighed_hosts.pop(0)
  40.                 except IndexError:
  41.                     raise exception.NoValidHost(reason="")
  42.                 # context:上下文信息;
  43.                 # weighed_host:获取分配给这个实例的主机;
  44.                 # request_spec:请求规范;
  45.                 # filter_properties:过滤器属性信息;
  46.                 # requested_networks:请求的网络信息;
  47.                 # injected_files:注入文件;
  48.                 # admin_password:admin密码;
  49.                 # is_first_time:标志是否是第一次;
  50.                 # instance_uuid:每个实例的UUID;
  51.                 # _provision_resource:在这个区域内拨备创建请求的各种资源;
  52.                 # 远程发送建立实例的消息到队列;
  53.                 self._provision_resource(context, weighed_host,
  54.                                          request_spec,
  55.                                          filter_properties,
  56.                                          requested_networks,
  57.                                          injected_files, admin_password,
  58.                                          is_first_time,
  59.                                          instance_uuid=instance_uuid)
  60.             except Exception as ex:
  61.                 driver.handle_schedule_error(context, ex, instance_uuid, request_spec)
  62.             retry = filter_properties.get('retry', {})
  63.             retry['hosts'] = []
  64.         # @@@notify:使用指定的驱动程序发布通知;
  65.         # 应该是发布调度运行实例结束这个事件的通知;
  66.         notifier.notify(context, notifier.publisher_id("scheduler"),
  67.                         'scheduler.run_instance.end', notifier.INFO, payload)
复制代码




我们比较关注语句weighed_hosts = self._schedule(context, request_spec, filter_properties, instance_uuids),这条语句通过调用方法_schedule实现了循环为每一个实例获取合适的主机后,返回可用的主机列表。具体来看方法_schedule的代码实现:


  1. def _schedule(self, context, request_spec, filter_properties, instance_uuids=None):
  2.         """
  3.         返回满足要求规格的主机列表;
  4.         循环为每一个实例获取合适的主机后,返回选择的主机列表;
  5.         
  6.         对主机进行过滤;
  7.         FilterManager类继承自Scheduler类;
  8.         在Scheduler类的初始化中,加载了所有可用的filter类;
  9.         根据配置文件中scheduler_default_filters字段的定义选择默认使用的一个或多个filter;
  10.         依次对每个主机调用filter类的host_passes()方法,如果返回都为True,则主机通过过滤;
  11.         
  12.         对所有通过过滤的主机计算权值;
  13.         """
  14.         # elevated:返回带有admin标志设置的context的版本;
  15.         elevated = context.elevated()
  16.         # 获取实例属性信息(instance_properties);
  17.         instance_properties = request_spec['instance_properties']
  18.         # 获取实例类型信息(instance_type);
  19.         instance_type = request_spec.get("instance_type", None)
  20.         update_group_hosts = False
  21.         # 获取scheduler_hints;
  22.         scheduler_hints = filter_properties.get('scheduler_hints') or {}
  23.         # 获取group信息;
  24.         group = scheduler_hints.get('group', None)
  25.         if group:
  26.             # group_hosts:获取group中所有具有经过过滤后符合过滤条件的主机的列表;
  27.             group_hosts = self.group_hosts(elevated, group)
  28.             # 表示已经更新过group_hosts;
  29.             update_group_hosts = True
  30.             # 如果filter_properties中不包含'group_hosts',则增加'group_hosts'到filter_properties中;
  31.             if 'group_hosts' not in filter_properties:
  32.                 filter_properties.update({'group_hosts': []})
  33.                
  34.             # 获取filter_properties中原有的'group_hosts';
  35.             # 把两部分'group_hosts'加在一起;(@@@应该都是符合条件的,自己的理解;)
  36.             configured_hosts = filter_properties['group_hosts']
  37.             filter_properties['group_hosts'] = configured_hosts + group_hosts
  38.         # 获取配置选项;
  39.         config_options = self._get_configuration_options()
  40.         
  41.         # 要建立实例的属性信息对象拷贝;
  42.         properties = instance_properties.copy()
  43.         
  44.         # 从instance_uuids获取properties['uuid'];
  45.         if instance_uuids:
  46.             properties['uuid'] = instance_uuids[0]
  47.         # 记录尝试次数;
  48.         self._populate_retry(filter_properties, properties)
  49.         # 更新过滤器属性信息;
  50.         filter_properties.update({'context': context,
  51.                                   'request_spec': request_spec,
  52.                                   'config_options': config_options,
  53.                                   'instance_type': instance_type})
  54.         # 从request_spec获取有用的信息,填充到过滤器属性当中;
  55.         # 分别是filter_properties['os_type']和filter_properties['project_id'];
  56.         self.populate_filter_properties(request_spec, filter_properties)
  57.         # 寻找可以接受的本地主机列表通过对我们的选项进行反复的过滤和称重;
  58.         # 这里我们使用了迭代,所以只遍历了一次这个列表;
  59.         # 获取并返回HostStates列表;
  60.         
  61.         # 过滤掉不满足要求的主机;
  62.         hosts = self.host_manager.get_all_host_states(elevated)
  63.         selected_hosts = []
  64.         
  65.         # 获取要建立的实例数目;
  66.         if instance_uuids:
  67.             num_instances = len(instance_uuids)
  68.         else:
  69.             num_instances = request_spec.get('num_instances', 1)
  70.             
  71.         # 遍历num_instances个实例,为每个实例选取合适的运行主机;
  72.         for num in xrange(num_instances):
  73.             # Filter local hosts based on requirements ...
  74.             # 基于具体要求过滤本地主机;
  75.             # get_filtered_hosts:过滤主机,返回那些通过所有过滤器的主机;
  76.             hosts = self.host_manager.get_filtered_hosts(hosts, filter_properties)
  77.             if not hosts:
  78.                 break
  79.             LOG.debug(_("Filtered %(hosts)s") % locals())
  80.             
  81.             # 对主机进行称重;
  82.             # 获取并返回一个WeighedObjects的主机排序列表(最高分排在第一);
  83.             weighed_hosts = self.host_manager.get_weighed_hosts(hosts, filter_properties)
  84.             # scheduler_host_subset_size:这个参数定义了新的实例将会被调度到一个主机上,这个主机是随机的从最好的(分数最高的)N个主机组成的子集中选择出来的;
  85.             # 这个参数定义了这个子集的大小,供选择最好的主机使用;
  86.             # 如果值为1,则由称重函数返回第一个主机;
  87.             # 这个值至少为1,任何小于1的值都会被忽略,而且会被1所代替;
  88.             # 这个参数的默认值为1;
  89.             scheduler_host_subset_size = CONF.scheduler_host_subset_size
  90.             if scheduler_host_subset_size > len(weighed_hosts):
  91.                 scheduler_host_subset_size = len(weighed_hosts)
  92.             if scheduler_host_subset_size
复制代码



在这个方法的实现过程中,主要就是三步:
1.语句:hosts = self.host_manager.get_all_host_states(elevated)
实现了过滤掉不可用的主机节点,获取可用的主机节点列表;
2 语句:hosts = self.host_manager.get_filtered_hosts(hosts, filter_properties)
针对建立一个虚拟机实例的要求,实现了用系统指定的过滤器对上面获取的可用主机节点列表进行过滤,进一步得到符合过滤器要求的主机列表;
3 语句:weighed_hosts = self.host_manager.get_weighed_hosts(hosts, filter_properties)
实现了对过滤后的主机列表中的每一个主机节点进行称重操作,选取某个标准下最优的主机节点作为建立虚拟机实例的目标主机;
好了,具体来对这些语句进行解析。
1.hosts = self.host_manager.get_all_host_states(elevated)
具体来看方法get_all_host_states的代码实现:



  1. def get_all_host_states(self, context):
  2.         """
  3.         获取并返回HostStates列表;
  4.         HostStates代表了所有HostManager知道的主机;
  5.         另外,HostState中的每个consumable resources都是预填充的;
  6.         基于数据库中的数据进行调整;      
  7.         过滤掉不满足要求的主机;     
  8.         """
  9.         # 获取可用计算节点的资源使用情况;
  10.         # 获取所有computeNodes(计算节点);
  11.         compute_nodes = db.compute_node_get_all(context)
  12.         # 集合;
  13.         seen_nodes = set()
  14.         for compute in compute_nodes:
  15.             service = compute['service']
  16.             if not service:
  17.                 LOG.warn(_("No service for compute ID %s") % compute['id'])
  18.                 continue
  19.             # 获取主机host;
  20.             host = service['host']
  21.             # 获取hypervisor_hostname作为节点名;
  22.             node = compute.get('hypervisor_hostname')
  23.             # 组成state_key;
  24.             state_key = (host, node)
  25.             # 获取capabilities;
  26.             capabilities = self.service_states.get(state_key, None)
  27.             # 获取host_state;
  28.             host_state = self.host_state_map.get(state_key)
  29.             
  30.             # 更新只读的capabilities字典;
  31.             if host_state:
  32.                 host_state.update_capabilities(capabilities, dict(service.iteritems()))
  33.             # @@@跟踪一台主机的可变和不可变的信息;
  34.             # 还试图删除以前使用的数据结构,并锁定访问;
  35.             else:
  36.                 host_state = self.host_state_cls(host, node,capabilities=capabilities,service=dict(service.iteritems()))
  37.                 self.host_state_map[state_key] = host_state
  38.                
  39.             # update_from_compute_node:从compute信息更新它的主机信息;
  40.             host_state.update_from_compute_node(compute)
  41.             # 更新集合seen_nodes;
  42.             seen_nodes.add(state_key)
  43.         # 从host_state_map中删除状态不是活跃的计算节点;
  44.         # 获取并删除状态不活跃的计算节点;
  45.         dead_nodes = set(self.host_state_map.keys()) - seen_nodes
  46.         for state_key in dead_nodes:
  47.             host, node = state_key
  48.             LOG.info(_("Removing dead compute node %(host)s:%(node)s "
  49.                        "from scheduler") % locals())
  50.             del self.host_state_map[state_key]
  51.         return self.host_state_map.itervalues()
复制代码






1.1 compute_nodes = db.compute_node_get_all(context)
这条语句实现了从数据库获取所有computeNodes(计算节点),具体来看方法compute_node_get_all的代码实现:
  1. def compute_node_get_all(context):
  2.     """
  3.     通过查询数据库所有所有ComputeNode(数据节点);
  4.     """
  5.     return model_query(context, models.ComputeNode).\
  6.             options(joinedload('service')).\
  7.             options(joinedload('stats')).\
  8.             all()
复制代码


1.2 host_state = self.host_state_cls(host, node,capabilities=capabilities,service=dict(service.iteritems()))
这条语句实现了初始化主机的一些参数,具体来看代码:
  1. host_state_cls = HostState
  2. class HostState(object):
  3.     def __init__(self, host, node, capabilities=None, service=None):
  4.         self.host = host
  5.         self.nodename = node
  6.         self.update_capabilities(capabilities, service)
  7.         # Mutable available resources.
  8.         # These will change as resources are virtually "consumed".
  9.         self.total_usable_disk_gb = 0
  10.         self.disk_mb_used = 0
  11.         self.free_ram_mb = 0
  12.         self.free_disk_mb = 0
  13.         self.vcpus_total = 0
  14.         self.vcpus_used = 0
  15.         # Valid vm types on this host: 'pv', 'hvm' or 'all'
  16.         if 'allowed_vm_type' in self.capabilities:
  17.             self.allowed_vm_type = self.capabilities['allowed_vm_type']
  18.         else:
  19.             self.allowed_vm_type = 'all'
  20.         # Additional host information from the compute node stats:
  21.         self.vm_states = {}
  22.         self.task_states = {}
  23.         self.num_instances = 0
  24.         self.num_instances_by_project = {}
  25.         self.num_instances_by_os_type = {}
  26.         self.num_io_ops = 0
  27.         # Resource oversubscription values for the compute host:
  28.         self.limits = {}
  29.         self.updated = None
复制代码


1.3 host_state.update_from_compute_node(compute)
这条语句实现了从compute信息更新主机信息,具体来看代码:
  1. def update_from_compute_node(self, compute):
  2.         """
  3.         从compute_node信息更新它的主机信息;
  4.         """
  5.         
  6.         # 若果已经更新过,则直接返回;
  7.         if (self.updated and compute['updated_at'] and self.updated > compute['updated_at']):
  8.             return
  9.         # 获取all_ram_mb;
  10.         all_ram_mb = compute['memory_mb']
  11.         # 假设如果使用qcow2磁盘,则虚拟磁盘大小都被实例所消耗;
  12.         least = compute.get('disk_available_least')
  13.         # 获取剩余的磁盘空间free_disk_mb;
  14.         free_disk_mb = least if least is not None else compute['free_disk_gb']
  15.         free_disk_mb *= 1024
  16.         # 获取使用的磁盘空间;
  17.         self.disk_mb_used = compute['local_gb_used'] * 1024
  18.         # free_ram_mb可以是负值;
  19.         self.free_ram_mb = compute['free_ram_mb']
  20.         # 获取total_usable_ram_mb;
  21.         self.total_usable_ram_mb = all_ram_mb
  22.         # 获取total_usable_disk_gb;
  23.         self.total_usable_disk_gb = compute['local_gb']
  24.         self.free_disk_mb = free_disk_mb
  25.         self.vcpus_total = compute['vcpus']
  26.         self.vcpus_used = compute['vcpus_used']
  27.         self.updated = compute['updated_at']
  28.         stats = compute.get('stats', [])
  29.         statmap = self._statmap(stats)
  30.         # 追踪主机上的实例数目;
  31.         self.num_instances = int(statmap.get('num_instances', 0))
  32.         # 通过project_id获取实例数目;
  33.         project_id_keys = [k for k in statmap.keys() if
  34.                 k.startswith("num_proj_")]
  35.         for key in project_id_keys:
  36.             project_id = key[9:]
  37.             self.num_instances_by_project[project_id] = int(statmap[key])
  38.         # 在一定的vm_states中追踪实例的数目;
  39.         vm_state_keys = [k for k in statmap.keys() if k.startswith("num_vm_")]
  40.         for key in vm_state_keys:
  41.             vm_state = key[7:]
  42.             self.vm_states[vm_state] = int(statmap[key])
  43.         # 在一定的task_states中追踪实例的数目;
  44.         task_state_keys = [k for k in statmap.keys() if
  45.                 k.startswith("num_task_")]
  46.         for key in task_state_keys:
  47.             task_state = key[9:]
  48.             self.task_states[task_state] = int(statmap[key])
  49.         # 通过host_type追踪实例数目;
  50.         os_keys = [k for k in statmap.keys() if k.startswith("num_os_type_")]
  51.         for key in os_keys:
  52.             os = key[12:]
  53.             self.num_instances_by_os_type[os] = int(statmap[key])
  54.         # 获取num_io_ops;
  55.         self.num_io_ops = int(statmap.get('io_workload', 0))
复制代码






2.hosts = self.host_manager.get_filtered_hosts(hosts, filter_properties)
具体来看方法get_filtered_hosts的代码实现:
  1. def get_filtered_hosts(self, hosts, filter_properties, filter_class_names=None):
  2.         """
  3.         过滤主机,返回那些通过所有过滤器的主机;
  4.         """
  5.         def _strip_ignore_hosts(host_map, hosts_to_ignore):
  6.             ignored_hosts = []
  7.             for host in hosts_to_ignore:
  8.                 if host in host_map:
  9.                     del host_map[host]
  10.                     ignored_hosts.append(host)
  11.             ignored_hosts_str = ', '.join(ignored_hosts)
  12.             msg = _('Host filter ignoring hosts: %(ignored_hosts_str)s')
  13.             LOG.debug(msg, locals())
  14.         def _match_forced_hosts(host_map, hosts_to_force):
  15.             for host in host_map.keys():
  16.                 if host not in hosts_to_force:
  17.                     del host_map[host]
  18.             if not host_map:
  19.                 forced_hosts_str = ', '.join(hosts_to_force)
  20.                 msg = _("No hosts matched due to not matching 'force_hosts'"
  21.                         "value of '%(forced_hosts_str)s'")
  22.                 LOG.debug(msg, locals())
  23.                 return
  24.             forced_hosts_str = ', '.join(host_map.iterkeys())
  25.             msg = _('Host filter forcing available hosts to '
  26.                     '%(forced_hosts_str)s')
  27.             LOG.debug(msg, locals())
  28.         # 返回经过验证的可用的过滤器;
  29.         filter_classes = self._choose_host_filters(filter_class_names)
  30.         ignore_hosts = filter_properties.get('ignore_hosts', [])
  31.         force_hosts = filter_properties.get('force_hosts', [])
  32.         if ignore_hosts or force_hosts:
  33.             name_to_cls_map = dict([(x.host, x) for x in hosts])
  34.             if ignore_hosts:
  35.                 _strip_ignore_hosts(name_to_cls_map, ignore_hosts)
  36.                 if not name_to_cls_map:
  37.                     return []
  38.             if force_hosts:
  39.                 _match_forced_hosts(name_to_cls_map, force_hosts)
  40.                 # NOTE(vish): Skip filters on forced hosts.
  41.                 if name_to_cls_map:
  42.                     return name_to_cls_map.values()
  43.             hosts = name_to_cls_map.itervalues()
  44.         return self.filter_handler.get_filtered_objects(filter_classes, hosts, filter_properties)
复制代码







2.1 filter_classes = self._choose_host_filters(filter_class_names)
这条语句实现了返回经过验证的可用的过滤器,具体来看方法_choose_host_filters的代码实现:
  1. def _choose_host_filters(self, filter_cls_names):
  2.         """        
  3.         返回经过验证的可用的过滤器;
  4.         """
  5.         # 使用默认过滤器;
  6.         if filter_cls_names is None:
  7.             # CONF.scheduler_default_filters:这个参数定义了当请求中没有指定特定的过滤器类的时候,默认应用的用于过滤主机的过滤器类的名称列表;
  8.             # 参数的默认值为:
  9.             # ['RetryFilter','AvailabilityZoneFilter','RamFilter','ComputeFilter','ComputeCapabilitiesFilter','ImagePropertiesFilter']
  10.             filter_cls_names = CONF.scheduler_default_filters
  11.         if not isinstance(filter_cls_names, (list, tuple)):
  12.             filter_cls_names = [filter_cls_names]
  13.         good_filters = []
  14.         bad_filters = []
  15.         
  16.         # 遍历所有配置的过滤器(我们使用的是默认过滤器);
  17.         for filter_name in filter_cls_names:
  18.             found_class = False
  19.             # 遍历所有可用的过滤器;
  20.             for cls in self.filter_classes:
  21.                 # 如果在可用过滤器中找到配置的过滤器,则认为它可以使用;
  22.                 if cls.__name__ == filter_name:
  23.                     good_filters.append(cls)
  24.                     found_class = True
  25.                     break
  26.             if not found_class:
  27.                 bad_filters.append(filter_name)
  28.         if bad_filters:
  29.             msg = ", ".join(bad_filters)
  30.             raise exception.SchedulerHostFilterNotFound(filter_name=msg)
  31.         return good_filters
复制代码


2.2 return self.filter_handler.get_filtered_objects(filter_classes, hosts, filter_properties)
  1. def get_filtered_objects(self, filter_classes, objs, filter_properties):
  2.     for filter_cls in filter_classes:
  3.         objs = filter_cls().filter_all(objs, filter_properties)
  4.     return list(objs)
复制代码

  1. def filter_all(self, filter_obj_list, filter_properties):
  2.         for obj in filter_obj_list:
  3.             if self._filter_one(obj, filter_properties):
  4.                 yield obj
复制代码
  1. def _filter_one(self, obj, filter_properties):
  2.         """
  3.         如果通过过滤器则返回TRUE,否则返回FALSE;
  4.         """
  5.         return self.host_passes(obj, filter_properties)
复制代码







3. weighed_hosts = self.host_manager.get_weighed_hosts(hosts, filter_properties)
  1. def get_weighed_hosts(self, hosts, weight_properties):
  2.     """
  3.     对主机进行称重;
  4.     返回一个WeighedObjects的主机排序列表(最高分排在第一);
  5.     """      
  6.     # get_weighed_objects:返回一个WeighedObjects的主机排序列表(最高分排在第一);
  7.     return self.weight_handler.get_weighed_objects(self.weight_classes, hosts, weight_properties)
复制代码


  1. # scheduler_weight_classes:这个参数定义了使用哪个称重类来称重主机;
  2. # 默认值是nova.scheduler.weights.all_weighers;
  3. # 从名称列表CONF.scheduler_available_filters获取可装载的类;
  4. # 返回类的列表;
  5. # 所以这条语句的意思就是获取nova.scheduler.weights.all_weighers指定的所有的类;
  6. self.weight_classes = self.weight_handler.get_matching_classes(CONF.scheduler_weight_classes)
复制代码
  1. cfg.ListOpt('scheduler_weight_classes',
  2.             default=['nova.scheduler.weights.all_weighers'],
  3.             help='Which weight class names to use for weighing hosts'),
  4. # 这个参数定义了使用哪个称重类来称重主机;
  5. # 默认值是nova.scheduler.weights.all_weighers;
复制代码


  1. def all_weighers():
  2.     """
  3.     Return a list of weight plugin classes found in this directory.
  4.     """
  5.     # least_cost_functions:这个参数定义了是否应用称重方法LeastCostScheduler;
  6.     # 参数的默认值为None;
  7.     # compute_fill_first_cost_fn_weight:参数的默认值为None;
  8.     if (CONF.least_cost_functions is not None or
  9.             CONF.compute_fill_first_cost_fn_weight is not None):
  10.         LOG.deprecated(('least_cost has been deprecated in favor of the RAM Weigher.'))
  11.         return least_cost.get_least_cost_weighers()
  12.     return HostWeightHandler().get_all_classes()
复制代码



  1. def get_weighed_objects(self, weigher_classes, obj_list, weighing_properties):
  2.     """
  3.     返回一个WeighedObjects的排序列表(最高分排在第一);
  4.     """
  5.     if not obj_list:
  6.         return []
  7.     # object_class = WeighedObject
复制代码


  1. class WeighedObject(object):
  2.     """
  3.     权重信息对象类;
  4.     """
  5.     def __init__(self, obj, weight):
  6.         self.obj = obj
  7.         self.weight = weight
复制代码
  1. def get_least_cost_weighers():
  2.     cost_functions = _get_cost_functions()
  3.     # Unfortunately we need to import this late so we don't have an
  4.     # import loop.
  5.     from nova.scheduler import weights
  6.     class _LeastCostWeigher(weights.BaseHostWeigher):
  7.         def weigh_objects(self, weighted_hosts, weight_properties):
  8.             for host in weighted_hosts:
  9.                 host.weight = sum(weight * fn(host.obj, weight_properties)
  10.                             for weight, fn in cost_functions)
  11.     return [_LeastCostWeigher]
复制代码



   
  1. def _get_cost_functions():
  2.     """
  3.     Returns a list of tuples containing weights and cost functions to
  4.     use for weighing hosts
  5.     """
  6.     cost_fns_conf = CONF.least_cost_functions
  7.     if cost_fns_conf is None:
  8.         # The old default.  This will get fixed up below.
  9.         fn_str = 'nova.scheduler.least_cost.compute_fill_first_cost_fn'
  10.         cost_fns_conf = [fn_str]
  11.     cost_fns = []
  12.     for cost_fn_str in cost_fns_conf:
  13.         short_name = cost_fn_str.split('.')[-1]
  14.         if not (short_name.startswith('compute_') or
  15.                 short_name.startswith('noop')):
  16.             continue
  17.         # Fix up any old paths to the new paths
  18.         if cost_fn_str.startswith('nova.scheduler.least_cost.'):
  19.             cost_fn_str = ('nova.scheduler.weights.least_cost' +
  20.                        cost_fn_str[25:])
  21.         try:
  22.             # NOTE: import_class is somewhat misnamed since
  23.             # the weighing function can be any non-class callable
  24.             # (i.e., no 'self')
  25.             cost_fn = importutils.import_class(cost_fn_str)
  26.         except ImportError:
  27.             raise exception.SchedulerCostFunctionNotFound(
  28.                     cost_fn_str=cost_fn_str)
  29.         try:
  30.             flag_name = "%s_weight" % cost_fn.__name__
  31.             weight = getattr(CONF, flag_name)
  32.         except AttributeError:
  33.             raise exception.SchedulerWeightFlagNotFound(
  34.                     flag_name=flag_name)
  35.         # Set the original default.
  36.         if (flag_name == 'compute_fill_first_cost_fn_weight' and
  37.                 weight is None):
  38.             weight = -1.0
  39.         cost_fns.append((weight, cost_fn))
  40. return cost_fns
复制代码

其实,流程是比较简单的,就是过滤和称重的过程的。



上一篇:
OpenStack Nova-scheduler组件的源码解析(1)













感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn


已有(2)人评论

跳转到指定楼层
tongshan123 发表于 2015-4-22 14:01:34
OpenStack Nova-scheduler组件的源码解析(1)和OpenStack Nova-scheduler组件的源码解析(2)有什么区别?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条