分享

大数据项目:离线项目拓展youtube

问题导读


1.order by、sort by、distribute by、cluster by有哪些区别?
2.如何行转列、列转行?
3.如何在在分区中分桶?
4.如何ETL原始数据?
5.JVM堆内存溢出如何解决?



一、需求描述
统计Youtube视频网站的常规指标,各种TopN指标:

–统计视频观看数Top10

–统计视频类别热度Top10

–统计视频观看数Top20所属类别

–统计视频观看数Top50所关联视频的所属类别Rank

–统计每个类别中的视频热度Top10,以Music为例

–统计每个类别中视频流量Top10,以Music为例

–统计上传视频最多的用户Top10以及他们上传的视频

–统计每个类别视频观看数Top10

数据格式:
20190730081858579.png


第一列视频的id,唯一的表示某一个视频的 ,比如复制id在youtube上进查询
20190730081908411.png



第二列是作者
第三列是视频的年龄
第四列是视频的类别
第五列是视频的长度
第六列是视频的观看次数
第七列是视频评分
第八列是流量

ps:下面有关于视频的的数结构

二、知识储备梳理
2.1、order by,sort by,distribute by,cluster by
Order by 和sort by有什么区别?orderby是一个全局排序,如果数据量特别巨大 ,sort by是局部的,什么是局部的呢?比如说分区、分桶,distribute by是把数据划分成不同的区域,一般情况下sort by和sidtribute by是可以连用的,cluster by 和前面两个其实是一样的,还有一个作用是分桶

背景表结构
在讲解中我们需要贯串一个 例子,所以需要设计一个情景,对应
还要有一个表结构和填充数据。如下:有3个字段,分别为personId标识某一个人,company标识一家公司名称,money标识该公司每年盈利收入(单位:万元人民币)
微信截图_20190907103306.png


建表导入数据:
create table company_info(

        personId string,

        company string,

        money float

)row format delimited fields terminated by “\t”

load data local inpath “company_info.txt” into table company_info;


2.1.1、order by
hive中的order
by语句会对查询结果做一次全局排序,即,所有的mapper产生的结果都会交给一个reducer去处理,无论数据量大小,job任务只会启动一个reducer,如果数据量巨大,则会耗费大量的时间。

尖叫提示:如果在严格模式下,order
by需要指定limit数据条数,不然数据量巨大的情况下会造成崩溃无输出结果。涉及属性:set
hive.mapred.mode=nonstrict/strict

如果设置为了strict 严格模式, 就必须在后面通过limit关键字来设置条数,如果不设置的话,很可能数据量特别大的时候机器就崩溃了

如果nonstrict 非严格模式,就不用指定了,因为是默认的大小100

例如:按照money排序的例子
[mw_shl_code=sql,true]select * from company_info order by money desc;[/mw_shl_code]
2.1.2、sort by
hive中的sort
by语句会对每一块局部数据进行局部排序,即,每一个reducer处理的数据都是有序的,但是不能保证全局有序。如果想保证全局有序,可以在sort
by的基础之上做一次全局的归并排序。

2.1.3、distribute by
hive中的distribute by一般要和sort
by一起使用,即将某一块数据归给(distribute
by)某一个reducer处理,然后在指定的reducer中进行sort by排序。

单用dsidtribute是没有任何意义的,但是语法上是没有错误的 ,只是单纯的把数据划分到了不同的分区里面,但是没有做其他的事情

sort by按照 distribute by分好的分区进行排序

尖叫提示:distribute by必须写在sort by之前

例如:不同的人(personId)分为不同的组,每组按照money排序。
[mw_shl_code=sql,true]select \* from company\_info distribute by personId sort by personId, money desc;
[/mw_shl_code]

如果不同组中的money值有相同的,还是需要全局排序的

2.1.4、cluster by
hive中的cluster by在distribute by和sort
by排序字段一致的情况下是等价的。同时,cluster
by指定的列只能是降序,即默认的descend,而不能是ascend。

例如:写一个等价于distribute by 与sort by的例子
[mw_shl_code=sql,true]select * from company_info distribute by personId sort by personId;

等价于

select * from compnay_info cluster by personId; [/mw_shl_code]

2.2、行转列、列转行(UDAF与UDTF)
2.2.1、行转列
表结构:
微信截图_20190907103404.png


20190730081940195.png


孙悟空、猪八戒是分布在多行上面的,我们把多行上的数据转到一行上面,叫做行转列

创建表及数据导入:**
[mw_shl_code=sql,true]create table person_info(

name string,

constellation string,

blood_type string)

row format delimited fields terminated by "\t";

load data local inpath “person_info.txt” into table person_info;[/mw_shl_code]
20190730081950753.png

例如:把星座和血型一样的人归类到一起**
[mw_shl_code=sql,true]select

​ t1.base,
concat_ws('|', collect_set(t1.name)) name

from

​ (select

​ name,

​ concat(constellation, ",", blood_type) base

​ from

​ person_info) t1

group by

t1.base;[/mw_shl_code]

分析:

select name, concat(constellation, “,”, blood_type) base

concat 就是把白羊座和A拼到一起了,”白羊座,A“ 命名为base
其实就是
20190730082000392.png


concat_ws(’|’, collect_set(t1.name)) name
collect_set聚合name,每个name之间用”|“分割,在mysql中有一个group_concat

最后的结果
20190730082010357.png



如果
20190730082018177.png


还是按照星座和血型聚合的,但是出来的不只是人名,并且修改了上面的表名,

结果
20190730082026410.png



concat_ws(’|’, collect_set(t1.name)) name 意思是吧t1.name表里面的值全部用|拼接起来,group by是聚合的意思,根据concat进行聚合,然后通过collect_set 函数收集起来,通过concat_ws进行拼接

2.2.2、列转行
表结构:
微信截图_20190907103515.png


例如:
20190730082039755.png



创建表及导入数据:
[mw_shl_code=sql,true]create table movie_info(

​ movie string,

&#8203; category array<string>)

row format delimited fields terminated by "\t"

collection items terminated by ",";

load data local inpath "movie.txt" into table movie_info;[/mw_shl_code]

解释:
电影名称 string类型
类别 String类型,可以有多个类别,所以是array的,

hive中是支持array的

collection items terminated by “,”;当前表中的数据所有的数组类型的字段他们之间的元素用什么来分割,这里是用“,”来分割、

但是这里注意
20190730082049836.png


如图,这里只能分割其中的一个list,这也是一个弊端

执行上面的建表操作
20190730082102399.png

然后将其展开,但是这里注意列转行,只能对list等集合进行操作

当然也可以是map如下图
20190730082108895.png


例如:将电影分类中的数组数据展开

select

&#8203; movie,

&#8203; category_name

from

&#8203; movie_info lateral view explode(category) table_tmp as category_name;

将上面的list展开
[mw_shl_code=sql,true]select
        movie,category_name
from
        movie_info lateral view explide(category) table_tmp as category_name;[/mw_shl_code]

拓展:

UDAF:聚合函数多行输入,一行输出
UDTF:一行输入,多行输出

explode这个词的意思是爆裂、炸开的意思,这里就是把一行的数据拆散的意思

也可以单独用
[mw_shl_code=sql,true]select
        explode(category) as category
from
        movie_info;[/mw_shl_code]

这个时候的结果就是把category炸开了
20190730082131281.png


20190730082224535.png


上面的函数中latera是侧写的意思,就是将上面爆炸的数据进行一次侧写,生成一个新的表,标的名字叫做table_tmp ,这个表里面炸开的字段叫做category_name
20190730082236762.png


2.3、数组操作
“fields terminated by”:字段与字段之间的分隔符。

“collection items terminated by”:一个字段中各个子元素item的分隔符。

2.4、orc存储
orc即Optimized Row Columnar (ORC)
file,在RCFile的基础上演化而来,可以提供一种高效的方法在Hive中存储数据,提升了读、写、处理数据的效率。
20190730082254159.png

他是一种默认的存储格式,还有一种存储格式是textFile,textFile可读性比较好,但是效率比较低。orc主要是为了提升了读、写、处理数据的效率。

orc会把每一列的数据转化成行去存储
一行存储不同格式的效率会比较低,怎样变高呢?name就应该一行存储相同格式的

一行数据存的全是某一列里面的内容,相当于把表做了一个90度的旋转

比如第一个索引存的是p1 p2 p3 p4 p5,蒂格尔索引存的是float 100 200 300 400 500

用这样的格式存储,压缩效率很高,数据处理也会高很多,甚至可以实现跳行,比如我就想访问某一列的数据,对于orc来讲,我只需要拿到某一类对应的index,就访问那一行

2.5、Hive分桶
Hive可以将表或者表的分区进一步组织成桶,以达到:

1、数据取样效率更高

2、数据处理效率更高

桶通过对指定列进行哈希来实现,将一个列名下的数据切分为"一组桶",每个桶都对应了一个该列名下的一个存储文件。

之前说过hive是有分区的,建表的时候可以通过pertition by
分区的目的就是为了在访问数据的时候更快,在mysql中也有索引也有分区

那么hive的分桶是干嘛的?就是在分区的基础上在进行划分,可以理解为子分区,还有一种方式是在表里面直接进行分桶,其实表示有分区的,没有自定义分区的话,默认就是一个分区。所以在宏观上理解的话,可以分为两种,在分区上进行分桶,或者直接在表上进行分桶。严格上来讲都是在分区上进行分桶,只不过后者是只有一个分区而已。

2.5.1、直接分桶
开始操作之前,需要将hive.enforce.bucketing属性设置为true,以标识Hive可以识别桶。
[mw_shl_code=sql,true]create table music(

&#8203;    id int,

&#8203;    name string,

&#8203;    size float)

row format delimited

fields terminated by "\t"

clustered by (id) into 4 buckets;[/mw_shl_code]

该代码的意思是将music表按照id将数据分成了4个桶,插入数据时,会对应4个
reduce操作,输出4个文件。

id 进来进行一种hash算法,然后得到一个数字,这个数字对桶的个数进行求余,就想当前的个数是四,最后得到的余数就是0123 ,最后都会依次的放进去,上面的clustered by 里面的id就是在hash归桶的时候要通过那个字段进行归桶

2.5.2、在分区中分桶
当数据量过大,需要庞大发分区数量时,可以考虑桶,因为分区数量太大的情况可能会导致文件系统挂掉,而且桶比分区有更高的查询效率。数据最终落在哪一个桶里,取决于clustered
by的那个列的值的hash数与桶的个数求余来决定。虽然有一定离散行,但不能保证每个桶中的数据量是一样的。
[mw_shl_code=sql,true]create table music2(

&#8203;    id int,

&#8203;    name string,

&#8203;    size float)

partitioned by (date string)

clustered by (id) sorted by(size) into 4 bucket

row format delimited

fields terminated by "\t";

load data local inpath 'demo/music.txt' into table music2 partition(date='2017-08-30');[/mw_shl_code]

sorted by(size) into 4 bucket ,可以在某一个桶里面,按照某一个顺序进行排列顺序,即这个桶里面的数据都是有序的,按照size进行排序的。

partition(date=‘2017-08-30’); 指定导入到哪个分区里面

hbase的数据清洗的概念是什么?region,按照某一个范围,扔到某一个region里面,如果在rowkey是递增的,然后在某一个时段里面特别密集,就会导致某一个region特别大,某一个region特别大,就会导致数据处理的速度特别慢,某一台集群的压力特别大

hive中,加入第一个桶里面都是数据,第三个第四个桶里面没有数据,这样会导致怎样的现象呢?hive中的数据本身还是存在hdfs上面还是那个txt文件,在hbase里面有存储的倾斜,也有处理的倾斜,但是在hive、中只有数据处理的倾斜,在hive中,一个桶里面的数据会交给一个reducer处理,不可能出现两个桶对应于一个reducer,一个reducer就对应一个文件的输出,所以直白的说就是一个桶会对应一个文件的输出,两个桶会产生两个。现在问题就出来了,如果一个桶的数据特别多,剩下的桶没有数据,会导致第一个桶的reducer在第一台集齐的nodemanager执行很慢,然后第二个第三个第四个的reducer里面没数据,这就会导致负载不均衡了,所以分桶不是一定会导致数据不发生倾斜,但不是绝对的。

三、项目
3.1、数据结构
3.1.1、视频表
微信截图_20190907103638.png


3.1.2、用户表

微信截图_20190907103722.png



20190730082328874.png


20190730082342165.png



注意仔细观察上图中,我们上面说到这里有的是按照&分割的,有的是按照tab分割的,这样上面只能分割一个list,所以我们需要进行清洗数据

3.2原始数据存放地
HDFS目录:

视频数据集:/youtube/video/2008

用户数据集:/youtube/users/2008

先把数据方法hdfs上面
201907300823513.png


3.3、技术选型
* CDH5.3.6-Hadoop2.5.0

* CDH5.3.6-Hive0.13.1

* Mysql

3.3.1、数据清洗
MapReduce

3.3.2、数据分析
MapReduce or Hive

3.4、ETL原始数据
通过观察原始数据形式,可以发现,视频可以有多个所属分类,每个所属分类用&符号分割,且分割的两边有空格字符,同时相关视频也是可以有多个元素,多个相关视频又用"\t"进行分割。为了分析数据时方便对存在多个子元素的数据进行操作,我们首先进行数据重组清洗操作。即:将所有的类别用"&“分割,同时去掉两边空格,多个相关视频id也使用”&"进行分割。

3.6.1、ETL之ETLUtil
[mw_shl_code=java,true]package com.z.youtube.util;

public class ETLUtil {
public static String oriString2ETLString(String ori){
StringBuilder etlString = new StringBuilder();
String[] splits = ori.split("\t");
if(splits.length < 9) return null;
splits[3] = splits[3].replace(" ", "");
for(int i = 0; i < splits.length; i++){
if(i < 9){
if(i == splits.length - 1){
etlString.append(splits[i ]);        
}else{
etlString.append(splits[ i] + "\t");        
}
}else{
if(i == splits.length - 1){
etlString.append(splits[ i]);
}else{
etlString.append(splits[ i] + "&");
}
}
}

return etlString.toString();
}
}[/mw_shl_code]
完了之后可以写个main行数,随便粘贴上面的一行数据进行测试

3.6.2、ETL之Mapper
[mw_shl_code=java,true]package com.z.youtube.mr.etl;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.z.youtube.util.ETLUtil;

public class VideoETLMapper extends Mapper<Object, Text, NullWritable, Text>{
        Text text = new Text();
        
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String etlString = ETLUtil.oriString2ETLString(value.toString());
               
                if(StringUtils.isBlank(etlString)) return;
               
                text.set(etlString);
                context.write(NullWritable.get(), text);
        }
        

}[/mw_shl_code]

3.6.3、ETL之Runner
[mw_shl_code=sql,true]package com.z.youtube.mr.etl;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class VideoETLRunner implements Tool {
        private Configuration conf = null;

        @Override
        public void setConf(Configuration conf) {
                this.conf = conf;
        }

        @Override
        public Configuration getConf() {

                return this.conf;
        }

        @Override
        public int run(String[] args) throws Exception {
                conf = this.getConf();
        //需要传入两个参数,第一个参数是你要清洗的
        //数据的所在目录。
        //第二个参数是清洗完之后你的输出目录
                conf.set("inpath", args[0]);
                conf.set("outpath", args[1]);

                Job job = Job.getInstance(conf, "youtube-video-etl");
               
                job.setJarByClass(VideoETLRunner.class);
               
                job.setMapperClass(VideoETLMapper.class);
                job.setMapOutputKeyClass(NullWritable.class);
                job.setMapOutputValueClass(Text.class);
                job.setNumReduceTasks(0);
               
                this.initJobInputPath(job);
                this.initJobOutputPath(job);
               
                return job.waitForCompletion(true) ? 0 : 1;
        }

        private void initJobOutputPath(Job job) throws IOException {
                Configuration conf = job.getConfiguration();
        //通过这个键把刚才传到conf里面的值取出来
                String outPathString = conf.get("outpath");
               
                FileSystem fs = FileSystem.get(conf);
               
                Path outPath = new Path(outPathString);
                if(fs.exists(outPath)){
                        fs.delete(outPath, true);
                }
               
                FileOutputFormat.setOutputPath(job, outPath);
               
        }

        private void initJobInputPath(Job job) throws IOException {
                Configuration conf = job.getConfiguration();
                String inPathString = conf.get("inpath");
               
                FileSystem fs = FileSystem.get(conf);
               
                Path inPath = new Path(inPathString);
                if(fs.exists(inPath)){
                        FileInputFormat.addInputPath(job, inPath);
                }else{
                        throw new RuntimeException("HDFS中该文件目录不存在:" + inPathString);
                }
        }

        public static void main(String[] args) {
                try {
                        int resultCode = ToolRunner.run(new VideoETLRunner(), args);
                        if(resultCode == 0){
                                System.out.println("Success!");
                        }else{
                                System.out.println("Fail!");
                        }
                        System.exit(resultCode);
                } catch (Exception e) {
                        e.printStackTrace();
                        System.exit(1);
                }
        }
}[/mw_shl_code]

3.6.4、执行ETL
[mw_shl_code=shell,true]$ bin/yarn jar ~/Desktop/youtube-0.0.1-SNAPSHOT.jar \
com.z.youtube.mr.etl.VideoETLRunner \
/youtube/video/2008/0222 \
/youtube/output/video/2008/0222[/mw_shl_code]
2019073008240961.png


3.5、准备工作
3.5.1、创建表
创建表:youtube_ori(原始视频数据表),youtube_user_ori(原始用户表),

创建表:youtube_orc,youtube_user_orc

为什么要有原始表?因为想让数据的存储为orc形式,如果是orc形式是不能够直接使用load data这样的命令插入数据的,必须通过查询的方式把数据插入到orc格式的表里面,所以需要先见两个表,把清洗后的数据导进去,到入之后,对表进行查询,插入到orc表里面
[mw_shl_code=sql,true]youtube_ori:

create table youtube_ori(

videoId string,

uploader string,

age int,

category array<string>,

length int,

views int,

rate float,

ratings int,

comments int,

relatedId array<string>)
row format delimited
fields terminated by "\t"
collection items terminated by
"&"
stored as textfile;[/mw_shl_code]

然后把原始数据插入到orc表中
[mw_shl_code=sql,true]youtube_orc:

create table youtube_orc(

videoId string,

uploader string,

age int,

category array<string>,

length int,

views int,

rate float,

ratings int,

comments int,

relatedId array<string>)
clustered by (uploader) into 8 buckets
row format delimited fields terminated by
"\t"
collection items terminated by
"&"
stored as orc;[/mw_shl_code]
[mw_shl_code=sql,true]youtube_user_orc:

create table youtube_user_orc(
    uploader string,
    videos int,
    friends int)
clustered by (uploader) into 24 buckets
row format delimited
fields terminated by "\t"
stored as orc;[/mw_shl_code]
注意不同的地方是stored as orc

20190730082442472.png


2019073008244611.png


20190730082452262.png


然后开始把数据导入到orc表里面
20190730082500952.png


过程可能比较慢

查一下orc表的数据
20190730082515165.png

3.5.2、导入ETL后的数据
youtube_ori:

load data inpath “/youtube/output/video/2008/0222” into table youtube_ori;

youtube_user_ori:

load data inpath “/youtube/user/2008/0903” into table youtube_user_ori;

3.5.3、向ORC表插入数据
youtube_orc:

insert into table youtube_orc select * from youtube_ori;

youtube_user_orc:

insert into table youtube_user_orc select * from youtube_user_ori;

3.6、业务分析
3.6.1、统计视频观看数Top10
思路:

1) 使用order
by按照views字段做一个全局排序即可,同时我们设置只显示前10条。

最终代码:
[mw_shl_code=applescript,true]select
    videoId,
    uploader,
    age,
    category,
    length,
    views,
    rate,
    ratings,
    comments
from
    youtube_orc
order by
views desc
limit
    10;[/mw_shl_code]
20190730082526946.png

3.6.2、统计视频类别热度Top10
思路:

1) 即统计每个类别有多少个视频,显示出包含视频最多的前10个类别。

2) 我们需要按照类别group by聚合,然后count组内的videoId个数即可。

3) 因为当前表结构为:一个视频对应一个或多个类别。所以如果要group
by类别,需要先将类别进行列转行(展开),然后再进行count即可。

4) 最后按照热度排序,显示前10条。

最终代码:
[mw_shl_code=sql,true]select
    category_name as category,
    count(t1.videoId) as hot
from (
    select
        videoId,
        category_name
    from
        youtube_orc lateral view explode(category) t_catetory as category_name) t1
group by
    t1.category_name
order by
    hot
desc limit
    10;[/mw_shl_code]
20190730082547236.png

3.6.3、统计出视频观看数最高的20个视频的所属类别以及类别包含Top20视频的个数
思路:

1) 先找到观看数最高的20个视频所属条目的所有信息,降序排列

2) 把这20条信息中的category分裂出来(列转行)

3) 最后查询视频分类名称和该分类下有多少个Top20的视频

最终代码:
[mw_shl_code=sql,true]select
    category_name as category,
    count(t2.videoId) as hot_with_views
from (
    select
        videoId,
        category_name
    from (
        select
            *
        from
            youtube_orc
        order by
            views
        desc limit
            20) t1 lateral view explode(category) t_catetory as category_name) t2
group by
    category_name
order by
    hot_with_views
desc;[/mw_shl_code]
20190730082608377.png

20190730082614561.png

执行结果
20190730082625137.png


3.6.4、统计视频观看数Top50所关联视频的所属类别Rank
rank排名的意思

思路:

查询出观看数最多的前50个视频的所有信息(当然包含了每个视频对应的关联视频),记为临时表t1
[mw_shl_code=sql,true]select
    *
from
    youtube_orc
order by
    views
desc limit
    50;[/mw_shl_code]
t1:观看数前50的视频

2) 将找到的50条视频信息的相关视频relatedId列转行,记为临时表t2
[mw_shl_code=sql,true]select
    explode(relatedId) as videoId
from
        t1;[/mw_shl_code]
t2:将相关视频的id进行列转行操作

3) 将相关视频的id和youtube_orc表进行inner join操作
[mw_shl_code=sql,true](select
    distinct(t2.videoId),
    t3.category
from
    t2
inner join
    youtube_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name;[/mw_shl_code]
t5:得到两列数据,一列是category,一列是之前查询出来的相关视频id

4) 按照视频类别进行分组,统计每组视频个数,然后排行

最终代码:
[mw_shl_code=sql,true]select
    category_name as category,
    count(t5.videoId) as hot
from (
    select
        videoId,
        category_name
    from (
        select
            distinct(t2.videoId),
            t3.category
        from (
            select
                explode(relatedId) as videoId
            from (
                select
                    *
                from
                    youtube_orc
                order by
                    views
                desc limit
                    50) t1) t2
        inner join
            youtube_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name) t5
group by
    category_name
order by
    hot
desc;[/mw_shl_code]
t4 lateral view explode(category) t_catetory

这句话的意思是对t4表进行侧写,侧写的内容是,把category这一例炸开,炸开的结果生成一张新的表叫做t_catetory,炸开的这一列有一列的别名叫做category_name ,

然后distinct去重,然后distinct上面的select整个完成之后生成一个新的表叫做t5表
2019073008263753.png


3.6.5、统计每个类别中的视频热度Top10,以Music为例
思路:

要想统计Music类别中的视频热度Top10,需要先找到Music类别,所以需要将category展开,所以可以创建一张表用于存放categoryId展开的数据。

2) 向category展开的表中插入数据。

3) 统计对应类别(Music)中的视频热度。

最终代码:

创建表类别表:
[mw_shl_code=sql,true]create table youtube_category(
    videoId string,
    uploader string,
    age int,
    categoryId string,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited
fields terminated by "\t"
collection items terminated by "&"
stored as orc;[/mw_shl_code]
向类别表中插入数据:
[mw_shl_code=sql,true]insert into table youtube_category  
    select
        videoId,
        uploader,
        age,
        categoryId,
        length,
        views,
        rate,
        ratings,
        comments,
        relatedId
    from
        youtube_orc lateral view explode(category) catetory as categoryId;[/mw_shl_code]
将类别炸开

统计Music类别的Top10(也可以统计其他)
[mw_shl_code=sql,true]select
    videoId,
    views
from
    youtube_category
where
    categoryId = "Music"
order by
    views
desc limit
    10;[/mw_shl_code]
20190730082648462.png


3.6.6、统计每个类别中视频流量Top10,以Music为例
思路:

1) 创建视频类别展开表(categoryId列转行后的表)

2) 按照ratings排序即可

最终代码:
[mw_shl_code=sql,true]select
    videoId,
    views,
    ratings
from
    youtube_category
where
    categoryId = "Music"
order by
    ratings
desc limit
    10;[/mw_shl_code]
20190730082657588.png


3.6.7、统计上传视频最多的用户Top10以及他们上传的观看次数在前20的视频
思路:

1) 先找到上传视频最多的10个用户的用户信息
[mw_shl_code=sql,true]
select
    *
from
    youtube_user_orc
order by
    videos
desc limit
    10;[/mw_shl_code]
通过uploader字段与youtube_orc表进行join,得到的信息按照views观看次数进行排序即可。

最终代码:
[mw_shl_code=sql,true]select
    t2.videoId,
    t2.views,
    t2.ratings,
    t1.videos,
    t1.friends
from (
    select
        *
    from
        youtube_user_orc
    order by
        videos desc
    limit
        10) t1
join
    youtube_orc t2
on
    t1.uploader = t2.uploader
order by
    views desc
limit
    20;[/mw_shl_code]
20190730082714603.png


3.6.8、统计每个类别视频观看数Top10
思路:

1) 先得到categoryId展开的表数据

子查询按照categoryId进行分区,然后分区内排序,并生成递增数字,该递增数字这一列起名为rank列

3) 通过子查询产生的临时表,查询rank值小于等于10的数据行即可。

最终代码:
[mw_shl_code=sql,true]select
    t1.*
from (
    select
        videoId,
        categoryId,
        views,
        row_number() over(partition by categoryId order by views desc) rank from youtube_category) t1
where
    rank <= 10;[/mw_shl_code]
partition by categoryId 相同的分类在一个分区里面

row_number 这个函数的意思是对后面每一分区里面的数据进行一个升序的序列号的生成
20190730082722994.png


四、可能出现的问题
4.1、JVM堆内存溢出
描述:java.lang.OutOfMemoryError: Java heap space

解决:在yarn-site.xml中加入如下代码
[mw_shl_code=xml,true]<property>
//yarn允许的最大执行任务的调度的内存是多少,
//一定不要超过实际的电脑内存,比如电脑是2g,这里就分配2g,
//这里默认是8g
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>2048</value>
</property>

<property>
//物理内存和虚拟内存的关系,
//比如上面分配的物理内存是2g
//2.1的意思是上面的2*2.1就是虚拟内存的大小
        <name>yarn.nodemanager.vmem-pmem-ratio</name>
        <value>2.1</value>
</property>

<property>
//在执行mapreduce任务的时候,每一个jvm实例,
//所允许的最大堆内存的大小是1024个,默认是200个
<name>mapred.child.java.opts</name>
        <value>-Xmx1024m</value>
</property>[/mw_shl_code]
五、总结及作业
请大家自行使用Java-MapReduce来实现上述需求。



最新经典文章,欢迎关注公众号

来源:CSDN

作者:Leesin Dong

原文:《大数据学习笔记之项目(三):离线项目拓展youtube》

https://blog.csdn.net/dataiyangu/article/details/97746461?utm_source=app


本帖被以下淘专辑推荐:

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条