发布/订阅点击这里查看发布/订阅讲解
- pom依赖
| 1 | <properties> | 
- 获取ZK连接
1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211package org.zk.publish; 
 import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;
 /**
 * @auth zhengzhi.ren
 * @date 2015/7/24.
 */
 public class ZKClient {
 private String rootNode;
 private String domainNode;
 private String confNode;
 private String keyNode;
 private List<String> nodes = new ArrayList<String>(4);
 private Map<String, String> data = new ConcurrentHashMap<String, String>(4);
 private String keyData;
 protected static ZooKeeper zkConn = null;
 private String host = "127.0.0.1:2181";
 private int timeout = 500000;
 private ExecutorService threadPool = Executors.newFixedThreadPool(1);
 public ZKClient() {
 connect();
 }
 public ZKClient(String rootNode, String domainNode, String confNode, String keyNode, String keyData) {
 this.rootNode = rootNode;
 this.domainNode = domainNode;
 this.confNode = confNode;
 this.keyNode = keyNode;
 this.keyData = keyData;
 connect();
 }
 /**
 * initialize the nodes list
 */
 public void initialize (){
 nodes.add(rootNode);
 nodes.add(domainNode);
 nodes.add(confNode);
 nodes.add(keyNode);
 data.put(keyNode, keyData);
 }
 private FutureTask<ZooKeeper> task = new FutureTask<ZooKeeper>(new Callable<ZooKeeper>() {
 @Override
 public ZooKeeper call() throws Exception {
 ZooKeeper zk = null;
 try {
 CountDownLatch semaphore = new CountDownLatch(1);
 zk = new ZooKeeper(host, timeout, new ConnectWatcher(semaphore));
 semaphore.await();
 } catch (IOException e) {
 e.printStackTrace();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 return zk;
 }
 });
 /**
 * get zk connection
 */
 public void connect() {
 threadPool.submit(task);
 }
 /**
 * Set data to node depend on the latest version
 * @param path
 * @param data
 * @throws KeeperException
 * @throws InterruptedException
 */
 public void setData(String path, String data) throws KeeperException, InterruptedException, ExecutionException {
 zkConn = task.get();
 Stat stat = new Stat();
 zkConn.getData(path, null, stat);
 zkConn.setData(path, data != null ? data.getBytes() : null, stat.getVersion());
 }
 /**
 * a simple load balance implement
 * @param path
 * @return
 * @throws KeeperException
 * @throws InterruptedException
 */
 public String getData(String path) throws KeeperException, InterruptedException, ExecutionException {
 String value = null;
 try {
 zkConn = task.get();
 Stat stat = new Stat();
 byte[] data = zkConn.getData(path, null, stat);
 value = new String(data);
 } finally {
 close();
 }
 return value;
 }
 /**
 * Connect success callback
 */
 class ConnectWatcher implements Watcher {
 private final CountDownLatch semaphore;
 private ConnectWatcher(CountDownLatch semaphore) {
 this.semaphore = semaphore;
 }
 @Override
 public void process(WatchedEvent watchedEvent) {
 if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
 if (Event.EventType.None == watchedEvent.getType() && watchedEvent.getPath() == null) {
 System.out.println("ZK connect success!");
 semaphore.countDown();
 }
 }
 }
 }
 /**
 * @throws InterruptedException
 * @throws KeeperException
 */
 public void createRecursive() throws InterruptedException, KeeperException, ExecutionException {
 for (String node : nodes) {
 CountDownLatch semaphore = new CountDownLatch(1);
 create(node, data.get(node), new CreateCallback(node), "", semaphore);
 semaphore.await();
 }
 }
 /**
 * Create a asynchronized node
 * @param path
 * @param data
 * @param cb
 * @param ctx
 * @param semaphore
 * @return
 * @throws KeeperException
 * @throws InterruptedException
 * @throws ExecutionException
 */
 public boolean create(String path, String data, AsyncCallback.StringCallback cb, Object ctx, CountDownLatch semaphore) throws KeeperException, InterruptedException, ExecutionException {
 zkConn = task.get();
 Stat rootStat = zkConn.exists(path, false);
 boolean result = false;
 if (rootStat == null) {
 zkConn.create(path, data != null ? data.getBytes() : null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, cb, ctx);
 result = true;
 } else {
 setData(path, data);
 }
 semaphore.countDown();
 return result;
 }
 class CreateCallback implements AsyncCallback.StringCallback {
 private final String data;
 public CreateCallback(String data) {
 this.data = data;
 }
 @Override
 public void processResult(int i, String path, Object o, String s1) {
 System.out.println("Create conf node callback by WatcherManager");
 if (path == null || "".equals(path)) {
 System.out.println("Create " + path + " failure!");
 } else {
 System.out.println("Create " + path + " success!");
 }
 }
 }
 private void close(){
 Runtime.getRuntime().addShutdownHook(new Thread(){
 @Override
 public synchronized void start() {
 try {
 zkConn.close();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 });
 }
 }
- 发布配置 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48- package org.zk.publish; 
 import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 /**
 * @auth zhengzhi.ren
 * @date 2015/7/24.
 */
 public class Publisher {
 // root node's name
 private String root = "/configuration";
 // application domain
 private String domain = root + "/order.xx.com";
 // configuration node's name
 private String configNode = domain + "/resource";
 private String keyNode = configNode + "/timeout";
 private String keyData = "50000";
 /**
 * Publish configuration to ZK server
 */
 @Test
 public void publish() throws KeeperException, InterruptedException, ExecutionException {
 ZKClient client = new ZKClient(root, domain, configNode, keyNode, keyData);
 client.initialize();
 client.createRecursive();
 }
 /**
 * publish new data to zk node
 * @throws KeeperException
 * @throws InterruptedException
 */
 @Test
 public void setData() throws KeeperException, InterruptedException, ExecutionException {
 ZKClient client = new ZKClient();
 client.setData(keyNode, "300");
 }
 }
- 读取配置 
| 1 | package org.zk.subscribe; |