分享

彻底明白Flink系统学习9:【Flink1.7编程】数据流Transformations介绍2窗口及相关操作

pig2 2018-12-10 18:59:14 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 8737
本帖最后由 pig2 于 2018-12-10 19:09 编辑

问题导读

1.为何产生window窗口计算?
2.你认为什么情况下使用Window Apply?
3.Window Fold可以用来做什么?
4.window 流是否可以union和join?
5.DataStream是否可以split?


关注最新经典文章,欢迎关注公众号

上一篇:
彻底明白Flink系统学习8:【Flink1.7编程基础】DataStream Transformations介绍
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26445


这篇文章,主要讲windows,那么我们思考为什么会产生windows?
我们前面流式处理,一条条消息处理不行吗?可以的。不过有些场景使用窗口更加适合,比如我们想看10分钟内下单量是多少。那么这时候我们就可以使用窗口计算了。窗口计算是对流式的一个封装,在某个时间内,对这个时间段内的数据一起处理。

理解了什么是windows,我们接着继续:
1.Window
KeyedStream → WindowedStream        
可以在已经分区的KeyedStream上定义Windows。 Windows根据某些特征(例如,在最近5秒内到达的数据)对每个key中的数据进行分组。 有关窗口的说明,可参考窗口

[mw_shl_code=java,true]dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
[/mw_shl_code]

2.WindowAll
DataStream → AllWindowedStream
Windows可以在常规DataStream上定义。 Windows根据某些特征(例如,在最近5秒内到达的数据)对所有流事件进行分组。 有关窗口的完整说明,可参考windows
也就是说: 针对全局的不基于某个key进行分组的window的窗口函数的实现
注意:在许多情况下,这是不是并行transformation。 所有记录将收集在windowAll operator 的一个任务中。
[mw_shl_code=java,true]dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
[/mw_shl_code]

3.Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
将通用功能应用于window,下面是window元素手工求和
如果是windowAll transformation,你需要替换为 AllWindowFunction
[mw_shl_code=java,true]windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {

    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }

});[/mw_shl_code]

[mw_shl_code=scala,true]windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }[/mw_shl_code]

4.Window Reduce
WindowedStream → DataStream
将函数reduce功能应用于窗口并返回reduce的值。
[mw_shl_code=java,true]windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {

    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }

});[/mw_shl_code]

[mw_shl_code=scala,true]windowedStream.reduce { _ + _ }
[/mw_shl_code]

5.Window Fold
WindowedStream → DataStream
将功能Fold功能应用于窗口并返回folded 值。 示例函数应用于序列(1,2,3,4,5)时,将序列folded 为字符串“start-1-2-3-4-5”:
[mw_shl_code=java,true]windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});[/mw_shl_code]
[mw_shl_code=scala,true]val result: DataStream[String] =
    windowedStream.fold("start", (str, i) => { str + "-" + i })[/mw_shl_code]

6.windows聚合
WindowedStream → DataStream
聚合窗口的内容。 min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy相同)。
[mw_shl_code=java,true]windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");[/mw_shl_code]
[mw_shl_code=scala,true]windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")[/mw_shl_code]

上面窗口计算完毕,接着我们介绍新的内容,流和窗口等的结合

7.Union
DataStream* → DataStream

两个或多个数据流Union操作,来创建包含来自所有流的所有元素的新流。 注意:如果将数据流与自身union,则会在结果流中每个元素获取两次。[mw_shl_code=java,true]dataStream.union(otherStream1, otherStream2, ...);
[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.union(otherStream1, otherStream2, ...)
[/mw_shl_code]

8.Window Join
DataStream,DataStream → DataStream        

给定的key和通用窗口Join两个数据流
[mw_shl_code=java,true]dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... }[/mw_shl_code]

9.Interval Join
KeyedStream,KeyedStream → DataStream
在给定的时间间隔内使用公共keye ,Join 两个keye流的两个元素e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
[mw_shl_code=java,true]// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});[/mw_shl_code]

10.Window CoGroup
DataStream,DataStream → DataStream

在给定key和通用窗口上对两个数据流进行Cogroup。
[mw_shl_code=java,true]dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply {}[/mw_shl_code]
这里面CoGroup与join他们之间是有关联的,CoGroup可以实现datastream join。

11.Connect
DataStream,DataStream → ConnectedStreams

“Connect”两个保留类型的数据流。 Connect允许两个流之间的共享状态。
[mw_shl_code=java,true]DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);[/mw_shl_code]
[mw_shl_code=scala,true]someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)[/mw_shl_code]

12.CoMap, CoFlatMap
ConnectedStreams → DataStream
类似于连接数据流上的map和flatMap[mw_shl_code=java,true]connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});[/mw_shl_code]

[mw_shl_code=scala,true]connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)[/mw_shl_code]

13.Split
DataStream → SplitStream
根据某些标准将流拆分为两个或更多个流。
[mw_shl_code=java,true]SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});[/mw_shl_code]

[mw_shl_code=scala,true]val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)[/mw_shl_code]

14.Select
SplitStream → DataStream
从拆分流中select一个或多个流。
[mw_shl_code=java,true]SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");[/mw_shl_code]
[mw_shl_code=scala,true]val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")[/mw_shl_code]

15.Iterate
DataStream → IterativeStream → DataStream
通过将一个operator的输出重定向到某个先前的operator,在流中创建“feedback”循环。 这对于定义不断更新模型的算法特别有用。 以下代码以流开头并连续应用迭代体。 大于0的元素将被发送回feedback通道,其余元素将向下游转发。 有关完整说明,请参阅迭代。
[mw_shl_code=java,true]IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
        return value <= 0;
    }
});[/mw_shl_code]
[mw_shl_code=scala,true]initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}[/mw_shl_code]

16.Extract Timestamps
DataStream → DataStream
从记录中提取时间戳,以便使用 event time 语义的窗口。
[mw_shl_code=java,true]stream.assignTimestamps (new TimeStampExtractor() {...});[/mw_shl_code]
[mw_shl_code=scala,true]stream.assignTimestamps { timestampExtractor }
[/mw_shl_code]

已有(3)人评论

跳转到指定楼层
huangrong 发表于 2019-5-10 08:29:55
看完一节,出门上班
回复

使用道具 举报

YTP520YTP 发表于 2019-12-19 15:57:11
666666666666666666666666
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条