分享

0836-Apache Druid on HDP

本帖最后由 Mirinda 于 2021-4-19 13:46 编辑



问题导读:
1.什么是Apache Druid?
2.Apache Druid是一个什么架构?
3.如何使用Druid加速Hive查询?

1.Apache Druid简介
Apache Druid是一个分布式的、面向列的、实时分析数据库,旨在快速获取大量数据并将其编入索引,并对大型数据集进行快速的切片和切分分析(“OLAP查询),常用于实时摄取、快速查询和对时间依赖性很高的数据库用户。因此,Druid可以为可视化的分析应用程序提供强力的数据源支持,或用作需要快速聚合的高并发API的后端。Druid最适合面向事件的数据。
Apache Druid通常位于存储或处理层与最终用户之间,并充当查询层以服务于分析工作负载。

a1.png

常见应用领域包括:点击流分析、网络遥测分析、服务器指标存储、供应链分析、应用程序性能指标、数字营销、广告分析、商业智能BI / OLAP等。
Apache Druid的核心架构结合了数据仓库、时间序列数据库和日志搜索系统的思想,包括以下主要功能:
  • 列式存储格式
  • 可扩展的分布式系统
  • 大规模并行处理
  • 实时或批量加载数据
  • 自我修复、自我平衡、易于操作
  • 云原生的容错架构,不会丢失数据
  • 用于快速过滤的索引
  • 基于时间的分区
  • 近似算法
  • 加载数据时自动汇总


2.Apache Druid架构
Apache Druid具有多进程,分布式架构,旨在实现云友好且易于操作。每种Druid进程类型都可以独立配置和扩展,从而为您的集群提供最大的灵活性。这种设计还提高了容错能力:一个组件的故障不会立即影响其他组件。

a2.png

2.1 服务器类型
我们一般将Druid的服务器分为三种类型:主服务器(Master Server),查询服务器(Query Server)和数据服务器(Data Server)。

2.1.1 Master Server
Master Server管理数据的加载和可用性:它负责启动新的加载作业,并协调下述“Data Server”上数据的可用性。包含两个处理进程:Coordinator 和 Overlord。
  • Coordinator进程监视数据服务器上的Historical进程,它主要负责Segment的管理和分配。更具体地说,Druid Coordinator进程与Historical进程进行通信,以基于配置加载或删除Segment。Druid Coordinator负责加载新的Segment、删除过时的Segment、管理Segment的复制以及平衡Segment的负载,确保Segment在所有的Historical记录之间保持平衡。
  • Overlord进程监视数据服务器上的MiddleManager进程,并且是将数据加载到Druid中的控制器。它负责接受任务、协调任务分配、围绕任务创建锁以及将状态返回给调用方,并将加载任务分配给MiddleManager,并负责协调Segment的发布。可以将Overlord配置为以两种模式之一运行:本地模式或远程模式。

    • 在本地模式下,Overlord还负责创建用于执行任务的Peon。在本地模式下运行Overlord时,还必须提供所有MiddleManager和Peon配置。本地模式通常用于简单的工作流程。
    • 在远程模式下,Overlord和MiddleManager在单独的进程中运行,可以在不同的服务器上运行它们。如果打算将indexing服务用作整个Druid集群的索引服务,则建议使用此模式。


2.1.2 Query
Query Server提供用户和客户端应用程序与之交互的端点,将查询路由到数据服务器或其他查询服务器。包含两个处理进程:Broker和Router。
  • Broker进程从外部客户端接收查询,并将这些查询转发到数据服务器。当Broker从这些子查询中接收到结果时,它们会合并这些结果并将其返回给调用方。最终用户通常查询Broker,而不是直接查询数据服务器上的Historicals或MiddleManagers进程。
  • Router进程是一个可选的进程,它可以在Druid Broker、Overlord和Coordinator之前提供统一的API网关。

Router还运行Druid控制台,Druid控制台是用于数据源、段、任务、数据处理(Historical和MiddleManager)以及Coordinator动态配置的管理UI。还可以在控制台中运行SQL和Native Druid查询。

2.1.3 Data Server
Data Server:执行数据加载作业并存储可查询的数据。包含两个进程:Historical 和 MiddleManager。
  • Historical是存储和查询“历史”数据的主要进程,它从Deep Storage中下载Segment,并响应有关这些Segment的查询。不接受写操作。
  • MiddleManager是将新数据加载到群集中的进程,负责从外部数据源读取数据并发布至新的Druid Segment。

    • Peon进程是由MiddleManager产生的任务执行引擎,每个Peon运行一个单独的JVM,并负责执行一个任务。Peons始终与生成它们的MiddleManager在同一主机上运行。

2.2 外部依赖
除了内置的进程类型外,Druid还需要三个外部依赖项,可以利用现有的现有基础结构:Deep Storage、Metadata Storage、Zookeeper。

2.2.1 Deep Storage
Deep Storage是存储Segment的地方,Apache Druid本身不提供存储机制。这种Deep Storage的基础架构定义了数据的持久性级别,只要Druid进程可以看到该存储基础架构并能够获取存储在其上的Segment,那么无论丢失多少个Druid节点,数据都不会丢失。如果Segment从该存储层消失,则将丢失这些Segment表示的所有数据。
支持本地文件系统、HDFS和S3等,由属性druid.storage.type和druid.storage.storageDirectory等属性指定。

2.2.2 Metadata Storage
Metadata Storage是Apache Druid的外部依赖项,Apache Druid使用它来存储有关系统的各种元数据,而不是存储实际数据。
支持Derby、MySQL、PostgreSQL,由属性druid.metadata.storage.type等属性指定。

2.2.3 Zookeeper
Apache Druid使用Apache ZooKeeper(ZK)来管理当前集群状态,包含:
  • Coordinator的Leader选举
  • Historical中Segment的“发布”协议
  • Coordinator和Historical之间Segment的加载/删除协议
  • Overlord的Leader选举
  • Overlord和MiddleManager的任务管理


2.3 存储设计
Druid的数据存储在“datasources”中,类似于传统RDBMS中的“table”。每个datasource都按时间分区,并且可以选择按其他属性进一步分区。每个时间范围都称为“chunk”(如果按天划分,则为一天)。在一个chunk内,数据被划分为一个或多个“segment”。每个segment都是单个文件,通常包含多达几百万行的数据。

a3.png

一个datasource可能具有从几个segment到数十万甚至数百万个segment,每个segment都是从在MiddleManager上创建开始的,Segment的构建旨在生成紧凑且支持快速查询的数据文件,包括以下步骤:
  • 转换为列格式
  • 使用位图索引编制索引
  • 使用各种算法进行压缩

    • 字符串列的ID存储最小化的字典编码
    • 位图索引的位图压缩
    • 所有列的类型感知压缩


Apache Druid将其索引存储在Segment文件中,该Segment文件按时间进行分区。在基本设置中,将为每个时间间隔创建一个分段文件,其中该时间间隔可在granularitySpec的segmentGranularity参数中配置。为了使Druid在繁重的查询负载下正常运行,建议Segment文件的大小在300MB-700MB范围内。如果Segment文件大于此范围,可以更改时间间隔的粒度或者对数据进行分区,并在partitionsSpec中调整targetPartitionSize(一般建议最小为500万行)。
在Apache Druid中,一般有三种基本列的类型:时间戳列、维度列和指标列,如图所示:

a4.png

时间戳和指标列,都是由LZ4压缩的整数或浮点值的数组。
维度列由于支持筛选和分组操作,一般需要以下三个数据结构:
  • 将维度的值映射到整数ID的字典
  • 使用上述字典编码的维度的值的列表
  • 指示哪些行包含维度值的BITMAP

例如:
  1. 1: Dictionary that encodes column values
  2.   {
  3.     "Justin Bieber": 0,
  4.     "Ke$ha":         1
  5.   }
  6. 2: Column data
  7.   [0,
  8.    0,
  9.    1,
  10.    1]
  11. 3: Bitmaps - one for each unique value of the column
  12.   value="Justin Bieber": [1,1,0,0]
  13.   value="Ke$ha":         [0,0,1,1]
复制代码

3.在HDP上安装Apache Druid

环境
版本
操作系统
RHEL-7.6
数据库
MySQL-5.7
HDP
3.1.4

3.1 准备数据库
创建数据库,并授权(Druid 数据库需要使用utf8编码):
  1. mysql> CREATE DATABASE druid DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
  2. mysql> GRANT ALL PRIVILEGES ON efm.* TO efm@‘%’ IDENTIFIED BY ‘Cloudera4u’;
  3. mysql> FLUSH PRIVILEGES;
复制代码

加载MySQL的JDBC驱动:
  1. ambari-server setup --jdbc-db=mysql --jdbc-driver=/usr/share/java/mysql-connector-java.jar
  2. ambari-server restart
复制代码

3.2 安装Druid
打开Ambari WebUI上的添加服务向导,勾选 Druid:

a5.png


为Master Server和Query Server分配主机节点:

a6.png


为 Data Server分配主机节点:

a7.png


根据提示填入Metadata Storage数据库连接信息:

a8.png


安装完成后,可以看到Druid的服务汇总页面:

a9.png

在Druid服务汇总页面右侧,可以看到Quick Links下提供了两个WEB控制台
  • Druid Coordinator Console,用于显示集群信息:

a10.png

  • Druid Overlord Console可用于查看挂起的任务、正在运行的任务、可用的工作程序以及最近创建和终止的任务:

a11.png

4.导入数据
Apache Druid支持流式和批量加载数据两种方式,每种加载方法都支持其自己的源系统集。
  • 批量加载:当从文件进行批量加载时,应使用一次性任务,并且支持三种类型:index_parallel(本地、可以并行)、index_hadoop(基于hadoop)、和index(本地、单线程)。
  • 流式加载:最推荐、最流行的流式数据加载方法是直接从Kafka读取的Kafka索引服务。


无论使用哪种数据加载方式,都需要定制数据加载规范(JSON文件),主要由三个部分组成:
  • dataSchema:定义数据源的名称、时间戳、维度、指标、转换和过滤器
  • ioConfig:定义如何连接到数据源,以及如何解析数据
  • tuningConfig:控制每种加载方法特有的各种参数

4.1 导入本地数据源
使用单线程批量加载的方式加载数据到Druid,
数据文件路径:/usr/hdp/current/druid-overlord/quickstart/wikiticker-2015-09-12-sampled.json.gz

4.1.1 定义规范
  1. [root@hadoop47 ~]# cat index_local.json
  2. {
  3.   "type" : "index",
  4.   "spec" : {
  5.     "dataSchema" : {
  6.       "dataSource" : "wikipedia_local",
  7.       "parser" : {
  8.         "type" : "string",
  9.         "parseSpec" : {
  10.           "format" : "json",
  11.           "dimensionsSpec" : {
  12.             "dimensions" : [
  13.               "channel",
  14.               "cityName",
  15.               "comment",
  16.               "countryIsoCode",
  17.               "countryName",
  18.               "isAnonymous",
  19.               "isMinor",
  20.               "isNew",
  21.               "isRobot",
  22.               "isUnpatrolled",
  23.               "metroCode",
  24.               "namespace",
  25.               "page",
  26.               "regionIsoCode",
  27.               "regionName",
  28.               "user",
  29.               { "name": "added", "type": "long" },
  30.               { "name": "deleted", "type": "long" },
  31.               { "name": "delta", "type": "long" }
  32.             ]
  33.           },
  34.           "timestampSpec": {
  35.             "column": "time",
  36.             "format": "iso"
  37.           }
  38.         }
  39.       },
  40.       "metricsSpec" : [],
  41.       "granularitySpec" : {
  42.         "type" : "uniform",
  43.         "segmentGranularity" : "day",
  44.         "queryGranularity" : "none",
  45.         "intervals" : ["2015-09-12/2015-09-13"],
  46.         "rollup" : false
  47.       }
  48.     },
  49.     "ioConfig" : {
  50.       "type" : "index",
  51.       "firehose" : {
  52.         "type" : "local",
  53.         "baseDir" : "/usr/hdp/current/druid-overlord/quickstart/",
  54.         "filter" : "wikiticker-2015-09-12-sampled.json.gz"
  55.       },
  56.       "appendToExisting" : false
  57.     },
  58.     "tuningConfig" : {
  59.       "type" : "index",
  60.       "maxRowsPerSegment" : 5000000,
  61.       "maxRowsInMemory" : 25000,
  62.       "forceExtendableShardSpecs" : true
  63.     }
  64.   }
  65. }
复制代码

4.1.2 加载数据
  1. curl -X 'POST' -H 'Content-Type:application/json' -d @index_local.json http://hadoop46.luqimin.cn:8090/druid/indexer/v1/task
复制代码

4.2 导入HDFS数据源
4.2.1 定义规范
  1. {
  2.   "type" : "index_hadoop",
  3.   "spec" : {
  4.     "ioConfig" : {
  5.       "type" : "hadoop",
  6.       "inputSpec" : {
  7.         "type" : "static",
  8.         "paths" : "wikiticker-2015-09-12-sampled.json.gz"
  9.       }
  10.     },
  11.     "dataSchema" : {
  12.       "dataSource" : "wikiticker-hadoop",
  13.       "granularitySpec" : {
  14.         "type" : "uniform",
  15.         "segmentGranularity" : "day",
  16.         "queryGranularity" : "none",
  17.         "intervals" : ["2015-09-12/2015-09-13"]
  18.       },
  19.       "parser" : {
  20.         "type" : "hadoopyString",
  21.         "parseSpec" : {
  22.           "format" : "json",
  23.           "dimensionsSpec" : {
  24.             "dimensions" : [
  25.               "channel",
  26.               "cityName",
  27.               "comment",
  28.               "countryIsoCode",
  29.               "countryName",
  30.               "isAnonymous",
  31.               "isMinor",
  32.               "isNew",
  33.               "isRobot",
  34.               "isUnpatrolled",
  35.               "metroCode",
  36.               "namespace",
  37.               "page",
  38.               "regionIsoCode",
  39.               "regionName",
  40.               "user"
  41.             ]
  42.           },
  43.           "timestampSpec" : {
  44.             "format" : "auto",
  45.             "column" : "time"
  46.           }
  47.         }
  48.       },
  49.       "metricsSpec" : [
  50.         {
  51.           "name" : "count",
  52.           "type" : "count"
  53.         },
  54.         {
  55.           "name" : "added",
  56.           "type" : "longSum",
  57.           "fieldName" : "added"
  58.         },
  59.         {
  60.           "name" : "deleted",
  61.           "type" : "longSum",
  62.           "fieldName" : "deleted"
  63.         },
  64.         {
  65.           "name" : "delta",
  66.           "type" : "longSum",
  67.           "fieldName" : "delta"
  68.         },
  69.         {
  70.           "name" : "user_unique",
  71.           "type" : "hyperUnique",
  72.           "fieldName" : "user"
  73.         }
  74.       ]
  75.     },
  76.     "tuningConfig" : {
  77.       "type" : "hadoop",
  78.       "partitionsSpec" : {
  79.         "type" : "hashed",
  80.         "targetPartitionSize" : 5000000
  81.       },
  82.       "jobProperties" : {}
  83.     }
  84.   }
  85. }
复制代码


4.2.2 加载数据
# 上传数据文件到HDFS
  1. su druid -l -c 'hdfs dfs -put wikiticker-2015-09-12-sampled.json.gz /user/druid/'
复制代码

# 提交任务,该任务将提交至YARN运行
  1. curl -X 'POST' -H 'Content-Type:application/json' -d @ wikiticker-index.json http://hadoop46.luqimin.cn:8090/druid/indexer/v1/task
复制代码
4.3 导入Kafka数据源
打开Ambari中Druid的配置页面,修改Advanced druid-common中的属性druid.extensions.loadList,增加值:“druid-kafka-indexing-service”后,重启Druid服务。

a12.png

4.3.1 定义规范
  1. {
  2.   "type": "kafka",
  3.   "dataSchema": {
  4.     "dataSource": "wikipedia-kafka",
  5.     "parser": {
  6.       "type": "string",
  7.       "parseSpec": {
  8.         "format": "json",
  9.         "timestampSpec": {
  10.           "column": "time",
  11.           "format": "auto"
  12.         },
  13.         "dimensionsSpec": {
  14.           "dimensions": [
  15.             "channel",
  16.             "cityName",
  17.             "comment",
  18.             "countryIsoCode",
  19.             "countryName",
  20.             "isAnonymous",
  21.             "isMinor",
  22.             "isNew",
  23.             "isRobot",
  24.             "isUnpatrolled",
  25.             "metroCode",
  26.             "namespace",
  27.             "page",
  28.             "regionIsoCode",
  29.             "regionName",
  30.             "user",
  31.             { "name": "added", "type": "long" },
  32.             { "name": "deleted", "type": "long" },
  33.             { "name": "delta", "type": "long" }
  34.           ]
  35.         }
  36.       }
  37.     },
  38.     "metricsSpec" : [],
  39.     "granularitySpec": {
  40.       "type": "uniform",
  41.       "segmentGranularity": "DAY",
  42.       "queryGranularity": "NONE",
  43.       "rollup": false
  44.     }
  45.   },
  46.   "tuningConfig": {
  47.     "type": "kafka",
  48. "reportParseExceptions": false,
  49. "maxRowsInMemory": 1000
  50. "maxRowsPerSegment": 5000000
  51.   },
  52.   "ioConfig": {
  53.     "topic": "wikipedia",
  54.     "replicas": 1,
  55.     "taskDuration": "PT10M",
  56.     "completionTimeout": "PT20M",
  57.     "consumerProperties": {
  58.       "bootstrap.servers": "hadoop45.luqimin.cn:6667,hadoop46.luqimin.cn:6667,hadoop47.luqimin.cn:6667"
  59.     }
  60.   }
  61. }
复制代码

4.3.2 提交任务
  1. curl -X 'POST' -H 'Content-Type:application/json' -d @wikipedia-kafka-supervisor.json http://hadoop46.luqimin.cn:8090/druid/indexer/v1/supervisor
复制代码

可以看到任务正在运行

a13.png

向Kafka生产数据:
  1. /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list hadoop45.luqimin.cn:6667,hadoop47.luqimin.cn:6667,hadoop46.luqimin.cn:6667 --topic wikipedia < wikiticker-2015-09-12-sampled.json
复制代码
这时可以立即查询Druid中的数据。

5.查询数据
5.1 Json over HTTP
5.1.1 定义规范
  1. [root@hadoop47 ~]# cat wickiticker-top.json
  2. {
  3.     "queryType" : "topN",
  4.     "dataSource" : "wikipedia_local",
  5.     "intervals" : ["2015-09-10/2015-09-14"],
  6.     "granularity" : "all",
  7.     "dimension" : "page",
  8.     "metric" : "count",
  9.     "threshold" : 10,
  10.     "aggregations" : [
  11.        {
  12.          "type" : "count",
  13.          "name" : "count"
  14.         }
  15.      ]
  16.   }
复制代码

5.1.2 提交查询任务
  1. curl -X 'POST' -H 'Content-Type:application/json' -d @wickiticker-top.json http://hadoop45.luqimin.cn:8082/druid/v2?prett
复制代码

返回结果:
  1. [
  2. {"timestamp":"2015-09-12T00:46:58.771Z",
  3. "result":[
  4. {"count":33,"page":"Wikipedia:Vandalismusmeldung"},
  5. {"count":28,"page":"User:Cyde/List of candidates for speedy deletion/Subpage"},
  6. {"count":27,"page":"Jeremy Corbyn"},
  7. {"count":21,"page":"Wikipedia:Administrators' noticeboard/Incidents"},
  8. {"count":20,"page":"Flavia Pennetta"},
  9. {"count":18,"page":"Total Drama Presents: The Ridonculous Race"},
  10. {"count":18,"page":"User talk:Dudeperson176123"},
  11. {"count":18,"page":"Wikipédia:Le Bistro/12 septembre 2015"},
  12. {"count":17,"page":"Wikipedia:In the news/Candidates"},
  13. {"count":17,"page":"Wikipedia:Requests for page protection"}
  14. ]
  15. }
  16. ]
复制代码


5.2 SQL over HTTP
打开Ambari中Druid的配置页面,在Custom druid-common中增加属性druid.sql.enable = true,重启Druid服务。
a14.png

5.2.1 定义查询
  1. {"query": "select page, count(*) as c from "wikipedia-kafka" group by page order by c desc limit 5" }
复制代码

5.2.2 提交查询
  1. curl -X 'POST' -H 'Content-Type:application/json' -d @query.json http://hadoop45.luqimin.cn:8082/druid/v2/sql
复制代码

返回结果:
  1. [
  2. {"page":"Wikipedia:Vandalismusmeldung","c":33},
  3. {"page":"User:Cyde/List of candidates for speedy deletion/Subpage","c":28},
  4. {"page":"Jeremy Corbyn","c":27},
  5. {"page":"Wikipedia:Administrators' noticeboard/Incidents","c":21},
  6. {"page":"Flavia Pennetta","c":20}
  7. ]
复制代码

6.使用Druid加速Hive查询
可以使用Hive和Apache Druid的HDP集成对实时和历史数据执行交互式分析查询。可以发现现有的Druid数据源作为外部表,将批处理数据创建或摄取到Druid,使用Hive设置Druid-Kafka流式摄取,以及从Hive查询Druid数据源。
Hive与Druid的集成相当于在Druid上放置了一个SQL层。在Druid从Hive企业数据仓库(EDW)提取数据之后,可以使用Druid的交互式和亚秒级查询功能来加速对EDW中历史数据的查询。

a15.png

6.1 配置
hive中跟druid相关的配置:(使用Ambari安装Druid时自动配置的Advanced hive-interactive-site)
  1. hive.druid.bitmap.type=roaring                  
  2. hive.druid.broker.address.default=hadoop45.luqimin.cn:8888
  3. hive.druid.coordinator.address.default=hadoop46.luqimin.cn:8081
  4. hive.druid.http.numConnection=20                  
  5. hive.druid.http.read.timeout=PT10M               
  6. hive.druid.indexer.memory.rownum.max=75000        
  7. hive.druid.indexer.partition.size.max=1000000     
  8. hive.druid.indexer.segments.granularity=DAY      
  9. hive.druid.maxTries=5                             
  10. hive.druid.metadata.base=druid                    
  11. hive.druid.metadata.db.type=mysql                 
  12. hive.druid.metadata.uri=jdbc:mysql://hadoop47.luqimin.cn:3306/druid?createDatabaseIfNotExist=true
  13. hive.druid.metadata.username=druid               
  14. hive.druid.overlord.address.default=hadoop46.luqimin.cn:8090
  15. hive.druid.passiveWaitTimeMs=30000               
  16. hive.druid.rollup=true                           
  17. hive.druid.select.distribute=true                 
  18. hive.druid.select.threshold=10000                 
  19. hive.druid.sleep.time=PT10S                       
  20. hive.druid.storage.storageDirectory=/apps/druid/warehouse
  21. hive.druid.working.directory=/tmp/druid-indexing
复制代码

Druid加载数据时,会进行自动汇总,临时关闭自动汇总请在beeline中设置:
  1. set hive.druid.rollup=false
复制代码

6.2 示例
样例数据
  1. [root@hadoop47 ~]# head -n 1 wikiticker-2015-09-12-sampled.json
  2. {"time":"2015-09-12T00:46:58.771Z","channel":"#en.wikipedia","cityName":null,"comment":"added project","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Talk","page":"Talk:Oswald Tilghman","regionIsoCode":null,"regionName":null,"user":"GELongstreet","delta":36,"added":36,"deleted":0}
复制代码

使用beeline连接Hive LLAP实例,将数据加载至Hive:
# 创建外部表wiki_json,加载Json数据文件
  1. CREATE EXTERNAL TABLE wiki_json(json string)
  2. row format delimited fields terminated by '\n'
  3. stored as textfile
  4. location '/tmp/json’;
  5. # 创建内部表wiki
  6. create table wiki(
  7.   `time` string,
  8.   `channel` string,
  9.   `cityName` string,
  10.   `comment` string,
  11.   `countryIsoCode` string,
  12.   `countryName` string,
  13.   `isAnonymous` string,
  14.   `isMinor` string,
  15.   `isNew` string,
  16.   `isRobot` string,
  17.   `isUnpatrolled` string,
  18.   `metroCode` string,
  19.   `namespace` string,
  20.   `page` string,
  21.   `regionIsoCode` string,
  22.   `regionName` string,
  23.   `user` string,
  24.   `delta` int,
  25.   `added` int,
  26.   `deleted` int
  27.   ) ;
复制代码

# 使用json_tuple函数获取json内部,并写入表wiki
  1. insert overwrite table wiki select json_tuple(json,
  2.   'time',
  3.   'channel',
  4.   'cityName',
  5.   'comment',
  6.   'countryIsoCode',
  7.   'countryName',
  8.   'isAnonymou',
  9.   'isMinor',
  10.   'isNew',
  11.   'isRobot',
  12.   'isUnpatrolled',
  13.   'metroCode',
  14.   'namespace',
  15.   'page',
  16.   'regionIsoCode',
  17.   'regionName',
  18.   'user',
  19.   'delta',
  20.   'added',
  21.   'deleted') from wiki_json;
复制代码

创建一个Druid表,与Hive表的字段对应:
  1. CREATE external TABLE wiki_druid
  2. STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
  3. TBLPROPERTIES (
  4. "druid.segment.granularity" = "DAY",
  5. "druid.query.granularity" = "none")
  6. AS SELECT
  7. cast(regexp_replace(`time`, "T|Z", " ") as timestamp) as `__time`,
  8. cast(`channel` as string) `channel`,
  9. cast(`cityname` as string) `cityname`,
  10. cast(`comment` as string) `comment`,
  11. cast(`countryisocode` as string) `countryisocode`,
  12. cast(`countryname` as string) `countryname`,
  13. cast(`isanonymous` as string) `isanonymous`,
  14. cast(`isminor` as string) `isminor`,
  15. cast(`isnew` as string) `isnew`,
  16. cast(`isrobot` as string) `isrobot`,
  17. cast(`isunpatrolled` as string) `isunpatrolled`,
  18. cast(`metrocode` as string) `metrocode`,
  19. cast(`namespace` as string) `namespace`,
  20. cast(`page` as string) `page`,
  21. cast(`regionisocode` as string) `regionisocode`,
  22. cast(`user` as string) `user`,
  23. cast(`delta` as int) `delta`,
  24. cast(`added` as int) `added`,
  25. cast(`deleted` as int) `deleted`
  26. FROM wiki;
复制代码

也可以创建一个Hive的物化视图,并将其存储在Druid中:
  1. create materialized view wiki_view_druid
  2. STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
  3. as select
  4. cast(regexp_replace(`time`, "T|Z", " ") as timestamp) as `__time`,
  5. `page`,
  6. `user`,
  7. `added`,
  8. `delta`
  9. from wiki;
复制代码

执行查询
  1. select page, count(*) as c from wiki_druid group by page order by c desc limit 5;
复制代码

a16.png

查看执行计划:
  1. explain select page, count(*) as c from wiki_druid group by page order by c desc limit 5;
复制代码

a17.png

7.新版本UI
目前社区最新的Apache Druid稳定版本是0.17.0,除了功能增加和系统稳定性之外,还提供了全新的Web UI,如
  • 可视化的数据加载页面

a18.png

  • 数据预览、过滤、转换、聚合等

a19.png

  • SQL执行界面

a20.png



最新经典文章,欢迎关注公众号



原文链接: https://mp.weixin.qq.com/s/9ae-zlhQ1vFcWMQwiDX1Tw

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条