Skip to content

Commit

Permalink
ZOOKEEPER-838: Move chroot from ClientCnxn to ZooKeeper (#2065)
Browse files Browse the repository at this point in the history
This enables possibility for future that multiple `ZooKeeper` instances root at different paths and share same session.

This also fix bugs due to eager chroot stripping in `ClientCnxn`:

* Previously, in chroot "/zoo", client could lose connection due to delivering events for abnormal path "keeper/config". This path is constructed by eager chroot striping for path "/zookeeper/config" from events for `getConfig`. See ZOOKEEPER-4742.
* Previously, in chroot "/zookeeper" or "/zookeeper/config", watching through `getConfig` receive no event as `getConfig` register watcher in path "/zookeeper/config". But the path get stripped to "/config" or "/" before event delivery. See ZOOKEEPER-4601.

Co-authored-by: tison <[email protected]>
  • Loading branch information
kezhuw and tisonkun authored Sep 24, 2023
1 parent b31f776 commit f42c01d
Show file tree
Hide file tree
Showing 13 changed files with 622 additions and 141 deletions.
73 changes: 12 additions & 61 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ static class AuthData {
*/
private boolean readOnly;

final String chrootPath;

final SendThread sendThread;

final EventThread eventThread;
Expand Down Expand Up @@ -346,9 +344,8 @@ public String toString() {
/**
* Creates a connection object. The actual network connect doesn't get
* established until needed. The start() instance method must be called
* subsequent to construction.
* after construction.
*
* @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
* @param hostProvider the list of ZooKeeper servers to connect to
* @param sessionTimeout the timeout for connections.
* @param clientConfig the client configuration.
Expand All @@ -357,7 +354,6 @@ public String toString() {
* @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning
*/
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZKClientConfig clientConfig,
Expand All @@ -366,7 +362,6 @@ public ClientCnxn(
boolean canBeReadOnly
) throws IOException {
this(
chrootPath,
hostProvider,
sessionTimeout,
clientConfig,
Expand All @@ -380,9 +375,8 @@ public ClientCnxn(
/**
* Creates a connection object. The actual network connect doesn't get
* established until needed. The start() instance method must be called
* subsequent to construction.
* after construction.
*
* @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
* @param hostProvider the list of ZooKeeper servers to connect to
* @param sessionTimeout the timeout for connections.
* @param clientConfig the client configuration.
Expand All @@ -394,7 +388,6 @@ public ClientCnxn(
* @throws IOException in cases of broken network
*/
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZKClientConfig clientConfig,
Expand All @@ -404,7 +397,6 @@ public ClientCnxn(
byte[] sessionPasswd,
boolean canBeReadOnly
) throws IOException {
this.chrootPath = chrootPath;
this.hostProvider = hostProvider;
this.sessionTimeout = sessionTimeout;
this.clientConfig = clientConfig;
Expand Down Expand Up @@ -660,9 +652,7 @@ private void processEvent(Object event) {
rc,
clientPath,
p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath().substring(chrootPath.length())));
rsp.getPath());
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
Expand All @@ -674,9 +664,7 @@ private void processEvent(Object event) {
rc,
clientPath,
p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath().substring(chrootPath.length())),
rsp.getPath(),
rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
Expand Down Expand Up @@ -733,7 +721,7 @@ protected void finishPacket(Packet p) {
for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {
Set<Watcher> watchers = entry.getValue();
if (watchers.size() > 0) {
queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey());
queueEvent(p.watchDeregistration.getServerPath(), err, watchers, entry.getKey());
// ignore connectionloss when removing from local
// session
p.replyHeader.setErr(Code.OK.intValue());
Expand All @@ -757,13 +745,13 @@ protected void finishPacket(Packet p) {
}
}

void queueEvent(String clientPath, int err, Set<Watcher> materializedWatchers, EventType eventType) {
void queueEvent(String serverPath, int err, Set<Watcher> materializedWatchers, EventType eventType) {
KeeperState sessionState = KeeperState.SyncConnected;
if (KeeperException.Code.SESSIONEXPIRED.intValue() == err
|| KeeperException.Code.CONNECTIONLOSS.intValue() == err) {
sessionState = Event.KeeperState.Disconnected;
}
WatchedEvent event = new WatchedEvent(eventType, sessionState, clientPath);
WatchedEvent event = new WatchedEvent(eventType, sessionState, serverPath);
eventThread.queueEvent(event, materializedWatchers);
}

Expand Down Expand Up @@ -855,19 +843,6 @@ class SendThread extends ZooKeeperThread {
private boolean isFirstConnect = true;
private volatile ZooKeeperSaslClient zooKeeperSaslClient;

private String stripChroot(String serverPath) {
if (serverPath.startsWith(chrootPath)) {
if (serverPath.length() == chrootPath.length()) {
return "/";
}
return serverPath.substring(chrootPath.length());
} else if (serverPath.startsWith(ZooDefs.ZOOKEEPER_NODE_SUBTREE)) {
return serverPath;
}
LOG.warn("Got server path {} which is not descendant of chroot path {}.", serverPath, chrootPath);
return serverPath;
}

void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
Expand Down Expand Up @@ -895,13 +870,6 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException {
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");

// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
String clientPath = stripChroot(serverPath);
event.setPath(clientPath);
}

WatchedEvent we = new WatchedEvent(event, replyHdr.getZxid());
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
eventThread.queueEvent(we);
Expand Down Expand Up @@ -1010,11 +978,11 @@ void primeConnection() throws IOException {
List<String> persistentRecursiveWatches = watchManager.getPersistentRecursiveWatchList();
if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
|| !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
Iterator<String> dataWatchesIter = dataWatches.iterator();
Iterator<String> existWatchesIter = existWatches.iterator();
Iterator<String> childWatchesIter = childWatches.iterator();
Iterator<String> persistentWatchesIter = persistentWatches.iterator();
Iterator<String> persistentRecursiveWatchesIter = persistentRecursiveWatches.iterator();
long setWatchesLastZxid = lastZxid;

while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
Expand Down Expand Up @@ -1084,23 +1052,6 @@ record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch
LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
}

private List<String> prependChroot(List<String> paths) {
if (chrootPath != null && !paths.isEmpty()) {
for (int i = 0; i < paths.size(); ++i) {
String clientPath = paths.get(i);
String serverPath;
// handle clientPath = "/"
if (clientPath.length() == 1) {
serverPath = chrootPath;
} else {
serverPath = chrootPath + clientPath;
}
paths.set(i, serverPath);
}
}
return paths;
}

private void sendPing() {
lastPingSentNs = System.nanoTime();
RequestHeader h = new RequestHeader(ClientCnxn.PING_XID, OpCode.ping);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@

/**
* Handles the special case of removing watches which has registered for a
* client path
* server path
*/
public class WatchDeregistration {

private final String clientPath;
private final String serverPath;
private final Watcher watcher;
private final WatcherType watcherType;
private final boolean local;
private final ZKWatchManager zkManager;

public WatchDeregistration(
String clientPath,
String serverPath,
Watcher watcher,
WatcherType watcherType,
boolean local,
ZKWatchManager zkManager) {
this.clientPath = clientPath;
this.serverPath = serverPath;
this.watcher = watcher;
this.watcherType = watcherType;
this.local = local;
Expand All @@ -56,16 +56,16 @@ public WatchDeregistration(
* watch on the path.
*/
public Map<EventType, Set<Watcher>> unregister(int rc) throws KeeperException {
return zkManager.removeWatcher(clientPath, watcher, watcherType, local, rc);
return zkManager.removeWatcher(serverPath, watcher, watcherType, local, rc);
}

/**
* Returns client path which has specified for unregistering its watcher
* Returns server path which has specified for unregistering its watcher
*
* @return client path
* @return server path
*/
public String getClientPath() {
return clientPath;
public String getServerPath() {
return serverPath;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

/**
* Manage watchers and handle events generated by the {@link ClientCnxn} object.
*
* <p>
* This class is intended to be packaged-private so that it doesn't serve
* as part of ZooKeeper client API.
*/
Expand Down
Loading

0 comments on commit f42c01d

Please sign in to comment.