分享

Zookeeper源码分析之五 Leader选举

本帖最后由 desehawk 于 2014-11-25 22:22 编辑

问题导读


1.Obserer机器是否参与选举?
2.选举成功的条件是什么?为什么zookeeper要保存奇数?








前面几篇文章简单介绍了zookeeper的单机server client处理。接下来几篇文章会介绍分布式部署下zookeeper的实现原理。我们假设有3台server的集群,zoo.cfg配置如下

  1. tickTime=2000  
  2. dataDir=/home/admin/zk-data  
  3. clientPort=2181  
  4. #Learner初始化连接到Leader的超时时间  
  5. initLimit=10  
  6. #Learner和Leader之间消息发送,响应的超时时间  
  7. syncLimit=5  
  8. #集群配置,3台机器,2888为Leader服务端口,3888为选举时所用的端口  
  9. server.1=master:2888:3888  
  10. server.2=slave1:2888:3888  
  11. server.3=slave2:2888:3888  
复制代码



在server.1的$dataDir下

  1. echo '1'>myid  
复制代码



启动server.1

  1. ./zkServer.sh start  
复制代码





分析之前先看看选举相关的类图




1.jpg



入口函数QuorumPeerMain主线程启动



  1. public void runFromConfig(QuorumPeerConfig config) throws IOException {  
  2.       ......  
  3.    
  4.       LOG.info("Starting quorum peer");  
  5.       try {  
  6.         //对client提供读写的server,一般是2181端口  
  7.           ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();  
  8.           cnxnFactory.configure(config.getClientPortAddress(),  
  9.                                 config.getMaxClientCnxns());  
  10.         //zk的逻辑主线程,负责选举,投票等  
  11.           quorumPeer = new QuorumPeer();  
  12.           quorumPeer.setClientPortAddress(config.getClientPortAddress());  
  13.           quorumPeer.setTxnFactory(new FileTxnSnapLog(  
  14.                       new File(config.getDataLogDir()),  
  15.                       new File(config.getDataDir())));  
  16.         //集群机器地址  
  17.           quorumPeer.setQuorumPeers(config.getServers());  
  18.           quorumPeer.setElectionType(config.getElectionAlg());  
  19.         //本机的集群编号  
  20.           quorumPeer.setMyid(config.getServerId());  
  21.           quorumPeer.setTickTime(config.getTickTime());  
  22.           quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());  
  23.           quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());  
  24.           quorumPeer.setInitLimit(config.getInitLimit());  
  25.           quorumPeer.setSyncLimit(config.getSyncLimit());  
  26.         //投票决定方式,默认超过半数就通过  
  27.           quorumPeer.setQuorumVerifier(config.getQuorumVerifier());  
  28.           quorumPeer.setCnxnFactory(cnxnFactory);  
  29.           quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));  
  30.           quorumPeer.setLearnerType(config.getPeerType());  
  31.         //启动主线程  
  32.           quorumPeer.start();  
  33.           quorumPeer.join();  
  34.       } catch (InterruptedException e) {  
  35.           // warn, but generally this is ok  
  36.           LOG.warn("Quorum Peer interrupted", e);  
  37.       }  
  38.     }  
复制代码



QuorumPeer复写Thread.start方法,启动


  1.    @Override  
  2.    public synchronized void start() {  
  3. //恢复DB,从zxid中回复epoch变量,代表投票轮数  
  4.        loadDataBase();  
  5. //启动针对client的IO线程  
  6.        cnxnFactory.start();  
  7. //选举初始化,主要是从配置获取选举类型         
  8.        startLeaderElection();  
  9. //启动  
  10.        super.start();  
  11.    }  
复制代码




loadDataBase过程,恢复epoch数

  1. private void loadDataBase() {  
  2.         try {  
  3.         //从本地文件恢复db  
  4.             zkDb.loadDataBase();  
  5.   
  6.             // load the epochs  
  7.         //从最新的zxid恢复epoch变量,zxid64位,前32位是epoch值,后32位是zxid  
  8.             long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;  
  9.             long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);  
  10.             try {  
  11.                 currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);  
  12.             } catch(FileNotFoundException e) {  
  13.                 // pick a reasonable epoch number  
  14.                 // this should only happen once when moving to a  
  15.                 // new code version  
  16.                 currentEpoch = epochOfZxid;  
  17.                 LOG.info(CURRENT_EPOCH_FILENAME  
  18.                         + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",  
  19.                         currentEpoch);  
  20.                 writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);  
  21.             }  
  22.             if (epochOfZxid > currentEpoch) {  
  23.                 throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);  
  24.             }  
  25. .......  
  26.     }  
复制代码


选举初始化


  1. synchronized public void startLeaderElection() {  
  2.         try {  
  3.         //先投自己  
  4.             currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());  
  5.         } catch(IOException e) {  
  6.             RuntimeException re = new RuntimeException(e.getMessage());  
  7.             re.setStackTrace(e.getStackTrace());  
  8.             throw re;  
  9.         }  
  10.     //从配置中拿自己的选举地址  
  11.         for (QuorumServer p : getView().values()) {  
  12.             if (p.id == myid) {  
  13.                 myQuorumAddr = p.addr;  
  14.                 break;  
  15.             }  
  16.         }  
  17.         ......  
  18.     //根据配置,获取选举算法  
  19.         this.electionAlg = createElectionAlgorithm(electionType);  
  20.     }  
复制代码




获取选举算法,默认为FastLeaderElection算法

  1. protected Election createElectionAlgorithm(int electionAlgorithm){  
  2.         Election le=null;  
  3.                   
  4.         //TODO: use a factory rather than a switch  
  5.         switch (electionAlgorithm) {  
  6.         case 0:  
  7.             le = new LeaderElection(this);  
  8.             break;  
  9.         case 1:  
  10.             le = new AuthFastLeaderElection(this);  
  11.             break;  
  12.         case 2:  
  13.             le = new AuthFastLeaderElection(this, true);  
  14.             break;  
  15.         case 3:  
  16.         //leader选举IO负责类  
  17.             qcm = new QuorumCnxManager(this);  
  18.             QuorumCnxManager.Listener listener = qcm.listener;  
  19.                 //启动已绑定3888端口的选举线程,等待集群其他机器连接  
  20.         if(listener != null){  
  21.                 listener.start();  
  22.         //基于TCP的选举算法  
  23.                 le = new FastLeaderElection(this, qcm);  
  24.             } else {  
  25.                 LOG.error("Null listener when initializing cnx manager");  
  26.             }  
  27.             break;  
  28.         default:  
  29.             assert false;  
  30.         }  
  31.         return le;  
  32.     }  
复制代码




FastLeaderElection初始化
  1.    private void starter(QuorumPeer self, QuorumCnxManager manager) {  
  2.        this.self = self;  
  3.        proposedLeader = -1;  
  4.        proposedZxid = -1;  
  5. //业务层发送队列,业务对象ToSend  
  6.        sendqueue = new LinkedBlockingQueue<ToSend>();  
  7. //业务层接受队列,业务对象Notificataion  
  8.        recvqueue = new LinkedBlockingQueue<Notification>();  
  9. //  
  10.        this.messenger = new Messenger(manager);  
  11.    }  
  12. essenger(QuorumCnxManager manager) {  
  13.     //启动业务层发送线程,将消息发给IO负责类QuorumCnxManager  
  14.            this.ws = new WorkerSender(manager);  
  15.   
  16.            Thread t = new Thread(this.ws,  
  17.                    "WorkerSender[myid=" + self.getId() + "]");  
  18.            t.setDaemon(true);  
  19.            t.start();  
  20.     //启动业务层接受线程,从IO负责类QuorumCnxManager接受消息  
  21.            this.wr = new WorkerReceiver(manager);  
  22.   
  23.            t = new Thread(this.wr,  
  24.                    "WorkerReceiver[myid=" + self.getId() + "]");  
  25.            t.setDaemon(true);  
  26.            t.start();  
  27.        }  
复制代码



QuorumPeer线程启动

  1. run(){  
  2. .......  
  3. try {  
  4.             /*
  5.              * Main loop
  6.              */  
  7.             while (running) {  
  8.                 switch (getPeerState()) {  
  9.         //如果状态是LOOKING,则进入选举流程  
  10.                 case LOOKING:  
  11.                     LOG.info("LOOKING");  
  12.   
  13.                     ......  
  14.                         try {  
  15.                 //选举算法开始选举,主线程可能在这里耗比较长时间  
  16.                             setCurrentVote(makeLEStrategy().lookForLeader());  
  17.                         } catch (Exception e) {  
  18.                             LOG.warn("Unexpected exception", e);  
  19.                             setPeerState(ServerState.LOOKING);  
  20.                         }  
  21.                     }  
  22.                     break;  
  23.         //其他流程处理         
  24.                 case OBSERVING:  
  25.                     try {  
  26.                         LOG.info("OBSERVING");  
  27.                         setObserver(makeObserver(logFactory));  
  28.                         observer.observeLeader();  
  29.                     } catch (Exception e) {  
  30.                         LOG.warn("Unexpected exception",e );                          
  31.                     } finally {  
  32.                         observer.shutdown();  
  33.                         setObserver(null);  
  34.                         setPeerState(ServerState.LOOKING);  
  35.                     }  
  36.                     break;  
  37.                 case FOLLOWING:  
  38.                     try {  
  39.                         LOG.info("FOLLOWING");  
  40.                         setFollower(makeFollower(logFactory));  
  41.                         follower.followLeader();  
  42.                     } catch (Exception e) {  
  43.                         LOG.warn("Unexpected exception",e);  
  44.                     } finally {  
  45.                         follower.shutdown();  
  46.                         setFollower(null);  
  47.                         setPeerState(ServerState.LOOKING);  
  48.                     }  
  49.                     break;  
  50.                 case LEADING:  
  51.                     LOG.info("LEADING");  
  52.                     try {  
  53.                         setLeader(makeLeader(logFactory));  
  54.                         leader.lead();  
  55.                         setLeader(null);  
  56.                     } catch (Exception e) {  
  57.                         LOG.warn("Unexpected exception",e);  
  58.                     } finally {  
  59.                         if (leader != null) {  
  60.                             leader.shutdown("Forcing shutdown");  
  61.                             setLeader(null);  
  62.                         }  
  63.                         setPeerState(ServerState.LOOKING);  
  64.                     }  
  65.                     break;  
  66.                 }  
  67. .......  
  68. }  
复制代码


进入选举流程
  1. public Vote lookForLeader() throws InterruptedException {  
  2. ......  
  3.         try {  
  4.         //收到的投票  
  5.             HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();  
  6.          
  7.             HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();  
  8.   
  9.             int notTimeout = finalizeWait;  
  10.   
  11.             synchronized(this){  
  12.                 logicalclock++;  
  13.         //先投给自己  
  14.                 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());  
  15.             }  
  16.   
  17.             LOG.info("New election. My id =  " + self.getId() +  
  18.                     ", proposed zxid=0x" + Long.toHexString(proposedZxid));  
  19.         //发送投票,包括发给自己  
  20.             sendNotifications();  
  21.   
  22.             /*
  23.              * Loop in which we exchange notifications until we find a leader
  24.              */  
  25.         //主循环,直到选出leader  
  26.             while ((self.getPeerState() == ServerState.LOOKING) &&  
  27.                     (!stop)){  
  28.                 /*
  29.                  * Remove next notification from queue, times out after 2 times
  30.                  * the termination time
  31.                  */  
  32.         //从IO线程里拿到投票消息,自己的投票也在这里处理  
  33.                 Notification n = recvqueue.poll(notTimeout,  
  34.                         TimeUnit.MILLISECONDS);  
  35.   
  36.                 /*
  37.                  * Sends more notifications if haven't received enough.
  38.                  * Otherwise processes new notification.
  39.                  */  
  40.         //如果空闲  
  41.                 if(n == null){  
  42.             //消息发完了,继续发送,一直到选出leader为止  
  43.                     if(manager.haveDelivered()){  
  44.                         sendNotifications();  
  45.                     } else {  
  46.             //消息还在,可能其他server还没启动,尝试连接  
  47.                         manager.connectAll();  
  48.                     }  
  49.   
  50.                     /*
  51.                      * Exponential backoff
  52.                      */  
  53.             //延长超时时间  
  54.                     int tmpTimeOut = notTimeout*2;  
  55.                     notTimeout = (tmpTimeOut < maxNotificationInterval?  
  56.                             tmpTimeOut : maxNotificationInterval);  
  57.                     LOG.info("Notification time out: " + notTimeout);  
  58.                 }  
  59.         //收到了投票消息  
  60.                 else if(self.getVotingView().containsKey(n.sid)) {  
  61.                     /*
  62.                      * Only proceed if the vote comes from a replica in the
  63.                      * voting view.
  64.                      */  
  65.                     switch (n.state) {  
  66.             //LOOKING消息,则  
  67.                     case LOOKING:  
  68.             ......  
  69.             //检查下收到的这张选票是否可以胜出,依次比较选举轮数epoch,事务zxid,服务器编号server id  
  70.                         } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,  
  71.                                 proposedLeader, proposedZxid, proposedEpoch)) {  
  72.                 //胜出了,就把自己的投票修改为对方的,然后广播消息  
  73.                             updateProposal(n.leader, n.zxid, n.peerEpoch);  
  74.                             sendNotifications();  
  75.                         }  
  76.   
  77.                         ......  
  78.             //添加到本机投票集合,用来做选举终结判断  
  79.                         recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));  
  80.               
  81.             //选举是否结束,默认算法是超过半数server同意  
  82.                         if (termPredicate(recvset,  
  83.                                 new Vote(proposedLeader, proposedZxid,  
  84.                                         logicalclock, proposedEpoch))) {  
  85.   
  86.                             ......  
  87.                 //修改状态,LEADING or FOLLOWING  
  88.                                 self.setPeerState((proposedLeader == self.getId()) ?  
  89.                                         ServerState.LEADING: learningState());  
  90.                 //返回最终的选票结果  
  91.                                 Vote endVote = new Vote(proposedLeader,  
  92.                                         proposedZxid, proposedEpoch);  
  93.                                 leaveInstance(endVote);  
  94.                                 return endVote;  
  95.                             }  
  96.                         }  
  97.                         break;  
  98.             //如果收到的选票状态不是LOOKING,比如这台机器刚加入一个已经服务的zk集群时  
  99.             //OBSERVING机器不参数选举  
  100.                     case OBSERVING:  
  101.                         LOG.debug("Notification from observer: " + n.sid);  
  102.                         break;  
  103.             //这2种需要参与选举  
  104.                     case FOLLOWING:  
  105.                     case LEADING:  
  106.                         /*
  107.                          * Consider all notifications from the same epoch
  108.                          * together.
  109.                          */  
  110.                         if(n.electionEpoch == logicalclock){  
  111.                 //同样需要加入到本机的投票集合  
  112.                             recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));  
  113.                 //投票是否结束,如果结束,再确认LEADER是否有效  
  114.                 //如果结束,修改自己的状态并返回投票结果  
  115.                             if(termPredicate(recvset, new Vote(n.leader,  
  116.                                             n.zxid, n.electionEpoch, n.peerEpoch, n.state))  
  117.                                             && checkLeader(outofelection, n.leader, n.electionEpoch)) {  
  118.                                 self.setPeerState((n.leader == self.getId()) ?  
  119.                                         ServerState.LEADING: learningState());  
  120.   
  121.                                 Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);  
  122.                                 leaveInstance(endVote);  
  123.                                 return endVote;  
  124.                             }  
  125.                         }  
  126.   
  127.                         /**
  128.                          * Before joining an established ensemble, verify that
  129.                          * a majority are following the same leader.
  130.                          */  
  131.                         outofelection.put(n.sid, new Vote(n.leader, n.zxid,  
  132.                                 n.electionEpoch, n.peerEpoch, n.state));  
  133.                         ......  
  134.                         break;  
  135.                     default:  
复制代码



选举消息发送
  1.     private void sendNotifications() {  
  2.     //循环发送  
  3.         for (QuorumServer server : self.getVotingView().values()) {  
  4.             long sid = server.id;  
  5.         //消息实体  
  6.             ToSend notmsg = new ToSend(ToSend.mType.notification,  
  7.                     proposedLeader,  
  8.                     proposedZxid,  
  9.                     logicalclock,  
  10.                     QuorumPeer.ServerState.LOOKING,  
  11.                     sid,  
  12.                     proposedEpoch);  
  13. ......  
  14.         //添加到业务的发送队列,该队列会被WorkerSender消费  
  15.             sendqueue.offer(notmsg);  
  16.         }  
  17.     }  
复制代码



WorkerSender消费

  1. public void run() {  
  2.     while (!stop) {  
  3.         try {  
  4.             ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);  
  5.             if(m == null) continue;  
  6.   
  7.             process(m);  
  8.         } catch (InterruptedException e) {  
  9.             break;  
  10.         }  
  11.     }  
  12.     LOG.info("WorkerSender is down");  
  13. }  
  14.   
  15. private void process(ToSend m) {  
  16.   
  17.     byte requestBytes[] = new byte[36];  
  18.     ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);  
  19.   
  20.     /*
  21.      * Building notification packet to send
  22.      */  
  23.   
  24.     requestBuffer.clear();  
  25.     requestBuffer.putInt(m.state.ordinal());  
  26.     requestBuffer.putLong(m.leader);  
  27.     requestBuffer.putLong(m.zxid);  
  28.     requestBuffer.putLong(m.electionEpoch);  
  29.     requestBuffer.putLong(m.peerEpoch);  
  30. CnxManager这个IO负责类发送消息  
  31.     manager.toSend(m.sid, requestBuffer);  
  32.   
  33. }  
复制代码




QuorumCnxManager具体发送


  1.    public void toSend(Long sid, ByteBuffer b) {  
  2.        /*
  3.         * If sending message to myself, then simply enqueue it (loopback).
  4.         */  
  5. //如果是自己,不走网络,直接添加到本地接受队列  
  6.        if (self.getId() == sid) {  
  7.             b.position(0);  
  8.             addToRecvQueue(new Message(b.duplicate(), sid));  
  9.            /*
  10.             * Otherwise send to the corresponding thread to send.
  11.             */  
  12. //否则,先添加到发送队列,然后尝试连接,连接成功则给每台server启动发送和接受线程  
  13.        } else {  
  14.             /*
  15.              * Start a new connection if doesn't have one already.
  16.              */  
  17.             if (!queueSendMap.containsKey(sid)) {  
  18.                 ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(  
  19.                         SEND_CAPACITY);  
  20.                 queueSendMap.put(sid, bq);  
  21.                 addToSendQueue(bq, b);  
  22.   
  23.             } else {  
  24.                 ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);  
  25.                 if(bq != null){  
  26.                     addToSendQueue(bq, b);  
  27.                 } else {  
  28.                     LOG.error("No queue for server " + sid);  
  29.                 }  
  30.             }  
  31.             connectOne(sid);  
  32.                  
  33.        }  
  34.    }  
复制代码



尝试连接过程
  1. synchronized void connectOne(long sid){  
  2.         if (senderWorkerMap.get(sid) == null){  
  3.         .......  
  4.         //对方的选举地址,3888端口  
  5.                 electionAddr = self.quorumPeers.get(sid).electionAddr;  
  6.             .......  
  7.         //同步IO连接  
  8.                 Socket sock = new Socket();  
  9.                 setSockOpts(sock);  
  10.                 sock.connect(self.getView().get(sid).electionAddr, cnxTO);  
  11.                 if (LOG.isDebugEnabled()) {  
  12.                     LOG.debug("Connected to server " + sid);  
  13.                 }  
  14.         //连上了,初始化IO线程  
  15.                 initiateConnection(sock, sid);  
  16.             ......  
  17.     }  
复制代码



由于这个时候只有server.1启动,当它尝试去连接其他server时,会报错,选举线程会一直重试。此时,server.1只收到了自己的选票。然后我们启动server.2,server.2也会主动去连接server.1,这个时候server.1h和server.2会相互发起连接,但最终只有有一个连接成功,请看下问。

这个时候被连接的server的Listener选举线程会收到新连接
Listener主循环,接受连接
  1. while (!shutdown) {  
  2.                         Socket client = ss.accept();  
  3.                         setSockOpts(client);  
  4.                         LOG.info("Received connection request "  
  5.                                 + client.getRemoteSocketAddress());  
  6.                         receiveConnection(client);  
  7.                         numRetries = 0;  
  8.                     }  
复制代码


新连接处理

  1. public boolean receiveConnection(Socket sock) {  
  2.        Long sid = null;  
  3.          
  4.        try {  
  5.            // Read server id  
  6.     //读server id  
  7.            DataInputStream din = new DataInputStream(sock.getInputStream());  
  8.            sid = din.readLong();  
  9.            ......  
  10.          
  11.        //If wins the challenge, then close the new connection.  
  12. //如果对方id比我小,则关闭连接,只允许大id的server连接小id的server  
  13.        if (sid < self.getId()) {  
  14.            /*
  15.             * This replica might still believe that the connection to sid is
  16.             * up, so we have to shut down the workers before trying to open a
  17.             * new connection.
  18.             */  
  19.            SendWorker sw = senderWorkerMap.get(sid);  
  20.            if (sw != null) {  
  21.                sw.finish();  
  22.            }  
  23.   
  24.            /*
  25.             * Now we start a new connection
  26.             */  
  27.            LOG.debug("Create new connection to server: " + sid);  
  28.            closeSocket(sock);  
  29.            connectOne(sid);  
  30.   
  31.            // Otherwise start worker threads to receive data.  
  32.        }   
  33. //如果对方id比我大,允许连接,并初始化单独的IO线程  
  34. else {  
  35.            SendWorker sw = new SendWorker(sock, sid);  
  36.            RecvWorker rw = new RecvWorker(sock, sid, sw);  
  37.            sw.setRecv(rw);  
  38.   
  39.            SendWorker vsw = senderWorkerMap.get(sid);  
  40.             
  41.            if(vsw != null)  
  42.                vsw.finish();  
  43.             
  44.            senderWorkerMap.put(sid, sw);  
  45.             
  46.            if (!queueSendMap.containsKey(sid)) {  
  47.                queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(  
  48.                        SEND_CAPACITY));  
  49.            }  
  50.             
  51.            sw.start();  
  52.            rw.start();  
  53.             
  54.            return true;      
  55.        }  
  56.        return false;  
  57.    }  
复制代码


连上后,自己server的IO线程初始化

  1. public boolean initiateConnection(Socket sock, Long sid) {  
  2.         DataOutputStream dout = null;  
  3.         try {  
  4.             // Sending id and challenge  
  5.         //先发一个server id  
  6.             dout = new DataOutputStream(sock.getOutputStream());  
  7.             dout.writeLong(self.getId());  
  8.             dout.flush();  
  9.         }   
  10.         ......  
  11.         // If lost the challenge, then drop the new connection  
  12.     //如果对方id比自己大,则关闭连接,这样导致的结果就是大id的server才会去连接小id的server,避免连接浪费  
  13.         if (sid > self.getId()) {  
  14.             LOG.info("Have smaller server identifier, so dropping the " +  
  15.                      "connection: (" + sid + ", " + self.getId() + ")");  
  16.             closeSocket(sock);  
  17.             // Otherwise proceed with the connection  
  18.         }   
  19.     //如果对方id比自己小,则保持连接,并初始化单独的发送和接受线程  
  20.     else {  
  21.             SendWorker sw = new SendWorker(sock, sid);  
  22.             RecvWorker rw = new RecvWorker(sock, sid, sw);  
  23.             sw.setRecv(rw);  
  24.   
  25.             SendWorker vsw = senderWorkerMap.get(sid);  
  26.               
  27.             if(vsw != null)  
  28.                 vsw.finish();  
  29.               
  30.             senderWorkerMap.put(sid, sw);  
  31.             if (!queueSendMap.containsKey(sid)) {  
  32.                 queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(  
  33.                         SEND_CAPACITY));  
  34.             }  
  35.               
  36.             sw.start();  
  37.             rw.start();  
  38.               
  39.             return true;      
  40.               
  41.         }  
  42.         return false;  
  43.     }  
复制代码


通过以上的连接处理,每2台选举机器之间只会建立一个选举连接。

IO发送线程SendWorker启动,开始发送选举消息
  1. try {  
  2.                 while (running && !shutdown && sock != null) {  
  3.   
  4.                     ByteBuffer b = null;  
  5.                     try {  
  6.             //每个server一个发送队列  
  7.                         ArrayBlockingQueue<ByteBuffer> bq = queueSendMap  
  8.                                 .get(sid);  
  9.                         if (bq != null) {  
  10.                 //拿消息  
  11.                             b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);  
  12.                         } else {  
  13.                             LOG.error("No queue of incoming messages for " +  
  14.                                       "server " + sid);  
  15.                             break;  
  16.                         }  
  17.   
  18.                         if(b != null){  
  19.                 //发消息  
  20.                             lastMessageSent.put(sid, b);  
  21.                             send(b);  
  22.                         }  
  23.                     } catch (InterruptedException e) {  
  24.                         LOG.warn("Interrupted while waiting for message on queue",  
  25.                                 e);  
  26.                     }  
  27.                 }  
  28.             } catch (Exception e) {  
  29.                 LOG.warn("Exception when using channel: for id " + sid + " my id = " +   
  30.                         self.getId() + " error = " + e);  
  31.             }  
  32.             this.finish();  
  33. ......  
复制代码


这个时候,其他机器通过IO线程RecvWorker收到消息
  1. public void run() {  
  2.            threadCnt.incrementAndGet();  
  3.            try {  
  4.                while (running && !shutdown && sock != null) {  
  5.                    /**
  6.                     * Reads the first int to determine the length of the
  7.                     * message
  8.                     */  
  9.         //包的长度  
  10.                    int length = din.readInt();  
  11.                    if (length <= 0 || length > PACKETMAXSIZE) {  
  12.                        throw new IOException(  
  13.                                "Received packet with invalid packet: "  
  14.                                        + length);  
  15.                    }  
  16.                    /**
  17.                     * Allocates a new ByteBuffer to receive the message
  18.                     */  
  19.         //读到内存  
  20.                    byte[] msgArray = new byte[length];  
  21.                    din.readFully(msgArray, 0, length);  
  22.                    ByteBuffer message = ByteBuffer.wrap(msgArray);  
  23.         //添加到接收队列,后续业务层的接收线程WorkerReceiver会来拿消息  
  24.                    addToRecvQueue(new Message(message.duplicate(), sid));  
  25.                }  
  26.          ......  
  27.        }  
复制代码



业务层的接受线程WorkerReceiver拿消息

  1. public void run() {  
  2.   
  3.                Message response;  
  4.                while (!stop) {  
  5.                    // Sleeps on receive  
  6.                    try{  
  7.         //从IO线程拿数据  
  8.                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);  
  9.                        if(response == null) continue;  
  10.   
  11.                        /*
  12.                         * If it is from an observer, respond right away.
  13.                         * Note that the following predicate assumes that
  14.                         * if a server is not a follower, then it must be
  15.                         * an observer. If we ever have any other type of
  16.                         * learner in the future, we'll have to change the
  17.                         * way we check for observers.
  18.                         */  
  19.         //如果是Observer,则返回当前选举结果  
  20.                        if(!self.getVotingView().containsKey(response.sid)){  
  21.                            Vote current = self.getCurrentVote();  
  22.                            ToSend notmsg = new ToSend(ToSend.mType.notification,  
  23.                                    current.getId(),  
  24.                                    current.getZxid(),  
  25.                                    logicalclock,  
  26.                                    self.getPeerState(),  
  27.                                    response.sid,  
  28.                                    current.getPeerEpoch());  
  29.   
  30.                            sendqueue.offer(notmsg);  
  31.                        }  
  32.         else {  
  33.                            // Receive new message  
  34.                            .......  
  35.                            // State of peer that sent this message  
  36.             //对方节点状态  
  37.                            QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;  
  38.                            switch (response.buffer.getInt()) {  
  39.                            case 0:  
  40.                                ackstate = QuorumPeer.ServerState.LOOKING;  
  41.                                break;  
  42.                            case 1:  
  43.                                ackstate = QuorumPeer.ServerState.FOLLOWING;  
  44.                                break;  
  45.                            case 2:  
  46.                                ackstate = QuorumPeer.ServerState.LEADING;  
  47.                                break;  
  48.                            case 3:  
  49.                                ackstate = QuorumPeer.ServerState.OBSERVING;  
  50.                                break;  
  51.                            }  
  52.   
  53.                            // Instantiate Notification and set its attributes  
  54.             //初始化Notification对象  
  55.                            Notification n = new Notification();  
  56.                            n.leader = response.buffer.getLong();  
  57.                            n.zxid = response.buffer.getLong();  
  58.                            n.electionEpoch = response.buffer.getLong();  
  59.                            n.state = ackstate;  
  60.                            n.sid = response.sid;  
  61.                            ......  
  62.   
  63.                            /*
  64.                             * If this server is looking, then send proposed leader
  65.                             */  
  66.             //如果自己也在LOOKING,则放入业务接收队列,选举主线程会消费该消息  
  67.                            if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){  
  68.                                recvqueue.offer(n);  
  69.   
  70.                                ......  
  71.                            }   
  72.             //如果自己不在选举中,而对方server在LOOKING中,则向其发送当前的选举结果,当有server加入一个essemble时有用  
  73.             else {  
  74.                                /*
  75.                                 * If this server is not looking, but the one that sent the ack
  76.                                 * is looking, then send back what it believes to be the leader.
  77.                                 */  
  78.                                Vote current = self.getCurrentVote();  
  79.                                if(ackstate == QuorumPeer.ServerState.LOOKING){  
  80.                                    if(LOG.isDebugEnabled()){  
  81.                                        LOG.debug("Sending new notification. My id =  " +  
  82.                                                self.getId() + " recipient=" +  
  83.                                                response.sid + " zxid=0x" +  
  84.                                                Long.toHexString(current.getZxid()) +  
  85.                                                " leader=" + current.getId());  
  86.                                    }  
  87.                                    ToSend notmsg = new ToSend(  
  88.                                            ToSend.mType.notification,  
  89.                                            current.getId(),  
  90.                                            current.getZxid(),  
  91.                                            logicalclock,  
  92.                                            self.getPeerState(),  
  93.                                            response.sid,  
  94.                                            current.getPeerEpoch());  
  95.                                    sendqueue.offer(notmsg);  
  96.                                }  
  97.                            }  
  98.                      .......  
  99.            }  
复制代码




由于整个集群只有3台机器,所以server.1和server.2启动后,即可选举出Leader。后续Leader和Follower开始数据交互,请看后文。



Leader选举小结
1.server启动时默认选举自己,并向整个集群广播
2.收到消息时,通过3层判断:选举轮数,zxid,server id大小判断是否同意对方,如果同意,则修改自己的选票,并向集群广播
3.QuorumCnxManager负责IO处理,每2个server建立一个连接,只允许id大的server连id小的server,每个server启动单独的读写线程处理,使用阻塞IO
4.默认超过半数机器同意时,则选举成功,修改自身状态为LEADING或FOLLOWING
5.Obserer机器不参与选举


相关文章推荐:
Zookeeper源码分析之一Server启动

Zookeeper源码分析之二Session建立

Zookeeper源码分析之三Exists请求和处理

深入浅出Zookeeper之四Create请求和处理

Zookeeper源码分析之六 Leader/Follower初始化

Zookeeper源码分析之七分布式CREATE事务处理


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条