ZooKeeper Source Code Reading: Unveiling the Watcher

Original technical article. If you repost it, please credit the source: http://newliferen.github.io/

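The ZooKeeper client code is relatively simple. Its core classes are ZooKeeper, HostProvider, ClientCnxn, and ClientCnxnSocketNIO. ZooKeeper is the client-facing API class and exposes the methods for operating on zk. HostProvider holds the server addresses and ports. An application connects to the zk server over a socket by creating a ZooKeeper object, and ClientCnxnSocketNIO reads from and writes to that socket.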

What happens when a ZooKeeper object is created
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly)
    throws IOException
{
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    watchManager.defaultWatcher = watcher;

    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    // Parse the connect string into server addresses
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    // Create the ClientCnxn object. Its constructor creates two threads,
    // sendThread and eventThread. Once started, sendThread is responsible
    // for establishing the connection to the zk server.
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    // Start the two internal threads
    cnxn.start();
}
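
To make the connect-string step concrete, here is a minimal sketch of how a string such as `zk1:2181,zk2:2182,zk3/app/config` can be split into server addresses and a chroot path. It is a simplified model and not the real ConnectStringParser: the class name `ConnectStringSketch`, its fields, and the example hosts are assumptions made for illustration, and IPv6 addresses are not handled.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of connect-string parsing (not the real ConnectStringParser).
final class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181; // ZooKeeper's conventional client port

    final List<String> servers = new ArrayList<>(); // normalized "host:port" entries
    final String chrootPath;                         // null when no chroot is given

    ConnectStringSketch(String connectString) {
        // Everything from the first '/' onward is treated as the chroot path.
        int slash = connectString.indexOf('/');
        String hosts = connectString;
        String chroot = null;
        if (slash >= 0) {
            hosts = connectString.substring(0, slash);
            String suffix = connectString.substring(slash);
            // A bare "/" is treated as "no chroot" in this sketch.
            chroot = suffix.length() > 1 ? suffix : null;
        }
        this.chrootPath = chroot;

        for (String host : hosts.split(",")) {
            String trimmed = host.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            // Append the default port when the entry does not name one.
            servers.add(trimmed.contains(":") ? trimmed : trimmed + ":" + DEFAULT_PORT);
        }
    }

    public static void main(String[] args) {
        ConnectStringSketch p = new ConnectStringSketch("zk1:2181,zk2:2182,zk3/app/config");
        // prints: [zk1:2181, zk2:2182, zk3:2181] chroot=/app/config
        System.out.println(p.servers + " chroot=" + p.chrootPath);
    }
}
```

The server list is what a host provider would iterate over when connecting, and the chroot path is what the client later prepends to every client-side path.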
The ClientCnxn constructor
// Note: the ZooKeeper constructor passes seven arguments; the overload shown
// here is the longer one that also takes sessionId and sessionPasswd.
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;

    // Create the thread that sends requests
    sendThread = new SendThread(clientCnxnSocket);
    // Create the thread that handles events
    eventThread = new EventThread();
}
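
The two timeout formulas in the constructor are easy to check by hand. The sketch below repeats the same integer arithmetic; the class name `TimeoutSketch` and the values 30000 ms and 3 hosts are assumptions chosen only for the example.

```java
// Worked example of the timeout arithmetic used in the constructor.
final class TimeoutSketch {
    // The session timeout is divided evenly across the configured hosts.
    static int connectTimeout(int sessionTimeoutMs, int hostCount) {
        return sessionTimeoutMs / hostCount;
    }

    // The read timeout is two thirds of the session timeout (integer division).
    static int readTimeout(int sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    public static void main(String[] args) {
        int sessionTimeoutMs = 30_000; // assumed example value
        int hostCount = 3;             // assumed example value
        System.out.println("connectTimeout=" + connectTimeout(sessionTimeoutMs, hostCount)); // 10000
        System.out.println("readTimeout=" + readTimeout(sessionTimeoutMs));                   // 20000
    }
}
```

With more hosts in the connect string, each connection attempt gets a smaller share of the session timeout, so the client can try several servers before the session is at risk of expiring.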
Unveiling a ZooKeeper request

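Let's take node creation as the example for studying the client internals. Before reading the code, I kept guessing how the zk server deals with the client's watchers and callbacks. From past experience I expected either a message carrying them to the server, or a callback embedded in the response. The source shows a smarter approach. Whenever a request registers a watcher or a callback, the client never attaches that object to the request it builds, and the object is never sent to the zk server. Instead, the client creates a Packet object and stores the watcher or callback implementation inside it. Once the request has been sent to the zk server, the Packet is moved to the pendingQueue. When the response arrives, sendThread takes the matching Packet off pendingQueue and hands it to eventThread, which invokes the callback; watch notifications from the server are delivered to the registered watcher as a WatchedEvent in the same thread.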

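Across the whole request path, the implementation keeps watcher and callback objects off the wire, which reduces the bandwidth used per request and keeps request handling efficient. Callbacks are invisible to the zk server and put no load on it, which helps the server stay available under high concurrency.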
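
The flow described above can be modeled in a few lines. This is a single-threaded sketch under stated assumptions, not the ZooKeeper implementation: `CallbackFlowSketch`, its `sendOne`, `onReply`, and `drainEvents` methods, the `wire` list, and the `waitingEvents` hand-off queue are names chosen for this example, and requests and replies are plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical single-threaded model of the packet flow; not the real ClientCnxn.
final class CallbackFlowSketch {
    static final class Packet {
        final String request;            // the only part that goes on the wire
        final Consumer<String> callback; // never leaves the client
        String reply;

        Packet(String request, Consumer<String> callback) {
            this.request = request;
            this.callback = callback;
        }
    }

    final Deque<Packet> outgoingQueue = new ArrayDeque<>(); // waiting to be sent
    final Deque<Packet> pendingQueue = new ArrayDeque<>();  // sent, awaiting a reply
    final Deque<Packet> waitingEvents = new ArrayDeque<>(); // replied, awaiting the callback
    final List<String> wire = new ArrayList<>();            // everything the "server" sees

    void queuePacket(String request, Consumer<String> callback) {
        outgoingQueue.add(new Packet(request, callback));
    }

    // Send side: write the request only, then park the packet until its reply arrives.
    void sendOne() {
        Packet p = outgoingQueue.poll();
        if (p == null) {
            return;
        }
        wire.add(p.request);
        pendingQueue.add(p);
    }

    // Receive side: replies come back in request order, so the head of pendingQueue matches.
    void onReply(String reply) {
        Packet p = pendingQueue.remove();
        p.reply = reply;
        waitingEvents.add(p);
    }

    // Event side: callbacks run here, away from the I/O path.
    void drainEvents() {
        Packet p;
        while ((p = waitingEvents.poll()) != null) {
            p.callback.accept(p.reply);
        }
    }

    public static void main(String[] args) {
        CallbackFlowSketch cnxn = new CallbackFlowSketch();
        cnxn.queuePacket("create /demo", reply -> System.out.println("callback got: " + reply));
        cnxn.sendOne();
        cnxn.onReply("ok /demo");
        cnxn.drainEvents(); // prints: callback got: ok /demo
    }
}
```

Only the request string ever reaches `wire`; the callback stays inside the Packet from the moment it is queued until the event side runs it.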

  • Entry point for creating a node
public void create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode, StringCallback cb, Object ctx)
{
    final String clientPath = path;
    PathUtils.validatePath(clientPath, createMode.isSequential());

    final String serverPath = prependChroot(clientPath);

    RequestHeader h = new RequestHeader();
    // Set the request type; the zk server uses it to tell which
    // operation the client wants to perform
    h.setType(ZooDefs.OpCode.create);
    CreateRequest request = new CreateRequest();
    CreateResponse response = new CreateResponse();
    ReplyHeader r = new ReplyHeader();
    // Assemble the request object. Note that the StringCallback
    // implementation is not placed into the request
    request.setData(data);
    request.setFlags(createMode.toFlag());
    request.setPath(serverPath);
    request.setAcl(acl);
    // Put the request on the outgoingQueue
    cnxn.queuePacket(h, r, request, response, cb, clientPath,
            serverPath, ctx, null);
}
  • Enqueuing the request object
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration)
{
    Packet packet = null;

    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    synchronized (outgoingQueue) {
        // The request object and cb are still not linked to each other here;
        // cb is stored on the packet, not in the request
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            // Enqueue the packet
            outgoingQueue.add(packet);
        }
    }
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}
  • Triggering the send
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
  • The send phase

The data that is actually sent to the zk server is assembled in the createBB() method.

synchronized(outgoingQueue) {
    Packet p = findSendablePacket(outgoingQueue,
            cnxn.sendThread.clientTunneledAuthenticationInProgress());

    if (p != null) {
        updateLastSend();
        // If we already started writing p, p.bb will already exist
        if (p.bb == null) {
            if ((p.requestHeader != null) &&
                    (p.requestHeader.getType() != OpCode.ping) &&
                    (p.requestHeader.getType() != OpCode.auth)) {
                p.requestHeader.setXid(cnxn.getXid());
            }
            p.createBB();
        }
        // Write the data to the socket
        sock.write(p.bb);
        if (!p.bb.hasRemaining()) {
            sentCount++;
            outgoingQueue.removeFirstOccurrence(p);
            if (p.requestHeader != null
                    && p.requestHeader.getType() != OpCode.ping
                    && p.requestHeader.getType() != OpCode.auth) {
                synchronized (pendingQueue) {
                    // The packet is fully sent: move it to pendingQueue
                    pendingQueue.add(p);
                }
            }
        }
    }
    if (outgoingQueue.isEmpty()) {
        // No more packets to send: turn off write interest flag.
        // Will be turned on later by a later call to enableWrite(),
        // from within ZooKeeperSaslClient (if client is configured
        // to attempt SASL authentication), or in either doIO() or
        // in doTransport() if not.
        disableWrite();
    } else if (!initialized && p != null && !p.bb.hasRemaining()) {
        // On initial connection, write the complete connect request
        // packet, but then disable further writes until after
        // receiving a successful connection response. If the
        // session is expired, then the server sends the expiration
        // response and immediately closes its end of the socket. If
        // the client is simultaneously writing on its end, then the
        // TCP stack may choose to abort with RST, in which case the
        // client would never receive the session expired event. See
        // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
        disableWrite();
    } else {
        // Just in case
        enableWrite();
    }
}
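
Two details of the write path are worth isolating: the xid is assigned only on the first write attempt, and a packet moves to pendingQueue only after its buffer has been written completely. The sketch below models both with a fake socket that accepts a few bytes per call. `WritePathSketch`, `FakeSocket`, `writeOnce`, and the 10-byte payload are assumptions made for this example; the real code writes to a non-blocking SocketChannel and also handles ping and auth packets separately.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the write path; FakeSocket stands in for a non-blocking channel.
final class WritePathSketch {
    static final class Packet {
        int xid = -1;        // assigned at send time, not when the packet is queued
        ByteBuffer bb;       // created lazily on the first write attempt
        final byte[] payload;

        Packet(byte[] payload) {
            this.payload = payload;
        }
    }

    // Accepts at most `limit` bytes per call, like a socket with a small send buffer.
    static final class FakeSocket {
        final int limit;
        int written;

        FakeSocket(int limit) {
            this.limit = limit;
        }

        void write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            src.position(src.position() + n); // consume n bytes from the buffer
            written += n;
        }
    }

    final Deque<Packet> outgoingQueue = new ArrayDeque<>();
    final Deque<Packet> pendingQueue = new ArrayDeque<>();
    int nextXid = 1;

    // One write-ready pass; returns true when the head packet was fully sent.
    boolean writeOnce(FakeSocket sock) {
        Packet p = outgoingQueue.peek();
        if (p == null) {
            return false;
        }
        if (p.bb == null) { // first attempt: assign the xid and build the buffer
            p.xid = nextXid++;
            p.bb = ByteBuffer.wrap(p.payload);
        }
        sock.write(p.bb);
        if (p.bb.hasRemaining()) {
            return false; // partial write: the packet stays in outgoingQueue
        }
        outgoingQueue.remove(p);
        pendingQueue.add(p); // fully sent: now awaiting its reply
        return true;
    }

    public static void main(String[] args) {
        WritePathSketch w = new WritePathSketch();
        w.outgoingQueue.add(new Packet(new byte[10]));
        FakeSocket sock = new FakeSocket(4);
        int passes = 0;
        while (!w.outgoingQueue.isEmpty()) {
            w.writeOnce(sock);
            passes++;
        }
        // 10 bytes at 4 bytes per pass: prints passes=3 pending=1
        System.out.println("passes=" + passes + " pending=" + w.pendingQueue.size());
    }
}
```

Keeping a partially written packet at the head of the outgoing queue is what lets the next write-ready pass resume from the buffer's current position.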
  • Serializing the request object
public void createBB() {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        boa.writeInt(-1, "len"); // We'll fill this in later
        if (requestHeader != null) {
            requestHeader.serialize(boa, "header");
        }
        if (request instanceof ConnectRequest) {
            request.serialize(boa, "connect");
            // append "am-I-allowed-to-be-readonly" flag
            boa.writeBool(readOnly, "readOnly");
        } else if (request != null) {
            request.serialize(boa, "request");
        }
        baos.close();
        this.bb = ByteBuffer.wrap(baos.toByteArray());
        this.bb.putInt(this.bb.capacity() - 4);
        this.bb.rewind();
    } catch (IOException e) {
        LOG.warn("Ignoring unexpected exception", e);
    }
}
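
The placeholder-then-patch trick in createBB() is a general way to build a length-prefixed frame when the body size is not known up front. Here is a minimal sketch using only the JDK. `FramingSketch`, the `frame` method, and its three fields are assumptions for illustration; `DataOutputStream` stands in for BinaryOutputArchive, and the field layout is not the real ZooKeeper wire format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of length-prefixed framing; the field layout here is illustrative only.
final class FramingSketch {
    static ByteBuffer frame(int xid, int type, String path) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);               // placeholder for the length, patched below
            out.writeInt(xid);              // stand-in for the request header
            out.writeInt(type);
            byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
            out.writeInt(pathBytes.length); // stand-in for the request body
            out.write(pathBytes);
            out.flush();

            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.capacity() - 4);   // the length excludes the 4-byte prefix itself
            bb.rewind();                    // back to position 0, ready to be written out
            return bb;
        } catch (IOException e) {
            // Not expected for an in-memory stream.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteBuffer bb = frame(1, 1, "/demo");
        // 4 (prefix) + 4 + 4 + 4 + 5 bytes of path: prints total=21 prefix=17
        System.out.println("total=" + bb.capacity() + " prefix=" + bb.getInt(0));
    }
}
```

A reader on the other end can take the first four bytes as the frame length and then knows exactly how many more bytes belong to this request.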