分享

Hadoop源码分析——数据节点写数据2

本帖最后由 desehawk 于 2016-6-23 18:28 编辑



接上篇:
Hadoop源码分析——数据节点写数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=18966


我们知道,写数据的时候其实需要同时写两个文件,数据块文件和它的校验信息文件,BlockReceiver.setBlockPositiion()会设置这两个文件的写位置,为后续持久化数据做准备。设置数据块文件的写位置比较简单,但校验信息文件的就相对复杂了,这又是一个和分块校验相关的调整。校验信息的计算需要数据以块的形式组织,但用户以流的形式写数据,如果写(追加)数据的开始点落在某个校验块的中间,需要在确定校验信息文件写位置的时候,计算不完整数据校验块(图中的校验块n)的校验信息,为后续写校验数据做准备。

4.png


setBlockPosition()方法只有一个参数offsetInBlock,即数据块文件的写位置,如果该位置不处于校验块的边界,就会调用computePartialChunkCrc()计算上述校验信息。该方法需要三个参数,进入方法时,前面两个参数blkoff,ckoff的值如上图,第三个参数bytesPerChecksum是校验块的大小。根据这些参数,首先可以计算出不完整数据块的长度sizePartialChunk,然后,分配缓冲区读入数据,计算这些数据的校验和。校验和的结果保存在BlockReceiver的成员变量partialCrc中,后面会和新写入的数据一起,计算出新的校验和的值,并保持在校验信息文件的当前位置。注意,对于数据块文件,新写入的数据会追加在数据块文件的文件尾,但校验信息文件,如果对应的是一个不完整数据校验块,那么,文件的最后部分(从ckoff开始的地方)会被更新,而不能简单地直接进行追加。代码如下:

[mw_shl_code=bash,true] /**
   * Sets the file pointer in the local block file to the specified value.
   */
  private void setBlockPosition(long offsetInBlock) throws IOException {
    if (finalized) {
      if (!isRecovery) {
        throw new IOException("Write to offset " + offsetInBlock +
                              " of block " + block +
                              " that is already finalized.");
      }
      if (offsetInBlock > datanode.data.getLength(block)) {
        throw new IOException("Write to offset " + offsetInBlock +
                              " of block " + block +
                              " that is already finalized and is of size " +
                              datanode.data.getLength(block));
      }
      return;
    }

    if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
      return;                   // nothing to do
    }
    long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
                            offsetInBlock / bytesPerChecksum * checksumSize;
    if (out != null) {
     out.flush();
    }
    if (checksumOut != null) {
      checksumOut.flush();
    }

    // If this is a partial chunk, then read in pre-existing checksum
    if (offsetInBlock % bytesPerChecksum != 0) {
      LOG.info("setBlockPosition trying to set position to " +
               offsetInBlock + " for " + block +
               " which is not a multiple of bytesPerChecksum " +
               bytesPerChecksum);
      computePartialChunkCrc(offsetInBlock, offsetInChecksum, bytesPerChecksum);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Changing block file offset of block " + block + " from " +
        datanode.data.getChannelPosition(block, streams) +
             " to " + offsetInBlock +
             " meta file offset to " + offsetInChecksum);
    }

    // set the position of the block file
    datanode.data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum);
  }

  /**
   * reads in the partial crc chunk and computes checksum
   * of pre-existing data in partial chunk.
   */
  private void computePartialChunkCrc(long blkoff, long ckoff,
                                      int bytesPerChecksum) throws IOException {

    // find offset of the beginning of partial chunk.
    //
    int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
    int checksumSize = checksum.getChecksumSize();
    blkoff = blkoff - sizePartialChunk;
    LOG.info("computePartialChunkCrc sizePartialChunk " +
              sizePartialChunk + " " + block +
              " offset in block " + blkoff +
              " offset in metafile " + ckoff);

    // create an input stream from the block file
    // and read in partial crc chunk into temporary buffer
    //
    byte[] buf = new byte[sizePartialChunk];
    byte[] crcbuf = new byte[checksumSize];
    FSDataset.BlockInputStreams instr = null;
    try {
      instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
      IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);

      // open meta file and read in crc value computer earlier
      IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length);
    } finally {
      IOUtils.closeStream(instr);
    }

    // compute crc of partial chunk from data read in the block file.
    partialCrc = new PureJavaCrc32();
    partialCrc.update(buf, 0, sizePartialChunk);
    LOG.info("Read in partial CRC chunk from disk for " + block);

    // paranoia! verify that the pre-computed crc matches what we
    // recalculated just now
    if (partialCrc.getValue() != FSInputChecker.checksum2long(crcbuf)) {
      String msg = "Partial CRC " + partialCrc.getValue() +
                   " does not match value computed the " +
                   " last time file was closed " +
                   FSInputChecker.checksum2long(crcbuf);
      throw new IOException(msg);
    }
    //LOG.debug("Partial CRC matches 0x" +
    //            Long.toHexString(partialCrc.getValue()));
  }[/mw_shl_code]

上述准备工作完成后,receivePacket()把请求的数据包发送到下游的数据节点,通过这种方式,写数据的数据包就会流经数据流管道上的所有数据节点。代码如下
[mw_shl_code=bash,true]if (mirrorOut != null && !mirrorError) {
      try {
        mirrorOut.write(buf.array(), buf.position(), buf.remaining());
        mirrorOut.flush();
      } catch (IOException e) {
        handleMirrorOutError(e);
      }
    }[/mw_shl_code]


接下来要提及的是数据长度为0的数据包,这是一种特殊的数据包,当客户端长时间没有输出数据时,就会发送这样的心跳包,用于维护客户端和数据流管道上的各个数据节点的TCP连接。,如果数据包的长度不是0,就要将请求包中的数据和校验和写入文件。代码如下:
[mw_shl_code=bash,true] if (len == 0) {
      LOG.debug("Receiving empty packet for " + block);
    } else {
      offsetInBlock += len;

      int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                            checksumSize;

      if ( buf.remaining() != (checksumLen + len)) {
        throw new IOException("Data remaining in packet does not match " +
                              "sum of checksumLen and dataLen");
      }
      int checksumOff = buf.position();
      int dataOff = checksumOff + checksumLen;
      byte pktBuf[] = buf.array();

      buf.position(buf.limit()); // move to the end of the data.

      /* skip verifying checksum iff this is not the last one in the
       * pipeline and clientName is non-null. i.e. Checksum is verified
       * on all the datanodes when the data is being written by a
       * datanode rather than a client. Whe client is writing the data,
       * protocol includes acks and only the last datanode needs to verify
       * checksum.
       */
      if (mirrorOut == null || clientName.length() == 0) {
        verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
      }

      try {
        if (!finalized) {
          //finally write to the disk :
          out.write(pktBuf, dataOff, len);

          // If this is a partial chunk, then verify that this is the only
          // chunk in the packet. Calculate new crc for this chunk.
          if (partialCrc != null) {
            if (len > bytesPerChecksum) {
              throw new IOException("Got wrong length during writeBlock(" +
                                    block + ") from " + inAddr + " " +
                                    "A packet can have only one partial chunk."+
                                    " len = " + len +
                                    " bytesPerChecksum " + bytesPerChecksum);
            }
            partialCrc.update(pktBuf, dataOff, len);
            byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
            checksumOut.write(buf);
            LOG.debug("Writing out partial crc for data len " + len);
            partialCrc = null;
          } else {
            checksumOut.write(pktBuf, checksumOff, checksumLen);
          }
          datanode.myMetrics.incrBytesWritten(len);
          /// flush entire packet before sending ack
          flush();

          // update length only after flush to disk
          datanode.data.setVisibleLength(block, offsetInBlock);
          dropOsCacheBehindWriter(offsetInBlock);
        }
      } catch (IOException iex) {
        datanode.checkDiskError(iex);
        throw iex;
      }
    }

    // put in queue for pending acks
    if (responder != null) {
      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
                                      lastPacketInBlock);
    }

    if (throttler != null) { // throttle I/O
      throttler.throttle(payloadLen);
    }

    return payloadLen;[/mw_shl_code]


主要关注点包括:客户端写数据的时候,只有最后一个数据节点需要通过verifyChunks()方法对数据进行校验,数据流管道中的其他节点不需要执行这项检查。校验发现数据有问题,会上报名字节点,并抛出异常通知调用者BlockReceiver.receivePacket();数据包如果成功接收并顺利写入磁盘后,则需要将处理的结果,通过PacketResponder.enqueue()方法,发送给PacketResponder线程,并由该线程继续后面的处理。

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条