分享

Hadoop之SequenceFile .

问题导读
1、SequenceFile的优缺点是什么?
2、SequenceFile的压缩基于CompressType是如何实现读写文件?
3、SequenceFile文件的数据组成形式是什么?用代码实现关键字段。





Hadoop序列化文件SequenceFile可以用于解决大量小文件(所谓小文件:泛指小于black大小的文件)问题,SequenceFile是Hadoop API提供的一种二进制文件支持。这种二进制文件直接将<key,value>对序列化到文件中,一般对小文件可以使用这种文件合并,即将文件名作为key,文件内容作为value序列化到大文件中。
hadoop Archive也是一个高效地将小文件放入HDFS块中的文件存档文件格式,详情请看:hadoop Archive
但是SequenceFile文件不能追加写入,适用于一次性写入大量小文件的操作。

SequenceFile的压缩基于CompressType,请看源码:
01. /**
02.   * The compression type used to compress key/value pairs in the
03.   * {@link SequenceFile}.
04.   * @see SequenceFile.Writer
05.   */  
06.public static enum CompressionType {  
07.    /** Do not compress records. */  
08.    NONE, //不压缩   
09.    /** Compress values only, each separately. */  
10.    RECORD,  //只压缩values   
11.    /** Compress sequences of records together in blocks. */  
12.    BLOCK //压缩很多记录的key/value组成块   
13.}  

SequenceFile读写示例:
01.import java.io.IOException;  
02.  
03.import org.apache.hadoop.conf.Configuration;  
04.import org.apache.hadoop.fs.Path;  
05.import org.apache.hadoop.io.IOUtils;  
06.import org.apache.hadoop.io.IntWritable;  
07.import org.apache.hadoop.io.SequenceFile;  
08.import org.apache.hadoop.io.SequenceFile.CompressionType;  
09.import org.apache.hadoop.io.SequenceFile.Reader;  
10.import org.apache.hadoop.io.SequenceFile.Writer;  
11.import org.apache.hadoop.io.Text;  
12.  
13./**
14. * @version 1.0
15. * @author Fish
16. */  
17.public class SequenceFileWriteDemo {  
18.    private static final String[] DATA = { "fish1", "fish2", "fish3", "fish4" };  
19.  
20.    public static void main(String[] args) throws IOException {  
21.        /**
22.         * 写SequenceFile
23.         */  
24.        String uri = "/test/fish/seq.txt";  
25.        Configuration conf = new Configuration();  
26.        Path path = new Path(uri);  
27.        IntWritable key = new IntWritable();  
28.        Text value = new Text();  
29.        Writer writer = null;  
30.        try {  
31.            /**
32.             * CompressionType.NONE 不压缩<br>
33.             * CompressionType.RECORD 只压缩value<br>
34.             * CompressionType.BLOCK 压缩很多记录的key/value组成块
35.             */  
36.            writer = SequenceFile.createWriter(conf, Writer.file(path), Writer.keyClass(key.getClass()),  
37.                    Writer.valueClass(value.getClass()), Writer.compression(CompressionType.BLOCK));  
38.  
39.            for (int i = 0; i < 4; i++) {  
40.                value.set(DATA);  
41.                key.set(i);  
42.                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);  
43.                writer.append(key, value);  
44.  
45.            }  
46.        } finally {  
47.            IOUtils.closeStream(writer);  
48.        }  
49.  
50.        /**
51.         * 读SequenceFile
52.         */  
53.        SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(path));  
54.        IntWritable key1 = new IntWritable();  
55.        Text value1 = new Text();  
56.        while (reader.next(key1, value1)) {  
57.            System.out.println(key1 + "----" + value1);  
58.        }  
59.        IOUtils.closeStream(reader);// 关闭read流   
60.         
61.        /**
62.         * 用于排序
63.         */  
64.//      SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, comparator, IntWritable.class, Text.class, conf);   
65.    }  
66.}  

以上程序执行多次,并不会出现数据append的情况,每次都是重新创建一个文件,且文件中仅仅只有四条数据。究其原因,可以查看SequenceFile.Writer类的构造方法源码:
01.out = fs.create(p, true, bufferSize, replication, blockSize, progress);  
第二个参数为true,表示每次覆盖同名文件,如果为false会抛出异常。这样设计的目的可能是和HDFS一次写入多次读取有关,不提倡追加现有文件,所以构造方法写死了true。

SequenceFile文件的数据组成形式:
1..png

一、Header
写入头部的源码:
01./** Write and flush the file header. */  
02.private void writeFileHeader()   
03.  throws IOException {  
04.  out.write(VERSION);//版本号   
05.  Text.writeString(out, keyClass.getName());//key的Class   
06.  Text.writeString(out, valClass.getName());//val的Class   
07.  
08.  out.writeBoolean(this.isCompressed());//是否压缩   
09.  out.writeBoolean(this.isBlockCompressed());//是否是CompressionType.BLOCK类型的压缩   
10.   
11.  if (this.isCompressed()) {  
12.    Text.writeString(out, (codec.getClass()).getName());//压缩类的名称   
13.  }  
14.  this.metadata.write(out);//写入metadata   
15.  out.write(sync);                       // write the sync bytes   
16.  out.flush();                           // flush header   
17.}  

版本号:
01.private static byte[] VERSION = new byte[] {  
02.  (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA  
03.};

同步标识符的生成方式:
01.byte[] sync;                          // 16 random bytes   
02.{  
03.  try {                                         
04.    MessageDigest digester = MessageDigest.getInstance("MD5");  
05.    long time = Time.now();  
06.    digester.update((new UID()+"@"+time).getBytes());  
07.    sync = digester.digest();  
08.  } catch (Exception e) {  
09.    throw new RuntimeException(e);  
10.  }  
11.}

二、Record
2.png
Writer有三个实现类,分别对应CompressType的NONE,RECOR,BLOCK。下面逐一介绍一下(结合上面的图看):
1、NONE SequenceFile
Record直接存Record 的长度,KEY的长度,key值,Value的值
2、 BlockCompressWriter
01./** Append a key/value pair. */  
02.    @Override  
03.    @SuppressWarnings("unchecked")  
04.    public synchronized void append(Object key, Object val)  
05.      throws IOException {  
06.      if (key.getClass() != keyClass)  
07.        throw new IOException("wrong key class: "+key+" is not "+keyClass);  
08.      if (val.getClass() != valClass)  
09.        throw new IOException("wrong value class: "+val+" is not "+valClass);  
10.  
11.      // Save key/value into respective buffers   
12.      int oldKeyLength = keyBuffer.getLength();  
13.      keySerializer.serialize(key);  
14.      int keyLength = keyBuffer.getLength() - oldKeyLength;  
15.      if (keyLength < 0)  
16.        throw new IOException("negative length keys not allowed: " + key);  
17.      WritableUtils.writeVInt(keyLenBuffer, keyLength);//每调一次,都会累加keyLength   
18.  
19.      int oldValLength = valBuffer.getLength();  
20.      uncompressedValSerializer.serialize(val);  
21.      int valLength = valBuffer.getLength() - oldValLength;  
22.      WritableUtils.writeVInt(valLenBuffer, valLength);//每调一次,都会累加valLength         
23.      // Added another key/value pair   
24.      ++noBufferedRecords;  
25.        
26.      // Compress and flush?   
27.      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();  
28.      if (currentBlockSize >= compressionBlockSize) {  
29.      //compressionBlockSize =  conf.getInt("io.seqfile.compress.blocksize", 1000000);   
30.      //超过1000000就会写一个Sync   
31.        sync();  
32.      }  

超过compressionBlockSize的大小,就会调用sync()方法,下面看看sync的源码(和上面的图对照):
会写入和图中所画的各个数据项。
01./** Compress and flush contents to dfs */  
02.    @Override  
03.    public synchronized void sync() throws IOException {  
04.      if (noBufferedRecords > 0) {  
05.        super.sync();  
06.         
07.        // No. of records   
08.        WritableUtils.writeVInt(out, noBufferedRecords);  
09.         
10.        // Write 'keys' and lengths   
11.        writeBuffer(keyLenBuffer);  
12.        writeBuffer(keyBuffer);  
13.         
14.        // Write 'values' and lengths   
15.        writeBuffer(valLenBuffer);  
16.        writeBuffer(valBuffer);  
17.         
18.        // Flush the file-stream   
19.        out.flush();  
20.         
21.        // Reset internal states   
22.        keyLenBuffer.reset();  
23.        keyBuffer.reset();  
24.        valLenBuffer.reset();  
25.        valBuffer.reset();  
26.        noBufferedRecords = 0;  
27.      }  
28.        
29.    }

RecordCompressWriter
01./** Append a key/value pair. */  
02.    @Override  
03.    @SuppressWarnings("unchecked")  
04.    public synchronized void append(Object key, Object val)  
05.      throws IOException {  
06.      if (key.getClass() != keyClass)  
07.        throw new IOException("wrong key class: "+key.getClass().getName()  
08.                              +" is not "+keyClass);  
09.      if (val.getClass() != valClass)  
10.        throw new IOException("wrong value class: "+val.getClass().getName()  
11.                              +" is not "+valClass);  
12.  
13.      buffer.reset();  
14.  
15.      // Append the 'key'   
16.      keySerializer.serialize(key);  
17.      int keyLength = buffer.getLength();  
18.      if (keyLength < 0)  
19.        throw new IOException("negative length keys not allowed: " + key);  
20.  
21.      // Compress 'value' and append it   
22.      deflateFilter.resetState();  
23.      compressedValSerializer.serialize(val);  
24.      deflateOut.flush();  
25.      deflateFilter.finish();  
26.  
27.      // Write the record out   
28.      checkAndWriteSync();                                // sync   
29.      out.writeInt(buffer.getLength());                   // total record length record的长度   
30.      out.writeInt(keyLength);                            // key portion length key的长度   
31.      out.write(buffer.getData(), 0, buffer.getLength()); // data 数据   
32.    }  

写入Sync:
01.synchronized void checkAndWriteSync() throws IOException {  
02.      if (sync != null &&  
03.          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync   
04.        sync();  
05.      }  
06.    }

SYNC_INTERVAL的定义:
01.private static final int SYNC_ESCAPE = -1;      // "length" of sync entries   
02.private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash   
03.private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash   
04.  
05./** The number of bytes between sync points.*/  
06.public static final int SYNC_INTERVAL = 100*SYNC_SIZE;   
每2000个byte,就会写一个Sync。

总结:
Record:存储SequenceFile通用的KV数据格式,Key和Value都是二进制变长的数据。Record表示Key和Value的byte的总和。
Sync:主要是用来扫描和恢复数据的,以至于读取数据的Reader不会迷失。
Header:存储了如下信息:文件标识符SEQ,key和value的格式说明,以及压缩的相关信息,metadata等信息。
metadata:包含文件头所需要的数据:文件标识、Sync标识、数据格式说明(含压缩)、文件元数据(时间、owner、权限等)、检验信息等。

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条