分享

flink连kafka的json格式怎么执行SQL

ighack 2020-5-15 10:57:59 发表于 疑问解答 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 5617
本帖最后由 ighack 于 2020-5-15 16:49 编辑

[mw_shl_code=scala,true]tableEnv.connect(new Kafka().topic(topic)
                        .properties(properties)
                        .sinkPartitionerRoundRobin())

        .withFormat(new Json().failOnMissingField(false).deriveSchema())[/mw_shl_code].deriveSchema()在1.10版本中已经[backcolor=rgba(200, 200, 200, 0.1)]不推荐使用
请问一下这个方法是做什么用的。不要可不可以

已有(3)人评论

跳转到指定楼层
langke93 发表于 2020-5-17 16:10:30
deriveSchema是json format的格式的一种。
可以使用schema或者jsonSchema或者deriveSchema来定义json format

回复

使用道具 举报

langke93 发表于 2020-5-17 16:19:48
对于json是这样的:

.withFormat(
  new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // optional: define the schema explicitly using type information. This overrides default
    // behavior that uses table's schema as format schema.
    .schema(Type.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP. This also overrides default behavior.
    .jsonSchema(
      "{" +
      "  type: 'object'," +
      "  properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "  }" +
      "}"
    )
)

可参考官网
https://ci.apache.org/projects/f ... ct.html#json-format

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条