diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index 727d97daa6a..a5d4877d608 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -180,8 +180,6 @@ static class AuthData { */ private boolean readOnly; - final String chrootPath; - final SendThread sendThread; final EventThread eventThread; @@ -346,9 +344,8 @@ public String toString() { /** * Creates a connection object. The actual network connect doesn't get * established until needed. The start() instance method must be called - * subsequent to construction. + * after construction. * - * @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 * @param hostProvider the list of ZooKeeper servers to connect to * @param sessionTimeout the timeout for connections. * @param clientConfig the client configuration. @@ -357,7 +354,6 @@ public String toString() { * @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning */ public ClientCnxn( - String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, @@ -366,7 +362,6 @@ public ClientCnxn( boolean canBeReadOnly ) throws IOException { this( - chrootPath, hostProvider, sessionTimeout, clientConfig, @@ -380,9 +375,8 @@ public ClientCnxn( /** * Creates a connection object. The actual network connect doesn't get * established until needed. The start() instance method must be called - * subsequent to construction. + * after construction. * - * @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 * @param hostProvider the list of ZooKeeper servers to connect to * @param sessionTimeout the timeout for connections. * @param clientConfig the client configuration. @@ -394,7 +388,6 @@ public ClientCnxn( * @throws IOException in cases of broken network */ public ClientCnxn( - String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, @@ -404,7 +397,6 @@ public ClientCnxn( byte[] sessionPasswd, boolean canBeReadOnly ) throws IOException { - this.chrootPath = chrootPath; this.hostProvider = hostProvider; this.sessionTimeout = sessionTimeout; this.clientConfig = clientConfig; @@ -660,9 +652,7 @@ private void processEvent(Object event) { rc, clientPath, p.ctx, - (chrootPath == null - ? rsp.getPath() - : rsp.getPath().substring(chrootPath.length()))); + rsp.getPath()); } else { cb.processResult(rc, clientPath, p.ctx, null); } @@ -674,9 +664,7 @@ private void processEvent(Object event) { rc, clientPath, p.ctx, - (chrootPath == null - ? rsp.getPath() - : rsp.getPath().substring(chrootPath.length())), + rsp.getPath(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); @@ -733,7 +721,7 @@ protected void finishPacket(Packet p) { for (Entry> entry : materializedWatchers.entrySet()) { Set watchers = entry.getValue(); if (watchers.size() > 0) { - queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey()); + queueEvent(p.watchDeregistration.getServerPath(), err, watchers, entry.getKey()); // ignore connectionloss when removing from local // session p.replyHeader.setErr(Code.OK.intValue()); @@ -757,13 +745,13 @@ protected void finishPacket(Packet p) { } } - void queueEvent(String clientPath, int err, Set materializedWatchers, EventType eventType) { + void queueEvent(String serverPath, int err, Set materializedWatchers, EventType eventType) { KeeperState sessionState = KeeperState.SyncConnected; if (KeeperException.Code.SESSIONEXPIRED.intValue() == err || KeeperException.Code.CONNECTIONLOSS.intValue() == err) { sessionState = Event.KeeperState.Disconnected; } - WatchedEvent event = new WatchedEvent(eventType, sessionState, clientPath); + WatchedEvent event = new WatchedEvent(eventType, sessionState, serverPath); eventThread.queueEvent(event, materializedWatchers); } @@ -855,19 +843,6 @@ class SendThread extends ZooKeeperThread { private boolean isFirstConnect = true; private volatile ZooKeeperSaslClient zooKeeperSaslClient; - private String stripChroot(String serverPath) { - if (serverPath.startsWith(chrootPath)) { - if (serverPath.length() == chrootPath.length()) { - return "/"; - } - return serverPath.substring(chrootPath.length()); - } else if (serverPath.startsWith(ZooDefs.ZOOKEEPER_NODE_SUBTREE)) { - return serverPath; - } - LOG.warn("Got server path {} which is not descendant of chroot path {}.", serverPath, chrootPath); - return serverPath; - } - void readResponse(ByteBuffer incomingBuffer) throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); @@ -895,13 +870,6 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException { WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); - // convert from a server path to a client path - if (chrootPath != null) { - String serverPath = event.getPath(); - String clientPath = stripChroot(serverPath); - event.setPath(clientPath); - } - WatchedEvent we = new WatchedEvent(event, replyHdr.getZxid()); LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId)); eventThread.queueEvent(we); @@ -1010,11 +978,11 @@ void primeConnection() throws IOException { List persistentRecursiveWatches = watchManager.getPersistentRecursiveWatchList(); if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty() || !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) { - Iterator dataWatchesIter = prependChroot(dataWatches).iterator(); - Iterator existWatchesIter = prependChroot(existWatches).iterator(); - Iterator childWatchesIter = prependChroot(childWatches).iterator(); - Iterator persistentWatchesIter = prependChroot(persistentWatches).iterator(); - Iterator persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator(); + Iterator dataWatchesIter = dataWatches.iterator(); + Iterator existWatchesIter = existWatches.iterator(); + Iterator childWatchesIter = childWatches.iterator(); + Iterator persistentWatchesIter = persistentWatches.iterator(); + Iterator persistentRecursiveWatchesIter = persistentRecursiveWatches.iterator(); long setWatchesLastZxid = lastZxid; while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext() @@ -1084,23 +1052,6 @@ record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress()); } - private List prependChroot(List paths) { - if (chrootPath != null && !paths.isEmpty()) { - for (int i = 0; i < paths.size(); ++i) { - String clientPath = paths.get(i); - String serverPath; - // handle clientPath = "/" - if (clientPath.length() == 1) { - serverPath = chrootPath; - } else { - serverPath = chrootPath + clientPath; - } - paths.set(i, serverPath); - } - } - return paths; - } - private void sendPing() { lastPingSentNs = System.nanoTime(); RequestHeader h = new RequestHeader(ClientCnxn.PING_XID, OpCode.ping); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java index 710f47b34c3..7d3b5f13517 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchDeregistration.java @@ -25,23 +25,23 @@ /** * Handles the special case of removing watches which has registered for a - * client path + * server path */ public class WatchDeregistration { - private final String clientPath; + private final String serverPath; private final Watcher watcher; private final WatcherType watcherType; private final boolean local; private final ZKWatchManager zkManager; public WatchDeregistration( - String clientPath, + String serverPath, Watcher watcher, WatcherType watcherType, boolean local, ZKWatchManager zkManager) { - this.clientPath = clientPath; + this.serverPath = serverPath; this.watcher = watcher; this.watcherType = watcherType; this.local = local; @@ -56,16 +56,16 @@ public WatchDeregistration( * watch on the path. */ public Map> unregister(int rc) throws KeeperException { - return zkManager.removeWatcher(clientPath, watcher, watcherType, local, rc); + return zkManager.removeWatcher(serverPath, watcher, watcherType, local, rc); } /** - * Returns client path which has specified for unregistering its watcher + * Returns server path which has specified for unregistering its watcher * - * @return client path + * @return server path */ - public String getClientPath() { - return clientPath; + public String getServerPath() { + return serverPath; } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java index 514da01cf24..ce0df4e9a12 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java @@ -30,7 +30,7 @@ /** * Manage watchers and handle events generated by the {@link ClientCnxn} object. - * + *

* This class is intended to be packaged-private so that it doesn't serve * as part of ZooKeeper client API. */ diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java index 475430b89ec..e04984fa918 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java @@ -42,6 +42,7 @@ import org.apache.zookeeper.AsyncCallback.VoidCallback; import org.apache.zookeeper.OpResult.ErrorResult; import org.apache.zookeeper.Watcher.WatcherType; +import org.apache.zookeeper.client.Chroot; import org.apache.zookeeper.client.ConnectStringParser; import org.apache.zookeeper.client.HostProvider; import org.apache.zookeeper.client.StaticHostProvider; @@ -173,6 +174,8 @@ public class ZooKeeper implements AutoCloseable { protected final HostProvider hostProvider; + private final Chroot chroot; + /** * This function allows a client to update the connection string by providing * a new comma separated list of host:port pairs, each corresponding to a @@ -267,11 +270,11 @@ ZKWatchManager getWatchManager() { public abstract static class WatchRegistration { private Watcher watcher; - private String clientPath; + private String serverPath; - public WatchRegistration(Watcher watcher, String clientPath) { + public WatchRegistration(Watcher watcher, String serverPath) { this.watcher = watcher; - this.clientPath = clientPath; + this.serverPath = serverPath; } protected abstract Map> getWatches(int rc); @@ -285,10 +288,10 @@ public void register(int rc) { if (shouldAddWatch(rc)) { Map> watches = getWatches(rc); synchronized (watches) { - Set watchers = watches.get(clientPath); + Set watchers = watches.get(serverPath); if (watchers == null) { watchers = new HashSet<>(); - watches.put(clientPath, watchers); + watches.put(serverPath, watchers); } watchers.add(watcher); } @@ -312,7 +315,7 @@ protected boolean shouldAddWatch(int rc) { class ExistsWatchRegistration extends WatchRegistration { public ExistsWatchRegistration(Watcher watcher, String clientPath) { - super(watcher, clientPath); + super(chroot.interceptWatcher(watcher), prependChroot(clientPath)); } @Override @@ -328,10 +331,22 @@ protected boolean shouldAddWatch(int rc) { } + class ServerDataWatchRegistration extends WatchRegistration { + public ServerDataWatchRegistration(Watcher watcher, String serverPath) { + super(watcher, serverPath); + } + + @Override + protected Map> getWatches(int rc) { + return getWatchManager().getDataWatches(); + } + + } + class DataWatchRegistration extends WatchRegistration { public DataWatchRegistration(Watcher watcher, String clientPath) { - super(watcher, clientPath); + super(chroot.interceptWatcher(watcher), prependChroot(clientPath)); } @Override @@ -344,7 +359,7 @@ protected Map> getWatches(int rc) { class ChildWatchRegistration extends WatchRegistration { public ChildWatchRegistration(Watcher watcher, String clientPath) { - super(watcher, clientPath); + super(chroot.interceptWatcher(watcher), prependChroot(clientPath)); } @Override @@ -358,7 +373,7 @@ class AddWatchRegistration extends WatchRegistration { private final AddWatchMode mode; public AddWatchRegistration(Watcher watcher, String clientPath, AddWatchMode mode) { - super(watcher, clientPath); + super(chroot.interceptWatcher(watcher), prependChroot(clientPath)); this.mode = mode; } @@ -654,7 +669,6 @@ public ZooKeeper( } ClientCnxn createConnection( - String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, @@ -665,7 +679,6 @@ ClientCnxn createConnection( boolean canBeReadOnly ) throws IOException { return new ClientCnxn( - chrootPath, hostProvider, sessionTimeout, clientConfig, @@ -1093,8 +1106,8 @@ public ZooKeeper(ZooKeeperOptions options) throws IOException { } this.hostProvider = hostProvider; + chroot = Chroot.ofNullable(connectStringParser.getChrootPath()); cnxn = createConnection( - connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this.clientConfig, @@ -1303,15 +1316,7 @@ public boolean close(int waitForShutdownTimeoutMs) throws InterruptedException { * @return server view of the path (chroot prepended to client path) */ private String prependChroot(String clientPath) { - if (cnxn.chrootPath != null) { - // handle clientPath = "/" - if (clientPath.length() == 1) { - return cnxn.chrootPath; - } - return cnxn.chrootPath + clientPath; - } else { - return clientPath; - } + return chroot.prepend(clientPath); } /** @@ -1393,11 +1398,7 @@ public String create( if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } - if (cnxn.chrootPath == null) { - return response.getPath(); - } else { - return response.getPath().substring(cnxn.chrootPath.length()); - } + return chroot.strip(response.getPath()); } /** @@ -1499,11 +1500,7 @@ public String create( if (stat != null) { DataTree.copyStat(response.getStat(), stat); } - if (cnxn.chrootPath == null) { - return response.getPath(); - } else { - return response.getPath().substring(cnxn.chrootPath.length()); - } + return chroot.strip(response.getPath()); } private void setCreateHeader(CreateMode createMode, RequestHeader h) { @@ -1552,6 +1549,7 @@ public void create( EphemeralType.validateTTL(createMode, -1); final String serverPath = prependChroot(clientPath); + cb = chroot.interceptCallback(cb); RequestHeader h = new RequestHeader(); h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create); @@ -1598,6 +1596,7 @@ public void create( EphemeralType.validateTTL(createMode, ttl); final String serverPath = prependChroot(clientPath); + cb = chroot.interceptCallback(cb); RequestHeader h = new RequestHeader(); setCreateHeader(createMode, h); @@ -1894,7 +1893,6 @@ public Stat exists(final String path, Watcher watcher) throws KeeperException, I final String clientPath = path; PathUtils.validatePath(clientPath); - // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new ExistsWatchRegistration(watcher, clientPath); @@ -1949,7 +1947,6 @@ public void exists(final String path, Watcher watcher, StatCallback cb, Object c final String clientPath = path; PathUtils.validatePath(clientPath); - // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new ExistsWatchRegistration(watcher, clientPath); @@ -2000,7 +1997,6 @@ public byte[] getData(final String path, Watcher watcher, Stat stat) throws Keep final String clientPath = path; PathUtils.validatePath(clientPath); - // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); @@ -2056,7 +2052,6 @@ public void getData(final String path, Watcher watcher, DataCallback cb, Object final String clientPath = path; PathUtils.validatePath(clientPath); - // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); @@ -2107,7 +2102,7 @@ public byte[] getConfig(Watcher watcher, Stat stat) throws KeeperException, Inte // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { - wcb = new DataWatchRegistration(watcher, configZnode); + wcb = new ServerDataWatchRegistration(watcher, configZnode); } RequestHeader h = new RequestHeader(); @@ -2137,7 +2132,7 @@ public void getConfig(Watcher watcher, DataCallback cb, Object ctx) { // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { - wcb = new DataWatchRegistration(watcher, configZnode); + wcb = new ServerDataWatchRegistration(watcher, configZnode); } RequestHeader h = new RequestHeader(); @@ -2944,7 +2939,7 @@ private void removeWatches( PathUtils.validatePath(path); final String clientPath = path; final String serverPath = prependChroot(clientPath); - WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, getWatchManager()); + WatchDeregistration wcb = new WatchDeregistration(serverPath, chroot.interceptWatcher(watcher), watcherType, local, getWatchManager()); RequestHeader h = new RequestHeader(); h.setType(opCode); @@ -2967,7 +2962,7 @@ private void removeWatches( PathUtils.validatePath(path); final String clientPath = path; final String serverPath = prependChroot(clientPath); - WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, getWatchManager()); + WatchDeregistration wcb = new WatchDeregistration(serverPath, chroot.interceptWatcher(watcher), watcherType, local, getWatchManager()); RequestHeader h = new RequestHeader(); h.setType(opCode); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/Chroot.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/Chroot.java new file mode 100644 index 00000000000..64b909ba1f2 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/Chroot.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.client; + +import java.util.Objects; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +@InterfaceAudience.Private +public interface Chroot { + static Chroot ofNullable(String chroot) { + if (chroot == null) { + return new Root(); + } + return new NotRoot(chroot); + } + + /** + * Creates server path by prepending chroot to given client path. + * + * @param clientPath client path + * @return sever path with chroot prepended + */ + String prepend(String clientPath); + + /** + * Creates client path by stripping chroot from given sever path. + * + * @param serverPath sever path with chroot prepended + * @return client path with chroot stripped + * @throws IllegalArgumentException if given server path contains no chroot + */ + String strip(String serverPath); + + /** + * Creates a delegating callback to strip chroot from created node name. + */ + AsyncCallback.StringCallback interceptCallback(AsyncCallback.StringCallback callback); + + /** + * Creates a delegating callback to strip chroot from created node name. + */ + AsyncCallback.Create2Callback interceptCallback(AsyncCallback.Create2Callback callback); + + /** + * Creates a delegating watcher to strip chroot from {@link WatchedEvent#getPath()} for given watcher. + */ + Watcher interceptWatcher(Watcher watcher); + + final class Root implements Chroot { + @Override + public String prepend(String clientPath) { + return clientPath; + } + + @Override + public String strip(String serverPath) { + return serverPath; + } + + @Override + public AsyncCallback.StringCallback interceptCallback(AsyncCallback.StringCallback callback) { + return callback; + } + + @Override + public AsyncCallback.Create2Callback interceptCallback(AsyncCallback.Create2Callback callback) { + return callback; + } + + @Override + public Watcher interceptWatcher(Watcher watcher) { + return watcher; + } + } + + final class NotRoot implements Chroot { + private final String chroot; + + public NotRoot(String chroot) { + this.chroot = Objects.requireNonNull(chroot); + } + + @Override + public String prepend(String clientPath) { + // handle clientPath = "/" + if (clientPath.length() == 1) { + return chroot; + } + return chroot + clientPath; + } + + @Override + public String strip(String serverPath) { + if (!serverPath.startsWith(chroot)) { + String msg = String.format("server path %s does no start with chroot %s", serverPath, chroot); + throw new IllegalArgumentException(msg); + } + if (chroot.length() == serverPath.length()) { + return "/"; + } else { + return serverPath.substring(chroot.length()); + } + } + + @Override + public AsyncCallback.StringCallback interceptCallback(AsyncCallback.StringCallback callback) { + return new ChrootCreateCallback(this, callback); + } + + @Override + public AsyncCallback.Create2Callback interceptCallback(AsyncCallback.Create2Callback callback) { + return new ChrootCreateCallback(this, callback); + } + + @Override + public Watcher interceptWatcher(Watcher watcher) { + return new ChrootWatcher(this, watcher); + } + + @Override + public boolean equals(Object other) { + if (other instanceof NotRoot) { + return Objects.equals(chroot, ((NotRoot) other).chroot); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(chroot); + } + + @Override + public String toString() { + return chroot; + } + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ChrootCreateCallback.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ChrootCreateCallback.java new file mode 100644 index 00000000000..cf4aee7dc61 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ChrootCreateCallback.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.client; + +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.data.Stat; + +@InterfaceAudience.Private +class ChrootCreateCallback implements AsyncCallback.StringCallback, AsyncCallback.Create2Callback { + private final Chroot.NotRoot chroot; + private final AsyncCallback callback; + + public ChrootCreateCallback(Chroot.NotRoot chroot, StringCallback callback) { + this.chroot = chroot; + this.callback = callback; + } + + public ChrootCreateCallback(Chroot.NotRoot chroot, Create2Callback callback) { + this.chroot = chroot; + this.callback = callback; + } + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + StringCallback cb = (StringCallback) callback; + cb.processResult(rc, path, ctx, name == null ? null : chroot.strip(name)); + } + + @Override + public void processResult(int rc, String path, Object ctx, String name, Stat stat) { + Create2Callback cb = (Create2Callback) callback; + cb.processResult(rc, path, ctx, name == null ? null : chroot.strip(name), stat); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ChrootWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ChrootWatcher.java new file mode 100644 index 00000000000..47a61328617 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ChrootWatcher.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.client; + +import java.util.Objects; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +@InterfaceAudience.Private +class ChrootWatcher implements Watcher { + private final Chroot.NotRoot chroot; + private final Watcher watcher; + + public ChrootWatcher(Chroot.NotRoot chroot, Watcher watcher) { + this.chroot = chroot; + this.watcher = watcher; + } + + @Override + public boolean equals(Object other) { + if (other instanceof ChrootWatcher) { + return chroot.equals(((ChrootWatcher) other).chroot) && watcher.equals(((ChrootWatcher) other).watcher); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(chroot, watcher); + } + + @Override + public void process(WatchedEvent event) { + String path = event.getPath(); + if (path != null) { + path = chroot.strip(path); + event = new WatchedEvent(event.getType(), event.getState(), path, event.getZxid()); + } + watcher.process(event); + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/BlockingQueueWatcher.java b/zookeeper-server/src/test/java/org/apache/zookeeper/BlockingQueueWatcher.java new file mode 100644 index 00000000000..caafb06eb48 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/BlockingQueueWatcher.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.time.Duration; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +public class BlockingQueueWatcher implements Watcher { + private final BlockingQueue events = new LinkedBlockingQueue<>(); + + @Override + public void process(WatchedEvent event) { + assertTrue(events.add(event)); + } + + public WatchedEvent pollEvent(Duration timeout) throws InterruptedException { + return events.poll(timeout.toMillis(), TimeUnit.MILLISECONDS); + } + + /** + * Format {@link Duration} with suffix "ms" or "s". + * + *

I guess {@link Duration#toString()} is verbose and not intuitive. + */ + private String formatTimeout(Duration timeout) { + long millis = timeout.toMillis(); + if (millis < TimeUnit.SECONDS.toMillis(1)) { + return millis + "ms"; + } + long secs = millis / TimeUnit.SECONDS.toMillis(1); + millis %= TimeUnit.SECONDS.toMillis(1); + // We are test code, second unit is large enough. + if (millis == 0) { + return secs + "s"; + } + return secs + "s" + millis + "ms"; + } + + private Supplier noEventMessage(Duration timeout) { + return () -> String.format("no event after %s", formatTimeout(timeout)); + } + + public WatchedEvent takeEvent(Duration timeout) throws InterruptedException { + WatchedEvent event = pollEvent(timeout); + assertNotNull(event, noEventMessage(timeout)); + return event; + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java index bd2f1c287a0..608bc01c4c8 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java @@ -280,7 +280,6 @@ class CustomClientCnxn extends ClientCnxn { private volatile boolean hitUnsafeRegion = false; public CustomClientCnxn( - String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig zkClientConfig, @@ -291,7 +290,6 @@ public CustomClientCnxn( boolean canBeReadOnly ) throws IOException { super( - chrootPath, hostProvider, sessionTimeout, zkClientConfig, @@ -357,7 +355,6 @@ public boolean isAlive() { @Override ClientCnxn createConnection( - String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, @@ -370,7 +367,6 @@ ClientCnxn createConnection( assertTrue(clientCnxnSocket instanceof FragileClientCnxnSocketNIO); socket = (FragileClientCnxnSocketNIO) clientCnxnSocket; ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn( - chrootPath, hostProvider, sessionTimeout, clientConfig, diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java index 6d1db27a276..b7d6af387e7 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientReconnectTest.java @@ -67,7 +67,6 @@ public void testClientReconnect() throws IOException, InterruptedException { ClientCnxnSocketNIO nioCnxn = new MockCnxn(); ClientCnxn clientCnxn = new ClientCnxn( - "tmp", hostProvider, 5000, zk.getClientConfig(), diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java index d6b6da94b56..27bc02df785 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java @@ -113,7 +113,6 @@ private String getCxnString(int[] clientPorts) { class CustomClientCnxn extends ClientCnxn { CustomClientCnxn( - String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, @@ -124,7 +123,6 @@ class CustomClientCnxn extends ClientCnxn { boolean canBeReadOnly ) throws IOException { super( - chrootPath, hostProvider, sessionTimeout, clientConfig, @@ -155,7 +153,6 @@ public CustomZooKeeper(String connectString, int sessionTimeout, Watcher watcher @Override ClientCnxn createConnection( - String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, @@ -166,7 +163,6 @@ ClientCnxn createConnection( boolean canBeReadOnly ) throws IOException { return new CustomClientCnxn( - chrootPath, hostProvider, sessionTimeout, clientConfig, diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ConfigWatcherPathTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ConfigWatcherPathTest.java new file mode 100644 index 00000000000..db56ad47407 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ConfigWatcherPathTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.time.Duration; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import org.apache.zookeeper.common.PathUtils; +import org.apache.zookeeper.test.ClientBase; +import org.junit.jupiter.api.Test; + +public class ConfigWatcherPathTest extends ClientBase { + private void join(Consumer> task) { + CompletableFuture future = new CompletableFuture<>(); + task.accept(future); + future.join(); + } + + private AsyncCallback.DataCallback complete(CompletableFuture future) { + return (rc, path, ctx, data, stat) -> { + if (rc == 0) { + future.complete(null); + } else { + future.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)); + } + }; + } + + private void testConfigWatcherPathWithChroot(String chroot) throws Exception { + ZooKeeper zk1 = createClient(hostPort + chroot); + + BlockingQueueWatcher configWatcher = new BlockingQueueWatcher(); + + // given|>config watcher: attach to config node multiple times + byte[] configData = zk1.getConfig(configWatcher, null); + join(future -> zk1.getConfig(configWatcher, complete(future), null)); + + // given|>default watcher: attach to config node multiple times + BlockingQueueWatcher defaultWatcher = new BlockingQueueWatcher(); + zk1.getWatchManager().setDefaultWatcher(defaultWatcher); + zk1.getConfig(true, null); + zk1.getConfig(defaultWatcher, null); + + // when: make change to config node + ZooKeeper zk2 = createClient(); + zk2.addAuthInfo("digest", "super:test".getBytes()); + zk2.setData(ZooDefs.CONFIG_NODE, configData, -1); + + // then|>config watcher: only one event with path "/zookeeper/config" + WatchedEvent configEvent = configWatcher.takeEvent(Duration.ofSeconds(10)); + assertEquals("/zookeeper/config", configEvent.getPath()); + assertNull(configWatcher.pollEvent(Duration.ofMillis(10))); + + // then|>default watcher: only one event with path "/zookeeper/config" + WatchedEvent defaultWatcherEvent = defaultWatcher.takeEvent(Duration.ofSeconds(10)); + assertEquals("/zookeeper/config", defaultWatcherEvent.getPath()); + assertNull(defaultWatcher.pollEvent(Duration.ofMillis(10))); + + // given: all watchers fired + // when: make change to config node + zk2.setData(ZooDefs.CONFIG_NODE, configData, -1); + + // then: no more events + assertNull(configWatcher.pollEvent(Duration.ofMillis(10))); + assertNull(defaultWatcher.pollEvent(Duration.ofMillis(10))); + } + + @Test + public void testConfigWatcherPathWithNoChroot() throws Exception { + testConfigWatcherPathWithChroot(""); + } + + @Test + public void testConfigWatcherPathWithShortChroot() throws Exception { + testConfigWatcherPathWithChroot("/short"); + } + + @Test + public void testConfigWatcherPathWithLongChroot() throws Exception { + testConfigWatcherPathWithChroot("/pretty-long-chroot-path"); + } + + @Test + public void testConfigWatcherPathWithChrootZooKeeperTree() throws Exception { + testConfigWatcherPathWithChroot("/zookeeper"); + testConfigWatcherPathWithChroot("/zookeeper/a"); + testConfigWatcherPathWithChroot("/zookeeper/config"); + testConfigWatcherPathWithChroot("/zookeeper/config/a"); + } + + @Test + public void testConfigWatcherPathWithChrootZoo() throws Exception { + // "/zoo" is prefix of "/zookeeper/config" + testConfigWatcherPathWithChroot("/zoo"); + } + + private void testDataWatcherPathWithChroot(String chroot) throws Exception { + assertTrue("/zookeeper/config".startsWith(chroot)); + String leafPath = "/zookeeper/config".substring(chroot.length()); + String dataPath = leafPath.isEmpty() ? "/" : leafPath; + PathUtils.validatePath(dataPath); + + ZooKeeper zk1 = createClient(hostPort + chroot); + + BlockingQueueWatcher dataWatcher = new BlockingQueueWatcher(); + BlockingQueueWatcher configWatcher = new BlockingQueueWatcher(); + + // given|>config watcher: attach to config node multiple times + byte[] configData = zk1.getConfig(configWatcher, null); + zk1.getConfig(configWatcher, null); + + // given|>data watcher: attach to config node through getData multiple times + zk1.getData(dataPath, dataWatcher, null); + join(future -> zk1.getData(dataPath, dataWatcher, complete(future), null)); + + // given|>default watcher: attach to config node through getData and getConfig multiple times + BlockingQueueWatcher defaultWatcher = new BlockingQueueWatcher(); + zk1.getWatchManager().setDefaultWatcher(defaultWatcher); + zk1.getData(dataPath, true, null); + zk1.getData(dataPath, defaultWatcher, null); + zk1.getConfig(true, null); + zk1.getConfig(defaultWatcher, null); + + // when: make change to config node + ZooKeeper zk2 = createClient(); + zk2.addAuthInfo("digest", "super:test".getBytes()); + zk2.setData(ZooDefs.CONFIG_NODE, configData, -1); + + // then|>data watcher: only one event with path dataPath + WatchedEvent dataEvent = dataWatcher.takeEvent(Duration.ofSeconds(10)); + assertEquals(dataPath, dataEvent.getPath()); + assertNull(dataWatcher.pollEvent(Duration.ofMillis(10))); + + // then|>config watcher: only one event with path "/zookeeper/config" + WatchedEvent configEvent = configWatcher.takeEvent(Duration.ofSeconds(10)); + assertEquals("/zookeeper/config", configEvent.getPath()); + assertNull(configWatcher.pollEvent(Duration.ofMillis(10))); + + if (dataPath.equals("/zookeeper/config")) { + // then|>default watcher: only one event with path "/zookeeper/config" + WatchedEvent defaultWatcherEvent = defaultWatcher.takeEvent(Duration.ofSeconds(10)); + assertEquals("/zookeeper/config", defaultWatcherEvent.getPath()); + } else { + // then|>default watcher: two events with path dataPath and "/zookeeper/config" + Set defaultWatcherPaths = new HashSet<>(); + defaultWatcherPaths.add(dataPath); + defaultWatcherPaths.add("/zookeeper/config"); + + WatchedEvent defaultWatcherEvent1 = defaultWatcher.takeEvent(Duration.ofSeconds(10)); + assertThat(defaultWatcherPaths, hasItem(defaultWatcherEvent1.getPath())); + defaultWatcherPaths.remove(defaultWatcherEvent1.getPath()); + + WatchedEvent defaultWatcherEvent2 = defaultWatcher.takeEvent(Duration.ofSeconds(10)); + assertNotNull(defaultWatcherEvent2); + assertThat(defaultWatcherPaths, hasItem(defaultWatcherEvent2.getPath())); + } + assertNull(defaultWatcher.pollEvent(Duration.ofMillis(10))); + + // given: all watchers fired + // when: make change to config node + zk2.setData(ZooDefs.CONFIG_NODE, configData, -1); + + // then: no more events + assertNull(dataWatcher.pollEvent(Duration.ofMillis(10))); + assertNull(configWatcher.pollEvent(Duration.ofMillis(10))); + assertNull(defaultWatcher.pollEvent(Duration.ofMillis(10))); + } + + @Test + public void testDataWatcherPathWithNoChroot() throws Exception { + testDataWatcherPathWithChroot(""); + } + + @Test + public void testDataWatcherPathWithChrootZooKeeper() throws Exception { + testDataWatcherPathWithChroot("/zookeeper"); + } + + @Test + public void testDataWatcherPathWithChrootZooKeeperConfig() throws Exception { + testDataWatcherPathWithChroot("/zookeeper/config"); + } + + @Test + public void testDataWatcherPathWithChrootAndConfigPath() throws Exception { + try (ZooKeeper zk1 = createClient(hostPort + "/root1"); ZooKeeper zk2 = createClient()) { + // given: watcher client path "/zookeeper/config" in chroot "/root1" + BlockingQueueWatcher dataWatcher = new BlockingQueueWatcher(); + zk1.addWatch("/zookeeper/config", dataWatcher, AddWatchMode.PERSISTENT); + + // and: watch for "/zookeeper/config" in server + BlockingQueueWatcher configWatcher = new BlockingQueueWatcher(); + byte[] configData = zk1.getConfig(configWatcher, null); + + // when: make change to config node + zk2.addAuthInfo("digest", "super:test".getBytes()); + zk2.setData(ZooDefs.CONFIG_NODE, configData, -1); + + // then: config watcher works normally + WatchedEvent configEvent = configWatcher.takeEvent(Duration.ofSeconds(10)); + assertEquals("/zookeeper/config", configEvent.getPath()); + + // and: no data watcher for "/zookeeper/config" in chroot "/root1" + assertNull(dataWatcher.pollEvent(Duration.ofSeconds(1))); + } + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ChrootTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ChrootTest.java index fca48c751f1..0b156156d0d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ChrootTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ChrootTest.java @@ -25,15 +25,12 @@ import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.util.Arrays; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.junit.jupiter.api.Test; @@ -63,25 +60,6 @@ public boolean matches() throws InterruptedException { } - @Test - public void testChrootWithZooKeeperPathWatcher() throws Exception { - ZooKeeper zk1 = createClient(hostPort + "/chroot"); - BlockingQueue events = new LinkedBlockingQueue<>(); - byte[] config = zk1.getConfig(events::add, null); - - ZooKeeper zk2 = createClient(); - zk2.addAuthInfo("digest", "super:test".getBytes()); - zk2.setData(ZooDefs.CONFIG_NODE, config, -1); - - waitFor("config watcher receive no event", () -> !events.isEmpty(), 10); - - WatchedEvent event = events.poll(); - assertNotNull(event); - assertEquals(Watcher.Event.KeeperState.SyncConnected, event.getState()); - assertEquals(Watcher.Event.EventType.NodeDataChanged, event.getType()); - assertEquals(ZooDefs.CONFIG_NODE, event.getPath()); - } - @Test public void testChrootSynchronous() throws IOException, InterruptedException, KeeperException { ZooKeeper zk1 = createClient();