分享

Hadoop RPC通信Client客户端的流程分析

问题导读
1、Hadoop的RPC的通信与其他系统的RPC有哪些不一样?
2、Client本身的执行流程是怎样的?
3、如何学习Hadoop RPC?





概要
Hadoop的RPC的通信与其他系统的RPC通信不太一样,作者针对Hadoop的使用特点,专门的设计了一套RPC框架,这套框架个人感觉还是有点小复杂的。所以我打算分成Client客户端和Server服务端2个模块做分析。如果你对RPC的整套流程已经非常了解的前提下,对于Hadoop的RPC,你也一定可以非常迅速的了解的。OK,下面切入正题。

Hadoop的RPC的相关代码都在org.apache.hadoop.ipc的包下,首先RPC的通信必须遵守许多的协议,其中最最基本的协议即使如下:
  1. /**
  2. * Superclass of all protocols that use Hadoop RPC.
  3. * Subclasses of this interface are also supposed to have
  4. * a static final long versionID field.
  5. * Hadoop RPC所有协议的基类,返回协议版本号
  6. */
  7. public interface VersionedProtocol {
  8.   
  9.   /**
  10.    * Return protocol version corresponding to protocol interface.
  11.    * @param protocol The classname of the protocol interface
  12.    * @param clientVersion The version of the protocol that the client speaks
  13.    * @return the version that the server will speak
  14.    */
  15.   public long getProtocolVersion(String protocol,
  16.                                  long clientVersion) throws IOException;
  17. }
复制代码



他是所有协议的基类,他的下面还有一堆的子类,分别对应于不同情况之间的通信,下面是一张父子类图:
1.jpg


顾名思义,只有客户端和服务端遵循相同的版本号,才能进行通信。RPC客户端的所有相关操作都被封装在了一个叫Client.java的文件中:
  1. /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  2. * parameter, and return a {@link Writable} as their value.  A service runs on
  3. * a port and is defined by a parameter class and a value class.
  4. * RPC客户端类
  5. * @see Server
  6. */
  7. public class Client {
  8.   
  9.   public static final Log LOG =
  10.     LogFactory.getLog(Client.class);
  11.   //客户端到服务端的连接
  12.   private Hashtable<ConnectionId, Connection> connections =
  13.     new Hashtable<ConnectionId, Connection>();
  14.   //回调值类
  15.   private Class<? extends Writable> valueClass;   // class of call values
  16.   //call回调id的计数器
  17.   private int counter;                            // counter for call ids
  18.   //原子变量判断客户端是否还在运行
  19.   private AtomicBoolean running = new AtomicBoolean(true); // if client runs
  20.   final private Configuration conf;
  21.   //socket工厂,用来创建socket
  22.   private SocketFactory socketFactory;           // how to create sockets
  23.   private int refCount = 1;
  24.   ......
复制代码



从代码中明显的看到,这里存在着一个类似于connections连接池的东西,其实这暗示着连接是可以被复用的,在hashtable中,与每个Connecttion连接的对应的是一个ConnectionId,显然这里不是一个Long类似的数值:
  1. /**
  2.     * This class holds the address and the user ticket. The client connections
  3.     * to servers are uniquely identified by <remoteAddress, protocol, ticket>
  4.     * 连接的唯一标识,主要通过<远程地址,协议类型,用户组信息>
  5.     */
  6.    static class ConnectionId {
  7.          //远程的socket地址
  8.      InetSocketAddress address;
  9.      //用户组信息
  10.      UserGroupInformation ticket;
  11.      //协议类型
  12.      Class<?> protocol;
  13.      private static final int PRIME = 16777619;
  14.      private int rpcTimeout;
  15.      private String serverPrincipal;
  16.      private int maxIdleTime; //connections will be culled if it was idle for
  17.      //maxIdleTime msecs
  18.      private int maxRetries; //the max. no. of retries for socket connections
  19.      private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  20.      private int pingInterval; // how often sends ping to the server in msecs
  21.      ....
复制代码



这里用了3个属性组成唯一的标识属性,为了保证可以进行ID的复用,所以作者对ConnectionId的equal比较方法和hashCode 进行了重写:
  1. /**
  2.       * 作者重写了equal比较方法,只要成员变量都想等也就想到了
  3.       */
  4.      @Override
  5.      public boolean equals(Object obj) {
  6.        if (obj == this) {
  7.          return true;
  8.        }
  9.        if (obj instanceof ConnectionId) {
  10.          ConnectionId that = (ConnectionId) obj;
  11.          return isEqual(this.address, that.address)
  12.              && this.maxIdleTime == that.maxIdleTime
  13.              && this.maxRetries == that.maxRetries
  14.              && this.pingInterval == that.pingInterval
  15.              && isEqual(this.protocol, that.protocol)
  16.              && this.rpcTimeout == that.rpcTimeout
  17.              && isEqual(this.serverPrincipal, that.serverPrincipal)
  18.              && this.tcpNoDelay == that.tcpNoDelay
  19.              && isEqual(this.ticket, that.ticket);
  20.        }
  21.        return false;
  22.      }
  23.      
  24.      /**
  25.       * 重写了hashCode的生成规则,保证不同的对象产生不同的hashCode值
  26.       */
  27.      @Override
  28.      public int hashCode() {
  29.        int result = 1;
  30.        result = PRIME * result + ((address == null) ? 0 : address.hashCode());
  31.        result = PRIME * result + maxIdleTime;
  32.        result = PRIME * result + maxRetries;
  33.        result = PRIME * result + pingInterval;
  34.        result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
  35.        result = PRIME * rpcTimeout;
  36.        result = PRIME * result
  37.            + ((serverPrincipal == null) ? 0 : serverPrincipal.hashCode());
  38.        result = PRIME * result + (tcpNoDelay ? 1231 : 1237);
  39.        result = PRIME * result + ((ticket == null) ? 0 : ticket.hashCode());
  40.        return result;
  41.      }
复制代码



这样就能保证对应同类型的连接就能够完全复用了,而不是仅仅凭借引用的关系判断对象是否相等,这里就是一个不错的设计了。与连接Id对应的就是Connection了,它里面维护是一下的一些变量:
  1.   /** Thread that reads responses and notifies callers.  Each connection owns a
  2.    * socket connected to a remote address.  Calls are multiplexed through this
  3.    * socket: responses may be delivered out of order. */
  4.   private class Connection extends Thread {
  5.         //所连接的服务器地址
  6.     private InetSocketAddress server;             // server ip:port
  7.     //服务端的krb5的名字,与安全方面相关
  8.     private String serverPrincipal;  // server's krb5 principal name
  9.     //连接头部,内部包含了,所用的协议,客户端用户组信息以及验证的而方法
  10.     private ConnectionHeader header;              // connection header
  11.     //远程连接ID
  12.     private final ConnectionId remoteId;                // connection id
  13.     //连接验证方法
  14.     private AuthMethod authMethod; // authentication method
  15.     //下面3个变量都是安全方面的
  16.     private boolean useSasl;
  17.     private Token<? extends TokenIdentifier> token;
  18.     private SaslRpcClient saslRpcClient;
  19.    
  20.     //下面是一组socket通信方面的变量
  21.     private Socket socket = null;                 // connected socket
  22.     private DataInputStream in;
  23.     private DataOutputStream out;
  24.     private int rpcTimeout;
  25.     private int maxIdleTime; //connections will be culled if it was idle for
  26.          //maxIdleTime msecs
  27.     private int maxRetries; //the max. no. of retries for socket connections
  28.     //tcpNoDelay可设置是否阻塞模式
  29.     private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  30.     private int pingInterval; // how often sends ping to the server in msecs
  31.    
  32.     // currently active calls 当前活跃的回调,一个连接 可能会有很多个call回调
  33.     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
  34.     //最后一次IO活动通信的时间
  35.     private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
  36.     //连接关闭标记
  37.     private AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
  38.     private IOException closeException; // close reason
  39.     .....
复制代码



里面维护了大量的和连接通信相关的变量,在这里有一个很有意思的东西connectionHeader,连接头部,里面的数据时为了在通信最开始的时候被使用:
  1. class ConnectionHeader implements Writable {
  2.   public static final Log LOG = LogFactory.getLog(ConnectionHeader.class);
  3.   
  4.   //客户端和服务端通信的协议名称
  5.   private String protocol;
  6.   //客户端的用户组信息
  7.   private UserGroupInformation ugi = null;
  8.   //验证的方式,关系到写入数据的时的格式
  9.   private AuthMethod authMethod;
  10.   .....
复制代码



起到标识验证的作用。一个Client类的基本结构我们基本可以描绘出来了,下面是完整的类关系图:
1.jpg


在上面这幅图中,你肯定会发现我少了一个很关键的类了,就是Call回调类。Call回调在很多异步通信中是经常出现的。因为在通信过程中,当一个对象通过网络发送请求给另外一个对象的时候,如果采用同步的方式,会一直阻塞在那里,会带来非常不好的效率和体验的,所以很多时候,我们采用的是一种叫回调接口的方式。在这期间,用户可以继续做自己的事情。所以同样的Call这个概念当然也是适用在Hadoop RPC中。在Hadoop的RPC的核心调用原理, 简单的说,就是我把parame参数序列化到一个对象中,通过参数的形式把对象传入,进行RPC通信,最后服务端把处理好的结果值放入call对象,在返回给客户端,也就是说客户端和服务端都是通过Call对象进行操作,Call里面存着,请求的参数,和处理后的结构值2个变量。通过Call对象的封装,客户单实现了完美的无须知道细节的调用。下面是Call类的类按时:
  1.   /** A call waiting for a value. */
  2.   //客户端的一个回调
  3.   private class Call {
  4.         //回调ID
  5.     int id;                                       // call id
  6.     //被序列化的参数
  7.     Writable param;                               // parameter
  8.     //返回值
  9.     Writable value;                               // value, null if error
  10.     //出错时返回的异常
  11.     IOException error;                            // exception, null if value
  12.     //回调是否已经被完成
  13.     boolean done;                                 // true when call is done
  14.     ....
复制代码



看到这个Call回调类,也许你慢慢的会明白Hadoop RPC的一个基本原型了,这些Call当然是存在于某个连接中的,一个连接可能会发生多个回调,所以在Connection中维护了calls列表:
  1.   private class Connection extends Thread {
  2.     ....
  3.     // currently active calls 当前活跃的回调,一个连接 可能会有很多个call回调
  4.     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
复制代码



作者在设计Call类的时候,比较聪明的考虑一种并发情况下的Call调用,所以为此设计了下面这个Call的子类,就是专门用于短时间内的瞬间Call调用:
  1.   /** Call implementation used for parallel calls. */
  2.   /** 继承自Call回调类,可以并行的使用,通过加了index下标做Call的区分 */
  3.   private class ParallelCall extends Call {
  4.         //每个ParallelCall并行的回调就会有对应的结果类
  5.     private ParallelResults results;
  6.     //index作为Call的区分
  7.     private int index;
  8.     ....
复制代码



如果要查找值,就通过里面的ParallelCall查找,原理是根据index索引:
在执行之前,你会先得到ConnectionId:
  1.   /** Result collector for parallel calls. */
  2.   private static class ParallelResults {
  3.         //并行结果类中拥有一组返回值,需要ParallelCall的index索引匹配
  4.     private Writable[] values;
  5.     //结果值的数量
  6.     private int size;
  7.     //values中已知的值的个数
  8.     private int count;
  9.     .....
  10.     /** Collect a result. */
  11.     public synchronized void callComplete(ParallelCall call) {
  12.       //将call中的值赋给result中
  13.       values[call.index] = call.value;            // store the value
  14.       count++;                                    // count it
  15.       //如果计数的值等到最终大小,通知caller
  16.       if (count == size)                          // if all values are in
  17.         notify();                                 // then notify waiting caller
  18.     }
  19.   }
复制代码



因为Call结构集是这些并发Call共有的,所以用的是static变量,都存在在了values数组中了,只有所有的并发Call都把值取出来了,才算回调成功,这个是个非常细小的辅助设计,这个在有些书籍上并没有多少提及。下面我们看看一般Call回调的流程,正如刚刚说的,最终客户端看到的形式就是,传入参数,获得结果,忽略内部一切逻辑,这是怎么做到的呢,答案在下面:
  1. public Writable call(Writable param, InetSocketAddress addr,
  2.                        Class<?> protocol, UserGroupInformation ticket,
  3.                        int rpcTimeout)
  4.                        throws InterruptedException, IOException {
  5.     ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
  6.         ticket, rpcTimeout, conf);
  7.     return call(param, remoteId);
  8.   }
复制代码



接着才是主流程:
  1. public Writable call(Writable param, ConnectionId remoteId)  
  2.                        throws InterruptedException, IOException {
  3.         //根据参数构造一个Call回调
  4.     Call call = new Call(param);
  5.     //根据远程ID获取连接
  6.     Connection connection = getConnection(remoteId, call);
  7.     //发送参数
  8.     connection.sendParam(call);                 // send the parameter
  9.     boolean interrupted = false;
  10.     synchronized (call) {
  11.       //如果call.done为false,就是Call还没完成
  12.       while (!call.done) {
  13.         try {
  14.           //等待远端程序的执行完毕
  15.           call.wait();                           // wait for the result
  16.         } catch (InterruptedException ie) {
  17.           // save the fact that we were interrupted
  18.           interrupted = true;
  19.         }
  20.       }
  21.       //如果是异常中断,则终止当前线程
  22.       if (interrupted) {
  23.         // set the interrupt flag now that we are done waiting
  24.         Thread.currentThread().interrupt();
  25.       }
  26.       //如果call回到出错,则返回call出错信息
  27.       if (call.error != null) {
  28.         if (call.error instanceof RemoteException) {
  29.           call.error.fillInStackTrace();
  30.           throw call.error;
  31.         } else { // local exception
  32.           // use the connection because it will reflect an ip change, unlike
  33.           // the remoteId
  34.           throw wrapException(connection.getRemoteAddress(), call.error);
  35.         }
  36.       } else {
  37.             //如果是正常情况下,返回回调处理后的值
  38.         return call.value;
  39.       }
  40.     }
  41.   }
复制代码



在这上面的操作步骤中,重点关注2个函数,获取连接操作,看看人家是如何保证连接的复用性的:
  1. private Connection getConnection(ConnectionId remoteId,
  2.                                    Call call)
  3.                                    throws IOException, InterruptedException {
  4.     .....
  5.     /* we could avoid this allocation for each RPC by having a  
  6.      * connectionsId object and with set() method. We need to manage the
  7.      * refs for keys in HashMap properly. For now its ok.
  8.      */
  9.     do {
  10.       synchronized (connections) {
  11.             //从connection连接池中获取连接,可以保证相同的连接ID可以复用
  12.         connection = connections.get(remoteId);
  13.         if (connection == null) {
  14.           connection = new Connection(remoteId);
  15.           connections.put(remoteId, connection);
  16.         }
  17.       }
  18.     } while (!connection.addCall(call));
复制代码



有点单例模式的味道哦,还有一个方法叫sendParam发送参数方法:
  1.     public void sendParam(Call call) {
  2.       if (shouldCloseConnection.get()) {
  3.         return;
  4.       }
  5.       DataOutputBuffer d=null;
  6.       try {
  7.         synchronized (this.out) {
  8.           if (LOG.isDebugEnabled())
  9.             LOG.debug(getName() + " sending #" + call.id);
  10.          
  11.           //for serializing the
  12.           //data to be written
  13.           //将call回调中的参数写入到输出流中,传向服务端
  14.           d = new DataOutputBuffer();
  15.           d.writeInt(call.id);
  16.           call.param.write(d);
  17.           byte[] data = d.getData();
  18.           int dataLength = d.getLength();
  19.           out.writeInt(dataLength);      //first put the data length
  20.           out.write(data, 0, dataLength);//write the data
  21.           out.flush();
  22.         }
  23.         ....
复制代码



代码只发送了Call的id,和请求参数,并没有把所有的Call的内容都扔出去了,一定是为了减少数据量的传输,这里还把数据的长度写入了,这是为了方便服务端准确的读取到不定长的数据。这服务端中间的处理操作不是今天讨论的重点。Call的执行过程就是这样。那么Call是如何被调用的呢,这又要重新回到了Client客户端上去了,Client有一个run()函数,所有的操作都是始于此的。
  1.     public void run() {
  2.       if (LOG.isDebugEnabled())
  3.         LOG.debug(getName() + ": starting, having connections "
  4.             + connections.size());
  5.       //等待工作,等待请求调用
  6.       while (waitForWork()) {//wait here for work - read or close connection
  7.             //调用完请求,则立即获取回复
  8.         receiveResponse();
  9.       }
  10.       
  11.       close();
  12.       
  13.       if (LOG.isDebugEnabled())
  14.         LOG.debug(getName() + ": stopped, remaining connections "
  15.             + connections.size());
  16.     }
复制代码



操作很简单,程序一直跑着,有请求,处理请求,获取请求,没有请求,就死等。
  1. private synchronized boolean waitForWork() {
  2.       if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
  3.         long timeout = maxIdleTime-
  4.               (System.currentTimeMillis()-lastActivity.get());
  5.         if (timeout>0) {
  6.           try {
  7.             wait(timeout);
  8.           } catch (InterruptedException e) {}
  9.         }
  10.       }
  11.       ....
复制代码



获取回复的操作如下:
  1. /* Receive a response.
  2.      * Because only one receiver, so no synchronization on in.
  3.      * 获取回复值
  4.      */
  5.     private void receiveResponse() {
  6.       if (shouldCloseConnection.get()) {
  7.         return;
  8.       }
  9.       //更新最近一次的call活动时间
  10.       touch();
  11.       
  12.       try {
  13.         int id = in.readInt();                    // try to read an id
  14.         if (LOG.isDebugEnabled())
  15.           LOG.debug(getName() + " got value #" + id);
  16.         //从获取call中取得相应的call
  17.         Call call = calls.get(id);
  18.         //判断该结果状态
  19.         int state = in.readInt();     // read call status
  20.         if (state == Status.SUCCESS.state) {
  21.           Writable value = ReflectionUtils.newInstance(valueClass, conf);
  22.           value.readFields(in);                 // read value
  23.           call.setValue(value);
  24.           calls.remove(id);
  25.         } else if (state == Status.ERROR.state) {
  26.           call.setException(new RemoteException(WritableUtils.readString(in),
  27.                                                 WritableUtils.readString(in)));
  28.           calls.remove(id);
  29.         } else if (state == Status.FATAL.state) {
  30.           // Close the connection
  31.           markClosed(new RemoteException(WritableUtils.readString(in),
  32.                                          WritableUtils.readString(in)));
  33.         }
  34.         .....
  35.       } catch (IOException e) {
  36.         markClosed(e);
  37.       }
  38.     }
复制代码



从之前维护的Call列表中取出,做判断。Client本身的执行流程比较的简单:
1.jpg


Hadoop RPC客户端的通信模块的部分大致就是我上面的这个流程,中间其实还忽略了很多的细节,大家学习的时候,针对源码会有助于更好的理解,Hadoop RPC的服务端的实现更加复杂,所以建议采用分模块的学习或许会更好一点。

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条