分享

Swift源码分析----swift-object-replicator(2)

tntzbzc 发表于 2014-11-20 15:34:54 回帖奖励 阅读模式 关闭右栏 1 13879
问题导读
1.def update_deleted(self, job)实现了什么功能?
2.在def update(self, job)方法中,对于远程副本节点,循环执行,针对每一个节点实现了什么操作?








转到2,来看方法update_deleted的实现:
  1. if job['delete']:
  2.     self.run_pool.spawn(self.update_deleted, job)
复制代码

  1. def update_deleted(self, job):
  2.     """        
  3.     同步本分区下数据到远程副本分区,并删除本分区下对象数据;
  4.     1 获取指定分区目录下各个对象的suff----suffixes;
  5.     2 遍历指定分区所有副本(除了本分区)节点,在每个副本节点上:
  6.           2.1 调用方法sync,实现通过rsync命令行实现同步本地分区下suffixes确定的若干对象数据到远程节点相应的分区下;
  7.           注意:这里并没由冗余复制数据的操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;
  8.           2.2 通过REPLICATE方法获取远程节点相应的分区下对象相应的哈希值;
  9.     3 当本地分区到每个副本节点分区下的数据同步全部完成之后,则删除本分区下的数据;
  10.     注:
  11.     这里没有进行本地分区数据和远程副本数据的比较验证工作,说明这个方法需要直接同步分区下所有数据到远程副本节点;
  12.     应该适用于如下情形:
  13.     假设本分区所在设备节点号为1,所有副本设备节点号为1/2/3/4,当执行rebalance操作后,所有设备节点号为2/3/4/5,在rebalance操作过程中,
  14.     1号设备上本分区则被标志为delete;此时,需要先同步1号设备本分区数据到2/3/4/5号设备上,然后删除1号设备本分区下的数据;
  15.     在这里虽然也执行了复制1号设备数据到2/3/4号设备,但是这里并没由进行冗余复制操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;
  16.     """
  17.     def tpool_get_suffixes(path):
  18.         """
  19.         获取指定分区目录下各个对象的suff;
  20.         path = job['path'] = /srv/node/local_dev['device']/objects/partition
  21.         """
  22.         return [suff for suff in os.listdir(path)
  23.                 if len(suff) == 3 and isdir(join(path, suff))]
  24.         
  25.     self.replication_count += 1
  26.     self.logger.increment('partition.delete.count.%s' % (job['device'],))
  27.     begin = time.time()
  28.         
  29.     try:
  30.         responses = []
  31.             
  32.         # 获取指定分区目录下各个对象的suff;
  33.         # job['path'] = /srv/node/local_dev['device']/objects/partition
  34.         suffixes = tpool.execute(tpool_get_suffixes, job['path'])
  35.             
  36.         if suffixes:
  37.             # job['nodes']:同一个分区相关副本除了本节点的其他的节点(所以可能有多个);
  38.             for node in job['nodes']:
  39.                     
  40.                 # 通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
  41.                 # 因为在命令行的构成过程中,本地数据的地址在前作为源数据地址,远程数据地址在后作为目标数据地址;
  42.                 # 可以通过一条命令实现suffixes所指定的数据的同步,源数据地址有多个,目标数据地址有一个;
  43.                 # 注:
  44.                 # 这里没有进行本地和远程数据的比较和验证操作,而是直接把本地数据拷贝到远程地址;
  45.                 # 这里并没由冗余复制数据的操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;
  46.                 success = self.sync(node, job, suffixes)
  47.                 if success:
  48.                     with Timeout(self.http_timeout):
  49.                         # REPLICARE方法,对应sever里面的RELICATE方法;
  50.                         # REPLICATE方法就是获取指定分区下的哈希值文件(可能有多个,因为分区下可能映射了多个对象),用于判断对象数据是否发生改变;
  51.                         # 并获取方法执行的响应信息,即远程节点上副本的哈希值;
  52.                         conn = http_connect(
  53.                             node['replication_ip'],
  54.                             node['replication_port'],
  55.                             node['device'], job['partition'], 'REPLICATE',
  56.                             '/' + '-'.join(suffixes), headers=self.headers)
  57.                         conn.getresponse().read()
  58.                 responses.append(success)
  59.         if self.handoff_delete:
  60.             # delete handoff if we have had handoff_delete successes
  61.             delete_handoff = len([resp for resp in responses if resp]) >= self.handoff_delete
  62.         else:
  63.             # delete handoff if all syncs were successful
  64.             delete_handoff = len(responses) == len(job['nodes']) and all(responses)
  65.             
  66.         # suffixes为空或请求的三个已经都响应成功后删除本地partion下的文件;
  67.         if not suffixes or delete_handoff:
  68.             self.logger.info(_("Removing partition: %s"), job['path'])
  69.             tpool.execute(shutil.rmtree, job['path'], ignore_errors=True)
  70.     except (Exception, Timeout):
  71.         self.logger.exception(_("Error syncing handoff partition"))
  72.     finally:
  73.         self.partition_times.append(time.time() - begin)
  74.         self.logger.timing_since('partition.delete.timing', begin)
复制代码


2.1.获取指定分区目录下各个对象的suff----suffixes;
2.2.遍历指定分区所有副本(除了本节点)节点,在每个副本节点上:
    2.2.1.调用方法sync,实现通过rsync命令行实现同步本地分区下suffixes确定的若干对象数据到远程节点相应的分区下;
        注意:这里并没由冗余复制数据的操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;
    2.2.2.通过REPLICATE方法获取远程节点相应的分区下对象相应的哈希值;
2.3.当本地分区到每个副本节点分区下的数据同步全部完成之后,则删除本分区下的数据;
转到2.2,来看方法sync的实现:
  1. def sync(self, node, job, suffixes):  # Just exists for doc anchor point
  2.     """
  3.     通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
  4.     因为在命令行的构成过程中,本地数据的地址在前作为源数据地址,远程数据地址在后作为目标数据地址;
  5.     可以通过一条命令实现suffixes所指定的数据的同步,源数据地址有多个,目标数据地址有一个;
  6.     """
  7.     # def rsync;
  8.     # 通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
  9.     # 因为在命令行的构成过程中,本地数据的地址在前作为源数据地址,远程数据地址在后作为目标数据地址;
  10.     # 可以通过一条命令实现suffixes所指定的数据的同步,源数据地址有多个,目标数据地址有一个;
  11.     return self.sync_method(node, job, suffixes)
复制代码





注:这里调用的方法是rsync;
  1. def rsync(self, node, job, suffixes):
  2.     """
  3.     通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
  4.     因为在命令行的构成过程中,本地数据的地址在前作为源数据地址,远程数据地址在后作为目标数据地址;
  5.     可以通过一条命令实现suffixes所指定的数据的同步,源数据地址有多个,目标数据地址有一个;
  6.     """
  7.     if not os.path.exists(job['path']):
  8.         return False
  9.         
  10.     # Rsync(remote synchronize)是一个远程数据同步工具,
  11.     # 可通过LAN/WAN快速同步多台主机间的文件。
  12.     # Rsync使用所谓的“Rsync算法”来使本地和远程两个主机之间的文件达到同步,
  13.     # 这个算法只传送两个文件的不同部分,而不是每次都整份传送,因此速度相当快;
  14.     args = [
  15.             'rsync',
  16.             '--recursive',
  17.             '--whole-file',
  18.             '--human-readable',
  19.             '--xattrs',
  20.             '--itemize-changes',
  21.             '--ignore-existing',
  22.             '--timeout=%s' % self.rsync_io_timeout,
  23.             '--contimeout=%s' % self.rsync_io_timeout,
  24.             '--bwlimit=%s' % self.rsync_bwlimit,
  25.         ]
  26.     # 获取远程节点的IP;
  27.     node_ip = rsync_ip(node['replication_ip'])
  28.         
  29.     # rsync_module = node_ip::object
  30.     if self.vm_test_mode:
  31.         rsync_module = '%s::object%s' % (node_ip, node['replication_port'])
  32.     else:
  33.         rsync_module = '%s::object' % node_ip
  34.         
  35.     had_any = False
  36.         
  37.     # 遍历suffixes,分别生成suffix的具体路径,并加载到命令行变量args中;
  38.     # 如果不存在suffixes,则说明前面获取损坏的对象数据的操作是错误的,则直接返回;
  39.     # 这里也可以看到,命令rsync可以实现同时同步多个数据对象;
  40.     for suffix in suffixes:
  41.         # job['path'] = /srv/node/local_dev['device']/objects/partition
  42.         # spath = /srv/node/local_dev['device']/objects/partition/suffix
  43.         spath = join(job['path'], suffix)
  44.         if os.path.exists(spath):
  45.             args.append(spath)
  46.             had_any = True
  47.     if not had_any:
  48.         return False
  49.         
  50.     # 添加远程数据路径到命令行变量args中;
  51.     # rsync_module = node_ip::object;
  52.     args.append(join(rsync_module, node['device'], 'objects', job['partition']))
  53.         
  54.     # 实现同步本地分区下若干数据到远程节点相应的分区下;
  55.     return self._rsync(args) == 0
复制代码




2.2.1.遍历suffixes,分别生成suffix的具体路径(/srv/node/local_dev['device']/objects/partition/suffix),并加载到命令行变量args中;
2.2.2.添加远程数据路径到命令行变量args中;
2.2.3.调用方法_rsync实现同步本地分区下若干数据到远程节点相应的分区下;
转到2.2.3,来看方法_rsync的实现:
  1. def _rsync(self, args):  
  2.     """
  3.     实现同步本地分区下若干数据到远程节点相应的分区下
  4.     """  
  5.     start_time = time.time()  
  6.     ret_val = None  
  7.     try:  
  8.         with Timeout(self.rsync_timeout):  
  9.             # 此处即为同步操作了,推送模式;  
  10.             proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)  
  11.             results = proc.stdout.read()  
  12.             ret_val = proc.wait()  
  13.     except Timeout:  
  14.         self.logger.error(_("Killing long-running rsync: %s"), str(args))  
  15.         proc.kill()  
  16.         return 1  # failure response code  
  17.     total_time = time.time() - start_time  
  18.     for result in results.split('\n'):  
  19.         if result == '':  
  20.             continue  
  21.         if result.startswith('cd+'):  
  22.             continue  
  23.         if not ret_val:  
  24.             self.logger.info(result)  
  25.         else:  
  26.             self.logger.error(result)  
  27.     if ret_val:  
  28.         error_line = _('Bad rsync return code: %(ret)d <- %(args)s') % {'args': str(args), 'ret': ret_val}  
  29.         if self.rsync_error_log_line_length:  
  30.             error_line = error_line[:self.rsync_error_log_line_length]  
  31.         self.logger.error(error_line)  
  32.     elif results:  
  33.         self.logger.info(_("Successful rsync of %(src)s at %(dst)s (%(time).03f)"), {'src': args[-2], 'dst': args[-1], 'time': total_time})  
  34.     else:  
  35.         self.logger.debug(_("Successful rsync of %(src)s at %(dst)s (%(time).03f)"), {'src': args[-2], 'dst': args[-1], 'time': total_time})  
  36.     return ret_val  
复制代码





转到3,来看方法update的实现:
  1. def update(self, job):
  2.     """
  3.     实现复制一个分区的高级方法;      
  4.     对于远程副本节点,循环执行,针对每一个节点实现以下操作:
  5.     1 通过http连接远程节点,通过REPLICATE方法实现获取job['partition']下所有对象的哈希值;
  6.     2 找出本地分区下哈希值中后缀和远程分区下哈希值中后缀不同的,说明分区下的某些对象文件数据发生了变化;
  7.     3 针对发生变化的数据,调用sync方法,通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
  8.     """
  9.     self.replication_count += 1
  10.     self.logger.increment('partition.update.count.%s' % (job['device'],))
  11.     begin = time.time()
  12.         
  13.     try:
  14.         # 方法get_hashes从具体的分区(具体的object)的哈希值文件hashes.pkl获取hashes值并更新,获取本地的hashes;
  15.         # job[path]为job_path = join(obj_path, partition) = /srv/node/local_dev['device']/objects/partition;
  16.         # local_hash为hashes.pkl中的反序列化回来的内容;
  17.         # hashed为改变的数目;
  18.         hashed, local_hash = tpool_reraise(
  19.             get_hashes,
  20.             job['path'], # job['path'] = /srv/node/local_dev['device']/objects/partition
  21.             do_listdir=(self.replication_count % 10) == 0,
  22.             reclaim_age=self.reclaim_age)
  23.             
  24.         self.suffix_hash += hashed
  25.         self.logger.update_stats('suffix.hashes', hashed)
  26.         # 其他副本对应的节点数目;
  27.         # 此时attempts_left 为2 若果replica为3;
  28.         attempts_left = len(job['nodes'])
  29.             
  30.             
  31.         # 此时的nodes为除去本节点外的所有节点;
  32.         # 因为job['nodes]不包含本地节点,
  33.         # get_more_nodes(int(job['partition']))能获得除去本partion所对应节点外的其他所有节点;
  34.         nodes = itertools.chain(
  35.             job['nodes'],
  36.             # get_more_nodes:这个方法实现了获取其他副本的节点;
  37.             # 这个方法说明了三副本带来的高可用性;
  38.             # 如果replicator进程检测到对远程node执行同步操作失败;
  39.             # 那么它就会通过ring类提供的get_more_nodes接口来获得其他副本存放的node进行同步;
  40.             self.object_ring.get_more_nodes(int(job['partition'])))
  41.             
  42.         # 其他副本对应的节点数目;
  43.         # 此时attempts_left 为2 若果replica为3;
  44.         # 对于远程副本节点,循环执行,针对每一个节点实现以下操作:
  45.         # 通过http连接远程节点,通过REPLICATE方法实现获取job['partition']下所有对象的哈希值;
  46.         # 找出本地分区下哈希值中后缀和远程分区下哈希值中后缀不同的,说明分区下的某些对象文件数据发生了变化;
  47.         # 针对发生变化的数据,调用sync方法,通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
  48.         while attempts_left > 0:
  49.             # If this throws StopIterator it will be caught way below
  50.             node = next(nodes)
  51.             attempts_left -= 1
  52.             try:
  53.                 with Timeout(self.http_timeout):
  54.                     # REPLICARE方法,对应sever里面的RELICATE方法;
  55.                     # REPLICATE方法就是获取指定分区下的哈希值文件(可能有多个,因为分区下可能映射了多个对象),用于判断对象数据是否发生改变;
  56.                     #并获取方法执行的响应信息,即远程节点上副本的哈希值;
  57.                     resp = http_connect(
  58.                             node['replication_ip'], node['replication_port'],
  59.                             node['device'], job['partition'], 'REPLICATE',
  60.                             '', headers=self.headers).getresponse()
  61.                         
  62.                     if resp.status == HTTP_INSUFFICIENT_STORAGE:
  63.                         self.logger.error(_('%(ip)s/%(device)s responded as unmounted'), node)
  64.                         attempts_left += 1
  65.                         continue
  66.                         
  67.                     if resp.status != HTTP_OK:
  68.                         self.logger.error(_("Invalid response %(resp)s from %(ip)s"), {'resp': resp.status, 'ip': node['replication_ip']})
  69.                         continue
  70.                         
  71.                     # 获取远程节点上分区的哈希值;
  72.                     remote_hash = pickle.loads(resp.read())
  73.                     del resp
  74.                         
  75.                 # 找出本地分区下哈希值中后缀和远程分区下哈希值中后缀不同的;
  76.                 # 如果分区下某些对象数据发生改变,其对应的哈希值文件也会发生改变;
  77.                 # 如果有不同,说明分区下的某些对象文件数据发生了变化;
  78.                 # 示例:
  79.                 # 假如 local_hash 为 123 321 122 remote_hash 123 321 124 则 122为变化的  
  80.                 # 文件路径hash值后三位会不会重复  
  81.                 suffixes = [suffix for suffix in local_hash if
  82.                             local_hash[suffix] !=
  83.                             remote_hash.get(suffix, -1)]
  84.                     
  85.                 # 如果没有不同,说明对象数据都没有变化,则继续请求下一个节点;
  86.                 if not suffixes:
  87.                     continue
  88.                     
  89.                 # 针对那些和远程节点分区上不同的哈希值,这里进行重新计算;
  90.                 # 然后再一次和远程节点分区上的哈希值进行比较;
  91.                 # 这样做的目的是确保筛选的完全准确性;
  92.                 hashed, recalc_hash = tpool_reraise(
  93.                         get_hashes,
  94.                         job['path'], recalculate=suffixes,
  95.                         reclaim_age=self.reclaim_age)
  96.                 self.logger.update_stats('suffix.hashes', hashed)
  97.                 local_hash = recalc_hash
  98.                 suffixes = [suffix for suffix in local_hash if
  99.                             local_hash[suffix] !=
  100.                             remote_hash.get(suffix, -1)]
  101.                     
  102.                 # sync方法:
  103.                 # 通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
  104.                 # 因为在命令行的构成过程中,本地数据的地址在前作为源数据地址,远程数据地址在后作为目标数据地址;
  105.                 # 可以通过一条命令实现suffixes所指定的数据的同步,源数据地址有多个,目标数据地址有一个;
  106.                 self.sync(node, job, suffixes)
  107.                     
  108.                 with Timeout(self.http_timeout):
  109.                     conn = http_connect(node['replication_ip'], node['replication_port'], node['device'], job['partition'], 'REPLICATE',
  110.                                         '/' + '-'.join(suffixes), headers=self.headers)
  111.                     conn.getresponse().read()
  112.                 self.suffix_sync += len(suffixes)
  113.                 self.logger.update_stats('suffix.syncs', len(suffixes))
  114.             except (Exception, Timeout):
  115.                 self.logger.exception(_("Error syncing with node: %s") % node)
  116.         self.suffix_count += len(local_hash)
  117.     except (Exception, Timeout):
  118.         self.logger.exception(_("Error syncing partition"))
  119.     finally:
  120.         self.partition_times.append(time.time() - begin)
  121.         self.logger.timing_since('partition.update.timing', begin)
复制代码



3.1.通过http连接远程节点,通过REPLICATE方法实现获取job['partition']下所有对象的哈希值;
3.2.找出本地分区下哈希值中后缀和远程分区下哈希值中后缀不同的,说明分区下的某些对象文件数据发生了变化;
3.3.针对发生变化的数据,调用sync方法,通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
注:方法sync的解析,前面已经完成,这里不再进行赘述;



博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn

已有(0)人评论

跳转到指定楼层
溜溜小哥 发表于 2015-5-22 17:15:21
你好,你这里引用了我几十篇博客,都不声明一下,你不觉得很多分么?至少应该尊重一下我的劳动成果吧?
http://blog.csdn.net/gaoxingnengjisuan

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条