分享

彻底明白Flink系统学习8:【Flink1.7编程基础】DataStream Transformations介绍

pig2 2018-12-5 17:41:31 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 7 13319
本帖最后由 pig2 于 2018-12-6 11:29 编辑

问题导读
1.Flink中Map函数和FlatMap函数有什么区别?
2.如何生成KeyedStream?
3.KeyedStream如何转换为DataStream ?
4.min和minBy之间的差异是什么?

关注最新经典文章,欢迎关注公众号

上一篇
彻底明白Flink系统学习7:【最新基于Flink1.7】使用DataStream API进行数据处理
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26434



数据转换将数据流从一种形式转换为另一种形式,也就是说输入可以是一个或多个数据流,输出也可以是零,或一个或多个数据流。Flink1.7对transform另起一个新的名字“Operators ”--Operators transform 。程序可以将多个transform组合成复杂的数据流拓扑。

DataStream Transformations介绍
1.Map
转换:DataStream → DataStream
输入一个元素并生成一个元素。 一个map函数,它将输入流的值加倍:
[mw_shl_code=java,true]dataStream.map { x => x * 2 }[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.map { x => x * 2 }
[/mw_shl_code]
2.FlatMap
转换:DataStream → DataStream
输入一个元素并生成零个,一个或多个元素。 将句子分割为单词的flatmap函数:
[mw_shl_code=java,true]dataStream.flatMap { str => str.split(" ") }[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.flatMap { str => str.split(" ") }
[/mw_shl_code]
补充:从上面我们看出map和flatmap的区别,map的输入和输出个数是1对1的,flatMap则不一定。

3.Filter
转换:DataStream → DataStream
计算每个元素的布尔函数,并保留函数返回true的元素。 过滤掉零值的过滤器,通俗来讲就是过滤掉等于0的元素,转换成新的数据流
[mw_shl_code=java,true]dataStream.filter { _ != 0 }
[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.filter { _ != 0 }
[/mw_shl_code]

4.KeyBy
转换:DataStream → KeyedStream        
逻辑分区流分为不同的分区。 具有相同key的所有记录都分配给同一分区。 在内部,keyBy()是使用hash分区实现的。 指定key有不同的方法。此Transformations返回KeyedStream,
[mw_shl_code=java,true]dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple[/mw_shl_code]
注意
在以下情况下,类型不能成为key :

1.它是POJO类型但没有override hashCode()方法并依赖于Object.hashCode()实现。
2.它是任何类型的数组。

POJO类型是Flink一个符合数据类型,更多查看下面内容
##############################
补充
POJO类型是什么?
Flink对类型做出如下区分:
1.    基本类型:所有Java基本数据类型和对应装箱类型,加上void, String, Date
2.    基本数组和Object数组
3.    复合类型:
a.     Flink Java Tuple(Flink Java API的一部分)
b.    Scala case 类(包括Scala Tuple)
c.     POJO类:遵循类bean模式的类
4.    Scala辅助类型(Option,Either,Lists,Maps…)
5.    泛型(Generic):这些类型将不会由Flink自己序列化,而是借助Kryo来序列化

POJO类支持复杂类型的创建,并且在定义keys时可以使用成员的名字:dataSet.join(another).where("name").equalTo("personName")。同时,POJO类对于运行时是透明的,这使得Flink可以十分高效地处理它们。

POJO类型的规则
当以下条件满足时,Flink将以POJO类型识别一个数据类型,并允许以成员名引用:
1.    该类是public并且独立的(即没有非静态的内部类)
2.    该类拥有一个public的无参数构造函数
3.    该类(以及该类的超类)的成员要么是public的,要么拥有public的符合Java bean对Getter和Setter命名规则的Getter和Setter函数。
##############################

5.Reduce
转换:KeyedStream → DataStream
Reduce统计当前值与上一个值,返回DataStream。下面是实现求和,返回DataStream
[mw_shl_code=java,true]keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});[/mw_shl_code]
[mw_shl_code=scala,true]keyedStream.reduce { _ + _ }
[/mw_shl_code]

6.Fold
转换:KeyedStream → DataStream
Fold通过将上一个folder流与当前记录组合来生成KeyedDataStream。 它会返回数据流。
Fold函数,当应用于sequence (1,2,3,4,5)时,返回“start-1”,“start-1-2”,“start-1-2-3”,...
[mw_shl_code=java,true]DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });[/mw_shl_code]
[mw_shl_code=scala,true]val result: DataStream[String] =
    keyedStream.fold("start")((str, i) => { str + "-" + i })[/mw_shl_code]

7.聚合
DataStream API支持各种聚合,min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy相同)。
[mw_shl_code=java,true]keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");[/mw_shl_code]

[mw_shl_code=scala,true]keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")[/mw_shl_code]








本帖被以下淘专辑推荐:

已有(7)人评论

跳转到指定楼层
huangrong 发表于 2019-5-10 08:21:35
感觉有spark的基础看这个要快些
回复

使用道具 举报

若无梦何远方 发表于 2019-8-26 16:11:52
1. Flink中Map函数和FlatMap函数的区别:Map函数<=> 传入一个DataSet/DataStream 返回一个DataSet/DataStream (传入几个参数,返回几个参数)
2. 如何生成KeyStream <=> 通过KeyBy生成(Stream)
3. KeyedStream转换成DataStream的转换<=> 通过reduce,flod 进行转换
4. min与minBy之间的差异是<=> Min返回的是值,MinBy返回的是元素
回复

使用道具 举报

若无梦何远方 发表于 2019-8-26 16:14:58
若无梦何远方 发表于 2019-8-26 16:11
1. Flink中Map函数和FlatMap函数的区别:Map函数 传入一个DataSet/DataStream 返回一个DataSet/DataStream ( ...

1. Flink中Map函数和FlatMap函数的区别: Map函数<=> 传入一个DataSet/DataStream 返回一个DataSet/DataStream (传入几个参数,返回几个参数)
2. 如何生成KeyStream: <=> 通过KeyBy生成(Stream)
3. KeyedStream转换成DataStream的转换<=> 通过reduce,flod 进行转换
4. min与minBy之间的差异是<=> Min返回的是值,MinBy返回的是元素

如有错误请纠正 谢谢
回复

使用道具 举报

金瞳 发表于 2019-12-9 19:30:30
1.Flink中Map函数和FlatMap函数有什么区别?
- map:输入一个元素并生成一个元素。
- flatMap:输入一个元素并生成零个,一个或多个元素。

2.如何生成KeyedStream?
- keyBy算子

3.KeyedStream如何转换为DataStream ?
- reduce算子
- fold算子

4.min和minBy之间的差异是什么?
- minBy:指定字段最小值
回复

使用道具 举报

zcy2nn 发表于 2020-1-7 17:55:06
这个java 的代码是不是不对呀
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条