about云开发

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

查看: 834|回复: 1

[连载型] 彻底明白Flink系统学习8:【Flink1.7编程基础】DataStream Transformations介绍

[复制链接]
发表于 2018-12-5 17:41:31 | 显示全部楼层 |阅读模式
本帖最后由 pig2 于 2018-12-6 11:29 编辑

问题导读
1.Flink中Map函数和FlatMap函数有什么区别?
2.如何生成KeyedStream?
3.KeyedStream如何转换为DataStream ?
4.min和minBy之间的差异是什么?

关注最新经典文章,欢迎关注公众号

上一篇
彻底明白Flink系统学习7:【最新基于Flink1.7】使用DataStream API进行数据处理
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26434



数据转换将数据流从一种形式转换为另一种形式,也就是说输入可以是一个或多个数据流,输出也可以是零,或一个或多个数据流。Flink1.7对transform另起一个新的名字“Operators ”--Operators transform 。程序可以将多个transform组合成复杂的数据流拓扑。

DataStream Transformations介绍
1.Map
转换:DataStream → DataStream
输入一个元素并生成一个元素。 一个map函数,它将输入流的值加倍:
[Java] 纯文本查看 复制代码
dataStream.map { x => x * 2 }

[Scala] 纯文本查看 复制代码
dataStream.map { x => x * 2 }

2.FlatMap
转换:DataStream → DataStream
输入一个元素并生成零个,一个或多个元素。 将句子分割为单词的flatmap函数:
[Java] 纯文本查看 复制代码
dataStream.flatMap { str => str.split(" ") }

[Scala] 纯文本查看 复制代码
dataStream.flatMap { str => str.split(" ") }

补充:从上面我们看出map和flatmap的区别,map的输入和输出个数是1对1的,flatMap则不一定。

3.Filter
转换:DataStream → DataStream
计算每个元素的布尔函数,并保留函数返回true的元素。 过滤掉零值的过滤器,通俗来讲就是过滤掉等于0的元素,转换成新的数据流
[Java] 纯文本查看 复制代码
dataStream.filter { _ != 0 }

[Scala] 纯文本查看 复制代码
dataStream.filter { _ != 0 }


4.KeyBy
转换:DataStream → KeyedStream        
逻辑分区流分为不同的分区。 具有相同key的所有记录都分配给同一分区。 在内部,keyBy()是使用hash分区实现的。 指定key有不同的方法。此Transformations返回KeyedStream,
[Java] 纯文本查看 复制代码
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

[Scala] 纯文本查看 复制代码
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

注意
在以下情况下,类型不能成为key :

1.它是POJO类型但没有override hashCode()方法并依赖于Object.hashCode()实现。
2.它是任何类型的数组。

POJO类型是Flink一个符合数据类型,更多查看下面内容
##############################
补充
POJO类型是什么?
Flink对类型做出如下区分:
1.    基本类型:所有Java基本数据类型和对应装箱类型,加上void, String, Date
2.    基本数组和Object数组
3.    复合类型:
a.     Flink Java Tuple(Flink Java API的一部分)
b.    Scala case 类(包括Scala Tuple)
c.     POJO类:遵循类bean模式的类
4.    Scala辅助类型(Option,Either,Lists,Maps…)
5.    泛型(Generic):这些类型将不会由Flink自己序列化,而是借助Kryo来序列化

POJO类支持复杂类型的创建,并且在定义keys时可以使用成员的名字:dataSet.join(another).where("name").equalTo("personName")。同时,POJO类对于运行时是透明的,这使得Flink可以十分高效地处理它们。

POJO类型的规则
当以下条件满足时,Flink将以POJO类型识别一个数据类型,并允许以成员名引用:
1.    该类是public并且独立的(即没有非静态的内部类)
2.    该类拥有一个public的无参数构造函数
3.    该类(以及该类的超类)的成员要么是public的,要么拥有public的符合Java bean对Getter和Setter命名规则的Getter和Setter函数。
##############################

5.Reduce
转换:KeyedStream → DataStream
Reduce统计当前值与上一个值,返回DataStream。下面是实现求和,返回DataStream
[Java] 纯文本查看 复制代码
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

[Scala] 纯文本查看 复制代码
keyedStream.reduce { _ + _ }


6.Fold
转换:KeyedStream → DataStream
Fold通过将上一个folder流与当前记录组合来生成KeyedDataStream。 它会返回数据流。
Fold函数,当应用于sequence (1,2,3,4,5)时,返回“start-1”,“start-1-2”,“start-1-2-3”,...
[Java] 纯文本查看 复制代码
DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });

[Scala] 纯文本查看 复制代码
val result: DataStream[String] =
    keyedStream.fold("start")((str, i) => { str + "-" + i })


7.聚合
DataStream API支持各种聚合,min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy相同)。
[Java] 纯文本查看 复制代码
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");


[Scala] 纯文本查看 复制代码
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")









本帖被以下淘专辑推荐:

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /3 下一条

QQ|小黑屋|about云开发-学问论坛|社区 ( 京ICP备12023829号

GMT+8, 2018-12-16 03:28 , Processed in 0.419906 second(s), 32 queries , Gzip On.

Powered by Discuz! X3.2 Licensed

快速回复 返回顶部 返回列表