ighack 发表于 2020-5-15 10:57:59

flink连kafka的json格式怎么执行SQL

本帖最后由 ighack 于 2020-5-15 16:49 编辑

tableEnv.connect(new Kafka().topic(topic)
                        .properties(properties)
                        .sinkPartitionerRoundRobin())

      .withFormat(new Json().failOnMissingField(false).deriveSchema()).deriveSchema()在1.10版本中已经不推荐使用
请问一下这个方法是做什么用的。不要可不可以

langke93 发表于 2020-5-17 16:10:30

deriveSchema是json format的格式的一种。
可以使用schema或者jsonSchema或者deriveSchema来定义json format

langke93 发表于 2020-5-17 16:19:48

对于json是这样的:

.withFormat(
new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // optional: define the schema explicitly using type information. This overrides default
    // behavior that uses table's schema as format schema.
    .schema(Type.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP. This also overrides default behavior.
    .jsonSchema(
      "{" +
      "type: 'object'," +
      "properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "}" +
      "}"
    )
)

可参考官网
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#json-format

美丽天空 发表于 2020-5-18 10:07:05

学习看了
页: [1]
查看完整版本: flink连kafka的json格式怎么执行SQL