由于后面的工作,鄙人将Vector.txt放入分布式缓存,然后传入Matrix.txt与之相乘,矩阵向量乘法的程序及运行中出现问题如下,这是本人结合网上例子编写的第一个程序,错误多多,跪请各位师长们耐心解惑,感激不尽!一、Map过程
package 。。(略)
import java。。(略)
public class MatrixMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
@SuppressWarnings("unused")
private Text lineNumber = new Text();//矩阵行序号;
char [] sz = null;
private static int i = 0;
StringBuilder splited = null;
@SuppressWarnings("deprecation")
@Override
public void setup (Context context) throws IOException,NullPointerException{
BufferedReader br=null; // 读取文件流
String line;
/**
*在setup()中对缓存文件进行读取
*然后,在map()方法中使用处理后的缓存文件
*/
//path是Linux文件系统中的路径
Path[] paths = context.getLocalCacheFiles();
for(Path path : paths){
if(path.toString().indexOf("Vector.txt") >= 0){ //如果是 address文件
FileReader in = new FileReader(path.toString());
br = new BufferedReader(in);
while((line=br.readLine()) != null){ //读取文件中的每一行
splited.append(line);
}
}
//先将读取向量的内容转化为字符串,再将其字符串转换为字符数组
String ss = splited.toString();
sz = ss.toCharArray();
}
}
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(values.toString()) ;
int j = 0;//向量序号;
lineNumber.set(i+" ");
while (itr.hasMoreTokens()){
Double sz1=Double.parseDouble(String.valueOf(sz[j]));//将字符数组转换为数字;
Double mresult=sz1*Double.parseDouble(itr.nextToken());//矩阵元素与向量对应元素相乘,并将结果存入mresult;
DoubleWritable one = new DoubleWritable(mresult);
context.write(lineNumber, one);
j++;
}
i++;
}
}
二、Reduce过程
package (略);
import 。。。(略)
public class MatrixReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
private DoubleWritable rresult = new DoubleWritable();
public void reduce(Text key, Iterable<DoubleWritable> values,
Context context) throws IOException, InterruptedException {
Double sum = 0.0;
for (DoubleWritable val : values) {
//合并结果
sum += val.get();
}
rresult.set(sum);
context.write(key, rresult);
}
}
三、主函数
package 。。。(略)
import。。。(略)
public class runMatrixjob {
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
/*if (otherArgs.length<4){
System.err.println("arguments error!");
System.exit(2);
}*/
Job job = new Job();
job.setJobName("MDVM");
DistributedCache.addCacheFile(new URI("hdfs://namenode/test/input/Vector.txt#myfile"), conf);
job.setJarByClass(runMatrixjob.class);
job.setMapperClass(MatrixMapper.class);
job.setReducerClass(MatrixReducer.class);
job.setNumReduceTasks(1);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
|