Step 1: Load the data into Hive
Create the stock table in Hive.
[mw_shl_code=shell,true]hive> create table if not exists stock(tradedate string, tradetime string, stockid string, buyprice double, buysize int, sellprice double, sellsize int) row format delimited fields terminated by ',' stored as textfile;
OK
Time taken: 0.207 seconds
hive> desc stock;
OK
tradedate string
tradetime string
stockid string
buyprice double
buysize int
sellprice double
sellsize int
Time taken: 0.147 seconds, Fetched: 7 row(s)[/mw_shl_code]
Upload the historical stock data to the local filesystem, then load it into Hive (note that `load data local inpath` reads from the local filesystem, not HDFS).
[mw_shl_code=shell,true][hadoop@master bin]$ cd /home/hadoop/test/
[hadoop@master test]$ sudo rz
hive> load data local inpath '/home/hadoop/test/stock.csv' into table stock;[/mw_shl_code]
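Each row of stock.csv is expected to carry the seven comma-separated fields of the table definition, in order. A minimal Java sketch of parsing one such row (the sample line below is hypothetical, not taken from the real dataset):

```java
public class StockRow {
    final String tradedate, tradetime, stockid;
    final double buyprice, sellprice;
    final int buysize, sellsize;

    // Parse one CSV line in the stock table's column order
    StockRow(String csvLine) {
        String[] f = csvLine.split(",");
        tradedate = f[0];
        tradetime = f[1];
        stockid   = f[2];
        buyprice  = Double.parseDouble(f[3]);
        buysize   = Integer.parseInt(f[4]);
        sellprice = Double.parseDouble(f[5]);
        sellsize  = Integer.parseInt(f[6]);
    }

    public static void main(String[] args) {
        // Hypothetical sample row in the stock.csv layout
        StockRow r = new StockRow("20130722,094500,204001,3.01,100,3.02,200");
        System.out.println(r.stockid + " " + r.buyprice + " " + r.sellprice);
    }
}
```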
Create a partitioned table stock_partition, using the trade date as the partition key.
[mw_shl_code=shell,true]hive> create table if not exists stock_partition(tradetime string, stockid string, buyprice double, buysize int, sellprice double, sellsize int) partitioned by (tradedate string) row format delimited fields terminated by ',';
OK
Time taken: 0.112 seconds
hive> desc stock_partition;
OK
tradetime string
stockid string
buyprice double
buysize int
sellprice double
sellsize int
tradedate string
# Partition Information
# col_name data_type comment
tradedate string[/mw_shl_code]
To use dynamic partitioning, first switch Hive to nonstrict dynamic-partition mode:
[mw_shl_code=shell,true]hive>set hive.exec.dynamic.partition.mode=nonstrict;
[/mw_shl_code]
Insert the data from stock into stock_partition, letting Hive create the partitions dynamically:
[mw_shl_code=shell,true]hive> insert overwrite table stock_partition partition(tradedate) select tradetime, stockid, buyprice, buysize, sellprice, sellsize, tradedate from stock distribute by tradedate;
Query ID = hadoop_20180524122020_f7a1b61a-84ed-4487-a37e-64ef9c3abc5f
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1527103938304_0002, Tracking URL = http://master:8088/proxy/application_1527103938304_0002/
Kill Command = /opt/modules/hadoop-2.6.0/bin/hadoop job -kill job_1527103938304_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2018-05-24 12:20:13,931 Stage-1 map = 0%, reduce = 0%
2018-05-24 12:20:21,434 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 2.19 sec
2018-05-24 12:20:40,367 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 5.87 sec
MapReduce Total cumulative CPU time: 5 seconds 870 msec
Ended Job = job_1527103938304_0002
Loading data to table default.stock_partition partition (tradedate=null)
Time taken for load dynamic partitions : 492
Loading partition {tradedate=20130726}
Loading partition {tradedate=20130725}
Loading partition {tradedate=20130724}
Loading partition {tradedate=20130723}
Loading partition {tradedate=20130722}
Time taken for adding to write entity : 6
Partition default.stock_partition{tradedate=20130722} stats: [numFiles=1, numRows=25882, totalSize=918169, rawDataSize=892287]
Partition default.stock_partition{tradedate=20130723} stats: [numFiles=1, numRows=26516, totalSize=938928, rawDataSize=912412]
Partition default.stock_partition{tradedate=20130724} stats: [numFiles=1, numRows=25700, totalSize=907048, rawDataSize=881348]
Partition default.stock_partition{tradedate=20130725} stats: [numFiles=1, numRows=20910, totalSize=740877, rawDataSize=719967]
Partition default.stock_partition{tradedate=20130726} stats: [numFiles=1, numRows=24574, totalSize=862861, rawDataSize=838287]
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 5.87 sec HDFS Read: 5974664 HDFS Write: 4368260 SUCCESS
Total MapReduce CPU Time Spent: 5 seconds 870 msec
OK
Time taken: 39.826 seconds[/mw_shl_code]
Step 2: Write custom Hive UDFs to compute the daily high and low prices of stock 204001
A custom Max UDF that returns the larger of two values:
[mw_shl_code=java,true]package zimo.hadoop.hive;

import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * Custom UDF that returns the larger of two values.
 * Null arguments are treated as 0.0.
 * @author Zimo
 */
public class Max extends UDF {
    public Double evaluate(Double a, Double b) {
        if (a == null) a = 0.0;
        if (b == null) b = 0.0;
        return a >= b ? a : b;
    }
}[/mw_shl_code]
A custom Min UDF that returns the smaller of two values:
[mw_shl_code=java,true]package zimo.hadoop.hive;

import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * Custom UDF that returns the smaller of two values.
 * Null arguments are treated as 0.0, so a null price
 * makes the minimum come out as 0.0.
 * @author Zimo
 */
public class Min extends UDF {
    public Double evaluate(Double a, Double b) {
        if (a == null) a = 0.0;
        if (b == null) b = 0.0;
        return a >= b ? b : a;
    }
}[/mw_shl_code]
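Outside Hive, the comparison logic of the two UDFs can be exercised directly. This standalone sketch (no Hive dependency; the class and method names are illustrative) mirrors the two evaluate methods and shows the null-handling quirk: because null is coerced to 0.0, a null price drags the minimum down to 0.0, which may explain a 0.0 daily low in the results below.

```java
public class PriceUdfDemo {
    // Mirrors Max.evaluate: nulls are treated as 0.0
    static Double maxPrice(Double a, Double b) {
        if (a == null) a = 0.0;
        if (b == null) b = 0.0;
        return a >= b ? a : b;
    }

    // Mirrors Min.evaluate: nulls are treated as 0.0
    static Double minPrice(Double a, Double b) {
        if (a == null) a = 0.0;
        if (b == null) b = 0.0;
        return a >= b ? b : a;
    }

    public static void main(String[] args) {
        System.out.println(maxPrice(4.05, 3.98)); // larger of the two prices
        System.out.println(minPrice(null, 2.2));  // null coerced to 0.0, so 0.0 wins
    }
}
```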
Package Max and Min as maxUDF.jar and minUDF.jar, upload them into a jar/ directory under $HIVE_HOME, then register them in Hive.
[mw_shl_code=shell,true][hadoop@master ~]$ cd $HIVE_HOME
[hadoop@master hive1.0.0]$ sudo mkdir jar/
[hadoop@master hive1.0.0]$ ll
total 408
drwxr-xr-x 4 hadoop hadoop 4096 May 24 06:15 bin
drwxr-xr-x 2 hadoop hadoop 4096 May 24 05:53 conf
drwxr-xr-x 4 hadoop hadoop 4096 May 14 23:28 examples
drwxr-xr-x 7 hadoop hadoop 4096 May 14 23:28 hcatalog
drwxrwxr-x 3 hadoop hadoop 4096 May 24 11:50 iotmp
drwxr-xr-x 2 root root 4096 May 24 12:34 jar
drwxr-xr-x 4 hadoop hadoop 4096 May 14 23:41 lib
-rw-r--r-- 1 hadoop hadoop 23828 Jan 30 2015 LICENSE
drwxr-xr-x 2 hadoop hadoop 4096 May 24 03:36 logs
-rw-r--r-- 1 hadoop hadoop 397 Jan 30 2015 NOTICE
-rw-r--r-- 1 hadoop hadoop 4044 Jan 30 2015 README.txt
-rw-r--r-- 1 hadoop hadoop 345744 Jan 30 2015 RELEASE_NOTES.txt
drwxr-xr-x 3 hadoop hadoop 4096 May 14 23:28 scripts
[hadoop@master hive1.0.0]$ cd jar/
[hadoop@master jar]$ sudo rz
[hadoop@master jar]$ ll
total 8
-rw-r--r-- 1 root root 714 May 24 2018 maxUDF.jar
-rw-r--r-- 1 root root 713 May 24 2018 minUDF.jar
hive> add jar /opt/modules/hive1.0.0/jar/maxUDF.jar;
Added [/opt/modules/hive1.0.0/jar/maxUDF.jar] to class path
Added resources: [/opt/modules/hive1.0.0/jar/maxUDF.jar]
hive> add jar /opt/modules/hive1.0.0/jar/minUDF.jar;
Added [/opt/modules/hive1.0.0/jar/minUDF.jar] to class path
Added resources: [/opt/modules/hive1.0.0/jar/minUDF.jar][/mw_shl_code]
Create temporary functions maxprice and minprice backed by the two UDF classes.
[mw_shl_code=shell,true]hive> create temporary function maxprice as 'zimo.hadoop.hive.Max';
OK
Time taken: 0.009 seconds
hive> create temporary function minprice as 'zimo.hadoop.hive.Min';
OK
Time taken: 0.004 seconds[/mw_shl_code]
Compute the daily high and low price of stock 204001.
[mw_shl_code=shell,true]hive> select tradedate, max(maxprice(buyprice, sellprice)), min(minprice(buyprice, sellprice)) from stock_partition where stockid='204001' group by tradedate;
20130722 4.05 0.0
20130723 4.48 2.2
20130724 4.65 2.205
20130725 11.9 8.7
20130726 12.3 5.2[/mw_shl_code]
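The query nests the UDFs inside Hive's built-in aggregates: per tick, maxprice/minprice pick the more extreme of the buy and sell quotes, and max/min then aggregate those values over the whole day. The same two-level computation in plain Java, ignoring the UDFs' null-to-0.0 coercion (the tick values are hypothetical):

```java
public class DailyHighLow {
    // ticks: rows of {buyprice, sellprice} for one trading day

    static double high(double[][] ticks) {
        double h = Double.NEGATIVE_INFINITY;
        for (double[] t : ticks)
            h = Math.max(h, Math.max(t[0], t[1])); // max(maxprice(buy, sell))
        return h;
    }

    static double low(double[][] ticks) {
        double l = Double.POSITIVE_INFINITY;
        for (double[] t : ticks)
            l = Math.min(l, Math.min(t[0], t[1])); // min(minprice(buy, sell))
        return l;
    }

    public static void main(String[] args) {
        double[][] day = { {3.0, 3.1}, {4.4, 4.48}, {2.2, 2.3} }; // hypothetical ticks
        System.out.println(high(day) + " " + low(day));
    }
}
```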
Step 3: Compute the per-minute average price
Compute the average price of stock 204001 for each minute of each trading day.
[mw_shl_code=shell,true]hive> select tradedate, substring(tradetime, 0, 4), sum(buyprice + sellprice)/(count(*)*2) from stock_partition where stockid='204001' group by tradedate, substring(tradetime, 0, 4);
20130725 0951 9.94375
20130725 0952 9.959999999999999
20130725 0953 10.046666666666667
20130725 0954 10.111041666666667
20130725 0955 10.132500000000002
20130725 0956 10.181458333333333
20130725 0957 10.180625
20130725 0958 10.20340909090909
20130725 0959 10.287291666666667
20130725 1000 10.331041666666668
20130725 1001 10.342500000000001
20130725 1002 10.344375
20130725 1003 10.385
20130725 1004 10.532083333333333
20130725 1005 10.621041666666667
20130725 1006 10.697291666666667
20130725 1007 10.702916666666667
20130725 1008 10.78[/mw_shl_code]
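The per-minute average treats each tick as two quotes (one buy, one sell), so it is sum(buyprice + sellprice) divided by twice the tick count, while substring(tradetime, 0, 4) keeps only the HHmm prefix of the trade time. A plain-Java sketch of the same grouping and average (the tick data is hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MinuteAverage {
    // rows: {tradetime, buyprice, sellprice}; group key = first 4 chars of tradetime (HHmm)
    static Map<String, Double> averages(String[][] rows) {
        Map<String, double[]> acc = new LinkedHashMap<>(); // minute -> {priceSum, tickCount}
        for (String[] r : rows) {
            String minute = r[0].substring(0, 4);
            double[] a = acc.computeIfAbsent(minute, k -> new double[2]);
            a[0] += Double.parseDouble(r[1]) + Double.parseDouble(r[2]); // buy + sell
            a[1] += 1;
        }
        Map<String, Double> out = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet())
            out.put(e.getKey(), e.getValue()[0] / (e.getValue()[1] * 2)); // sum / (2 * count)
        return out;
    }

    public static void main(String[] args) {
        String[][] ticks = {           // hypothetical {tradetime, buy, sell} rows
            {"095101", "9.9",  "10.0"},
            {"095130", "9.95", "9.93"},
            {"095205", "9.96", "9.96"},
        };
        System.out.println(averages(ticks));
    }
}
```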
Original article: Hive项目实战:用Hive分析"余额宝"躺着赚大钱背后的逻辑 (Hive in practice: using Hive to analyze how "余额宝" earns money effortlessly)