分享

OpenStack建立实例完整过程源码详细分析(9)

xioaxu790 发表于 2014-6-13 10:24:03 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 7779
本帖最后由 xioaxu790 于 2014-6-13 10:28 编辑
问题导读:
1、方法quota_reserve执行的过程分为哪几步?


2、如何删除一个实例?




继续看方法quota_reserve:
  1. def quota_reserve(context, resources, quotas, deltas, expire, until_refresh, max_age, project_id=None):  
  2.       
  3.     elevated = context.elevated()  
  4.     # 获取db_session的session;  
  5.     # get_session:返回一个SQLAlchemy session,若没有定义,则新建一个SQLAlchemy session;  
  6.     session = get_session()  
  7.     with session.begin():  
  8.   
  9.         # 获取context中的project_id;  
  10.         if project_id is None:  
  11.             project_id = context.project_id  
  12.   
  13.         # 从quota_usages表中获得当前工程的各种资源的使用情况;  
  14.         usages = _get_quota_usages(context, session, project_id)  
  15.   
  16.         # Handle usage refresh  
  17.         work = set(deltas.keys())  
  18.         while work:  
  19.             # 任意地从集合work中取出一个元素;  
  20.             resource = work.pop()  
  21.   
  22.             refresh = False  
  23.             if resource not in usages:  
  24.                 usages[resource] = _quota_usage_create(elevated,project_id,resource,0, 0,until_refresh or None,session=session)  
  25.                 refresh = True  
  26.             elif usages[resource].in_use < 0:  
  27.                 refresh = True  
  28.             elif usages[resource].until_refresh is not None:  
  29.                 usages[resource].until_refresh -= 1  
  30.                 if usages[resource].until_refresh <= 0:  
  31.                     refresh = True  
  32.             elif max_age and (usages[resource].updated_at - timeutils.utcnow()).seconds >= max_age:  
  33.                 refresh = True  
  34.   
  35.             # 执行更新(同步)usage;  
  36.             if refresh:  
  37.                 # Grab the sync routine  
  38.                 # 获取同步方法,_sync_*(),这些方法定义在quota模块中,不同的资源有不同的同步方法;  
  39.                 sync = resources[resource].sync  
  40.   
  41.                 # 查询当前正在使用的实时的资源的数据信息;  
  42.                 updates = sync(elevated, project_id, session)  
  43.                 for res, in_use in updates.items():  
  44.                     # 如果试实时使用的资源没有在usages中,那么把它添加进去;  
  45.                     if res not in usages:  
  46.                         usages[res] = _quota_usage_create(elevated,project_id,res,0, 0,until_refresh or None,session=session)  
  47.   
  48.                     # 更新usage中的in_use数据信息;  
  49.                     usages[res].in_use = in_use  
  50.                     # 更新usage中的until_refresh数据信息;  
  51.                     usages[res].until_refresh = until_refresh or None  
  52.   
  53.                     work.discard(res)  
  54.   
  55.         # 检测资源数据中in_use加上delta之后,可能小于0的情况;  
  56.         unders = [resource for resource, delta in deltas.items()  
  57.                   if delta < 0 and  
  58.                   delta + usages[resource].in_use < 0]  
  59.   
  60.         # Now, let's check the quotas  
  61.         # 检测这个resource的hard_limit是否小于in_use+resourced+delta之和;  
  62.         overs = [resource for resource, delta in deltas.items()  
  63.                  if quotas[resource] >= 0 and delta >= 0 and  
  64.                  quotas[resource] < delta + usages[resource].total]  
  65.   
  66.         # Create the reservations  
  67.         # 如果没有超过的话,更新reservations表,再次更新usages  
  68.         if not overs:  
  69.             reservations = []  
  70.             for resource, delta in deltas.items():  
  71.                 # str(uuid.uuid4()):随机获取uuid值,是唯一的;  
  72.                 # usages[resource]:更新过的当前资源在quota_usages表中的使用情况;  
  73.                 reservation = reservation_create(elevated,str(uuid.uuid4()),usages[resource],project_id,resource, delta, expire,session=session)  
  74.                 reservations.append(reservation.uuid)  
  75.   
  76.                 # 更新usages中的reserved值,加上变化值;  
  77.                 if delta > 0:  
  78.                     usages[resource].reserved += delta  
  79.   
  80.         # Apply updates to the usages table  
  81.         # 更新quota_usages表;  
  82.         for usage_ref in usages.values():  
  83.             usage_ref.save(session=session)  
  84.   
  85.     if unders:  
  86.         LOG.warning(_("Change will make usage less than 0 for the following resources: %(unders)s") % locals())  
  87.     if overs:  
  88.         usages = dict((k, dict(in_use=v['in_use'], reserved=v['reserved']))  
  89.                       for k, v in usages.items())  
  90.         raise exception.OverQuota(overs=sorted(overs), quotas=quotas,usages=usages)  
  91.   
  92.     return reservations  
复制代码


    继续来看代码,看看是如何执行refresh更新(同步)操作的;
  1. # 执行更新(同步)usage;  
  2. if refresh:  
  3. # Grab the sync routine  
  4. # 获取同步方法,_sync_*(),这些方法定义在quota模块中,不同的资源有不同的同步方法;  
  5.     sync = resources[resource].sync  
  6.   
  7.     # 查询当前正在使用的实时的资源的数据信息;  
  8.     updates = sync(elevated, project_id, session)  
  9.     for res, in_use in updates.items():  
  10.         # 如果试实时使用的资源没有在usages中,那么把它添加进去;  
  11.         if res not in usages:  
  12.             usages[res] = _quota_usage_create(elevated,project_id,res,0, 0,until_refresh or None,session=session)  
  13.   
  14.         # 更新usage中的in_use数据信息;  
  15.         usages[res].in_use = in_use  
  16.         # 更新usage中的until_refresh数据信息;  
  17.         usages[res].until_refresh = until_refresh or None  
  18.   
  19.         work.discard(res)  
复制代码


    回顾一下,在quota.py中resources的定义:
  1. resources = [   
  2.     ReservableResource('instances', _sync_instances, 'quota_instances'),   
  3.     ReservableResource('cores', _sync_instances, 'quota_cores'),   
  4.     ReservableResource('ram', _sync_instances, 'quota_ram'),   
  5.     ReservableResource('floating_ips', _sync_floating_ips, 'quota_floating_ips'),   
  6.     ReservableResource('fixed_ips', _sync_fixed_ips, 'quota_fixed_ips'),   
  7.     ReservableResource('security_groups', _sync_security_groups, 'quota_security_groups'),   
  8.         
  9.     AbsoluteResource('metadata_items', 'quota_metadata_items'),   
  10.     AbsoluteResource('injected_files', 'quota_injected_files'),   
  11.     AbsoluteResource('injected_file_content_bytes', 'quota_injected_file_content_bytes'),   
  12.     AbsoluteResource('injected_file_path_bytes', 'quota_injected_file_path_bytes'),   
  13.    
  14.     CountableResource('security_group_rules', db.security_group_rule_count_by_group, 'quota_security_group_rules'),   
  15.     CountableResource('key_pairs', db.key_pair_count_by_user, 'quota_key_pairs'),   
  16.     ]   
复制代码


    这些_sync_*就是同步方法,也就是这里通过语句:sync = resources[resource].sync来获取的方法。通过这些方法能够实时查询到工程当前所使用资源的情况,也就能够用于刷新(同步)资源使用信息的操作。
    阅读代码我们可以知道:
    deltas.keys() = ['instances', 'ram', 'cores']
    work = set(deltas.keys())
    resource = work.pop()
    sync = resources[resource].sync
    所以sync只能从'instances', 'ram', 'cores'三个资源中选取对应的同步方法,而这三个资源对应的同步方法都是_sync_instances,进而我们来看看_sync_instances这个方法:
  1. def _sync_instances(context, project_id, session):  
  2.     return dict(zip(('instances', 'cores', 'ram'),  
  3.                     db.instance_data_get_for_project(  
  4.                 context, project_id, session=session)))  
复制代码

    再看方法instance_data_get_for_project:
  1. def instance_data_get_for_project(context, project_id, session=None):  
  2.     result = model_query(context,  
  3.                          func.count(models.Instance.id),  
  4.                          func.sum(models.Instance.vcpus),  
  5.                          func.sum(models.Instance.memory_mb),  
  6.                          base_model=models.Instance,  
  7.                          session=session).\  
  8.                      filter_by(project_id=project_id).\  
  9.                      first()  
  10.     # NOTE(vish): convert None to 0  
  11.     return (result[0] or 0, result[1] or 0, result[2] or 0)  
复制代码

    在来看看类Instance,这个类定义了虚拟机实例信息的数据库表;
  1. class Instance(BASE, NovaBase):  
  2.     """
  3.     表示一个来宾VM的类;
  4.     构建数据表instances的数据结构;
  5.     """  
  6.          
  7.     # 表名instances;  
  8.     __tablename__ = 'instances'  
  9.     injected_files = []  
  10.   
  11.     # 定义数据表instances的字段id;  
  12.     id = Column(Integer, primary_key=True, autoincrement=True)  
  13.     @property  
  14.     def name(self):  
  15.         try:  
  16.             base_name = CONF.instance_name_template % self.id  
  17.         except TypeError:  
  18.             # Support templates like "uuid-%(uuid)s", etc.  
  19.             info = {}  
  20.             # NOTE(russellb): Don't use self.iteritems() here, as it will  
  21.             # result in infinite recursion on the name property.  
  22.             for column in iter(object_mapper(self).columns):  
  23.                 key = column.name  
  24.                 # prevent recursion if someone specifies %(name)s  
  25.                 # %(name)s will not be valid.  
  26.                 if key == 'name':  
  27.                     continue  
  28.                 info[key] = self[key]  
  29.             try:  
  30.                 base_name = CONF.instance_name_template % info  
  31.             except KeyError:  
  32.                 base_name = self.uuid  
  33.         return base_name  
  34.   
  35.     def _extra_keys(self):  
  36.         return ['name']  
  37.   
  38.     user_id = Column(String(255)) # 定义数据表instances的字段user_id;  
  39.     project_id = Column(String(255)) # 定义数据表instances的字段project_id;  
  40.   
  41.     image_ref = Column(String(255)) # 定义数据表instances的字段image_ref;  
  42.     kernel_id = Column(String(255)) # 定义数据表instances的字段kernel_id;  
  43.     ramdisk_id = Column(String(255)) # 定义数据表instances的字段ramdisk_id;  
  44.     hostname = Column(String(255)) # 定义数据表instances的字段hostname;  
  45.   
  46.     launch_index = Column(Integer) # 定义数据表instances的字段lauch_index;  
  47.     key_name = Column(String(255)) # 定义数据表instances的字段key_name;  
  48.     key_data = Column(Text) # 定义数据表instances的字段key_data;  
  49.   
  50.     power_state = Column(Integer) # 定义数据表instances的字段power_state;  
  51.     vm_state = Column(String(255)) # 定义数据表instances的字段vm_state;  
  52.     task_state = Column(String(255)) # 定义数据表instances的字段task_state;  
  53.   
  54.     memory_mb = Column(Integer) # 定义数据表instances的字段memory_mb;  
  55.     vcpus = Column(Integer) # 定义数据表instances的字段vcpus;  
  56.     root_gb = Column(Integer) # 定义数据表instances的字段root_gb;  
  57.     ephemeral_gb = Column(Integer) # 定义数据表instances的字段ephemeral_gb;  
  58.   
  59.     # 定义数据表instances的字段host;  
  60.     # 注意这个字段不是与hostname相关的,而是指nova的节点;  
  61.     host = Column(String(255))  # , ForeignKey('hosts.id'))  
  62.       
  63.     # 定义数据表instances的字段node;  
  64.     # 识别实例所在的"ComputeNode" ;  
  65.     # 这与ComputeNode.hypervisor_hostname.  
  66.     node = Column(String(255))  
  67.   
  68.     instance_type_id = Column(Integer) # 定义数据表instances的字段instance_type_id;  
  69.   
  70.     user_data = Column(Text) # 定义数据表instances的字段user_data;  
  71.   
  72.     reservation_id = Column(String(255)) # 定义数据表instances的字段reservation_id;  
  73.   
  74.     scheduled_at = Column(DateTime) # 定义数据表instances的字段scheduled_at;  
  75.     launched_at = Column(DateTime) # 定义数据表instances的字段launched_at;  
  76.     terminated_at = Column(DateTime) # 定义数据表instances的字段terminated_at;  
  77.   
  78.     availability_zone = Column(String(255)) # 定义数据表instances的字段availability_zone;  
  79.   
  80.     # User editable field for display in user-facing UIs  
  81.     display_name = Column(String(255)) # 定义数据表instances的字段display_name;  
  82.     display_description = Column(String(255)) # 定义数据表instances的字段display_description;  
  83.   
  84.     # To remember on which host an instance booted.  
  85.     # An instance may have moved to another host by live migration.  
  86.     # 记住在哪个主机上启动了一个实例;  
  87.     # 实例可能已经通过实时迁移到了另一个主机上;  
  88.     launched_on = Column(Text) # 定义数据表instances的字段launched_on;  
  89.     locked = Column(Boolean) # 定义数据表instances的字段locked;  
  90.   
  91.     os_type = Column(String(255)) # 定义数据表instances的字段os_type;  
  92.     architecture = Column(String(255)) # 定义数据表instances的字段architecture;  
  93.     vm_mode = Column(String(255)) # 定义数据表instances的字段vm_mode;  
  94.     uuid = Column(String(36)) # 定义数据表instances的字段uuid;  
  95.   
  96.     root_device_name = Column(String(255)) # 定义数据表instances的字段root_device_name;  
  97.     default_ephemeral_device = Column(String(255), nullable=True) # 定义数据表instances的字段default_ephemeral_device;  
  98.     default_swap_device = Column(String(255), nullable=True) # 定义数据表instances的字段default_swap_device;  
  99.     config_drive = Column(String(255)) # 定义数据表instances的字段config_drive;  
  100.   
  101.     # User editable field meant to represent what ip should be used to connect to the instance  
  102.     # 用户可编辑字段,代表着哪一个IP能够被用来连接实例;  
  103.     access_ip_v4 = Column(types.IPAddress()) # 定义数据表instances的字段access_ip_v4;  
  104.     access_ip_v6 = Column(types.IPAddress()) # 定义数据表instances的字段access_ip_v6;  
  105.   
  106.     auto_disk_config = Column(Boolean()) # 定义数据表instances的字段auto_disk_config;  
  107.     progress = Column(Integer) # 定义数据表instances的字段progress;  
  108.   
  109.     # EC2 instance_initiated_shutdown_terminate  
  110.     # True: -> 'terminate'  
  111.     # False: -> 'stop'  
  112.     # Note(maoy): currently Nova will always stop instead of terminate  
  113.     # no matter what the flag says. So we set the default to False.  
  114.     # 当前的nova应该是stop而不是terminate,所以我们默认设置标志为False;  
  115.     shutdown_terminate = Column(Boolean(), default=False, nullable=False) # 定义数据表instances的字段shutdown_terminate;  
  116.   
  117.     # EC2 disable_api_termination  
  118.     disable_terminate = Column(Boolean(), default=False, nullable=False) # 定义数据表instances的字段disable_terminate;  
  119.   
  120.     # OpenStack compute cell name.  This will only be set at the top of  
  121.     # the cells tree and it'll be a full cell name such as 'api!hop1!hop2'  
  122.     # OpenStack的计算单元名;  
  123.     cell_name = Column(String(255)) # 定义数据表instances的字段cell_name;  
复制代码

    由此我们可以知道,语句updates = sync(elevated, project_id, session)实现的就是定位到资源的同步方法,实时查询所需要更新的资源使用信息;比如这里调用的同步方法就是方法_sync_instances,它实现了从数据库中查询到匹配的数据表'instances',进而获取其id、vcpus和memory_mb三种资源的实时的使用情况,分别赋值给'instances','cores'和 'ram',以字典的形式返回给updates;
    这里我调试运行了一下,得到了获取的实时资源使用数据信息:
    当建立了10个实例的时候:
  1. updates = {instances:10, ram:20480, cores:10}  
复制代码

    删除了1个实例之后:
  1. updates = {instances:9, ram:18432, cores:9}  
复制代码

    所以在这里我们就能够理解,为什么应用语句:resource = work.pop() 来随机获取work集合中的一个资源,原因就是针对'instances', 'ram', 'cores'无论哪一个资源,都会调用同样的同步方法_sync_instances来获取实时的工程资源使用情况,都会得到这三个资源的实时的使用数据信息。
    继续看下面的代码:
  1. for res, in_use in updates.items():  
复制代码
  

    这段代码完成的功能就是如果获取的实时使用的资源没有在usages中,那么就把它添加进去,并更新它的in_use和until_refresh数据信息。
    继续看代码:
  1. unders = [resource for resource, delta in deltas.items()  
  2.           if delta < 0 and delta + usages[resource].in_use < 0]  
  3.     检测资源数据中in_use加上delta之后,可能小于0的情况,如果存在这种情况,后面将会引发异常;
  4. [python] view plaincopyprint?在CODE上查看代码片派生到我的代码片
  5. overs = [resource for resource, delta in deltas.items()  
  6.          if quotas[resource] >= 0 and delta >= 0 and  
  7.          quotas[resource] < delta + usages[resource].total]  
复制代码

    检测这个resource的hard_limit是否小于in_use+resourced+delta之和,如果over值为真,说明delta + usages[resource].total的值已经大于系统限定的资源配额的数值,后面将会引发异常;

继续看下一段代码:
  1. if not overs:  
  2.     reservations = []  
  3.     for resource, delta in deltas.items():  
  4.         # str(uuid.uuid4()):随机获取uuid值,是唯一的;  
  5.         # usages[resource]:更新过的当前资源在quota_usages表中的使用情况;  
  6.         reservation = reservation_create(elevated,str(uuid.uuid4()),usages[resource],project_id,resource, delta, expire,session=session)  
  7.         reservations.append(reservation.uuid)  
  8.   
  9.         # 更新usages中的reserved值,加上变化值;  
  10.         if delta > 0:  
  11.             usages[resource].reserved += delta  
  12.   
  13.         # 更新quota_usages表;  
  14.         for usage_ref in usages.values():  
  15.             usage_ref.save(session=session)  
复制代码


    这段代码是当没有发生资源配额超出限制值的时候,建立更新reservations数据表,并再次更新usages。reservations表记录的是每次分配的各种资源的变化值,即delta保存的值,它和quota_usages通过usage_id外键关联。这个表中的记录是不更新的,每次分配都会相应的增加记录。
    最后返回reservations。

至此,方法quota_reserve解析完成:
    这个函数执行的过程主要分为以下几步:

(1)同步
    为什么要同步呢?这里同步的是什么呢?因为在为工程分配资源时,可能有各种特殊情况导致quota_usages表中记录的in_use不准确,需要得到当前实际使用的资源的情况,更新一下in_use,得到真实的资源使用情况。这里的特殊情况有一下4个:
    1)当前申请的资源没有在quota_usages表中记录
    2)当前申请的资源在quota_usages表中的in_use值小于0
    3)当前申请的资源的until_refresh值不为空,减1之后小于0
    4)当前申请的资源在quota_usages表中更新的时间减去现在的时间大于max_age
    如果符合这四种情况之一,就执行同步。同步时,是调用当前资源的_sync_*()函数,去相关的表中查询实时的使用情况,比如_sync_instances()就是去instances表中查询出当前工程的instances数量,vcpu之和,ram之和,然后以字典的方式返回。然后根据这些实时的数据,更新in_use值。

  (2)检查
    根据各种资源的配额,和变化的情况(delta),来检查两种极端的情况:under和over。under是检查delta为负数的情况,即执行了删除等操作,使delta为负,in_use减少,导致in_use值可能小于0。over是检查delta为正数时的情况,in_use+delta就有可能大于最大的限额了。这里只对over的情况进行处理,即它只关心上限,不关心下限。如果没有over的话,就有下面的第(3)步了。如果over的话,就直接进行第(4)步。

(3)向reservations表中增加记录,记录申请资源的delta值。

(4)把in_use值写入到quota_usages表中保存。(不论under还是over都执行)

(5)如果over,即超出最大限额,则报出OverQuota异常。  
    至此,方法_check_num_instances_quota也全部解析结束,这个方法实现了根据配额资源限制所要建立实例的数目。方法返回两个参数max_count和reservations,返回值max_count表示建立实例的最大数目,返回值reservations表示建立的预定(分配)的资源的UUID的列表。



本文,接着上一篇内容:
OpenStack建立实例完整过程源码详细分析(8)



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条