分享

Openstack之keystone源代码分析1--WSGI接口流程分析

tntzbzc 发表于 2014-11-19 22:40:19 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 42923
问题导读
1.keystone是怎么通过WSGI接口访问其中的服务的?
2.你认为add_routes作用是什么?










keystone服务的启动后我们是怎么通过WSGI接口访问其中的服务的呢?
keystone-paste.ini配置文件最下面
  1. [composite:main]
  2. use = egg:Paste#urlmap
  3. /v2.0 = public_api
  4. [composite:admin]
  5. use = egg:Paste#urlmap
  6. /v2.0 = admin_api
复制代码


一般调用keystone v2.0版本的api的请求路径为http://192.168.1.x:5000/v2.0/xxxxxxx(public endpoint)和http://192.168.1.x:35357/v2.0/xxxxx(admin port)
admin和public的处理方式都是一样的,只不过权限不同,这里只分析public_api的流程,keystone-paste.ini的流程如下
  1. [pipeline:public_api]
  2. pipeline = access_log sizelimit url_normalize token_auth admin_token_auth xml_body json_body ec2_extension user_crud_extension public_service
复制代码


pastedeploy过滤流程如下
  1. [filter:access_log]
  2. paste.filter_factory = keystone.contrib.access:AccessLogMiddleware.factory
  3. [filter:sizelimit]
  4. paste.filter_factory = keystone.middleware:RequestBodySizeLimiter.factory
  5. [filter:url_normalize]
  6. paste.filter_factory = keystone.middleware:NormalizingFilter.factory
  7. [filter:token_auth]
  8. paste.filter_factory = keystone.middleware:TokenAuthMiddleware.factory
  9. [filter:admin_token_auth]
  10. paste.filter_factory = keystone.middleware:AdminTokenAuthMiddleware.factory
  11. [filter:xml_body]
  12. paste.filter_factory = keystone.middleware:XmlBodyMiddleware.factory
  13. [filter:json_body]
  14. paste.filter_factory = keystone.middleware:JsonBodyMiddleware.factory
  15. [filter:ec2_extension]
  16. paste.filter_factory = keystone.contrib.ec2:Ec2Extension.factory
  17. [filter:user_crud_extension]
  18. paste.filter_factory = keystone.contrib.user_crud:CrudExtension.factory
  19. [app:public_service]
  20. paste.app_factory = keystone.service:public_app_factory
复制代码


一个一个来分析
首先是keystone.contrib.access:AccessLogMiddleware.factory
keystone.contrib.access模块下有两个文件init.py,core.py,init.py只有一行代码
  1. from keystone.contrib.access.core import *
复制代码


显而易见,关键代码在core.py中,core.py代码如下
  1. CONF = config.CONF
  2. LOG = logging.getLogger('access')
  3. APACHE_TIME_FORMAT = '%d/%b/%Y:%H:%M:%S'
  4. APACHE_LOG_FORMAT = (
  5.     '%(remote_addr)s - %(remote_user)s [%(datetime)s] "%(method)s %(url)s '
  6.     '%(http_version)s" %(status)s %(content_length)s')
  7. class AccessLogMiddleware(wsgi.Middleware):
  8.     """Writes an access log to INFO."""
  9.     @webob.dec.wsgify
  10.     def __call__(self, request):
  11.         data = {
  12.             'remote_addr': request.remote_addr,
  13.             'remote_user': request.remote_user or '-',
  14.             'method': request.method,
  15.             'url': request.url,
  16.             'http_version': request.http_version,
  17.             'status': 500,
  18.             'content_length': '-'}
  19.         try:
  20.             response = request.get_response(self.application)
  21.             data['status'] = response.status_int
  22.             data['content_length'] = len(response.body) or '-'
  23.         finally:
  24.             # must be calculated *after* the application has been called
  25.             now = timeutils.utcnow()
  26.             # timeutils may not return UTC, so we can't hardcode +0000
  27.             data['datetime'] = '%s %s' % (now.strftime(APACHE_TIME_FORMAT),
  28.                                           now.strftime('%z') or '+0000')
  29.             LOG.info(APACHE_LOG_FORMAT % data)
  30.         return response
复制代码
  1. #wsgi.Middleware类在keystone.commone下</pre>class Middleware(Application):    """Base WSGI middleware.    These classes require an application to be    initialized that will be called next.  By default the middleware will    simply call its wrapped app, or you can override __call__ to customize its    behavior.    """    @classmethod    def factory(cls, global_config, **local_config):        """Used for paste app factories in paste.deploy config files.        Any local configuration (that is, values under the [filter:APPNAME]        section of the paste config) will be passed into the `__init__` method        as kwargs.        A hypothetical configuration would look like:            [filter:analytics]            redis_host = 127.0.0.1            paste.filter_factory = keystone.analytics:Analytics.factory        which would result in a call to the `Analytics` class as            import keystone.analytics            keystone.analytics.Analytics(app, redis_host='127.0.0.1')        You could of course re-implement the `factory` method in subclasses,        but using the kwarg passing it shouldn't be necessary.        """        def _factory(app):            conf = global_config.copy()            conf.update(local_config)            return cls(app, **local_config)#__init__ here application=app        return _factory<br>    def __init__(self, application):<br>        self.application = application
复制代码





AccessLogMiddleware类下的__call__函数就核心,用web.dec.wsgify装饰过,参数request为Webob封装的Request对象,因为是filter,向下传递还需要一个app对象,可以看到,通过request.get_response(self.application)来获取app,向下过滤,直到找到不是filter的app对象。
那这个self.application哪来呢,毫无疑问,一定是初始化来的,这个类没有初始化函数,进入基类,wsgi,Middleware果不其然,不仅有__init__函数,还有factroy函数,pastedeploy调用factroy函数,其中
  1. cls(app, **local_config)
复制代码


初始化app,由于factory返回的是cls类本身,所以调用了__call__从而完成filter的功能。
access_log实现对请求的log记录,代码很明了。
access_log后面是sizelimit
  1. class RequestBodySizeLimiter(wsgi.Middleware):
  2.     """Limit the size of an incoming request."""
  3.     def __init__(self, *args, **kwargs):
  4.         super(RequestBodySizeLimiter, self).__init__(*args, **kwargs)
  5.     @webob.dec.wsgify(RequestClass=wsgi.Request)
  6.     def __call__(self, req):
  7.         if req.content_length > CONF.max_request_body_size:
  8.             raise exception.RequestTooLarge()
  9.         if req.content_length is None and req.is_body_readable:
  10.             limiter = utils.LimitingReader(req.body_file,
  11.                                            CONF.max_request_body_size)
  12.             req.body_file = limiter
  13.         return self.application
  14. 判断request大小,如果大于最大值,则raise Excetion报异常,否则继续向下传递,url_normalize
  15. class NormalizingFilter(wsgi.Middleware):
  16.     """Middleware filter to handle URL normalization."""
  17.     def process_request(self, request):
  18.         """Normalizes URLs."""
  19.         # Removes a trailing slash from the given path, if any.
  20.         if (len(request.environ['PATH_INFO']) > 1 and
  21.                 request.environ['PATH_INFO'][-1] == '/'):
  22.             request.environ['PATH_INFO'] = request.environ['PATH_INFO'][:-1]
  23.         # Rewrites path to root if no path is given.
  24.         elif not request.environ['PATH_INFO']:
  25.             request.environ['PATH_INFO'] = '/'
复制代码


这个没有直接在NormalizingFilter内部实现__call__功能,而是重载了process_request函数覆盖其基类Middleware的process_request函数并调用其__call__函数,如下,这个函数的功能应该是重新定义request的路径,使其规范化。
  
  1. @webob.dec.wsgify(RequestClass=Request)
  2.     def __call__(self, request):
  3.         try:
  4.             response = self.process_request(request)
  5.             if response:
  6.                 return response
  7.             response = request.get_response(self.application)
  8.             return self.process_response(request, response)
  9.         except exception.Error as e:
  10.             LOG.warning(e)
  11.             return render_exception(e,
  12.                                     user_locale=request.best_match_language())
  13.         except TypeError as e:
  14.             LOG.exception(e)
  15.             return render_exception(exception.ValidationError(e),
  16.                                     user_locale=request.best_match_language())
  17.         except Exception as e:
  18.             LOG.exception(e)
  19.             return render_exception(exception.UnexpectedError(exception=e),
  20.                                     user_locale=request.best_match_language())
复制代码


并继续向下调用,token_auth,一样的,处理request的public auth数据
  1. class TokenAuthMiddleware(wsgi.Middleware):
  2.     def process_request(self, request):
  3.         token = request.headers.get(AUTH_TOKEN_HEADER)
  4.         context = request.environ.get(CONTEXT_ENV, {})
  5.         context['token_id'] = token
  6.         if SUBJECT_TOKEN_HEADER in request.headers:
  7.             context['subject_token_id'] = (
  8.                 request.headers.get(SUBJECT_TOKEN_HEADER))
  9.         request.environ[CONTEXT_ENV] = context
复制代码


再往下是admin_token_auth,处理admin auth数据
  1. class AdminTokenAuthMiddleware(wsgi.Middleware):
  2.     """A trivial filter that checks for a pre-defined admin token.
  3.     Sets 'is_admin' to true in the context, expected to be checked by
  4.     methods that are admin-only.
  5.     """
  6.     def process_request(self, request):
  7.         token = request.headers.get(AUTH_TOKEN_HEADER)
  8.         context = request.environ.get(CONTEXT_ENV, {})
  9.         context['is_admin'] = (token == CONF.admin_token)
  10.         request.environ[CONTEXT_ENV] = context
复制代码


然后xml_body,xml to json
  1. class XmlBodyMiddleware(wsgi.Middleware):
  2.     """De/serializes XML to/from JSON."""
  3.     def process_request(self, request):
  4.         """Transform the request from XML to JSON."""
  5.         incoming_xml = 'application/xml' in str(request.content_type)
  6.         if incoming_xml and request.body:
  7.             request.content_type = 'application/json'
  8.             try:
  9.                 request.body = jsonutils.dumps(
  10.                     serializer.from_xml(request.body))
  11.             except Exception:
  12.                 LOG.exception('Serializer failed')
  13.                 e = exception.ValidationError(attribute='valid XML',
  14.                                               target='request body')
  15.                 return wsgi.render_exception(e)
复制代码


然后 json_body,对post的json格式数据进行解析
  1. class JsonBodyMiddleware(wsgi.Middleware):
  2.     """Middleware to allow method arguments to be passed as serialized JSON.
  3.     Accepting arguments as JSON is useful for accepting data that may be more
  4.     complex than simple primitives.
  5.     In this case we accept it as urlencoded data under the key 'json' as in
  6.     json= but this could be extended to accept raw JSON
  7.     in the POST body.
  8.     Filters out the parameters `self`, `context` and anything beginning with
  9.     an underscore.
  10.     """
  11.     def process_request(self, request):
  12.         # Abort early if we don't have any work to do
  13.         params_json = request.body
  14.         if not params_json:
  15.             return
  16.         # Reject unrecognized content types. Empty string indicates
  17.         # the client did not explicitly set the header
  18.         if request.content_type not in ('application/json', ''):
  19.             e = exception.ValidationError(attribute='application/json',
  20.                                           target='Content-Type header')
  21.             return wsgi.render_exception(e)
  22.         params_parsed = {}
  23.         try:
  24.             params_parsed = jsonutils.loads(params_json)
  25.         except ValueError:
  26.             e = exception.ValidationError(attribute='valid JSON',
  27.                                           target='request body')
  28.             return wsgi.render_exception(e)
  29.         finally:
  30.             if not params_parsed:
  31.                 params_parsed = {}
  32.         params = {}
  33.         for k, v in params_parsed.iteritems():
  34.             if k in ('self', 'context'):
  35.                 continue
  36.             if k.startswith('_'):
  37.                 continue
  38.             params[k] = v
  39.         request.environ[PARAMS_ENV] = params
复制代码


然后ec2_extension,s3_extension,这两个过滤filter实现的是Routes的机制,完成路径的匹配,如果在路径中没有找到,再调用self.application,如果找到,直接返回结果,代码如下;
  1. class Ec2Extension(wsgi.ExtensionRouter):
  2.     def add_routes(self, mapper):
  3.         ec2_controller = controllers.Ec2Controller()
  4.         # validation
  5.         mapper.connect(
  6.             '/ec2tokens',
  7.             controller=ec2_controller,
  8.             action='authenticate',
  9.             conditions=dict(method=['POST']))
  10.         # crud
  11.         mapper.connect(
  12.             '/users/{user_id}/credentials/OS-EC2',
  13.             controller=ec2_controller,
  14.             action='create_credential',
  15.             conditions=dict(method=['POST']))
  16.         mapper.connect(
  17.             '/users/{user_id}/credentials/OS-EC2',
  18.             controller=ec2_controller,
  19.             action='get_credentials',
  20.             conditions=dict(method=['GET']))
  21.         mapper.connect(
  22.             '/users/{user_id}/credentials/OS-EC2/{credential_id}',
  23.             controller=ec2_controller,
  24.             action='get_credential',
  25.             conditions=dict(method=['GET']))
  26.         mapper.connect(
  27.             '/users/{user_id}/credentials/OS-EC2/{credential_id}',
  28.             controller=ec2_controller,
  29.             action='delete_credential',
  30.             conditions=dict(method=['DELETE']))
复制代码


如果在这些路径中没有找到,则在其基类ExtensionRouter中继续向下传递,如下所示:mapper.connect(xxxxxxxxxx,contoller=self.application),也实现了factory和上面一样调用自身的__call__,在其基类Router中返回self._router.
  1. class ExtensionRouter(Router):
  2.     """A router that allows extensions to supplement or overwrite routes.
  3.     Expects to be subclassed.
  4.     """
  5.     def __init__(self, application, mapper=None):
  6.         if mapper is None:
  7.             mapper = routes.Mapper()
  8.         self.application = application
  9.         self.add_routes(mapper)
  10.         mapper.connect('{path_info:.*}', controller=self.application)
  11.         super(ExtensionRouter, self).__init__(mapper)
  12.     def add_routes(self, mapper):
  13.         pass
  14.     @classmethod
  15.     def factory(cls, global_config, **local_config):
  16.         """Used for paste app factories in paste.deploy config files.
  17.         Any local configuration (that is, values under the [filter:APPNAME]
  18.         section of the paste config) will be passed into the `__init__` method
  19.         as kwargs.
  20.         A hypothetical configuration would look like:
  21.             [filter:analytics]
  22.             redis_host = 127.0.0.1
  23.             paste.filter_factory = keystone.analytics:Analytics.factory
  24.         which would result in a call to the `Analytics` class as
  25.             import keystone.analytics
  26.             keystone.analytics.Analytics(app, redis_host='127.0.0.1')
  27.         You could of course re-implement the `factory` method in subclasses,
  28.         but using the kwarg passing it shouldn't be necessary.
  29.         """
  30.         def _factory(app):
  31.             conf = global_config.copy()
  32.             conf.update(local_config)
  33.             return cls(app, **local_config)
  34.         return _factory
复制代码


ec2_extension除了实现上述功能,在其core函数中实现了如下代码:
  1. EXTENSION_DATA = {
  2.     'name': 'OpenStack EC2 API',
  3.     'namespace': 'http://docs.openstack.org/identity/api/ext/'
  4.                  'OS-EC2/v1.0',
  5.     'alias': 'OS-EC2',
  6.     'updated': '2013-07-07T12:00:0-00:00',
  7.     'description': 'OpenStack EC2 Credentials backend.',
  8.     'links': [
  9.         {
  10.             'rel': 'describedby',
  11.             # TODO(ayoung): needs a description
  12.             'type': 'text/html',
  13.             'href': 'https://github.com/openstack/identity-api',
  14.         }
  15.     ]}
  16. extension.register_admin_extension(EXTENSION_DATA['alias'], EXTENSION_DATA)
  17. extension.register_public_extension(EXTENSION_DATA['alias'], EXTENSION_DATA)
复制代码


注册这个是有用的,在pipeline的最后一步会遇到。这里先放在着。S3_extension和ec2也是类似的实现了一个S3Extension,S3Controller和一个extension.registerxxxx(),user_crud也是类似。
最后一步是public_app_factroy,OK,找到他,在service.py下面
  1. @fail_gracefully
  2. def public_app_factory(global_conf, **local_conf):
  3.     controllers.register_version('v2.0')
  4.     conf = global_conf.copy()
  5.     conf.update(local_conf)
  6.     return wsgi.ComposingRouter(routes.Mapper(),
  7.                                 [identity.routers.Public(),
  8.                                  token.routers.Router(),
  9.                                  routers.VersionV2('public'),
  10.                                  routers.Extension(False)])
复制代码


配置完成后,返回了ComposingRouter对象,这个类下面做了什么呢,如下
  1. class ComposingRouter(Router):
  2.     def __init__(self, mapper=None, routers=None):
  3.         if mapper is None:
  4.             mapper = routes.Mapper()
  5.         if routers is None:
  6.             routers = []
  7.         for router in routers:
  8.             router.add_routes(mapper)
  9.         super(ComposingRouter, self).__init__(mapper)
复制代码
其实也没做什么,主要就是这一句,routers.add_routers(mapper)
,就是调用indentity.routers.Public(), token.routers.Router(),routers.VersionV2('public'),routers.Extension(False)类下的add_routes函数,并把mapper作为参数传进去了。继续看,这里注意routers.Extension这个类,跟进去
  1. class Extension(wsgi.ComposableRouter):
  2.     def __init__(self, is_admin=True):
  3.         if is_admin:
  4.             self.controller = controllers.AdminExtensions()
  5.         else:
  6.             self.controller = controllers.PublicExtensions()
  7.     def add_routes(self, mapper):
  8.         extensions_controller = self.controller
  9.         mapper.connect('/extensions',
  10.                        controller=extensions_controller,
  11.                        action='get_extensions_info',
  12.                        conditions=dict(method=['GET']))
  13.         mapper.connect('/extensions/{extension_alias}',
  14.                        controller=extensions_controller,
  15.                        action='get_extension_info',
  16.                        conditions=dict(method=['GET']))
复制代码


add_routes实现了Routes,这不奇怪,其他的三个类也是实现了上述功能,关键是他的controller,跟到PublicExtension下面
  1. class PublicExtensions(Extensions):
  2.     @property
  3.     def extensions(self):
  4.         return extension.PUBLIC_EXTENSIONS
复制代码


返回extension.PUBLIC_EXTENSION,还记得上面的
  1. extension.register_public_extension(EXTENSION_DATA['alias'], EXTENSION_DATA)
复制代码

  1. def register_public_extension(url_prefix, extension_data):
  2.     """Same as register_admin_extension but for public extensions."""
  3.     PUBLIC_EXTENSIONS[url_prefix] = extension_data
复制代码


就是给PUBLI_EXTENSION赋值,因此,这里指向了前面的EC2Controller,S3Controller,CRUDController。OK,这样就可以根据注册的服务,来完成映射的分发,调用public_api的流程基本完成。
问题“
1.Controller是怎么调用Manager的,Manager又是怎么调用Dirver的?这是根据不同Controller下的dependency装饰器来实现的。
2.Keystone下notification机制
bluefire1991
2013.11.2
19:47:54


相关文章
Openstack源代码分析之keystone服务(keystone-all)
http://www.aboutyun.com/thread-10139-1-1.html

Openstack之keystone源代码分析2--Controller->Manager->Driver
http://www.aboutyun.com/thread-10138-1-1.html


[openstack][G版]keystone源码学习
http://www.aboutyun.com/thread-10136-1-1.html






没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条