分享

HBase Block Cache实现机制分析

pig2 2014-4-13 10:50:12 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 8099
输出此信息
  1. Block cache LRU eviction started; Attempting to free 24.74 MB of total=209.89 MB
复制代码

表示hbase正在做什么?

-------------------------------------------------------------------------------------------------------------------------------------------------
本文结合HBase 0.94.1版本源码,对HBase的Block Cache实现机制进行分析,总结学习其Cache设计的核心思想。

1. 概述

   HBase上Regionserver的内存分为两个部分,一部分作为Memstore,主要用来写;另外一部分作为BlockCache,主要用于读。

写请求会先写入Memstore,Regionserver会给每个region提供一个Memstore,当Memstore满64MB以后,会启动 flush刷新到磁盘。当Memstore的总大小超过限制时(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),会强行启动flush进程,从最大的Memstore开始flush直到低于限制。

读请求先到Memstore中查数据,查不到就到BlockCache中查,再查不到就会到磁盘上读,并把读的结果放入BlockCache。由于BlockCache采用的是LRU策略,因此BlockCache达到上限(heapsize * hfile.block.cache.size * 0.85)后,会启动淘汰机制,淘汰掉最老的一批数据。

   一个Regionserver上有一个BlockCache和N个Memstore,它们的大小之和不能大于等于heapsize * 0.8,否则HBase不能正常启动。

   默认配置下,BlockCache为0.2,而Memstore为0.4。在注重读响应时间的应用场景下,可以将 BlockCache设置大些,Memstore设置小些,以加大缓存的命中率。

HBase RegionServer包含三个级别的Block优先级队列:

Single:如果一个Block第一次被访问,则放在这一优先级队列中;

Multi:如果一个Block被多次访问,则从Single队列移到Multi队列中;

InMemory:如果一个Block是inMemory的,则放到这个队列中。

以上将Cache分级思想的好处在于:

首先,通过inMemory类型Cache,可以有选择地将in-memory的column families放到RegionServer内存中,例如Meta元数据信息;

通过区分Single和Multi类型Cache,可以防止由于Scan操作带来的Cache频繁颠簸,将最少使用的Block加入到淘汰算法中。

默认配置下,对于整个BlockCache的内存,又按照以下百分比分配给Single、Multi、InMemory使用:0.25、0.50和0.25。

注意,其中InMemory队列用于保存HBase Meta表元数据信息,因此如果将数据量很大的用户表设置为InMemory的话,可能会导致Meta表缓存失效,进而对整个集群的性能产生影响。

2. 源码分析

下面是对HBase 0.94.1中相关源码(org.apache.hadoop.hbase.io.hfile.LruBlockCache)的分析过程。

2.1加入Block Cache
  1. /** Concurrent map (the cache) */
  2. private final ConcurrentHashMap<BlockCacheKey,CachedBlock> map;
  3. /**
  4.   * Cache the block with the specified name and buffer.
  5.   * <p>
  6.   * It is assumed this will NEVER be called on an already cached block.  If
  7.   * that is done, an exception will be thrown.
  8.   * @param cacheKey block’s cache key
  9.   * @param buf block buffer
  10.   * @param inMemory if block is in-memory
  11.   */
  12. public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
  13.    CachedBlock cb = map.get(cacheKey);
  14.    if(cb != null) {
  15.      throw new RuntimeException(“Cached an already cached block”);
  16.    }
  17.    cb = new CachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
  18.    long newSize = updateSizeMetrics(cb, false);
  19.    map.put(cacheKey, cb);
  20.    elements.incrementAndGet();
  21.    if(newSize > acceptableSize() && !evictionInProgress) {
  22.      runEviction();
  23.    }
  24. }
  25. /**
  26.   * Cache the block with the specified name and buffer.
  27.   * <p>
  28.   * It is assumed this will NEVER be called on an already cached block.  If
  29.   * that is done, it is assumed that you are reinserting the same exact
  30.   * block due to a race condition and will update the buffer but not modify
  31.   * the size of the cache.
  32.   * @param cacheKey block’s cache key
  33.   * @param buf block buffer
  34.   */
  35. public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
  36.    cacheBlock(cacheKey, buf, false);
  37. }
复制代码


1)  这里假设不会对同一个已经被缓存的BlockCacheKey重复放入cache操作;

2)  根据inMemory标志创建不同类别的CachedBlock对象:若inMemory为true则创建BlockPriority.MEMORY类型,否则创建BlockPriority.SINGLE;注意,这里只有这两种类型的Cache,因为BlockPriority.MULTI在Cache Block被重复访问时才进行创建,见CachedBlock的access方法代码:
  1. /**
  2.   * Block has been accessed.  Update its local access time.
  3.   */
  4. public void access(long accessTime) {
  5.    this.accessTime = accessTime;
  6.    if(this.priority == BlockPriority.SINGLE) {
  7.      this.priority = BlockPriority.MULTI;
  8.    }
  9. }
复制代码

3)  将BlockCacheKey和创建的CachedBlock对象加入到全局的ConcurrentHashMap map中,同时做一些更新计数操作;

4)  最后判断如果加入后的Block Size大于设定的临界值且当前没有淘汰线程运行,则调用runEviction()方法启动LRU淘汰过程:
  1. /** Eviction thread */
  2. private final EvictionThread evictionThread;
  3. /**
  4.   * Multi-threaded call to run the eviction process.
  5.   */
  6. private void runEviction() {
  7.    if(evictionThread == null) {
  8.      evict();
  9.    } else {
  10.      evictionThread.evict();
  11.    }
  12. }
复制代码


其中,EvictionThread线程即是LRU淘汰的具体实现线程。下面将给出详细分析。

2.2淘汰Block Cache

EvictionThread线程主要用于与主线程的同步,从而完成Block Cache的LRU淘汰过程。
  1. /*
  2.   * Eviction thread.  Sits in waiting state until an eviction is triggered
  3.   * when the cache size grows above the acceptable level.<p>
  4.   *
  5.   * Thread is triggered into action by {@link LruBlockCache#runEviction()}
  6.   */
  7. private static class EvictionThread extends HasThread {
  8.    private WeakReference<LruBlockCache> cache;
  9.    private boolean go = true;
  10.    public EvictionThread(LruBlockCache cache) {
  11.      super(Thread.currentThread().getName() + “.LruBlockCache.EvictionThread”);
  12.      setDaemon(true);
  13.      this.cache = new WeakReference<LruBlockCache>(cache);
  14.    }
  15.    @Override
  16.    public void run() {
  17.      while (this.go) {
  18.        synchronized(this) {
  19.          try {
  20.            this.wait();
  21.          } catch(InterruptedException e) {}
  22.        }
  23.        LruBlockCache cache = this.cache.get();
  24.        if(cache == null) break;
  25.        cache.evict();
  26.      }
  27.    }
  28.    public void evict() {
  29.      synchronized(this) {
  30.        this.notify(); // FindBugs NN_NAKED_NOTIFY
  31.      }
  32.    }
  33.    void shutdown() {
  34.      this.go = false;
  35.      interrupt();
  36.    }
  37. }
复制代码

EvictionThread线程启动后,调用wait被阻塞住,直到EvictionThread线程的evict方法被主线程调用时执行notify(见上面的代码分析过程,通过主线程的runEviction方法触发调用),开始执行LruBlockCache的evict方法进行真正的淘汰过程,代码如下:
  1. /**
  2.   * Eviction method.
  3.   */
  4. void evict() {
  5.    // Ensure only one eviction at a time
  6.    if(!evictionLock.tryLock()) return;
  7.    try {
  8.      evictionInProgress = true;
  9.      long currentSize = this.size.get();
  10.      long bytesToFree = currentSize - minSize();
  11.      if (LOG.isDebugEnabled()) {
  12.        LOG.debug(“Block cache LRU eviction started; Attempting to free ” +
  13.          StringUtils.byteDesc(bytesToFree) + ” of total=” +
  14.          StringUtils.byteDesc(currentSize));
  15.      }
  16.      if(bytesToFree <= 0) return;
  17.      // Instantiate priority buckets
  18.      BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
  19.          singleSize());
  20.      BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
  21.          multiSize());
  22.      BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
  23.          memorySize());
  24.      // Scan entire map putting into appropriate buckets
  25.      for(CachedBlock cachedBlock : map.values()) {
  26.        switch(cachedBlock.getPriority()) {
  27.          case SINGLE: {
  28.            bucketSingle.add(cachedBlock);
  29.            break;
  30.          }
  31.          case MULTI: {
  32.            bucketMulti.add(cachedBlock);
  33.            break;
  34.          }
  35.          case MEMORY: {
  36.            bucketMemory.add(cachedBlock);
  37.            break;
  38.          }
  39.        }
  40.      }
  41.      PriorityQueue<BlockBucket> bucketQueue =
  42.        new PriorityQueue<BlockBucket>(3);
  43.      bucketQueue.add(bucketSingle);
  44.      bucketQueue.add(bucketMulti);
  45.      bucketQueue.add(bucketMemory);
  46.      int remainingBuckets = 3;
  47.      long bytesFreed = 0;
  48.      BlockBucket bucket;
  49.      while((bucket = bucketQueue.poll()) != null) {
  50.        long overflow = bucket.overflow();
  51.        if(overflow > 0) {
  52.          long bucketBytesToFree = Math.min(overflow,
  53.            (bytesToFree - bytesFreed) / remainingBuckets);
  54.          bytesFreed += bucket.free(bucketBytesToFree);
  55.        }
  56.        remainingBuckets-;
  57.      }
  58.      if (LOG.isDebugEnabled()) {
  59.        long single = bucketSingle.totalSize();
  60.        long multi = bucketMulti.totalSize();
  61.        long memory = bucketMemory.totalSize();
  62.        LOG.debug(“Block cache LRU eviction completed; ” +
  63.          “freed=” + StringUtils.byteDesc(bytesFreed) + “, ” +
  64.          “total=” + StringUtils.byteDesc(this.size.get()) + “, ” +
  65.          “single=” + StringUtils.byteDesc(single) + “, ” +
  66.          “multi=” + StringUtils.byteDesc(multi) + “, ” +
  67.          “memory=” + StringUtils.byteDesc(memory));
  68.      }
  69.    } finally {
  70.      stats.evict();
  71.      evictionInProgress = false;
  72.      evictionLock.unlock();
  73.    }
  74. }
复制代码


1)首先获取锁,保证同一时刻只有一个淘汰线程运行;

2)计算得到当前Block Cache总大小currentSize及需要被淘汰释放掉的大小bytesToFree,如果bytesToFree小于等于0则不进行后续操作;

3) 初始化创建三个BlockBucket队列,分别用于存放Single、Multi和InMemory类Block Cache,其中每个BlockBucket维护了一个CachedBlockQueue,按LRU淘汰算法维护该BlockBucket中的所有CachedBlock对象;

4) 遍历记录所有Block Cache的全局ConcurrentHashMap,加入到相应的BlockBucket队列中;

5) 将以上三个BlockBucket队列加入到一个优先级队列中,按照各个BlockBucket超出bucketSize的大小顺序排序(见BlockBucket的compareTo方法);

6) 遍历优先级队列,对于每个BlockBucket,通过Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets)计算出需要释放的空间大小,这样做可以保证尽可能平均地从三个BlockBucket中释放指定的空间;具体实现过程详见BlockBucket的free方法,从其CachedBlockQueue中取出即将被淘汰掉的CachedBlock对象:
  1.   public long free(long toFree) {
  2.      CachedBlock cb;
  3.      long freedBytes = 0;
  4.      while ((cb = queue.pollLast()) != null) {
  5.        freedBytes += evictBlock(cb);
  6.        if (freedBytes >= toFree) {
  7.          return freedBytes;
  8.        }
  9.      }
  10.      return freedBytes;
  11.    }
复制代码
7) 进一步调用了LruBlockCache的evictBlock方法,从全局ConcurrentHashMap中移除该CachedBlock对象,同时更新相关计数:
  1. protected long evictBlock(CachedBlock block) {
  2.    map.remove(block.getCacheKey());
  3.    updateSizeMetrics(block, true);
  4.    elements.decrementAndGet();
  5.    stats.evicted();
  6.    return block.heapSize();
  7. }
复制代码



8) 释放锁,完成善后工作。

3. 总结

以上关于Block Cache的实现机制,核心思想是将Cache分级,这样的好处是避免Cache之间相互影响,尤其是对HBase来说像Meta表这样的Cache应该保证高优先级。






已有(1)人评论

跳转到指定楼层
pig2 发表于 2014-4-13 10:52:11
提示:上面问题输出,位于下面函数中
EvictionThread线程启动后,调用wait被阻塞住,直到EvictionThread线程的evict方法被主线程调用时执行notify(见上面的代码分析过程,通过主线程的runEviction方法触发调用),开始执行LruBlockCache的evict方法进行真正的淘汰过程,代码如下:

  1. /**
  2.   * Eviction method.
  3.   */
  4. void evict() {
  5.    // Ensure only one eviction at a time
  6.    if(!evictionLock.tryLock()) return;
  7.    try {
  8.      evictionInProgress = true;
  9.      long currentSize = this.size.get();
  10.      long bytesToFree = currentSize - minSize();
  11.      if (LOG.isDebugEnabled()) {
  12.        LOG.debug(“Block cache LRU eviction started; Attempting to free ” +
  13.          StringUtils.byteDesc(bytesToFree) + ” of total=” +
  14.          StringUtils.byteDesc(currentSize));
  15.      }
  16.      if(bytesToFree <= 0) return;
  17.      // Instantiate priority buckets
  18.      BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
  19.          singleSize());
  20.      BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
  21.          multiSize());
  22.      BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
  23.          memorySize());
  24.      // Scan entire map putting into appropriate buckets
  25.      for(CachedBlock cachedBlock : map.values()) {
  26.        switch(cachedBlock.getPriority()) {
  27.          case SINGLE: {
  28.            bucketSingle.add(cachedBlock);
  29.            break;
  30.          }
  31.          case MULTI: {
  32.            bucketMulti.add(cachedBlock);
  33.            break;
  34.          }
  35.          case MEMORY: {
  36.            bucketMemory.add(cachedBlock);
  37.            break;
  38.          }
  39.        }
  40.      }
  41.      PriorityQueue<BlockBucket> bucketQueue =
  42.        new PriorityQueue<BlockBucket>(3);
  43.      bucketQueue.add(bucketSingle);
  44.      bucketQueue.add(bucketMulti);
  45.      bucketQueue.add(bucketMemory);
  46.      int remainingBuckets = 3;
  47.      long bytesFreed = 0;
  48.      BlockBucket bucket;
  49.      while((bucket = bucketQueue.poll()) != null) {
  50.        long overflow = bucket.overflow();
  51.        if(overflow > 0) {
  52.          long bucketBytesToFree = Math.min(overflow,
  53.            (bytesToFree - bytesFreed) / remainingBuckets);
  54.          bytesFreed += bucket.free(bucketBytesToFree);
  55.        }
  56.        remainingBuckets-;
  57.      }
  58.      if (LOG.isDebugEnabled()) {
  59.        long single = bucketSingle.totalSize();
  60.        long multi = bucketMulti.totalSize();
  61.        long memory = bucketMemory.totalSize();
  62.        LOG.debug(“Block cache LRU eviction completed; ” +
  63.          “freed=” + StringUtils.byteDesc(bytesFreed) + “, ” +
  64.          “total=” + StringUtils.byteDesc(this.size.get()) + “, ” +
  65.          “single=” + StringUtils.byteDesc(single) + “, ” +
  66.          “multi=” + StringUtils.byteDesc(multi) + “, ” +
  67.          “memory=” + StringUtils.byteDesc(memory));
  68.      }
  69.    } finally {
  70.      stats.evict();
  71.      evictionInProgress = false;
  72.      evictionLock.unlock();
  73.    }
  74. }
复制代码




回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条