分享

将mapreduce的结果导入到数据库时遇到问题

草莓的橘子樹 发表于 2017-3-24 16:07:05 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 10612
我的目的是想先从文件中读取数据经mapreduce处理后再将结果数据导入到数据库中。但是提示好像数据类型出错。
代码如下:[mw_shl_code=java,true]package com.sun.mysql;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
/** * 将mapreduce的结果数据写入mysql中 */
public class mysqlmapre {
/**  * 重写DBWritable
     * TblsWritable需要向mysql中写入数据
     */
    public static class TblsWritable implements Writable, DBWritable
    {  
            String word;  
            int count;  
            public TblsWritable()
            {  
            
            }  
            public TblsWritable(String word,int count)
            {  
            this.word = word;
            this.count = count;
            }  
            @Override  
            public void write(PreparedStatement statement) throws SQLException
            {
                    statement.setString(1, this.word);  
                    statement.setInt(2, this.count);  
            }  
            @Override  
            public void readFields(ResultSet resultSet) throws SQLException
            {  
                    this.word = resultSet.getString(1);  
                    this.count = resultSet.getInt(2);  
            }  
            @Override  
            public void write(DataOutput out) throws IOException
            {  
                    out.writeUTF(this.word);
                    out.writeInt(this.count);
            }  
            @Override  
            public void readFields(DataInput in) throws IOException
            {  
                    this.word = in.readUTF();  
                    this.count = in.readInt();  
            }  
            public String toString()
            {  
                return new String(this.word + "        " + this.count);  
            }  
    }
    public static class ConnMysqlMapper extends Mapper<Object,Text,Text, IntWritable>{
    //TblsRecord是自定义的类型,也就是上面重写的DBWritable类

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
        public void map(Object key,Text value,Context context)throws IOException,InterruptedException
        {  
        //<首字母偏移量,该行内容>接收进来,然后处理value,将abc和x作为map的输出
        //key对于本程序没有太大的意义,没有使用

                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                        String token = itr.nextToken();
                    for(int i=0;i<input.length;i++)
                                 if((token.compareToIgnoreCase(input))==0){
                            word.set(token);
                            context.write(word, one);
                        }
                    
              }
        }  
   }  
    public static class ConnMysqlReducer extends Reducer<LongWritable,Text,TblsWritable,TblsWritable>{  
           
        public void reduce(LongWritable key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{  
        //接收到的key value对即为要输入数据库的字段,所以在reduce中:
        //wirte的第一个参数,类型是自定义类型TblsWritable,利用key和value将其组合成TblsWritable,
        // 然后等待写入数据库
        //wirte的第二个参数,wirte的第一个参数已经涵盖了要输出的类型,所以第二个类型没有用,设为null
        for(Iterator<IntWritable> itr = values.iterator();itr.hasNext();){  
                     context.write(new TblsWritable(key.toString(),itr.next().get()),null);
                 }  
        
        }  
    }  
        public static         String input[] = null;
        public static void readTxtFile(String filePath){
                try {
                        File file=new File(filePath);
                        if(file.isFile() && file.exists()){ //判断文件是否存在
                            InputStreamReader read = new InputStreamReader(
                            new FileInputStream(file));
                            BufferedReader bufferedReader = new BufferedReader(read);
                            String lineTxt = null;
                            while((lineTxt = bufferedReader.readLine()) != null){
                                String a = lineTxt;                              
                                input = a.split(" ");
                     
                            }
                            read.close();
                }else{
                    System.out.println("找不到指定的文件");
                }
                } catch (Exception e) {
                    System.out.println("读取文件内容出错");
                    e.printStackTrace();
                }             
            }


        @SuppressWarnings("deprecation")
        public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException
    {
        Configuration conf = new Configuration();
        DistributedCache.addFileToClassPath(new Path("/tmp/mysql-connector-java-5.1.40-bin.jar"), conf);
            String filePath = "/usr/local/hadoop/ab.txt";
           readTxtFile(filePath);
                args = new String[]{"hdfs://master:9000/user/hadoop/input/a.txt"};
               
                DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver","jdbc:mysql://192.168.1.121:3306/mapreducetest","root", "123456");   
                 
        Job job = Job.getInstance(conf, "test mysql connection");
        job.setJarByClass(mysqlmapre.class);
      
        job.setMapperClass(ConnMysqlMapper.class);  
        job.setReducerClass(ConnMysqlReducer.class);  
         
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(IntWritable.class);
        
        job.setInputFormatClass(TextInputFormat.class);  
        job.setOutputFormatClass(DBOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        
        DBOutputFormat.setOutput(job, "t1", "word","count");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}[/mw_shl_code]

运行后提示的错误是:
java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.write(DBOutputFormat.java:66)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2017-03-24 15:00:47,361 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local1380292941_0001 failed with state FAILED due to: NA
2017-03-24 15:00:47,424 INFO [org.apache.hadoop.mapreduce.Job] - Counters: 35

求大神帮忙看下这个问题出在哪儿,我试了好多次都没用……多谢了!

已有(6)人评论

跳转到指定楼层
starrycheng 发表于 2017-3-24 18:03:12
  job.setOutputFormatClass(DBOutputFormat.class);
替换为
job.setOutputFormatClass(TextOutputFormat.class);

去掉
DBOutputFormat.setOutput(job, "t1", "word","count");
好像自定义的有问题

回复

使用道具 举报

草莓的橘子樹 发表于 2017-3-24 22:30:00
starrycheng 发表于 2017-3-24 18:03
job.setOutputFormatClass(DBOutputFormat.class);
替换为
job.setOutputFormatClass(TextOutputFormat ...

谢谢你的回复!但是……不太行,替换之后没用,而且怎么可以删掉输出设置呢…
请问哪里有问题?

回复

使用道具 举报

NEOGX 发表于 2017-3-25 13:44:55
草莓的橘子樹 发表于 2017-3-24 22:30
谢谢你的回复!但是……不太行,替换之后没用,而且怎么可以删掉输出设置呢…
请问哪里有问题?

ConnMysqlMapper extends Mapper<Object,Text,Text, IntWritable>
map的输出类型是reduce的输入类型
Reducer<LongWritable,Text,TblsWritable,TblsWritable>
因此reduce需要修改为
Reducer<Text,IntWritable,TblsWritable,TblsWritable>


回复

使用道具 举报

草莓的橘子樹 发表于 2017-3-26 20:22:10
NEOGX 发表于 2017-3-25 13:44
ConnMysqlMapper extends Mapper
map的输出类型是reduce的输入类型
Reducer

谢谢大神的回复!!我get到错在哪里了…哈哈…
你提示改的那两行不太行,后来发现这样改就好啦~~

157,158行应该改成:

        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(IntWritable.class);         
        job.setOutputKeyClass(TblsWritable.class);  
        job.setOutputValueClass(TblsWritable.class);

102,104行应该改成:

    public static class ConnMysqlReducer extends Reducer<Text, IntWritable, TblsWritable, TblsWritable>{      
        public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException,InterruptedException{  

回复

使用道具 举报

草莓的橘子樹 发表于 2017-3-26 22:14:25
NEOGX 发表于 2017-3-25 13:44
ConnMysqlMapper extends Mapper
map的输出类型是reduce的输入类型
Reducer

你好,我把ConnMysqlReducer 类里的功能改了一下(我想把map的结果在reduce里相加),就把
public static class ConnMysqlReducer extends Reducer<Text,IntWritable,Text,IntWritable>
这行输出类型改了,可是还是不行,又出现上面一样的异常。
  
   
[mw_shl_code=java,true]public static class ConnMysqlReducer extends Reducer<Text,IntWritable,Text,IntWritable>{  
        private IntWritable result = new IntWritable();
        public void reduce(Text key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{  
               int sum = 0;
        for (IntWritable val : values) {
          sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
        }  
    }  
[/mw_shl_code]

异常提示:
2017-03-26 21:54:40,092 WARN [org.apache.hadoop.mapred.LocalJobRunner] - job_local1956595032_0001
java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.write(DBOutputFormat.java:66)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at com.sun.mysql.mysqlmapre$ConnMysqlReducer.reduce(mysqlmapre.java:115)
at com.sun.mysql.mysqlmapre$ConnMysqlReducer.reduce(mysqlmapre.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2017-03-26 21:54:40,094 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local1956595032_0001 failed with state FAILED due to: NA

麻烦再帮忙看下,谢谢…
回复

使用道具 举报

einhep 发表于 2017-3-28 12:56:02
草莓的橘子樹 发表于 2017-3-26 22:14
你好,我把ConnMysqlReducer 类里的功能改了一下(我想把map的结果在reduce里相加),就把
public stati ...

org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
上面是文本转换为DBWritable失败。
除了输入就是输出,猜测是因为输入的text或则中间结果,在输出的时候,想转换为DBWritable,但是失败了。提出下面办法
1.重写下输入格式
2.最好调试下,这样定位更准确。
上面代码只能看出思路是正确的,但是具体哪里有问题,调试会更加准确。定位后,楼主可以贴出来,在商讨
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条