分享

请问有人用过Oozie的JMS通知功能吗?

hapjin 发表于 2016-5-17 22:10:58 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 3 11445
关于Oozie的JMS Notification.....
官网上说:安装好ActiveMQ后,自己作为订阅者接收提交给Oozie的作业的结果。
除了官网外,有没有这方面的详细文档?

已有(3)人评论

跳转到指定楼层
einhep 发表于 2016-5-17 22:30:37


一,介绍

本文使用Oozie的消息通知功能,并根据JMS规范中的消息选择器(Selector)实现 根据作业的ID来过滤消息。

首先搭建好JMS Provider(ActiveMQ) ,并进行相关配置,这样Oozie Server就可以把消息发送给JMS Provider了,我们使用了ActiveMQ作为消息服务器。相关配置可参考:Oozie 使用ActiveMQ实现 JMS通知

在ActiveMQ中配置好Topic,该Topic名称为username。即该username。即该{username}提交的所有的作业,作业的执行结果都会发送到名为${username}的Topic上。

对于Oozie而言,每提交一个作业会生成一个JobID,而我们的需求是,只对某些JobID感兴趣,并不是对该用户提交的所有作业感兴趣。因此,需要根据JobID来过滤订阅到${username}这个Topic上的消息。



二,实现JMS 消息选择器

根据JMS规范,在创建一个消费者时,可使用消息选择器。这样消费者就只能接收到被消息选择器过滤以后的消息了。而消息选择器使用 消息属性 和 消息头作为条件表达式,这些条件表达式使用boolean逻辑来声明应该将哪一条消息传递给JMS消费者。请注意:消息选择器无法参考消息体内的数据,它只能使用消息头和消息属性。

于是,我们就需要知道Oozie生成的JMS消息的消息头和消息属性是什么?参考官方文档可以看出,Oozie生成的JMS消息的头部由JMSHeaderConstants类来定义。其源代码如下:


[mw_shl_code=bash,true]/**
*
* Class holding constants used in JMS selectors
*/
public final class JMSHeaderConstants {
    // JMS Application specific properties for selectors
    public static final String EVENT_STATUS = "eventStatus";
    public static final String SLA_STATUS = "slaStatus";
    public static final String APP_NAME = "appName";
    public static final String USER = "user";
    public static final String MESSAGE_TYPE = "msgType";
    public static final String APP_TYPE = "appType";
   
    //public static final String JOBID = "jobId";// add for my specific selectors
    // JMS Header property
    public static final String MESSAGE_FORMAT = "msgFormat";

}[/mw_shl_code]

从中可以看出,可以使用EVENT_STATUS、SLA_STATUS、USER……相关属性构造选择器。但是官方JMSHeaderConstants类的源代码中并没有JobID这个属性。

因此,需要修改源代码,添加JobID,以使得我们消费者能够根据JobID进行过滤。(如上,第14行就是我自己添加的代码)

此外,还需要在JobMessage类的构造方法里面添加一行:

[mw_shl_code=bash,true]        jmsMessageProperties.put(JMSHeaderConstants.JOBID, id);//add for my specific selectors
[/mw_shl_code]


至此,Oozie端的代码修改完毕。
这两个类在oozie-client这个maven工程中
下1.png
进入到该工程的文件夹下,使用
mvn clean compile
mvn clean package
生成相关的jar文件
2.png
将生成的oozie-client-4.1.0.jar文件替换掉原来的Oozie 安装目录下的lib包下的对应的jar包。重启Oozie即可。
另外,Apache Oozie-4.1.0是不支持Spark作业的。而Cloudera-Oozie-4.1.0则是支持Spark作业的。
Apache oozie-client.jar 与Cloudera的 oozie-client.jar对比如下:
3.png



[mw_shl_code=bash,true]5月 6, 中午12点09:47.473     FATAL     org.apache.oozie.service.Services     

SERVER[datanode1] Runtime Exception during Services Load. Check your list of 'oozie.services' or 'oozie.services.ext'

5月 6, 中午12点09:47.480     FATAL     org.apache.oozie.service.Services     

SERVER[datanode1] E0103: Could not load service classes, resource [spark-action-0.1.xsd] not found
org.apache.oozie.service.ServiceException: E0103: Could not load service classes, resource [spark-action-0.1.xsd] not found
    at org.apache.oozie.service.Services.loadServices(Services.java:309)
    at org.apache.oozie.service.Services.init(Services.java:213)
    at org.apache.oozie.servlet.ServicesLoader.contextInitialized(ServicesLoader.java:46)
    at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4210)
    at org.apache.catalina.core.StandardContext.start(StandardContext.java:4709)
    at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:802)
    at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:779)
    at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:583)
    at org.apache.catalina.startup.HostConfig.deployWAR(HostConfig.java:944)
    at org.apache.catalina.startup.HostConfig.deployWARs(HostConfig.java:779)
    at org.apache.catalina.startup.HostConfig.deployApps(HostConfig.java:505)
    at org.apache.catalina.startup.HostConfig.start(HostConfig.java:1322)
    at org.apache.catalina.startup.HostConfig.lifecycleEvent(HostConfig.java:325)
    at org.apache.catalina.util.LifecycleSupport.fireLifecycleEvent(LifecycleSupport.java:142)
    at org.apache.catalina.core.ContainerBase.start(ContainerBase.java:1068)
    at org.apache.catalina.core.StandardHost.start(StandardHost.java:822)
    at org.apache.catalina.core.ContainerBase.start(ContainerBase.java:1060)
    at org.apache.catalina.core.StandardEngine.start(StandardEngine.java:463)
    at org.apache.catalina.core.StandardService.start(StandardService.java:525)
    at org.apache.catalina.core.StandardServer.start(StandardServer.java:759)
    at org.apache.catalina.startup.Catalina.start(Catalina.java:595)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.catalina.startup.Bootstrap.start(Bootstrap.java:289)
    at org.apache.catalina.startup.Bootstrap.main(Bootstrap.java:414)
Caused by: java.lang.IllegalArgumentException: resource [spark-action-0.1.xsd] not found[/mw_shl_code]

这样,Oozie发送给JMS消息服务器的消息,在头部中都会带一个JobID了,而我们就根据这个JobID属性进行消息过滤。

可参考Oozie官方文档中给出的一句话:


JMS messages published are javax.jms.TextMessage . The body contains JSON and the header contains multiple properties that can be used as
selectors. The header properties are not repeated in the body of the  message to keep the messages small.

由于JobID已经在body里面了,故Oozie并没有把它放到Header中去。



三,使用消息选择器来过滤消息

由于现在Oozie发送给ActiveMQ的每条JMS消息都会在头部带一个JobID,故现在可使用JobID作为消息选择器过滤消息了。


String selector=JMSHeaderConstants.JOBID + "='" + jobid + "'";
MessageConsumer consumer = session.createConsumer(topic, selector);

String selector=JMSHeaderConstants.JOBID + "='" + jobid + "'";
MessageConsumer consumer = session.createConsumer(topic, selector);




来自:hapjin

回复

使用道具 举报

穿裤衩的小破孩 发表于 2017-6-21 20:56:08
请问你图中这个问题是怎么解决的?
SERVER[datanode1] Runtime Exception during Services Load. Check your list of 'oozie.services' or 'oozie.services.ext'

5月 6, 中午12点09:47.480     FATAL     org.apache.oozie.service.Services     

SERVER[datanode1] E0103: Could not load service classes, resource [spark-action-0.1.xsd] not found
org.apache.oozie.service.ServiceException: E0103: Could not load service classes, resource [spark-action-0.1.xsd] not found
回复

使用道具 举报

einhep 发表于 2017-6-21 21:40:39
穿裤衩的小破孩 发表于 2017-6-21 20:56
请问你图中这个问题是怎么解决的?
SERVER[datanode1] Runtime Exception during Services Load. Check yo ...



放大上面图,可以看到左侧文件,右边红色图中则没有。
所以应该需要修改下源码
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条