分享

Openstack Cinder中建立volume过程的源码解析(7)----以及taskflow相关解析

shihailong123 发表于 2014-11-22 13:29:02 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 14645
问题导读:

1.关于flow中task执行的重要语句的实现基本解析完成,如何实现?
2.如果卷的建立出现异常,则如何执行相关的逆转回滚操作?









在上一篇博客中,方法def run(self, context, *args, **kwargs)中关于flow中task执行的重要语句的实现基本解析完成,这篇博客中,我将重点解析在这个方法中,如果卷的建立出现异常,则如何执行相关的逆转回滚操作。
首先来看方法def run(self, context, *args, **kwargs)的源码实现:
  1. def run(self, context, *args, **kwargs):   
  2.       """  
  3.       工作流(workflow)的执行操作;  
  4.       context = <cinder.context.RequestContext object at 0x382fd50> //从cinder请求中获取上下文环境信息;  
  5.       args = ()  
  6.       kwargs = {}  
  7.       """   
  8.         super(Flow, self).run(context, *args, **kwargs)   
  9.    
  10.         def resume_it():   
  11.             # self._leftoff_at = None   
  12.             if self._leftoff_at is not None:   
  13.                 return ([], self._leftoff_at)   
  14.                
  15.             # self.resumer = None   
  16.             # 注:这里还没有应用恢复策略,因为类的初始化过程中赋值为None;   
  17.             if self.resumer:   
  18.                 # self._ordering():获取迭代器包装的任务运行列表;   
  19.                 (finished, leftover) = self.resumer.resume(self, self._ordering())   
  20.             else:   
  21.                 finished = []   
  22.                 # self._ordering():获取迭代器包装的任务运行列表;   
  23.                 leftover = self._ordering()   
  24.                
  25.             # leftover:获取迭代器包装的任务运行列表;   
  26.             # finished = []   
  27.             # leftover = <listiterator object at 0x441fa50>   
  28.             return (finished, leftover)   
  29.    
  30.         # 改变目前的flow状态为新的状态STARTED,并执行通知操作;   
  31.         # flow状态标志为STARTED,表示任务开始运行操作;   
  32.         self._change_state(context, states.STARTED)   
  33.         try:   
  34.             # leftover:获取迭代器包装的任务运行列表;   
  35.             # those_finished = []   
  36.             # leftover = <listiterator object at 0x40c1990>   
  37.             those_finished, leftover = resume_it()   
  38.         except Exception:   
  39.             with excutils.save_and_reraise_exception():   
  40.                 self._change_state(context, states.FAILURE)   
  41.    
  42.         def run_it(runner, failed=False, result=None, simulate_run=False):   
  43.                
  44.             try:   
  45.                 # Add the task to be rolled back *immediately* so that even if   
  46.                 # the task fails while producing results it will be given a   
  47.                 # chance to rollback.   
  48.                     
  49.                 # RollbackTask:实现调用任务对应的可用的逆转回滚方法;   
  50.                 # runner.task = cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0   
  51.                 rb = utils.RollbackTask(context, runner.task, result=None)   
  52.                     
  53.                 # 在回滚方法累加器中添加逆转回滚方法任务;   
  54.                 self._accumulator.add(rb)   
  55.    
  56.                 self.task_notifier.notify(states.STARTED, details={   
  57.                     'context': context,   
  58.                     'flow': self,   
  59.                     'runner': runner,   
  60.                 })   
  61.                     
  62.                 # simulate_run = False   
  63.                 if not simulate_run:   
  64.                     result = runner(context, *args, **kwargs)   
  65.    
  66.                 else:   
  67.                     if failed:   
  68.                         if not result:   
  69.                             result = "%s failed running." % (runner.task)   
  70.                         if isinstance(result, basestring):   
  71.                             result = exc.InvalidStateException(result)   
  72.                         if not isinstance(result, Exception):   
  73.                             LOG.warn("Can not raise a non-exception"   
  74.                                      " object: %s", result)   
  75.                             result = exc.InvalidStateException()   
  76.                         raise result   
  77.                 rb.result = result   
  78.                 runner.result = result   
  79.                 self.results[runner.uuid] = result   
  80.                     
  81.                 self.task_notifier.notify(states.SUCCESS, details={   
  82.                     'context': context,   
  83.                     'flow': self,   
  84.                     'runner': runner,   
  85.                 })   
  86.             except Exception as e:   
  87.                 runner.result = e   
  88.                 cause = utils.FlowFailure(runner, self, e)   
  89.                 with excutils.save_and_reraise_exception():   
  90.                     self.task_notifier.notify(states.FAILURE, details={   
  91.                         'context': context,   
  92.                         'flow': self,   
  93.                         'runner': runner,   
  94.                     })   
  95.                     self.rollback(context, cause)   
  96.    
  97.         # those_finished = []   
  98.         if len(those_finished):   
  99.             self._change_state(context, states.RESUMING)   
  100.             for (r, details) in those_finished:   
  101.                 failed = states.FAILURE in details.get('states', [])   
  102.                 result = details.get('result')   
  103.                 run_it(r, failed=failed, result=result, simulate_run=True)   
  104.    
  105.         # leftover:获取迭代器包装的任务运行列表;   
  106.         # leftover = <listiterator object at 0x40c1990>   
  107.         self._leftoff_at = leftover   
  108.         # 改变目前的flow状态为新的状态RUNNING,并执行通知操作;   
  109.         self._change_state(context, states.RUNNING)   
  110.                   
  111.         # 如果状态为中断,则返回;   
  112.         if self.state == states.INTERRUPTED:   
  113.             return   
  114.    
  115.         # 标志任务运行状态不为states.INTERRUPTED;   
  116.         was_interrupted = False   
  117.             
  118.         # leftover:获取迭代器包装的任务运行列表;   
  119.         # leftover = <listiterator object at 0x40c1990>   
  120.         for r in leftover:   
  121.             r.reset()   
  122.             run_it(r)   
  123.             if self.state == states.INTERRUPTED:   
  124.                 was_interrupted = True   
  125.                 break   
  126.    
  127.         if not was_interrupted:   
  128.             # Only gets here if everything went successfully.   
  129.             self._change_state(context, states.SUCCESS)   
  130.             self._leftoff_at = None
复制代码

   先来看语句:
except Exception as e:
    runner.result = e
    cause = utils.FlowFailure(runner, self, e)
    with excutils.save_and_reraise_exception():
        # Notify any listeners that the task has errored.
        self.task_notifier.notify(states.FAILURE, details={
            'context': context,
            'flow': self,
            'runner': runner,
        })
        self.rollback(context, cause)

当执行flow中task出现异常时会引发这个异常,即会执行这部分代码;
先来看看类FlowFailure的初始化方法:


  1. class FlowFailure(object):  
  2.     def __init__(self, runner, flow, exception):  
  3.         self.runner = runner  
  4.         self.flow = flow  
  5.         self.exc = exception  
  6.         self.exc_info = sys.exc_info()  
复制代码



它记录了关于异常的若干信息;
再来看方法rollback的源码实现:
  1. def rollback(self, context, cause):  
  2.     """
  3.     context = <cinder.context.RequestContext object at 0x35e8b90>
  4.     cause = <cinder.taskflow.utils.FlowFailure object at 0x35f79d0>
  5.     """         
  6.     self._change_state(context, states.REVERTING)  
  7.     try:  
  8.         self._accumulator.rollback(cause)  
  9.     finally:  
  10.         self._change_state(context, states.FAILURE)  
  11.               
  12.     # Rollback any parents flows if they exist...  
  13.     # self.parents = ()  
  14.     for p in self.parents:  
  15.         p.rollback(context, cause)  
复制代码



我们关注这个方法中的语句:
self._accumulator.rollback(cause)
进一步来看源码:
  1. def rollback(self, cause):  
  2.     LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)  
  3.          
  4.     for (i, f) in enumerate(self):  
  5.         LOG.debug("Calling rollback %s: %s", i + 1, f)  
  6.         try:  
  7.             f(cause)  
  8.         except Exception:  
  9.             LOG.exception(("Failed rolling back %s: %s due to inner exception."), i + 1, f)
复制代码



这里我们可以得到一些输出实例:
i = 0
f = cinder.volume.flows.base.InjectTask;volume:create==1.0
len(self) = 1
这是我在执行第一个task任务的过程中设置了一个异常之后,而输出的结果,所以我们可以理解,一旦某个task的执行过程中出现异常,就会按照task逆序来调用具体的逆转回滚方法来实现系统状态的恢复;在前面的博客中我们也说过,所要调用的逆转回滚对象也是随着task任务的执行所增加的。
我们回到方法def rollback(self, context, cause)所在类的初始化方法:
class RollbackAccumulator(object):
    def __init__(self):
        self._rollbacks = []

之前我们说过,这个类的作用就是获取回滚方法累加器类的实例化对象,其中存储了要执行回滚操作的任务对象;
我们回顾语句:
# RollbackTask:实现调用任务对应的可用的逆转回滚方法;
rb = utils.RollbackTask(context, runner.task, result=None)
# 在回滚方法累加器中添加逆转回滚方法任务;
self._accumulator.add(rb)

可见在变量self._rollbacks中存储的表示逆转回滚任务的就是类RollbackTask的实例化对象;
所以在语句:
for (i, f) in enumerate(self):
    f(cause)

实现了有序地调用变量self._rollbacks中存储的类RollbackTask的实例化对象,我们之前也曾测试过,这些逆转回滚任务的排列是按照之前task调用顺序的逆序进行排列的,所以这里调用的顺序也就是先调用最近执行的task对应的逆转回滚方法。
这里直接调用了类RollbackTask中的__call__方法,来看具体源码实现:
  1. def __call__(self, cause):  
  2.     """
  3.     实现调用任务对应的可用的逆转回滚方法;
  4.     """  
  5.     if ((hasattr(self.task, "revert") and  
  6.          isinstance(self.task.revert, collections.Callable))):  
  7.         #注:这里有几种不同的回滚方法;  
  8.         self.task.revert(self.context, self.result, cause)  
复制代码

   
这里依据变量self.task指定了具体的任务类,从而具体定位到任务类中的逆转回滚方法revert上,对于没有实现revert方法的,在其父类中实现了这个方法,说明执行这个任务类过后,如果遇到异常,不需要进行逆转回滚操作;
具体来看所涉及到的所有任务类中的revert方法(这些方法我们在前面的博客中进行过简单的解析,这里只是列出来看看即可):
  1. class QuotaReserveTask(base.CinderTask):  
  2.     """
  3.     根据给定的大小值和给定的卷类型信息实现保存单一的卷;
  4.     """  
  5.     def revert(self, context, result, cause):  
  6.         """
  7.         回调配额预留资源;
  8.         根据result中的reservations保留的信息,恢复数据库中卷的配额信息到建立新卷之前的状态;
  9.         """  
  10.         if not result:  
  11.             return  
  12.         if context.quota_committed:  
  13.             return  
  14.   
  15.         reservations = result['reservations']  
  16.          
  17.         # 回调配额预留资源;  
  18.         try:  
  19.             QUOTAS.rollback(context, reservations)  
  20.         except exception.CinderException:  
  21.             LOG.exception(_("Failed rolling back quota for %s reservations"), reservations)  
  22.   
  23. class EntryCreateTask(base.CinderTask):  
  24.     """
  25.     在数据库中为给定的卷建立相关条目;
  26.     逆转操作:从数据库中删除volume_id建立的条目;
  27.     """  
  28.     def revert(self, context, result, cause):  
  29.         """
  30.         删除指定的卷在数据库中的数据条目信息,实现逆转回滚操作;
  31.         """  
  32.          
  33.         # 如果result为none,说明从来没有产生任何结果,因此不能删除任何数据信息;  
  34.         if not result:  
  35.             return  
  36.          
  37.         # quota_committed说明不能执行回滚操作,说明此时卷已经建立;  
  38.         if context.quota_committed:  
  39.             return  
  40.         vol_id = result['volume_id']  
  41.          
  42.         # 删除指定的卷在数据库中的数据条目信息;  
  43.         try:  
  44.             self.db.volume_destroy(context.elevated(), vol_id)  
  45.         except exception.CinderException:  
  46.             LOG.exception(_("Failed destroying volume entry %s"), vol_id)  
  47.   
  48. class QuotaCommitTask(base.CinderTask):  
  49.     """
  50.     提交新的资源配额的预留信息到数据库中;
  51.     """  
  52.     def revert(self, context, result, cause):  
  53.         if not result:  
  54.             return  
  55.         volume = result['volume_properties']  
  56.         try:  
  57.             reserve_opts = {'volumes': -1, 'gigabytes': -volume['size']}  
  58.             # 添加卷的类型选项到opts,opts表示保留选项信息的hash;  
  59.             QUOTAS.add_volume_type_opts(context,  
  60.                                         reserve_opts,  
  61.                                         volume['volume_type_id'])  
  62.             # 检测配额信息和并建立相应的资源配额预留资源;  
  63.             reservations = QUOTAS.reserve(context,  
  64.                                           project_id=context.project_id,  
  65.                                           **reserve_opts)  
  66.             # 提交资源配额的预留信息到数据库中;  
  67.             if reservations:  
  68.                 QUOTAS.commit(context, reservations, project_id=context.project_id)  
  69.         except Exception:  
  70.             LOG.exception(_("Failed to update quota for deleting volume: %s"), volume['id'])  
  71.   
  72. class OnFailureChangeStatusTask(base.CinderTask):  
  73.     """
  74.     这个task实现了当出现错误时,设置指定id的卷的状态为ERROR;
  75.     """  
  76.     def revert(self, context, result, cause):  
  77.         volume_spec = result.get('volume_spec')  
  78.         if not volume_spec:  
  79.             volume_spec = _find_result_spec(cause.flow)  
  80.   
  81.         volume_id = result['volume_id']  
  82.         _restore_source_status(context, self.db, volume_spec)  
  83.         _error_out_volume(context, self.db, volume_id, reason=cause.exc)  
  84.         LOG.error(_("Volume %s: create failed"), volume_id)  
  85.         exc_info = False  
  86.         if all(cause.exc_info):  
  87.             exc_info = cause.exc_info  
  88.         LOG.error(_('Unexpected build error:'), exc_info=exc_info)  
复制代码



OK!到这里为止,在卷的建立出现异常的情况下,如何来执行相关的逆转回滚操作的解析工作也基本完成了;所以,方法def run(self, context, *args, **kwargs)的执行过程也基本完成了解析工作。
所以我们就可以回到方法/cinder/volume/api.py----class API----def create中,我们可以看到,在方法的最后,从flow中获取返回结果作为建立卷操作的反馈信息返回给上层调用,从而完成了应用taskflow模式实现卷建立的整个过程,就是:
1.构建字典create_what,实现整合建立卷的具体参数;
2.构建并返回用于建立卷的flow;
3.执行构建的用于建立卷的flow;
4.从flow中获取建立卷的反馈信息;
在下一篇博客中,我将详细的分析task类VolumeCastTask具体是如何来实现根据请求信息进行卷的建立的。


相关文章:
Openstack Cinder中建立volume过程的源码解析(1)
http://www.aboutyun.com/thread-10217-1-1.html
1.cinder中卷的建立的过程中,客户端传递过来的request的执行过程是怎样的?
2.__call__方法都通过什么方法封装?
3.如何调用指定中间件的__call__方法?


Openstack Cinder中建立volume过程的源码解析(2)
http://www.aboutyun.com/thread-10216-1-1.html
1.如何获取要执行的action方法及其相关的扩展方法?
2.Resource类中的__call__(self,request)方法如何实现?
3.meth的作用?
4.如何从request.environ中获取要执行的action方法?
5.如何对body进行反序列化操作?


Openstack Cinder中建立volume过程的源码解析(3)
http://www.aboutyun.com/thread-10215-1-1.html
1.get_serializer的作用?
2.isgeneratorfunction是用来做什么的?
3.什么是特殊的generator方法?
4.进行响应信息的序列化操作的步骤?
5.简述在cinder模块中实现客户端发送过来的请求信息操作的主要的步骤?

Openstack Cinder中建立volume过程的源码解析(4)----以及taskflow相关解析
http://www.aboutyun.com/thread-10214-1-1.html
1.简述cinder是如何实现卷的建立的?
2.简述taskflow库来实现卷的简历过程?
3.如何根据给定id来检索获取单个的卷的类型?
4.如何构建并返回用于建立卷的flow?
5.如何声明flow是否是真的?
6.简述建立卷的flow的步骤?


Openstack Cinder中建立volume过程的源码解析(5)----以及taskflow相关解析
http://www.aboutyun.com/thread-10213-1-1.html
1.如何实现Flow类的初始化的?
2.用于卷的建立的flow中都添加了哪些task?
3.ExtractVolumeRequestTask类的作用是什么?
4.如何完全或部分的重置flow的内部的状态?
5.如何从给定的卷中提取卷的id信息?
6.OnFailureChangeStatusTask类的作用?

Openstack Cinder中建立volume过程的源码解析(6)----以及taskflow相关解析
http://www.aboutyun.com/thread-10212-1-1.html
1.如何来运行已经构建好的flow?
2.run的源码如何实现及实现过程是什么?
3.resume_it的实现过程是什么?
4.类Runner的初始化方法是什么?
5.run_it的源码如何实现?


Openstack Cinder中建立volume过程的源码解析(8)
http://www.aboutyun.com/thread-10219-1-1.html
1.VolumeCastTask的源码如何实现?
2.远程调用建立新卷的操作,有哪几个步骤?
3.task类VolumeCastTask具体是如何来实现根据请求信息进行卷的建立的?


Openstack Cinder中建立volume过程的源码解析(9)
http://www.aboutyun.com/thread-10210-1-1.html
1.如何实现create_volume的源码?
2.Cast如何实现远程调create_volume?
3.如何实现调用方法self.volume_rpcapi.create_volume来实现在目标主机上新卷的建立?




转自:http://blog.csdn.net/gaoxingnengjisuan/article/details/23710551

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条