Skip to content

Commit

Permalink
ZOOKEEPER-4747: Add synchronous sync to ease synchronous call chains
Browse files Browse the repository at this point in the history
Previously, there is no synchronous `sync` so client has to convert
asynchronous `sync` a bit to fit synchronous call chains. This is
apparently unfriendly.

Besides above, in absent of ZOOKEEPER-22, we can't issue a fire and
forget asynchronous `sync` to gain strong consistent. So it becomes
crucial for client to have a convenient synchronous `sync`.

Refs: ZOOKEEPER-1167, ZOOKEEPER-4747
  • Loading branch information
kezhuw committed Sep 26, 2023
1 parent f42c01d commit f1424cd
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 52 deletions.
25 changes: 25 additions & 0 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2708,6 +2708,31 @@ public void getEphemerals(AsyncCallback.EphemeralsCallback cb, Object ctx) {
getEphemerals("/", cb, ctx);
}

/**
* Synchronous sync. Flushes channel between process and leader.
*
* @param path the given path
* @throws KeeperException If the server signals an error with a non-zero error code
* @throws InterruptedException If the server transaction is interrupted.
* @throws IllegalArgumentException if an invalid path is specified
*/
public void sync(final String path) throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);

final String serverPath = prependChroot(clientPath);

RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.sync);
SyncRequest request = new SyncRequest();
SyncResponse response = new SyncResponse();
request.setPath(serverPath);
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
}
}

/**
* Asynchronous sync. Flushes channel between process and leader.
* @param path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.util.ServiceUtils;
import org.hamcrest.CustomMatcher;
Expand Down Expand Up @@ -58,6 +59,30 @@ protected String getTestName() {
return testName;
}

public void syncClient(ZooKeeper zk) {
syncClient(zk, true);
}

public void syncClient(ZooKeeper zk, boolean synchronous) {
if (synchronous) {
try {
zk.sync("/");
} catch (Exception ex) {
throw new RuntimeException(ex);
}
return;
}
final CompletableFuture<Void> synced = new CompletableFuture<>();
zk.sync("/", (rc, path, ctx) -> {
if (rc == 0) {
synced.complete(null);
} else {
synced.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
}
}, null);
synced.join();
}

@BeforeAll
public static void before() {
if (!testBaseDir.exists()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.test.QuorumBase;
Expand Down Expand Up @@ -95,18 +93,6 @@ public void setUp(ServerState serverState, boolean checkEnabled) throws Exceptio
clientWatchB.waitForConnected(CONNECTION_TIMEOUT);
}

void syncClient(ZooKeeper zk) {
CompletableFuture<Void> synced = new CompletableFuture<>();
zk.sync("/", (rc, path, ctx) -> {
if (rc == 0) {
synced.complete(null);
} else {
synced.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
}, null);
synced.join();
}

@AfterEach
public void tearDown() throws Exception {
if (zkClient != null) {
Expand All @@ -133,7 +119,7 @@ private void assertTransactionState(String operation, QuorumPeer peer, long last
// to sync leader client to go through commit and response path in leader to
// build happen-before between `zkLeader.getLastLoggedZxid()` and side effect
// of previous operation.
syncClient(zkLeaderClient);
syncClient(zkLeaderClient, false);
}
assertTrue(peer == zkLeader || peer == zkConnected);
boolean eagerACL = ZooKeeperServer.isEnableEagerACLCheck();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
Expand Down Expand Up @@ -179,4 +181,15 @@ public void processResult(int rc, String path, Object ctx) {
assertTrue(complete, String.format("%s Sync completed", serverState));
}

@ParameterizedTest
@MethodSource("data")
public void testSynchronousSync(ServerState serverState) throws Exception {
setUp(serverState);
create2EmptyNode(zkClient, PARENT_PATH);
ForkJoinTask<Void> task = ForkJoinPool.commonPool().submit(() -> {
zkClient.sync(PARENT_PATH);
return null;
});
task.get(30, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.DummyWatcher;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -182,15 +183,24 @@ public void testMultipleWatcherObjs() throws IOException, InterruptedException,
}

/**
* Make sure that we can change sessions
* from follower to leader.
*
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
* Make sure that we can change sessions among servers and maintain consistent view
* using {@link ZooKeeper#sync(String)}.
*/
@Test
public void testSessionMoved() throws Exception {
public void testSessionMovedWithSynchronousSync() throws Exception {
testSessionMoved(true);
}

/**
* Make sure that we can change sessions among servers and maintain consistent view
* using {@link ZooKeeper#sync(String, AsyncCallback.VoidCallback, Object)}.
*/
@Test
public void testSessionMovedWithAsynchronousSync() throws Exception {
testSessionMoved(false);
}

public void testSessionMoved(boolean synchronous_sync) throws Exception {
String[] hostPorts = qb.hostPort.split(",");
DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
hostPorts[0],
Expand All @@ -208,21 +218,8 @@ public void testSessionMoved() throws Exception {
zk.getSessionId(),
zk.getSessionPasswd());
zknew.setData("/", new byte[1], -1);
final int[] result = new int[1];
result[0] = Integer.MAX_VALUE;
zknew.sync("/", (rc, path, ctx) -> {
synchronized (result) {
result[0] = rc;
result.notify();
}
}, null);
synchronized (result) {
if (result[0] == Integer.MAX_VALUE) {
result.wait(5000);
}
}
LOG.info("{} Sync returned {}", hostPorts[(i + 1) % hostPorts.length], result[0]);
assertTrue(result[0] == KeeperException.Code.OK.intValue());
syncClient(zknew, synchronous_sync);
LOG.info("{} Sync succeed", hostPorts[(i + 1) % hostPorts.length]);
try {
zk.setData("/", new byte[1], -1);
fail("Should have lost the connection");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,21 +243,8 @@ public void testSessionMove() throws Exception {
% hostPorts.length], CONNECTION_TIMEOUT, new MyWatcher(Integer.toString(
i
+ 1)), zk.getSessionId(), zk.getSessionPasswd());
final int[] result = new int[1];
result[0] = Integer.MAX_VALUE;
zknew.sync("/", (rc, path, ctx) -> {
synchronized (result) {
result[0] = rc;
result.notify();
}
}, null);
synchronized (result) {
if (result[0] == Integer.MAX_VALUE) {
result.wait(5000);
}
}
LOG.info("{} Sync returned {}", hostPorts[(i + 1) % hostPorts.length], result[0]);
assertTrue(result[0] == KeeperException.Code.OK.intValue());
zknew.sync("/");
LOG.info("{} Sync succeed", hostPorts[(i + 1) % hostPorts.length]);
zknew.setData("/", new byte[1], -1);
try {
zk.setData("/", new byte[1], -1);
Expand All @@ -270,6 +257,7 @@ public void testSessionMove() throws Exception {
}
zk.close();
}

/**
* This test makes sure that duplicate state changes are not communicated
* to the client watcher. For example we should not notify state as
Expand Down

0 comments on commit f1424cd

Please sign in to comment.