分享

带大家一起学数据湖:数据湖之iceberg开发相关知识

问题导读

1.Spark读写iceberg表需要哪些配置?
2.Spark开发环境中需要引入哪些包?
3.如何开启hive对iceberg的支持?

1 环境准备

准备大数据集群 .安装HDFS ,HIVE,SAPRK ,FLINK
下载运行集群环境运行是需要的jar包


下载地址:http://iceberg.apache.org/releases/


2 Spark将读写iceberg表中数据

准备spark集群, 并配置iceberg环境

在spark的配置目录中添加hadoop和hive的配置文件 方便spark与hadoop和hive整合


1.png

在spark的jars包中添加下载好的iceberg的jar包


1.png

2.1 catalog为hadoop
2.1.1 sparksql操作

  1. spark-sql --packages org.apache.iceberg:iceberg-spark3-runtime:0.10.0 \
  2. --conf spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog \
  3. --conf spark.sql.catalog.hadoop_prod.type=hadoop \
  4. --conf spark.sql.catalog.hadoop_prod.warehouse=hdfs://linux01:8020/doit/iceberg/warehouse
复制代码


创建表

  1. -- 使用系统默认的数据源 会将表生成在默认的本地文件夹中
  2. spark-sql> create  table tb_test1(id int ,name string) using  iceberg ;
复制代码
1.png

  1. -- 切换启动SQL脚本是自己设置的数据源hadoop_prod
  2. use hadoop_prod.default ;
  3. -- 创建表
  4. create  table tb_test1(id int ,name string) using  iceberg ;
复制代码
查看表的位置在HDFS上


1.png

插入入数据到iceberg表中

insert into tb_test1 values(1,'马云'),(2,'马蓉'),(3,'马保国') ;

select * from tb_test1 ;

1       马云
2       马蓉
3       马保国


1.png

2.1.2 spark shell操作


-- 登录shell客户端

spark-shell --packages org.apache.iceberg:iceberg-spark3-runtime:0.10.0 \
--conf spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.hadoop_prod.type=hadoop \
--conf spark.sql.catalog.hadoop_prod.warehouse=hdfs://linux01:8020/doit/iceberg/warehouse


1.png

直接读取上面在sparksql中创建的表中的数据
  1. scala> spark.read.format("iceberg").load("hdfs://linux01:8020//doit/iceberg/warehouse/default/tb_test1").show
  2. +---+------+
  3. | id|  name|
  4. +---+------+
  5. |  1|  马云|
  6. |  2|  马蓉|
  7. |  3|马保国|
  8. +---+------+
复制代码



使用sparkAPI建表和插入数据在后面的API中在详细介绍

2.1.3 IDEA spark项目操作

使用IDEA创建maven项目 , 添加依赖
pom.xml文件
  1. <properties>
  2.         <maven.compiler.source>1.8</maven.compiler.source>
  3.         <maven.compiler.target>1.8</maven.compiler.target>
  4.         <scala.version>2.12.12</scala.version>
  5.         <spark.version>3.0.0</spark.version>
  6.         <hadoop.version>3.1.1</hadoop.version>
  7.         <encoding>UTF-8</encoding>
  8.     </properties>
  9.     <dependencies>
  10.         <!-- 导入scala的依赖 -->
  11.         <dependency>
  12.             <groupId>org.scala-lang</groupId>
  13.             <artifactId>scala-library</artifactId>
  14.             <version>${scala.version}</version>
  15.         </dependency>
  16.         <!-- 导入spark的依赖 -->
  17.         <dependency>
  18.             <groupId>org.apache.spark</groupId>
  19.             <artifactId>spark-core_2.12</artifactId>
  20.             <version>${spark.version}</version>
  21.         </dependency>
  22.         <dependency>
  23.             <groupId>org.apache.spark</groupId>
  24.             <artifactId>spark-sql_2.12</artifactId>
  25.             <version>${spark.version}</version>
  26.         </dependency>
  27.         <!--JDBC驱动包-->
  28.         <dependency>
  29.             <groupId>mysql</groupId>
  30.             <artifactId>mysql-connector-java</artifactId>
  31.             <version>5.1.48</version>
  32.         </dependency>
  33.         <!--hive-->
  34.         <dependency>
  35.             <groupId>org.apache.spark</groupId>
  36.             <artifactId>spark-hive_2.12</artifactId>
  37.             <version>${spark.version}</version>
  38.         </dependency>
  39.         <dependency>
  40.             <groupId>com.alibaba</groupId>
  41.             <artifactId>fastjson</artifactId>
  42.             <version>1.2.62</version>
  43.         </dependency>
  44.         <dependency>
  45.             <groupId>c3p0</groupId>
  46.             <artifactId>c3p0</artifactId>
  47.             <version>0.9.1.2</version>
  48.         </dependency>
  49.         <dependency>
  50.             <groupId>org.apache.iceberg</groupId>
  51.             <artifactId>iceberg-core</artifactId>
  52.             <version>0.10.0</version>
  53.         </dependency>
  54.         <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark3-runtime -->
  55.         <dependency>
  56.             <groupId>org.apache.iceberg</groupId>
  57.             <artifactId>iceberg-spark3-runtime</artifactId>
  58.             <version>0.10.0</version>
  59.         </dependency>
  60. <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
  61. <dependency>
  62.     <groupId>org.apache.avro</groupId>
  63.     <artifactId>avro</artifactId>
  64.     <version>1.9.0</version>
  65. </dependency>
  66.     </dependencies>
  67.     <build>
  68.         <plugins>
  69.             <!-- 指定编译java的插件 -->
  70.             <plugin>
  71.                 <groupId>org.apache.maven.plugins</groupId>
  72.                 <artifactId>maven-compiler-plugin</artifactId>
  73.                 <version>3.5.1</version>
  74.             </plugin>
  75.             <!-- 指定编译scala的插件 -->
  76.             <plugin>
  77.                 <groupId>net.alchim31.maven</groupId>
  78.                 <artifactId>scala-maven-plugin</artifactId>
  79.                 <version>3.2.2</version>
  80.                 <executions>
  81.                     <execution>
  82.                         <goals>
  83.                             <goal>compile</goal>
  84.                             <goal>testCompile</goal>
  85.                         </goals>
  86.                         <configuration>
  87.                             <args>
  88.                                 <arg>-dependencyfile</arg>
  89.                                 <arg>${project.build.directory}/.scala_dependencies</arg>
  90.                             </args>
  91.                         </configuration>
  92.                     </execution>
  93.                 </executions>
  94.             </plugin>
  95.         </plugins>
  96.     </build>
复制代码



  1.   val spark: SparkSession = SparkSession
  2.       .builder()
  3.       .config("spark.sql.catalog.hadoop_prod.type", "hadoop") // 设置数据源类别为hadoop
  4.       .config("spark.sql.catalog.hadoop_prod", classOf[SparkCatalog].getName)
  5.       .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://linux01:8020//doit/iceberg/warehouse/") // 设置数据源位置
  6.       .appName(this.getClass.getSimpleName)
  7.       .master("local[*]")
  8.       .getOrCreate()
  9.     // 获取表结构信息
  10.     val df = spark.table("hadoop_prod.default.tb_test1")
  11.     df.printSchema()
  12.     // 读取指定表下的数据
  13.     //spark.read.format("iceberg").load("/doit/iceberg/warehouse/default/tb_test1").show()
  14.      //3372567346381641315
  15.     /**
  16.      * select snapshot_id from hadoop_prod.default.tb_test1.snapshots ;
  17.      * select * from hadoop_prod.default.tb_test1.snapshots ;
  18.      */
  19.     // 读取指定快照下的数据
  20.     spark.read.option("snapshot-id", 3372567346381641315l).format("iceberg").load("/doit/iceberg/warehouse/default/tb_test1").show
复制代码


1.png

2.2 catalog为hive
spark可以使用sparkshell , sparksql 和idea中创建iceberg表 ,在hive中使用iceberg支持以后可以再hive中对数据进行分析处理  , 但是不能对数据进行修改和创建表等操作 , 也就是说暂且还不支持写操作 !

2.2.1 sparksql操作

spark-sql --packages org.apache.iceberg:iceberg-spark3-runtime:0.10.0     --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog     --conf spark.sql.catalog.spark_catalog.type=hive

-- 在hive中创建iceberg的数据库

create database spark_catalog.hang ;

-- 切换数据源数据库

use spark_catalog.hang ;

-- 创建iceberg表

create table tb_hang(id int , name string) using iceberg ;

在hive的工作目录中查看  出现如下目录 :

向表中插入数据  -----> 在hive客户端查询数据


1.png

-- 开启hive对iceberg的支持

SET iceberg.mr.catalog=hive;

set iceberg.engine.hive.enabled=true ;



-- 在hive端查看当前的数据库

show database ;

+----------------+
| database_name  |
+----------------+
| db_doit19      |
| db_icer        |
| default        |
| hang           |
+----------------+

切换 数据库  

use  hang ;

select * from tb_hang  ;

+-------------+---------------+
| tb_hang.id  | tb_hang.name  |
+-------------+---------------+
| 2           | hangge        |
| 1           | hang          |
+-------------+---------------+


1.png



最新经典文章,欢迎关注公众号


原文链接
https://blog.csdn.net/qq_37933018/article/details/110452480

加微信w3aboutyun,可拉入技术爱好者群

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /5 下一条