分享

从 MapReduce 到 Hive 实战分析

pig2 发表于 2014-4-23 19:49:43 [显示全部楼层] 回帖奖励 倒序浏览 阅读模式 关闭右栏 17 79411
问题导读:
1.limit的作用是什么?
2.一条记录被读取后调用hive自定义mapper函数,依次经过什么步骤?
3.hive数据为什么会倾斜?
4.为什么比较多的公司使用hive?






1、背景介绍

早先的工作中,有很多比较复杂的分析工作,当时对hive还不熟悉,但是java比较熟悉,所以在进行处理的时候,优先选择了MR.
但是随着工作的数据内容越来越多,越来越复杂,对应的调整也越来越多,越来越复杂.纯使用MR方式整个流程就比较复杂,如果需要修改某个部分,那首先需要修改代码中的逻辑,然后把代码打包上传到某个可访问路径上(一般就是hdfs),然后在调度平台内执行.如果改动较大的情况,可能还会需要在测试环境中多次调试. 总之就是会花比较多的时间在非业务逻辑改动的工作上.
考虑到维护的成本的增大,慢慢的开始准备将MR的作业,逐渐的移植到一些脚本平台上去,hive成了我们的首选。

2、实战场景

先看这样一个场景. 每一个用户在登录到网站上的时候会带有一个ip地址,多次登录可能会有多个不同的ip地址.
假设,我们已经有一个 用户->ip地址这样的一份数据.我们需要对此进行分析,得到一份来自相同ip的用户的关系表,数据格式类似
用户->用户,具体的ip我们不保留了。

第1步 用udf取最频繁ip

我们先看一下原始数据的字段,是user_id,ips,我们再来看看ips内容的格式,我们执行
Select * from iptable limit 100
你会发现,虽然我们limit了100而且是没有任何复杂条件的查询,hive竟然也会去扫描所有的数据,这非常奇怪也很浪费。原来hive的limit在默认的情况下的执行过程就是把所有数据都跑出来,然后再一个reduce上,进行limit。这是为了保证在某些情况下筛选条件对结果的影响。
但是我们可以通过打开一个hive.limit.optimize.enable=true来简化这个查询,当这个选项打开以后hive会读取hive.limit.row.max.size,hive.limit.optimize.limit.file的默认值来进行小数据量的计算。
我们看到ips的原始数据的格式是ip,ip,… 用逗号分隔的多个ip字符串。
我们要从用户->[ip地址] 这样的数据中得到一个用户使用最多的ip地址作为用户的最常用ip。这里我们会使用hive的自定义udf来完成这一步的工作。
那么udf是什么呢,udf就是user define function的缩写.通过它我们可以对hive进行扩展,hive本身已经带了很多的基本的udf了,比如length(),sin(),unix_timestamp(),regexp_replace()等等.
这些都是一些比较通用的处理,如果有的时候我们要在字段上做一些特殊的逻辑就要自己动手写了.

下面就是我们用来实现这个功能的udf代码
  1. @Description(name = “freq
  2. ips”, value = “find most frequence ips from all login ip”, extended = “”)
  3. public class FindFreqIps extends UDF {
  4.     public String evaluate(String content, int limit) {
  5.        // 计算最常用ip的代码逻辑,并返回结果
  6.        Return result;
  7.     }
  8. }
复制代码

里面的逻辑主要就是找到前limit个最长使用的Ip,我们看到我们的类需要继承自hive包中的UDF类,然后任意的定义输入类型和返回类型,但是方法的名字一定要叫evaluate,hive会使用反射来得到这个方法的输入输出。当我们要在hive中使用它的时候,我们要首先把这个类打成jar包,然后让hive可以访问到。一般可以直接放在hdfs上,然后使用
  1. Add jar hdfs_path/myjar.jar;
  2. Create temporary function FindFreqIps as ‘FindFreqIps’
  3. Select user_id, FindFreqIps(ips) as freqIps from tablexxx
复制代码


另外还有一种是继承自genericUDF,这种方式可以自由的控制输入和返回类型处理,比起UDF来说更加的灵活些。但是我们这里普通的udf就足够了。

第2步 列转行,进行join

从第一步,我们得到了用户最常用的N个ip,我们这里假设值3个。然后我们要找到这些用户之间的关联,即相同的ip的关系。
那么非常直接的方式,我们直接对用户的ip进行join,但是现在ip是3个连在一起字符串的形式,无法直接join。那么我们就先把ip都分解开。
我们把这个ips的字段进行一个列转行的转换,如下
  1. Select user_id,ip from tablexxx
  2. Lateral view explode(split(ips, “,”))
  3. subview as ip
复制代码
这样就会得到 user->ip的单条的记录。这里的
这下要join就方便了,假设上面的结果表是singleIP我们
  1. Select a.user_id as fromid, b.user_id as
  2. toid
  3. SingleIP a
  4. Join  SingleIP b
  5. On a.ip = b.ip and a.user_id <>
  6. b.user_id;
复制代码


什么,报错了!
a.user_id <> b.user_id这个部分会报错,因为hive中join的时候,是只能指定等式来进行匹配的,不支持不等式的条件。如果使用了不等式,会使join的数量变的非常大。
于是,我们就只能采用另外方式
  1. Select * from
  2. (Select a.user_id as fromid, b.user_id as
  3. toid
  4. SingleIP a  Join
  5. SingleIP b
  6. On a.ip = b.ip) m
  7. Where m.fromid <> m.toid;
复制代码

你会发现,执行了1次join,2次select使用的mr的步骤还是一步。一般总感觉嵌套了一次select以后也会对应的产生2次mr,难道是hive自己进行了优化吗?那么我们借助hive的分析工具来看看hive是如何执行的呢。
我们在刚才的语句前加上explain,来看看这个select的执行计划。
Hive会通过antlr来对输入的sql语句进行语法分析,产生一个执行计划。
执行计划会有三个部分

第一部分是ABSTRACT SYNTAX
TREE抽象语法树
这里面显示了hive把这个sql解析成什么样的各个token。
类似这样(TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))表示

第二部分是STAGE DEPENDENCIES
一个hive的执行过程会包含多个stage,他们之间有互相依赖的关系。比如下面

  1. Stage-1 is a root
  2. stage
  3. Stage-0 depends on stages: Stage-1
  4. Stage-3 depends on
  5. stages: Stage-0
复制代码

这里的stage-1是root stage。而0依赖于1,3依赖于0。
第三个部分是STAGE PLANS, 就是每个stage中的具体执行的步骤。

在stage plans里面每一个stage和普通的hadoop程序一样会有map和reduce的过程。我们截取一段map过程的计划看下。
  1. Stage: Stage-1
  2.     Map Reduce
  3.       Alias ->
  4. Map Operator Tree:
  5.         a
  6.           TableScan
  7.             alias:
  8. a
  9.             Reduce
  10. Output Operator
  11.               key expressions:
  12. expr: ip
  13. type: string
  14.              sort
  15. order: +
  16. Map-reduce partition columns:
  17. expr: ip
  18. type: string
  19.               tag:
  20. 0
  21.               value expressions:
  22. expr: user_id
  23. type: string
复制代码

这里是对a表也就是SingleIP表的一个map阶段的操作。Reduce output operator这里会显示使用ip作为key,自增排序,因为是string的所以是字典序的自增。Partition使用ip作为分发字段。tag指的是类似一个来源的概念,因为这里的join采用的是reduce join的方式,每一个从不同的map来的数据最后在reduce进行汇合,他们会被打上一个标记,代表他们的来源。然后就是value的内容,user_id。

然后再来看看reduce过程的计划
  1. Reduce Operator Tree:
  2.         Join
  3. Operator
  4.           condition
  5. map:
  6. Inner Join 0 to 1
  7.           condition
  8. expressions:
  9.             0
  10. {VALUE._col0}
  11.             1
  12. {VALUE._col0}
  13. handleSkewJoin: false
  14. outputColumnNames: _col0, _col2
复制代码


这里显示一个join的操作。这里表示把0的内容加到1上。后面有一个handleSkewJoin,这个是hive的一个应对数据倾斜的一种处理方式,默认是关闭的,我们后面再来详细看。
这里也可以用explain extended,输出的信息会更加详细。那么看了这个我们再比较一下我们之前的第二个查询计划,我们来看看加上了嵌套查询以后的执行计划有什么变化呢?会发现hive在reduce的执行计划里面会加上一段
  1. Filter
  2. Operator
  3. predicate:
  4. expr: (_col0 <> _col2)
  5. type: boolean
复制代码


在reduce最后输出之前,进行了一个过滤的操作,过滤的条件就是外部的查询的where条件。正如我们所料,hive发现这个过程是可以一次性完成的,所以进行了优化,放在了reduce阶段来作了。
另外如果hive中有多张表进行join,如果他们的join key是一样的,那么hive就会把他们都放在一次mr中完成。

第3步 数据倾斜

上一步中,我们计算出了所有的相同ip的人的点对点关系。但是这个结果集会有不少问题,比如如果某个ip是一个公共出口,那么就会出现同一个ip有上万人都在使用,他们互相join展开以后,结果的数据量会非常大,时间上很慢不说,最终得到的数据实际上很多我们也用不上(这个是基于业务上得考虑),甚至有可能,在展开的时候会出现各种问题,导致计算时间过长,算不出来。这种情况,我们在hive里面称之为数据倾斜。

在group by的时候,如果出现某一个reduce上得数据量过大的情况,hive有一个默认的hive.groupby.skewindata选项,当把它设置为true的时候,hive会将原来的一次MR变成2次,第一次,数据在reduce的时候会随机分发到每个reduce,做部分的聚合,然后第二次的时候再按照group by的key进行分发。这样可以有效的处理一般的倾斜情况。

而在join的时候,如果join的其中某个key的值非常的多,也会导致倾斜。有的时候,如果有null值,在hive看来null和null是相等的,它也会对他们进行join,也会错误的倾斜。由于join的时候,hive会把第一张表的内容放到一个内容map中,然后不断的读取后表的内容来进行join,所以如果左边的表示小表这个过程就会非常的高效。当然使用mapjoin也一种有效的方式,直接把一张足够小的表完全放到内存来后另一张表进行join。类似这样
  1. SELECT /*+ MAPJOIN(b) */
  2. a.key, a.value FROM a join b on a.key = b.key;
复制代码


我们的ip计算使用的是自己join自己,所有也没有大小表之分,同时单表的数据量也大到无法完全放进内存,那么是不是就要进行硬算呢?在实际中,因为ip的分布没有倾斜到太过火的程度,硬算也确实可以,但是这里我们换一种方式来稍稍优化一下。
首先我们采用bucket的方式来保存之间的用户->ip的数据。使用ip来作为分桶键。
  1. CREATE TABLE userip(user_id bigint, ip STRING)
  2. CLUSTERED BY (ip) INTO 128 BUCKETS;
复制代码

然后set hive.enforce.bucketing  = true;开启bucket计算
from tableaaa
insert overwrite table tablebbb
select user_id, ip;
结果将会被保存到128个不同的桶中,默认根据ip的hashcode来取模。这样每个桶内的数据基本大概是原数据量的1/100。当然如果原始数据量太大,还可以分桶更加多一些。
这个地方如果我们不开启enforce.bucketing的话,也可以通过设置
set mapred.reduce.tasks=128.然后在查询中cluster by来强制指定进行分桶。这步完成之后,我们再来进行设置
set hive.optimize.bucketmapjoin=true;
set  hive.optimize.bucketmapjoin.sortedmerge=true;
然后hive就能对每个分块数据进行mapjoin。

第4步 用udaf取top N

好了,现在我们已经有所有的user->user的数据,我们希望要一个user->[users]的一对多的记录,但是这个数据量有点大,实际上每个用户大概关联1000个已经足够了。首先对数据进行排序,排序的依据就是按照用户的相同的ip的数量。然后去最前面的1000个,不足的按实际数量取。
这个地方比较容易想到的就是,先group by fromid,toid,然后count一个总数作为新字段,如下
这里想到一种做法是用淘宝的一个类sql的row_number实现,然后用row_number来对fromid做主键,给按照count从大到小写上序列编号seq。最后做一个嵌套查询,只取seq<=1000的数据。Row_number的话标准的hive中没有。那么这里就可以让自定义udaf上场了。
Udaf顾名思义就是一个Aggregate的udf,和之前的udf的区别就是他一般是用来group by的场合中。
  1. @Description(name = “myudaf”, value = “calc users has most same ips ” )
  2. public class GenericUDAFCollect extends AbstractGenericUDAFResolver {
  3.     @Override
  4.     public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
  5.            throws SemanticException {
  6.        return new MyUDAFEvaluator();
  7.     }
  8. }
复制代码
自己定义一个evaluator,并且实现其中的一些方法。
  1. public static class MyUDAFEvaluator extends GenericUDAFEvaluator {
  2.     @Override
  3.     public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
  4.        
  5.     }
  6.     @Override
  7.     public void reset(AggregationBuffer agg) throws HiveException {
  8.        
  9.     }
  10.     @Override
  11.     public AggregationBuffer getNewAggregationBuffer() throws HiveException {
  12.        
  13.     }
  14.     // Mapside
  15.     @Override
  16.     public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
  17.        
  18.     }
  19.     // Mapside
  20.     @Override
  21.     public Object terminatePartial(AggregationBuffer agg) throws HiveException {
  22.        
  23.     }
  24.     @Override
  25.     public void merge(AggregationBuffer agg, Object partial) throws HiveException {
  26.        
  27.     }
  28.         // Reduceside
  29.     @Override
  30.         public Object terminate(AggregationBuffer agg) throws HiveException {
  31.        
  32.     }
  33. }
复制代码


在init阶段会传一个Mode进来,这个Mode中定义了以下的几个阶段
PARTIAL1: 这个是map阶段,这个阶段会调用iterate(),和terminatePartial()
PARTIAL2:  这个是map段得combiner阶段,会将map端的数据进行合并,也可能没有这个阶段。会执行merge()和terminatePartial()
FINAL: 这个是reduce阶段,会调用merge()和terminate()

COMPLETE: 这是纯map处理,无reduce的情况出现的阶段,它会调用iterate()和terminate()
而从函数方面来说,init是初始化,他会传入mode作为参数。可以根据不同的阶段采取不同的处理。getNewAggregationBuffer的处理是hive为了内存的复用,减少gc,他并不是每一次处理一条记录都会新申请空间,而是在处理一批数据的时候重复使用一批内存。Terminate就是最终的输出了。
Ok,了解了udaf,那么可以动手了。Sql如下
  1. Select fromid, getTopN(toid,n) from tablexx3
  2. Group by fromid
复制代码



其中的getTopN首先在map端,将每一个fromid的关联的toid的次数都记录下来,记录条数代表重复的ip数量,然后按照这个次数进行倒序排序,截取前n个。
在reduce端,将各个map端的结果再按照次数倒序排序,再进行截取n个并进行合并。最终输出的就是每个fromid对应的toid的列表了。
从这次从mr转换到hive的过程中,对我们目前的mr和hive进行了一些比较

3、mr和hive比较

1. 运算资源消耗

无论从时间,数据量,计算量上来看,一般情况下mr都是优于或者等于hive的。mr的灵活性是毋庸置疑的。在转换到hive的过程中,会有一些为了实现某些场景的需求而不得不用多步hive来实现的时候。
2. 开发成本/维护成本

毫无疑问,hive的开发成本是远低于mr的。如果能熟练的运用udf和transform会更加提高hvie开发的效率。另外对于数据的操作也非常的直观,对于全世界程序员都喜闻乐见的sql语法的继承也让它更加的容易上手。
   hive独有的分区管理,方便进行数据的管理。
   代码的管理也很方便,就是直接的文本。
   逻辑的修改和生效很方便。
   但是当出现异常错误的时候,hive的调试会比较麻烦。特别是在大的生产集群上面的时候。
3. 底层相关性

在使用hive以后,读取文件的时候,再也不用关心文件的格式,文件的分隔符,只要指定一次,hive就会保存好。相比mr来说方便了很多。
当侧重关心与业务相关的内容的时候,用hive会比较有优势。而在一些性能要求高,算法研究的时候,mr会更加适合。














已有(17)人评论

跳转到指定楼层
pig2 发表于 2014-4-23 19:51:47
上文提高了limit,这里解释一下:
limit的含义是比如有100个,那么只取符合10个符合条件的。
回复

使用道具 举报

fernando1987 发表于 2014-9-4 15:55:02
hive> select * from part where name='123';
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1409803137666_0012, Tracking URL = http://hadoop1:8088/proxy/application_1409803137666_0012/
Kill Command = /home/hadoop/hadoop-2.2.0/bin/hadoop job  -kill job_1409803137666_0012
Hadoop job information for Stage-1: number of mappers: 0; number of reducers: 0
2014-09-04 15:51:31,733 Stage-1 map = 0%,  reduce = 0%
Ended Job = job_1409803137666_0012 with errors
Error during job, obtaining debugging information...
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
MapReduce Jobs Launched:
Job 0:  HDFS Read: 0 HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 0 msec
请问这个是什么原因呐? 刚刚开始学习hive
回复

使用道具 举报

break-spark 发表于 2014-10-30 17:22:41
不错的例子,赞一个
回复

使用道具 举报

pig2 发表于 2014-10-31 00:38:03
fernando1987 发表于 2014-9-4 15:55
hive> select * from part where name='123';
Total jobs = 1
Launching Job 1 out of 1
卡住了,看看日志
回复

使用道具 举报

EASONLIU 发表于 2014-12-17 10:08:24
路过,学习学习
回复

使用道具 举报

wubaozhou 发表于 2015-1-1 16:37:09
回复

使用道具 举报

小飞鱼123 发表于 2015-11-1 22:00:45
路过,学习学习
回复

使用道具 举报

bingyuac 发表于 2016-4-20 21:53:23
学习了 很强大
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条