diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java index e04984fa918..05d85d008cd 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java @@ -2708,6 +2708,31 @@ public void getEphemerals(AsyncCallback.EphemeralsCallback cb, Object ctx) { getEphemerals("/", cb, ctx); } + /** + * Synchronous sync. Flushes channel between process and leader. + * + * @param path the given path + * @throws KeeperException If the server signals an error with a non-zero error code + * @throws InterruptedException If the server transaction is interrupted. + * @throws IllegalArgumentException if an invalid path is specified + */ + public void sync(final String path) throws KeeperException, InterruptedException { + final String clientPath = path; + PathUtils.validatePath(clientPath); + + final String serverPath = prependChroot(clientPath); + + RequestHeader h = new RequestHeader(); + h.setType(ZooDefs.OpCode.sync); + SyncRequest request = new SyncRequest(); + SyncResponse response = new SyncResponse(); + request.setPath(serverPath); + ReplyHeader r = cnxn.submitRequest(h, request, response, null); + if (r.getErr() != 0) { + throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); + } + } + /** * Asynchronous sync. Flushes channel between process and leader. * @param path diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java index b29deedb0bc..b62fe688ec3 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.fail; import java.io.File; import java.time.Instant; +import java.util.concurrent.CompletableFuture; import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.util.ServiceUtils; import org.hamcrest.CustomMatcher; @@ -58,6 +59,30 @@ protected String getTestName() { return testName; } + public void syncClient(ZooKeeper zk) { + syncClient(zk, true); + } + + public void syncClient(ZooKeeper zk, boolean synchronous) { + if (synchronous) { + try { + zk.sync("/"); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + return; + } + final CompletableFuture synced = new CompletableFuture<>(); + zk.sync("/", (rc, path, ctx) -> { + if (rc == 0) { + synced.complete(null); + } else { + synced.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)); + } + }, null); + synced.join(); + } + @BeforeAll public static void before() { if (!testBaseDir.exists()) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java index 4141c586ff8..91381e16cb5 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java @@ -22,13 +22,11 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.TestableZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; import org.apache.zookeeper.test.QuorumBase; @@ -95,18 +93,6 @@ public void setUp(ServerState serverState, boolean checkEnabled) throws Exceptio clientWatchB.waitForConnected(CONNECTION_TIMEOUT); } - void syncClient(ZooKeeper zk) { - CompletableFuture synced = new CompletableFuture<>(); - zk.sync("/", (rc, path, ctx) -> { - if (rc == 0) { - synced.complete(null); - } else { - synced.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); - } - }, null); - synced.join(); - } - @AfterEach public void tearDown() throws Exception { if (zkClient != null) { @@ -133,7 +119,7 @@ private void assertTransactionState(String operation, QuorumPeer peer, long last // to sync leader client to go through commit and response path in leader to // build happen-before between `zkLeader.getLastLoggedZxid()` and side effect // of previous operation. - syncClient(zkLeaderClient); + syncClient(zkLeaderClient, false); } assertTrue(peer == zkLeader || peer == zkConnected); boolean eagerACL = ZooKeeperServer.isEnableEagerACLCheck(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java index 624661044b6..f848b02895f 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java @@ -26,6 +26,8 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.apache.zookeeper.AsyncCallback.VoidCallback; @@ -179,4 +181,15 @@ public void processResult(int rc, String path, Object ctx) { assertTrue(complete, String.format("%s Sync completed", serverState)); } + @ParameterizedTest + @MethodSource("data") + public void testSynchronousSync(ServerState serverState) throws Exception { + setUp(serverState); + create2EmptyNode(zkClient, PARENT_PATH); + ForkJoinTask task = ForkJoinPool.commonPool().submit(() -> { + zkClient.sync(PARENT_PATH); + return null; + }); + task.get(30, TimeUnit.SECONDS); + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java index ff53b0fe932..01aa9ddc7a1 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.TimeoutException; +import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.DummyWatcher; import org.apache.zookeeper.KeeperException; @@ -182,15 +183,24 @@ public void testMultipleWatcherObjs() throws IOException, InterruptedException, } /** - * Make sure that we can change sessions - * from follower to leader. - * - * @throws IOException - * @throws InterruptedException - * @throws KeeperException + * Make sure that we can change sessions among servers and maintain consistent view + * using {@link ZooKeeper#sync(String)}. */ @Test - public void testSessionMoved() throws Exception { + public void testSessionMovedWithSynchronousSync() throws Exception { + testSessionMoved(true); + } + + /** + * Make sure that we can change sessions among servers and maintain consistent view + * using {@link ZooKeeper#sync(String, AsyncCallback.VoidCallback, Object)}. + */ + @Test + public void testSessionMovedWithAsynchronousSync() throws Exception { + testSessionMoved(false); + } + + public void testSessionMoved(boolean synchronous_sync) throws Exception { String[] hostPorts = qb.hostPort.split(","); DisconnectableZooKeeper zk = new DisconnectableZooKeeper( hostPorts[0], @@ -208,21 +218,8 @@ public void testSessionMoved() throws Exception { zk.getSessionId(), zk.getSessionPasswd()); zknew.setData("/", new byte[1], -1); - final int[] result = new int[1]; - result[0] = Integer.MAX_VALUE; - zknew.sync("/", (rc, path, ctx) -> { - synchronized (result) { - result[0] = rc; - result.notify(); - } - }, null); - synchronized (result) { - if (result[0] == Integer.MAX_VALUE) { - result.wait(5000); - } - } - LOG.info("{} Sync returned {}", hostPorts[(i + 1) % hostPorts.length], result[0]); - assertTrue(result[0] == KeeperException.Code.OK.intValue()); + syncClient(zknew, synchronous_sync); + LOG.info("{} Sync succeed", hostPorts[(i + 1) % hostPorts.length]); try { zk.setData("/", new byte[1], -1); fail("Should have lost the connection"); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java index 85f76c21381..3a4766c7716 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java @@ -243,21 +243,8 @@ public void testSessionMove() throws Exception { % hostPorts.length], CONNECTION_TIMEOUT, new MyWatcher(Integer.toString( i + 1)), zk.getSessionId(), zk.getSessionPasswd()); - final int[] result = new int[1]; - result[0] = Integer.MAX_VALUE; - zknew.sync("/", (rc, path, ctx) -> { - synchronized (result) { - result[0] = rc; - result.notify(); - } - }, null); - synchronized (result) { - if (result[0] == Integer.MAX_VALUE) { - result.wait(5000); - } - } - LOG.info("{} Sync returned {}", hostPorts[(i + 1) % hostPorts.length], result[0]); - assertTrue(result[0] == KeeperException.Code.OK.intValue()); + zknew.sync("/"); + LOG.info("{} Sync succeed", hostPorts[(i + 1) % hostPorts.length]); zknew.setData("/", new byte[1], -1); try { zk.setData("/", new byte[1], -1); @@ -270,6 +257,7 @@ public void testSessionMove() throws Exception { } zk.close(); } + /** * This test makes sure that duplicate state changes are not communicated * to the client watcher. For example we should not notify state as