flink连kafka的json格式怎么执行SQL
本帖最后由 ighack 于 2020-5-15 16:49 编辑tableEnv.connect(new Kafka().topic(topic)
.properties(properties)
.sinkPartitionerRoundRobin())
.withFormat(new Json().failOnMissingField(false).deriveSchema()).deriveSchema()在1.10版本中已经不推荐使用
请问一下这个方法是做什么用的。不要可不可以
deriveSchema是json format的格式的一种。
可以使用schema或者jsonSchema或者deriveSchema来定义json format
对于json是这样的:
.withFormat(
new Json()
.failOnMissingField(true) // optional: flag whether to fail if a field is missing or not, false by default
// optional: define the schema explicitly using type information. This overrides default
// behavior that uses table's schema as format schema.
.schema(Type.ROW(...))
// or by using a JSON schema which parses to DECIMAL and TIMESTAMP. This also overrides default behavior.
.jsonSchema(
"{" +
"type: 'object'," +
"properties: {" +
" lon: {" +
" type: 'number'" +
" }," +
" rideTime: {" +
" type: 'string'," +
" format: 'date-time'" +
" }" +
"}" +
"}"
)
)
可参考官网
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#json-format
学习看了
页:
[1]