From 832c1389b1e03a2f3b6bf997a9f5620d722cb902 Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Mon, 27 Jan 2025 17:29:02 +0100 Subject: [PATCH] fix(vertx): VertxHttpClient uses exclusive Vert.x instance by default A shared Vert.x instance can still be provided to the VertxHttpClientFactory. This instance will be shared across the different VertxHttpClient instances. It's responsibility of the user to handle the Vert.x shared instance lifecycle Signed-off-by: Marc Nuri --- CHANGELOG.md | 1 + .../client/vertx/InputStreamReadStream.java | 2 +- .../client/vertx/VertxHttpClient.java | 26 +++++++-- .../client/vertx/VertxHttpClientBuilder.java | 42 ++++++++++++-- .../client/vertx/VertxHttpClientFactory.java | 55 +++++-------------- .../client/vertx/VertxHttpRequest.java | 5 +- .../vertx/VertxHttpClientBuilderTest.java | 46 ++++++++++++++++ .../mock/DefaultSharedIndexInformerTest.java | 4 +- 8 files changed, 126 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b522b1ebd5..52e57091391 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ #### Bugs * Fix #6781: Allowing ipv6 entries to work in NO_PROXY * Fix #6709: VertxHttpClientFactory reuses the same Vertx instance for each VertxHttpClient instance +* Fix #6792: VertxHttpClient uses exclusive Vert.x instance by default ### 6.13.4 (2024-09-25) diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/InputStreamReadStream.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/InputStreamReadStream.java index 7e676108c98..145321abb16 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/InputStreamReadStream.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/InputStreamReadStream.java @@ -41,7 +41,7 @@ class InputStreamReadStream implements ReadStream { private Handler endHandler; private byte[] bytes; - public InputStreamReadStream(VertxHttpRequest vertxHttpRequest, InputStream is, HttpClientRequest request) { + InputStreamReadStream(VertxHttpRequest vertxHttpRequest, InputStream is, HttpClientRequest request) { this.vertxHttpRequest = vertxHttpRequest; this.is = is; this.request = request; diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java index 2b3f0f28b4c..586aa53fd76 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClient.java @@ -41,13 +41,25 @@ public class VertxHttpClient extends StandardHttpClient, F, VertxHttpClientBuilder> { + private final Vertx vertx; private final HttpClient client; - - VertxHttpClient(VertxHttpClientBuilder vertxHttpClientBuilder, HttpClient client, AtomicBoolean closed) { + private final boolean closeVertx; + + /** + * Create a new VertxHttpClient instance. + * + * @param vertxHttpClientBuilder the builder that created this client. + * @param closed a flag to indicate if the client has been closed. + * @param client the Vert.x HttpClient instance (will be closed alongside the client). + * @param closeVertx whether the Vert.x instance should be closed when the client is closed. + */ + VertxHttpClient(VertxHttpClientBuilder vertxHttpClientBuilder, AtomicBoolean closed, HttpClient client, + boolean closeVertx) { super(vertxHttpClientBuilder, closed); this.vertx = vertxHttpClientBuilder.vertx; this.client = client; + this.closeVertx = closeVertx; } HttpClient getClient() { @@ -69,7 +81,7 @@ public CompletableFuture buildWebSocketDirect(StandardWebSock options.setTimeout(request.getTimeout().toMillis()); } - request.headers().entrySet().stream() + request.headers().entrySet() .forEach(e -> e.getValue().stream().forEach(v -> options.addHeader(e.getKey(), v))); options.setAbsoluteURI(WebSocket.toWebSocketUri(request.uri()).toString()); @@ -121,7 +133,13 @@ public CompletableFuture> consumeBytesDirect(StandardHtt @Override public void doClose() { - client.close(); + try { + client.close(); + } finally { + if (closeVertx) { + vertx.close(); + } + } } } diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilder.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilder.java index 5b6e63e17e2..7ea942dec07 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilder.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilder.java @@ -23,6 +23,8 @@ import io.netty.handler.ssl.IdentityCipherSuiteFilter; import io.netty.handler.ssl.JdkSslContext; import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.file.FileSystemOptions; import io.vertx.core.http.HttpVersion; import io.vertx.core.net.JdkSSLEngineOptions; import io.vertx.core.net.ProxyOptions; @@ -37,6 +39,7 @@ import java.util.stream.Stream; import static io.fabric8.kubernetes.client.utils.HttpClientUtils.decodeBasicCredentials; +import static io.vertx.core.spi.resolver.ResolverProvider.DISABLE_DNS_RESOLVER_PROP_NAME; public class VertxHttpClientBuilder extends StandardHttpClientBuilder, F, VertxHttpClientBuilder> { @@ -46,16 +49,25 @@ public class VertxHttpClientBuilder private static final int MAX_WS_MESSAGE_SIZE = Integer.MAX_VALUE; final Vertx vertx; + private final boolean closeVertx; - public VertxHttpClientBuilder(F clientFactory, Vertx vertx) { + public VertxHttpClientBuilder(F clientFactory, Vertx sharedVertx) { + this( + clientFactory, + sharedVertx != null ? sharedVertx : createVertxInstance(), + sharedVertx == null); + } + + VertxHttpClientBuilder(F clientFactory, Vertx vertx, boolean closeVertx) { super(clientFactory); this.vertx = vertx; + this.closeVertx = closeVertx; } @Override public VertxHttpClient build() { if (this.client != null) { - return new VertxHttpClient<>(this, this.client.getClient(), this.client.getClosed()); + return new VertxHttpClient<>(this, this.client.getClosed(), this.client.getClient(), closeVertx); } WebClientOptions options = new WebClientOptions(); @@ -124,12 +136,12 @@ public SslContextFactory sslContextFactory() { } }); } - return new VertxHttpClient<>(this, vertx.createHttpClient(options), new AtomicBoolean()); + return new VertxHttpClient<>(this, new AtomicBoolean(), vertx.createHttpClient(options), closeVertx); } @Override protected VertxHttpClientBuilder newInstance(F clientFactory) { - return new VertxHttpClientBuilder<>(clientFactory, vertx); + return new VertxHttpClientBuilder<>(clientFactory, vertx, closeVertx); } private ProxyType convertProxyType() { @@ -145,4 +157,26 @@ private ProxyType convertProxyType() { } } + private static Vertx createVertxInstance() { + // We must disable the async DNS resolver as it can cause issues when resolving the Vault instance. + // This is done using the DISABLE_DNS_RESOLVER_PROP_NAME system property. + // The DNS resolver used by vert.x is configured during the (synchronous) initialization. + // So, we just need to disable the async resolver around the Vert.x instance creation. + final String originalValue = System.getProperty(DISABLE_DNS_RESOLVER_PROP_NAME); + Vertx vertx; + try { + System.setProperty(DISABLE_DNS_RESOLVER_PROP_NAME, "true"); + vertx = Vertx.vertx(new VertxOptions() + .setFileSystemOptions(new FileSystemOptions().setFileCachingEnabled(false).setClassPathResolvingEnabled(false)) + .setUseDaemonThread(true)); + } finally { + // Restore the original value + if (originalValue == null) { + System.clearProperty(DISABLE_DNS_RESOLVER_PROP_NAME); + } else { + System.setProperty(DISABLE_DNS_RESOLVER_PROP_NAME, originalValue); + } + } + return vertx; + } } diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientFactory.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientFactory.java index c5d969aaf21..d6b535a17b2 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientFactory.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientFactory.java @@ -17,60 +17,31 @@ import io.fabric8.kubernetes.client.Config; import io.vertx.core.Vertx; -import io.vertx.core.VertxOptions; -import io.vertx.core.file.FileSystemOptions; import io.vertx.ext.web.client.WebClientOptions; -import static io.vertx.core.spi.resolver.ResolverProvider.DISABLE_DNS_RESOLVER_PROP_NAME; - public class VertxHttpClientFactory implements io.fabric8.kubernetes.client.http.HttpClient.Factory { - private static final class VertxHolder { - - private static final Vertx INSTANCE = createVertxInstance(); - - private static synchronized Vertx createVertxInstance() { - // We must disable the async DNS resolver as it can cause issues when resolving the Vault instance. - // This is done using the DISABLE_DNS_RESOLVER_PROP_NAME system property. - // The DNS resolver used by vert.x is configured during the (synchronous) initialization. - // So, we just need to disable the async resolver around the Vert.x instance creation. - final String originalValue = System.getProperty(DISABLE_DNS_RESOLVER_PROP_NAME); - Vertx vertx; - try { - System.setProperty(DISABLE_DNS_RESOLVER_PROP_NAME, "true"); - vertx = Vertx.vertx(new VertxOptions() - .setFileSystemOptions(new FileSystemOptions().setFileCachingEnabled(false).setClassPathResolvingEnabled(false)) - .setUseDaemonThread(true)); - } finally { - // Restore the original value - if (originalValue == null) { - System.clearProperty(DISABLE_DNS_RESOLVER_PROP_NAME); - } else { - System.setProperty(DISABLE_DNS_RESOLVER_PROP_NAME, originalValue); - } - } - return vertx; - } - } - - private final Vertx vertx; + final Vertx sharedVertx; public VertxHttpClientFactory() { - this(VertxHolder.INSTANCE); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - if (vertx != null) { - vertx.close(); - } - })); + this(null); } - public VertxHttpClientFactory(Vertx vertx) { - this.vertx = vertx; + /** + * Create a new instance of the factory that will reuse the provided {@link Vertx} instance. + *

+ * It's the user's responsibility to manage the lifecycle of the provided Vert.x instance. + * Operations such as close, and so on are left on hands of the user. + * + * @param sharedVertx the Vertx instance to use. + */ + public VertxHttpClientFactory(Vertx sharedVertx) { + this.sharedVertx = sharedVertx; } @Override public VertxHttpClientBuilder newBuilder() { - return new VertxHttpClientBuilder<>(this, vertx); + return new VertxHttpClientBuilder<>(this, sharedVertx); } /** diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpRequest.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpRequest.java index 7164c1e0696..bdff0053a8d 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpRequest.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxHttpRequest.java @@ -32,7 +32,6 @@ import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -77,7 +76,7 @@ public Optional> previousResponse() { final Vertx vertx; private final RequestOptions options; - private StandardHttpRequest request; + private final StandardHttpRequest request; public VertxHttpRequest(Vertx vertx, RequestOptions options, StandardHttpRequest request) { this.vertx = vertx; @@ -113,7 +112,7 @@ public void cancel() { }; resp.handler(buffer -> { try { - consumer.consume(Arrays.asList(ByteBuffer.wrap(buffer.getBytes())), result); + consumer.consume(List.of(ByteBuffer.wrap(buffer.getBytes())), result); } catch (Exception e) { resp.request().reset(); result.done().completeExceptionally(e); diff --git a/httpclient-vertx/src/test/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilderTest.java b/httpclient-vertx/src/test/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilderTest.java index d1e80d3d046..f42ec8b8b2c 100644 --- a/httpclient-vertx/src/test/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilderTest.java +++ b/httpclient-vertx/src/test/java/io/fabric8/kubernetes/client/vertx/VertxHttpClientBuilderTest.java @@ -16,10 +16,14 @@ package io.fabric8.kubernetes.client.vertx; import io.fabric8.kubernetes.client.http.HttpClient; +import io.vertx.core.Vertx; +import io.vertx.core.impl.VertxImpl; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import java.util.concurrent.TimeUnit; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertNotNull; class VertxHttpClientBuilderTest { @@ -35,4 +39,46 @@ void testZeroTimeouts() { } } + @Test + void reusesVertxInstanceWhenSharedVertx() { + Vertx vertx = Vertx.vertx(); + try (HttpClient client = new VertxHttpClientFactory(vertx).newBuilder().build()) { + assertThat(client) + .isInstanceOf(VertxHttpClient.class) + .extracting("vertx") + .isSameAs(vertx); + } finally { + vertx.close(); + } + } + + @Test + void createsVertxInstanceWhenNoSharedVertx() { + try (HttpClient client = new VertxHttpClientFactory().newBuilder().build()) { + assertThat(client) + .isInstanceOf(VertxHttpClient.class) + .extracting("vertx") + .isNotNull(); + } + } + + @Test + void doesntCloseSharedVertxInstanceWhenClientIsClosed() { + final Vertx vertx = Vertx.vertx(); + final var builder = new VertxHttpClientFactory(vertx).newBuilder(); + builder.build().close(); + assertThat(builder.vertx) + .asInstanceOf(InstanceOfAssertFactories.type(VertxImpl.class)) + .returns(false, vi -> vi.closeFuture().isClosed()); + vertx.close(); + } + + @Test + void closesVertxInstanceWhenClientIsClosed() { + final var builder = new VertxHttpClientFactory().newBuilder(); + builder.build().close(); + assertThat(builder.vertx) + .asInstanceOf(InstanceOfAssertFactories.type(VertxImpl.class)) + .returns(true, vi -> vi.closeFuture().isClosed()); + } } diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index 5cc338c9e2a..393bd376285 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -1205,7 +1205,7 @@ void testCustomExceptionHandler() throws InterruptedException { } @Test - void testClientStopClosesInformer() throws InterruptedException { + void testClientStopClosesInformer() throws Exception { // Given setupMockServerExpectations(Animal.class, "ns1", this::getList, r -> new WatchEvent(getAnimal("red-panda", "Carnivora", r), "ADDED"), null, null); @@ -1218,6 +1218,8 @@ void testClientStopClosesInformer() throws InterruptedException { animalSharedIndexInformer.start(); + await().atMost(10, TimeUnit.SECONDS).until(animalSharedIndexInformer::hasSynced); + client.close(); await().atMost(60, TimeUnit.SECONDS).until(() -> animalSharedIndexInformer.stopped().toCompletableFuture().isDone());