分享

彻底明白Flink系统学习15:【Flink1.7】DataSet 编程之Data Sources详解

pig2 2018-12-26 17:03:41 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 9985
问题导读

1.Flink如何获取各种不同格式的文件,转换为dataset?
2.Flink sources有哪些类型?
3.Flink是否可以读取压缩文件?
4.Flink读取压缩文件有什么缺点?
5.Flink如何遍历读取目录中的文件?

上一篇
彻底明白Flink系统学习14:【Flink1.7】DataSet 编程之Transformations详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26531


Sources是DataSet API期望从中获取数据的位置,创建初始数据集。 它可以是文件形式,也可以是Java集合。
DataSet API支持许多预先实现的数据源函数。 它还支持编写自定义数据源函数,因此可以轻松编程任何不受支持的函数。
ExecutionEnvironment包含了很多下面的方法。

基于文件
readTextFile(path)/ TextInputFormat  - 按行读取文件并将它们作为字符串返回。

readTextFileWithValue(path)/ TextValueInputFormat  - 按行读取文件并将它们作为StringValues返回。 StringValues是可变字符串。

readCsvFile(path)/ CsvInputFormat  - 解析逗号(或其他字符)分隔的文件。 返回元组,case class 对象或POJO的DataSet。 支持基本java类型及其Value对应作为字段类型。

readFileOfPrimitives(path,delimiter)/ PrimitiveInputFormat  - 使用给定的分隔符来解析new-line (或则 另外char sequence) 文件。被分割的原始数据类型,如String或Integer。此方法类似于具有单个字段的readCsvFile(String),但它不通过Tuple1生成DataSet。

【注释:很多人是这样的解释:使用给定的分隔符来解析新行(或另一个char序列)分隔的原始数据类型,例如字符串或整数。上面是比如readTextFile是按照行来读取文件,而readFileOfPrimitives个人认为是是按照原始数据类型来读取文件【留待以后验证】

这里也给出官网的API说明:
[mw_shl_code=scala,true]public <X> DataSource<X> readFileOfPrimitives(String filePath,
                                              String delimiter,
                                              Class<X> typeClass)
Creates a DataSet that represents the primitive type produced by reading the given file in delimited way. This method is similar to readCsvFile(String) with single field, but it produces a DataSet not through Tuple1.
Parameters:
filePath - The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
delimiter - The delimiter of the given file.
typeClass - The primitive type class to be read.
Returns:
A DataSet that represents the data read from the given file as primitive type.[/mw_shl_code]
来源地址,欢迎大家讨论

readSequenceFile(Key,Value,path)/ SequenceFileInputFormat  - 创建JobConf并从指定路径读取文件,类型为SequenceFileInputFormat,Key class和Value class ,并将它们返回为Tuple2 <Key,Value>。


基于集合
1.fromCollection(Seq) - 从Seq创建数据集。 集合中的所有元素必须属于同一类型。
2.fromCollection(Iterator) - 从迭代器创建数据集。 该类指定迭代器返回的元素的数据类型。
评注:
我们或许对迭代器有些陌生,这里主要用于用于遍历集合中的对象。
用法:配合while使用。换句话说for循环遍历方法代替过iterator的方法。效果一样。在集合里面和Map中用iterator比较方便。
3.fromElements(elements:_ *) - 根据给定的对象序列创建数据集。 所有对象必须属于同一类型。
评注:
也许你会符合_ *产生疑问,其实它代表集合的所有元素
更多可参考
spark开发基础之从Scala符号入门Scala
4.fromParallelCollection(SplittableIterator) - 并行地从迭代器创建数据集。 该类指定迭代器返回的元素的数据类型。
5.generateSequence(from,to) - 并行生成给定间隔中的数字序列。

通用
readFile(inputFormat,path)/ FileInputFormat  - 接受文件输入格式。
createInput(inputFormat)/ InputFormat  - 接受通用输入格式。


例子:
[mw_shl_code=scala,true]val env  = ExecutionEnvironment.getExecutionEnvironment

// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")

// read text file from a HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
  "hdfs:///the/CSV/file",
  pojoFields = Array("name", "age", "zipcode"))

// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")

// generate a number sequence
val numbers = env.generateSequence(1, 10000000)

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
"hdfs://nnHost:nnPort/path/to/file")[/mw_shl_code]

配置CSV解析
Flink为CSV解析提供了许多配置选项:
lineDelimiter:String指定各个记录的分隔符。默认行分隔符是换行符'\ n'。

fieldDelimiter:String指定用于分隔记录字段的分隔符。默认字段分隔符是逗号字符','。

includeFields:Array [Int]定义从输入文件中读取的字段(以及要忽略的字段)。默认情况下,将解析前n个字段(由types()调用中的类型数定义)。

pojoFields:Array [String]指定映射到CSV字段的POJO的字段。 CSV字段的解析器将根据POJO字段的类型和顺序自动初始化。

parseQuotedStrings:Character启用带引号的字符串解析。如果字符串字段的第一个字符是引号字符(未剪裁前导或尾随空格),则字符串将被解析为带引号的字符串。引用字符串中的字段分隔符将被忽略。如果带引号的字符串字段的最后一个字符不是引号字符,则引用的字符串解析将失败。如果启用了带引号的字符串解析并且该字段的第一个字符不是引用字符串,则该字符串将被解析为不带引号的字符串。默认情况下,禁用带引号的字符串解析。

ignoreComments:String指定注释前缀。所有以指定注释前缀开头的行都不会被解析和忽略。默认情况下,不会忽略任何行。

lenient:Boolean启用宽松解析,即忽略无法正确解析的行。默认情况下,禁用宽松解析,无效行引发异常。

ignoreFirstLine:Boolean将InputFormat配置为忽略输入文件的第一行。默认情况下,不会忽略任何行。


上面显然是针对不同的情况,采用不同的参数。

递归遍历输入路径目录
对于基于文件的输入,当输入路径是目录时,默认情况下不会枚举嵌套文件。 相反,只读取基目录中的文件,而忽略嵌套文件。 可以通过recursive.file.enumeration配置参数启用嵌套文件的递归遍历,如下例所示。
[mw_shl_code=scala,true]// enable recursive enumeration of nested input files
val env  = ExecutionEnvironment.getExecutionEnvironment

// create a configuration object
val parameters = new Configuration

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true)

// pass the configuration to the data source
env.readTextFile("file:///path/with.nested/files").withParameters(parameters)[/mw_shl_code]

基于压缩文件
Flink目前支持输入文件的透明解压缩,如果它们标有适当的文件扩展名。 特别是,这意味着不需要进一步配置输入格式,并且任何FileInputFormat都支持压缩,包括自定义输入格式。 请注意,压缩文件可能无法并行读取,从而影响作业可伸缩性。

下表列出了当前支持的压缩方法。

压缩方法
File 格式
可行性
DEFLATE.deflateno
GZip.gz, .gzipno
Bzip2.bz2no
XZ.xzno


最新经典文章,欢迎关注公众号




已有(2)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条