分享

带大家一起学数据湖:Apache+Hudi入门指南

问题导读:
1、什么是Apache Hudi?
2、Hudi如何编译?
3、Hive和Presto如何进行集成?
4、如何编写Hudi代码?



上一篇:带大家一起学数据湖:数据湖入门必备

1. 什么是Apache Hudi
  •     一个spark 库
  •     大数据更新解决方案,大数据中没有传统意义的更新,只有append和重写(Hudi就是采用重写方式)

使用Hudi的优点
  •     使用Bloomfilter机制+二次查找,可快速确定记录是更新还是新增
  •     更新范围小,是文件级别,不是表级别
  •     文件大小与hdfs的Blocksize保持一致
  •     数据文件使用parquet格式,充分利用列存的优势(dremal论文实现)
  •     提供了可扩展的大数据更新框架
  •     并发度由spark控制

hudi详细介绍见hudi官网 http://hudi.apache.org/cn/docs/0.5.0-quick-start-guide.html


2. Hudi编译
  1. git clone https://github.com/apache/incubator-hudi.git && cd incubator-hudi
  2. mvn clean package -DskipTests -DskipITs
复制代码

注意: 本文编译hudi使用的linux环境,window环境一定要加上-DskipITs,不然会编译docker文件启动服务运行linux命令导致报错,如果是linux环境且需要用docker进行测试可以考虑去掉其参数。

3. 前置环境安装准备

所有版本选择均是查看当前master分支pom 中所依赖的 spark,hive ,hadoop,presto版本。(hudi-0.5.2-SNAPSHOT)
2021-01-06_203529.jpg

**注意:**小版本不一样不影响使用,如果运行spark任务报错不兼容排下依赖包就好。

4. Hive和Presto集成
4.1 hive

hive 查询hudi 数据主要是在hive中建立外部表数据路径指向hdfs 路径,同时hudi 重写了inputformat 和outpurtformat。因为hudi 在读的数据的时候会读元数据来决定我要加载那些parquet文件,而在写的时候会写入新的元数据信息到hdfs路径下。所以hive 要集成hudi 查询要把编译的jar 包放到HIVE-HOME/lib 下面。否则查询时找不到inputformat和outputformat的类。

hive 外表数据结构如下:
  1. CREATE EXTERNAL TABLE `test_partition`(
  2.   `_hoodie_commit_time` string,
  3.   `_hoodie_commit_seqno` string,
  4.   `_hoodie_record_key` string,
  5.   `_hoodie_file_name` string,
  6.   `id` string,
  7.   `oid` string,
  8.   `name` string,
  9.   `dt` string,
  10.   `isdeleted` string,
  11.   `lastupdatedttm` string,
  12.   `rowkey` string)
  13. PARTITIONED BY (
  14.   `_hoodie_partition_path` string)
  15. ROW FORMAT SERDE
  16.   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  17. STORED AS INPUTFORMAT
  18.   'org.apache.hudi.hadoop.HoodieParquetInputFormat'
  19. OUTPUTFORMAT
  20.   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
  21. LOCATION
  22.   'hdfs://hj:9000/tmp/hudi'
  23. TBLPROPERTIES (
  24.   'transient_lastDdlTime'='1582111004')
复制代码

hive集成hudi方法:将hudi jar复制到hive lib下
  1. cp ./packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.5.2-SNAPSHOT.jar  $HIVE_HOME/lib
复制代码

4.2 Presto

presto 集成hudi 是基于hive catalog 同样是访问hive 外表进行查询,如果要集成需要把hudi 包copy 到presto hive-hadoop2插件下面。
presto集成hudi方法: 将hudi jar复制到 presto hive-hadoop2下
  1. cp  ./packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.5.2-SNAPSHOT.jar  $PRESTO_HOME/plugin/hive-hadoop2/
复制代码


5. Hudi代码实战
5.1 Copy_on_Write 模式操作(默认模式)
5.1.1 insert操作(初始化插入数据)
  1. // 不带分区写入
  2.   @Test
  3.   def insert(): Unit = {
  4.     val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  5.     val insertData = spark.read.parquet("/tmp/1563959377698.parquet")
  6.     insertData.write.format("org.apache.hudi")
  7.       // 设置主键列名
  8.       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
  9.       // 设置数据更新时间的列名
  10.       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
  11.       // 并行度参数设置
  12.       .option("hoodie.insert.shuffle.parallelism", "2")
  13.       .option("hoodie.upsert.shuffle.parallelism", "2")
  14.       // table name 设置
  15.       .option(HoodieWriteConfig.TABLE_NAME, "test")
  16.       .mode(SaveMode.Overwrite)
  17.       // 写入路径设置
  18.       .save("/tmp/hudi")
  19.   }
  20. // 带分区写入
  21.   @Test
  22.   def insertPartition(): Unit = {
  23.     val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  24.     // 读取文本文件转换为df
  25.     val insertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/test_insert_data.txt")
  26.     insertData.write.format("org.apache.hudi")
  27.       // 设置主键列名
  28.       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
  29.       // 设置数据更新时间的列名
  30.       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
  31.       // 设置分区列
  32.       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
  33.       // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
  34.       .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
  35.       // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引
  36.       .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
  37.       // 并行度参数设置
  38.       .option("hoodie.insert.shuffle.parallelism", "2")
  39.       .option("hoodie.upsert.shuffle.parallelism", "2")
  40.       .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
  41.       .mode(SaveMode.Overwrite)
  42.       .save("/tmp/hudi")
  43.   }
复制代码




5.1.2 upsert操作(数据存在时修改,不存在时新增)
  1. // 不带分区upsert
  2.   @Test
  3.   def upsert(): Unit = {
  4.     val spark = SparkSession.builder.appName("hudi upsert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  5.     val insertData = spark.read.parquet("/tmp/1563959377699.parquet")
  6.     insertData.write.format("org.apache.hudi")
  7.       // 设置主键列名
  8.       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
  9.       // 设置数据更新时间的列名
  10.       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
  11.       // 表名称设置
  12.       .option(HoodieWriteConfig.TABLE_NAME, "test")
  13.       // 并行度参数设置
  14.       .option("hoodie.insert.shuffle.parallelism", "2")
  15.       .option("hoodie.upsert.shuffle.parallelism", "2")
  16.       .mode(SaveMode.Append)
  17.       // 写入路径设置
  18.       .save("/tmp/hudi");
  19.   }
  20. // 带分区upsert
  21.   @Test
  22.   def upsertPartition(): Unit = {
  23.     val spark = SparkSession.builder.appName("upsert partition").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  24.     val upsertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/test_update_data.txt")
  25.     upsertData.write.format("org.apache.hudi").option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
  26.       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
  27.        // 分区列设置
  28.       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
  29.       .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
  30.       .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
  31.       .option("hoodie.insert.shuffle.parallelism", "2")
  32.       .option("hoodie.upsert.shuffle.parallelism", "2")
  33.       .mode(SaveMode.Append)
  34.       .save("/tmp/hudi");
  35.   }
复制代码



5.1.3 delete操作(删除数据)
  1.   @Test
  2.   def delete(): Unit = {
  3.     val spark = SparkSession.builder.appName("delta insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  4.     val deleteData = spark.read.parquet("/tmp/1563959377698.parquet")
  5.     deleteData.write.format("com.uber.hoodie")
  6.       // 设置主键列名
  7.       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
  8.       // 设置数据更新时间的列名
  9.       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
  10.       // 表名称设置
  11.       .option(HoodieWriteConfig.TABLE_NAME, "test")
  12.       // 硬删除配置
  13.       .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
  14.   }
复制代码




删除操作分为软删除和硬删除配置在这里查看:http://hudi.apache.org/cn/docs/0 ... 4%E6%95%B0%E6%8D%AE

5.1.4 query操作(查询数据)

  1.   @Test
  2.   def query(): Unit = {
  3.     val basePath = "/tmp/hudi"
  4.     val spark = SparkSession.builder.appName("query insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  5.     val tripsSnapshotDF = spark.
  6.       read.
  7.       format("org.apache.hudi").
  8.       load(basePath + "/*/*")
  9.     tripsSnapshotDF.show()
  10.   }
复制代码




5.1.5 同步至Hive
  1.   @Test
  2.   def hiveSync(): Unit = {
  3.     val spark = SparkSession.builder.appName("delta hiveSync").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  4.     val upsertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/hive_sync.txt")
  5.     upsertData.write.format("org.apache.hudi")
  6.       // 设置主键列名
  7.       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
  8.       // 设置数据更新时间的列名
  9.       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
  10.       // 分区列设置
  11.       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
  12.       // 设置要同步的hive库名
  13.       .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hj_repl")
  14.       // 设置要同步的hive表名
  15.       .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "test_partition")
  16.       // 设置数据集注册并同步到hive
  17.       .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  18.       // 设置当分区变更时,当前数据的分区目录是否变更
  19.       .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
  20.       // 设置要同步的分区列名
  21.       .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
  22.       // 设置jdbc 连接同步
  23.       .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000")
  24.       // hudi表名称设置
  25.       .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
  26.       // 用于将分区字段值提取到Hive分区列中的类,这里我选择使用当前分区的值同步
  27.       .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  28.       // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
  29.       .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
  30.       // 并行度参数设置
  31.       .option("hoodie.insert.shuffle.parallelism", "2")
  32.       .option("hoodie.upsert.shuffle.parallelism", "2")
  33.       .mode(SaveMode.Append)
  34.       .save("/tmp/hudi");
  35.   }
  36.   @Test
  37.   def hiveSyncMergeOnReadByUtil(): Unit = {
  38.     val args: Array[String] = Array("--jdbc-url", "jdbc:hive2://hj:10000", "--partition-value-extractor", "org.apache.hudi.hive.MultiPartKeysValueExtractor", "--user", "hive", "--pass", "hive", "--partitioned-by", "dt", "--base-path", "/tmp/hudi_merge_on_read", "--database", "hj_repl", "--table", "test_partition_merge_on_read")
  39.     HiveSyncTool.main(args)
  40.   }
复制代码




这里可以选择使用spark 或者hudi-hive包中的hiveSynTool进行同步,hiveSynTool类其实就是run_sync_tool.sh运行时调用的。hudi 和hive同步时保证hive目标表不存在,同步其实就是建立外表的过程。

5.1.6 Hive查询读优化视图和增量视图
  1.   @Test
  2.   def hiveViewRead(): Unit = {
  3.     // 目标表
  4.     val sourceTable = "test_partition"
  5.     // 增量视图开始时间点
  6.     val fromCommitTime = "20200220094506"
  7.     // 获取当前增量视图后几个提交批次
  8.     val maxCommits = "2"
  9.     Class.forName("org.apache.hive.jdbc.HiveDriver")
  10.     val prop = new Properties()
  11.     prop.put("user", "hive")
  12.     prop.put("password", "hive")
  13.     val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/hj_repl", prop)
  14.     val stmt = conn.createStatement
  15.     // 这里设置增量视图参数
  16.     stmt.execute("set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat")
  17.     // Allow queries without partition predicate
  18.     stmt.execute("set hive.strict.checks.large.query=false")
  19.     // Dont gather stats for the table created
  20.     stmt.execute("set hive.stats.autogather=false")
  21.     // Set the hoodie modie
  22.     stmt.execute("set hoodie." + sourceTable + ".consume.mode=INCREMENTAL")
  23.     // Set the from commit time
  24.     stmt.execute("set hoodie." + sourceTable + ".consume.start.timestamp=" + fromCommitTime)
  25.     // Set number of commits to pull
  26.     stmt.execute("set hoodie." + sourceTable + ".consume.max.commits=" + maxCommits)
  27.     val rs = stmt.executeQuery("select * from " + sourceTable)
  28.     val metaData = rs.getMetaData
  29.     val count = metaData.getColumnCount
  30.     while (rs.next()) {
  31.       for (i <- 1 to count) {
  32.         println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
  33.       }
  34.       println("-----------------------------------------------------------")
  35.     }
  36.     rs.close()
  37.     stmt.close()
  38.     conn.close()
  39.   }
复制代码



读优化视图即去掉增量视图参数即可。

5.1.7 Presto查询读优化视图(暂不支持增量视图)

  1.   @Test
  2.   def prestoViewRead(): Unit = {
  3.     // 目标表
  4.     val sourceTable = "test_partition"
  5.     Class.forName("com.facebook.presto.jdbc.PrestoDriver")
  6.     val conn = DriverManager.getConnection("jdbc:presto://hj:7670/hive/hj_repl", "hive", null)
  7.     val stmt = conn.createStatement
  8.     val rs = stmt.executeQuery("select * from  " + sourceTable)
  9.     val metaData = rs.getMetaData
  10.     val count = metaData.getColumnCount
  11.     while (rs.next()) {
  12.       for (i <- 1 to count) {
  13.         println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
  14.       }
  15.       println("-----------------------------------------------------------")
  16.     }
  17.     rs.close()
  18.     stmt.close()
  19.     conn.close()
  20.   }
复制代码




5.2 Merge_On_Read 模式操作

5.2.1 insert操作(插入数据)

  1.   @Test
  2.   def insertPartitionMergeOnRead(): Unit = {
  3.     val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  4.     // 读取文本文件转换为df
  5.     val insertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/test_insert_data.txt")
  6.     insertData.write.format("org.apache.hudi")
  7.       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  8.       // 设置主键列名
  9.       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
  10.       // 设置数据更新时间的列名
  11.       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
  12.       // 设置分区列
  13.       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
  14.       // 设置当分区变更时,当前数据的分区目录是否变更
  15.       .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
  16.       // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
  17.       .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
  18.       // 并行度参数设置
  19.       .option("hoodie.insert.shuffle.parallelism", "2")
  20.       .option("hoodie.upsert.shuffle.parallelism", "2")
  21.       .option(HoodieWriteConfig.TABLE_NAME, "test_partition_merge_on_read")
  22.       .mode(SaveMode.Overwrite)
  23.       .save("/tmp/hudi_merge_on_read")
  24.   }
复制代码




merge on read 主要是要是加入option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)参数,其他修改删除操作和copy on write 类似,这里不一一列举。

5.2.2 同步至Hive

  1.   @Test
  2.   def hiveSyncMergeOnRead(): Unit = {
  3.     val spark = SparkSession.builder.appName("delta hiveSync").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
  4.     val upsertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/hive_sync.txt")
  5.     upsertData.write.format("org.apache.hudi")
  6.       // 配置读时合并
  7.       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  8.       // 设置主键列名
  9.       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
  10.       // 设置数据更新时间的列名
  11.       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
  12.       // 分区列设置
  13.       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
  14.       // 设置要同步的hive库名
  15.       .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hj_repl")
  16.       // 设置要同步的hive表名
  17.       .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "test_partition_merge_on_read")
  18.       // 设置数据集注册并同步到hive
  19.       .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  20.       // 设置当分区变更时,当前数据的分区目录是否变更
  21.       .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
  22.       // 设置要同步的分区列名
  23.       .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
  24.       // 设置jdbc 连接同步
  25.       .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000")
  26.       // hudi表名称设置
  27.       .option(HoodieWriteConfig.TABLE_NAME, "test_partition_merge_on_read")
  28.       // 用于将分区字段值提取到Hive分区列中的类,这里我选择使用当前分区的值同步
  29.       .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  30.       // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
  31.       .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
  32.       // 并行度参数设置
  33.       .option("hoodie.insert.shuffle.parallelism", "2")
  34.       .option("hoodie.upsert.shuffle.parallelism", "2")
  35.       .mode(SaveMode.Append)
  36.       .save("/tmp/hudi_merge_on_read");
  37.   }
复制代码




与copy on write 操作一样,不同的是merge on read 会生成两个表后缀为_ro和_rt的外表。_ro为读优化视图,_rt为实时视图。

5.2.3 Hive查询读优化视图(后缀_ro)和实时视图查询 (后缀_rt)

  1. /**
  2.    * merge on read 实时视图查询
  3.    */
  4.   @Test
  5.   def mergeOnReadRealtimeViewByHive(): Unit = {
  6.     // 目标表
  7.     val sourceTable = "test_partition_merge_on_read_rt"
  8.     Class.forName("org.apache.hive.jdbc.HiveDriver")
  9.     val prop = new Properties()
  10.     prop.put("user", "hive")
  11.     prop.put("password", "hive")
  12.     val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/hj_repl", prop)
  13.     val stmt = conn.createStatement
  14.     val rs = stmt.executeQuery("select * from " + sourceTable)
  15.     val metaData = rs.getMetaData
  16.     val count = metaData.getColumnCount
  17.     while (rs.next()) {
  18.       for (i <- 1 to count) {
  19.         println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
  20.       }
  21.       println("-----------------------------------------------------------")
  22.     }
  23.     rs.close()
  24.     stmt.close()
  25.     conn.close()
  26.   }
  27.   /**
  28.    * merge on read 读优化视图查询
  29.    */
  30.   @Test
  31.   def mergeOnReadReadoptimizedViewByHive(): Unit = {
  32.     // 目标表
  33.     val sourceTable = "test_partition_merge_on_read_ro"
  34.     Class.forName("org.apache.hive.jdbc.HiveDriver")
  35.     val prop = new Properties()
  36.     prop.put("user", "hive")
  37.     prop.put("password", "hive")
  38.     val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/hj_repl", prop)
  39.     val stmt = conn.createStatement
  40.     val rs = stmt.executeQuery("select * from " + sourceTable)
  41.     val metaData = rs.getMetaData
  42.     val count = metaData.getColumnCount
  43.     while (rs.next()) {
  44.       for (i <- 1 to count) {
  45.         println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
  46.       }
  47.       println("-----------------------------------------------------------")
  48.     }
  49.     rs.close()
  50.     stmt.close()
  51.     conn.close()
  52.   }
复制代码



5.2.4 Presto查询读优化视图(后缀_ro)和实时视图查询 (后缀_rt)

  1. /**
  2.    * presto merge on read 实时视图查询
  3.    */
  4.   @Test
  5.   def mergeOnReadRealtimeViewByPresto(): Unit = {
  6.     // 目标表
  7.     val sourceTable = "test_partition_merge_on_read_rt"
  8.     Class.forName("com.facebook.presto.jdbc.PrestoDriver")
  9.     val conn = DriverManager.getConnection("jdbc:presto://hj:7670/hive/hj_repl", "hive", null)
  10.     val stmt = conn.createStatement
  11.     val rs = stmt.executeQuery("select * from  " + sourceTable)
  12.     val metaData = rs.getMetaData
  13.     val count = metaData.getColumnCount
  14.     while (rs.next()) {
  15.       for (i <- 1 to count) {
  16.         println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
  17.       }
  18.       println("-----------------------------------------------------------")
  19.     }
  20.     rs.close()
  21.     stmt.close()
  22.     conn.close()
  23.   }
  24.   /**
  25.    * presto merge on read 读优化视图查询
  26.    */
  27.   @Test
  28.   def mergeOnReadReadoptimizedViewByPresto(): Unit = {
  29.     // 目标表
  30.     val sourceTable = "test_partition_merge_on_read_ro"
  31.     Class.forName("com.facebook.presto.jdbc.PrestoDriver")
  32.     val conn = DriverManager.getConnection("jdbc:presto://hj:7670/hive/hj_repl", "hive", null)
  33.     val stmt = conn.createStatement
  34.     val rs = stmt.executeQuery("select * from  " + sourceTable)
  35.     val metaData = rs.getMetaData
  36.     val count = metaData.getColumnCount
  37.     while (rs.next()) {
  38.       for (i <- 1 to count) {
  39.         println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
  40.       }
  41.       println("-----------------------------------------------------------")
  42.     }
  43.     rs.close()
  44.     stmt.close()
  45.     conn.close()
  46.   }
复制代码




6. 问题整理
a. merg on read 问题
merge on read 要配置option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)才会生效,配置为option(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name())将不会生效。


b. spark pom 依赖问题
不要引入spark-hive 的依赖里面包含了hive 1.2.1的相关jar包,而hudi 要求的版本是2.x版本。如果一定要使用请排除相关依赖。

c. hive视图同步问题

代码与hive视图同步时resources要加入hive-site.xml 配置文件,不然同步hive metastore 会报错。
git 测试代码地址:https://github.com/hj2016/hudi-test

作者:别过来胖到我了
来源:https://blog.csdn.net/h335146502/article/details/104485494/

最新经典文章,欢迎关注公众号


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /5 下一条