原创技术文章,转载请注明:转自http://newliferen.github.io/
目前Java运行环境中多数都是集群环境,多台主机并行计算,导致在代码中很难进行并发控制,有开发人员自行实现分布式锁,开发工作量大,代码稳定性难以保证,ZooKeeper框架的出现,让开发人员可以使用zk中成熟的分布式锁服务,减轻了开发人员对分布式任务并发控制负担,可以专注业务逻辑实现。
zk中分布式锁的思想是,创建一个锁节点/locknode/guid-lock,集群中每台服务器都在该节点下写入有序的子节点,节点列表中第一个节点对应的服务器被认为是拿到分布式锁,可以进行计算任务,计算任务结束后,删除该服务器对应的节点,下一个服务器写入的节点排到了第一位,可以进行计算任务,如此往复,每台机器写入的节点排在列表中第一位的时候,就被认为是拿到分布式锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
| package org.zk.distributed.lock;
import org.apache.zookeeper.*; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test;
import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch;
* 分布式锁 * @author renzhengzhi */ public class ZkDistributedLock {
private ZooKeeper zk; @Before public void connect(){ CountDownLatch connectorSemaphore = new CountDownLatch(1); String url = "127.0.0.1:2181"; int timeout = 60000; try { zk = new ZooKeeper(url, timeout, new ConnectWatcher(connectorSemaphore)); System.out.println(zk.getState()); connectorSemaphore.await(); System.out.println(zk.getState()); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }
* 获取分布式锁 */ @Test public void getDistributedLock(){ try { String path = zk.create("/_locknode_/guid-lock", "lock".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); List<String> children = zk.getChildren("/_locknode_", true); if (children != null && children.size() > 0){ int size = children.size(); String prevOne = null; for (int i=0;i<size;i++){ if (i == 0 && path.lastIndexOf(children.get(i)) > 0){ System.out.println("当前线程获取分布式锁。。。"); return; } if (path.lastIndexOf(children.get(i)) > 0){ prevOne = children.get(i-1); break; } } System.out.println("prevOne:" + prevOne); zk.exists("/_locknode_/" + prevOne, new LockWatcher("/_locknode_")); } } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } private class LockWatcher implements Watcher { private final String path; public LockWatcher(String path){ this.path = path; } public void process(WatchedEvent event) { try { Stat stat = zk.exists(path, false); System.out.println("当前线程获取到分布式锁" + stat); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } class ConnectWatcher implements Watcher{ private final CountDownLatch connectorSemaphore; public ConnectWatcher(CountDownLatch connectorSemaphore){ this.connectorSemaphore = connectorSemaphore; } @Override public void process(WatchedEvent e) { if (e.getState() == KeeperState.SyncConnected){ if (EventType.None == e.getType() && e.getPath() == null){ connectorSemaphore.countDown(); System.out.println("链接状态:" + e.getState()); } } } } }
|
在使用zk实现分布式锁服务时,需要注意一点,如果服务器监听锁节点下所有子节点,那么zk集群在负责发起watcher通知时,势必给zk集群服务带来压力,而客户端关注了与自己无关的节点变化情况,也浪费了客户端服务器资源,这种情况叫做羊群效应。羊群效应的解决方案是,每个客户端服务器只关注比自己小的那个节点的变化情况即可,如果比自己小的节点被删除,那么就说明当前服务器拿到分布式锁。