分享

彻底明白Flink系统学习21:【Flink1.7】Table API 和SQL API介绍1:注册

本帖最后由 pig2 于 2019-1-11 11:43 编辑

问题导读

1.Flink sql程序结构包含哪些内容?
2.TableEnvironment的作用是什么?
3.如何创建TableEnvironment?
4.如何注册表?

彻底明白Flink系统学习20:【Flink1.7】Table API 和SQL概述
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26598


Table API和SQL集成在一个联合API中。 此API的核心概念是一个表,它用作查询的输入和输出。 本文讲解了具有Table API和SQL查询的程序的常见结构,如何注册表,如何查询表以及如何发出表。

表API和SQL程序的结构
批处理和流式所有Table API和SQL程序都遵循相同的模式。 以下代码示例显示了Table API和SQL程序的常见结构。
[mw_shl_code=sql,true]// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// create a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register a Table
tableEnv.registerTable("table1", ...)           // or
tableEnv.registerTableSource("table2", ...)     // or
tableEnv.registerExternalCatalog("extCat", ...)
// register an output Table
tableEnv.registerTableSink("outputTable", ...);

// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable")

// execute
env.execute()[/mw_shl_code]
注意:表API和SQL查询可以轻松集成并嵌入到DataStream或DataSet程序中。 下面我们会讲解DataStream和DataSet API的集成部分,以及如何将DataStream和DataSet转换为Tables,反之亦然。
1.png

创建一个TableEnvironment
TableEnvironment是Table API和SQL集成的核心概念。 它负责:

  • 在内部目录中注册表
  • 注册外部目录
  • 执行SQL查询
  • 注册用户定义的(标量,表或聚合)函数
  • 将DataStream或DataSet转换为表
  • 可引用ExecutionEnvironment或StreamExecutionEnvironment

表始终绑定到特定的TableEnvironment。 不可能整合不同TableEnvironments 的表在相同的查询中等,如join或则union

创建StreamExecutionEnvironment 或则一个ExecutionEnvironmentTableConfig,然后调用静态方法 TableEnvironment.getTableEnvironment()创建TableEnvironment。另外有一个可选的TableConfig,可用于配置TableEnvironment或自定义查询优化和转换过程。[mw_shl_code=scala,true]// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)[/mw_shl_code]

在目录中注册表
TableEnvironment维护按名称注册的表的目录。有两种类型的表,输入表和输出表。 输入表可以在表API和SQL查询中引用,并提供输入数据。 输出表可用于将Table API或SQL查询的结果发送到外部系统。

可以从各种来源注册输入表:
  • 现有的Table对象,通常是Table API或SQL查询的结果。
  • TableSource,用于访问外部数据,例如文件,数据库或消息传递系统。
  • 一个DataStream 或则DataSet也可以成为表的数据源。

可以使用TableSink注册输出表。

注册表
Table在TableEnvironment中注册如下:
[mw_shl_code=scala,true]// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table is the result of a simple projection query
val projTable: Table = tableEnv.scan("X").select(...)

// register the Table projTable as table "projectedX"
tableEnv.registerTable("projectedTable", projTable)[/mw_shl_code]
注意:注册表的处理方式与关系数据库系统中已知的VIEW类似,即定义表的查询未优化,但在另一个查询引用注册表时将内联。 如果多个查询引用相同的注册表,则将为每个引用查询内联并多次执行,即,不会共享注册表的结果。
注册TableSource
TableSource提供对外部数据的访问,外部数据存储在存储系统中,例如数据库(MySQL,HBase,...),具有特定编码的文件(CSV,Apache [Parquet,Avro,ORC],...)或消息传递 系统(Apache Kafka,RabbitMQ,...)。

Flink旨在为常见的数据格式和存储系统提供TableSource。 关于Table Sources and Sinks信息,TableSource列表以及如何构建自定义TableSource,后面文章会有说明。
TableSource在TableEnvironment中注册如下:
[mw_shl_code=scala,true]// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)

// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)[/mw_shl_code]

注册TableSink
注册的TableSink可用于将Table API或SQL查询的结果发送到外部存储系统,例如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [ Parquet,Avro,ORC],...)。

Flink旨在为常见的数据格式和存储系统提供TableSink。 有关可用接收器的详细信息以及有关如何实现自定义TableSink的说明,后面文章会有说明。。

TableSink在TableEnvironment中注册如下:
[mw_shl_code=scala,true]// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)

// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)[/mw_shl_code]

注册外部目录
外部目录可以提供有关外部数据库和表的信息,例如其名称,架构,统计信息以及有关如何访问存储在外部数据库,表或文件中的数据的信息。

可以通过实现ExternalCatalog接口创建外部目录,并在TableEnvironment中注册,如下所示:
[mw_shl_code=scala,true]// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create an external catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)[/mw_shl_code]
在TableEnvironment中注册后,可以通过指定其完整路径(例如catalog.database.table)从Table API或SQL查询中访问ExternalCatalog中定义的所有表。

目前,Flink提供InMemoryExternalCatalog用于演示和测试目的。 但是,ExternalCatalog接口也可用于将目录(如HCatalog或Metastore)连接到Table API。

未完待续

最新经典文章,欢迎关注公众号





已有(2)人评论

跳转到指定楼层
a530491093 发表于 2019-1-14 15:42:58
学习了,感谢分享!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条