分享

彻底明白Flink系统学习31-2:Table API和SQL之读取外部数据

pig2 2019-2-21 12:20:25 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 17427
问题导读
1.Flink表格式都支持哪些格式?

2.Flink表格式都改如何定义和使用?
3.Flink提供的额外哪些TableSource和TableSinks?


上一篇:
彻底明白Flink系统学习31-1:Table API和SQL之读取外部数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26721

表格式
Flink提供了一组可与表连接器一起使用的表格式。

格式标记表示与连接器匹配的格式类型。

CSV格式
CSV格式允许读取和写入以逗号分隔的行。

Java|Scala实现
[mw_shl_code=scala,true].withFormat(
  new Csv()
    .field("field1", Types.STRING)    // required: ordered format fields
    .field("field2", Types.TIMESTAMP)
    .fieldDelimiter(",")              // optional: string delimiter "," by default
    .lineDelimiter("\n")              // optional: string delimiter "\n" by default
    .quoteCharacter('"')              // optional: single character for string values, empty by default
    .commentPrefix('#')               // optional: string to indicate comments, empty by default
    .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
    .ignoreParseErrors()              // optional: skip records with parse error instead of failing by default
)[/mw_shl_code]

YAML
[mw_shl_code=text,true]format:
  type: csv
  fields:                    # required: ordered format fields
    - name: field1
      type: VARCHAR
    - name: field2
      type: TIMESTAMP
  field-delimiter: ","       # optional: string delimiter "," by default
  line-delimiter: "\n"       # optional: string delimiter "\n" by default
  quote-character: '"'       # optional: single character for string values, empty by default
  comment-prefix: '#'        # optional: string to indicate comments, empty by default
  ignore-first-line: false   # optional: boolean flag to ignore the first line, by default it is not skipped
  ignore-parse-errors: true  # optional: skip records with parse error instead of failing by default[/mw_shl_code]
CSV格式包含在Flink中,不需要其他依赖项。

注意:目前写入行的CSV格式有限。 仅支持自定义字段分隔符作为可选参数。

JSON格式
格式: Serialization Schema 格式: Deserialization Schema

JSON format 允许读取和写入给定格式schema相对应的JSON 数据。 格式schema 可以定义为Flink类型,JSON模式(schema),也可以从所需的表模式(schema)派生。 Flink类型支持更类似SQL的定义并映射到相应的SQL数据类型。 JSON模式允许更复杂和嵌套的结构。

如果格式架构( format schema)等于表模式(schema),则还可以自动派生模式(schema )。 这允许仅定义一次模式(schema )信息。 格式(format )的名称,类型和字段顺序由表的模式(schema)确定。 如果时间属性的来源不是字段,则会忽略它们。 表模式(schema )中的from定义被解释为格式中的字段重命名。

Java|Scala
[mw_shl_code=scala,true].withFormat(
  new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // required: define the schema either by using type information which parses numbers to corresponding types
    .schema(Type.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
    .jsonSchema(
      "{" +
      "  type: 'object'," +
      "  properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "  }" +
      "}"
    )

    // or use the table's schema
    .deriveSchema()
)[/mw_shl_code]

YAML
[mw_shl_code=text,true]format:
  type: json
  fail-on-missing-field: true   # optional: flag whether to fail if a field is missing or not, false by default

  # required: define the schema either by using a type string which parses numbers to corresponding types
  schema: "ROW(lon FLOAT, rideTime TIMESTAMP)"

  # or by using a JSON schema which parses to DECIMAL and TIMESTAMP
  json-schema: >
    {
      type: 'object',
      properties: {
        lon: {
          type: 'number'
        },
        rideTime: {
          type: 'string',
          format: 'date-time'
        }
      }
    }

  # or use the table's schema
  derive-schema: true[/mw_shl_code]

下表显示了JSON模式(schema )类型到Flink SQL类型的映射:
JSON schema
Flink SQL
objectROW
booleanBOOLEAN
arrayARRAY[_]
numberDECIMAL
integerDECIMAL
stringVARCHAR
string with format: date-timeTIMESTAMP
string with format: dateDATE
string with format: timeTIME
string with encoding: base64ARRAY[TINYINT]
nullNULL (unsupported yet)

目前,Flink仅支持JSON模式规范draft-07的子集。 Union 类型(以及allOf,anyOf,not)尚不支持。 oneOf和类型数组仅支持指定为nullability

支持链接到文档中的通用定义的简单引用,如下面更复杂的示例所示:
[mw_shl_code=text,true]{
  "definitions": {
    "address": {
      "type": "object",
      "properties": {
        "street_address": {
          "type": "string"
        },
        "city": {
          "type": "string"
        },
        "state": {
          "type": "string"
        }
      },
      "required": [
        "street_address",
        "city",
        "state"
      ]
    }
  },
  "type": "object",
  "properties": {
    "billing_address": {
      "$ref": "#/definitions/address"
    },
    "shipping_address": {
      "$ref": "#/definitions/address"
    },
    "optional_address": {
      "oneOf": [
        {
          "type": "null"
        },
        {
          "$ref": "#/definitions/address"
        }
      ]
    }
  }
}[/mw_shl_code]
缺少字段处理:默认情况下,缺少的JSON字段设置为null。 如果缺少字段,可以启用严格的JSON解析来取消源【source 】(和查询)。

确保将JSON格式添加为依赖项。

Avro 格式
格式: Serialization Schema 格式: Deserialization Schema

Apache Avro格式允许读取和写入相对应的给定格式模式(schema)的Avro数据。 格式模式(format schema)可以定义为Avro指定记录的完全qualified 类名,也可以定义为Avro模式(schema )字符串。 如果使用类名,则在运行时期间类必须在类路径中可用。

Java|Scala实现
[mw_shl_code=scala,true].withFormat(
  new Avro()

    // required: define the schema either by using an Avro specific record class
    .recordClass(User.class)

    // or by using an Avro schema
    .avroSchema(
      "{" +
      "  \"type\": \"record\"," +
      "  \"name\": \"test\"," +
      "  \"fields\" : [" +
      "    {\"name\": \"a\", \"type\": \"long\"}," +
      "    {\"name\": \"b\", \"type\": \"string\"}" +
      "  ]" +
      "}"
    )
)[/mw_shl_code]

YAML
[mw_shl_code=text,true]format:
  type: avro

  # required: define the schema either by using an Avro specific record class
  record-class: "org.organization.types.User"

  # or by using an Avro schema
  avro-schema: >
    {
      "type": "record",
      "name": "test",
      "fields" : [
        {"name": "a", "type": "long"},
        {"name": "b", "type": "string"}
      ]
    }[/mw_shl_code]

Avro类型映射到相应的SQL数据类型。 仅支持Union类型以指定可为空(nullability ),否则它们将转换为ANY类型。 下表显示了映射:

Avro schema
Flink SQL
recordROW
enumVARCHAR
arrayARRAY[_]
mapMAP[VARCHAR, _]
unionnon-null type or ANY
fixedARRAY[TINYINT]
stringVARCHAR
bytesARRAY[TINYINT]
intINT
longBIGINT
floatFLOAT
doubleDOUBLE
booleanBOOLEAN
int with logicalType: dateDATE
int with logicalType: time-millisTIME
int with logicalType: time-microsINT
long with logicalType: timestamp-millisTIMESTAMP
long with logicalType: timestamp-microsBIGINT
bytes with logicalType: decimalDECIMAL
fixed with logicalType: decimalDECIMAL
nullNULL (unsupported yet)

Avro使用Joda-Time来表示特定记录类中的逻辑日期和时间类型。 Joda-Time依赖不是Flink 分布式的一部分。 因此,确保Joda-Time在运行时期间与特定记录类一起位于类路径中。 通过模式(schema )字符串指定的Avro格式不需要Joda-Time。

确保添加Apache Avro依赖项。

进一步的TableSources和TableSinks
尚未将以下表源和接收器迁移(或尚未完全迁移)到新的统一接口。

这些是Flink提供的额外TableSource:

类名Maven 依赖批处理?流?描述
OrcTableSourceflink-orcYNORC文件的TableSource。

这些是Flink提供的附加TableSink:
类名Maven 依赖批处理?流?        描述
CsvTableSinkflink-tableYAppendCSV文件的简单sink 。
JDBCAppendTableSinkflink-jdbcYAppend将JDBC表写入Table sink
CassandraAppendTableSinkflink-connector-cassandraNAppend写表到 Cassandra 表.

OrcTableSource

OrcTableSource读取ORC文件。 ORC是结构化数据的文件格式,并以压缩的列式表示形式存储数据。 ORC非常高效,支持投影(projection )和滤波器下推(filter push-down)。

创建OrcTableSource,如下所示:
[mw_shl_code=java,true]// create Hadoop Configuration
Configuration config = new Configuration();

OrcTableSource orcTableSource = OrcTableSource.builder()
  // path to ORC file(s). NOTE: By default, directories are recursively scanned.
  .path("file:///path/to/data")
  // schema of ORC files
  .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
  // Hadoop configuration
  .withConfiguration(config)
  // build OrcTableSource
  .build();[/mw_shl_code]

[mw_shl_code=scala,true]// create Hadoop Configuration
val config = new Configuration()

val orcTableSource = OrcTableSource.builder()
  // path to ORC file(s). NOTE: By default, directories are recursively scanned.
  .path("file:///path/to/data")
  // schema of ORC files
  .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
  // Hadoop configuration
  .withConfiguration(config)
  // build OrcTableSource
  .build()[/mw_shl_code]
注意:OrcTableSource尚不支持ORC的Union类型。

CsvTableSink

CsvTableSink向一个或多个CSV文件emits 表。

接收器仅支持仅追加流表。 它不能用于emit 不断更新的表。 有关详细信息,请参阅表到流转换的文档emit 流表时,行至少写入一次(如果启用了检查点),并且CsvTableSink不会将输出文件拆分为存储桶文件,而是连续写入相同的文件。
[mw_shl_code=java,true]CsvTableSink sink = new CsvTableSink(
    path,                  // output path
    "|",                   // optional: delimit files by '|'
    1,                     // optional: write to a single file
    WriteMode.OVERWRITE);  // optional: override existing files

tableEnv.registerTableSink(
  "csvOutputTable",
  // specify table schema
  new String[]{"f0", "f1"},
  new TypeInformation[]{Types.STRING, Types.INT},
  sink);

Table table = ...
table.insertInto("csvOutputTable");[/mw_shl_code]
[mw_shl_code=scala,true]val sink: CsvTableSink = new CsvTableSink(
    path,                             // output path
    fieldDelim = "|",                 // optional: delimit files by '|'
    numFiles = 1,                     // optional: write to a single file
    writeMode = WriteMode.OVERWRITE)  // optional: override existing files

tableEnv.registerTableSink(
  "csvOutputTable",
  // specify table schema
  Array[String]("f0", "f1"),
  Array[TypeInformation[_]](Types.STRING, Types.INT),
  sink)

val table: Table = ???
table.insertInto("csvOutputTable")[/mw_shl_code]


JDBCAppendTableSink
JDBCAppendTableSink将表emit到JDBC连接器。 接收器仅支持仅追加流表。 它不能用于emit不断更新的表。 有关详细信息,请参阅表到流转换的文档

JDBCAppendTableSink将每个Table行至少插入一次数据库表(如果启用了检查点)。 但是,可以使用REPLACE或INSERT OVERWRITE指定插入查询(insertion query)以执行对数据库的upsert写入。

要使用JDBC接收器,必须将JDBC连接器依赖项(flink-jdbc)添加到项目中。 然后,可以使用JDBCAppendSinkBuilder创建接收器:
[mw_shl_code=java,true]JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  .setDBUrl("jdbc:derby:memory:ebookshop")
  .setQuery("INSERT INTO books (id) VALUES (?)")
  .setParameterTypes(INT_TYPE_INFO)
  .build();

tableEnv.registerTableSink(
  "jdbcOutputTable",
  // specify table schema
  new String[]{"id"},
  new TypeInformation[]{Types.INT},
  sink);

Table table = ...
table.insertInto("jdbcOutputTable");[/mw_shl_code]
[mw_shl_code=scala,true]val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  .setDBUrl("jdbc:derby:memory:ebookshop")
  .setQuery("INSERT INTO books (id) VALUES (?)")
  .setParameterTypes(INT_TYPE_INFO)
  .build()

tableEnv.registerTableSink(
  "jdbcOutputTable",
  // specify table schema
  Array[String]("id"),
  Array[TypeInformation[_]](Types.INT),
  sink)

val table: Table = ???
table.insertInto("jdbcOutputTable")[/mw_shl_code]
与使用JDBCOutputFormat类似,必须显式指定JDBC驱动程序的名称,JDBC URL,要执行的查询以及JDBC表的字段类型。

CassandraAppendTableSink
CassandraAppendTableSink向Cassandra表emit一个表。 接收器仅支持仅追加流表。 它不能用于emit不断更新的表。 有关详细信息,请参阅表到流转换的文档

如果启用了检查点,CassandraAppendTableSink会将所有行至少插入一次Cassandra表中。 但是,可以将查询指定为upsert查询。

要使用CassandraAppendTableSink,必须将Cassandra连接器依赖项(flink-connector-cassandra)添加到项目中。 下面的示例显示了如何使用CassandraAppendTableSink。

[mw_shl_code=java,true]ClusterBuilder builder = ... // configure Cassandra cluster connection

CassandraAppendTableSink sink = new CassandraAppendTableSink(
  builder,
  // the query must match the schema of the table
  "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");

tableEnv.registerTableSink(
  "cassandraOutputTable",
  // specify table schema
  new String[]{"id", "name", "value"},
  new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE},
  sink);

Table table = ...
table.insertInto(cassandraOutputTable);[/mw_shl_code]

[mw_shl_code=scala,true]val builder: ClusterBuilder = ... // configure Cassandra cluster connection

val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
  builder,
  // the query must match the schema of the table
  "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)")

tableEnv.registerTableSink(
  "cassandraOutputTable",
  // specify table schema
  Array[String]("id", "name", "value"),
  Array[TypeInformation[_]](Types.INT, Types.STRING, Types.DOUBLE),
  sink)

val table: Table = ???
table.insertInto(cassandraOutputTable)[/mw_shl_code]

最新经典文章,欢迎关注公众号



已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条