diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index 75fd8744823..0048682ceb3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -218,7 +218,7 @@ static class AuthData { * If any request's response in not received in configured requestTimeout * then it is assumed that the response packet is lost. */ - private long requestTimeout; + private final long requestTimeout; ZKWatchManager getWatcherManager() { return watchManager; @@ -286,6 +286,8 @@ static class Packet { WatchDeregistration watchDeregistration; + long deadline = Long.MAX_VALUE; + /** Convenience ctor */ Packet( RequestHeader requestHeader, @@ -414,7 +416,12 @@ public ClientCnxn( this.sendThread = new SendThread(clientCnxnSocket); this.eventThread = new EventThread(); - initRequestTimeout(); + this.requestTimeout = clientConfig.getRequestTimeout(); + LOG.info( + "{} value is {}. feature enabled={}", + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, + requestTimeout, + requestTimeout > 0); } public void start() { @@ -728,8 +735,6 @@ protected void finishPacket(Packet p) { p.replyHeader.setErr(Code.OK.intValue()); } } - } catch (KeeperException.NoWatcherException nwe) { - p.replyHeader.setErr(nwe.code().intValue()); } catch (KeeperException ke) { p.replyHeader.setErr(ke.code().intValue()); } @@ -765,21 +770,26 @@ protected void onConnecting(InetSocketAddress addr) { } - private void conLossPacket(Packet p) { + private Code abortPacket(Packet p, Code cause) { if (p.replyHeader == null) { - return; + return cause; } switch (state) { - case AUTH_FAILED: - p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue()); - break; - case CLOSED: - p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue()); - break; - default: - p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue()); + case AUTH_FAILED: + p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue()); + break; + case CLOSED: + p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue()); + break; + default: + p.replyHeader.setErr(cause.intValue()); } finishPacket(p); + return Code.CONNECTIONLOSS; + } + + private void conLossPacket(Packet p) { + abortPacket(p, Code.CONNECTIONLOSS); } private volatile long lastZxid; @@ -852,6 +862,8 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException { replyHdr.deserialize(bbia, "header"); switch (replyHdr.getXid()) { + case SET_WATCHES_XID: + return; case PING_XID: LOG.debug("Got ping response for session id: 0x{} after {}ms.", Long.toHexString(sessionId), @@ -1116,6 +1128,39 @@ private void logStartConnect(InetSocketAddress addr) { } } + private long requestDeadline() { + if (requestTimeout == 0) { + return Long.MAX_VALUE; + } + + // The correctness of following code depends on several implementation details: + // 1. Polling of outgoingQueue happens only in SendThread. + // 2. Adding to pendingQueue happens only in SendThread. + // + // It is possible for netty socket to readResponse for first pendingQueue entry + // while we are checking deadline for the same entry. So, it is possible that + // a request was responded near deadline, but we disconnect the session. Given + // that we are dealing with timeout, this should not be much matter. + // + // In long term, we should sequence all pendingQueue operations to SendThread. + + Packet p; + synchronized (pendingQueue) { + p = pendingQueue.peek(); + } + if (p != null) { + return p.deadline; + } + + for (Packet packet : outgoingQueue) { + if (packet.requestHeader != null && packet.requestHeader.getXid() >= 0) { + return packet.deadline; + } + } + + return Long.MAX_VALUE; + } + @Override @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER") public void run() { @@ -1192,6 +1237,14 @@ public void run() { LOG.warn(warnInfo); throw new SessionTimeoutException(warnInfo); } + long deadline = requestDeadline(); + if (deadline != Long.MAX_VALUE) { + long now = Time.currentElapsedTime(); + if (now >= deadline) { + throw new KeeperException.RequestTimeoutException(); + } + to = Integer.min(to, (int) (deadline - now)); + } if (state.isConnected()) { //1000(1 second) is to prevent race condition missing to send the second ping //also make sure not to send too many pings when readTimeout is small @@ -1240,9 +1293,14 @@ public void run() { serverAddress, e); + Code cause = Code.CONNECTIONLOSS; + if (e instanceof KeeperException) { + cause = ((KeeperException) e).code(); + } + // At this point, there might still be new packets appended to outgoingQueue. // they will be handled in next connection or cleared up if closed. - cleanAndNotifyState(); + cleanAndNotifyState(cause); } } } @@ -1268,8 +1326,8 @@ public void run() { "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId())); } - private void cleanAndNotifyState() { - cleanup(); + private void cleanAndNotifyState(Code cause) { + cleanup(cause); if (state.isAlive()) { eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null)); } @@ -1328,10 +1386,14 @@ private void pingRwServer() throws RWServerFoundException { } private void cleanup() { + cleanup(Code.CONNECTIONLOSS); + } + + private void cleanup(Code cause) { clientCnxnSocket.cleanup(); synchronized (pendingQueue) { for (Packet p : pendingQueue) { - conLossPacket(p); + cause = abortPacket(p, cause); } pendingQueue.clear(); } @@ -1341,7 +1403,7 @@ private void cleanup() { Iterator iter = outgoingQueue.iterator(); while (iter.hasNext()) { Packet p = iter.next(); - conLossPacket(p); + cause = abortPacket(p, cause); iter.remove(); } } @@ -1525,37 +1587,13 @@ public ReplyHeader submitRequest( watchRegistration, watchDeregistration); synchronized (packet) { - if (requestTimeout > 0) { - // Wait for request completion with timeout - waitForPacketFinish(r, packet); - } else { - // Wait for request completion infinitely - while (!packet.finished) { - packet.wait(); - } + while (!packet.finished) { + packet.wait(); } } - if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) { - sendThread.cleanAndNotifyState(); - } return r; } - /** - * Wait for request completion with timeout. - */ - private void waitForPacketFinish(ReplyHeader r, Packet packet) throws InterruptedException { - long waitStartTime = Time.currentElapsedTime(); - while (!packet.finished) { - packet.wait(requestTimeout); - if (!packet.finished && ((Time.currentElapsedTime() - waitStartTime) >= requestTimeout)) { - LOG.error("Timeout error occurred for the packet '{}'.", packet); - r.setErr(Code.REQUESTTIMEOUT.intValue()); - break; - } - } - } - public void saslCompleted() { sendThread.getClientCnxnSocket().saslCompleted(); } @@ -1612,6 +1650,9 @@ public Packet queuePacket( packet.clientPath = clientPath; packet.serverPath = serverPath; packet.watchDeregistration = watchDeregistration; + if (requestTimeout != 0 && h.getXid() >= 0) { + packet.deadline = Time.currentElapsedTime() + requestTimeout; + } // The synchronized block here is for two purpose: // 1. synchronize with the final cleanup() in SendThread.run() to avoid race // 2. synchronized against each packet. So if a closeSession packet is added, @@ -1669,25 +1710,6 @@ public LocalCallback(AsyncCallback cb, int rc, String path, Object ctx) { } - private void initRequestTimeout() { - try { - requestTimeout = clientConfig.getLong( - ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, - ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT); - LOG.info( - "{} value is {}. feature enabled={}", - ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, - requestTimeout, - requestTimeout > 0); - } catch (NumberFormatException e) { - LOG.error( - "Configured value {} for property {} can not be parsed to long.", - clientConfig.getProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT), - ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT); - throw e; - } - } - public ZooKeeperSaslClient getZooKeeperSaslClient() { return sendThread.getZooKeeperSaslClient(); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java index 3ecce4c8844..16dcbc595d4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java @@ -34,7 +34,6 @@ import java.util.concurrent.LinkedBlockingDeque; import org.apache.zookeeper.ClientCnxn.EndOfStreamException; import org.apache.zookeeper.ClientCnxn.Packet; -import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.client.ZKClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,9 +108,7 @@ void doIO(Queue pendingQueue, ClientCnxn cnxn) throws InterruptedExcepti updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { - if ((p.requestHeader != null) - && (p.requestHeader.getType() != OpCode.ping) - && (p.requestHeader.getType() != OpCode.auth)) { + if (p.requestHeader != null && p.requestHeader.getXid() >= 0) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); @@ -120,9 +117,7 @@ void doIO(Queue pendingQueue, ClientCnxn cnxn) throws InterruptedExcepti if (!p.bb.hasRemaining()) { sentCount.getAndIncrement(); outgoingQueue.removeFirstOccurrence(p); - if (p.requestHeader != null - && p.requestHeader.getType() != OpCode.ping - && p.requestHeader.getType() != OpCode.auth) { + if (p.requestHeader != null && p.requestHeader.getXid() >= 0) { synchronized (pendingQueue) { pendingQueue.add(p); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java index 5b82c37f9da..c589a2ecf3c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java @@ -354,9 +354,7 @@ private void doWrite(Queue pendingQueue, Packet p, ClientCnxn cnxn) thro boolean anyPacketsSent = false; while (true) { if (p != WakeupPacket.getInstance()) { - if ((p.requestHeader != null) - && (p.requestHeader.getType() != ZooDefs.OpCode.ping) - && (p.requestHeader.getType() != ZooDefs.OpCode.auth)) { + if (p.requestHeader != null && p.requestHeader.getXid() >= 0) { p.requestHeader.setXid(cnxn.getXid()); synchronized (pendingQueue) { pendingQueue.add(p); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java index 06826a672ed..8dbadb2b486 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java @@ -514,6 +514,8 @@ static String getCodeMessage(Code code) { return "Quota has exceeded"; case THROTTLEDOP: return "Op throttled due to high load"; + case REQUESTTIMEOUT: + return "Request timeout"; default: return "Unknown error " + code; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java index 10c61375d1d..0da5d9d4b98 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java @@ -23,6 +23,8 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.common.ZKConfig; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Handles client specific properties @@ -30,6 +32,7 @@ */ @InterfaceAudience.Public public class ZKClientConfig extends ZKConfig { + private static final Logger LOG = LoggerFactory.getLogger(ZKClientConfig.class); public static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username"; public static final String ZK_SASL_CLIENT_USERNAME_DEFAULT = "zookeeper"; @@ -142,4 +145,18 @@ public long getLong(String key, long defaultValue) { return defaultValue; } + @InterfaceAudience.Private + public long getRequestTimeout() { + try { + return getLong( + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT); + } catch (NumberFormatException e) { + LOG.error( + "Configured value {} for property {} can not be parsed to long.", + getProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT), + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT); + throw e; + } + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java index 27bc02df785..9ac36575408 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java @@ -18,58 +18,73 @@ package org.apache.zookeeper; -import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import java.io.IOException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import java.lang.reflect.Field; +import java.util.concurrent.CompletableFuture; import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.client.HostProvider; -import org.apache.zookeeper.client.ZKClientConfig; -import org.apache.zookeeper.server.quorum.QuorumPeerTestBase; +import org.apache.zookeeper.server.FinalRequestProcessor; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.test.ClientBase; -import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -public class ClientRequestTimeoutTest extends QuorumPeerTestBase { - - private static final int SERVER_COUNT = 3; - private boolean dropPacket = false; - private int dropPacketType = ZooDefs.OpCode.create; +public class ClientRequestTimeoutTest extends ClientBase { + + private volatile boolean dropPacket = false; + private volatile int dropPacketType = ZooDefs.OpCode.create; + private volatile long delayPacket = 0; + + @BeforeEach + @Override + public void setUp() throws Exception { + super.setUp(); + ZooKeeperServer zkServer = serverFactory.getZooKeeperServer(); + Field firstProcessField = zkServer.getClass().getDeclaredField("firstProcessor"); + firstProcessField.setAccessible(true); + RequestProcessor processor = (RequestProcessor) firstProcessField.get(zkServer); + for (;;) { + Field nextProcessorField = processor.getClass().getDeclaredField("nextProcessor"); + nextProcessorField.setAccessible(true); + RequestProcessor nextProcessor = (RequestProcessor) nextProcessorField.get(processor); + if (nextProcessor instanceof FinalRequestProcessor) { + nextProcessor = spy(nextProcessor); + doAnswer(invocation -> { + Request request = invocation.getArgument(0); + if (dropPacket && request.type == dropPacketType) { + Field cnxnField = request.getClass().getDeclaredField("cnxn"); + cnxnField.setAccessible(true); + cnxnField.set(request, null); + } + if (delayPacket != 0) { + Thread.sleep(delayPacket); + } + return invocation.callRealMethod(); + }).when(nextProcessor).processRequest(any()); + nextProcessorField.set(processor, nextProcessor); + break; + } + processor = nextProcessor; + } + } @Test @Timeout(value = 120) public void testClientRequestTimeout() throws Exception { - int requestTimeOut = 15000; + int requestTimeOut = CONNECTION_TIMEOUT / 4; System.setProperty("zookeeper.request.timeout", Integer.toString(requestTimeOut)); - final int[] clientPorts = new int[SERVER_COUNT]; - StringBuilder sb = new StringBuilder(); - String server; - - for (int i = 0; i < SERVER_COUNT; i++) { - clientPorts[i] = PortAssignment.unique(); - server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() - + ":participant;127.0.0.1:" + clientPorts[i]; - sb.append(server + "\n"); - } - String currentQuorumCfgSection = sb.toString(); - MainThread[] mt = new MainThread[SERVER_COUNT]; - for (int i = 0; i < SERVER_COUNT; i++) { - mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false); - mt[i].start(); - } + CountdownWatcher watch = new CountdownWatcher(); - // ensure server started - for (int i = 0; i < SERVER_COUNT; i++) { - assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT), - "waiting for server " + i + " being up"); - } + ZooKeeper zk = createClient(watch); - CountdownWatcher watch1 = new CountdownWatcher(); - CustomZooKeeper zk = new CustomZooKeeper(getCxnString(clientPorts), ClientBase.CONNECTION_TIMEOUT, watch1); - watch1.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT); String data = "originalData"; // lets see one successful operation @@ -79,6 +94,8 @@ public void testClientRequestTimeout() throws Exception { dropPacket = true; dropPacketType = ZooDefs.OpCode.create; + watch.reset(); + // Test synchronous API try { zk.create("/clientHang2", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -87,92 +104,43 @@ public void testClientRequestTimeout() throws Exception { assertEquals(KeeperException.Code.REQUESTTIMEOUT.intValue(), exception.code().intValue()); } - // do cleanup - zk.close(); - for (int i = 0; i < SERVER_COUNT; i++) { - mt[i].shutdown(); - } - } - - /** - * @return connection string in the form of - * 127.0.0.1:port1,127.0.0.1:port2,127.0.0.1:port3 - */ - private String getCxnString(int[] clientPorts) { - StringBuffer hostPortBuffer = new StringBuffer(); - for (int i = 0; i < clientPorts.length; i++) { - hostPortBuffer.append("127.0.0.1:"); - hostPortBuffer.append(clientPorts[i]); - if (i != (clientPorts.length - 1)) { - hostPortBuffer.append(','); + watch.waitForConnected(CONNECTION_TIMEOUT); + + // Test asynchronous API + CompletableFuture future3 = new CompletableFuture<>(); + CompletableFuture future4 = new CompletableFuture<>(); + // delay so below two requests are handled in same batch in client side + delayPacket = 500; + zk.create("/clientHang3", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> { + if (rc == 0) { + future3.complete(name); + } else { + future3.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)); } - } - return hostPortBuffer.toString(); - } - - class CustomClientCnxn extends ClientCnxn { - - CustomClientCnxn( - HostProvider hostProvider, - int sessionTimeout, - ZKClientConfig clientConfig, - Watcher defaultWatcher, - ClientCnxnSocket clientCnxnSocket, - long sessionId, - byte[] sessionPasswd, - boolean canBeReadOnly - ) throws IOException { - super( - hostProvider, - sessionTimeout, - clientConfig, - defaultWatcher, - clientCnxnSocket, - sessionId, - sessionPasswd, - canBeReadOnly); - } - - @Override - public void finishPacket(Packet p) { - if (dropPacket && p.requestHeader.getType() == dropPacketType) { - // do nothing, just return, it is the same as packet is dropped - // by the network - return; + }, null); + zk.create("/clientHang4", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> { + if (rc == 0) { + future4.complete(name); + } else { + future4.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path)); } - super.finishPacket(p); + }, null); + try { + future3.get(); + } catch (Exception ex) { + KeeperException exception = (KeeperException) ex.getCause(); + assertEquals(KeeperException.Code.REQUESTTIMEOUT.intValue(), exception.code().intValue()); } - - } - - class CustomZooKeeper extends ZooKeeper { - - public CustomZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException { - super(connectString, sessionTimeout, watcher); + try { + future4.get(); + } catch (Exception ex) { + KeeperException exception = (KeeperException) ex.getCause(); + assertEquals(KeeperException.Code.CONNECTIONLOSS.intValue(), exception.code().intValue()); } - @Override - ClientCnxn createConnection( - HostProvider hostProvider, - int sessionTimeout, - ZKClientConfig clientConfig, - Watcher defaultWatcher, - ClientCnxnSocket clientCnxnSocket, - long sessionId, - byte[] sessionPasswd, - boolean canBeReadOnly - ) throws IOException { - return new CustomClientCnxn( - hostProvider, - sessionTimeout, - clientConfig, - defaultWatcher, - clientCnxnSocket, - sessionId, - sessionPasswd, - canBeReadOnly); - } + dropPacket = false; + // do cleanup + zk.close(); } - }