分享

OpenStack Swift源码导读之业务整体架构和Proxy进程

本帖最后由 xioaxu790 于 2014-9-29 14:52 编辑
问题导读
1、Swift的源码目录结构分别是什么?
2、你如何理解基于PasteDeploy的堆栈式WSGI架构?
3、什么是节点寻找过程?





OpenStack的源码分析在网上已经非常多了,针对各个部分的解读亦是非常详尽。这里我根据自己的理解把之前读过的Swift源码的一些要点记录一下,希望给需要的同学能带来一些帮助。

一、Swift的整体框架图
1.png

Swift代码树


如上图,Swift的源码目录结构。其中proxy是前端的业务接入进程。account、container和object目录分别是账户、容器和对象的业务处理逻辑进程。common目录是一些通用工具代码。common中比较重要的有:哈希环的处理逻辑。接下来会依次介绍各个进程的源码逻辑和一些关键点机制。

各个业务进程或模块之间的逻辑关系可以参考《Openstack Swift简介》文中的架构图。

二、Proxy进程的业务处理
首先需要掌握基于PasteDeploy的堆栈式WSGI架构。根据PasteDeploy定义的各个层,可以很快理清配置文件定义的代码流程,从middleware到server。找到最外层的middleware,即是业务的入口。对于proxy进程,可以简单给出业务时序图:
1.png

WSGI业务流程示意图

每一层的分工非常清晰,如在proxy进程默认配置文件中,最上层是做异常处理,所有的业务流程抛出的未处理的异常,在这里都将得到处理。

Proxy进程会分析请求的URI(account、container和object组成的资源路径)和请求方法(put、del等)来分析当前请求的资源的具体类型,然后分贝找到控制该资源的controller,由controller来分发请求到具体的资源server。分发到原则是一致性哈希环。一致性哈希环在系统初始化时由工具生成,在《Swift 和 Keystone单机安装总结》一文中有具体的操作步骤。

在《Openstack Swift简介》从理论上面介绍了具体的节点寻找过程。采用md5值加移位的方式来确定part,然后找到所有的虚拟节点。具体的代码为:
  1. container_partition, containers = self.app.container_ring.get_nodes(
  2. self.account_name, self.container_name)
  3. def get_nodes(self, account, container=None, obj=None):
  4. """
  5. Get the partition and nodes
  6. for an account/container/object.
  7. If a node is responsible
  8. for more than one replica, it will
  9. only appear in the
  10. output once.
  11. :param account: account name
  12. :param
  13. container: container name
  14. :param obj: object name
  15. :returns: a tuple of (partition, list of node dicts)
  16. Each node dict will have at least the following keys:
  17. ======
  18. ===============================================================
  19. id unique integer
  20. identifier amongst devices
  21. weight a float of the
  22. relative weight of this device as compared to
  23. others;
  24. this indicates how many partitions the builder will try
  25. to assign
  26. to this device
  27. zone integer indicating
  28. which zone the device is in; a given
  29. partition
  30. will not be assigned to multiple devices within the
  31. same zone
  32. ip the ip address of the
  33. device
  34. port the tcp port of the device
  35. device the device's name on disk (sdb1, for
  36. example)
  37. meta general use 'extra'
  38. field; for example: the online date, the
  39. hardware
  40. description
  41. ======
  42. ===============================================================
  43. """
  44. part = self.get_part(account,
  45. container, obj)
  46. return part,
  47. self._get_part_nodes(part)
  48. def get_part(self, account, container=None, obj=None):
  49. """
  50. Get the partition for an
  51. account/container/object.
  52. :param account: account name
  53. :param
  54. container: container name
  55. :param obj: object name
  56. :returns: the partition number
  57. """
  58. key = hash_path(account, container, obj, raw_digest=True)
  59. if time() >; self._rtime:
  60. self._reload()
  61. part = struct.unpack_from('>;I', key)[0] >>
  62. self._part_shift
  63. return part
  64. def _get_part_nodes(self, part):
  65. part_nodes = []
  66. seen_ids = set()
  67. for r2p2d in
  68. self._replica2part2dev_id:
  69. if
  70. part <; len(r2p2d):
  71. dev_id =
  72. r2p2d[part]
  73. if dev_id
  74. not in seen_ids:
  75. part_nodes.append(self.devs[dev_id])
  76. seen_ids.add(dev_id)
  77. return part_nodes
复制代码


然后根据quorum原则来决定当前请求至少需要几个节点成功即可返回。如NWR分别为322,则至少需要2个节点写成功,才能确保此次写成功。体现在公用的make_request方法中:
  1. def make_requests(self, req, ring, part, method, path, headers,
  2. query_string=''):
  3. """
  4. Sends an
  5. HTTP request to multiple nodes and aggregates the results.
  6. It attempts the primary nodes concurrently, then iterates
  7. over the
  8. handoff nodes as needed.
  9. :param req: a request sent by the client
  10. :param ring: the ring used for finding backend servers
  11. :param part: the partition number
  12. :param method: the method to send to the backend
  13. :param
  14. path: the path to send to the backend
  15. (full path ends up being /<$device>/<$part>/<$path>)
  16. :param headers: a list of dicts, where each dict
  17. represents one
  18. backend request that should be made.
  19. :param query_string:
  20. optional query string to send to the backend
  21. :returns: a
  22. swob.Response object
  23. """
  24. start_nodes = ring.get_part_nodes(part)
  25. nodes =
  26. GreenthreadSafeIterator(self.app.iter_nodes(ring, part))
  27. pile = GreenAsyncPile(len(start_nodes))
  28. for head in
  29. headers:
  30. pile.spawn(self._make_request, nodes, part, method, path,
  31. head, query_string, self.app.logger.thread_locals)
  32. response = []
  33. statuses = []
  34. for
  35. resp in pile:
  36. if not resp:
  37. continue
  38. response.append(resp)
  39. statuses.append(resp[0])
  40. if self.have_quorum(statuses,
  41. len(start_nodes)):
  42. break
  43. # give any pending requests *some* chance to finish
  44. pile.waitall(self.app.post_quorum_timeout)
  45. while len(response) <; len(start_nodes):
  46. response.append((HTTP_SERVICE_UNAVAILABLE, '', '', ''))
  47. statuses, reasons, resp_headers, bodies = zip(*response)
  48. return self.best_response(req, statuses, reasons, bodies,
  49. '%s %s' % (self.server_type, req.method),
  50. headers=resp_headers)
复制代码




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条