分享

跟着实例学习ZooKeeper的用法: 分布式锁

本帖最后由 坎蒂丝_Swan 于 2014-12-22 11:20 编辑


问题导读

1.ZooKeeper recipes wiki定义了可协商的撤销机制,如何撤销mutex?
2.可以在多个线程中用同一个InterProcessMutex?为什么?











分布式的锁全局同步, 这意味着任何一个时间点不会有两个客户端都拥有相同的锁。

可重入锁Shared Reentrant Lock

       首先我们先看一个全局可重入的锁。 Shared意味着锁是全局可见的, 客户端都可以请求锁。 Reentrant和JDK的ReentrantLock类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。

它是由类InterProcessMutex来实现。
它的构造函数为:

  1. public InterProcessMutex(CuratorFramework client, String path)
复制代码

通过acquire获得锁,并提供超时机制:
  1. public void acquire()
  2. Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
  3. re-entrantly. Each call to acquire must be balanced by a call to release()
  4. public boolean acquire(long time,
  5.                        TimeUnit unit)
  6. Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can
  7. call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()
  8. Parameters:
  9. time - time to wait
  10. unit - time unit
  11. Returns:
  12. true if the mutex was acquired, false if not
复制代码


通过release()方法释放锁。
InterProcessMutex 实例可以重用。

Revoking

ZooKeeper recipes wiki定义了可协商的撤销机制。
为了撤销mutex, 调用下面的方法:

  1. public void makeRevocable(RevocationListener<T> listener)
  2. 将锁设为可撤销的. 当别的进程或线程想让你释放锁是Listener会被调用。
  3. Parameters:
  4. listener - the listener
复制代码

如果你请求撤销当前的锁, 调用Revoker方法。
  1. public static void attemptRevoke(CuratorFramework client,
  2.                                  String path)
  3.                          throws Exception
  4. Utility to mark a lock for revocation. Assuming that the lock has been registered
  5. with a RevocationListener, it will get called and the lock should be released. Note,
  6. however, that revocation is cooperative.
  7. Parameters:
  8. client - the client
  9. path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
复制代码

错误处理

还是强烈推荐你使用ConnectionStateListener处理连接状态的改变。 当连接LOST时你不再拥有锁。

首先让我们创建一个模拟的共享资源, 这个资源期望只能单线程的访问,否则会有并发问题。

  1. package com.colobu.zkrecipe.lock;
  2. import java.util.concurrent.atomic.AtomicBoolean;
  3. public class FakeLimitedResource {
  4.         private final AtomicBoolean inUse = new AtomicBoolean(false);
  5.         public void use() throws InterruptedException {
  6.                 // 真实环境中我们会在这里访问/维护一个共享的资源
  7.                 //这个例子在使用锁的情况下不会非法并发异常IllegalStateException
  8.                 //但是在无锁的情况由于sleep了一段时间,很容易抛出异常
  9.                 if (!inUse.compareAndSet(false, true)) {
  10.                         throw new IllegalStateException("Needs to be used by one client at a time");
  11.                 }
  12.                 try {
  13.                         Thread.sleep((long) (3 * Math.random()));
  14.                 } finally {
  15.                         inUse.set(false);
  16.                 }
  17.         }
  18. }
复制代码

然后创建一个ExampleClientThatLocks类, 它负责请求锁, 使用资源,释放锁这样一个完整的访问过程。
  1. package com.colobu.zkrecipe.lock;
  2. import java.util.concurrent.TimeUnit;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. public class ExampleClientThatLocks {
  6.         private final InterProcessMutex lock;
  7.         private final FakeLimitedResource resource;
  8.         private final String clientName;
  9.         public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
  10.                 this.resource = resource;
  11.                 this.clientName = clientName;
  12.                 lock = new InterProcessMutex(client, lockPath);
  13.         }
  14.         public void doWork(long time, TimeUnit unit) throws Exception {
  15.                 if (!lock.acquire(time, unit)) {
  16.                         throw new IllegalStateException(clientName + " could not acquire the lock");
  17.                 }
  18.                 try {
  19.                         System.out.println(clientName + " has the lock");
  20.                         resource.use(); //access resource exclusively
  21.                 } finally {
  22.                         System.out.println(clientName + " releasing the lock");
  23.                         lock.release(); // always release the lock in a finally block
  24.                 }
  25.         }
  26. }
复制代码

最后创建主程序来测试。
  1. package com.colobu.zkrecipe.lock;
  2. import java.util.concurrent.Callable;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.TimeUnit;
  6. import org.apache.curator.framework.CuratorFramework;
  7. import org.apache.curator.framework.CuratorFrameworkFactory;
  8. import org.apache.curator.retry.ExponentialBackoffRetry;
  9. import org.apache.curator.test.TestingServer;
  10. import org.apache.curator.utils.CloseableUtils;
  11. public class InterProcessMutexExample {
  12.         private static final int QTY = 5;
  13.         private static final int REPETITIONS = QTY * 10;
  14.         private static final String PATH = "/examples/locks";
  15.         public static void main(String[] args) throws Exception {
  16.                 final FakeLimitedResource resource = new FakeLimitedResource();
  17.                 ExecutorService service = Executors.newFixedThreadPool(QTY);
  18.                 final TestingServer server = new TestingServer();
  19.                 try {
  20.                         for (int i = 0; i < QTY; ++i) {
  21.                                 final int index = i;
  22.                                 Callable<Void> task = new Callable<Void>() {
  23.                                         @Override
  24.                                         public Void call() throws Exception {
  25.                                                 CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
  26.                                                 try {
  27.                                                         client.start();
  28.                                                         final ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index);
  29.                                                         for (int j = 0; j < REPETITIONS; ++j) {
  30.                                                                 example.doWork(10, TimeUnit.SECONDS);
  31.                                                         }
  32.                                                 } catch (Throwable e) {
  33.                                                         e.printStackTrace();
  34.                                                 } finally {
  35.                                                         CloseableUtils.closeQuietly(client);
  36.                                                 }
  37.                                                 return null;
  38.                                         }
  39.                                 };
  40.                                 service.submit(task);
  41.                         }
  42.                         service.shutdown();
  43.                         service.awaitTermination(10, TimeUnit.MINUTES);
  44.                 } finally {
  45.                         CloseableUtils.closeQuietly(server);
  46.                 }
  47.         }
  48. }
复制代码

代码也很简单,生成10个client, 每个client重复执行10次 请求锁—访问资源—释放锁的过程。每个client都在独立的线程中。
结果可以看到,锁是随机的被每个实例排他性的使用。

既然是可重用的,你可以在一个线程中多次调用acquire,在线程拥有锁时它总是返回true。
你不应该在多个线程中用同一个InterProcessMutex, 你可以在每个线程中都生成一个InterProcessMutex实例,它们的path都一样,这样它们可以共享同一个锁。

不可重入锁Shared Lock

这个锁和上面的相比,就是少了Reentrant的功能,也就意味着它不能在同一个线程中重入。
这个类是InterProcessSemaphoreMutex。
使用方法和上面的类类似。

首先我们将上面的例子修改一下,测试一下它的重入。
修改ExampleClientThatLocks.doWork,连续两次acquire:

  1. public void doWork(long time, TimeUnit unit) throws Exception {
  2.         if (!lock.acquire(time, unit)) {
  3.                 throw new IllegalStateException(clientName + " could not acquire the lock");
  4.         }
  5.         System.out.println(clientName + " has the lock");
  6.         if (!lock.acquire(time, unit)) {
  7.                 throw new IllegalStateException(clientName + " could not acquire the lock");
  8.         }
  9.         System.out.println(clientName + " has the lock again");
  10.         
  11.         try {                        
  12.                 resource.use(); //access resource exclusively
  13.         } finally {
  14.                 System.out.println(clientName + " releasing the lock");
  15.                 lock.release(); // always release the lock in a finally block
  16.                 lock.release(); // always release the lock in a finally block
  17.         }
  18. }
复制代码

注意我们也需要调用release两次。这和JDK的ReentrantLock用法一致。如果少调用一次release,则此线程依然拥有锁。
上面的代码没有问题,我们可以多次调用acquire,后续的acquire也不会阻塞。
将上面的InterProcessMutex换成不可重入锁InterProcessSemaphoreMutex,如果再运行上面的代码,结果就会发现线程被阻塞再第二个acquire上。 也就是此锁不是可重入的。

可重入读写锁Shared Reentrant Read Write Lock
类似JDK的ReentrantReadWriteLock.
一个读写锁管理一对相关的锁。 一个负责读操作,另外一个负责写操作。 读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。
此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。
这也意味着写锁可以降级成读锁, 比如请求写锁 —->读锁 ——>释放写锁。 从读锁升级成写锁是不成的。

主要由两个类实现:
  • InterProcessReadWriteLock
  • InterProcessLock

使用时首先创建一个InterProcessReadWriteLock实例,然后再根据你的需求得到读锁或者写锁, 读写锁的类型是InterProcessLock。

  1. public InterProcessLock readLock()
  2. public InterProcessLock writeLock()
复制代码

例子和上面的类似。
  1. package com.colobu.zkrecipe.lock;
  2. import java.util.concurrent.TimeUnit;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
  6. import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
  7. public class ExampleClientReadWriteLocks {
  8.         private final InterProcessReadWriteLock lock;
  9.         private final InterProcessMutex readLock;
  10.         private final InterProcessMutex writeLock;
  11.         private final FakeLimitedResource resource;
  12.         private final String clientName;
  13.         public ExampleClientReadWriteLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
  14.                 this.resource = resource;
  15.                 this.clientName = clientName;
  16.                 lock = new InterProcessReadWriteLock(client, lockPath);
  17.                 readLock = lock.readLock();
  18.                 writeLock = lock.writeLock();
  19.         }
  20.         public void doWork(long time, TimeUnit unit) throws Exception {
  21.                 if (!writeLock.acquire(time, unit)) {
  22.                         throw new IllegalStateException(clientName + " could not acquire the writeLock");
  23.                 }
  24.                 System.out.println(clientName + " has the writeLock");
  25.                
  26.                 if (!readLock.acquire(time, unit)) {
  27.                         throw new IllegalStateException(clientName + " could not acquire the readLock");
  28.                 }
  29.                 System.out.println(clientName + " has the readLock too");
  30.                
  31.                 try {                        
  32.                         resource.use(); //access resource exclusively
  33.                 } finally {
  34.                         System.out.println(clientName + " releasing the lock");
  35.                         readLock.release(); // always release the lock in a finally block
  36.                         writeLock.release(); // always release the lock in a finally block
  37.                 }
  38.         }
  39. }
复制代码

在这个类中我们首先请求了一个写锁, 然后降级成读锁。 执行业务处理,然后释放读写锁。

信号量Shared Semaphore

      一个计数的信号量类似JDK的Semaphore。 JDK中Semaphore维护的一组许可(permits),而Cubator中称之为租约(Lease)。

       有两种方式可以决定semaphore的最大租约数。第一种方式是有用户给定的path决定。第二种方式使用SharedCountReader类。

       如果不使用SharedCountReader, 没有内部代码检查进程是否假定有10个租约而进程B假定有20个租约。 所以所有的实例必须使用相同的numberOfLeases值.

       这次调用acquire会返回一个租约对象。 客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。 但是, 但是,如果客户端session由于某种原因比如crash丢掉, 那么这些客户端持有的租约会自动close, 这样其它客户端可以继续使用这些租约。

租约还可以通过下面的方式返还:

  1. public void returnAll(Collection<Lease> leases)
  2. public void returnLease(Lease lease)
复制代码

注意一次你可以请求多个租约,如果Semaphore当前的租约不够,则请求线程会被阻塞。 同时还提供了超时的重载方法。
  1. public Lease acquire()
  2. public Collection<Lease> acquire(int qty)
  3. public Lease acquire(long time, TimeUnit unit)
  4. public Collection<Lease> acquire(int qty, long time, TimeUnit unit)
复制代码


主要类有:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

下面是使用的例子:
  1. package com.colobu.zkrecipe.lock;
  2. import java.util.Collection;
  3. import java.util.concurrent.TimeUnit;
  4. import org.apache.curator.framework.CuratorFramework;
  5. import org.apache.curator.framework.CuratorFrameworkFactory;
  6. import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
  7. import org.apache.curator.framework.recipes.locks.Lease;
  8. import org.apache.curator.retry.ExponentialBackoffRetry;
  9. import org.apache.curator.test.TestingServer;
  10. import org.apache.curator.utils.CloseableUtils;
  11. public class InterProcessSemaphoreExample {
  12.         private static final int MAX_LEASE = 10;
  13.         private static final String PATH = "/examples/locks";
  14.         public static void main(String[] args) throws Exception {
  15.                 FakeLimitedResource resource = new FakeLimitedResource();
  16.                 try (TestingServer server = new TestingServer()) {
  17.                         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
  18.                         client.start();
  19.                         InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
  20.                         Collection<Lease> leases = semaphore.acquire(5);
  21.                         System.out.println("get " + leases.size() + " leases");
  22.                         Lease lease = semaphore.acquire();
  23.                         System.out.println("get another lease");
  24.                         resource.use();
  25.                         Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
  26.                         System.out.println("Should timeout and acquire return " + leases2);
  27.                         System.out.println("return one lease");
  28.                         semaphore.returnLease(lease);
  29.                         System.out.println("return another 5 leases");
  30.                         semaphore.returnAll(leases);
  31.                 }
  32.         }
  33. }
复制代码

首先我们先获得了5个租约, 最后我们把它还给了semaphore。

接着请求了一个租约,因为semaphore还有5个租约,所以请求可以满足,返回一个租约,还剩4个租约。

然后再请求一个租约,因为租约不够,阻塞到超时,还是没能满足,返回结果为null。

上面说讲的锁都是公平锁(fair)。 总ZooKeeper的角度看, 每个客户端都按照请求的顺序获得锁。 相当公平。

多锁对象 Multi Shared Lock

       Multi Shared Lock是一个锁的容器。 当调用acquire, 所有的锁都会被acquire,如果请求失败,所有的锁都会被release。 同样调用release时所有的锁都被release(失败被忽略)。
基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。

主要涉及两个类:

  • InterProcessMultiLock
  • InterProcessLock

它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。
  1. public InterProcessMultiLock(List<InterProcessLock> locks)
  2. public InterProcessMultiLock(CuratorFramework client, List<String> paths)
复制代码

用法和Shared Lock相同。

例子如下:
  1. package com.colobu.zkrecipe.lock;
  2. import java.util.Arrays;
  3. import java.util.concurrent.TimeUnit;
  4. import org.apache.curator.framework.CuratorFramework;
  5. import org.apache.curator.framework.CuratorFrameworkFactory;
  6. import org.apache.curator.framework.recipes.locks.InterProcessLock;
  7. import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
  8. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  9. import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
  10. import org.apache.curator.retry.ExponentialBackoffRetry;
  11. import org.apache.curator.test.TestingServer;
  12. public class InterProcessMultiLockExample {
  13.         private static final String PATH1 = "/examples/locks1";
  14.         private static final String PATH2 = "/examples/locks2";
  15.         
  16.         public static void main(String[] args) throws Exception {
  17.                 FakeLimitedResource resource = new FakeLimitedResource();
  18.                 try (TestingServer server = new TestingServer()) {
  19.                         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
  20.                         client.start();
  21.                         
  22.                         InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
  23.                         InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);
  24.                         
  25.                         InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
  26.                         if (!lock.acquire(10, TimeUnit.SECONDS)) {
  27.                                 throw new IllegalStateException("could not acquire the lock");
  28.                         }
  29.                         System.out.println("has the lock");
  30.                         
  31.                         System.out.println("has the lock1: " + lock1.isAcquiredInThisProcess());
  32.                         System.out.println("has the lock2: " + lock2.isAcquiredInThisProcess());
  33.                         
  34.                         try {                        
  35.                                 resource.use(); //access resource exclusively
  36.                         } finally {
  37.                                 System.out.println("releasing the lock");
  38.                                 lock.release(); // always release the lock in a finally block
  39.                         }
  40.                         System.out.println("has the lock1: " + lock1.isAcquiredInThisProcess());
  41.                         System.out.println("has the lock2: " + lock2.isAcquiredInThisProcess());
  42.                 }
  43.         }
  44. }
复制代码

新建一个InterProcessMultiLock, 包含一个重入锁和一个非重入锁。

调用acquire后可以看到线程同时拥有了这两个锁。

调用release看到这两个锁都被释放了。

再重申以便, 强烈推荐使用ConnectionStateListener监控连接的状态。










欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(4)人评论

跳转到指定楼层
QQo0oBIQ 发表于 2014-12-22 09:37:43
支持分享,支持about
回复

使用道具 举报

zzuyao 发表于 2014-12-22 11:06:55
回复

使用道具 举报

355815741 发表于 2014-12-23 08:59:47
好东西,学习学习,谢谢分享~
回复

使用道具 举报

zyw8136 发表于 2014-12-25 18:30:30
支持,强烈支持呀。。。。。。。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条