1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
| package org.zk.distributed.queue;
import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantLock;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
* 分布式队列 * @author renzhengzhi * 分布式队列的实现思想: * 在指定path下创建顺序子节点,序号从小到大递增,出队时获取序号小的节点 */ public class ZkDistributedFIFOQueue { private static final Logger LOGGER = LoggerFactory.getLogger(ZkDistributedFIFOQueue.class); private String queue = "/_fifo_queue_"; private ZooKeeper zk; public ReentrantLock lock = new ReentrantLock(); @Before public void connect(){ try { final CountDownLatch semaphore = new CountDownLatch(1); zk = new ZooKeeper("127.0.0.1:2181", 60000, new Watcher() { @Override public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()){ if (EventType.None == event.getType() && event.getPath() == null){ System.out.println("链接成功!"); semaphore.countDown(); } else if (EventType.NodeChildrenChanged == event.getType()){ try { System.out.println("节点发生变化,当前节点个数:" + zk.getChildren(queue, true).size()); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }); semaphore.await(); } catch (IOException e) { e.printStackTrace(); if (LOGGER.isDebugEnabled()){ LOGGER.debug("链接zookeeper服务异常!"); } } catch (InterruptedException e) { e.printStackTrace(); if (LOGGER.isDebugEnabled()){ LOGGER.debug("主线程中断!"); } } } private static final int coreNum = Runtime.getRuntime().availableProcessors(); private static final ExecutorService threadPool = Executors.newFixedThreadPool(coreNum << 1); * 使用zookeeper的有序临时节点模拟先进先出队列 */ @Test public void createQueue(){ try { *创建队列根节点 */ Stat rootStat = zk.exists(queue, true); if (rootStat == null){ zk.create(queue, "distributed fifo queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } *入队 */ for (int i=0;i<10;i++){ zk.create(queue + "/q" + i, ("我是第" + i +"个入队的人。").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); } *出队 */ List<String> children = zk.getChildren(queue, true); if (children != null){ int size = children.size(); CountDownLatch semaphore = new CountDownLatch(size); for (int i=0;i<size;i++){ Consumer consumer = new Consumer(semaphore); threadPool.submit(consumer); } semaphore.await(); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } class Consumer implements Runnable{ CountDownLatch semaphore; public Consumer(CountDownLatch semaphore){ this.semaphore = semaphore; } @Override public void run() { try { lock.lock(); List<String> children = zk.getChildren(queue, true); if (children != null && children.size() > 0){ final String name = children.get(0); zk.delete(queue + "/" + name, 0); System.out.println("当前线程:" + Thread.currentThread().getId() + "消费数据:" + name); semaphore.countDown(); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally{ lock.unlock(); } } } }
|