分享

Flink难点:彻底明白CEP2,条件分类


问题导读

1.对于接受事件该如何实现过滤?
2.CEP都有哪些条件?
3.迭代条件和简单条件有什么区别?
4.如何组合条件?


接上篇
Flink难点CEP1:什么是CEP以及量词的含义
http://www.aboutyun.com/forum.php?mod=viewthread&tid=27142


条件
对于每个模式,可以指定接受事件必须满足的条件,用来“传入”到模式中,例如 其值应大于5,或大于先前接受的事件的平均值。 可以通过pattern.where(),pattern.or()或pattern.until()方法指定事件属性的条件。 条件可以分为IterativeConditions(迭代条件)和SimpleConditions(简单条件)。

迭代条件:这是最常见的条件类型。 这是可以如何指定一个条件,该条件基于先前接受的事件的属性或其子集的统计信息来接受后续事件。

下面是迭代条件的代码,如果名称以“foo”开头,则接受名为“middle”的模式的下一个事件,并且如果该模式的先前接受的事件的价格总和加上当前的价格 事件不超过5.0的值。 迭代条件是非常有用的,尤其是与循环模式组合,例如, oneOrMore().

[mw_shl_code=scala,true]middle.oneOrMore()
    .subtype(classOf[SubEvent])
    .where(
        (value, ctx) => {
            lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
            value.getName.startsWith("foo") && sum + value.getPrice < 5.0
        }
    )[/mw_shl_code]

[mw_shl_code=java,true]middle.oneOrMore()
    .subtype(SubEvent.class)
    .where(new IterativeCondition<SubEvent>() {
        @Override
        public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
            if (!value.getName().startsWith("foo")) {
                return false;
            }
   
            double sum = value.getPrice();
            for (Event event : ctx.getEventsForPattern("middle")) {
                sum += event.getPrice();
            }
            return Double.compare(sum, 5.0) < 0;
        }
    });[/mw_shl_code]

注意对ctx.getEventsForPattern(...)的调用将查找给定潜在匹配项的所有先前接受的事件。 此操作的成本可能会有所不同,因此在实施的条件时,请尽量减少其使用。

所描述的上下文也提供了对事件时间特征的访问。 有关更多信息,请参阅时间上下文(Time context.)

简单条件:这种类型的条件扩展了前面提到的IterativeCondition类,并且仅根据事件本身的属性决定是否接受事件。

[mw_shl_code=scala,true]start.where(event => event.getName.startsWith("foo"))
[/mw_shl_code]
[mw_shl_code=java,true]start.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return value.getName().startsWith("foo");
    }
});[/mw_shl_code]
最后,还可以通过pattern.subtype(subClass)方法将接受事件的类型限制为初始事件类型(此处为Event)的子类型。
[mw_shl_code=scala,true]start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
[/mw_shl_code]
[mw_shl_code=java,true]start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        return ... // some condition
    }
});[/mw_shl_code]


组合条件:如上所示,您可以将子类型条件与其他条件组合使用。 这适用于所有条件。 您可以通过顺序调用where()来任意组合条件。 最终结果将是各个条件的结果的逻辑AND。 要使用OR组合条件,可以使用or()方法,如下所示。
[mw_shl_code=scala,true]pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
[/mw_shl_code]
[mw_shl_code=java,true]pattern.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // some condition
    }
}).or(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // or condition
    }
});[/mw_shl_code]

停止条件:在循环模式(oneOrMore()和oneOrMore()。optional())的情况下,还可以指定停止条件,例如: 接受值大于5的事件,直到值的总和小于50。

为了更好地理解它,请看下面的示例。特定

示例:

1.模式如“(a +until b)”(one or more "a" until "b")

2.一系列传入事件“a1”“c”“a2”“b”“a3”

3.将输出结果:{a1 a2} {a1} {a2} {a3}。

如所见,由于停止条件,未返回{a1 a2 a3}或{a2 a3}。
下一篇
Flink难点:彻底明白CEP3:独立模式【Patterns】算子Pattern Operation
http://www.aboutyun.com/forum.php?mod=viewthread&tid=27300



最新经典文章,欢迎关注公众号

加入About云知识星球,获取更多实用资料
微信图片_20190610161932.png





本帖被以下淘专辑推荐:

已有(2)人评论

跳转到指定楼层
金瞳 发表于 2019-12-9 14:58:21
1.对于接受事件该如何实现过滤?
- 对于接收事件都会满足条件,可以通过pattern.where ,pattern.or,pattern.util方法指定条件进行过滤。

2.CEP都有哪些条件?
- 迭代条件
- 简单条件

3.迭代条件和简单条件有什么区别?
- 迭代:
   后续的条件判断可能需要前面的判断,比如A事件发生了3次,前面发生了2次,然后下一次发生了第三次就属于迭代。
- 简单:
    仅根据事件本身的属性决定是否接受事件。

4.如何组合条件?
- 顺序调用where,相当于AND
- 也可以用OR来拼接
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条