分享

OpenStack Nova-cell服务的源码解析(1)

tntzbzc 发表于 2014-11-18 18:04:12 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 16611
问题导读

1.class CellsManager(manager.Manager)定义和实现了什么功能?
2.class _BaseMessageMethods(base.Base)的作用是什么?










这篇我将对nova-cell服务的源码进行解析。需要说明的是,这里我都是以OpenStack的Grizzly版本为例进行解析,在这个版本中,默认是不启动这个服务的,而且在具体的cell虚拟机建立应用中,调度器还只是以随机的方式来选择建立虚拟机实例的cell,在后续的版本中,具体的调度器算法将会改进。
1.nova-cell服务的源码结构图
                       



2.nova-cell服务的源码结构解析
/nova/cells/driver.py
class BaseCellsDriver(object):
cells通讯驱动基类,这个类主要实现了消息消费者处理的相关方法;

    def start_consumers(self, msg_runner):
    启动处理消息的消费者服务;

    def stop_consumers(self):
    关闭处理消息的消费者服务;

    def send_message_to_cell(self, cell_state, message):
    发送消息到一个cell;

/nova/cells/manager.py
注:在这个文件中,只有一个类CellsManager,定义和实现了cell管理的API方法;在类CellsManager所定义的方法中主要分为两种类型,即针对特定cell的处理方法和针对所有cell的处理方法;

class CellsManager(manager.Manager):
这个类主要定义和实现了用于管理cell的API;
大多数的方法属于“有针对性”或者“广播”的消息处理方式,分别对应于路由信息到特定的cell和路由信息到多个cell之上;

    def post_start_hook(self):
    为通过RPC来处理cell间的通讯,启动了两个独立的消费者;
    如果本cell有子cell;
    通知子cell发送它们的capabilities值和capacities值;
    并对子cell的所有父cell的capabilities值和capacities值相关信息进行更新操作;
    如果本cell在cell树的底层(即没有子cell);
    发送本cell的capabilities值和capacities值到所有父cell,并对所有父cell的相关信息进行更新操作;

    def _update_our_parents(self, ctxt):
    如果本cell在cell树的底层(即没有子cell),则更新本cell的所有父cell,应用本cell的capabilities值和capacity值;
    发送本cell的capabilities值到父cell,并对父cell的相关信息进行更新操作;
    发送本cell的capacities值到父cell,并对父cell的相关信息进行更新操作;

    def _heal_instances(self, ctxt):
    为一些实例周期性的发送更新任务到父cell;

    def _sync_instance(self, ctxt, instance):
    广播instance_update或instance_destroy操作信息给父cell来执行;

    def schedule_run_instance(self, ctxt, host_sched_kwargs):
    选择一个合适的cell来建立新的实例,并转发相应的请求;

    def get_cell_info_for_neighbors(self, _ctxt):
    返回所有相邻cell的信息(本cell的所有父cell和所有子cell);

    def run_compute_api_method(self, ctxt, cell_name, method_info, call):
    在特定的cell中调用一个compute API方法;
    运行变量compute_api指定的类中一个指定的方法;

    def instance_update_at_top(self, ctxt, instance):
    在top等级的cell上,更新实例;

    def instance_destroy_at_top(self, ctxt, instance):
    在cell树的顶层删除一个实例;
    这里实际上执行的是/nova/cell/messaging.py中的类_BroadcastMessageMethods下的方法instance_destroy_at_top;

    def instance_delete_everywhere(self, ctxt, instance, delete_type):
    在每一个cell中调用compute API的delete()或者soft_delete()方法;
    这个方法应用于当我们不知道一个实例属于哪个cell,但是还是需要删除或软删除这个实例的情况;
    所以,我们需要在所有的cell上运行这个方法;

    def instance_fault_create_at_top(self, ctxt, instance_fault):
    在顶层cell上建立一个实例的断点(???);

    def bw_usage_update_at_top(self, ctxt, bw_update_info):
    更新DB中的带宽使用率信息,如果本cell是一个顶层的cell;

    def sync_instances(self, ctxt, project_id, updated_since, deleted):
    强制对所有实例实行同步操作;
    def service_get_all(self, ctxt, filters):
    返回在本cell中和所有子cell中的服务;
    并把每个服务以及相关的计算节点都和cell相关联起来;

    def service_get_by_compute_host(self, ctxt, host_name):
    在当前的cell中为计算主机返回获取的服务入口,主要执行了以下的步骤:
    通过host_name获取cell_name的信息;
    根据compute host信息获取相关服务;
    把获取的服务和计算节点和cell关联起来;

    def proxy_rpc_to_manager(self, ctxt, topic, rpc_message, call, timeout):
    为给定的compute topic代理PRC;

    def task_log_get_all(self, ctxt, task_name, period_beginning, period_ending, host=None, state=None):
    从所有的cell或者特定的一个cell的数据库中获取任务日志;

    def compute_node_get(self, ctxt, compute_id):
    在一个特定的cell上,通过ID值获取计算节点;
    并把计算节点和这个cell相关联起来;

    def compute_node_get_all(self, ctxt, hypervisor_match=None):
    返回所有cell中的计算节点列表;

    def compute_node_stats(self, ctxt):
    通过各个cell的各自计算操作,实现获取所有cell上的各个资源参数的总和;

    def actions_get(self, ctxt, cell_name, instance_uuid):
    为给定的实例获取所有的实例操作;

    def action_get_by_request_id(self, ctxt, cell_name, instance_uuid, request_id):
    通过request_id和instance_uuid为给定的实例获取操作信息;

    def action_events_get(self, ctxt, cell_name, action_id):
    在cell_name指定的cell上,通过action id获取相关事件信息;

    def consoleauth_delete_tokens(self, ctxt, instance_uuid):
    在API cell中为指定的实例删除consoleauth令牌;

    def validate_console_port(self, ctxt, instance_uuid, console_port, console_type):
    验证子cell中计算节点的控制台端口;


/nova/cells/messaging.py
在这个文件中,有如下些类:
class _BaseMessage(object):
cell通信模块的基类,主要定义和实现了消息队列和消息响应相关的处理方法;
下面的三个实现通信模块的类,分别对应三种cell处理的消息类型:

class _TargetedMessage(_BaseMessage):
这个类继承自类_BaseMessage,实现通信模块的若干方法,针对于消息处理目标是特定的cell的情况;

class _BroadcastMessage(_BaseMessage):
这个类继承自类_BaseMessage,实现通信模块的若干方法,针对于消息处理目标是所有的cell的情况;

class _ResponseMessage(_TargetedMessage):
这个类继承自类_TargetedMessage,实现通信模块的若干方法,针对于执行操作后返回的响应信息的处理;

class _BaseMessageMethods(base.Base):
实现所有处理cell方法的基类;
下面三个实现处理消息方法的类,分别对应三种cell处理的消息类型:

class _ResponseMessageMethods(_BaseMessageMethods):
这个类继承自类_BaseMessageMethods,实现了针对处理响应信息方法;

class _TargetedMessageMethods(_BaseMessageMethods):
这个类继承自类_BaseMessageMethods,实现若干处理cell消息的方法,针对于消息处理目标是特定的cell情况;

class _BroadcastMessageMethods(_BaseMessageMethods):
这个类继承自类_BaseMessageMethods,实现若干处理cell消息的方法,针对于消息处理目标是所有的cell情况;

class MessageRunner(object):
这个类实现了若干方法,分别对应CellsManager类(管理cell方法API)中的方法,根据不同的消息类型(特定cell、所有cell和响应消息)进行封装,用于从上述的类_ResponseMessageMethods、_TargetedMessageMethods和_BroadcastMessageMethods中调用对应的方法,实现对不同的消息类型的处理。

来看类中具体的方法:
class _BaseMessage(object):
    def _append_hop(self):
    加入跳信息到routing_path;

   def _at_max_hop_count(self,do_raise=True):
    检测本cell是否处于最大的跳数;

   def _process_locally(self):
    方法确定了我们应该在本cell中执行处理这个消息的方法;
    通过类MessageRunner调用适当的方法用来处理消息;
    捕获响应或者异常,并把捕获的响应或者异常编码成类Response的实例对象,并返回;

   def _setup_response_queue(self):
    通过类MessageRunner中的方法_setup_response_queue,建立一个响应队列;

   def _cleanup_response_queue(self):
    调用类MessageRunner中的方法_cleanup_response_queue来删除一个响应队列;

   def _wait_for_json_responses(self,num_responses=1):
    在允许等待的时间内,获取响应列表responses,并返回;

   def _send_json_responses(self,json_responses, neighbor_only=False, fanout=False):
    执行发送响应列表到目标cell;
    Targeted型的消息只有一个响应,而Broadcast型的消息可能有多个响应;
    如果本cell是消息的源,则响应将会从self.process()被返回;  

   def _send_response(self,response, neighbor_only=False):
    发送执行消息处理方法的结果的响应对象到源cell;
    如果本cell就是源cell,则直接获取这个响应对象;  

   def _send_response_from_exception(self,exc_info):
    从sys.exc_info()返回一个异常,编码成Response类型响应,并发送它;

   def _to_dict(self):
    转换消息到字典格式;

   def to_json(self):
    转换消息到JSON格式,用于发送到相邻的cell;

   def source_is_us(self):
    判断本cell是否建立了这个消息,即是否是这个消息的源;

   def process(self):
    执行处理消息的方法;

class _TargetedMessage(_BaseMessage):
   def _get_next_hop(self):
    返回本cell的下一跳(hop),如果下一跳(hop)就是当前的cell,则返回none;

   def process(self):
    执行处理针对性的消息的方法,并返回执行方法所获取的结果响应到源cell;
    如果本cell就是源cell,则还要实现在允许等待的时间内,获取远程执行消息处理方法返回的响应列表;
    根据所处理消息的类型不同,这个响应列表中的元素可以是一个也可以是多个;

class _BroadcastMessage(_BaseMessage):
   def _get_next_hops(self):
    设置下一层次的跳(hops),并返回跳(hops)的数目;

   def _send_to_cells(self,target_cells):
    发送信息到多个cell;

   def _send_json_responses(self,json_responses):
    发送信息的响应列表;

   def process(self):
    运行广播消息程序;

class _ResponseMessage(_TargetedMessage):
   def process(self):
    执行一个响应消息;

class _BaseMessageMethods(base.Base):
   def task_log_get_all(self,
message, task_name, period_beginning, period_ending, host, state):

    从数据库获取任务日志;

class _ResponseMessageMethods(_BaseMessageMethods):
   def parse_responses(self,message, orig_message, responses):
    添加响应到响应队列;

class _TargetedMessageMethods(_BaseMessageMethods):
   def schedule_run_instance(self,
message, host_sched_kwargs):

    父cell通知本cell来调度新的实例用于建立;

   def run_compute_api_method(self,message, method_info):
    运行变量compute_api指定的类中一个指定的方法;

   def update_capabilities(self,
message, cell_name, capabilities):

    一个子cell通知我们关于它的capabilities值;

   def update_capacities(self,message, cell_name, capacities):
    一个子cell通知我们关于它的capacity值;

   def announce_capabilities(self,message):
    一个父cell通知本cell发送我们的capabilities值;
    所以执行发送capabilities值到父cell的操作,并更新父cell的capabilities值;

   def announce_capacities(self,message):
    一个父cell通知本cell发送我们的capacity值;
    所以执行发送capabilities值到父cell的操作,并更新父cell的capacity值;

   def service_get_by_compute_host(self,message, host_name):
    为计算主机返回服务入口;
    根据compute host信息获取相关服务;

   def proxy_rpc_to_manager(self,message, host_name, rpc_message, topic, timeout):
    为给定的compute topic代理PRC;

   def compute_node_get(self,message, compute_id):
    通过ID值获取相应的计算节点;

   def actions_get(self, message,instance_uuid):
    为给定的实例获取所有的实例操作;

   def action_get_by_request_id(self,message, instance_uuid, request_id):
    通过request_id和instance_uuid为给定的实例获取操作信息;

   def action_events_get(self,message, action_id):
    通过action_id获取事件信息;

   def validate_console_port(self,message, instance_uuid, console_port, console_type):
    验证子cell中计算节点的控制台端口;

class _BroadcastMessageMethods(_BaseMessageMethods):
   def _at_the_top(self):
    确定是否是API级别的cell;

   def instance_update_at_top(self,
message, instance, **kwargs):

    如果是top级别的cell,则更新数据库中的实例;

   def instance_destroy_at_top(self,message, instance, **kwargs):
    如果本cell是一个顶层的cell,则从DB中删除指定的实例;

   def instance_delete_everywhere(self,message, instance, delete_type, **kwargs):
    在每一个cell中调用compute API的delete()或者soft_delete()方法;
    这个方法应用于当我们不知道一个实例属于哪个cell,但是还是需要删除或软删除这个实例的情况;
    所以,我们需要在所有的cell上运行这个方法;

   def instance_fault_create_at_top(self,message, instance_fault, **kwargs):
    如果我们是顶层cell,则执行从DB删除一个实例的操作;

   def bw_usage_update_at_top(self,message, bw_update_info, **kwargs):
    更新DB中的带宽使用率信息,如果本cell是一个顶层的cell;

   def _sync_instance(self,ctxt, instance):
    实例数据的同步;

   def sync_instances(self,message, project_id, updated_since, deleted, **kwargs):
    实例数据的同步实现;

   def service_get_all(self,message, filters):
    获取message和filters所限制的所有的服务;

   def compute_node_get_all(self,message, hypervisor_match):
    返回本cell中的所有计算节点;

   def compute_node_stats(self,message):
    从本cell上所有的计算节点获取各个资源参数信息,并求和,即获取本cell上所有计算节点的资源参数分别的总和;

   def consoleauth_delete_tokens(self,message, instance_uuid):
    在API cell中为指定的实例删除consoleauth令牌;

class MessageRunner(object):
   def _process_message_locally(self,message):
    消息处理进程会调用这个方法,当确定了应该在本cell上处理的时候;(也就是确定了本cell是目标cell)
    寻找到基于消息类型的合适方法,并调用它;
    如果需要的话,调用者捕获异常,并返回结果到cell;

   def _put_response(self,response_uuid, response):
    添加响应到响应队列;

   def _setup_response_queue(self,message):
    设置一个eventlet队列,用于存储获取的响应;
    响应是被目标cell以_ResponseMessage的形式发送回源cell的;

   def _cleanup_response_queue(self,message):
    当正在接受响应或者已经时间超时的时候,停止跟踪响应队列;

   def _create_response_message(self,ctxt, direction, target_cell, response_uuid, response_kwargs, **kwargs):
    建立一个ResponseMessage类的对象;

   def message_from_json(self,json_message):
    转换一个JSON格式的消息到一个适当的消息实例;

   def ask_children_for_capabilities(self,ctxt):
    通知子cell发送它们的capabilities值;
    并对父cell的capabilities值相关信息进行更新操作;
    这个方法将会在nova-cell服务启动时调用;

   def ask_children_for_capacities(self,ctxt):
    通知子cell发送它们的capacities值;
    并对父cell的capacities值相关信息进行更新操作;
    这个方法将会在nova-cell服务启动时调用;

   def tell_parents_our_capabilities(self,ctxt):
    发送本cell的capabilities值到父cell,并对父cell的相关信息进行更新操作;

   def tell_parents_our_capacities(self,ctxt):
    发送本cell的capacities值到父cell,并对父cell的相关信息进行更新操作;

   def schedule_run_instance(self,ctxt, target_cell, host_sched_kwargs):
    这个方法被调度器所调用,通知子cell来调度一个新的实例用于建立;

   def run_compute_api_method(self,ctxt, cell_name, method_info, call):
    在特定的cell中调用一个compute API方法;
    运行变量compute_api指定的类中一个指定的方法;

   def instance_update_at_top(self,ctxt, instance):
    在top等级的cell上,更新实例;
    这里实际上执行的是/nova/cell/messaging.py中的类_BroadcastMessageMethods下的方法instance_update_at_top;

   def instance_destroy_at_top(self,ctxt, instance):
    在cell树的顶层删除一个实例,这里实际上执行的是/nova/cell/messaging.py中的类_BroadcastMessageMethods下的方法instance_destroy_at_top;
   
def instance_delete_everywhere(self,ctxt, instance, delete_type):
    在每一个cell中调用compute API的delete()或者soft_delete()方法;
    这个方法应用于当我们不知道一个实例属于哪个cell,但是还是需要删除或软删除这个实例的情况;
    所以,我们需要在所有的cell上运行这个方法;

   def instance_fault_create_at_top(self,ctxt, instance_fault):
    在顶层cell上建立一个实例的断点(???);

   def bw_usage_update_at_top(self,ctxt, bw_update_info):
    更新DB中的带宽使用率信息,如果本cell是一个顶层的cell;

   def sync_instances(self,ctxt, project_id, updated_since, deleted):
    强制对所有实例实行同步操作;

   def service_get_all(self,ctxt, filters=None):
    获取所有的服务;

   def service_get_by_compute_host(self,ctxt, cell_name, host_name):
    在当前的cell中为计算主机返回获取的服务入口,主要执行了以下的步骤:
    通过host_name获取cell_name的信息;
    根据compute host信息获取相关服务;
    把获取的服务和计算节点和cell关联起来;

   def proxy_rpc_to_manager(self,ctxt, cell_name, host_name, topic, rpc_message, call, timeout):
    为给定的compute topic代理PRC,并获取方法的返回响应;

   def task_log_get_all(self,ctxt, cell_name, task_name, period_beginning, period_ending, host=None, state=None):
    从所有的cell或者特定的一个cell的数据库中获取任务日志;
    返回响应对象的列表;

   def compute_node_get_all(self,ctxt, hypervisor_match=None):
    返回本cell的所有子cell的计算节点列表;
    这里是采用广播的方式,获取每一个cell下的计算节点;
    而且这里获取的方向是'down',所以实现的就是获取所有子cell的相关计算节点;

   def compute_node_stats(self,ctxt):
    采用广播的方式实现每一个cell计算自己的各个资源参数的总和;
    而且这里获取的方向是'down',所以实现的就是获取所有子cell的各个资源参数的总和;

   def compute_node_get(self,ctxt, cell_name, compute_id):
    在一个特定的cell上,通过ID值获取一个计算节点;
    def actions_get(self, ctxt, cell_name,
instance_uuid):
    为给定的实例获取所有的实例操作;

   def action_get_by_request_id(self,ctxt, cell_name, instance_uuid, request_id):
    通过request_id和instance_uuid为给定的实例获取操作信息;

   def action_events_get(self,ctxt, cell_name, action_id):
    在cell_name指定的cell上,通过action id获取相关事件信息;

   def consoleauth_delete_tokens(self,ctxt, instance_uuid):
    在API cell中为指定的实例删除consoleauth令牌;
    方法在cell树中的执行方向是'up';

   def validate_console_port(self,ctxt, cell_name, instance_uuid, console_port, console_type):
    验证子cell中计算节点的控制台端口;

   def get_message_types():
    获取处理消息的类型,为指定类型消息、广播类型消息或者响应类型消息;

class Response(object):
处理cell返回的响应方法类;

   def to_json(self):
   def from_json(cls, json_message):
   def value_or_raise(self):
/nova/cells/rpc_driver.py
cell RPC通信驱动,通过RPC实现cell的通信;

class CellsRPCDriver(driver.BaseCellsDriver):
这个类继承自类BaseCellsDriver,是通过RPC实现cell间通信的驱动类;
主要用于处理通信中的消费者相关;

   def _start_consumer(self,dispatcher, topic):
    启动两个RPC消费者,topic类型和fanout类型;
    建立绿色线程,实现处理所有的队列/消费者信息;

   def start_consumers(self,msg_runner):
    为通过RPC来处理cell间的通讯,启动两个RPC消费者,topic类型消费者和fanout类型消费者;

   def stop_consumers(self):
    关闭RPC消费者;

   def send_message_to_cell(self,cell_state, message):
    调用类IntercellRPCAPI下的方法send_message_to_cell,来实现发送消息到cell;
class InterCellRPCAPI(rpc_proxy.RpcProxy):
这个类继承自类RpcProxy,主要实现cell间通过RPC实现通信的客户端方法;

   def _get_server_params_for_cell(next_hop):
    为rpc的调用获取服务的相关参数;

   def send_message_to_cell(self,cell_state, message):
    实现发送消息到cell;

class InterCellRPCDispatcher(object):
这个类主要实现了RPC的分发程序类,用来处理从其它cell接收的信息;

   def process_message(self,_ctxt, message):
    实现从其它的cell接收消息;

/nova/cells/rpcapi.py
文件中定义了一个类CellsAPI,即Cell RPC API客户端类,用来通过RPC实现对远程cell处理方法的调用;

class CellsAPI(rpc_proxy.RpcProxy):
即Cell RPC API客户端类;
   def cast_compute_api_method(self,ctxt, cell_name, method, *args, **kwargs):
    在一个特定的cell中,以cast的方式发送消息,实现远程节点执行指定的compute
API方法;

   def call_compute_api_method(self,ctxt, cell_name, method, *args, **kwargs):
    在一个特定的cell中,以call的方式发送消息,实现远程节点执行指定的compute
API方法;

   def schedule_run_instance(self,ctxt, **kwargs):
    调度一个新的实例用于建立;

   def nstance_update_at_top(self,ctxt, instance):
    在API的级别更新实例信息;

   def instance_destroy_at_top(self,ctxt, instance):
    在API这个级别销毁要建立的实例;

   def instance_delete_everywhere(self,ctxt, instance, delete_type):
    在每个cell上都运行删除一个指定实例的操作(因为不知道实例位于哪个cell上);

   def instance_fault_create_at_top(self,ctxt, instance_fault):
    在cell树的顶层建立一个实例故障;

   def bw_usage_update_at_top(self,ctxt, uuid, mac, start_period, bw_in, bw_out, last_ctr_in, last_ctr_out, last_refreshed=None):
    广播信息实现节点带宽使用率的更新;

   def instance_info_cache_update_at_top(self,ctxt, instance_info_cache):
    广播通知实例缓存信息的改变;

   def get_cell_info_for_neighbors(self,ctxt):
    获取本cell相邻的所有cell的信息;

   def sync_instances(self,ctxt, project_id=None, updated_since=None, deleted=False):
    要求所有cell实现实例数据的同步;

   def service_get_all(self,ctxt, filters=None):
   def service_get_by_compute_host(self,ctxt, host_name):
   def proxy_rpc_to_manager(self,ctxt, rpc_message, topic, call=False, timeout=None):
   def task_log_get_all(self,ctxt, task_name, period_beginning, period_ending, host=None, state=None):
   def compute_node_get(self,ctxt, compute_id):
    在指定的cell中获取计算节点;

   def compute_node_get_all(self,ctxt, hypervisor_match=None):
    获取所有cell中的由hypervisor过滤的计算节点列表;

   def compute_node_stats(self,ctxt):
    返回所有cell中计算节点的统计信息;

   def actions_get(self, ctxt,instance):
   def action_get_by_request_id(self,ctxt, instance, request_id):
   def action_events_get(self,ctxt, instance, action_id):
   def consoleauth_delete_tokens(self,ctxt, instance_uuid):
   def validate_console_port(self,ctxt, instance_uuid, console_port, console_type):
   
/nova/cells/scheduler.py
这个文件中主要实现了类CellsScheduler,即cell调度器实现类;
主要针对的就是在cell中建立虚拟机实例方面的应用;

class CellsScheduler(base.Base):
这个类继承自Base类,即cell调度器实现类;

   def _create_instances_here(self,ctxt, request_spec):
    在本cell上建立虚拟机实例;

   def _create_action_here(self,ctxt, instance_uuids):
    获取instance_uuids中各个实例的启动信息;
    并把这些信息加入到要启动的实例属性中;

   def _get_possible_cells(self):
    获取所有合适的cell;

   def _run_instance(self,message, host_sched_kwargs):
    尝试调度实例;
    如果没有合适的cell使用,则引发异常;

   def run_instance(self,message, host_sched_kwargs):
    选择一个cell,在它上面我们要建立一个新的实例;

/nova/cells/state.py
这个文件中主要定义了两个类CellState和CellStateManager,主要实现的时对cell一些状态信息进行处理的操作,方法都比较简单,这里就不进行一一解析了;

/nova/cells/utils.py
这个文件中主要定义了一些cell实用的操作方法,方法都比较简单,这里就不进行一一解析了;
到目前为止,nova-cell服务的源码架构进行了简单的分析,下一篇博客中我将以几个例子对nova-cell服务的源码架构和nova-cell服务的具体实现流程进行解析。


博客地址:http://blog.csdn.net/gaoxingnengjisuan




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条