||
合并小文件,存放到HDFS上。例如,当需要分析来自服务器的Apache日志时,各个日志文件可能比较小,然而Hadoop更适合处理大文件,效率会更高,此时就需要合并分散的文件。如果先将所有的文件合并,再复制上传到HDFS上的话,需要占用本地计算机的大量磁盘空间。采用在向HDFS复制上传文件上的过程中将小文件进行合并,效果会更好。
开发一个PutMerge程序,用于将合并文件后放入HDFS。
用于将一组HDFS文件在复制到本地计算机一起进行合并。
实现思路:
文件的上传下载,就是字节字符流的读写操作。
读文件:输入流 ------> read
写文件:输出流 ------>write
代码实现:
package org.ch.hadoop.hdfs;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.omg.PortableInterceptor.SYSTEM_EXCEPTION;
/*
* 功能:在向HDFS上传复制文件的过程中,进行合并文件
*
* */
public class PutMerge {
/*
* 复制上传文件,并将文件合并
* @param localDir
* 本地要上传的文件目录
* @param hdfsFile
* HDFS上的文件名称,包括路径
* */
public static void put(String localDir,String hdfsFile){
//1)获取配置信息
Configuration conf = new Configuration();
//本地路径
Path localpath = new Path(localDir);
//hdfs 路径
Path hdfspath = new Path(hdfsFile);
try{
//获取本地文件系统
FileSystem localFs = FileSystem.getLocal(conf);
//获取HDFS文件系统
FileSystem hdfs = FileSystem.get(conf);
//本地文件系统中指定目录的所有文件
FileStatus[] status = localFs.listStatus(localpath);
//打开hdfs上文件的输出流
FSDataOutputStream fsDataOutputStream = hdfs.create(hdfspath);
//循环遍历本地文件
for(FileStatus fileStatus : status){
Path path = fileStatus.getPath();
System.out.println("文件为:" + path.getName());
//打开文件输入流
FSDataInputStream fsDataInputStream = localFs.open(path);
//进行流的读写操作
byte[] buffer = new byte[1024];
int len = 0;
while((len = fsDataInputStream.read(buffer)) > 0){
fsDataOutputStream.write(buffer,0 , len);
}
fsDataInputStream.close();
}
fsDataOutputStream.close();
}catch(Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) {
String localDir = "/home/ch-hduser/myfile/log";
String hdfsFile = "hdfs://localhost:9000/test/logs.data";
put(localDir, hdfsFile);
}
}