Skip to content

Commit

Permalink
[8.x] Move TransportVersion negotiation to handshake (elastic#120261)…
Browse files Browse the repository at this point in the history
… (elastic#120510)

* Move `TransportVersion` negotiation to handshake (elastic#120261)

Today the `TransportHandshaker` returns the remote node's actual
`TransportVersion`, even if this is an unknown version from some future
release. This is exposed by `Transport.Connection#getTransportVersion`
despite that method's docs saying that the return value is the
`TransportVersion` in use for the connection. The actual version
negotiation doesn't happen until we get around to sending an outbound
message in `OutboundHandler`.

This doesn't matter much today since we only compare versions against
known constants, ordering by ID, so all unknown future versions are
treated equivalently to the current version. But still it's not correct,
and we may need to make this more refined in future.

* Fix
  • Loading branch information
DaveCTurner authored Jan 21, 2025
1 parent 2e12bbb commit ba57820
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -38,7 +39,9 @@ final class OutboundHandler {
private static final Logger logger = LogManager.getLogger(OutboundHandler.class);

private final String nodeName;

private final TransportVersion version;

private final StatsTracker statsTracker;
private final ThreadPool threadPool;
private final Recycler<BytesRef> recycler;
Expand Down Expand Up @@ -98,11 +101,11 @@ void sendRequest(
final Compression.Scheme compressionScheme,
final boolean isHandshake
) throws IOException, TransportException {
TransportVersion version = TransportVersion.min(this.version, transportVersion);
OutboundMessage.Request message = new OutboundMessage.Request(
assert assertValidTransportVersion(transportVersion);
final OutboundMessage.Request message = new OutboundMessage.Request(
threadPool.getThreadContext(),
request,
version,
transportVersion,
action,
requestId,
isHandshake,
Expand Down Expand Up @@ -137,11 +140,11 @@ void sendResponse(
final boolean isHandshake,
final ResponseStatsConsumer responseStatsConsumer
) {
TransportVersion version = TransportVersion.min(this.version, transportVersion);
assert assertValidTransportVersion(transportVersion);
OutboundMessage.Response message = new OutboundMessage.Response(
threadPool.getThreadContext(),
response,
version,
transportVersion,
requestId,
isHandshake,
compressionScheme
Expand All @@ -158,7 +161,11 @@ void sendResponse(
} catch (Exception ex) {
if (isHandshake) {
logger.error(
() -> format("Failed to send handshake response version [%s] received on [%s], closing channel", version, channel),
() -> format(
"Failed to send handshake response version [%s] received on [%s], closing channel",
transportVersion,
channel
),
ex
);
channel.close();
Expand All @@ -179,9 +186,15 @@ void sendErrorResponse(
final ResponseStatsConsumer responseStatsConsumer,
final Exception error
) {
TransportVersion version = TransportVersion.min(this.version, transportVersion);
RemoteTransportException tx = new RemoteTransportException(nodeName, channel.getLocalAddress(), action, error);
OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), tx, version, requestId, false, null);
assert assertValidTransportVersion(transportVersion);
OutboundMessage.Response message = new OutboundMessage.Response(
threadPool.getThreadContext(),
new RemoteTransportException(nodeName, channel.getLocalAddress(), action, error),
transportVersion,
requestId,
false,
null
);
try {
sendMessage(channel, message, responseStatsConsumer, () -> messageListener.onResponseSent(requestId, action, error));
} catch (Exception sendException) {
Expand Down Expand Up @@ -297,4 +310,10 @@ public boolean rstOnClose() {
return rstOnClose;
}

private boolean assertValidTransportVersion(TransportVersion transportVersion) {
assert this.version.before(TransportVersions.MINIMUM_COMPATIBLE) // running an incompatible-version test
|| this.version.onOrAfter(transportVersion) : this.version + " vs " + transportVersion;
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public void handleResponse(HandshakeResponse response) {
)
);
} else {
listener.onResponse(responseVersion);
listener.onResponse(TransportVersion.min(TransportHandshaker.this.version, response.getResponseVersion()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TransportVersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -51,7 +52,7 @@ public void setUp() throws Exception {
.address("host", "host_address", buildNewFakeTransportAddress())
.roles(Collections.emptySet())
.build();
threadPool = new TestThreadPool("thread-poll");
threadPool = new TestThreadPool(getTestName());
handshaker = new TransportHandshaker(TransportVersion.current(), threadPool, requestSender, false);
}

Expand Down Expand Up @@ -85,6 +86,37 @@ public void testHandshakeRequestAndResponse() throws IOException {
assertEquals(TransportVersion.current(), versionFuture.actionGet());
}

public void testHandshakeResponseFromOlderNode() throws Exception {
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
final long reqId = randomNonNegativeLong();
handshaker.sendHandshake(reqId, node, channel, SAFE_AWAIT_TIMEOUT, versionFuture);
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);

assertFalse(versionFuture.isDone());

final var remoteVersion = TransportVersionUtils.randomCompatibleVersion(random());
handler.handleResponse(new TransportHandshaker.HandshakeResponse(remoteVersion));

assertTrue(versionFuture.isDone());
assertEquals(remoteVersion, versionFuture.result());
}

public void testHandshakeResponseFromNewerNode() throws Exception {
final PlainActionFuture<TransportVersion> versionFuture = new PlainActionFuture<>();
final long reqId = randomNonNegativeLong();
handshaker.sendHandshake(reqId, node, channel, SAFE_AWAIT_TIMEOUT, versionFuture);
TransportResponseHandler<TransportHandshaker.HandshakeResponse> handler = handshaker.removeHandlerForHandshake(reqId);

assertFalse(versionFuture.isDone());

handler.handleResponse(
new TransportHandshaker.HandshakeResponse(TransportVersion.fromId(TransportVersion.current().id() + between(0, 10)))
);

assertTrue(versionFuture.isDone());
assertEquals(TransportVersion.current(), versionFuture.result());
}

public void testHandshakeRequestFutureVersionsCompatibility() throws IOException {
long reqId = randomLongBetween(1, 10);
handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), new PlainActionFuture<>());
Expand Down

0 comments on commit ba57820

Please sign in to comment.