分享

一文讲解从Flink、Spark、Kafka、MySQL、Hive导入数据到ClickHouse

问题导读:
1、如何使用Flink导入数据?
2、如何使用Spark导入数据?
3、如何从MySQL中导入数据?
4、如何从Hive中导入数据?


本文分享主要是ClickHouse的数据导入方式,本文主要介绍如何使用Flink、Spark、Kafka、MySQL、Hive将数据导入ClickHouse,具体内容包括:

  •     使用Flink导入数据
  •     使用Spark导入数据
  •     从Kafka中导入数据
  •     从MySQL中导入数据
  •     从Hive中导入数据

使用Flink导入数据

本文介绍使用 flink-jdbc将数据导入ClickHouse,Maven依赖为:

  1. <dependency>
  2.     <groupId>org.apache.flink</groupId>
  3.     <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
  4.     <version>1.10.1</version>
  5. </dependency>
复制代码

示例

本示例使用Kafka connector,通过Flink将Kafka数据实时导入到ClickHouse

  1. public class FlinkSinkClickHouse {
  2.     public static void main(String[] args) throws Exception {
  3.         String url = "jdbc:clickhouse://192.168.10.203:8123/default";
  4.         String user = "default";
  5.         String passwd = "hOn0d9HT";
  6.         String driver = "ru.yandex.clickhouse.ClickHouseDriver";
  7.         int batchsize = 500; // 设置batch size,测试的话可以设置小一点,这样可以立刻看到数据被写入
  8.         // 创建执行环境
  9.         EnvironmentSettings settings = EnvironmentSettings
  10.                 .newInstance()
  11.                 .useBlinkPlanner()
  12.                 .inStreamingMode()
  13.                 .build();
  14.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15.         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
  16.         String kafkaSource11 = "" +
  17.                 "CREATE TABLE user_behavior ( " +
  18.                 " `user_id` BIGINT, -- 用户id\n" +
  19.                 " `item_id` BIGINT, -- 商品id\n" +
  20.                 " `cat_id` BIGINT, -- 品类id\n" +
  21.                 " `action` STRING, -- 用户行为\n" +
  22.                 " `province` INT, -- 用户所在的省份\n" +
  23.                 " `ts` BIGINT, -- 用户行为发生的时间戳\n" +
  24.                 " `proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列\n" +
  25.                 " `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间\n" +
  26.                 " WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 在eventTime上定义watermark\n" +
  27.                 ") WITH ( 'connector' = 'kafka', -- 使用 kafka connector\n" +
  28.                 " 'topic' = 'user_behavior', -- kafka主题\n" +
  29.                 " 'scan.startup.mode' = 'earliest-offset', -- 偏移量,从起始 offset 开始读取\n" +
  30.                 " 'properties.group.id' = 'group1', -- 消费者组\n" +
  31.                 " 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', -- kafka broker 地址\n" +
  32.                 " 'format' = 'json', -- 数据源格式为 json\n" +
  33.                 " 'json.fail-on-missing-field' = 'true',\n" +
  34.                 " 'json.ignore-parse-errors' = 'false'" +
  35.                 ")";
  36.         // Kafka Source
  37.         tEnv.executeSql(kafkaSource11);
  38.         String query = "SELECT user_id,item_id,cat_id,action,province,ts FROM user_behavior";
  39.         Table table = tEnv.sqlQuery(query);
  40.         String insertIntoCkSql = "INSERT INTO behavior_mergetree(user_id,item_id,cat_id,action,province,ts)\n" +
  41.                 "VALUES(?,?,?,?,?,?)";
  42.         //将数据写入 ClickHouse Sink
  43.         JDBCAppendTableSink sink = JDBCAppendTableSink
  44.                 .builder()
  45.                 .setDrivername(driver)
  46.                 .setDBUrl(url)
  47.                 .setUsername(user)
  48.                 .setPassword(passwd)
  49.                 .setQuery(insertIntoCkSql)
  50.                 .setBatchSize(batchsize)
  51.                 .setParameterTypes(Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG)
  52.                 .build();
  53.         String[] arr = {"user_id","item_id","cat_id","action","province","ts"};
  54.         TypeInformation[] type = {Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG};
  55.         tEnv.registerTableSink(
  56.                 "sink",
  57.                 arr,
  58.                 type,
  59.                 sink
  60.         );
  61.         tEnv.insertInto(table, "sink");
  62.         tEnv.execute("Flink Table API to ClickHouse Example");
  63.     }
  64. }
复制代码
   Note:

  •         由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
  •         在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数据。

使用Spark导入数据

本文主要介绍如何通过Spark程序写入数据到Clickhouse中。

  1. <dependency>
  2.       <groupId>ru.yandex.clickhouse</groupId>
  3.       <artifactId>clickhouse-jdbc</artifactId>
  4.       <version>0.2.4</version>
  5. </dependency>
  6. <!-- 如果报错:Caused by: java.lang.ClassNotFoundException: com.google.common.escape.Escapers,则添加下面的依赖 -->
  7. <dependency>
  8.           <groupId>com.google.guava</groupId>
  9.           <artifactId>guava</artifactId>
  10.           <version>28.0-jre</version>
  11. </dependency>
复制代码

示例

  1. object Spark2ClickHouseExample {
  2.   val properties = new Properties()
  3.   properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
  4.   properties.put("user", "default")
  5.   properties.put("password", "hOn0d9HT")
  6.   properties.put("batchsize", "1000")
  7.   properties.put("socket_timeout", "300000")
  8.   properties.put("numPartitions", "8")
  9.   properties.put("rewriteBatchedStatements", "true")
  10.   case class Person(name: String, age: Long)
  11.   private def runDatasetCreationExample(spark: SparkSession): Dataset[Person] = {
  12.     import spark.implicits._
  13.     // DataFrames转成DataSet
  14.     val path = "file:///e:/people.json"
  15.     val peopleDS = spark.read.json(path)
  16.     peopleDS.createOrReplaceTempView("people")
  17.     val ds = spark.sql("SELECT name,age FROM people").as[Person]
  18.     ds.show()
  19.     ds
  20.   }
  21.   def main(args: Array[String]) {
  22.     val url = "jdbc:clickhouse://kms-1:8123/default"
  23.     val table = "people"
  24.     val spark = SparkSession
  25.       .builder()
  26.       .appName("Spark  Example")
  27.       .master("local") //设置为本地运行
  28.       .getOrCreate()
  29.     val ds = runDatasetCreationExample(spark)
  30.     ds.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
  31.     spark.stop()
  32.   }
  33. }
复制代码

从Kafka中导入数据

主要是使用ClickHouse的表引擎。

使用方式

  1. CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
  2. (
  3.     name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
  4.     name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
  5.     ...
  6. ) ENGINE = Kafka()
  7. SETTINGS
  8.     kafka_broker_list = 'host:port',
  9.     kafka_topic_list = 'topic1,topic2,...',
  10.     kafka_group_name = 'group_name',
  11.     kafka_format = 'data_format'[,]
  12.     [kafka_row_delimiter = 'delimiter_symbol',]
  13.     [kafka_schema = '',]
  14.     [kafka_num_consumers = N,]
  15.     [kafka_max_block_size = 0,]
  16.     [kafka_skip_broken_messages = N,]
  17.     [kafka_commit_every_batch = 0,]
  18.     [kafka_thread_per_consumer = 0]
  19.     kafka_broker_list :逗号分隔的brokers地址 (localhost:9092).
  20.     kafka_topic_list :Kafka 主题列表,多个主题用逗号分隔.
  21.     kafka_group_name :消费者组.
  22.     kafka_format – Message format. 比如JSONEachRow、JSON、CSV等等
复制代码

使用示例

在kafka中创建user_behavior主题,并向该主题写入数据,数据示例为:

  1. {"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
  2. {"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}
  3. {"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
复制代码

在ClickHouse中创建表,选择表引擎为Kafka(),如下:

  1. CREATE TABLE kafka_user_behavior (
  2.     user_id UInt64 COMMENT '用户id',
  3.     item_id UInt64 COMMENT '商品id',
  4.     cat_id UInt16  COMMENT '品类id',
  5.     action String  COMMENT '行为',
  6.     province UInt8 COMMENT '省份id',
  7.     ts UInt64      COMMENT '时间戳'
  8.   ) ENGINE = Kafka()
  9.     SETTINGS
  10.     kafka_broker_list = 'cdh04:9092',
  11.     kafka_topic_list = 'user_behavior',
  12.     kafka_group_name = 'group1',
  13.     kafka_format = 'JSONEachRow'
  14. ;
  15. -- 查询
  16. cdh04 :) select * from kafka_user_behavior ;
  17. -- 再次查看数据,发现数据为空
  18. cdh04 :) select count(*) from kafka_user_behavior;
  19. SELECT count(*)
  20. FROM kafka_user_behavior
  21. ┌─count()─┐
  22. │       0 │
  23. └─────────┘
复制代码

通过物化视图将kafka数据导入ClickHouse

当我们一旦查询完毕之后,ClickHouse会删除表内的数据,其实Kafka表引擎只是一个数据管道,我们可以通过物化视图的方式访问Kafka中的数据。

  •     首先创建一张Kafka表引擎的表,用于从Kafka中读取数据
  •     然后再创建一张普通表引擎的表,比如MergeTree,面向终端用户使用
  •     最后创建物化视图,用于将Kafka引擎表实时同步到终端用户所使用的表中

  1. --  创建Kafka引擎表
  2. CREATE TABLE kafka_user_behavior_src (
  3.     user_id UInt64 COMMENT '用户id',
  4.     item_id UInt64 COMMENT '商品id',
  5.     cat_id UInt16  COMMENT '品类id',
  6.     action String  COMMENT '行为',
  7.     province UInt8 COMMENT '省份id',
  8.     ts UInt64      COMMENT '时间戳'
  9.   ) ENGINE = Kafka()
  10.     SETTINGS
  11.     kafka_broker_list = 'cdh04:9092',
  12.     kafka_topic_list = 'user_behavior',
  13.     kafka_group_name = 'group1',
  14.     kafka_format = 'JSONEachRow'
  15. ;
  16. -- 创建一张终端用户使用的表
  17. CREATE TABLE kafka_user_behavior (
  18.     user_id UInt64 COMMENT '用户id',
  19.     item_id UInt64 COMMENT '商品id',
  20.     cat_id UInt16  COMMENT '品类id',
  21.     action String  COMMENT '行为',
  22.     province UInt8 COMMENT '省份id',
  23.     ts UInt64      COMMENT '时间戳'
  24.   ) ENGINE = MergeTree()
  25.     ORDER BY user_id
  26. ;
  27. -- 创建物化视图,同步数据
  28. CREATE MATERIALIZED VIEW user_behavior_consumer TO kafka_user_behavior
  29.     AS SELECT * FROM kafka_user_behavior_src ;
  30. -- 查询,多次查询,已经被查询的数据依然会被输出
  31. cdh04 :) select * from kafka_user_behavior;
  32.     Note:
  33.     Kafka消费表不能直接作为结果表使用。Kafka消费表只是用来消费Kafka数据,没有真正的存储所有数据。
复制代码

从MySQL中导入数据

同kafka中导入数据类似,ClickHouse同样支持MySQL表引擎,即映射一张MySQL中的表到ClickHouse中。
数据类型对应关系

MySQL中数据类型与ClickHouse类型映射关系如下表。
2020-11-18_201001.jpg

使用方式
  1. CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
  2. (
  3.     name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
  4.     name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
  5.     ...
  6. ) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
复制代码

使用示例

  1. -- 连接MySQL中clickhouse数据库的test表
  2. CREATE TABLE mysql_users(
  3.     id Int32,
  4.     name String
  5. ) ENGINE = MySQL(
  6. '192.168.10.203:3306',
  7. 'clickhouse',
  8. 'users',
  9. 'root',
  10. '123qwe');
  11. -- 查询数据
  12. cdh04 :) SELECT * FROM mysql_users;
  13. SELECT *
  14. FROM mysql_users
  15. ┌─id─┬─name──┐
  16. │  1 │ tom   │
  17. │  2 │ jack  │
  18. │  3 │ lihua │
  19. └────┴───────┘
  20. -- 插入数据,会将数据插入MySQL对应的表中
  21. -- 所以当查询MySQL数据时,会发现新增了一条数据
  22. INSERT INTO users VALUES(4,'robin');
  23. -- 再次查询
  24. cdh04 :) select * from mysql_users;               
  25. SELECT *
  26. FROM mysql_users
  27. ┌─id─┬─name──┐
  28. │  1 │ tom   │
  29. │  2 │ jack  │
  30. │  3 │ lihua │
  31. │  4 │ robin │
  32. └────┴───────┘
复制代码

注意:对于MySQL表引擎,不支持UPDATE和DELETE操作,比如执行下面命令时,会报错:
  1. -- 执行更新
  2. ALTER TABLE mysql_users UPDATE name = 'hanmeimei' WHERE id = 1;
  3. -- 执行删除
  4. ALTER TABLE mysql_users DELETE WHERE id = 1;
  5. -- 报错
  6. DB::Exception: Mutations are not supported by storage MySQL.
复制代码

从Hive中导入数据

本文使用Waterdrop进行数据导入,Waterdrop是一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上。Waterdrop拥有着非常丰富的插件,支持从Kafka、HDFS、Kudu中读取数据,进行各种各样的数据处理,并将结果写入ClickHouse、Elasticsearch或者Kafka中。

我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入。配置文件包括四个部分,分别是Spark、Input、filter和Output。

关于Waterdrop的安装,十分简单,只需要下载ZIP文件,解压即可。使用Waterdrop需要安装Spark。

    在Waterdrop安装目录的config/文件夹下创建配置文件:hive_table_batch.conf,内容如下。主要包括四部分:Spark、Input、filter和Output。

  •         Spark部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。
  •         Input部分是定义数据源,其中pre_sql是从Hive中读取数据SQL,table_name是将读取后的数据,注册成为Spark中临时表的表名,可为任意字段。
  •         filter部分配置一系列的转化,比如过滤字段
  •         Output部分是将处理好的结构化数据写入ClickHouse,ClickHouse的连接配置。
  •         需要注意的是,必须保证hive的metastore是在服务状态。

  1. spark {
  2.   spark.app.name = "Waterdrop_Hive2ClickHouse"
  3.   spark.executor.instances = 2
  4.   spark.executor.cores = 1
  5.   spark.executor.memory = "1g"
  6.   // 这个配置必需填写
  7.   spark.sql.catalogImplementation = "hive"
  8. }
  9. input {
  10.     hive {
  11.         pre_sql = "select * from default.users"
  12.         table_name = "hive_users"
  13.     }
  14. }
  15. filter {}
  16. output {
  17.     clickhouse {
  18.         host = "kms-1:8123"
  19.         database = "default"
  20.         table = "users"
  21.         fields = ["id", "name"]
  22.         username = "default"
  23.         password = "hOn0d9HT"
  24.     }
  25. }
复制代码

    执行任务
  1. [kms@kms-1 waterdrop-1.5.1]$ bin/start-waterdrop.sh  --config config/hive_table_batch.conf --master yarn --deploy-mode cluster
复制代码

这样就会启动一个Spark作业执行数据的抽取,等执行完成之后,查看ClickHouse的数据。

总结

本文主要介绍了如何通过Flink、Spark、Kafka、MySQL以及Hive,将数据导入到ClickHouse,对每一种方式都出了详细的示例,希望对你有所帮。


作者:西贝
来源:https://mp.weixin.qq.com/s/SOX_4JlcLYk7wHPNDLJ1Rg

最新经典文章,欢迎关注公众号


已有(2)人评论

跳转到指定楼层
东陆炸洋芋 发表于 2020-11-20 09:34:40
干货满满,学习了
回复

使用道具 举报

aa834581918 发表于 2020-11-25 10:01:20
收藏了,以后有用
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条