分享

彻底明白Flink系统学习16:【Flink1.7】DataSet 编程之Data Sinks详解


问题导读

1.DataSet 有哪些内置Sink?
2.Flink Sink的作用是什么?
3.DataSet 输出数据如何实现本地排序?
4.Sink是否支持全局排序?

上一篇:
彻底明白Flink系统学习15:【Flink1.7】DataSet 编程之Data Sources详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26540


前面我们讲了Data Sources,接下来我们将Data Sinks,Flink还有Sinks吗?如果你学习Flume,或许可能就想到了Flume其实也有Source和Sink,其实这二者都是差不多的。Flume的Sink有多种,Flink的Data Sinks同样也是多种。那么这里的DataSink的含义是什么?我们知道数据首先读取,然后经过Operate,最后输出,那么Sink其实就是保存数据。

1.png

DataSink存储Dataset的数据到其它地方,Flink带有各种内置输出格式,也可以自定义Sink。下面首先我们看看Flink给我们带来了哪些方便,可以直接使用哪些Sink?

内置Sinks
writeAsText()/ TextOutputFormat  - 将元素作为字符串逐行写入。 通过调用每个元素的toString()方法获得字符串。
writeAsCsv(...)/ CsvOutputFormat  - 将元组写入逗号分隔值(csv)文件。 行和字段分隔符是可配置的。 每个字段的值来自对象的toString()方法。
print()/ printToErr() - 打印标准输出/标准错误流上每个元素的toString()值。
write()/ FileOutputFormat  - 自定义文件输出的方法和基类。 支持自定义对象到字节的转换。
output()/ OutputFormat  - 大多数通用输出方法,用于非基于文件的Data Sink(例如将结果存储在数据库中)。


Flink在输出的同时,可以让我们做不少的事情,比如排序等。

例子:
标准数据sink方法:
[mw_shl_code=scala,true]// text data
val textData: DataSet[String] = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS")

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)

// tuples as lines with pipe as the separator "a|b|c"
val values: DataSet[(String, Int, Double)] = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file")

// this writes values as strings using a user-defined formatting
values map { tuple => tuple._1 + " - " + tuple._2 }
  .writeAsText("file:///path/to/the/result/file")[/mw_shl_code]

那么对于乱序的数据,我们想排序后输出,该如何操作那?

本地排序输出
我们可以使用元组下标或则字段表达式指定字段及顺序进行本地排序,适用于每种输出格式。
例子:
[mw_shl_code=scala,true]val tData: DataSet[(Int, String, Double)] = // [...]
val pData: DataSet[(BookPojo, Double)] = // [...]
val sData: DataSet[String] = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print()

// sort output on Double field in descending and Int field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print()

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...)

// sort output on the full tuple in ascending order
tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...)

// sort atomic type (String) output in descending order
sData.sortPartition("_", Order.DESCENDING).writeAsText(...)[/mw_shl_code]

注意:目前尚不支持全局排序的输出。

最新经典文章,欢迎关注公众号


已有(2)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条