ZooKeeper源码阅读——客户端负载均衡算法

  zk客户端连接、ping server端时采用了轮询算法作为负载均衡算法。每次向服务端发送ping请求时,都会调用next方法获取一个server地址,然后发送ping请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public InetSocketAddress next(long spinDelay) {
++currentIndex;
if (currentIndex == serverAddresses.size()) {
currentIndex = 0;
}
if (currentIndex == lastIndex && spinDelay > 0) {
try {
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
} else if (lastIndex == -1) {
// We don't want to sleep on the first ever connect attempt.
lastIndex = 0;
}

return serverAddresses.get(currentIndex);
}

ZooKeeper源码阅读——揭开watcher神秘面纱

原创技术文章,转载请注明:转自http://newliferen.github.io/

  ZooKeeper客户端代码相对简单,核心的类就是ZooKeeper、HostProvider、ClientCnxn、ClientCnxnSocketNIO这几个核心类。其中ZooKeeper是客户端接口类,为客户端提供各种操作访问zk的接口方法,HostProvider持有服务器连接端口等信息,应用程序通过创建ZooKeeper对象建立与zk server端的socket连接,ClientCnxnSocketNIO则向socket连接读写数据。

创建ZooKeeper对象时发生了什么
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)

throws IOException
{

LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

watchManager.defaultWatcher = watcher;

ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
// 解析链接地址
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
// 创建ClientCnxn对象,其内部则是启动sendThread和eventThread两个线程,sendThread线程启动时,负责创建和zk server端的链接。
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
// 启动内部的两个线程
cnxn.start();
}
ClientCnxn构造器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;

connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;

// 创建发送请求线程
sendThread = new SendThread(clientCnxnSocket);
// 创建事件处理线程
eventThread = new EventThread();

}
揭开ZooKeeper请求的神秘面纱

  以创建节点为例,对zk客户端内部实现展开学习,在学习之前,一直在猜想zk server是如何处理客户端的watcher事件,以往的经验是发送消息,或者是在响应结果中进行回调,但是看了zk源码之后,发现zk采用了很聪明的方式处理watcher事件。每一次请求,只要是注册了watcher或者回调事件,在组织request请求对象时,都不会让watcher事件和request产生关联,并且watcher事件也不会随着request发送到zk server端,只是创建了Packet对象,将watcher事件的实现保存在Packet对象内部,在向zk server端发送请求后,将Packet对象保存到pendingQueue队列中。eventThread线程从pendingQueue队列中取出Packet对象后,封装成WatcheEvent进行回调,这样客户端的回调事件得到执行。

  在整个请求过程中,可以看出ZooKeeper的实现一直在尽一切可能减少网络传输时占用带宽,提高请求响应效率,整个过程中回调事件对zk server端是透明的,回调对server端不产生任何压力,在高并发场景中server高可用性得到进一步保障。

  • 创建节点入口方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void create(final String path, byte data[], List<ACL> acl,
CreateMode createMode, StringCallback cb, Object ctx)

{
final String clientPath = path;
PathUtils.validatePath(clientPath, createMode.isSequential());

final String serverPath = prependChroot(clientPath);

RequestHeader h = new RequestHeader();
// 设置请求类型,zk server端会根据这个类型得知客户端向进行何种操作
h.setType(ZooDefs.OpCode.create);
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
ReplyHeader r = new ReplyHeader();
// 组装请求对象,在这里可以看出zk并没有将StringCallback的实现类组织到request对象中
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
request.setAcl(acl);
// 将请求压入sendingQueue队列中
cnxn.queuePacket(h, r, request, response, cb, clientPath,
serverPath, ctx, null);

}
  • 请求对象入队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)

{
Packet packet = null;

// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
synchronized (outgoingQueue) {
// 从这里可以看出request对象和cb仍然没有任何关联
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
// packet对象入队
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
  • 触发发送请求
1
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
  • 发送请求阶段

  真正要发送到zk server端的数据,就在createBB()方法中组织的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
synchronized(outgoingQueue) {
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());


if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {

p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
// 向socket连接写入数据
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {

synchronized (pendingQueue) {
// 将packet对象入队
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
disableWrite();
} else {
// Just in case
enableWrite();
}
}
  • 序列化request对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Ignoring unexpected exception", e);
}
}

ZooKeeper源码阅读——MyCommandOptions

原创技术文章,转载请注明:转自http://newliferen.github.io/

  MyCommandOptions类为命令处理器类,该类作为用户控制台和ZooKeeper核心类的一个适配接口,解析客户端输入命令,将用户输入内容解析为command和args。针对不同的command调用ZooKeeper类的对应方法,将args中path作为参数传给ZooKeeper类,由ZooKeeper进行请求对象封装,并向ZK server发起请求和处理响应结果。

解析命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean parseCommand( String cmdstring ) {
StringTokenizer cmdTokens = new StringTokenizer(cmdstring, " ");
String[] args = new String[cmdTokens.countTokens()];
int tokenIndex = 0;
while (cmdTokens.hasMoreTokens()) {
args[tokenIndex] = cmdTokens.nextToken();
tokenIndex++;
}
if (args.length == 0){
return false;
}
command = args[0];
cmdArgs = Arrays.asList(args);
return true;
}

处理命令

以“ls /”命令为例,实际就是调用ZooKeeper对象的获取子节点方法。
1
2
3
4
5
if (cmd.equals("ls") && args.length >= 2) {
path = args[1];
List<String> children = zk.getChildren(path, watch);
System.out.println(children);
}

ZooKeeper源码阅读——debug client命令

原创技术文章,转载请注明:转自http://newliferen.github.io/

  客户端启动过程可以通过debug org.apache.zookeeper.ZooKeeperMain类即可了解启动时zk客户端都做了些什么以及核心处理类的功能,这里不再赘述,接下来记录一下如何debug客户端命令。

  由于zk客户端启动后,ZooKeeperMain类内部通过jline监听客户端输入命令,然后执行,所以需要设置vm arguments,即可实现通过IDE控制台输入命令debug客户端代码,简单方便。

1
-Djline.terminal=jline.UnsupportedTerminal

Spring拦截器切面配置

原创技术文章,转载请注明:转自http://newliferen.github.io/

  本文记录重点不是spring拦截器的配置方式,而是使用spring aop切面注解方式代替传统的在spring mvc配置文件中配置mvc:interceptor方法。这种传统方式配置起来比较灵活,修改配置后重启系统即可,不需要重新发布到线上,缺点是配置较臃肿,如果请求路径无法使用通配符,那么每个路径都需要在拦截器的配置中写一遍,但是使用切面配置的方式,代码相对优雅很多。

Spring aop无法拦截Controller

  今天在已有项目中开发一个spring面向切面的功能,发现切面在Controller中不生效,经查,是由于把aop:aspectj-autoproxy配置在了applicationContext.xml配置文件中了。如果需要切面在Controller中生效,需要在mvc.xml配置文件中增加aop:aspectj-autoproxy proxy-target-class=”true”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.lang.annotation.*;

/**
* @auth zhengzhi.ren
* @date 2015/8/10.
*/

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface LoginRequired {

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/**
* @auth zhengzhi.ren
* @date 2015/8/10.
*/

@Component
@Aspect
public class LoginCheckAspect{

@Pointcut("@annotation(com.xxx.LoginRequired)")
public void annotation (){}

@Before("annotation ()")
public void before(JoinPoint joinPoint) throws Throwable{
System.out.println(".....");
}
}

spring mvc.xml配置文件中增加:

1
<aop:aspectj-autoproxy proxy-target-class="true"/>

Java算法之找出数组中的奇数

原创技术文章,转载请注明:转自http://newliferen.github.io/

问题描述:给定一个整型数组,写一个算法,找出其中所有的奇数。
Java实现:数组的每个元素和1进行&运算,结果是1表示该元素为奇数,结果是0表示该元素是偶数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @auth zhengzhi.ren
* @date 2015/8/6.
*/

public class FindOdd {

public static void main(String[] args) {
// 找出数组中所有奇数
int[] arr = {21,23,12,33,1,5,9};
int len = arr.length;
for (int i=0;i<len;i++) {
if ((arr[i] & 1) == 1) {
System.out.println("第" + i + "个数字是奇数, 值:" + arr[i]);
}
}
}
}

该算法时间复杂度是n,n为数组的长度。

Java算法之单身汉

原创技术文章,转载请注明:转自http://newliferen.github.io/

问题描述:有一个整形数组,数组中只有一个元素出现一次,其他元素都出现两次,写一个算法,找出这个只出现一次的元素。

Java实现:每个元素与数组中的其他元素(元素自身除外)进行^(异或)运算,结果是0,表示有相同元素,否则表示不存在相同元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @auth zhengzhi.ren
* @date 2015/8/6.
*/

public class Bachelor {

public static void main(String[] args) {
int[] arr = {1,3,4,6,7,1,4,5,6,7};
int len = arr.length;
int tmp;
for (int i=0;i<len;i++) {
tmp = arr[i];
for (int j=0;j<len;j ++) {
if ((tmp^arr[j]) == 0 && i != j){
System.out.println("元素" + tmp + "存在相同元素");
break;
}
if (j == len -1) {
System.out.println("元素" + tmp + "不存在相同元素");
}
}
}
}
}

这种实现方式算法的时间复杂度为n^2,期待有更好的实现方式,降低时间复杂度。

Java位运算实现权限控制

转载请注明:转自http://newliferen.github.io/
参考文章:http://my.oschina.net/pangzhuzhu/blog/301828

  工作几年之后,大学学过的数字电路与、或、非、异或等位运算忘的差不多了,这里使用位运算模拟linux操作系统权限,复习一下位运算。

实现思路

  1. 设定“增加”、“删除”、”更新”权限的十进制表示分别为3、4、5,设置基数base为2(其他数字也可以,但是需要限定在一定范围内);
  2. 权限对应的二进制位为1时,表示当前用户拥有对应权限。
  3. 赋权限时,执行”|”或操作,1|0=1,0|1=1,这样既可以保证不影响原有权限,又能将新权限赋给用户,例如当前用户权限二进制位为
    0000 0000 0000 0000 0000 0000 0000 0000,为用户赋“增加”权限时,2左移3位对应的二进制为
    0000 0000 0000 0000 0000 0000 0001 0000,执行或操作,用户当前权限位
    0000 0000 0000 0000 0000 0000 0001 0000,表示用户取得了对应的权限;
  4. 删除权限时,先将权限对应的十进制数取反,此时权限位为0,其他位为1,再和用户权限对应的二进制数执行”&”与操作,这样不影响用户原有权限,并且删除目标权限。假设当前用户的权限为
    0000 0000 0000 0000 0000 0000 1101 0000,2左移3位
    0000 0000 0000 0000 0000 0000 0001 0000,按位取反为
    1111 1111 1111 1111 1111 1111 1110 1111,和用户权限二进制数进行&操作,得出结果为
    0000 0000 0000 0000 0000 0000 1100 0000,这样用户对应的权限即被删除;
  5. 查看用户是否有某些权限时,思路大致一样。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
* @auth zhengzhi.ren
* @date 2015/8/6.
*/

public class MyPermission {


public static int add = 3; // 增加权限十进制表示
public static int del = 4; // 删除权限十进制表示
public static int update = 5; // 更新权限十进制表示

public static int base = 2;// 该数字可以是一个范围

/**
* 增加权限
* @param permission
* @param operate
*/

public static int addPermission(int permission, int operate) {
int per = base<<operate;
permission |= per; //增加权限时使用或,可以将权限对应的二进制位的1赋值给权限变量的0
return permission;
}

public static int delPermission(int permission, int operate){
int per = base<<operate;
per = ~per; // 按位取反,取反之前的1,取反后变为0,再和权限数字进行与运算,1&0=0,1&1=1,0&1=0,这样删除对应权限,并且不影响原有权限
return permission & per;
}

public static boolean hasPermission(int permission, int operate) {
int per = base<<operate;
return (permission & per) == per; // 1&0=0;1&1=1;0&1=0; permission和per&操作之后,如果permission对应的位为1,则和per&的结果就等于per
}

public static void printPermission(int permission) {
int binaryAdd = base<<add;
int binaryDel = base<<del;
int binaryUpdate = base<<update;
System.out.println("当前用户是否有add权限:" + ((permission & binaryAdd) == binaryAdd));
System.out.println("当前用户是否有del权限:" + ((permission & binaryDel) == binaryDel));
System.out.println("当前用户是否有update权限:" + ((permission & binaryUpdate) == binaryUpdate));
}

public static void main(String[] args) {
System.out.println(base);
int permission = 0;
System.out.println("是否有增加权限:" + hasPermission(permission, add));
// 赋权-增加权限
permission = MyPermission.addPermission(permission, add);
System.out.println("是否有增加权限:" + hasPermission(permission, add));
// 赋权-删除权限
permission = MyPermission.addPermission(permission, del);
System.out.println("是否有删除权限:" + hasPermission(permission, del));
// 解除权限
permission = MyPermission.delPermission(permission, add);
System.out.println("是否有增加权限:" + hasPermission(permission, add));
// 赋权-更新权限
permission = MyPermission.addPermission(permission, update);
System.out.println("是否有更新权限:" + hasPermission(permission, update));

// 查看用户有哪些权限
printPermission(permission);
}


}