发布/订阅
点击这里查看发布/订阅讲解
- pom依赖
1 | <properties> |
- 获取ZK连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package org.zk.publish;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
* @auth zhengzhi.ren
* @date 2015/7/24.
*/
public class ZKClient {
private String rootNode;
private String domainNode;
private String confNode;
private String keyNode;
private List<String> nodes = new ArrayList<String>(4);
private Map<String, String> data = new ConcurrentHashMap<String, String>(4);
private String keyData;
protected static ZooKeeper zkConn = null;
private String host = "127.0.0.1:2181";
private int timeout = 500000;
private ExecutorService threadPool = Executors.newFixedThreadPool(1);
public ZKClient() {
connect();
}
public ZKClient(String rootNode, String domainNode, String confNode, String keyNode, String keyData) {
this.rootNode = rootNode;
this.domainNode = domainNode;
this.confNode = confNode;
this.keyNode = keyNode;
this.keyData = keyData;
connect();
}
/**
* initialize the nodes list
*/
public void initialize (){
nodes.add(rootNode);
nodes.add(domainNode);
nodes.add(confNode);
nodes.add(keyNode);
data.put(keyNode, keyData);
}
private FutureTask<ZooKeeper> task = new FutureTask<ZooKeeper>(new Callable<ZooKeeper>() {
@Override
public ZooKeeper call() throws Exception {
ZooKeeper zk = null;
try {
CountDownLatch semaphore = new CountDownLatch(1);
zk = new ZooKeeper(host, timeout, new ConnectWatcher(semaphore));
semaphore.await();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return zk;
}
});
/**
* get zk connection
*/
public void connect() {
threadPool.submit(task);
}
/**
* Set data to node depend on the latest version
* @param path
* @param data
* @throws KeeperException
* @throws InterruptedException
*/
public void setData(String path, String data) throws KeeperException, InterruptedException, ExecutionException {
zkConn = task.get();
Stat stat = new Stat();
zkConn.getData(path, null, stat);
zkConn.setData(path, data != null ? data.getBytes() : null, stat.getVersion());
}
/**
* a simple load balance implement
* @param path
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public String getData(String path) throws KeeperException, InterruptedException, ExecutionException {
String value = null;
try {
zkConn = task.get();
Stat stat = new Stat();
byte[] data = zkConn.getData(path, null, stat);
value = new String(data);
} finally {
close();
}
return value;
}
/**
* Connect success callback
*/
class ConnectWatcher implements Watcher {
private final CountDownLatch semaphore;
private ConnectWatcher(CountDownLatch semaphore) {
this.semaphore = semaphore;
}
@Override
public void process(WatchedEvent watchedEvent) {
if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
if (Event.EventType.None == watchedEvent.getType() && watchedEvent.getPath() == null) {
System.out.println("ZK connect success!");
semaphore.countDown();
}
}
}
}
/**
* @throws InterruptedException
* @throws KeeperException
*/
public void createRecursive() throws InterruptedException, KeeperException, ExecutionException {
for (String node : nodes) {
CountDownLatch semaphore = new CountDownLatch(1);
create(node, data.get(node), new CreateCallback(node), "", semaphore);
semaphore.await();
}
}
/**
* Create a asynchronized node
* @param path
* @param data
* @param cb
* @param ctx
* @param semaphore
* @return
* @throws KeeperException
* @throws InterruptedException
* @throws ExecutionException
*/
public boolean create(String path, String data, AsyncCallback.StringCallback cb, Object ctx, CountDownLatch semaphore) throws KeeperException, InterruptedException, ExecutionException {
zkConn = task.get();
Stat rootStat = zkConn.exists(path, false);
boolean result = false;
if (rootStat == null) {
zkConn.create(path, data != null ? data.getBytes() : null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, cb, ctx);
result = true;
} else {
setData(path, data);
}
semaphore.countDown();
return result;
}
class CreateCallback implements AsyncCallback.StringCallback {
private final String data;
public CreateCallback(String data) {
this.data = data;
}
@Override
public void processResult(int i, String path, Object o, String s1) {
System.out.println("Create conf node callback by WatcherManager");
if (path == null || "".equals(path)) {
System.out.println("Create " + path + " failure!");
} else {
System.out.println("Create " + path + " success!");
}
}
}
private void close(){
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public synchronized void start() {
try {
zkConn.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
发布配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package org.zk.publish;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
/**
 * JUnit-driven publisher that writes the example configuration to ZooKeeper.
 *
 * @author zhengzhi.ren
 * @date 2015/7/24.
 */
public class Publisher {
// root node's name
private String root = "/configuration";
// application domain
private String domain = root + "/order.xx.com";
// configuration node's name
private String configNode = domain + "/resource";
private String keyNode = configNode + "/timeout";
private String keyData = "50000";
/**
 * Publishes the configuration hierarchy (root, domain, config node and key node with its
 * value) to the ZK server.
 *
 * @throws KeeperException      if the server rejects a request
 * @throws InterruptedException if the calling thread is interrupted
 * @throws ExecutionException   if the connection could not be established
 */
@Test
public void publish() throws KeeperException, InterruptedException, ExecutionException {
    final ZKClient zkClient = new ZKClient(root, domain, configNode, keyNode, keyData);
    // Register the node paths first, then create them on the server.
    zkClient.initialize();
    zkClient.createRecursive();
}
/**
 * Publishes a new value to the existing key node.
 *
 * @throws KeeperException      if the server rejects the update
 * @throws InterruptedException if the calling thread is interrupted
 * @throws ExecutionException   if the connection could not be established
 */
@Test
public void setData() throws KeeperException, InterruptedException, ExecutionException {
    final ZKClient zkClient = new ZKClient();
    zkClient.setData(keyNode, "300");
}
}
读取配置
1 | package org.zk.subscribe; |