ZooKeeper应用之分分布式队列

原创技术文章,转载请注明:转自http://newliferen.github.io/

  分布式队列的实现和分布式锁的思想很类似,在指定的节点下,写入有序的子节点,出队时节点序号最小的先出队。代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148

package org.zk.distributed.queue;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 分布式队列
* @author renzhengzhi
* 分布式队列的实现思想:
* 在指定path下创建顺序子节点,序号从小到大递增,出队时获取序号小的节点
*/

public class ZkDistributedFIFOQueue {

private static final Logger LOGGER = LoggerFactory.getLogger(ZkDistributedFIFOQueue.class);

private String queue = "/_fifo_queue_";

private ZooKeeper zk;

public ReentrantLock lock = new ReentrantLock();


@Before
public void connect(){
try {
final CountDownLatch semaphore = new CountDownLatch(1);
zk = new ZooKeeper("127.0.0.1:2181", 60000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()){
if (EventType.None == event.getType() && event.getPath() == null){
System.out.println("链接成功!");
semaphore.countDown();
} else if (EventType.NodeChildrenChanged == event.getType()){
try {
System.out.println("节点发生变化,当前节点个数:" + zk.getChildren(queue, true).size());
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});

semaphore.await();
} catch (IOException e) {
e.printStackTrace();
if (LOGGER.isDebugEnabled()){
LOGGER.debug("链接zookeeper服务异常!");
}
} catch (InterruptedException e) {
e.printStackTrace();
if (LOGGER.isDebugEnabled()){
LOGGER.debug("主线程中断!");
}
}
}

private static final int coreNum = Runtime.getRuntime().availableProcessors();

private static final ExecutorService threadPool = Executors.newFixedThreadPool(coreNum << 1);

/**
* 使用zookeeper的有序临时节点模拟先进先出队列
*/

@Test
public void createQueue(){
try {
/**
*创建队列根节点
*/

Stat rootStat = zk.exists(queue, true);
if (rootStat == null){
zk.create(queue, "distributed fifo queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/**
*入队
*/

for (int i=0;i<10;i++){
zk.create(queue + "/q" + i, ("我是第" + i +"个入队的人。").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
}

/**
*出队
*/

List<String> children = zk.getChildren(queue, true);
if (children != null){
int size = children.size();
CountDownLatch semaphore = new CountDownLatch(size);
for (int i=0;i<size;i++){
Consumer consumer = new Consumer(semaphore);
threadPool.submit(consumer);
}
semaphore.await();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

class Consumer implements Runnable{
CountDownLatch semaphore;
public Consumer(CountDownLatch semaphore){
this.semaphore = semaphore;
}
@Override
public void run() {
try {
lock.lock();
List<String> children = zk.getChildren(queue, true);
if (children != null && children.size() > 0){
final String name = children.get(0);
zk.delete(queue + "/" + name, 0);
System.out.println("当前线程:" + Thread.currentThread().getId() + "消费数据:" + name);
semaphore.countDown();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
lock.unlock();
}
}
}
}

ZooKeeper应用之分布式锁

原创技术文章,转载请注明:转自http://newliferen.github.io/

  目前Java运行环境中多数都是集群环境,多台主机并行计算,导致在代码中很难进行并发控制,有开发人员自行实现分布式锁,开发工作量大,代码稳定性难以保证,ZooKeeper框架的出现,让开发人员可以使用zk中成熟的分布式锁服务,减轻了开发人员对分布式任务并发控制负担,可以专注业务逻辑实现。
  zk中分布式锁的思想是,创建一个锁节点/locknode/guid-lock,集群中每台服务器都在该节点下写入有序的子节点,节点列表中第一个节点对应的服务器被认为是拿到分布式锁,可以进行计算任务,计算任务结束后,删除该服务器对应的节点,下一个服务器写入的节点排到了第一位,可以进行计算任务,如此往复,每台机器写入的节点排在列表中第一位的时候,就被认为是拿到分布式锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package org.zk.distributed.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
* 分布式锁
* @author renzhengzhi
*/

public class ZkDistributedLock {

private ZooKeeper zk;

@Before
public void connect(){
CountDownLatch connectorSemaphore = new CountDownLatch(1);
String url = "127.0.0.1:2181";
int timeout = 60000;
try {
zk = new ZooKeeper(url, timeout, new ConnectWatcher(connectorSemaphore));
System.out.println(zk.getState());
connectorSemaphore.await();
System.out.println(zk.getState());
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 获取分布式锁
*/

@Test
public void getDistributedLock(){
try {
String path = zk.create("/_locknode_/guid-lock", "lock".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zk.getChildren("/_locknode_", true);
if (children != null && children.size() > 0){
// 避免羊群效应
// 首先找到比自己小的那个节点
int size = children.size();
String prevOne = null;// 上一个比自己小的节点
for (int i=0;i<size;i++){
if (i == 0 && path.lastIndexOf(children.get(i)) > 0){
System.out.println("当前线程获取分布式锁。。。");
return;
}
if (path.lastIndexOf(children.get(i)) > 0){
prevOne = children.get(i-1);
break;
}
}
System.out.println("prevOne:" + prevOne);
// 判断比当前节点小的节点是否存在,其节点发生变化之后会回调watcher类,即当前线程获取到分布式锁
zk.exists("/_locknode_/" + prevOne, new LockWatcher("/_locknode_"));
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}

private class LockWatcher implements Watcher {
private final String path;
public LockWatcher(String path){
this.path = path;
}
public void process(WatchedEvent event) {
try {
Stat stat = zk.exists(path, false);
System.out.println("当前线程获取到分布式锁" + stat);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ConnectWatcher implements Watcher{
private final CountDownLatch connectorSemaphore;

public ConnectWatcher(CountDownLatch connectorSemaphore){
this.connectorSemaphore = connectorSemaphore;
}
@Override
public void process(WatchedEvent e) {
if (e.getState() == KeeperState.SyncConnected){
if (EventType.None == e.getType() && e.getPath() == null){
connectorSemaphore.countDown();
System.out.println("链接状态:" + e.getState());
}
}
}

}
}

  在使用zk实现分布式锁服务时,需要注意一点,如果服务器监听锁节点下所有子节点,那么zk集群在负责发起watcher通知时,势必给zk集群服务带来压力,而客户端关注了与自己无关的节点变化情况,也浪费了客户端服务器资源,这种情况叫做羊群效应。羊群效应的解决方案是,每个客户端服务器只关注比自己小的那个节点的变化情况即可,如果比自己小的节点被删除,那么就说明当前服务器拿到分布式锁。

Java Web服务器负载指标

  目前Java web应用中大规模使用分布式系统和集群,集群中存在多台服务器对外提供服务,那么就存在一个问题,如何确定一个请求应该分发到哪台服务器上呢,这就需要获取当前服务器CPU、内存、网络、文件句柄的使用情况,合理分发请求,以使集群中所有服务器达到负载均衡的效果。

获取CPU利用率

  通过使用java.lang.Runtime类执行cat /proc/stat命令获取Cpu时钟数据,进行Cpu利用率计算。“proc文件系统是一个伪文件系统,它只存在内存当中,而不占用外存空间。它以文件系统的方式为访问系统内核数据的操作提供接口。用户和应用程序可以通过proc得到系统的信息,并可以改变内核的某些参数。” 因此,程序在从/proc/stat中获取Cpu指标时,不会发生磁盘I/O,计算Cpu使用率的实时性更强,计算结果更可靠。[参数解释]
参考地址:http://blog.csdn.net/blue_jjw/article/details/8741000

代码实现

ZooKeeper应用之负载均衡

负载均衡

  服务器端负载均衡分为算法包括:轮循算法(Round Robin)、哈希算法(HASH)、响应速度算法(Response Time)、加权法(Weighted )等。网上有文章提到“最少连接算法(Least Connection)”也是负载均衡算法的一种,当然,这样说是需要有前提条件的,就是当服务器性能良好,可以及时处理所有请求的时候,最少连接算法才能达到负载均衡的目的,否则最少连接的服务器可能正在处理比较耗时的操作,继续将请求分发到该服务器可能导致该服务器load值升高,性能下降。参见https://devcentral.f5.com/articles/back-to-basics-least-connections-is-not-least-loaded 。之后我会写关于负载均衡各类算法的博客。

ZK实现负载均衡
  使用zk做负载均衡,需要在zk上指定path下注册提供服务的集群中每个节点的ip地址,消费者需要在该path上注册watcher,当服务提供者集群中增删几点时,zk会发起watcher通知,所有消费者都会收到该watcher回调,此时,消费者可以及时知晓服务提供者集群中节点状况,在可用服务器列表中进行负载均衡计算。

ZooKeeper应用入门之订阅/发布

发布/订阅
点击这里查看发布/订阅讲解

  • pom依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<properties>
<slf4j.version>1.7.5</slf4j.version>
<zookeeper.version>3.4.6</zookeeper.version>
<zkclient.version>0.2-SNAPSHOT</zkclient.version>
</properties>


<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.4.2</version>
</dependency>
</dependencies>
  • 获取ZK连接

ZooKeeper应用之命名空间

命名服务
点击这里查看命名空间讲解

  • pom依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<properties>
<slf4j.version>1.7.5</slf4j.version>
<zookeeper.version>3.4.6</zookeeper.version>
<zkclient.version>0.2-SNAPSHOT</zkclient.version>
</properties>


<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>${zkclient.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.4.2</version>
</dependency>
</dependencies>
  • ZK客户端类

IntelliJ IDEA运行test控制台中文乱码

刚刚开始使用idea,运行junit test类的时候输出到控制台的中文都是乱码
———————————华丽的分割线—————————————
解决方案:要运行的文件窗口右键->File Encoding>弹出框中选择UTF-8

总结:网上有的文章叙述问题解决方案的时候铺垫会比较多,花时间浏览到最后还不一定解决问题,看些比较短小的文章受些启发,再自己探索效果更好。

ZooKeeper入门

ZooKeeper是什么?

  Zookeeper简称ZK,是高性能、高可用的分布式协调框架,采用了自定义的ZAB(Zookeeper Atomic Broadcast)事务一致性协议,在高并发分布式系统环境中保证事务一致性。ZK采用类似Linux操作系统文件目录结构管理节点,节点数据常驻内存,避免磁盘I/O影响性能,并提供对用户透明的持久化功能保证数据安全。ZK适用于小量数据、高性能、高并发的应用场景。

  ZooKeeper为用户提供以下服务:

命名服务
  命名服务(Naming Service)提供了一种为对象命名的机制,可以定位任何通过网络可以访问的机器上的对象,使得用户可以在无需知道对象位置的情况下获取和使用对象。服务提供方在ZK上创建临时Node(全局唯一),服务消费方通过读取ZK上的临时Node节点获取到服务提供方信息,进而调用服务提供者。通过ZK的命名服务,可以达到服务提供者动态增加或减少服务提供者服务器数量。典型示例:阿里的Dubbo框架,即采用ZK作为注册中心。[代码示例]

发布订阅服务
  发布订阅模型,即配置中心,可将应用可配置项发布到ZK上,供订阅者动态获取配置,同一集群使用同一套配置,实现配置集中、动态管理,避免修改配置后重启应用服务。

负载均衡
  由于分布式系统的使用越来越多,为了使同一集群内的多台服务器负载更均衡,就需要一个能够根据服务器负载情况进行请求分发的框架,ZK正是这种可以根据服务提供者列表进行请求分发的框架,可以动态的注册和发现服务,使服务更透明,实现软负载均衡和故障恢复。