分享

Reducer端数据接收不到,也就是迭代器中貌似就没有数据!求解

逸辰不逸晨 发表于 2017-12-1 15:38:49 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 8321
package com.pri;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
* @author 吕梁彪
* 此次在Reducer进行join操作
* 输入路径为多个
*/
public class Mutil_File_ReducerMeger extends Configured implements Tool{

        @Override
        public int run(String[] args) throws Exception {
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf, "Mutil_File_ReducerMeger");
                job.setJarByClass(Mutil_File_ReducerMeger.class);
                job.setMapperClass(Mutil_File_ReducerMeger_Mapper.class);
                job.setReducerClass(Mutil_File_ReducerMeger_Reducer.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LLBWritable.class);
                job.setOutputKeyClass(LLBWritable.class);
                job.setOutputValueClass(NullWritable.class);
                //job.setNumReduceTasks(0);
                Path path = new Path("D:\\IOoperation\\MapReducer\\output\\out1");

                FileSystem fs = FileSystem.get(conf);
                if(fs.exists(path)) {
                        fs.delete(path,true);
                }
                FileInputFormat.setInputPaths(job, "D:\\IOoperation\\MapReducer\\input\\shop_information.txt,D:\\IOoperation\\MapReducer\\input\\shop.txt");
                FileOutputFormat.setOutputPath(job, path);
                return job.waitForCompletion(true)?0:1;
        }
        public static class LLBWritable implements Writable,Serializable{
                private static final long serialVersionUID = 1L;
                private String goodsName;
                private int goodsId;
                private int goodsPrice;
                private int goodsSales;
                private int goodsFlag;

                public void set(String goodsName,int goodsId,int goodsPrice,int goodsSales,int goodsFlag) {
                        this.goodsName         = goodsName;
                        this.goodsId         = goodsId;
                        this.goodsPrice = goodsPrice;
                        this.goodsSales = goodsSales;
                        this.goodsFlag         = goodsFlag;

                }

                public int getGoodsFlag() {
                        return goodsFlag;
                }

                public void setGoodsFlag(int goodsFlag) {
                        this.goodsFlag = goodsFlag;
                }

                public String getGoodsName() {
                        return goodsName;
                }

                public void setGoodsName(String goodsName) {
                        this.goodsName = goodsName;
                }

                public int getGoodsId() {
                        return goodsId;
                }

                public void setGoodsId(int goodsId) {
                        this.goodsId = goodsId;
                }

                public int getGoodsPrice() {
                        return goodsPrice;
                }

                public void setGoodsPrice(int goodsPrice) {
                        this.goodsPrice = goodsPrice;
                }

                public int getGoodsSales() {
                        return goodsSales;
                }

                public void setGoodsSales(int goodsSales) {
                        this.goodsSales = goodsSales;
                }

                @Override
                public void write(DataOutput out) throws IOException {
                        out.writeUTF(goodsName);
                        out.writeInt(goodsId);
                        out.writeInt(goodsPrice);
                        out.writeInt(goodsSales);
                        out.writeInt(goodsFlag);
                }

                @Override
                public void readFields(DataInput in) throws IOException {
                        in.readUTF();
                        in.readInt();
                        in.readInt();
                        in.readInt();
                        in.readInt();
                }

                @Override
                public String toString() {
                        return "Goods_Information [goodsName=" + goodsName + ", goodsId=" + goodsId + ", goodsPrice=" + goodsPrice
                                        + ", goodsSales=" + goodsSales + "]";
                }


        }

        public static class Mutil_File_ReducerMeger_Mapper extends Mapper<LongWritable, Text, Text, LLBWritable>{
                Text k = new Text();
                LLBWritable llb = new LLBWritable();
                @Override
                protected void map(LongWritable key, Text value, Context context)
                                throws IOException, InterruptedException {
                        //获取输入文件的名称(便于分类)
                        FileSplit fs =(FileSplit)context.getInputSplit();
                        String name = fs.getPath().getName();
                        String[] values;
                        if(name.contains("information")) {
                                k.set("shop_information");
                                values = StringUtils.split(value.toString(),"\t");
                                for(int i = 0 ; i < values.length ; i++) {
                                        llb.set(values[0],Integer.parseInt(values[1]), 0, Integer.parseInt(values[2]), 2);
                                }
                        }else {
                                k.set("shop");
                                values = StringUtils.split(value.toString(),"\t");
                                for(int i = 0 ; i < values.length ; i++) {
                                        llb.set("", Integer.parseInt(values[0]), Integer.parseInt(values[1]), 0, 1);
                                }
                        }
                        context.write(k, llb);
                }
        }

        public static class Mutil_File_ReducerMeger_Reducer extends Reducer<Text, LLBWritable, LLBWritable, NullWritable>{
                ArrayList<LLBWritable> list = new ArrayList<>();
                LLBWritable llb1 = new LLBWritable();
                @Override
                protected void reduce(Text arg0, Iterable<LLBWritable> values,Context context) throws IOException, InterruptedException {

                        for (LLBWritable value : values) {
                                if("shop".equals(arg0.toString())) {
                                        try {
                                                BeanUtils.copyProperties(llb1,value);
                                        } catch (Exception e) {
                                                System.out.println(e.getMessage());
                                                e.printStackTrace();
                                        }
                                }else {
                                        try {
                                                LLBWritable llb2 =  new LLBWritable();
                                                BeanUtils.copyProperties(llb2,value);
                                                list.add(llb2);
                                        } catch (Exception e) {
                                                System.out.println(e.getMessage());
                                                e.printStackTrace();
                                        }
                                }
                        }
                        for (LLBWritable llb : list) {
                                llb.setGoodsPrice(llb1.getGoodsPrice());
                                context.write(llb, NullWritable.get());
                        }
                }
        }
        public static void main(String[] args) {
                try {
                        System.out.println(ToolRunner.run(new Mutil_File_ReducerMeger(), args));
                } catch (Exception e) {
                        System.out.println(e.getMessage());
                        e.printStackTrace();
                }
        }
}


已有(6)人评论

跳转到指定楼层
sstutu 发表于 2017-12-1 16:07:17
这是在哪运行的。
FileInputFormat.setInputPaths(job, "D:\\IOoperation\\MapReducer\\input\\shop_information.txt,D:\\IOoperation\\MapReducer\\input\\shop.txt");
FileOutputFormat.setOutputPath(job, path);

路径不建议使用window,而且如果换个地方,这个运行就会出错,而且不能读取文件。
最好使用hdfs路径。

回复

使用道具 举报

逸辰不逸晨 发表于 2017-12-4 09:24:05
只是作为实验的在window下测试一下reducer side join 方法,就是reducer中iterator没有值,但是key是有值存在的。如果将reducer的输出设置为0的话,map端也可以输出数据!
回复

使用道具 举报

逸辰不逸晨 发表于 2017-12-4 09:26:27
sstutu 发表于 2017-12-1 16:07
这是在哪运行的。
FileInputFormat.setInputPaths(job, "D:\\IOoperation\\MapReducer\\input\\shop_infor ...

只是作为实验的在window下测试一下reducer side join 方法,就是reducer中iterator没有值,但是key是有值存在的。如果将reducer的输出设置为0的话,map端也可以输出数据!

回复

使用道具 举报

逸辰不逸晨 发表于 2018-2-28 17:18:37
问题已经解决了,原因是序列化也就是自定义类型中出了点问题!
回复

使用道具 举报

ww8605853 发表于 2018-5-23 00:05:37
学习学习,一名小白!
回复

使用道具 举报

ww8605853 发表于 2018-5-23 19:23:18
逸辰不逸晨 发表于 2018-2-28 17:18
问题已经解决了,原因是序列化也就是自定义类型中出了点问题!

序列化中定义的类出现了什么问题?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条