about云开发

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

about云开发 首页 大数据 Flink 连载型 查看内容

彻底明白Flink系统学习9:【Flink1.7编程】数据流Transformations介绍2窗口及相关操作

2018-12-12 20:11| 发布者: admin| 查看: 9| 评论: 0|原作者: pig2

摘要: 问题导读 1.为何产生window窗口计算? 2.你认为什么情况下使用Window Apply? 3.Window Fold可以用来做什么? 4.window 流是否可以union和join? 5.DataStream是否可以split? 关注最新经典文章,欢迎关 ...
问题导读

1.为何产生window窗口计算?
2.你认为什么情况下使用Window Apply?
3.Window Fold可以用来做什么?
4.window 流是否可以union和join?
5.DataStream是否可以split?


关注最新经典文章,欢迎关注公众号

上一篇:
彻底明白Flink系统学习8:【Flink1.7编程基础】DataStream Transformations介绍
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26445


这篇文章,主要讲windows,那么我们思考为什么会产生windows?
我们前面流式处理,一条条消息处理不行吗?可以的。不过有些场景使用窗口更加适合,比如我们想看10分钟内下单量是多少。那么这时候我们就可以使用窗口计算了。窗口计算是对流式的一个封装,在某个时间内,对这个时间段内的数据一起处理。

理解了什么是windows,我们接着继续:
1.Window
KeyedStream → WindowedStream        
可以在已经分区的KeyedStream上定义Windows。 Windows根据某些特征(例如,在最近5秒内到达的数据)对每个key中的数据进行分组。 有关窗口的说明,可参考窗口

[Java] 纯文本查看 复制代码
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

[Scala] 纯文本查看 复制代码
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data


2.WindowAll
DataStream → AllWindowedStream
Windows可以在常规DataStream上定义。 Windows根据某些特征(例如,在最近5秒内到达的数据)对所有流事件进行分组。 有关窗口的完整说明,可参考windows
也就是说: 针对全局的不基于某个key进行分组的window的窗口函数的实现
注意:在许多情况下,这是不是并行transformation。 所有记录将收集在windowAll operator 的一个任务中。
[Java] 纯文本查看 复制代码
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

[Scala] 纯文本查看 复制代码
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data


3.Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
将通用功能应用于window,下面是window元素手工求和
如果是windowAll transformation,你需要替换为 AllWindowFunction
[Java] 纯文本查看 复制代码
windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {

    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }

});


[Scala] 纯文本查看 复制代码
windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }


4.Window Reduce
WindowedStream → DataStream
将函数reduce功能应用于窗口并返回reduce的值。
[Java] 纯文本查看 复制代码
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {

    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }

});


[Scala] 纯文本查看 复制代码
windowedStream.reduce { _ + _ }


5.Window Fold
WindowedStream → DataStream
将功能Fold功能应用于窗口并返回folded 值。 示例函数应用于序列(1,2,3,4,5)时,将序列folded 为字符串“start-1-2-3-4-5”:
[Java] 纯文本查看 复制代码
windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});

[Scala] 纯文本查看 复制代码
val result: DataStream[String] =
    windowedStream.fold("start", (str, i) => { str + "-" + i })


6.windows聚合
WindowedStream → DataStream
聚合窗口的内容。 min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy相同)。
[Java] 纯文本查看 复制代码
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");

[Scala] 纯文本查看 复制代码
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")


上面窗口计算完毕,接着我们介绍新的内容,流和窗口等的结合

7.Union
DataStream* → DataStream

两个或多个数据流Union操作,来创建包含来自所有流的所有元素的新流。 注意:如果将数据流与自身union,则会在结果流中每个元素获取两次。
[Java] 纯文本查看 复制代码
dataStream.union(otherStream1, otherStream2, ...);

[Scala] 纯文本查看 复制代码
dataStream.union(otherStream1, otherStream2, ...)


8.Window Join
DataStream,DataStream → DataStream        

给定的key和通用窗口Join两个数据流
[Java] 纯文本查看 复制代码
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});

[Scala] 纯文本查看 复制代码
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... }


9.Interval Join
KeyedStream,KeyedStream → DataStream
在给定的时间间隔内使用公共keye ,Join 两个keye流的两个元素e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
[Java] 纯文本查看 复制代码
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});


10.Window CoGroup
DataStream,DataStream → DataStream

在给定key和通用窗口上对两个数据流进行Cogroup。
[Java] 纯文本查看 复制代码
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});

[Scala] 纯文本查看 复制代码
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply {}

这里面CoGroup与join他们之间是有关联的,CoGroup可以实现datastream join。

11.Connect
DataStream,DataStream → ConnectedStreams

“Connect”两个保留类型的数据流。 Connect允许两个流之间的共享状态。
[Java] 纯文本查看 复制代码
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

[Scala] 纯文本查看 复制代码
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)


12.CoMap, CoFlatMap
ConnectedStreams → DataStream
类似于连接数据流上的map和flatMap
[Java] 纯文本查看 复制代码
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});


[Scala] 纯文本查看 复制代码
connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)


13.Split
DataStream → SplitStream
根据某些标准将流拆分为两个或更多个流。
[Java] 纯文本查看 复制代码
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});


[Scala] 纯文本查看 复制代码
val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)


14.Select
SplitStream → DataStream
从拆分流中select一个或多个流。
[Java] 纯文本查看 复制代码
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

[Scala] 纯文本查看 复制代码
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")


15.Iterate
DataStream → IterativeStream → DataStream
通过将一个operator的输出重定向到某个先前的operator,在流中创建“feedback”循环。 这对于定义不断更新模型的算法特别有用。 以下代码以流开头并连续应用迭代体。 大于0的元素将被发送回feedback通道,其余元素将向下游转发。 有关完整说明,请参阅迭代。
[Java] 纯文本查看 复制代码
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
        return value <= 0;
    }
});

[Scala] 纯文本查看 复制代码
initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}


16.Extract Timestamps
DataStream → DataStream
从记录中提取时间戳,以便使用 event time 语义的窗口。
[Java] 纯文本查看 复制代码
stream.assignTimestamps (new TimeStampExtractor() {...});

[Scala] 纯文本查看 复制代码
stream.assignTimestamps { timestampExtractor }



鲜花

握手

雷人

路过

鸡蛋
关闭

推荐上一条 /3 下一条

QQ|小黑屋|about云开发-学问论坛|社区 ( 京ICP备12023829号

GMT+8, 2018-12-19 18:58 , Processed in 0.332989 second(s), 29 queries , Gzip On.

Powered by Discuz! X3.2 Licensed

返回顶部