分享

Flink1.11实践汇总:含table、kafka、hive等

本帖最后由 levycui 于 2020-11-4 22:12 编辑
问题导读:
1、Flink1.11如何将 聚合,update数据写入到kafka?
2、Flink 1.11 如何操作table案例?
3、读取kafka数据写入hive中,无分区信息及读取不到数据如何解决?
4、如何添加自定义分区时间抽取类 MyPartTimeExtractor ?




实践一、参考社区实现 Flink1.11将 聚合,update数据写入到kafka

1,快速链接社区文档:
https://github.com/ververica/flink-cdc-connectors

下载依赖包:

  1.    <dependency>
  2.         <groupId>com.alibaba.ververica</groupId>
  3.         <!-- add the dependency matching your database -->
  4.         <artifactId>flink-connector-mysql-cdc</artifactId>
  5.         <version>1.0.0</version>
  6.     </dependency>
  7.     <dependency>
  8.         <groupId>com.alibaba.ververica</groupId>
  9.         <!-- add the dependency matching your database -->
  10.         <artifactId>flink-sql-connector-mysql-cdc</artifactId>
  11.         <version>1.0.0</version>
  12.     </dependency>
  13.     <dependency>
  14.         <groupId>com.alibaba.ververica</groupId>
  15.         <!-- add the dependency matching your database -->
  16.         <artifactId>flink-format-changelog-json</artifactId>
  17.         <version>1.0.0</version>
  18.     </dependency>
复制代码

我们观察到changelog- json 代码逻辑与canal-json一样 :一个工厂类,一个反序列化,一个序列化类

2020-11-04_183531.jpg

上代码测试:
  1.     import org.apache.flink.streaming.api.TimeCharacteristic
  2.     import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  3.     import org.apache.flink.table.api.EnvironmentSettings
  4.     import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
  5.     object CdcSinkKafka {
  6.       def main(args: Array[String]): Unit = {
  7.         val env = StreamExecutionEnvironment.getExecutionEnvironment
  8.         env.enableCheckpointing(30001)
  9.         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  10.         val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
  11.         val stenv = StreamTableEnvironment.create(env, bsSettings)
  12.         val source =
  13.           s"""
  14.              |CREATE TABLE kafka_table (
  15.              | category_id STRING,
  16.              | user_id INT,
  17.              | item_id STRING,
  18.              | behavior STRING,
  19.              | ts STRING
  20.              |) WITH (
  21.              | 'connector' = 'kafka',
  22.              | 'topic' = 'user_behavior',
  23.              | 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
  24.              | 'properties.group.id' = 'test1',
  25.              | 'format' = 'json',
  26.              | 'scan.startup.mode' = 'earliest-offset'
  27.              |)
  28.            """.stripMargin
  29.         stenv.executeSql(source)
  30.         val sink =
  31.           s"""
  32.              |CREATE TABLE kafka_gmv (
  33.              |  id STRING,
  34.              |  gmv DECIMAL(10, 5)
  35.              |) WITH (
  36.              |    'connector' = 'kafka',
  37.              |    'topic' = 'kafka_gmv',
  38.              |    'scan.startup.mode' = 'earliest-offset',
  39.              |    'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
  40.              |    'format' = 'changelog-json'
  41.              |)
  42.            """.stripMargin
  43.         stenv.executeSql(sink)
  44.         val insert =
  45.           s"""
  46.              | INSERT INTO kafka_gmv
  47.              |    SELECT behavior, SUM(user_id) as gmv
  48.              |    FROM kafka_table
  49.              |    GROUP BY behavior
  50.            """.stripMargin
  51.         stenv.executeSql(insert)
  52.         val query =
  53.           s"""
  54.              |SELECT * FROM kafka_gmv;
  55.            """.stripMargin
  56.         stenv.executeSql(query).print()
  57.       }
  58.     }
复制代码



kafka查看数据:
2020-11-04_183626.jpg
{"data":{"id":"pv","gmv":955},"op":"-U"}
{"data":{"id":"pv","gmv":955},"op":"+U"}


实践二、Flink 1.11 table案例

1,先来的简单点的,折腾了半天 昨天是maven下载不了flink-clients.jar ,下载之后手动导入,然后最简单的代码都运行不了

今天没办法,还是报错,缺包(org.apache.flink.optimizer.costs.CostEstimator),没法子,缺的包就是flink-clients.jar里面的,

再次尝试加入依赖,今天是成功了,代码就执行成功了。

官网中文文档地址:https://ci.apache.org/projects/f ... A%E8%A7%86%E5%9B%BE

  1.     <dependency>
  2.         <groupId>org.apache.flink</groupId>
  3.         <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
  4.         <version>${flink.version}</version>
  5.     </dependency>
  6.     <dependency>
  7.         <groupId>org.apache.flink</groupId>
  8.         <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
  9.         <version>${flink.version}</version>
  10.     </dependency>
  11.     <dependency>
  12.         <groupId>org.apache.flink</groupId>
  13.         <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
  14.         <version>${flink.version}</version>
  15.     </dependency>
  16.     <dependency>
  17.         <groupId>org.apache.flink</groupId>
  18.         <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
  19.         <version>${flink.version}</version>
  20.     </dependency>
  21.     <dependency>
  22.         <groupId>org.apache.flink</groupId>
  23.         <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  24.         <version>${flink.version}</version>
  25.     </dependency>
  26.     <dependency>
  27.         <groupId>org.apache.flink</groupId>
  28.         <artifactId>flink-clients_${scala.binary.version}</artifactId>
  29.         <version>${flink.version}</version>
  30.     </dependency>
  31.     public class SqlTest03 {
  32.         public static void main(String[] args) throws Exception {
  33.             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  34.             EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
  35.             StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,bsSettings);
  36.     // ingest a DataStream from an external source
  37.             DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(new SourceFunction<Tuple3<Long, String, Integer>>() {
  38.                 @Override
  39.                 public void run(SourceContext<Tuple3<Long, String, Integer>> out) throws Exception {
  40.                     while (true){
  41.                         out.collect(new Tuple3<>(1L,"a",11));
  42.                         Thread.sleep(1000L);
  43.                     }
  44.                 }
  45.                 @Override
  46.                 public void cancel() {
  47.                 }
  48.             });
  49.             Table table = tableEnv.fromDataStream(ds, $("user"), $("product"), $("amount"));
  50.             DataStream<Tuple2<Boolean, Row>> dsRow = tableEnv.toRetractStream(table, Row.class);
  51.             dsRow.print();
  52.             env.execute();
  53.         }
  54.     }
复制代码

顺便提一句关于 最后面的执行:
2020-11-04_183815.jpg


实践三、flink1.11.0读取kafka数据写入hive中hive无分区信息及读取不到数据解决

一、前言

在上一博客中写了flink1.11.0读取kafka数据写入到hive中,发现hive中无法查询flink通过scala写入的数据,搜了些资料查找原因,参考了下文章:https://zhuanlan.zhihu.com/p/157899980 里无法读取hive数据的原因,但里面比较明确给出的解决方案是修改源码,我觉得太麻烦了。查了下官方和阅读些flink源码,终于找到一种我认为比较便捷的解决方案,具体分析方法如下:

完整的flink读取kafka数据动态写出hive,实现实时数仓的代码demo请参考上一篇文章:https://blog.csdn.net/m0_37592814/article/details/108044830

二、分析过程

1.StreamingFileCommitter 类中 commitPartitions 方法会调用PartitionTimeCommitTigger 类中的committablePartitions方法获取可提交的分区列表,然后变量分区提交分区。

2.主要是 PartitionTimeCommitTigger  类中的方法committablePartitions 用来获取需要提交分区的列表

2020-11-04_184012.jpg
2020-11-04_184022.jpg

里面需要  watermark > toMills(partTime) + commitDelay 成立 时才会把分区添加到需要提交分区列表中,这里是问题的关键,里面的 toMills(partTime) 方法转为毫秒时间时 会把 partTime 时间 往后8小时,所以会一直大于watermark 的值一直无法添加分区到需提交分区列表中
2020-11-04_184109.jpg

3.PartitionTimeExtractor 类   上图中的 extractor.extract()方法是实现了分区时间抽取接口 PartitionTimeExtractor 中的方法,查看flink官网:https://ci.apache.org/projects/f ... ors/filesystem.html 可以看出,可以自定义类实现 PartitionTimeExtractor 接口,重写里面的extract方法

2020-11-04_184138.jpg

4. PartitionTimeExtractor 实例对象 extractor 是由  PartitionTimeCommitTigger 类的构造方法根据读取配置信息来创建的,

当 'partition.time-extractor.kind'='custom' 是 使用自定义的实现PartitionTimeExtractor接口的分区时间抽取类,否则使用的是默认的DefaultPartTimeExtractor

2020-11-04_184206.jpg

2020-11-04_184230.jpg

5. 解决方案:使用自定义的分区抽取时间实现类MyPartTimeExtractor,重写 extract方法,方法的返回值partTime 时间减少8小时,如步骤2中所说  toMills(partTime)方法会把partTime时间因为时区问题多加8小时,这样一减一加则抵消掉时区的影响了。

三、解决方案实现

1.添加自定义分区时间抽取类 MyPartTimeExtractor 。  代码主要是复制默认PartitionTimeExtractor 另一实现类DefaultPartTimeExtractor 的逻辑,修改如下一行代码,减去8小时。
2020-11-04_184301.jpg

实现类完整代码参考 上一篇文章:https://blog.csdn.net/m0_37592814/article/details/108044830

2.hive建表是添加  partition.time-extractor.kind 和 partition.time-extractor.class 属性

如下:

2020-11-04_184335.jpg
3.hive验证
2020-11-04_184402.jpg

来源:
参考社区实现 Flink1.11将 聚合,update数据写入到kafka
https://www.pianshen.com/article/58101740377/

Flink 1.11 table案例
https://www.pianshen.com/article/73321658980/

flink1.11.0读取kafka数据写入hive中hive无分区信息及读取不到数据解决
https://www.pianshen.com/article/46751840422/

最新经典文章,欢迎关注公众号

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条