ZooKeeper源码阅读——client(二)

原创技术文章,转载请注明:转自http://newliferen.github.io/

如何连接ZooKeeper集群

  要想了解ZooKeeper客户端的实现原理,首先需要了解客户端的使用方式;会用之后,再进一步阅读zk的源码实现就会更加容易。

客户端程序连接ZooKeeper集群代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Opens a ZooKeeper session to {@code host} (session timeout {@code timeout}) and blocks
 * until the watcher passed to the {@code ZooKeeper} constructor sees the initial connection.
 *
 * <p>The latch is released only for an event whose state is {@code SyncConnected}, whose type
 * is {@code None} and whose path is {@code null}.
 *
 * <p>NOTE(review): the {@code ZooKeeper} handle lives only in a local variable and is never
 * stored or closed, and {@code await()} has no timeout, so this waits indefinitely if the
 * session is never established. Kept as-is because this is a minimal usage example.
 */
public void connect() {
    try {
        final CountDownLatch semaphore = new CountDownLatch(1);
        ZooKeeper zkConn = new ZooKeeper(host, timeout, new Watcher() {
            // Called back by the client library once the connection to the server succeeds;
            // after this notification the caller can go on with further operations.
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (Event.KeeperState.SyncConnected == watchedEvent.getState()
                        && Event.EventType.None == watchedEvent.getType()
                        && watchedEvent.getPath() == null) {
                    System.out.println("ZK connect success!");
                    semaphore.countDown();
                }
            }
        });
        semaphore.await();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        // await() clears the interrupt status when it throws; restore it so code further up
        // the call stack can still observe that this thread was interrupted.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}

客户端线程模型

客户端线程模型

ZooKeeper实现原理

  接下来我们来分析zk客户端的源码实现。ZooKeeper是线程安全的类,用户可以放心使用。客户端类库中最主要的三个类是:ClientCnxn、SendThread、EventThread,后两者作为前者的内部类存在。当客户端创建ZooKeeper对象时,ZooKeeper类的构造函数会解析客户端传入的server地址、实例化ClientCnxn对象,并通过ClientCnxn对象的start()方法启动SendThread和EventThread两个线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Creates a client handle and kicks off the connection machinery.
 *
 * <p>In order:
 * <ol>
 *   <li>logs the parameters;</li>
 *   <li>installs {@code watcher} as {@code watchManager.defaultWatcher};</li>
 *   <li>splits {@code connectString} into server addresses and a chroot path;</li>
 *   <li>wraps the addresses in a {@code StaticHostProvider};</li>
 *   <li>builds the {@code ClientCnxn} and starts it.</li>
 * </ol>
 * Nothing in this constructor waits for the session to be established.
 *
 * @param connectString comma-separated host:port list, e.g.
 *        {@code 192.168.155.144:2181,192.168.155.155:2181}
 * @param sessionTimeout session timeout handed to the {@code ClientCnxn}
 * @param watcher installed as the default watcher on {@code watchManager}
 * @param canBeReadOnly handed to the {@code ClientCnxn}
 * @throws IOException presumably from connection setup (e.g. {@code getClientCnxnSocket()})
 */
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    watchManager.defaultWatcher = watcher;

    // Separates the "host:port" entries from the optional chroot suffix.
    ConnectStringParser parser = new ConnectStringParser(connectString);
    // Holds every configured server address and hands them out one at a time.
    HostProvider servers = new StaticHostProvider(parser.getServerAddresses());

    cnxn = new ClientCnxn(
            parser.getChrootPath(),
            servers,
            sessionTimeout,
            this,
            watchManager,
            getClientCnxnSocket(),
            canBeReadOnly);
    // Launches the client's background threads.
    cnxn.start();
}

1
2
3
4
/**
 * Starts the two client threads: sendThread first, then eventThread.
 * (Per the accompanying article, SendThread does the network I/O and
 * EventThread dispatches watcher/callback events.)
 */
public void start() {
sendThread.start();
eventThread.start();
}

  其中,SendThread线程负责与server端建立连接、定时发送心跳消息,并从outgoingQueue队列中取出请求,通过TCP连接发送到server端。SendThread收到server端的响应或通知后,会把对应的事件放入EventThread的waitingEvents队列;EventThread再从waitingEvents队列中取出事件,判断事件类型,回调客户端的Watcher实现类(或异步API的callback)。至此,一个完整的客户端处理流程就完成了。创建连接时传入的Watcher实现类将作为zk的默认watcher。

StaticHostProvider连接持有者

StaticHostProvider类的功能非常简单

  1. 该类持有解析之后的所有zk server地址(InetSocketAddress列表)。
  2. 构造时先将地址列表随机打乱(Collections.shuffle),之后采用轮询算法作为负载均衡策略,通过next()方法依次返回一个地址,提供给ClientCnxn对象建立连接使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 public StaticHostProvider(Collection<InetSocketAddress> serverAddresses)
throws UnknownHostException {
for (InetSocketAddress address : serverAddresses) {
InetAddress ia = address.getAddress();
InetAddress resolvedAddresses[] = InetAddress.getAllByName((ia!=null) ? ia.getHostAddress():
address.getHostName());

for (InetAddress resolvedAddress : resolvedAddresses) {
if (resolvedAddress.toString().startsWith("/")
&& resolvedAddress.getAddress() != null) {

this.serverAddresses.add(
new InetSocketAddress(InetAddress.getByAddress(
address.getHostName(),
resolvedAddress.getAddress()),

address.getPort()))
;

} else {
this.serverAddresses.add(new InetSocketAddress(resolvedAddress.getHostAddress(), address.getPort()));
}
}
}

if (this.serverAddresses.isEmpty()) {
throw new IllegalArgumentException(
"A HostProvider may not be empty!");

}
Collections.shuffle(this.serverAddresses);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Returns the next server address in round-robin order.
 *
 * <p>The current index advances by one and wraps to 0 at the end of the list. If it lands on
 * {@code lastIndex} and {@code spinDelay} is positive, the calling thread first sleeps for
 * {@code spinDelay} ms. Presumably {@code lastIndex} is updated elsewhere when a connection
 * succeeds, so this throttles a full unsuccessful pass — TODO confirm.
 *
 * <p>A {@code lastIndex} of -1 means no connection attempt has been made yet. In that case it
 * is set to 0 instead of sleeping.
 *
 * @param spinDelay ms to sleep when the rotation comes back to {@code lastIndex};
 *        values {@code <= 0} disable the sleep
 * @return the address at the new current index
 */
public InetSocketAddress next(long spinDelay) {
    currentIndex++;
    if (currentIndex == serverAddresses.size()) {
        currentIndex = 0; // wrap around to the start of the list
    }

    boolean backAtLastServer = currentIndex == lastIndex && spinDelay > 0;
    if (!backAtLastServer) {
        if (lastIndex == -1) {
            // Never sleep before the very first connect attempt.
            lastIndex = 0;
        }
    } else {
        try {
            Thread.sleep(spinDelay);
        } catch (InterruptedException e) {
            // Logged and not propagated. NOTE(review): the interrupt flag is not restored
            // either; restoring it could make a caller's select/sleep loop spin, so confirm
            // the caller's handling before changing this.
            LOG.warn("Unexpected exception", e);
        }
    }
    return serverAddresses.get(currentIndex);
}

SendThread

SendThread类主要实现的功能有以下四个:

  • 建立到zk server端的tcp连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Reconnect step: runs whenever the socket is not connected.
if (!clientCnxnSocket.isConnected()) {
// Except on the very first attempt, wait a random delay before reconnecting
// (0-999 ms if r is a java.util.Random; presumably to spread out reconnects).
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
// (break leaves the enclosing loop, which is not shown in this snippet)
if (closing || !state.isAlive()) {
break;
}
startConnect();
// NOTE(review): presumably resets the last-send/last-heard timestamps so the
// idle/timeout checks start fresh for the new connection — confirm in ClientCnxnSocket.
clientCnxnSocket.updateLastSendAndHeard();
}
  • 定时向zk server端发送心跳信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Heartbeat: while connected, send a ping once the time since the last send
// reaches about half of readTimeout (minus a 1 s safety margin once idle > 1 s).
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
// Not due yet: shrink the wait timeout `to` (presumably the timeout later
// handed to doTransport) so the loop wakes up in time for the next ping.
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}

// If we are in read-only mode, seek for read/write server
// The probe interval doubles after each probe, capped at maxPingRwTimeout,
// and `to` is capped at the time remaining until the next probe.
if (state == States.CONNECTEDREADONLY) {
long now = System.currentTimeMillis();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
  • 将任务队列中的任务发送到zk server端
1
// One I/O step. NOTE(review): presumably waits up to `to` ms for socket readiness,
// writes packets from outgoingQueue and reads replies for requests tracked in
// pendingQueue — confirm against ClientCnxnSocket.doTransport.
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
  • 接收从zk server端返回的响应,并将响应事件加入到EventThread线程的任务队列中,并记录事务id
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    /**
     * Decodes one reply read from the server and routes it.
     *
     * The reply starts with a ReplyHeader (xid, zxid, error code); the xid selects the path:
     *   -2  ping reply: optionally logged with the round-trip time in ms, then dropped.
     *   -4  auth reply: on AUTHFAILED, state becomes AUTH_FAILED and an
     *       (EventType.None, KeeperState.AuthFailed) event is queued on eventThread.
     *   -1  watch notification: a WatcherEvent is read, its path is translated from the
     *       server namespace to the client namespace using chrootPath, and the resulting
     *       WatchedEvent is queued on eventThread.
     * While client-tunneled SASL authentication is in progress, any other reply is read
     * as a GetSASLRequest and answered immediately via zooKeeperSaslClient.
     * Otherwise the reply belongs to the oldest packet in pendingQueue:
     *   - the header fields are copied into that packet;
     *   - lastZxid is advanced when the reply zxid is positive;
     *   - the response body is deserialized only when the error code is 0;
     *   - finishPacket() is always called on the packet, even on failure.
     *
     * @param incomingBuffer buffer presumably holding one complete reply (header + body)
     * @throws IOException if pendingQueue is empty, if the reply xid does not match the
     *         oldest pending request (that packet is then marked CONNECTIONLOSS), or
     *         presumably if deserialization fails
     */
    void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(
    incomingBuffer);

    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();

    // The header is read first; its xid decides how the rest of the buffer is handled.
    replyHdr.deserialize(bbia, "header");
    if (replyHdr.getXid() == -2) {
    // -2 is the xid for pings
    if (LOG.isDebugEnabled()) {
    // nanoTime difference divided by 1,000,000 gives the round trip in ms.
    LOG.debug("Got ping response for sessionid: 0x"
    + Long.toHexString(sessionId)
    + " after "
    + ((System.nanoTime() - lastPingSentNs) / 1000000)
    + "ms");

    }
    return;
    }
    if (replyHdr.getXid() == -4) {
    // -4 is the xid for AuthPacket
    if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
    state = States.AUTH_FAILED;
    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
    Watcher.Event.KeeperState.AuthFailed, null) )
    ;

    }
    if (LOG.isDebugEnabled()) {
    LOG.debug("Got auth sessionid:0x"
    + Long.toHexString(sessionId));

    }
    return;
    }
    if (replyHdr.getXid() == -1) {
    // -1 means notification
    if (LOG.isDebugEnabled()) {
    LOG.debug("Got notification sessionid:0x"
    + Long.toHexString(sessionId));

    }
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");

    // convert from a server path to a client path
    // - equal to chrootPath: the client sees "/"
    // - longer: the first chrootPath.length() characters (assumed to be the
    //   chroot prefix) are removed
    // - shorter: a warning is logged and the path is left unchanged
    if (chrootPath != null) {
    String serverPath = event.getPath();
    if(serverPath.compareTo(chrootPath)==0)
    event.setPath("/");
    else if (serverPath.length() > chrootPath.length())
    event.setPath(serverPath.substring(chrootPath.length()));
    else {
    LOG.warn("Got server path " + event.getPath()
    + " which is too short for chroot path "
    + chrootPath);

    }
    }

    WatchedEvent we = new WatchedEvent(event);
    if (LOG.isDebugEnabled()) {
    LOG.debug("Got " + we + " for sessionid 0x"
    + Long.toHexString(sessionId));

    }

    // Hand the notification to the event thread, which invokes the watchers.
    eventThread.queueEvent( we );
    return;
    }

    // If SASL authentication is currently in progress, construct and
    // send a response packet immediately, rather than queuing a
    // response as with other packets.
    if (clientTunneledAuthenticationInProgress()) {
    GetSASLRequest request = new GetSASLRequest();
    request.deserialize(bbia,"token");
    zooKeeperSaslClient.respondToServer(request.getToken(),
    ClientCnxn.this);

    return;
    }

    // Take the oldest outstanding request; pendingQueue is accessed under its own lock.
    Packet packet;
    synchronized (pendingQueue) {
    if (pendingQueue.size() == 0) {
    throw new IOException("Nothing in the queue, but got "
    + replyHdr.getXid());

    }
    packet = pendingQueue.remove();
    }
    /*
    * Since requests are processed in order, we better get a response
    * to the first request!
    */

    try {
    if (packet.requestHeader.getXid() != replyHdr.getXid()) {
    packet.replyHeader.setErr(
    KeeperException.Code.CONNECTIONLOSS.intValue());

    // NOTE(review): "+ +" below applies a unary plus to getErr(); harmless,
    // but it looks like a typo.
    throw new IOException("Xid out of order. Got Xid "
    + replyHdr.getXid() + " with err " +
    + replyHdr.getErr() +
    " expected Xid "
    + packet.requestHeader.getXid()
    + " for a packet with details: "
    + packet );

    }

    packet.replyHeader.setXid(replyHdr.getXid());
    packet.replyHeader.setErr(replyHdr.getErr());
    packet.replyHeader.setZxid(replyHdr.getZxid());
    // Only positive zxids advance the last-seen transaction id.
    if (replyHdr.getZxid() > 0) {
    lastZxid = replyHdr.getZxid();
    }
    // The response body is only read for successful replies.
    if (packet.response != null && replyHdr.getErr() == 0) {
    packet.response.deserialize(bbia, "response");
    }

    if (LOG.isDebugEnabled()) {
    LOG.debug("Reading reply sessionid:0x"
    + Long.toHexString(sessionId) + ", packet:: " + packet);

    }
    } finally {
    // Always complete the packet, including the xid-mismatch path.
    finishPacket(packet);
    }
    }

EventThread响应事件处理类

EventThread类负责执行Watcher回调以及异步API的回调(callback)。每一种异步操作都有对应的回调接口(如StatCallback、DataCallback、StringCallback等),zk客户端代码会根据响应类型分别进行处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
 * Hands one queued item to user code.
 *
 * <p>{@code event} is either a {@code WatcherSetEventPair} or a {@code Packet}.
 * <ul>
 *   <li>For a {@code WatcherSetEventPair}, {@code pair.event} is delivered to every watcher in
 *       {@code pair.watchers}. A throwable from one watcher is logged and the remaining
 *       watchers still run.</li>
 *   <li>For a {@code Packet}, the async callback {@code p.cb} is selected by the runtime type of
 *       {@code p.response}, or by {@code p.cb} itself for {@code VoidCallback}. It is invoked
 *       with the reply's error code, the client-side path and {@code p.ctx}.</li>
 *   <li>Result fields are read from the response only when the error code is 0; otherwise
 *       {@code null} is passed.</li>
 *   <li>For created nodes, {@code chrootPath.length()} leading characters are stripped from
 *       the returned path.</li>
 *   <li>A {@code null} callback is only logged as a warning.</li>
 * </ul>
 * Anything thrown during dispatch (including a callback of an unexpected type) is logged and
 * not rethrown, presumably so the calling event loop keeps running.
 *
 * @param event a {@code WatcherSetEventPair} or a {@code Packet}
 */
private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            // Fan the single event out to each watcher, isolating failures per watcher.
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher target : pair.watchers) {
                try {
                    target.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher ", t);
                }
            }
            return;
        }

        Packet p = (Packet) event;
        String clientPath = p.clientPath;
        // 0 means success; a non-zero server error code is passed through unchanged.
        int rc = p.replyHeader.getErr();
        boolean ok = rc == 0;
        Object response = p.response;

        if (p.cb == null) {
            LOG.warn("Somehow a null cb got to EventThread!");
        } else if (response instanceof ExistsResponse
                || response instanceof SetDataResponse
                || response instanceof SetACLResponse) {
            // Invoke the processResult implementation of the callback the client supplied.
            StatCallback statCb = (StatCallback) p.cb;
            if (!ok) {
                statCb.processResult(rc, clientPath, p.ctx, null);
            } else if (response instanceof ExistsResponse) {
                statCb.processResult(rc, clientPath, p.ctx,
                        ((ExistsResponse) response).getStat());
            } else if (response instanceof SetDataResponse) {
                statCb.processResult(rc, clientPath, p.ctx,
                        ((SetDataResponse) response).getStat());
            } else if (response instanceof SetACLResponse) {
                statCb.processResult(rc, clientPath, p.ctx,
                        ((SetACLResponse) response).getStat());
            }
        } else if (response instanceof GetDataResponse) {
            DataCallback dataCb = (DataCallback) p.cb;
            GetDataResponse dataRsp = (GetDataResponse) response;
            dataCb.processResult(rc, clientPath, p.ctx,
                    ok ? dataRsp.getData() : null,
                    ok ? dataRsp.getStat() : null);
        } else if (response instanceof GetACLResponse) {
            ACLCallback aclCb = (ACLCallback) p.cb;
            GetACLResponse aclRsp = (GetACLResponse) response;
            aclCb.processResult(rc, clientPath, p.ctx,
                    ok ? aclRsp.getAcl() : null,
                    ok ? aclRsp.getStat() : null);
        } else if (response instanceof GetChildrenResponse) {
            ChildrenCallback childrenCb = (ChildrenCallback) p.cb;
            GetChildrenResponse childrenRsp = (GetChildrenResponse) response;
            childrenCb.processResult(rc, clientPath, p.ctx,
                    ok ? childrenRsp.getChildren() : null);
        } else if (response instanceof GetChildren2Response) {
            Children2Callback children2Cb = (Children2Callback) p.cb;
            GetChildren2Response children2Rsp = (GetChildren2Response) response;
            children2Cb.processResult(rc, clientPath, p.ctx,
                    ok ? children2Rsp.getChildren() : null,
                    ok ? children2Rsp.getStat() : null);
        } else if (response instanceof CreateResponse) {
            StringCallback createCb = (StringCallback) p.cb;
            CreateResponse createRsp = (CreateResponse) response;
            String createdPath = null;
            if (ok) {
                // The server reports the full path; drop the chroot-length prefix so the
                // client sees a path relative to its chroot.
                createdPath = createRsp.getPath();
                if (chrootPath != null) {
                    createdPath = createdPath.substring(chrootPath.length());
                }
            }
            createCb.processResult(rc, clientPath, p.ctx, createdPath);
        } else if (p.cb instanceof VoidCallback) {
            ((VoidCallback) p.cb).processResult(rc, clientPath, p.ctx);
        }
    } catch (Throwable t) {
        LOG.error("Caught unexpected throwable", t);
    }
}
}