分享

hadoop之MapReduce的问题,请求解答!

刚接触的hadoop,作为一个资深的小白,刚踏入hadoop,搭建成功环境以后,且能访问的页面都已经成功访问成功,肯定就是非常希望能够运行一个实例来查看结果了,可是我不知道为什么就是出现了各种问题,在解决了很多问题后,终于让MapReduce的第一个实例运行不报错了,但是却也没有得到自己想要的结果。希望能够有人能够帮助下!

运行完成后就得到了这个结果,输出为0

运行完成后就得到了这个结果,输出为0

并没有报错,就是没有结果!
看下代码吧!
先新建了一个类来存数据
package com.wujie;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class KeyPari implements WritableComparable<KeyPari> {
    private int year;
    private int hot;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getHot() {
        return hot;
    }

    public void setHot(int hot) {
        this.hot = hot;
    }

    @Override
    public String toString() {
        return "Patri{" +
                "year=" + year +
                ", hot=" + hot +
                '}';
    }

    @Override
    public int compareTo(KeyPari o) {
        int res = Integer.compare(year,o.getYear());
        if(res != 0){
            return res ;
        }
        return Integer.compare(hot,o.getHot());
    }

    /**
     * 理解为反序列化得过程
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(year);
        dataOutput.writeInt(hot);
    }

    /**
     * 理解为序列化得过程
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.year = dataInput.readInt();
        this.hot = dataInput.readInt();
    }


    @Override
    public int hashCode() {

        return new Integer(year +hot).hashCode();
    }
}

-----------------------------------------------------------------------
然后自定义一个排序
package com.wujie;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
* 自定义排序
*/
public class SortHot extends WritableComparator {
    public SortHot() {
        super(KeyPari.class,true);
    }

    /**
     * 年份升序,温度降序
     * @param a
     * @param b
     * @return
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPari o1 = (KeyPari) a;
        KeyPari o2 = (KeyPari) b;
        int res = Integer.compare(o1.getYear(),o2.getYear());
        if(res != 0){
            return res;
        }
        return -Integer.compare(o1.getHot(),o2.getHot());//降序排序
    }
}

----------------------------------------------------------------------------------
进行一个按年份分组
package com.wujie;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
* 自定义分组
*/
public class GroupHost extends WritableComparator {
    public GroupHost() {
        super(KeyPari.class,true);
    }

    /**
     * 年份相同为一组
     * @param a
     * @param b
     * @return
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPari o1 = (KeyPari) a;
        KeyPari o2 = (KeyPari) b;
        return Integer.compare(o1.getYear(),o2.getYear());

    }
}

---------------------------------------------------------------------------------------
自定义一个分区
package com.wujie;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
* 自定义分区
*/
public class FirstPartition extends Partitioner<KeyPari,Text> {
    @Override
    public int getPartition(KeyPari keyPari, Text value, int num) {
        return (keyPari.getYear()*127) % num;//按照年份分区
    }
}

----------------------------------------------------------------------
最后就是一个主类运行
package com.wujie;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;


import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class RunJob {
    public static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    static class HotMapper extends Mapper<LongWritable,Text,KeyPari,Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] ss = line.split("\t");
            if(ss.length == 2){
                try {
                    Date date = simpleDateFormat.parse(ss[0]);
                    Calendar calendar = Calendar.getInstance();
                    calendar.setTime(date);
                    int year = calendar.get(1);
                    String hot = ss[1].substring(0);
                    KeyPari keyPari = new KeyPari();
                    keyPari.setHot(Integer.parseInt(hot));
                    keyPari.setYear(year);
                    System.out.println("map:"+keyPari);
                    context.write(keyPari,value);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class HotReduce extends Reducer<KeyPari,Text,KeyPari,Text>{
        @Override
        protected void reduce(KeyPari key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text v:values
                 ) {
                    System.out.println("reduce:---key:"+key);
                    System.out.println("reduce:--v:"+v);
                context.write(key,v);

            }
        }
    }

    public static void main(String[] args){
        Configuration configuration = new Configuration();
        try {
            Job job = new Job(configuration);
            job.setJobName("hot");
            job.setJarByClass(RunJob.class);
            job.setMapperClass(HotMapper.class);
            job.setReducerClass(HotReduce.class);
            job.setMapOutputKeyClass(KeyPari.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks(3);
            job.setPartitionerClass(FirstPartition.class);
            job.setSortComparatorClass(SortHot.class);
            job.setGroupingComparatorClass(GroupHost.class);

            FileInputFormat.addInputPath(job,new Path("/usr/input/hot"));
            FileOutputFormat.setOutputPath(job,new Path("/usr/output/hot"));
            System.exit(job.waitForCompletion(true) ? 0:1);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

--------------------------------------------------------------------------------------
运行步骤如下:
1.在hadoop中创建/usr/input/hot/的 文件夹
2.将数据文本上传至/usr/input/hot/的文件夹下
3.将程序打jar包,放到虚拟机中运行
4.最后就是读到数据,也将文件分块了,但是文件中却没有任何的数据

已有(2)人评论

跳转到指定楼层
easthome001 发表于 2018-5-23 10:46:53
下面可能是一个原因。
/**
     * 理解为反序列化得过程
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(year);
        dataOutput.writeInt(hot);
    }

相关推荐参考
Reducer端数据接收不到,也就是迭代器中貌似就没有数据!求解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23443


回复

使用道具 举报

ww8605853 发表于 2018-5-25 21:47:20
终于找到了问题所在,在这里简单说一下,在这个过程中,因为是新手,对于排除错误的方法,其实就是那么几个,第一,看看官方列子是否可以执行,如果可以执行,说明环境没有太大问题,第二,检查自己的代码是否有问题,第三,就是检查一下你准备的数据格式是否正确了,我就是初心将数据格式弄错了!!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条