分享

hbase源码系列(十)HLog与日志恢复

本帖最后由 坎蒂丝_Swan 于 2014-12-29 13:01 编辑
问题导读

1.HLogKey的有哪五要素?

2.类似于snapshot,在zk里面建立一个splitWAL节点,在这个节点下面建立任务有什么不一样?




HLog概述
hbase在写入数据之前会先写入MemStore,成功了再写入HLog,当MemStore的数据丢失的时候,还可以用HLog的数据来进行恢复,下面先看看HLog的图。

141911239379304.jpg

旧版的HLog是实际上是一个SequceneFile,0.96的已经使用Protobuf来进行序列化了。从Writer和Reader上来看HLog的都是Entry的,换句话说就是,它的每一条记录就是一个Entry。

  1. class Entry implements Writable {
  2.     private WALEdit edit;
  3.     private HLogKey key;
  4. }
复制代码


所以上面那个图已经不准确了,HLogKey没变,但是Value缺不是KeyValue,而是WALEdit。
下面我们看看HLogKey的五要素,region、tableName、log的顺序、写入时间戳、集群id。

  1. public HLogKey(final byte [] encodedRegionName, final TableName tablename,
  2.       long logSeqNum, final long now, List<UUID> clusterIds){
  3.     init(encodedRegionName, tablename, logSeqNum, now, clusterIds);
  4. }
  5. protected void init(final byte [] encodedRegionName, final TableName tablename,
  6.       long logSeqNum, final long now, List<UUID> clusterIds) {
  7.     this.logSeqNum = logSeqNum;
  8.     this.writeTime = now;
  9.     this.clusterIds = clusterIds;
  10.     this.encodedRegionName = encodedRegionName;
  11.     this.tablename = tablename;
  12. }
复制代码

下面看看WALEdit的属性, 这里只列出来一个重要的,它是内部持有的一群KeyValue。。
  1. public class WALEdit implements Writable, HeapSize {
  2.   ......private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
复制代码


HLog的具体实现类是FSHLog,一个Region Server有两个FSHLog,一个负责RS上面所有的用户region的日志,一个负责RS上面的META表的region的日志。
对于日志来说,我们关心的是它如何保证一致性和准确性,在需要它的时候可以发挥救命作用。

HLog同步
对于meta region的HLog写入之后,它会立即同步到硬盘,非meta表的region,它会先把Entry添加到一个队列里面等待同步。

  1. while(!this.isInterrupted() && !closeLogSyncer.get()) {
  2.           try {
  3.             if (unflushedEntries.get() <= syncedTillHere) {
  4.               synchronized (closeLogSyncer) {
  5.                 closeLogSyncer.wait(this.optionalFlushInterval);
  6.               }
  7.             }// 同步已经添加的entry
  8.             sync();
  9.           } catch (IOException e) {
  10.             LOG.error("Error while syncing, requesting close of hlog ", e);
  11.             requestLogRoll();
  12.             Threads.sleep(this.optionalFlushInterval);
  13.           }
  14. }
复制代码


它这里是有一个判断条件的,如果判断条件不成立就立即同步,等待this.optionalFlushInterval时间,默认的同步间隔是1000,它是通过参数hbase.regionserver.optionallogflushinterval设置。unflushedEntries是一个AtomicLong在写入entry的时候递增,syncedTillHere是一个volatile long,同步完成之后也是变大,因为可能被多个线程调用同步操作,所以它是volatile的,从条件上来看,如果没有日志需要同步就等待一秒再进行判断,如果有日志需要同步,也是立马就写入硬盘的,如果发生错误,就是调用requestLogRoll方法,进行回滚,这个回滚比较有意思,它是跑过去flush掉MemStore中的数据,把他们写入硬盘。

下面是回滚的方法。中间我忽略了几步,然后找到LogRoller中的这段代码。

  1. byte [][] regionsToFlush = getWAL().rollWriter(rollLog.get());
  2.         if (regionsToFlush != null) {
  3.           for (byte [] r: regionsToFlush) scheduleFlush(r);
  4. }
复制代码

找出来需要flush的region,然后计划flush。
  1. regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
  2.           this.oldestUnflushedSeqNums);
  3. static byte[][] findMemstoresWithEditsEqualOrOlderThan(
  4.       final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
  5.     List<byte[]> regions = null;
  6.     for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
  7.       //逐个对比,找出小于已输出为文件的最小的seq id的region
  8.       if (e.getValue().longValue() <= walSeqNum) {
  9.         if (regions == null) regions = new ArrayList<byte[]>();
  10.         regions.add(e.getKey());
  11.       }
  12.     }
  13.     return regions == null ? null : regions
  14.         .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
  15. }
复制代码


逐个对比,找出来未flush MemStore的比输出的文件的HLog流水号还小的region,当它准备flush MemStore之前会调用startCacheFlush方法来把region从oldestUnflushedSeqNums这个map当中去除,添加到已经flush的map当中。

从日志恢复
看过《HMaster启动过程》的童鞋都知道,如果之前有region失败的话,在启动之前会把之前的HLog进行split,把属于该region的为flush过的日志提取出来,然后生成一个新的HLog到recovered.edits目录下,中间的过程控制那块有点儿类似于snapshot的那种,在zk里面建立一个splitWAL节点,在这个节点下面建立任务,不一样的是,snapshot那块是自己处理自己的,这里是别人的闲事它也管,处理完了之后就更新这个任务的状态了,没有snapshot那么复杂的交互过程。

那啥时候会用到这个呢,在region打开的时候,我们从HRegionServer的openRegion方法一路跟踪,中间历经OpenMetaHandler,再到HRegion.openHRegion方法,终于在initializeRegionStores方法里面找到了那么一句话。

  1. // 如果recovered.edits有日志的话,就恢复日志
  2.     maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
  3.         this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
复制代码

高潮来了!!!
  1. HLog.Reader reader = null;
  2.     try {
  3.       //创建reader读取hlog
  4.       reader = HLogFactory.createReader(fs, edits, conf);
  5.       long currentEditSeqId = -1;
  6.       long firstSeqIdInLog = -1;
  7.       long skippedEdits = 0;
  8.       long editsCount = 0;
  9.       long intervalEdits = 0;
  10.       HLog.Entry entry;
  11.       Store store = null;
  12.       boolean reported_once = false;
  13.       try {//逐个读取
  14.         while ((entry = reader.next()) != null) {
  15.           HLogKey key = entry.getKey();
  16.           WALEdit val = entry.getEdit();
  17.           //实例化firstSeqIdInLog
  18.           if (firstSeqIdInLog == -1) {
  19.             firstSeqIdInLog = key.getLogSeqNum();
  20.           }
  21.           boolean flush = false;
  22.           for (KeyValue kv: val.getKeyValues()) {
  23.             // 从WALEdits里面取出kvs
  24.             if (kv.matchingFamily(WALEdit.METAFAMILY) ||
  25.                 !Bytes.equals(key.getEncodedRegionName(),
  26.                   this.getRegionInfo().getEncodedNameAsBytes())) {//是meta表的kv就有compaction
  27.               CompactionDescriptor compaction = WALEdit.getCompaction(kv);
  28.               if (compaction != null) {
  29.                 //完成compaction未完成的事情,校验输入输出文件,完成文件替换等操作
  30.                 completeCompactionMarker(compaction);
  31.               }
  32.               skippedEdits++;
  33.               continue;
  34.             }
  35.             // 获得kv对应的store
  36.             if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
  37.               store = this.stores.get(kv.getFamily());
  38.             }
  39.             if (store == null) {
  40.               // 应该不会发生,缺少它对应的列族
  41.               skippedEdits++;
  42.               continue;
  43.             }
  44.             // seq id小,呵呵,说明已经被处理过了这个日志
  45.             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily().getName())) {
  46.               skippedEdits++;
  47.               continue;
  48.             }
  49.             currentEditSeqId = key.getLogSeqNum();
  50.             // 这个就是我们要处理的日志,添加到MemStore里面就ok了
  51.             flush = restoreEdit(store, kv);
  52.             editsCount++;
  53.           }
  54.           //MemStore太大了,需要flush掉
  55.           if (flush) internalFlushcache(null, currentEditSeqId, status);
  56.          }
  57.       } catch (IOException ioe) {
  58.         // 就是把名字改了,然后在后面加上".时间戳",这个有毛意思?
  59.         if (ioe.getCause() instanceof ParseException) {
  60.           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
  61.           msg = "File corruption encountered!  " +
  62.               "Continuing, but renaming " + edits + " as " + p;
  63.         } else {// 不知道是啥错误,抛错误吧,处理不了
  64.           throw ioe;
  65.         }
  66.       }
  67.       status.markComplete(msg);
  68.       return currentEditSeqId;
  69.     } finally {
  70.       status.cleanup();
  71.       if (reader != null) {
  72.          reader.close();
  73.       }
  74.     }
复制代码


呵呵,读取recovered.edits下面的日志,符合条件的就加到MemStore里面去,完成之后,就把这些文件删掉。大家也看到了,这里通篇讲到一个logSeqNum,哪里都有它的身影,它实际上是FSHLog当中的一个递增的AtomicLong,每当往FSLog里面写入一条日志的时候,它都会加一,然后MemStore请求flush的时候,会调用FSLog的startCacheFlush方法,获取(logSeqNum+1)回来,然后写入到StoreFile的sequenceid字段,再次拿出来的时候,就遍历这个HStore下面的StoreFile的logSeqNum,取出来最大的跟它比较,小于它的都已经写过了,没必要再写了。
好了,HLog结束了,累死我了,要睡了。




hbase源码系列(二)HTable 探秘hbase源码系列(三)Client如何找到正确的Region Server
hbase源码系列(四)数据模型-表定义和列族定义的具体含义
hbase源码系列(五)Trie单词查找树
hbase源码系列(六)HMaster启动过程
hbase源码系列(七)Snapshot的过程
hbase源码系列(八)从Snapshot恢复表
hbase源码系列(九)StoreFile存储格式
hbase源码系列(十)HLog与日志恢复
hbase源码系列(十一)Put、Delete在服务端是如何处理?
hbase源码系列(十二)Get、Scan在服务端是如何处理?
hbase源码系列(十三)缓存机制MemStore与Block Cache
hbase源码之如何查询出来下一个KeyValue
hbase源码Compact和Split


欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(1)人评论

跳转到指定楼层
355815741 发表于 2014-12-28 14:22:43
学习了,谢谢分享~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条