about云开发

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

查看: 904|回复: 2

[连载型] 彻底明白Flink系统学习7:【最新基于Flink1.7】使用DataStream API进行数据处理

[复制链接]
发表于 2018-12-3 20:06:15 | 显示全部楼层 |阅读模式
问题导读

1.流处理和批处理分别入口是什么?
2.对于本地和远程运行程序,都可以使用哪个函数?
3.Flink数据源分为哪两类?
4.Flink DataStream和DataSet source都是基于什么格式?
5.Flink中kafka source是否为自定义?

关注最新经典文章,欢迎关注公众号


上一篇:
彻底明白Flink系统学习6:Flink最全最详细集群安装【带有各种说明】
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26396

执行环境
为了开始编写Flink程序,我们首先根据自己的需要,可以获得现有的或创建一个执行环境(executionenvironment)。 Flink支持:
  • 获取一个已经有的environment
  • 创建一个本地environment
  • 创建一个远程environment

通常,只需要使用getExecutionEnvironment()。 它会根据你的环境来选择。 如果你在IDE中的本地环境中执行,那么它将启动本地执行环境。 否则,如果正在执行JAR,则Flink集群管理器将以分布式方式执行该程序。

如果要自己创建本地或远程环境,则还可以选择使用createLocalEnvironment()和createRemoteEnvironment(String host,int port,String和.jar文件)等方法来执行此操作。
那么这个具体怎么使用,下面给大家分别给出批处理程序和流处理程序。

流处理程序:
  1. import org.apache.flink.streaming.api.scala._
  2. import org.apache.flink.streaming.api.windowing.time.Time

  3. object WindowWordCount {
  4.   def main(args: Array[String]) {

  5.     val env = StreamExecutionEnvironment.getExecutionEnvironment
  6.     val text = env.socketTextStream("localhost", 9999)

  7.     val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  8.       .map { (_, 1) }
  9.       .keyBy(0)
  10.       .timeWindow(Time.seconds(5))
  11.       .sum(1)

  12.     counts.print()

  13.     env.execute("Window Stream WordCount")
  14.   }
  15. }
复制代码

批处理程序:【补充参考
  1. import org.apache.flink.api.scala._

  2. object WordCount {
  3.   def main(args: Array[String]) {

  4.     val env = ExecutionEnvironment.getExecutionEnvironment
  5.     val text = env.fromElements(
  6.       "Who's there?",
  7.       "I think I hear them. Stand, ho! Who's there?")

  8.     val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  9.       .map { (_, 1) }
  10.       .groupBy(0)
  11.       .sum(1)

  12.     counts.print()
  13.   }
  14. }
复制代码

从上面我们看出批处理和流处理他们获取实例都是通过getExecutionEnvironment方法,而StreamExecutionEnvironment代表的是流处理。ExecutionEnvironment代表的是批处理。

数据源
Sources是Flink从中获取数据的地方。  Flink支持许多预先实现的数据源功能。 它还支持编写自定义数据源函数,因此可以轻松编程任何不受支持的函数。 首先让我们尝试理解内置的源函数。

由于以前版本中,Flink只有流处理,因此下面是介绍的Flink1.7以前的版本,大体了解即可

Flink以前版本

DataStream API
基于Socket
DataStream API支持从套接字读取数据。 只需指定要从中读取数据的主机和端口,它就可以完成工作:
  1. socketTextStream(hostName, port);
复制代码
还可以选择指定分隔符:
  1. socketTextStream(hostName,port,delimiter)
复制代码
还可以指定API尝试获取数据的最大次数:
  1. socketTextStream(hostName,port,delimiter,maxRetry)
复制代码

基于文件
还可以选择使用Flink中基于文件的源函数从文件源流式传输数据。 可以使用readTextFile(String path)从路径中指定的文件中流式传输数据。 默认情况下,它将读取TextInputFormat并逐行读取字符串。
如果文件格式不是文本,则可以使用以下函数指定相同的格式:
  1. readFile(FileInputFormat<Out> inputFormat, String path)
复制代码
Flink还支持,使用readFileStream()读取文件流
  1. readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)
复制代码

只需指定文件路径,轮询文件路径的轮询间隔以及监视类型。 监控类型包括三种类型:

  • 当系统应仅处理新文件时使用FileMonitoringFunction.WatchType.ONLY_NEW_FILES
  • 当系统仅追加文件内容时使用FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED
  • 当系统不仅要重新处理文件的追加内容而且还要重新处理文件中的先前内容时,将使用FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED

如果文件不是文本文件,那么我们可以选择使用以下函数,它允许我们定义文件输入格式:
  1. readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
复制代码
在内部,它将读取文件任务划分为两个子任务。 一个子任务仅根据给定的WatchType监视文件路径。 第二个子任务并行执行读取文件。监控文件路径的子任务是不是并行子任务。 它的工作是根据轮询间隔扫描文件路径并报告要处理的文件,拆分文件,并将拆分分配给相应的下游线程:
1.png


Flink1.7版本
DataStream API
source依然是程序从中读取输入的位置。 可以使用StreamExecutionEnvironment.addSource(sourceFunction)将源附加到程序。 Flink附带了许多预先实现的源函数,但可以通过-非并行Source实现SourceFunction,或者通过实现ParallelSourceFunction接口或为并行源扩展RichParallelSourceFunction来编写自己的自定义源。

可以从StreamExecutionEnvironment访问几个预定义的流源:
这里我们可以跟以前的版本比较增加Collection-based,一些函数页发生了变化。

基于文件
readTextFile(path) - 读取文本文件,即逐行读取,并将它们作为字符串返回。
readFile(fileInputFormat,path) - 按指定的文件输入格式指定读取(一次)文件。
readFile(fileInputFormat,path,watchType,interval,pathFilter) - 这是前两个内部调用的方法。 它根据给定的fileInputFormat读取路径中的文件。 根据提供的watchType,此source可以定期监控(每隔ms)新数据的路径(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理当前在路径中的数据并退出(FileProcessingMode.PROCESS_ONCE)。 使用pathFilter,用户可以进一步排除处理文件。这里篇幅有限更多信息可查看链接

基于Socket
socketTextStream  - 从socket读取。 元素可以用分隔符分隔。详细可查看上文

基于Collection
fromCollection(Seq) - 从Java Java.util.Collection创建数据流。 集合中的所有元素必须属于同一类型。

fromCollection(Iterator) - 从迭代器创建数据流。 该类指定迭代器返回的元素的数据类型。

fromElements(elements:_ *) - 从给定的对象序列创建数据流。 所有对象必须属于同一类型。

fromParallelCollection(SplittableIterator) - 并行的从迭代器创建数据流。 该类指定迭代器返回的元素的数据类型。

generateSequence(from,to) - 并行生成给定间隔中的数字序列。


自定义
addSource  - 附加新的source函数。 例如,要从Apache Kafka读取,可以使用addSource(new FlinkKafkaConsumer08 <>(...))。 请参阅连接器以获取更多内容。

由于Flink1.7最新版本,这里也把DataSet API
补充上:
DataSet API


DataSet API 跟DataStream API 既有相同也有区别。

基于文件
readTextFile(path)/ TextInputFormat  - 按行读取文件并将它们作为字符串返回。

readTextFileWithValue(path)/ TextValueInputFormat  - 按行读取文件并将它们作为StringValues返回。 StringValues是可变字符串。

readCsvFile(path)/ CsvInputFormat  - 解析逗号(或其他字符)分隔字段的文件。 返回元组,案例类对象或POJO的DataSet。 支持基本java类型及其Value对应作为字段类型。

readFileOfPrimitives(path,delimiter)/ PrimitiveInputFormat  - 使用给定的分隔符解析新行(或其他char序列)分隔的原始数据类型(如String或Integer)的文件。

readSequenceFile(Key,Value,path)/ SequenceFileInputFormat  - 创建JobConf并从指定路径读取文件,类型为SequenceFileInputFormat,Key class和Value类,并将它们返回为Tuple2 <Key,Value>。

基于Collection
fromCollection(Seq) - 从Seq创建数据集。 集合中的所有元素必须属于同一类型。

fromCollection(Iterator) - 从迭代器创建数据集。 该类指定迭代器返回的元素的数据类型。

fromElements(elements:_ *) - 根据给定的对象序列创建数据集。 所有对象必须属于同一类型。

fromParallelCollection(SplittableIterator) - 并行地从迭代器创建数据集。 该类指定迭代器返回的元素的数据类型。

generateSequence(from,to) - 并行生成给定间隔中的数字序列。

通用
readFile(inputFormat,path)/ FileInputFormat  - 接受文件输入格式。

createInput(inputFormat)/ InputFormat  - 接受通用输入格式。

例子:
  1. val env  = ExecutionEnvironment.getExecutionEnvironment

  2. // read text file from local files system
  3. val localLines = env.readTextFile("file:///path/to/my/textfile")

  4. // read text file from a HDFS running at nnHost:nnPort
  5. val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

  6. // read a CSV file with three fields
  7. val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

  8. // read a CSV file with five fields, taking only two of them
  9. val csvInput = env.readCsvFile[(String, Double)](
  10.   "hdfs:///the/CSV/file",
  11.   includedFields = Array(0, 3)) // take the first and the fourth field

  12. // CSV input can also be used with Case Classes
  13. case class MyCaseClass(str: String, dbl: Double)
  14. val csvInput = env.readCsvFile[MyCaseClass](
  15.   "hdfs:///the/CSV/file",
  16.   includedFields = Array(0, 3)) // take the first and the fourth field

  17. // read a CSV file with three fields into a POJO (Person) with corresponding fields
  18. val csvInput = env.readCsvFile[Person](
  19.   "hdfs:///the/CSV/file",
  20.   pojoFields = Array("name", "age", "zipcode"))

  21. // create a set from some given elements
  22. val values = env.fromElements("Foo", "bar", "foobar", "fubar")

  23. // generate a number sequence
  24. val numbers = env.generateSequence(1, 10000000)

  25. // read a file from the specified path of type SequenceFileInputFormat
  26. val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
  27. "hdfs://nnHost:nnPort/path/to/file")
复制代码


本帖被以下淘专辑推荐:

1

主题

6

听众

6

收听

高级会员

Rank: 4

积分
3483
发表于 2018-12-5 14:36:35 | 显示全部楼层
【最新基于Flink1.7】使用DataStream API进行数据处理~
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /3 下一条

QQ|小黑屋|about云开发-学问论坛|社区 ( 京ICP备12023829号

GMT+8, 2018-12-16 02:21 , Processed in 0.531213 second(s), 35 queries , Gzip On.

Powered by Discuz! X3.2 Licensed

快速回复 返回顶部 返回列表