分享

Ceilometer之notification agent代码分析

eason 发表于 2016-3-9 16:45:49 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 10845
说完了polling agent,咱们接着说notification agent,打开setup.cfg文件,找到入口 ceilometer-agent-notification = ceilometer.cmd.agent_notification:main,打开文件,发现启动了NotificationService服务,然后看start方法,核心代码如下
[mw_shl_code=applescript,true]self.listeners, self.pipeline_listeners = [], []
        self._configure_main_queue_listeners(transporter, event_transporter)[/mw_shl_code]

意思是配置主消息队列监听器(主消息队列是指exchange为nova,cinder,neutron等的消息队列),来到该方法下,
[mw_shl_code=applescript,true]notification_manager = self._get_notifications_manager(transporter)

for ext in notification_manager:
            handler = ext.obj

            for new_tar in handler.get_targets(cfg.CONF):
                if new_tar not in targets:
                    targets.append(new_tar)
                endpoints.append(handler)
        urls = cfg.CONF.notification.messaging_urls or [None]
        for url in urls:
            transport = messaging.get_transport(url)
            listener = messaging.get_notification_listener(
                transport, targets, endpoints)
            listener.start()
            self.listeners.append(listener)
[/mw_shl_code]

首先调用命名空间是ceilometer.notification的plugins(还在setup.cfg中),然后获取相关的endpoins和targets,最后启动监听器,最重要的是启动了监听器

我们不断找监听器的代码,最后发现是在oslo_messaging中的server.py文件中,看他的start方法
[mw_shl_code=applescript,true]if self._executor is not None:
            return
        try:
            listener = self.dispatcher._listen(self.transport)
        except driver_base.TransportDriverError as ex:
            raise ServerListenError(self.target, ex)

        self._executor = self._executor_cls(self.conf, listener,
                                            self.dispatcher)
        self._executor.start()[/mw_shl_code]

这里还是用到了plugin,命名空间是oslo.messaging.executors,调用执行器执行代码,我们debug获得plugin是哪个,结果是eventlet,找到他的代码,
文件在/usr/lib/python2.7/site-packages/oslo_messaging/_executors/impl_eventlet.py中,看他的start方法,核心如下
[mw_shl_code=applescript,true]@excutils.forever_retry_uncaught_exceptions
        def _executor_thread():
            try:
                while self._running:
                    incoming = self.listener.poll()
                    if incoming is not None:
                        self._dispatch(incoming)
            except greenlet.GreenletExit:
                return[/mw_shl_code]
意思就是不断调用listener的poll方法获得incoming,(意思就是不断获取消息队列上的消息),如果有,就执行self._dispatch(incoming)


我们找到listener,
[mw_shl_code=applescript,true]listener = AMQPListener(self, conn)
        for target, priority in targets_and_priorities:
            conn.declare_topic_consumer(
                exchange_name=self._get_exchange(target),
                topic='%s.%s' % (target.topic, priority),
                callback=listener, queue_name=pool)
        return listener[/mw_shl_code]
可以看出其实就是设置了许多消费者,监听消息队列上的信息,exchange_name就是nova,cinder之类的,topic是notification,priority类似与info之类


设置这样的消费者就监听了主消息队列上的消息,如果获得了消息(比如某事件触发来了compute.instance.update),然后执行self._dispatch(incoming),我们找到该方法,
[mw_shl_code=applescript,true]for screen, callback in self._callbacks_by_priority.get(priority, []):
            if screen and not screen.match(ctxt, publisher_id, event_type,
                                           metadata, payload):
                continue
            localcontext.set_local_context(ctxt)
            try:
                if executor_callback:
                    ret = executor_callback(callback, ctxt, publisher_id,
                                            event_type, payload, metadata)
                else:
                    ret = callback(ctxt, publisher_id, event_type, payload,
                                   metadata)
                ret = NotificationResult.HANDLED if ret is None else ret
                if self.allow_requeue and ret == NotificationResult.REQUEUE:
                    return ret
            finally:
                localcontext.clear_local_context()
        return NotificationResult.HANDLED[/mw_shl_code]

_callbacks_by_priority是filter_rule和endpoint的方法的dict,首先判断是否匹配(事件类型之类的),找到某个endpoint的方法,执行它即可。该回调方法的作用一般是publish消息。


以compute的notification中的instance.py的Memory为例子,回调方法执行其父类NotificationBase的info方法,
[mw_shl_code=applescript,true]notification = messaging.convert_to_old_notification_format(
            'info', ctxt, publisher_id, event_type, payload, metadata)
        self.to_samples_and_publish(context.get_admin_context(), notification)[/mw_shl_code]
将消息发布出去即可,但是在发布过程中,需要调用Memory的get_sample方法返回处理过的消息。重要的是,发布消息的途径还是在pipeline中定义的,如果消息匹配到了相应的meter,就会按照pipeline中相应的sink发布消息。



已有(1)人评论

跳转到指定楼层
eason 发表于 2016-3-9 16:47:15
给大家推荐一个博客
http://catkang.github.io/2015/11 ... fication.html#ljdvz
本文参考了他的内容
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条