分享

flink 报AppendStreamTableSink requires that Table has only insert changes

ighack 2020-5-25 09:43:19 发表于 疑问解答 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 5995
本帖最后由 ighack 于 2020-5-25 09:45 编辑

[mw_shl_code=scala,true]tableEnv.connect(new Kafka()
                        .version("universal")
                        .startFromEarliest()
                        .topic(topic)
                        .properties(properties)
                        )
        .withFormat(new Json().failOnMissingField(false))
        .withSchema(new Schema().field("name",DataTypes.STRING())
                                .field("num",DataTypes.INT()))
        .inAppendMode()
        .createTemporaryTable("usert")

    val result = tableEnv.sqlQuery("SELECT name, sum(num) as onum FROM usert group by name")

tableEnv.connect(new Kafka()
      .version("universal")
      .topic(sink_topic)
      .properties(props)
      .sinkPartitionerRoundRobin()
    )
      .withFormat(new Json().failOnMissingField(false))
      .withSchema(new Schema().field("name",DataTypes.STRING())
                               .field("onum",DataTypes.INT()))
      .inAppendMode()
        .createTemporaryTable("userp")


      result.insertInto("userp")[/mw_shl_code]
网上有的说不能group by    (我需要group by)
有的说要加时间窗   (不加时间窗行不行)
可是我这个就是一个非常简单的东西。不知道要怎么改啊

已有(3)人评论

跳转到指定楼层
阿飞 发表于 2020-5-25 18:08:08
模式改为这个试试:.inUpsertMode()
回复

使用道具 举报

ighack 发表于 2020-5-26 08:14:36
阿飞 发表于 2020-5-25 18:08
模式改为这个试试:.inUpsertMode()

Unknown value for property 'update-mode'.
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条