ZooKeeper应用之分布式队列

原创技术文章,转载请注明:转自http://newliferen.github.io/

  分布式队列的实现和分布式锁的思想很类似,在指定的节点下,写入有序的子节点,出队时节点序号最小的先出队。代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148

package org.zk.distributed.queue;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 分布式队列
* @author renzhengzhi
* 分布式队列的实现思想:
* 在指定path下创建顺序子节点,序号从小到大递增,出队时获取序号小的节点
*/

public class ZkDistributedFIFOQueue {

private static final Logger LOGGER = LoggerFactory.getLogger(ZkDistributedFIFOQueue.class);

private String queue = "/_fifo_queue_";

private ZooKeeper zk;

public ReentrantLock lock = new ReentrantLock();


@Before
public void connect(){
try {
final CountDownLatch semaphore = new CountDownLatch(1);
zk = new ZooKeeper("127.0.0.1:2181", 60000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()){
if (EventType.None == event.getType() && event.getPath() == null){
System.out.println("链接成功!");
semaphore.countDown();
} else if (EventType.NodeChildrenChanged == event.getType()){
try {
System.out.println("节点发生变化,当前节点个数:" + zk.getChildren(queue, true).size());
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});

semaphore.await();
} catch (IOException e) {
e.printStackTrace();
if (LOGGER.isDebugEnabled()){
LOGGER.debug("链接zookeeper服务异常!");
}
} catch (InterruptedException e) {
e.printStackTrace();
if (LOGGER.isDebugEnabled()){
LOGGER.debug("主线程中断!");
}
}
}

private static final int coreNum = Runtime.getRuntime().availableProcessors();

private static final ExecutorService threadPool = Executors.newFixedThreadPool(coreNum << 1);

/**
* 使用zookeeper的有序临时节点模拟先进先出队列
*/

@Test
public void createQueue(){
try {
/**
*创建队列根节点
*/

Stat rootStat = zk.exists(queue, true);
if (rootStat == null){
zk.create(queue, "distributed fifo queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/**
*入队
*/

for (int i=0;i<10;i++){
zk.create(queue + "/q" + i, ("我是第" + i +"个入队的人。").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
}

/**
*出队
*/

List<String> children = zk.getChildren(queue, true);
if (children != null){
int size = children.size();
CountDownLatch semaphore = new CountDownLatch(size);
for (int i=0;i<size;i++){
Consumer consumer = new Consumer(semaphore);
threadPool.submit(consumer);
}
semaphore.await();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

class Consumer implements Runnable{
CountDownLatch semaphore;
public Consumer(CountDownLatch semaphore){
this.semaphore = semaphore;
}
@Override
public void run() {
try {
lock.lock();
List<String> children = zk.getChildren(queue, true);
if (children != null && children.size() > 0){
final String name = children.get(0);
zk.delete(queue + "/" + name, 0);
System.out.println("当前线程:" + Thread.currentThread().getId() + "消费数据:" + name);
semaphore.countDown();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
lock.unlock();
}
}
}
}