分享

求一个能用的比wordCount复杂一些的MapReduce的源码

christie1988 发表于 2013-10-16 13:39:59 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 12 10498
急求!最好是有一点现实意义的,java编写的
              
               
               

已有(12)人评论

跳转到指定楼层
u010485870 发表于 2013-10-16 13:40:30

            同样求解11求解
        
回复

使用道具 举报

virgo777 发表于 2013-10-16 13:41:01

            你做个选题吧,我可以帮你试下,
反正最近也是闲着,就当赞RP了。
        
回复

使用道具 举报

christie1988 发表于 2013-10-16 13:41:40

            引用 2 楼 virgo777 的回复:你做个选题吧,我可以帮你试下,
反正最近也是闲着,就当赞RP了。

任何方向任何应用都可以,比如应用于网上书店,大量图片的转换,相似图片的搜索,中国移动的数据分析。。能用的源码都行
        
回复

使用道具 举报

virgo777 发表于 2013-10-16 13:42:19

            Hadoop权威指南中带了一个对天气的分析。
实际上就是一个字符串的处理。
你去看看吧。
没什么新意的。
然后把其中的算法替换下就可以了。
        
回复

使用道具 举报

tntzbzc 发表于 2013-10-16 13:42:55

            引用 3 楼 christie1988 的回复:Quote: 引用 2 楼 virgo777 的回复:
你做个选题吧,我可以帮你试下,
反正最近也是闲着,就当赞RP了。

任何方向任何应用都可以,比如应用于网上书店,大量图片的转换,相似图片的搜索,中国移动的数据分析。。能用的源码都行

厄,你说的这些我都没有,其他的可以挖?用户在线时常、当前在线用户、KD换算,需要的话留个邮箱我发你
        
回复

使用道具 举报

virgo777 发表于 2013-10-16 13:43:40

            virgo_wang@qq.com
能给我一份吗?
谢谢
        
回复

使用道具 举报

tntzbzc 发表于 2013-10-16 13:44:24

            引用 6 楼 virgo777 的回复:virgo_wang@qq.com
能给我一份吗?
谢谢

贴一个以前写的测试代码,
根据用户登陆登出计算用户在线时长的方法。

package ps2package;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class ps2onlinetime
{
        public static class ps2onlinetimeCombiner extends Reducer
        {
                Text outputkey = new Text();
                Text outputvalue = new Text();
                public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
                        String v = "";
                        for (Text val : values) {
                                if (val.toString().length() == 23)
                                        if (v.equals(""))
                                        v += val.toString();
                                        else
                                        v += "\t" + val.toString();
                        }
                        String[] v1 = v.split("\t");
                        Arrays.sort(v1);
                        v = "";
                        loginTime beginT = new loginTime();
                        loginTime endT = new loginTime();
                        String fbstr = "", ebstr = "";
                        int onlineT = 0, v1type = 0;
                        for (int i = 0; i = 102) {
                                        endT = new loginTime(v1);
                                        if (fbstr.equals("")) {
                                                beginT = endT;
                                                fbstr = beginT.getvStr();
                                        }
                                        if (beginT.getTimeType() == 102) {
                                                try {
                                                        onlineT += loginTime.compTime(beginT.getTime(), endT.getTime());
                                                }
                                                catch (Exception ex) {
                                                        ;
                                                }
                                        }
                                        ebstr = endT.getvStr();
                                        beginT = endT;
                                }
                        }
                        if (fbstr.length() == 23 && ebstr.length() == 23 && onlineT >= 0) {
                                v = fbstr + ";" + String.valueOf(onlineT) + ";" + ebstr;
                                outputvalue.set(v);
                                context.write(key, outputvalue);
                        }
                }
        }
        public static class ps2onlinetimeReducer extends Reducer
        {
                Text outputvalue = new Text();
                public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
                        int j = 0;
                        String v = "";
                        for (Text val : values) {
                                if (v.equals("")) v += val.toString();
                                else v += "\t" + val.toString();
                        }
                        String[] v1 = v.split("\t");
                        Arrays.sort(v1);
                        v = "";
                        loginTime beginT = new loginTime();
                        loginTime endT = new loginTime();
                        String fbstr = "", ebstr = "";
                        int onlineT = 0;
                        for (int i = 0; i = 0) {
                                v = fbstr + "\t" + ebstr + "\t" + String.valueOf(onlineT);
                                outputvalue.set(v);
                                context.write(key, outputvalue);
                        }
                }
        }
        public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                conf.set("mapred.job.tracker", "master.nj.hadoop:9001");
                conf.set("mapred.job.reuse.jvm.num.tasks", "-1");
                conf.set("mapred.child.java.opts", "-xmx512m");
                String[] dfsPath = new String[2];
                String _LOGTIME = loginTime.getStringDate();
                String __LOGTIME = loginTime.getStringDate(true);
                dfsPath[0] = "/ps2tt1/loginlog/CharacterLogins_20G_1.log";
                // CharacterLogins_all.log
                // CharacterLogins20121027.log
                // CharacterLogins_20G.log
                // CharacterLogins_20G_1.log
                dfsPath[1] = "/ps2tt1/outdata/";
                String outpath = dfsPath[1];
                dfsPath[1] = "/ps2tt1/outdata/" + "onlinetime_" + _LOGTIME;
                MyHDFS myfs = new MyHDFS(conf, dfsPath);
                myfs.setRedCount(1);
                myfs.setInputSize(myfs.getMyBlockSize());
                System.out.println("block size:" + myfs.getMyBlockSize());
                System.out.println("input size:" + myfs.getMyInputSize());
                Job job = new Job(myfs.getMyConf(), "OnlineTime");
                System.out.println("OnlineTime 1.40.15");
                System.out.println(outpath);
                FileSystem hdfs = FileSystem.get(myfs.getMyConf());
                Path dst = myfs.getMyInPath();
                job.setJarByClass(ps2onlinetime.class);
                job.setMapperClass(ps2ccu.ps2loginMaper.class);
                job.setCombinerClass(ps2onlinetimeCombiner.class);
                job.setReducerClass(ps2onlinetimeReducer.class);
                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                job.setNumReduceTasks(myfs.getMyRedCount());
                FileInputFormat.addInputPath(job, new Path(myfs.getMyInPath().toString()));
                FileInputFormat.setMinInputSplitSize(job, myfs.getMyInputSize() * 1024 * 1024);
                FileInputFormat.setMaxInputSplitSize(job, myfs.getMyInputSize() * 1024 * 1024);
                FileOutputFormat.setOutputPath(job, myfs.getMyOutPath());
                System.out.println("Input Split:" + myfs.getMyInputSize());
                System.out.println("Reduce Count:" + myfs.getMyRedCount());
                boolean jobrun = false;
                jobrun = job.waitForCompletion(true);
                if (jobrun) {
                        _LOGTIME = loginTime.getStringDate();
                        __LOGTIME = loginTime.getStringDate(true);
                        dst = new Path(myfs.getMyOutPath().toString());
                        FileStatus outfiles[] = hdfs.listStatus(dst);
                        for (FileStatus file : outfiles) {
                                if (file.isDir() == false && file.getPath().toString().indexOf("part-r-") >= 0) {
                                        String mvinpath = file.getPath().toString();
                                        String mvoutpath = outpath + "/onlinetime/onlinetime_" + _LOGTIME;
                                        new MoveFile(mvinpath, mvoutpath, conf);
                                        new DeleteFile(myfs.getMyOutPath().toString(), conf);
                                }
                        }
                }
        }
}
        
回复

使用道具 举报

u010454803 发表于 2013-10-16 13:45:10

            引用 5 楼 tntzbzc 的回复:Quote: 引用 3 楼 christie1988 的回复:
Quote: 引用 2 楼 virgo777 的回复:
你做个选题吧,我可以帮你试下,
反正最近也是闲着,就当赞RP了。

任何方向任何应用都可以,比如应用于网上书店,大量图片的转换,相似图片的搜索,中国移动的数据分析。。能用的源码都行

厄,你说的这些我都没有,其他的可以挖?用户在线时常、当前在线用户、KD换算,需要的话留个邮箱我发你

可以给我发一份瞅瞅,研究下么?最近正在看这方面东西,有个例子看起来能更深入些。谢啦~
zhangxc2046@gmail.com
        
回复

使用道具 举报

tntzbzc 发表于 2013-10-16 13:45:44

            引用 6 楼 virgo777 的回复:virgo_wang@qq.com
能给我一份吗?
谢谢

引用 8 楼 u010454803 的回复:可以给我发一份瞅瞅,研究下么?最近正在看这方面东西,有个例子看起来能更深入些。谢啦~
zhangxc2046@gmail.com

测试源码已发送,4个测试MR和几个调用类方法
有问题可以在论坛分享讨论,有需要源码网友请留下邮箱地址
        
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条