分享

hive 全排序优化

本帖最后由 sunshine_junge 于 2014-9-19 15:12 编辑


问题导读:
1.全排序的解决方法有哪几种?
2.如何通过修改Partitioner,做到全排序?









Hive的排序关键字是SORT BY,它有意区别于传统数据库的ORDER BY也是为了强调两者的区别–SORT BY只能在单机范围内排序。考虑以下表定义:
  1. CREATE TABLE if not exists t_order(
  2. id int, -- 订单编号
  3. sale_id int, -- 销售ID
  4. customer_id int, -- 客户ID
  5. product _id int, -- 产品ID
  6. amount int -- 数量 ) PARTITIONED BY (ds STRING);
复制代码



在表中查询所有销售记录,并按照销售ID和数量排序:
  1. set mapred.reduce.tasks=2; Select sale_id, amount from t_order Sort by sale_id, amount;
复制代码


这一查询可能得到非期望的排序。指定的2个reducer分发到的数据可能是(各自排序):
Reducer1:
  1. Sale_id | amount 0 | 100 1 | 30 1 | 50 2 | 20
复制代码

Reducer2:
  1. Sale_id | amount 0 | 110 0 | 120 3 | 50 4 | 20
复制代码

因为上述查询没有reduce key,hive会生成随机数作为reduce key。这样的话输入记录也随机地被分发到不同reducer机器上去了。为了保证reducer之间没有重复的sale_id记录,可以使用DISTRIBUTE BY关键字指定分发key为sale_id。改造后的HQL如下:
  1. set mapred.reduce.tasks=2; Select sale_id, amount from t_order Distribute by sale_id Sort by sale_id, amount;
复制代码


这样能够保证查询的销售记录集合中,销售ID对应的数量是正确排序的,但是销售ID不能正确排序,原因是hive使用hadoop默认的HashPartitioner分发数据。
这就涉及到一个全排序的问题。解决的办法无外乎两种:

1.) 不分发数据,使用单个reducer
  1. set mapred.reduce.tasks=1;
复制代码



这一方法的缺陷在于reduce端成为了性能瓶颈,而且在数据量大的情况下一般都无法得到结果。但是实践中这仍然是最常用的方法,原因是通常排序的查询是为了得到排名靠前的若干结果,因此可以用limit子句大大减少数据量。使用limit n后,传输到reduce端(单机)的数据记录数就减少到n* (map个数)。

2.) 修改Partitioner,这种方法可以做到全排序。
这里可以使用Hadoop自带的TotalOrderPartitioner(来自于Yahoo!的TeraSort项目),这是一个为了支持跨reducer分发有序数据开发的Partitioner,它需要一个SequenceFile格式的文件指定分发的数据区间。如果我们已经生成了这一文件(存储在/tmp/range_key_list,分成100个reducer),可以将上述查询改写为
  1. set mapred.reduce.tasks=100; set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner; set total.order.partitioner.path=/tmp/ range_key_list; Select sale_id, amount from t_order Cluster by sale_id Sort by amount;
复制代码

有很多种方法生成这一区间文件(例如hadoop自带的o.a.h.mapreduce.lib.partition.InputSampler工具)。这里介绍用Hive生成的方法,例如有一个按id有序的t_sale表:
  1. CREATE TABLE if not exists t_sale ( id int, name string, loc string );
复制代码

则生成按sale_id分发的区间文件的方法是:
  1. create external table range_keys(sale_id int) row format serde'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe' stored as inputformat'org.apache.hadoop.mapred.TextInputFormat' outputformat'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat' location '/tmp/range_key_list';   
复制代码


生成的文件(/tmp/range_key_list目录下)可以让TotalOrderPartitioner按sale_id有序地分发reduce处理的数据。区间文件需要考虑的主要问题是数据分发的均衡性,这有赖于对数据深入的理解。

测试案例:

数据 140g, 按照字段time 降序排列 选出最大的前50个。

使用 一般方法 select * from table order by time desc limit 50.  执行了1小时6分钟完全算出。
任务数1个  map数  1783 reduce 1
而 select * from (select * from table distribute by time sort by time desc limit 50 ) t order by time desc limit 50;
需要5分钟算出。结果一致。
任务数2个   分别是:
map  1783 reduce 245
map 245 reduce   1





本帖被以下淘专辑推荐:

欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(3)人评论

跳转到指定楼层
gyzj518 发表于 2014-10-14 17:50:54
谢谢111111111111111111
回复

使用道具 举报

EASONLIU 发表于 2014-12-17 10:02:43
路过,学习学习
回复

使用道具 举报

hery 发表于 2015-3-5 00:01:36

路过,学习学习
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条