分享

hadoop如何读取压缩包内的文件内容

Joker 发表于 2015-1-6 16:58:21 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 7 149438
我想使用Hadoop M/R的DistributedCache类,将一个压缩文件缓存起来。
但是,我想将压缩文件的的文件内容读取,并加载到内存中。

现在做到的只是能够看到解压文件名

附件内容为输出内容

请问下大家有什么方法可以获取到文件内容

a.zip

2.37 KB, 下载次数: 47

已有(7)人评论

跳转到指定楼层
nextuser 发表于 2015-1-6 18:20:28

下面一个读取tar包的内容,不能直接用,但是可以作为参考,楼主可以自己写一个读取zip包的例子
使用distributedCache的步骤:
1、在conf里正确配置被分发的文件的路径(hdfs上的路径)
2、在自定义的mapper或reducer中获取文件下载到本地后的路径(linux文件系统路径);一般是重写configure或者重写setup(新方式)
3、在自定义的mapper或reducer类中读取这些文件的内容

更多内容:

在使用mapreduce时,各个map之间需要共享一些信息。如果信息不大,可以保存在conf中。但是需求是在各个map之间共享文件或者tar包

使用distributedCache可以满足这个需求:
distributedCache可以把HDFS上的文件(数据文件、压缩文件等等)分发到各个执行task的节点。执行map或者reduce task的节点就可以在本地,直接用java的IO接口读取这些文件。
有两个需要注意的地方:被分发的文件需要事先存储在hdfs上;这些文件是只读的。

使用distributedCache的步骤:
1、在conf里正确配置被分发的文件的路径(hdfs上的路径)
2、在自定义的mapper或reducer中获取文件下载到本地后的路径(linux文件系统路径);一般是重写configure或者重写setup(新方式)
3、在自定义的mapper或reducer类中读取这些文件的内容
distributedCache也提供创建符号链接的功能,第2步就不需要获取文件在本地的路径,直接使用约定的符号链接即可。

分发的文件大致分两种类型:文件;压缩包

1、配置被分发的hdfs文件所在路径
可以使用distributedCache类提供的静态接口设置路径 , 也可以使用conf.set配置
示例:
1.jpg
或者
conf.set("mapred.cache.files", "/myapp/file");
conf.set("mapred.cache. archives", "/mayapp/file.zip");

看distributedCache.java代码可知 静态接口就是封装了conf.set的动作。
配置的位置在run函数里即可,比如:
2.jpg

2、在自己的mapper类中,使用distributedCache的接口获取文件下载到本地后的路径
这里查了些网上的使用示例,大部分例子在mapper类中重写configure接口(或者setup),将本地文件的路径保存在mapper类的成员变量中,供下面的map成员函数使用。
在myMapper类的configure中获取文件的路径:
3.jpg

getLocalCacheFiles返回的是数组(元素类型是Path),数组内容是这个task(map或reduce)所属的job设定的所有需要被分发的文件,这些文件被下载到本地节点后的路径。
所以用了localFiles[0]来取得我的文件的路径,因为只设置了一个文件。如果设置了多个文件,可以遍历Path数组,用String.contains("KeyWord")来判断是否是你所需要的文件。
这里我在configure接口中直接把文件内容读取到myMapper类的一个数组成员里,这样在map接口中就不需要再读,但是这样的前提是文件内容比较少,或者针对map程序有更好的数据结构,比如trie树之类的。否则容易OOM。比较原始的办法就是在map接口中读一行做一次判断或操作。

在myMapper类的configure中获取压缩包的路径
4.jpg

因为使用的是mapreduce二代框架,archive文件有多个(框架默认会加几个tar包和一些jar包),所以这里遍历了一下,取出了我需要的压缩包的路径。这个路径是解压好的。需要listFiles一下,获得解压包下面的文件路径。

3、读取文件内容
5.jpg

这里读的是压缩包解压后的所有文件内容
读一行处理一次

完毕。
distributedCache在mapreduce自身用得也不少
比如task运行之前 加载第三方的jar包到classpath 可以使用addFileToClassPath将配置加到conf中 然后使用与读取压缩包类似方式将jar包加入到classpath
再如streaming和pipe
是将脚本分发到task节点本地,然后在java中执行这个本地的脚本来实现的






回复

使用道具 举报

starrycheng 发表于 2015-1-6 17:30:27
并不是是个文件就能读取,它对应的一定的格式,必须安装规则来进行。这里有例子,可以参考:

1.单个文件的压缩或解压


  1. import java.io.FileInputStream;
  2. import java.io.FileOutputStream;
  3. import java.io.IOException;
  4. import java.io.InputStream;
  5. import java.io.OutputStream;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.FileSystem;
  8. import org.apache.hadoop.fs.Path;
  9. import org.apache.hadoop.io.IOUtils;
  10. import org.apache.hadoop.io.compress.CompressionCodec;
  11. import org.apache.hadoop.io.compress.CompressionCodecFactory;
  12. import org.apache.hadoop.io.compress.CompressionOutputStream;
  13. import org.apache.hadoop.io.compress.GzipCodec;
  14. import org.apache.hadoop.util.ReflectionUtils;
  15. public class CompressionTest {
  16. //设在本地目录下有一个名为uploadFile的文件,对本地文件进行gzip压缩
  17.         public static void StreamCompresson() throws IOException
  18.         {
  19.                 Configuration conf = new Configuration();
  20.                 //注意此处得到压缩器的方法,CompressionCodec是一个封装了压缩器的接口.
  21.                 //下面的语句根据第一个参数产生相应的压缩器
  22.                 CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(GzipCodec.class, conf);
  23.                
  24.                 FileOutputStream outFile = new FileOutputStream("uploadFile.gz");        //用于将数据写入该流指向的文件中
  25.                 FileInputStream in = new FileInputStream("uploadFile");        //该文件会被写压缩输出流,即被压缩。
  26.                 //要压缩的话,如下对一个写入输出流的数据进行压缩
  27.                 CompressionOutputStream out = codec.createOutputStream(outFile);
  28.                 IOUtils.copyBytes(in, out, 4096, true);
  29.         }
  30.         
  31.         //
  32.         public static void FileDecompressor() throws IOException
  33.         {
  34.                 Configuration conf = new Configuration();
  35.                 FileSystem local = FileSystem.getLocal(conf);
  36.                 Path input = new Path("uploadFile.gz");
  37.                 //获取所拥有的所有压缩器——工厂
  38.                 CompressionCodecFactory factory = new CompressionCodecFactory(conf);
  39.                 //根据后缀得到相应的压缩器
  40.                 CompressionCodec codec = factory.getCodec(input);
  41.                 //移除文件名的后缀
  42.                 String outputUri =CompressionCodecFactory.removeSuffix("uploadFile.gz", codec.getDefaultExtension());
  43.                 InputStream in = null;
  44.                 OutputStream out = null;
  45.                 //从压缩输入流中读取内容放入文件输出流
  46.          in = codec.createInputStream(local.open(input));
  47.                 out = local.create(new Path(outputUri));
  48.                 IOUtils.copyBytes(in, out, conf, true);
  49.         }
  50.         
  51.         public static void main(String [] args) throws IOException
  52.         {
  53.                 StreamCompresson();
  54.                 FileDecompressor();
  55.         }
  56. }
复制代码

另外,注意gzip 为压缩命令,gunzip为解压命令,具体使用可以用man查看。

对HDFS上的压缩文件(也包括序列文件),要是用hadoop -text来查看内容。

2.MapReduce上的压缩和解压

对于MapReduce 上的输入文件,TextInputFormat会自动识别是否是压缩文件,并进行解压,所以可以与普通文件进行完全相同的 处理。

对与输出文件,如果要进行压缩,需要对run()函数中的配置变量Configuration conf进行如下设置:

conf.setBoolean("mapred.output.compress", true);                  //启用输出文件压缩

conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompreissionCodec.class);              //选择压缩器


回复

使用道具 举报

Joker 发表于 2015-1-6 17:56:52
starrycheng 发表于 2015-1-6 17:30
并不是是个文件就能读取,它对应的一定的格式,必须安装规则来进行。这里有例子,可以参考:

1.单个文件 ...

这个我用过,不是符合我要的需求的。不过还是很感谢
回复

使用道具 举报

starrycheng 发表于 2015-1-6 18:08:53
Joker 发表于 2015-1-6 17:56
这个我用过,不是符合我要的需求的。不过还是很感谢
如果没有符合的,只能自己重写了
回复

使用道具 举报

落魂草 发表于 2015-1-6 23:14:01
回复

使用道具 举报

pengsuyun 发表于 2015-1-7 08:22:54
回复

使用道具 举报

chuyuan_zhou 发表于 2015-1-7 14:49:38
涨姿势了,学习学习!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条