分享

zookeeper适用场景:配置文件同步

问题导读:
1.本文三个角色之间是什么关系?
2.三个角色的作用是什么?
3.如何代码实现这三个角色的作用?





zookeeper适用场景:zookeeper解决了哪些问题有关于分布式集群配置文件同步问题的描述,本文介绍如何把zk应用到配置文件分发的场景。
假设有三个角色
  • trigger:发布最新的配置文件数据,发送指令和数据给zk_agent,实现是下面的trigger.py
  • zk_agent:接收来自trigger.py触发的指令和数据,并且把数据更新到zk service上,从而触发zk_app来获取最新的配置数据,实现是下面的zk_agent.py
  • zk_app:部署在每台worker上的注册监听zk中配置文件所在znode的变化,从而获取最新的配置文件,应用到worker中,实现是下面的zk_app.py

    zk_conf.png

配置文件同步到:zk_agent实现逻辑:
* 初始化连接到zk service,首先竞选出master
* master create一个配置文件管理的PERSISTENT类型的znode,比如是/Applications/NginxConf,
* 启动一个线程,专门接收trigger发送的指令,一收到指令,就create一个"conf-"开头的SEQUENCE|PERSISTENT类型的节点,指定znode数据内容是从trigger收到的数据
* 第一次刚更新会创建节点/Applications/NginxConf/conf-0000000000,以后每次更新新的配置文件编号就会增大。
vim zk_agent.py

  1. #!/usr/bin/env python2.7
  2. # -*- coding: UTF-8 -*-
  3. # author : firefoxbug
  4. # E-Mail : wanghuafire@gmail.com
  5. # Blog   : www.firefoxbug.net
  6. import logging
  7. from os.path import basename, join
  8. from zkclient import ZKClient, zookeeper, watchmethod
  9. import os
  10. import sys
  11. import threading
  12. import signal
  13. import time
  14. logging.basicConfig(
  15.     level = logging.DEBUG,
  16.     format = "[%(asctime)s] %(levelname)-8s %(message)s"
  17. )
  18. log = logging
  19. class GJZookeeper(object):
  20.     ZK_HOST = "localhost:2181"
  21.     ROOT = "/Roles"
  22.     WORKERS_PATH = join(ROOT, "workers")
  23.     MASTERS_NUM = 1
  24.     TIMEOUT = 10000
  25.     def __init__(self, verbose = True):
  26.         self.VERBOSE = verbose
  27.         self.masters = []
  28.         self.is_master = False
  29.         self.path = None
  30.         self.APP_ROOT = "/Applications"
  31.         self.APP_CONF = join(self.APP_ROOT,"NginxConf")
  32.         self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
  33.         self.say("login zookeeper successfully!")
  34.         # init
  35.         self.create_roles_znode()
  36.         # register
  37.         self.register()
  38.     def create_roles_znode(self):
  39.         """
  40.         create the zookeeper node if not exist
  41.         |-Roles
  42.              |-workers
  43.         """
  44.         nodes = (self.ROOT, self.WORKERS_PATH)
  45.         for node in nodes:
  46.             if not self.zk.exists(node):
  47.                 try:
  48.                     self.zk.create(node, "")
  49.                 except:
  50.                     pass
  51.     @property
  52.     def is_slave(self):
  53.         return not self.is_master
  54.     def register(self):
  55.         """
  56.         register a node for this worker,znode type : EPHEMERAL | SEQUENCE
  57.         |-Roles
  58.              |-workers
  59.                      |-worker000000000x         ==>>master
  60.                      |-worker000000000x+1       ==>>worker
  61.                      ....
  62.         """
  63.         self.path = self.zk.create(self.WORKERS_PATH + "/worker", "1", flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
  64.         self.path = basename(self.path)
  65.         self.say("I'm %s" % self.path)
  66.         # check who is the master
  67.         self.get_master()
  68.     def get_master(self):
  69.         """
  70.         get children, and check who is the smallest child
  71.         """
  72.         @watchmethod
  73.         def watcher(event):
  74.             self.say("child changed, try to get master again.")
  75.             self.get_master()
  76.         children = self.zk.get_children(self.WORKERS_PATH, watcher)
  77.         children.sort()
  78.         self.say("%s's children: %s" % (self.WORKERS_PATH, children))
  79.         # check if I'm master
  80.         self.masters = children[:self.MASTERS_NUM]
  81.         if self.path in self.masters:
  82.             self.is_master = True
  83.             self.say("I've become master!")
  84.             self.create_app_znode()
  85.         else:
  86.             self.say("%s is masters, I'm slave" % self.masters)
  87.     def create_app_znode(self):
  88.         """
  89.         create the zookeeper node if not exist
  90.         |-Applications
  91.                     |-NginxConf
  92.         """
  93.         nodes = (self.APP_ROOT, self.APP_CONF)
  94.         for node in nodes:
  95.             if not self.zk.exists(node):
  96.                 try:
  97.                     self.say("Create znode [%s] ..."%(node))
  98.                     self.zk.create(node, "")
  99.                 except:
  100.                     pass
  101.     def create_conf_znode(self,data):
  102.         """
  103.         create the zookeeper node's children if not exist,contents is conf data
  104.         |-Applications
  105.                     |-NginxConf
  106.                             |-item-000000000x => data
  107.         """
  108.         self.child_node = join(self.APP_CONF,"conf-")
  109.         path = self.zk.create(self.child_node,data, flags=zookeeper.SEQUENCE|zookeeper.PERSISTENT)
  110.         self.say("create znode %s"%path)
  111.     def say(self, msg):
  112.         """
  113.         print messages to screen
  114.         """
  115.         if self.VERBOSE:
  116.             if self.path:
  117.                 log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
  118.             else:
  119.                 log.info(msg)
  120.         
  121. class Watcher:
  122.     def __init__(self):
  123.         """ Creates a child thread, which returns.  The parent
  124.             thread waits for a KeyboardInterrupt and then kills
  125.             the child thread.
  126.         """
  127.         self.child = os.fork()
  128.         if self.child == 0:
  129.             return
  130.         else:
  131.             self.watch()
  132.     def watch(self):
  133.         try:
  134.             os.wait()
  135.         except KeyboardInterrupt:
  136.             print ' exit...'
  137.             self.kill()
  138.         sys.exit()
  139.     def kill(self):
  140.         try:
  141.             os.kill(self.child, signal.SIGKILL)
  142.         except OSError:
  143.             pass
  144. def start_zk_worker():
  145.     """
  146.         连接到zookeeper执行初始化
  147.     """
  148.     gj_zookeeper = GJZookeeper()
  149.     th1 = threading.Thread(target = start_agent_worker, name = "thread_1", args = (gj_zookeeper,))
  150.     th1.start()
  151.     th1.join()
  152. def start_agent_worker(gj_zookeeper):
  153.     """
  154.         监听配置文件变更信息,解析指令
  155.     """
  156.     import socket
  157.     address = ('', 8877)
  158.     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # s = socket.socket()
  159.     s.bind(address)
  160.     s.listen(5)
  161.     print "listen on port 8877 ..."
  162.     while True:
  163.         ss, addr = s.accept()
  164.         print "receive connetcion from " ,addr
  165.         content = ""
  166.         while True:
  167.             try :
  168.                 data = ss.recv(512)
  169.                 if not data:
  170.                     print "close connetcion " ,addr
  171.                     ss.close()
  172.                     break
  173.                 content = content + data
  174.                 print "receive message from %s : %s"%(addr[0],data)
  175.             except Exception,e:
  176.                 print "receive error from %s : %s"%(addr[0],str(e))
  177.                 ss.close()
  178.                 break   
  179.         parse_trigger_package(content,gj_zookeeper)
  180.     s.close()
  181. def parse_trigger_package(data,gj_zookeeper):
  182.     try:
  183.         cmd = data.split('|')[0]
  184.         content = data.split('|')[1]
  185.     except Exception, e:
  186.         print "ERROR :",str(e)
  187.         return
  188.     if cmd == "ADD":
  189.         gj_zookeeper.create_conf_znode(content)
  190.     else:
  191.         pass
  192. def main():
  193.     Watcher()
  194.     start_zk_worker()
  195. if __name__ == "__main__":
  196.     main()
  197.     import time
  198.     time.sleep(20000)
复制代码


配置文件接收应用:zk_appzk_app逻辑如下,它工作在每台worker上
  • 初始化连接到zk service
  • 获取所有/Applications/NginxConf的znode列表并且设置Watcher
  • 找到列表中最大的znode节点,记录它的编号,然后getData获取它的数据,这里就拿到了最新的配置信息
  • 每次又Watcher被触发,就获取列表中编号最大的节点,然后把上一次保存的编号和它比较,一旦又更新就重新获取数据,应用到worker,如果相同就不需要到zk上获取数据。
vim zk_app.py

  1. #!/usr/bin/env python2.7
  2. # -*- coding: UTF-8 -*-
  3. # author : firefoxbug
  4. # E-Mail : wanghuafire@gmail.com
  5. # Blog   : www.firefoxbug.net
  6. import logging
  7. from os.path import basename, join
  8. from zkclient import ZKClient, zookeeper, watchmethod
  9. import os
  10. import sys
  11. import threading
  12. import signal
  13. import time
  14. logging.basicConfig(
  15.     level = logging.DEBUG,
  16.     format = "[%(asctime)s] %(levelname)-8s %(message)s"
  17. )
  18. log = logging
  19. class GJZookeeper(object):
  20.     ZK_HOST = "localhost:2181"
  21.     ROOT = "/Roles"
  22.     WORKERS_PATH = join(ROOT, "workers")
  23.     MASTERS_NUM = 1
  24.     TIMEOUT = 10000
  25.     def __init__(self, verbose = True):
  26.         self.VERBOSE = verbose
  27.         self.masters = []
  28.         self.is_master = False
  29.         self.path = None
  30.         self.APP_ROOT = "/Applications"
  31.         self.APP_CONF = join(self.APP_ROOT,"NginxConf")
  32.         self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
  33.         self.say("login zookeeper successfully!")
  34.         # init
  35.         self.create_roles_znode()
  36.         # register
  37.         self.register()
  38.     def create_roles_znode(self):
  39.         """
  40.         create the zookeeper node if not exist
  41.         |-Roles
  42.              |-workers
  43.         """
  44.         nodes = (self.ROOT, self.WORKERS_PATH)
  45.         for node in nodes:
  46.             if not self.zk.exists(node):
  47.                 try:
  48.                     self.zk.create(node, "")
  49.                 except:
  50.                     pass
  51.     @property
  52.     def is_slave(self):
  53.         return not self.is_master
  54.     def register(self):
  55.         """
  56.         register a node for this worker,znode type : EPHEMERAL | SEQUENCE
  57.         |-Roles
  58.              |-workers
  59.                      |-worker000000000x         ==>>master
  60.                      |-worker000000000x+1       ==>>worker
  61.                      ....
  62.         """
  63.         self.path = self.zk.create(self.WORKERS_PATH + "/worker", "1", flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
  64.         self.path = basename(self.path)
  65.         self.say("I'm %s" % self.path)
  66.         # check who is the master
  67.         self.get_master()
  68.     def get_master(self):
  69.         """
  70.         get children, and check who is the smallest child
  71.         """
  72.         @watchmethod
  73.         def watcher(event):
  74.             self.say("child changed, try to get master again.")
  75.             self.get_master()
  76.         children = self.zk.get_children(self.WORKERS_PATH, watcher)
  77.         children.sort()
  78.         self.say("%s's children: %s" % (self.WORKERS_PATH, children))
  79.         # check if I'm master
  80.         self.masters = children[:self.MASTERS_NUM]
  81.         if self.path in self.masters:
  82.             self.is_master = True
  83.             self.say("I've become master!")
  84.             self.create_app_znode()
  85.         else:
  86.             self.say("%s is masters, I'm slave" % self.masters)
  87.     def create_app_znode(self):
  88.         """
  89.         create the zookeeper node if not exist
  90.         |-Applications
  91.                     |-NginxConf
  92.         """
  93.         nodes = (self.APP_ROOT, self.APP_CONF)
  94.         for node in nodes:
  95.             if not self.zk.exists(node):
  96.                 try:
  97.                     self.say("Create znode [%s] ..."%(node))
  98.                     self.zk.create(node, "")
  99.                 except:
  100.                     pass
  101.     def create_conf_znode(self,data):
  102.         """
  103.         create the zookeeper node's children if not exist,contents is conf data
  104.         |-Applications
  105.                     |-NginxConf
  106.                             |-conf-000000000x => data
  107.         """
  108.         self.child_node = join(self.APP_CONF,"conf-")
  109.         path = self.zk.create(self.child_node,data, flags=zookeeper.SEQUENCE|zookeeper.PERSISTENT)
  110.         self.say("create znode %s"%path)
  111.     def say(self, msg):
  112.         """
  113.         print messages to screen
  114.         """
  115.         if self.VERBOSE:
  116.             if self.path:
  117.                 log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
  118.             else:
  119.                 log.info(msg)
  120.         
  121. class Watcher:
  122.     def __init__(self):
  123.         """ Creates a child thread, which returns.  The parent
  124.             thread waits for a KeyboardInterrupt and then kills
  125.             the child thread.
  126.         """
  127.         self.child = os.fork()
  128.         if self.child == 0:
  129.             return
  130.         else:
  131.             self.watch()
  132.     def watch(self):
  133.         try:
  134.             os.wait()
  135.         except KeyboardInterrupt:
  136.             print ' exit...'
  137.             self.kill()
  138.         sys.exit()
  139.     def kill(self):
  140.         try:
  141.             os.kill(self.child, signal.SIGKILL)
  142.         except OSError:
  143.             pass
  144. def start_zk_worker():
  145.     """
  146.         连接到zookeeper执行初始化
  147.     """
  148.     gj_zookeeper = GJZookeeper()
  149.     th1 = threading.Thread(target = start_agent_worker, name = "thread_1", args = (gj_zookeeper,))
  150.     th1.start()
  151.     th1.join()
  152. def start_agent_worker(gj_zookeeper):
  153.     """
  154.         监听配置文件变更信息,解析指令
  155.     """
  156.     import socket
  157.     address = ('', 8877)
  158.     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # s = socket.socket()
  159.     s.bind(address)
  160.     s.listen(5)
  161.     print "listen on port 8877 ..."
  162.     while True:
  163.         ss, addr = s.accept()
  164.         print "receive connetcion from " ,addr
  165.         content = ""
  166.         while True:
  167.             try :
  168.                 data = ss.recv(512)
  169.                 if not data:
  170.                     print "close connetcion " ,addr
  171.                     ss.close()
  172.                     break
  173.                 content = content + data
  174.                 print "receive message from %s : %s"%(addr[0],data)
  175.             except Exception,e:
  176.                 print "receive error from %s : %s"%(addr[0],str(e))
  177.                 ss.close()
  178.                 break   
  179.         parse_trigger_package(content,gj_zookeeper)
  180.     s.close()
  181. def parse_trigger_package(data,gj_zookeeper):
  182.     try:
  183.         cmd = data.split('|')[0]
  184.         content = data.split('|')[1]
  185.     except Exception, e:
  186.         print "ERROR :",str(e)
  187.         return
  188.     if cmd == "ADD":
  189.         gj_zookeeper.create_conf_znode(content)
  190.     else:
  191.         pass
  192. def main():
  193.     Watcher()
  194.     start_zk_worker()
  195. if __name__ == "__main__":
  196.     main()
  197.     import time
  198.     time.sleep(20000)
复制代码


配置文件发送:trigger实现的逻辑很简单,就是模拟给zk_agent发送数据包,格式如下

  1. 指令|数据
  2. ADD|helloworld
复制代码

vim trigger.py

  1. #!/usr/bin/python
  2. import socket  
  3. import sys
  4. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
  5. sock.connect(('127.0.0.1', 8877))
  6. str = sys.argv[1] + '|' + sys.argv[2]
  7. sock.send(str)  
  8. sock.close()
复制代码


运行
  1. $ python zk_agent.py
  2. $ python zk_app.py
  3. $ python trigger.py
复制代码



相关文章:
zookeeper原理
zookeeper中Watcher和Notifications
zookeeper适用场景:如何竞选Master及代码实现
zookeeper适用场景:配置文件同步
zookeeper适用场景:分布式锁实现
zookeeper适用场景:zookeeper解决了哪些问题





本帖被以下淘专辑推荐:

已有(1)人评论

跳转到指定楼层
李新魁 发表于 2014-10-11 15:10:58
$ python zk_agent.py
$ python zk_app.py
$ python trigger.py
一定要运行3个代码脚本才能实现配置文件的同步么?他不是一定时间内就会同步你修改过的配置文件么?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条