
MapSideJoin problem, need help

a3087661 posted on 2015-4-24 18:26:33

a3087661 posted on 2015-4-24 18:29:20
Here is the error output. The map phase completes, but the reduce phase fails!

zxy@zxy-virtual-machine:/usr/hadoop/hadoop-2.4.0$ hadoop jar WordCount.jar _pasted_code_.ReduceSideJoin /input /output
15/04/24 18:10:50 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/04/24 18:10:50 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/04/24 18:10:51 INFO input.FileInputFormat: Total input paths to process : 2
15/04/24 18:10:51 INFO mapreduce.JobSubmitter: number of splits:2
15/04/24 18:10:51 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1552492915_0001
15/04/24 18:10:51 WARN conf.Configuration: file:/home/zxy/hadoop_tmp/mapred/staging/zxy1552492915/.staging/job_local1552492915_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
15/04/24 18:10:51 WARN conf.Configuration: file:/home/zxy/hadoop_tmp/mapred/staging/zxy1552492915/.staging/job_local1552492915_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
15/04/24 18:10:51 WARN conf.Configuration: file:/home/zxy/hadoop_tmp/mapred/local/localRunner/zxy/job_local1552492915_0001/job_local1552492915_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
15/04/24 18:10:51 WARN conf.Configuration: file:/home/zxy/hadoop_tmp/mapred/local/localRunner/zxy/job_local1552492915_0001/job_local1552492915_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
15/04/24 18:10:51 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
15/04/24 18:10:51 INFO mapreduce.Job: Running job: job_local1552492915_0001
15/04/24 18:10:51 INFO mapred.LocalJobRunner: OutputCommitter set in config null
15/04/24 18:10:51 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
15/04/24 18:10:52 INFO mapred.LocalJobRunner: Waiting for map tasks
15/04/24 18:10:52 INFO mapred.LocalJobRunner: Starting task: attempt_local1552492915_0001_m_000000_0
15/04/24 18:10:52 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
15/04/24 18:10:52 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/data.txt:0+54
15/04/24 18:10:52 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
15/04/24 18:10:52 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
15/04/24 18:10:52 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
15/04/24 18:10:52 INFO mapred.MapTask: soft limit at 83886080
15/04/24 18:10:52 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
15/04/24 18:10:52 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
15/04/24 18:10:52 INFO mapred.LocalJobRunner:
15/04/24 18:10:52 INFO mapred.MapTask: Starting flush of map output
15/04/24 18:10:52 INFO mapred.MapTask: Spilling map output
15/04/24 18:10:52 INFO mapred.MapTask: bufstart = 0; bufend = 160; bufvoid = 104857600
15/04/24 18:10:52 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214380(104857520); length = 17/6553600
15/04/24 18:10:52 INFO mapred.MapTask: Finished spill 0
15/04/24 18:10:52 INFO mapred.Task: Task:attempt_local1552492915_0001_m_000000_0 is done. And is in the process of committing
15/04/24 18:10:52 INFO mapred.LocalJobRunner: map
15/04/24 18:10:52 INFO mapred.Task: Task 'attempt_local1552492915_0001_m_000000_0' done.
15/04/24 18:10:52 INFO mapred.LocalJobRunner: Finishing task: attempt_local1552492915_0001_m_000000_0
15/04/24 18:10:52 INFO mapred.LocalJobRunner: Starting task: attempt_local1552492915_0001_m_000001_0
15/04/24 18:10:52 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
15/04/24 18:10:52 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/data1.txt:0+40
15/04/24 18:10:52 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
15/04/24 18:10:52 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
15/04/24 18:10:52 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
15/04/24 18:10:52 INFO mapred.MapTask: soft limit at 83886080
15/04/24 18:10:52 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
15/04/24 18:10:52 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
15/04/24 18:10:52 INFO mapred.LocalJobRunner:
15/04/24 18:10:52 INFO mapred.MapTask: Starting flush of map output
15/04/24 18:10:52 INFO mapred.MapTask: Spilling map output
15/04/24 18:10:52 INFO mapred.MapTask: bufstart = 0; bufend = 146; bufvoid = 104857600
15/04/24 18:10:52 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214380(104857520); length = 17/6553600
15/04/24 18:10:52 INFO mapred.MapTask: Finished spill 0
15/04/24 18:10:52 INFO mapred.Task: Task:attempt_local1552492915_0001_m_000001_0 is done. And is in the process of committing
15/04/24 18:10:52 INFO mapred.LocalJobRunner: map
15/04/24 18:10:52 INFO mapred.Task: Task 'attempt_local1552492915_0001_m_000001_0' done.
15/04/24 18:10:52 INFO mapred.LocalJobRunner: Finishing task: attempt_local1552492915_0001_m_000001_0
15/04/24 18:10:52 INFO mapred.LocalJobRunner: map task executor complete.
15/04/24 18:10:52 INFO mapred.LocalJobRunner: Waiting for reduce tasks
15/04/24 18:10:52 INFO mapred.LocalJobRunner: Starting task: attempt_local1552492915_0001_r_000000_0
15/04/24 18:10:52 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
15/04/24 18:10:52 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@77571787
15/04/24 18:10:52 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=363285696, maxSingleShuffleLimit=90821424, mergeThreshold=239768576, ioSortFactor=10, memToMemMergeOutputsThreshold=10
15/04/24 18:10:52 INFO reduce.EventFetcher: attempt_local1552492915_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
15/04/24 18:10:52 INFO mapreduce.Job: Job job_local1552492915_0001 running in uber mode : false
15/04/24 18:10:52 INFO mapreduce.Job:  map 100% reduce 0%
15/04/24 18:10:53 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1552492915_0001_m_000001_0 decomp: 158 len: 162 to MEMORY
15/04/24 18:10:53 INFO reduce.InMemoryMapOutput: Read 158 bytes from map-output for attempt_local1552492915_0001_m_000001_0
15/04/24 18:10:53 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 158, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->158
15/04/24 18:10:53 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1552492915_0001_m_000000_0 decomp: 172 len: 176 to MEMORY
15/04/24 18:10:53 INFO reduce.InMemoryMapOutput: Read 172 bytes from map-output for attempt_local1552492915_0001_m_000000_0
15/04/24 18:10:53 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 172, inMemoryMapOutputs.size() -> 2, commitMemory -> 158, usedMemory ->330
15/04/24 18:10:53 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
15/04/24 18:10:53 INFO mapred.LocalJobRunner: 2 / 2 copied.
15/04/24 18:10:53 INFO reduce.MergeManagerImpl: finalMerge called with 2 in-memory map-outputs and 0 on-disk map-outputs
15/04/24 18:10:53 WARN io.ReadaheadPool: Failed readahead on ifile
EBADF: Bad file descriptor
        at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posix_fadvise(Native Method)
        at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posixFadviseIfPossible(NativeIO.java:263)
        at org.apache.hadoop.io.nativeio.NativeIO$POSIX$CacheManipulator.posixFadviseIfPossible(NativeIO.java:142)
        at org.apache.hadoop.io.ReadaheadPool$ReadaheadRequestImpl.run(ReadaheadPool.java:206)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
15/04/24 18:10:53 INFO mapred.Merger: Merging 2 sorted segments
15/04/24 18:10:53 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 318 bytes
15/04/24 18:10:53 INFO reduce.MergeManagerImpl: Merged 2 segments, 330 bytes to disk to satisfy reduce memory limit
15/04/24 18:10:53 INFO reduce.MergeManagerImpl: Merging 1 files, 332 bytes from disk
15/04/24 18:10:53 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
15/04/24 18:10:53 INFO mapred.Merger: Merging 1 sorted segments
15/04/24 18:10:53 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 322 bytes
15/04/24 18:10:53 INFO mapred.LocalJobRunner: 2 / 2 copied.
15/04/24 18:10:53 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
15/04/24 18:10:53 INFO mapred.LocalJobRunner: reduce task executor complete.
15/04/24 18:10:53 WARN mapred.LocalJobRunner: job_local1552492915_0001
java.lang.Exception: java.io.EOFException
        at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.io.EOFException
        at java.io.DataInputStream.readFully(DataInputStream.java:197)
        at java.io.DataInputStream.readUTF(DataInputStream.java:609)
        at java.io.DataInputStream.readUTF(DataInputStream.java:564)
        at _pasted_code_.Person.readFields(Person.java:65)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
        at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:146)
        at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
        at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
15/04/24 18:10:53 INFO mapreduce.Job: Job job_local1552492915_0001 failed with state FAILED due to: NA
15/04/24 18:10:54 INFO mapreduce.Job: Counters: 38
        File System Counters
                FILE: Number of bytes read=19172
                FILE: Number of bytes written=462776
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=148
                HDFS: Number of bytes written=0
                HDFS: Number of read operations=14
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Map-Reduce Framework
                Map input records=10
                Map output records=10
                Map output bytes=306
                Map output materialized bytes=338
                Input split bytes=203
                Combine input records=0
                Combine output records=0
                Reduce input groups=0
                Reduce shuffle bytes=338
                Reduce input records=0
                Reduce output records=0
                Spilled Records=10
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=91
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
                Total committed heap usage (bytes)=382869504
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=94
        File Output Format Counters
                Bytes Written=0
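The stack trace ends in _pasted_code_.Person.readFields calling DataInputStream.readUTF, and the counters show Reduce input records=0, which suggests the reduce task failed while deserializing the first record it read from the shuffle. readUTF throws EOFException when it tries to read more bytes than the serialized record contains. A common cause is a custom Writable whose write() and readFields() do not handle the same fields in the same order with the same types. The Person source isn't shown in the thread, so the fields below are hypothetical, and SimpleWritable is a local stand-in with the same two method signatures as org.apache.hadoop.io.Writable, so this minimal sketch runs without Hadoop:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class WritableRoundTrip {

    // Local stand-in with the same two methods as org.apache.hadoop.io.Writable,
    // so this sketch compiles and runs without Hadoop on the classpath.
    interface SimpleWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    // Hypothetical Person record; the real Person class from the thread is not shown.
    static class Person implements SimpleWritable {
        String name = "";  // non-null defaults: writeUTF(null) throws NullPointerException
        String city = "";
        int flag;          // hypothetical tag, e.g. which input file the record came from

        public Person() {} // Hadoop instantiates Writables through a no-arg constructor

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeUTF(city);
            out.writeInt(flag);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // Same fields, same order, same types as write().
            name = in.readUTF();
            city = in.readUTF();
            flag = in.readInt();
        }
    }

    // Buggy variant: write() emits only the name, but the inherited readFields()
    // still expects city and flag, so the second readUTF() runs past the end of the bytes.
    static class BrokenPerson extends Person {
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
        }
    }

    static byte[] serialize(SimpleWritable w) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            w.write(out);
        }
        return bytes.toByteArray();
    }

    static <T extends SimpleWritable> T deserialize(byte[] data, T target) throws IOException {
        target.readFields(new DataInputStream(new ByteArrayInputStream(data)));
        return target;
    }

    public static void main(String[] args) throws IOException {
        Person p = new Person();
        p.name = "Tom";
        p.city = "Beijing";
        p.flag = 1;
        Person copy = deserialize(serialize(p), new Person());
        System.out.println(copy.name + " " + copy.city + " " + copy.flag); // Tom Beijing 1

        BrokenPerson bad = new BrokenPerson();
        bad.name = "Tom";
        try {
            deserialize(serialize(bad), new Person());
        } catch (EOFException e) {
            System.out.println("EOFException inside readUTF");
        }
    }
}
```

In the real job, Person would implement org.apache.hadoop.io.Writable (or WritableComparable if it is used as the map output key) and keep the same pairing: every value written in write() must be read back, in order and with the matching read method, in readFields().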

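leo_1989 posted on 2015-4-24 19:00:36
a3087661 posted on 2015-4-24 18:29
Here is the error output. The map phase completes, but the reduce phase fails!

zxy@zxy-virtual-machine:/usr/hadoop/hadoop-2.4.0$ hadoop jar ...

I ran into this problem too. In my case the code version didn't match the environment version: the code was written for the old version but referenced the new version's packages.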


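a3087661 posted on 2015-4-24 19:30:01
leo_1989 posted on 2015-4-24 19:00
I ran into this problem too. In my case the code version didn't match the environment version: the code was written for the old version but referenced the new version's packages.

Are you sure it's a mismatch between the API and the Hadoop version?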

mituan2008 posted on 2015-4-25 00:57:59
a3087661 posted on 2015-4-24 19:30
Are you sure it's a mismatch between the API and the Hadoop version?

OP, which versions are you using?

a3087661 posted on 2015-4-25 07:39:13
mituan2008 posted on 2015-4-25 00:57
OP, which versions are you using?

Hadoop 2.4.0, and the OS is Ubuntu Kylin 14.10.

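desehawk posted on 2015-4-25 12:48:33
Last edited by desehawk on 2015-4-25 12:49
a3087661 posted on 2015-4-25 07:39
Hadoop 2.4.0, and the OS is Ubuntu Kylin 14.10.

OP, your MapReduce code is written in the Hadoop 1.x style, but you are running Hadoop 2.x, which may cause problems.

Here is how it's written for the new version: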
Full code:
package com.felix;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
*
* Description: WordCount, explained by Felix
* @author Hadoop Dev Group
*/
public class WordCount
{
    /**
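    * MapReduceBase class: a base class for Mapper and Reducer implementations; it provides no-op default
    *   implementations of configure() and close().
    * Mapper interface: maps input key/value pairs to a set of intermediate key/value pairs.
    * WritableComparable interface: classes that implement WritableComparable can be compared with each other.
    *   Every class used as a key should implement this interface.
    * Reporter can be used to report the progress of the whole application; it is not used in this example.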
    *
    */
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable>
    {
        /**
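        * LongWritable, IntWritable, and Text are Hadoop classes that wrap Java data types. They implement the
        * WritableComparable interface and can be serialized for exchange between nodes in a distributed
        * environment; you can treat them as replacements for long, int, and String respectively.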
        */
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        
        /**
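        * The map method of the Mapper interface:
        * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
        * Maps a single input k/v pair to intermediate k/v pairs.
        * Output pairs need not be of the same types as the input pair; an input pair may map to zero or more output pairs.
        * OutputCollector interface: collects the <k,v> pairs emitted by Mappers and Reducers.
        * OutputCollector.collect(k, v): adds a (k,v) pair to the output.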
        */
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException
        {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens())
            {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }
    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException
        {
            int sum = 0;
            while (values.hasNext())
            {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
    public static void main(String[] args) throws Exception
    {
        /**
        * JobConf: the map/reduce job configuration class; it describes a map-reduce job to the Hadoop framework.
        * Constructors: JobConf(), JobConf(Class exampleClass), JobConf(Configuration conf), etc.
        */
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");          // set a user-defined job name
        conf.setOutputKeyClass(Text.class);    // set the key class of the job's output data
        conf.setOutputValueClass(IntWritable.class);  // set the value class of the job's output
        conf.setMapperClass(Map.class);        // set the job's Mapper class
        conf.setCombinerClass(Reduce.class);      // set the job's Combiner class
        conf.setReducerClass(Reduce.class);        // set the job's Reducer class
        conf.setInputFormat(TextInputFormat.class);    // set the InputFormat implementation for the job
        conf.setOutputFormat(TextOutputFormat.class);  // set the OutputFormat implementation for the job
        /**
        * InputFormat describes the input specification of a map-reduce job.
        * FileInputFormat.setInputPaths(): sets the given paths as the job's input list.
        * FileOutputFormat.setOutputPath(): sets the job's output directory.
        */
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);        // submit the job and wait for it to complete
    }
}




a3087661 posted on 2015-4-25 13:30:37
desehawk posted on 2015-4-25 12:48
Last edited by desehawk on 2015-4-25 12:49

OP, your MapReduce code is written in the Hadoop 1.x style, but the version you are using is ha ...

After making the changes, the program now reports an error:
Error: The method setMapperClass(Class<? extends Mapper>) in the type JobConf is not applicable for the arguments (Class<MyMapper>)

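desehawk posted on 2015-4-25 14:34:45
a3087661 posted on 2015-4-25 13:30
After making the changes, the program now reports an error:
Error: The method setMapperClass(Class


Check your imported packages, and make sure the number and types of the method arguments are correct.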
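The compile error quoted above says JobConf.setMapperClass expects a Class<? extends Mapper>, where Mapper is org.apache.hadoop.mapred.Mapper, the interface from the older API that the posted WordCount listing uses (it imports org.apache.hadoop.mapred.JobConf, JobClient and MapReduceBase). A mapper written against the newer org.apache.hadoop.mapreduce API extends org.apache.hadoop.mapreduce.Mapper instead, an unrelated class that only shares the simple name, so the bound rejects it. The stack trace posted at 18:29 (org.apache.hadoop.mapreduce.Reducer.run, WrappedReducer) suggests the original job already used the newer API. The sketch below reproduces only the type relationship, using hypothetical stand-in types (OldApi.Mapper, NewApi.Mapper, MyMapper, MyOldMapper) so it runs without Hadoop:

```java
public class ApiMismatch {

    // Stand-ins for two unrelated Hadoop types that share the simple name "Mapper":
    // org.apache.hadoop.mapred.Mapper (older API, an interface) and
    // org.apache.hadoop.mapreduce.Mapper (newer API, a class). Not the real classes.
    static class OldApi {
        interface Mapper {}
    }

    static class NewApi {
        static class Mapper {}
    }

    // Hypothetical user mappers written against each API.
    static class MyMapper extends NewApi.Mapper {}
    static class MyOldMapper implements OldApi.Mapper {}

    // Same kind of bound as JobConf.setMapperClass(Class<? extends org.apache.hadoop.mapred.Mapper>).
    static String setOldApiMapper(Class<? extends OldApi.Mapper> cls) {
        return cls.getSimpleName();
    }

    // Runtime form of the check the compiler applies to that bound.
    static boolean acceptedByOldApi(Class<?> cls) {
        return OldApi.Mapper.class.isAssignableFrom(cls);
    }

    public static void main(String[] args) {
        System.out.println(setOldApiMapper(MyOldMapper.class)); // MyOldMapper
        // setOldApiMapper(MyMapper.class);  // would not compile: same "not applicable" error as in the thread
        System.out.println(acceptedByOldApi(MyMapper.class));    // false
        System.out.println(acceptedByOldApi(MyOldMapper.class)); // true
    }
}
```

In practice this means picking one API per job: either JobConf/JobClient with mappers implementing org.apache.hadoop.mapred.Mapper, or org.apache.hadoop.mapreduce.Job (Job.getInstance(conf), job.setMapperClass(...)) with mappers extending org.apache.hadoop.mapreduce.Mapper, with the imports matching that choice.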