分享

ZooKeeper实现分布式队列Queue

本帖最后由 howtodown 于 2014-2-6 22:00 编辑

前言
ZooKeeper是一个分步式的协作系统,何为协作,ZooKeeper价值又有何体现。关于ZooKeeper的基本使用,请参考:ZooKeeper伪分步式集群安装及java编程命令操作

目录

  • 分布式队列
  • 设计思路
  • 程序实现
1. 分布式队列

队列有很多种产品,大都是消息系统所实现的,像ActiveMQ,JBossMQ,RabbitMQ,IBM-MQ等。分步式队列产品并不太多,像Beanstalkd。

本文实现的分布式对列,是基于ZooKeeper现实的一种同步的分步式队列,当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达。

2. 设计思路

创建一个父目录 /queue,每个成员都监控(Watch)标志位目录/queue/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /queue/x(i)的临时目录节点,然后每个成员获取 /queue 目录的所有目录节点,也就是 x(i)。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /queue/start 的出现,如果已经相等就创建 /queue/start。

产品流程图
1.png

应用实例
2.png

图标解释

  • app1,app2,app3,app4是4个独立的业务系统
  • zk1,zk2,zk3是ZooKeeper集群的3个连接点
  • /queue,是znode的队列,假设队列长度为3
  • /queue/x1,是znode队列中,1号排对者,由app1提交,同步请求,app1挂载等待
  • /queue/x2,是znode队列中,2号排对者,由app2提交,同步请求,app2挂起等待
  • /queue/x3,是znode队列中,3号排对者,由app3提交,同步请求,app3挂起等待
  • /queue/start,当znode队列中满了,触发创建开始节点
  • 当/qeueu/start被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束

注:

  • 1). 创建/queue/x1,/queue/x2,/queue/x3没有前后顺序,提交后程序就同步挂起。
  • 2). app1可以通过zk2提交,app2也可通过zk3提交
  • 3). app1可以提交3次请求,生成x1,x2,x3使用队列充满
  • 4). /queue/start被创建后,zk1会监听到这个事件,再告诉app1,队列已完成!
3. 程序实现

1). 单节点模拟实验

模拟app1,通过zk1,提交3个请求

  1.   public static void doOne() throws Exception {
  2.         String host1 = "192.168.1.201:2181";
  3.         ZooKeeper zk = connection(host1);
  4.         initQueue(zk);
  5.         joinQueue(zk, 1);
  6.         joinQueue(zk, 2);
  7.         joinQueue(zk, 3);
  8.         zk.close();
  9.     }
复制代码

创建一个与服务器的连接


  1. public static ZooKeeper connection(String host) throws IOException {
  2.         ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
  3.             // 监听/queue/start创建的事件
  4.             public void process(WatchedEvent event) {
  5.                 if (event.getPath() != null && event.getPath().equals("/queue/start") && event.getType() == Event.EventType.NodeCreated) {
  6.                     System.out.println("Queue has Completed.Finish testing!!!");
  7.                 }
  8.             }
  9.         });
  10.         return zk;
  11.     }
复制代码

出始化队列

  1.    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
  2.         System.out.println("WATCH => /queue/start");
  3.         zk.exists("/queue/start", true);
  4.         if (zk.exists("/queue", false) == null) {
  5.             System.out.println("create /queue task-queue");
  6.             zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  7.         } else {
  8.             System.out.println("/queue is exist!");
  9.         }
  10.     }
复制代码

增加队列节点

  1. public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
  2.         System.out.println("create /queue/x" + x + " x" + x);
  3.         zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  4.         isCompleted(zk);
  5.     }
复制代码

检查队列是否完整

  1. public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
  2.         int size = 3;
  3.         int length = zk.getChildren("/queue", true).size();
  4.         System.out.println("Queue Complete:" + length + "/" + size);
  5.         if (length >= size) {
  6.             System.out.println("create /queue/start start");
  7.             zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  8.         }
  9.     }
复制代码

启动函数main

  1. public static void main(String[] args) throws Exception {
  2.     doOne();
  3. }
复制代码
运行结果:


  1. WATCH => /queue/start
  2. /queue is exist!
  3. create /queue/x1 x1
  4. Queue Complete:1/3
  5. create /queue/x2 x2
  6. Queue Complete:2/3
  7. create /queue/x3 x3
  8. Queue Complete:3/3
  9. create /queue/start start
  10. Queue has Completed.Finish testing!!!
复制代码

完全符合我的们预期。接下来我们看分布式环境

2). 分布式模拟实验

模拟app1通过zk1提交x1,app2通过zk2提交x2,app3通过zk3提交x3

  1.   public static void doAction(int client) throws Exception {
  2.         String host1 = "192.168.1.201:2181";
  3.         String host2 = "192.168.1.201:2182";
  4.         String host3 = "192.168.1.201:2183";
  5.         ZooKeeper zk = null;
  6.         switch (client) {
  7.         case 1:
  8.             zk = connection(host1);
  9.             initQueue(zk);
  10.             joinQueue(zk, 1);
  11.             break;
  12.         case 2:
  13.             zk = connection(host2);
  14.             initQueue(zk);
  15.             joinQueue(zk, 2);
  16.             break;
  17.         case 3:
  18.             zk = connection(host3);
  19.             initQueue(zk);
  20.             joinQueue(zk, 3);
  21.             break;
  22.         }
  23.     }
复制代码

注:

  • 1). 为了简单起见,我们没有增加复杂的多线程控制的机制。
  • 2). 没有调用zk.close()方法,也就是说,app1执行完单独的提交,app1就结束了,但zk1还存在着,所以/queue/x1存在于队列。
  • 3). 程序启动方法,分3次启动,命令行传不同的参数,分别是1,2,3

3.png

执行app1–>zk1

  1. #日志输出
  2. WATCH => /queue/start
  3. /queue is exist!
  4. create /queue/x1 x1
  5. Queue Complete:1/3
  6. #zookeeper控制台
  7. [zk: 192.168.1.201:2181(CONNECTED) 4] ls /queue
  8. [x10000000011]
复制代码

执行app2–>zk2

  1. #日志输出
  2. WATCH => /queue/start
  3. /queue is exist!
  4. create /queue/x2 x2
  5. Queue Complete:2/3
  6. #zookeeper控制台
  7. [zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
  8. [x20000000012, x10000000011]
复制代码

执行app3–>zk3

  1. #日志输出
  2. WATCH => /queue/start
  3. /queue is exist!
  4. create /queue/x3 x3
  5. Queue Complete:3/3
  6. create /queue/start start
  7. Queue has Completed.Finish testing!!!
  8. #zookeeper控制台
  9. [zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
  10. [x30000000016, x10000000014, start, x20000000015]
复制代码

/queue/stats被建立,打印出“Queue has Completed.Finish testing!!!”,代表调用app4完成!

我们完成分布式队列的实验,由于时间仓促。文字说明及代码难免有一些问题,请发现问题的同学帮忙指正。

下面贴一下完整的代码:

  1. package org.conan.zookeeper.demo;
  2. import java.io.IOException;
  3. import org.apache.zookeeper.CreateMode;
  4. import org.apache.zookeeper.KeeperException;
  5. import org.apache.zookeeper.WatchedEvent;
  6. import org.apache.zookeeper.Watcher;
  7. import org.apache.zookeeper.ZooKeeper;
  8. import org.apache.zookeeper.ZooDefs.Ids;
  9. public class QueueZooKeeper {
  10.     public static void main(String[] args) throws Exception {
  11.         if (args.length == 0) {
  12.             doOne();
  13.         } else {
  14.             doAction(Integer.parseInt(args[0]));
  15.         }
  16.     }
  17.     public static void doOne() throws Exception {
  18.         String host1 = "192.168.1.201:2181";
  19.         ZooKeeper zk = connection(host1);
  20.         initQueue(zk);
  21.         joinQueue(zk, 1);
  22.         joinQueue(zk, 2);
  23.         joinQueue(zk, 3);
  24.         zk.close();
  25.     }
  26.     public static void doAction(int client) throws Exception {
  27.         String host1 = "192.168.1.201:2181";
  28.         String host2 = "192.168.1.201:2182";
  29.         String host3 = "192.168.1.201:2183";
  30.         ZooKeeper zk = null;
  31.         switch (client) {
  32.         case 1:
  33.             zk = connection(host1);
  34.             initQueue(zk);
  35.             joinQueue(zk, 1);
  36.             break;
  37.         case 2:
  38.             zk = connection(host2);
  39.             initQueue(zk);
  40.             joinQueue(zk, 2);
  41.             break;
  42.         case 3:
  43.             zk = connection(host3);
  44.             initQueue(zk);
  45.             joinQueue(zk, 3);
  46.             break;
  47.         }
  48.     }
  49.     // 创建一个与服务器的连接
  50.     public static ZooKeeper connection(String host) throws IOException {
  51.         ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
  52.             // 监控所有被触发的事件
  53.             public void process(WatchedEvent event) {
  54.                 if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals("/queue/start")) {
  55.                     System.out.println("Queue has Completed.Finish testing!!!");
  56.                 }
  57.             }
  58.         });
  59.         return zk;
  60.     }
  61.     public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
  62.         System.out.println("WATCH => /queue/start");
  63.         zk.exists("/queue/start", true);
  64.         if (zk.exists("/queue", false) == null) {
  65.             System.out.println("create /queue task-queue");
  66.             zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  67.         } else {
  68.             System.out.println("/queue is exist!");
  69.         }
  70.     }
  71.     public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
  72.         System.out.println("create /queue/x" + x + " x" + x);
  73.         zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  74.         isCompleted(zk);
  75.     }
  76.     public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
  77.         int size = 3;
  78.         int length = zk.getChildren("/queue", true).size();
  79.         System.out.println("Queue Complete:" + length + "/" + size);
  80.         if (length >= size) {
  81.             System.out.println("create /queue/start start");
  82.             zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  83.         }
  84.     }
  85. }
复制代码


http://blog.fens.me/zookeeper-queue

来自群组: IT男人帮

已有(2)人评论

跳转到指定楼层
9528 发表于 2014-3-12 16:32:24
楼主功德无量
回复

使用道具 举报

anyhuayong 发表于 2014-9-25 09:11:59
楼主功德无量
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条