分享

Swift源码分析----swift-account-reaper(1)

tntzbzc 发表于 2014-11-20 15:34:48 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 13798
问题导读

1.账户收割操作的主要职责是什么?
2.调用哪个方法实现收割指定device的操作?




概述部分:
账户下数据收割操作的守护进程;
账户收割操作的主要职责就是对于账户下状态标志为deleted的分区数据(容器和对象)实现删除操作;
具体的实现是调用了reap_account、reap_container和reap_object方法,实现递归删除账户下的相关数据;
对于分区的相关副本也要实现删除操作;
这里定义的once=True,说明系统默认调用守护进程类Daemon中的run_once方法;
从而最终实现调用AccountReaper类中的run_once方法;
如果调用的是AccountReaper类中的run_forever方法,则会实现循环实现对账户下状态标志为deleted的数据实现收割操作;
源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
  1. from swift.account.reaper import AccountReaper
  2. from swift.common.utils import parse_options
  3. from swift.common.daemon import run_daemon
  4. if __name__ == '__main__':
  5.     conf_file, options = parse_options(once=True)
  6.     run_daemon(AccountReaper, conf_file, **options)
复制代码

  1. def run_once(self, *args, **kwargs):
  2.      """
  3.      进行一次任务收割操作;
  4.      对已经删除了的账户下的数据对象和容器进行删除操作;
  5.      """
  6.     self.logger.debug(_('Begin devices pass: %s'), self.devices)
  7.     begin = time()
  8.     try:
  9.         # self.devices = conf.get('devices', '/srv/node')
  10.         # 循环获取/srv/node/目录下的device;
  11.         # 删除所有遍历的device,实现收割所有device;
  12.         for device in os.listdir(self.devices):
  13.             if self.mount_check and not ismount(os.path.join(self.devices, device)):
  14.                 self.logger.increment('errors')
  15.                 self.logger.debug(_('Skipping %s as it is not mounted'), device)
  16.                 continue
  17.             
  18.             # 实现收割指定device操作;
  19.             self.reap_device(device)
  20.     except (Exception, Timeout):
  21.         self.logger.exception(_("Exception in top-level account reaper "
  22.                                     "loop"))
  23.     elapsed = time() - begin
  24.     self.logger.info(_('Devices pass completed: %.02fs'), elapsed)
复制代码



循环获取/srv/node/目录下的device,调用方法reap_device实现收割指定device的操作;
  1. def reap_device(self, device):
  2.      """
  3.      实现收割device操作;
  4.      这个方法将扫描设备的accounts目录;
  5.      找到相应的数据库文件,然后对数据库文件内容进行判断;
  6.      如果账户被标志为‘DELETE’,调用reap_account实现删除账户下的所有的containers相关数据;
  7.      """
  8.     # self.devices = conf.get('devices', '/srv/node')
  9.     # datadir = /srv/node/device/accounts
  10.     datadir = os.path.join(self.devices, device, DATADIR)
  11.     if not os.path.exists(datadir):
  12.         return
  13.         
  14.     # 循环获取/srv/node/device/accounts目录下的partition;
  15.     for partition in os.listdir(datadir):
  16.         # partition_path = /srv/node/device/accounts/partition
  17.         partition_path = os.path.join(datadir, partition)
  18.         if not partition.isdigit():
  19.             continue
  20.             
  21.             # 获取一个分区partition所有副本相关的节点nodes;
  22.             # get_account_ring:获取swift.common.ring.Ring对象,名称为'account';
  23.             # get_part_nodes:获取一个分区所有副本相关的节点信息;
  24.             nodes = self.get_account_ring().get_part_nodes(int(partition))
  25.             if nodes[0]['ip'] not in self.myips or not os.path.isdir(partition_path):
  26.                 continue
  27.             for suffix in os.listdir(partition_path):
  28.                 # suffix_path = /srv/node/device/accounts/partition/suffix
  29.                 suffix_path = os.path.join(partition_path, suffix)
  30.                 if not os.path.isdir(suffix_path):
  31.                     continue
  32.                 for hsh in os.listdir(suffix_path):
  33.                     # hsh_path = /srv/node/device/accounts/partition/suffix/hsh
  34.                     hsh_path = os.path.join(suffix_path, hsh)
  35.                     if not os.path.isdir(hsh_path):
  36.                         continue
  37.                     
  38.                     # 循环遍历hsh_path = /srv/node/device/accounts/partition/suffix/hsh/下所有的文件;
  39.                     # 针对后缀为.db的文件,执行收割操作,即如果其状态标志为‘DELETE’并且broker不为空,则执行删除对应账户数据操作;
  40.                     for fname in sorted(os.listdir(hsh_path), reverse=True):
  41.                         if fname.endswith('.ts'):
  42.                             break
  43.                         elif fname.endswith('.db'):
  44.                             self.start_time = time()
  45.                             broker = AccountBroker(os.path.join(hsh_path, fname))
  46.                            
  47.                             # 如果broker状态标志为‘DELETE’并且broker不为空,则执行删除操作;
  48.                             # reap_account:删除要删除的account下的所有的containers相关数据;
  49.                             if broker.is_status_deleted() and not broker.empty():
  50.                                 self.reap_account(broker, partition, nodes)
复制代码




1.嵌套循环遍历目录/srv/node/device/accounts/下的所有文件;
2.针对后缀为.db的文件,执行收割操作,即如果其状态标志为‘DELETE’且数据库文件不为空,则调用reap_account方法执行账户下所有容器数据的删除操作;
  1. def reap_account(self, broker, partition, nodes):  
  2.      """
  3.      实现收割account操作;
  4.      删除要删除的account下的所有的containers相关数据;
  5.      """  
  6.     begin = time()  
  7.     info = broker.get_info()  
  8.     if time() - float(info['delete_timestamp']) <= self.delay_reaping:  
  9.         return False  
  10.     account = info['account']  
  11.     self.logger.info(_('Beginning pass on account %s'), account)  
  12.     self.stats_return_codes = {}  
  13.     self.stats_containers_deleted = 0  
  14.     self.stats_objects_deleted = 0  
  15.     self.stats_containers_remaining = 0  
  16.     self.stats_objects_remaining = 0  
  17.     self.stats_containers_possibly_remaining = 0  
  18.     self.stats_objects_possibly_remaining = 0  
  19.          
  20.     # 删除要删除的account下的所有的containers相关数据;  
  21.     try:  
  22.         marker = ''  
  23.         while True:  
  24.             # 获取要删除的account下的containers排序列表;  
  25.             # 返回列表(name, object_count, bytes_used, 0);  
  26.             containers = list(broker.list_containers_iter(1000, marker, None, None, None))  
  27.             if not containers:  
  28.                 break  
  29.   
  30.             try:  
  31.                 for (container, _junk, _junk, _junk) in containers:  
  32.                     # reap_container:实现删除容器container下数据和容器container本身;  
  33.                     # 在一个绿色线程中运行方法reap_container,来实现删除container相关数据;  
  34.                     self.container_pool.spawn(self.reap_container,account,partition,nodes,container)  
  35.                     self.container_pool.waitall()  
  36.                 except (Exception, Timeout):  
  37.                     self.logger.exception(_('Exception with containers for account %s'), account)  
  38.   
  39.                 marker = containers[-1][0]  
  40.                 if marker == '':  
  41.                     break  
  42.         log = 'Completed pass on account %s' % account  
  43.     except (Exception, Timeout):  
  44.         self.logger.exception(_('Exception with account %s'), account)  
  45.         log = _('Incomplete pass on account %s') % account  
  46.     if self.stats_containers_deleted:  
  47.         log += _(', %s containers deleted') % self.stats_containers_deleted  
  48.     if self.stats_objects_deleted:  
  49.         log += _(', %s objects deleted') % self.stats_objects_deleted  
  50.     if self.stats_containers_remaining:  
  51.         log += _(', %s containers remaining') % self.stats_containers_remaining  
  52.     if self.stats_objects_remaining:  
  53.         log += _(', %s objects remaining') % self.stats_objects_remaining  
  54.     if self.stats_containers_possibly_remaining:  
  55.         log += _(', %s containers possibly remaining') % self.stats_containers_possibly_remaining  
  56.     if self.stats_objects_possibly_remaining:  
  57.         log += _(', %s objects possibly remaining') % self.stats_objects_possibly_remaining  
  58.     if self.stats_return_codes:  
  59.         log += _(', return codes: ')  
  60.         for code in sorted(self.stats_return_codes):  
  61.             log += '%s %sxxs, ' % (self.stats_return_codes[code], code)  
  62.         log = log[:-2]  
  63.     log += _(', elapsed: %.02fs') % (time() - begin)  
  64.     self.logger.info(log)  
  65.     self.logger.timing_since('timing', self.start_time)  
  66.     if self.stats_containers_remaining and begin - float(info['delete_timestamp']) >= self.reap_not_done_after:  
  67.         self.logger.warn(_('Account %s has not been reaped since %s') % (account, ctime(float(info['delete_timestamp']))))  
  68.     return True  
复制代码




1.实现获取要删除的account下的container排序列表;
2.循环遍历所有container,针对每一个container,在一个绿色线程中运行方法reap_container来实现删除container的相关数据;
下一篇博客将继续swift-account-reaper的分析工作


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条