about云开发

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

about云开发 首页 大数据 Flink 连载型 查看内容

彻底明白Flink系统学习3:编程知识之Flink程序结构

2018-12-12 20:13| 发布者: admin| 查看: 12| 评论: 0|原作者: pig2

摘要: 问题导读 1.Flink程序结是什么结构? 2.Flink中source,sink分别是什么意思? 2.Flink数据源有哪些? 3.如何自定义Flink数据源? 4.Flink如何定义Sink? 关注最新经典文章,欢迎关注公众号 上一篇 彻 ...
问题导读

1.Flink程序结是什么结构?
2.Flink中source,sink分别是什么意思?
2.Flink数据源有哪些?
3.如何自定义Flink数据源?
4.Flink如何定义Sink?

关注最新经典文章,欢迎关注公众号


上一篇
彻底明白Flink系统学习2:Flink分布式执行包括调度、通信机制、检查点等
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26357


这篇文章是对第二篇的一个补充,我们看到Flink的Source和Sink可能对他们有所误解,这里对他们做一个诠释说明。同时由于这个不能单独来说,因此这里另起一篇Flink编程结构的一个解说。
我们大概都知道任何程序都是需要有输入、处理、输出
那么Flink同样也是,它的专业术语是什么?
Flink专业术语对应Source,map,Sink
1.数据源(source)
也就是在在我们提到Flink程序的时候,我们会有Source数据源,然后map其实就是对输入的数据处理的意思,接着Sink就是落地数据,也就是我们存储数据到什么地方。
这里需要跟Flume的source和sink比较,这二者含义和作用基本相同,但是却不能混淆。Flink的source,虽然也是数据源,但是这些我们需要代码指定的,Flume直接通过配置文件指定。同样sink也是。而Flink中间还有对数据的处理,也就是map。这就是程序Flink的程序结构。

Flink的source到底是什么?为了更好地理解,我们这里给出下面一个简单典型的wordcount程序。
  1. case class WordWithCount(word: String, count: Long)

  2. val text = env.socketTextStream(host, port, '\n')

  3. val windowCounts = text.flatMap { w => w.split("\\s") }
  4.   .map { w => WordWithCount(w, 1) }
  5.   .keyBy("word")
  6.   .timeWindow(Time.seconds(5))
  7.   .sum("count")

  8. windowCounts.print()
复制代码

上面不知我们能否看出哪里是是source?
//连接socket获取输入的数据
val text = env.socketTextStream(host, port, '\n')
这里便是读取数据源,也就是我们所说的source。Flink的source多种多样,比如我们可以自定义source,这里面不知你是否思考一个问题,为什么要定义source。比如我们读取数据,该如何读取,这时候我们就想到了socketTextStream,在比如我们想读取mysql的数据,我们就需要mysql的source。如果没有source该怎么做?自定义source。该如何自定义,不会啊。大家可参考下面文章Flink自定义一个简单source及mysqlsource实例

2.处理
上面source我们认识了,读取数据之后,我们就要什么了那?处理数据。wordcount里面,哪些是处理数据的?
val windowCounts = text.flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .keyBy("word")
  .timeWindow(Time.seconds(5))
  .sum("count")
flatMap 、map 、keyBy、timeWindow、sum那么这些就是对数据的处理。

3.保存数据(sink)
上面数据是如何保存的,相信不用说,大家也能看出来。
windowCounts.print()
这里输出可以说是非常简单的。而sink当然跟source一样也是可以自定义的。你是否想到,我们为什么要自定义sink?
很简单,Flink数据,保存到myslq,是不能直接保存的,所以需要自定义一个sink。不定义sink可以吗?可以的,那就是自己在写一遍,每次调用都复制一遍,这样造成大量的重复,所以我们需要自定义sink。
那么常见的sink有哪些?如下:
flink在批处理中常见的sink
1.基于本地集合的sink(Collection-based-sink)
2.基于文件的sink(File-based-sink)

1.基于本地集合的sink(Collection-based-sink)
  1. package code.book.batch.sinksource.scala
  2. import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
  3. object DataSink000 {
  4.   def main(args: Array[String]): Unit = {
  5.     //1.定义环境
  6.     val env = ExecutionEnvironment.getExecutionEnvironment
  7.     //2.定义数据 stu(age,name,height)
  8.     val stu: DataSet[(Int, String, Double)] = env.fromElements(
  9.       (19, "zhangsan", 178.8),
  10.       (17, "lisi", 168.8),
  11.       (18, "wangwu", 184.8),
  12.       (21, "zhaoliu", 164.8)
  13.     )
  14.     //3.sink到标准输出
  15.     stu.print

  16.     //3.sink到标准error输出
  17.     stu.printToErr()

  18.     //4.sink到本地Collection
  19.     print(stu.collect())
  20.   }
  21. }
复制代码

2.基于文件的sink(File-based-sink)
flink支持多种存储设备上的文件,包括本地文件,hdfs文件,alluxio文件等。
flink支持多种文件的存储格式,包括text文件,CSV文件等。
  1. package code.book.batch.sinksource.scala

  2. import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
  3. import org.apache.flink.core.fs.FileSystem.WriteMode
  4. object DataSink001 {
  5.   def main(args: Array[String]): Unit = {
  6.     //0.主意:不论是本地还是hdfs.若Parallelism>1将把path当成目录名称,若Parallelism=1将把path当成文件名。
  7.     val env = ExecutionEnvironment.getExecutionEnvironment
  8.     val ds1: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
  9.     //1.写入到本地,文本文档,NO_OVERWRITE模式下如果文件已经存在,则报错,OVERWRITE模式下如果文件已经存在,则覆盖
  10.     ds1.setParallelism(1).writeAsText("file:///output/flink/datasink/test001.txt",
  11.     WriteMode.OVERWRITE)
  12.     env.execute()

  13.     //2.写入到hdfs,文本文档,路径不存在则自动创建路径。
  14.     ds1.setParallelism(1).writeAsText("hdfs:///output/flink/datasink/test001.txt",
  15.     WriteMode.OVERWRITE)
  16.     env.execute()

  17.     //3.写入到hdfs,CSV文档
  18.     //3.1读取csv文件
  19.     val inPath = "hdfs:///input/flink/sales.csv"
  20.     case class Sales(transactionId: String, customerId: Int, itemId: Int, amountPaid: Double)
  21.     val ds2 = env.readCsvFile[Sales](
  22.       filePath = inPath,
  23.       lineDelimiter = "\n",
  24.       fieldDelimiter = ",",
  25.       lenient = false,
  26.       ignoreFirstLine = true,
  27.       includedFields = Array(0, 1, 2, 3),
  28.       pojoFields = Array("transactionId", "customerId", "itemId", "amountPaid")
  29.     )
  30.     //3.2将CSV文档写入到hdfs
  31.     val outPath = "hdfs:///output/flink/datasink/sales.csv"
  32.     ds2.setParallelism(1).writeAsCsv(filePath = outPath, rowDelimiter = "\n",
  33.     fieldDelimiter = "|", WriteMode.OVERWRITE)
  34.     env.execute()
  35.   }
  36. }
复制代码

3.基于文件的sink(数据进行排序)
可以使用sortPartition对数据进行排序后再sink到外部系统。
执行程序

  1. package code.book.batch.sinksource.scala
  2. import org.apache.flink.api.common.operators.Order
  3. import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
  4. import org.apache.flink.core.fs.FileSystem.WriteMode
  5. object DataSink002 {
  6.   def main(args: Array[String]): Unit = {
  7.     val env = ExecutionEnvironment.getExecutionEnvironment
  8.     //stu(age,name,height)
  9.     val stu: DataSet[(Int, String, Double)] = env.fromElements(
  10.       (19, "zhangsan", 178.8),
  11.       (17, "lisi", 168.8),
  12.       (18, "wangwu", 184.8),
  13.       (21, "zhaoliu", 164.8)
  14.     )
  15.     //1.以age从小到大升序排列(0->9)
  16.     stu.sortPartition(0, Order.ASCENDING).print
  17.     //2.以naem从大到小降序排列(z->a)
  18.     stu.sortPartition(1, Order.DESCENDING).print
  19.     //3.以age升序,height降序排列
  20.     stu.sortPartition(0, Order.ASCENDING).sortPartition(2, Order.DESCENDING).print
  21.     //4.所有字段升序排列
  22.     stu.sortPartition("_", Order.ASCENDING).print
  23.     //5.以Student.name升序
  24.     //5.1准备数据
  25.     case class Student(name: String, age: Int)
  26.     val ds1: DataSet[(Student, Double)] = env.fromElements(
  27.       (Student("zhangsan", 18), 178.5),
  28.       (Student("lisi", 19), 176.5),
  29.       (Student("wangwu", 17), 168.5)
  30.     )
  31.     val ds2 = ds1.sortPartition("_1.age", Order.ASCENDING).setParallelism(1)
  32.     //5.2写入到hdfs,文本文档
  33.     val outPath1="hdfs:///output/flink/datasink/Student001.txt"
  34.     ds2.writeAsText(filePath = outPath1, WriteMode.OVERWRITE)
  35.     env.execute()
  36.     //5.3写入到hdfs,CSV文档
  37.     val outPath2="hdfs:///output/flink/datasink/Student002.csv"
  38.     ds2.writeAsCsv(filePath = outPath2,rowDelimiter = "\n",
  39.     fieldDelimiter = "|||",WriteMode.OVERWRITE)
  40.     env.execute()
  41.   }
  42. }

复制代码
注释:我们知道由于Flink既有流处理,又有批处理。上面只是批处理的sink,当然也有流处理的sink。更多大家可以搜索或则参考官网。

4.扩展
为了更好的理解,下面我们与传统程序,spark等比较下。

传统程序
我们来想想传统程序应该是怎么样的。小的输入例如:数组、字符串。大的比如文件等,这就是我们传统程序的输入。输出当然比如输出到磁盘或则数据库

大数据程序
那么Spark和Mapreduce的输入是什么?Spark输入相当多,当然其实也就是Spark的数据源。Spark输出也比较多,比传统的多了分布式文件系统,比如hdfs等。

对于Spark数据源比较多,不做详细描述,感兴趣可参考下面内容:
Spark多数据源计算实践及其在GrowingIO的实践
http://www.aboutyun.com/forum.php?mod=viewthread&tid=18615


spark json文件parquet文件,和常用的文件,jdbc等数据源
http://www.aboutyun.com/forum.php?mod=viewthread&tid=20573


spark2 sql去哪读取数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23466


solr作为spark sql的数据源
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21888


sparksql多数据源透明访问
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22494


通过Spark DataSource API 如何实现Rest数据源
http://www.aboutyun.com/forum.php?mod=viewthread&tid=19291


Spark SQL之External DataSource外部数据源(一)示例
http://www.aboutyun.com/forum.php?mod=viewthread&tid=12657


Spark SQL之External DataSource外部数据源(二)源码分析
http://www.aboutyun.com/forum.php?mod=viewthread&tid=12660



鲜花

握手

雷人

路过

鸡蛋
关闭

推荐上一条 /3 下一条

QQ|小黑屋|about云开发-学问论坛|社区 ( 京ICP备12023829号

GMT+8, 2018-12-19 18:58 , Processed in 0.374636 second(s), 28 queries , Gzip On.

Powered by Discuz! X3.2 Licensed

返回顶部