分享

Hadoop2+Hbase简单程序实现

fish_tx 发表于 2015-2-11 11:21:34 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 12310
实现:
       通过MapReduce作业将HDFS中的数据写入HBase表。
注:Map输出的值使用了自定义的Writable序列化类(CellphoneWritable)。

package com.fish.mr2hb;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.log4j.Logger;


public class CellphoneSum {
   
    private static Logger log = Logger.getLogger(CellphoneSum.class);
   
    private static String TABLE = "myHBase";

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://Hadoop2-Master:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "Hadoop2-Master,Hadoop2-Slave1,Hadoop2-Slave2");
        Path inpath = new Path("hdfs://Hadoop2-Master:9000/temp/ncmdp/part-r-00000");
        Job job = new Job(conf, CellphoneSum.class.getSimpleName());

        job.setInputFormatClass(TextInputFormat.class);
        job.setJarByClass(CellphoneSum.class);
        
        FileInputFormat.setInputPaths(job, inpath);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CellphoneWritable.class);
        TableMapReduceUtil.initTableReducerJob(TABLE, MyReduce.class, job);
        job.waitForCompletion(true);
    }
   
    static class MyMapper extends Mapper<LongWritable, Text, Text, CellphoneWritable>{
        
        @Override
        protected void map(
                LongWritable k1,
                Text v1,
                Mapper<LongWritable, Text, Text, CellphoneWritable>.Context context)
                throws IOException, InterruptedException {
            String[] ss = v1.toString().split("\t");
            log.info("---->"+v1.toString()+"---"+ss.length);
            context.write(new Text(ss[0]), new CellphoneWritable(ss[0],ss[1]));
        }
    }

    static class MyReduce extends TableReducer<Text, CellphoneWritable, ImmutableBytesWritable>{
        private final String FAMILY = "family2";
        private final String CELLPHONE = "cellphone";
        private final String SUM = "sum";
        
        @Override
        protected void reduce(
                Text k2,
                Iterable<CellphoneWritable> v2s,
                Reducer<Text, CellphoneWritable, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            byte[] key = ("fc-"+k2.toString()).getBytes();
            Put put = new Put(key);
            CellphoneWritable cellphoneWritable = v2s.iterator().next();
            put.add(FAMILY.getBytes(), CELLPHONE.getBytes(), (cellphoneWritable.phone).getBytes());
            put.add(FAMILY.getBytes(), SUM.getBytes(), (cellphoneWritable.sum+"").getBytes());
            
            context.write(new ImmutableBytesWritable(key), put);
        }
    }
   
}

class CellphoneWritable implements Writable{

    String phone;
    long sum;
   
    public CellphoneWritable(){
        
    }
   
    public CellphoneWritable(String phone, String sun){
        this.phone = phone;
        this.sum = Long.parseLong(sun);
    }
   
    @Override
    public void readFields(DataInput in) throws IOException {
        phone = in.readUTF();
        sum = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeLong(sum);
    }
   
    public String toString(){
        return "---->"+phone+"\t"+sum;
    }
   
}


分享出来,大家可以一起学习。。。

已有(6)人评论

跳转到指定楼层
stark_summer 发表于 2015-2-12 12:14:19
个人感觉写的不错 赞
回复

使用道具 举报

s060403072 发表于 2015-2-11 11:24:45
回复

使用道具 举报

yxyjmu 发表于 2015-2-12 05:27:11
好的敲门砖啊……
回复

使用道具 举报

tustyao 发表于 2015-2-12 11:30:25
回复

使用道具 举报

stark_summer 发表于 2015-2-12 11:46:05
回复

使用道具 举报

shuai 发表于 2015-5-7 19:21:16
赞赞赞,回复收藏
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条