分享

自定义类封装HashSet,如何序列化

本帖最后由 小寒co1de 于 2017-10-14 23:27 编辑

[mw_shl_code=java,true]import org.apache.hadoop.io.WritableComparable;

import java.io.*;
import java.util.*;

/**
* Created by hadoop on 2017/9/30 0030.
*/
public class RelativeKey implements WritableComparable<RelativeKey> {
    private HashSet<String> keySet = new HashSet<String>();

    public RelativeKey() {
    }

    public HashSet<String> getKeySet() {
        return keySet;
    }

    public void setKeySet(String s) {
        this.keySet.add(s) ;
    }

    public void write(DataOutput dataOutput) throws IOException {
//        dataOutput.writeInt(keySet.size());
//        for (String s: keySet) {
//            dataOutput.writeUTF(s);
//        }
        String[] s = keySet.toArray(new String[] {});
        dataOutput.writeInt(s.length);
        for (int i=0;i<s.length;i++){
            dataOutput.writeUTF(s);
        }
    }

    public void readFields(DataInput dataInput) throws IOException {
//        int length =dataInput.readInt();
//        for(int i=0; i<length;i++){
//            this.setKeySet(dataInput.readUTF());
//        }
//        keySet.clear();     如果不加这一句,reduce端接受的数据会出异常,不知道为何
        int length =dataInput.readInt();
        String[] s = new String[length];
        for (int i=0;i<s.length;i++){
            s=dataInput.readUTF();
//            System.out.println(s);
        }
        for (int i=0;i<s.length;i++){
            keySet.add(s);
        }



    }


    public int compareTo(RelativeKey o) {   
        if (equals(o) == true){
            return 1;
        }
        else return -1;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        RelativeKey that = (RelativeKey) o;

        return keySet != null ? keySet.equals(that.keySet) : that.keySet == null;
    }

    @Override
    public int hashCode() {
        return keySet != null ? keySet.hashCode() : 0;
    }
}
[/mw_shl_code]


这个自定义key是做牛奶啤酒案例用的,但发现这样发送数据给reduce后reduce的数据会变成下面这样,
QQ截图20171014232305.png

好像是一直再往hashset中添加数据,而在代码中加入keySet.clear()之后,就读入正常的键值对了,问题应该是在序列化中,但不知为何出现。另外比较方法似乎也有问题,但想先解决第一个问题,请各位前辈指点一下

已有(7)人评论

跳转到指定楼层
desehawk 发表于 2017-10-15 08:59:57
应该不是序列化的问题。
没有看到你的完整程序,猜测应该是这样的。mapreduce是把结果输出到磁盘,然后reduce在到磁盘去读取map输出结果。根据楼主提供的代码,似乎在keySet也读取了。这样就有了两份数据。这个只是其一
其二:
后面代码中,又用到了keySet,前面的数据没有清空,后面又添加   keySet.add(s);所以可能就产生了数据混乱。
  for (int i=0;i<s.length;i++){
            keySet.add(s);
        }

回复

使用道具 举报

langke93 发表于 2017-10-15 09:40:40

可以在 keySet.clear()之前,看下keySet里面是否有数据
[mw_shl_code=java,true]public void readFields(DataInput dataInput) throws IOException {
//        int length =dataInput.readInt();
//        for(int i=0; i<length;i++){
//            this.setKeySet(dataInput.readUTF());
//        }
//        keySet.clear();     如果不加这一句,reduce端接受的数据会出异常,不知道为何
        int length =dataInput.readInt();
        String[] s = new String[length];
        for (int i=0;i<s.length;i++){
            s=dataInput.readUTF();
//            System.out.println(s);
        }
        for (int i=0;i<s.length;i++){
            keySet.add(s);
        }



    }[/mw_shl_code]

回复

使用道具 举报

小寒co1de 发表于 2017-10-15 13:57:37
本帖最后由 小寒co1de 于 2017-10-15 14:01 编辑

补充上完整代码,用的是本地测试模式,没有添加集群信息

[mw_shl_code=java,true]

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.*;

import static java.io.FileDescriptor.out;

/**
* Created by hadoop on 2017/9/30 0030.
*/
public class Relative {
    public static class MyMap extends Mapper<LongWritable, Text, RelativeKey, IntWritable> {
        BufferedWriter bf = null;

        protected void setup(Mapper<LongWritable, Text, RelativeKey, IntWritable>.Context context) throws IOException, InterruptedException {
             bf = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("mapout")));
        }

        public void map(LongWritable ikey, Text ivalue, Context context)
                throws IOException, InterruptedException {

            String[] line = ivalue.toString().split(" ");

            for(int i = 1; i<line.length;i++){

                for(int j = i+1;j<line.length;j++){
                    RelativeKey rk= new RelativeKey();
                    rk.setKeySet(line);
                    rk.setKeySet(line[j]);
                    context.write(rk,new IntWritable(1));

                    String keyset="";
                   for(String s : rk.getKeySet()){
                       keyset+=s;
                   }
                   bf.write(keyset+1);
                   bf.newLine();

                }
            }

//            bf.close();
        }

        public void cleanup(Mapper<LongWritable, Text, RelativeKey, IntWritable>.Context context) throws IOException, InterruptedException {
            bf.close();
       }



    }

    public static class MyReduce extends Reducer<RelativeKey, IntWritable, Text, IntWritable> {

        public void setup(Reducer<RelativeKey, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

        }

        public void reduce(RelativeKey _key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // 累加值
            int count = 0;
            for (IntWritable val : values) {
                count +=Integer.parseInt(val.toString());
            }

            //测试收到的key对
            System.out.println("keySize is "+_key.getKeySet().size());
            String keyset="";
            for(String s : _key.getKeySet()){
                keyset+=s;
            }
            System.out.println(keyset);

            //将key整理成指定形式
            String[] keyString = new String[10];   //本应该是2,但是会数组越界,写成10发现读出来好长一串,就是前文里提到的
            int i = 0;
            for (String s: _key.getKeySet()) {
                keyString[i++] = s;
            }
//            System.out.println("----------");
            String outkey = keyString[0]+"-"+keyString[1];
            context.write(new Text(outkey),new IntWritable(count));
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(Relative.class);
        // TODO: specify a mapper
        job.setMapperClass(Relative.MyMap.class);
        // TODO: specify a reducer
        job.setReducerClass(Relative.MyReduce.class);

        job.setMapOutputKeyClass(RelativeKey.class);
        job.setMapOutputValueClass(IntWritable.class);
        // TODO: specify output types
//        job.setOutputKeyClass(Text.class);
//        job.setOutputValueClass(Text.class);

        // TODO: specify input and output DIRECTORIES (not files)
        FileInputFormat.setInputPaths(job, new Path("C:\\Users\\lenovo\\Desktop\\wordcount\\src\\input\\input3.txt"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\lenovo\\Desktop\\wordcount\\src\\output4"));

        Path path = new Path("C:\\Users\\lenovo\\Desktop\\wordcount\\src\\output4");
        FileSystem fileSystem = path.getFileSystem(conf);// 根据path找到这个文件
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);// true的意思是,就算output有东西,也一带删除
        }
        if (!job.waitForCompletion(true))
            return;
    }
}
[/mw_shl_code]
回复

使用道具 举报

小寒co1de 发表于 2017-10-15 14:16:56
langke93 发表于 2017-10-15 09:40
可以在 keySet.clear()之前,看下keySet里面是否有数据
[mw_shl_code=java,true]public void readFields ...

确实是有数据的,
QQ截图20171015141252.png
hashset把重复数据剔除了,难怪会出现2 3 4 7 这样的集合大小。

为什么会出现这样的问题呢?和我本地测试有关系吗?
我每次都new了一个自定义类(封装了hashset),然后再调用write的,按我的理解每次的hashset都是一个新对象,怎么会读取到前一个对象的值呢?
回复

使用道具 举报

小寒co1de 发表于 2017-10-15 14:23:25
desehawk 发表于 2017-10-15 08:59
应该不是序列化的问题。
没有看到你的完整程序,猜测应该是这样的。mapreduce是把结果输出到磁盘,然后red ...

我补充了完整代码上来。我每次都发送的是一个新的对象,为什么会和前一个对象有关系呢?


这也引出我一直有的一个困惑,序列化和反序列化到底是在什么过程进行的,我一直认为是在Shuffle的时候进行,即此时框架才会调用对象的这两个方法,这样才能进行框架排序这样的操作。但这回这个问题使我很迷惑,关于hadoop数据流,序列化,反序列化的过程有更详细的讲解教程吗?网上大部分都只说怎么用,有点浅了。
回复

使用道具 举报

einhep 发表于 2017-10-16 07:34:18
小寒co1de 发表于 2017-10-15 14:23
我补充了完整代码上来。我每次都发送的是一个新的对象,为什么会和前一个对象有关系呢?

没看到定义的地方,有一个地方RelativeKey,这个是输出类型
public static class MyMap extends Mapper<LongWritable, Text, RelativeKey, IntWritable>

在reduce这里的RelativeKey是输入类型。可能跟你的RelativeKey有关系
public static class MyReduce extends Reducer<RelativeKey, IntWritable, Text, IntWritable>


回复

使用道具 举报

小寒co1de 发表于 2017-10-16 12:30:16
einhep 发表于 2017-10-16 07:34
没看到定义的地方,有一个地方RelativeKey,这个是输出类型
public static class MyMap extends Mapper  ...

代码有两部分,自定义键已经发过了,我这又把MapReduce过程的代码发上来了,确实是自定义键出了问题,但不知道为什么
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条