Skip to content

Commit

Permalink
fix(vertx): VertxHttpClient uses exclusive Vert.x instance by default
Browse files Browse the repository at this point in the history
A shared Vert.x instance can still be provided to the VertxHttpClientFactory.

This instance will be shared across the different VertxHttpClient instances.

It's responsibility of the user to handle the Vert.x shared instance lifecycle

Signed-off-by: Marc Nuri <[email protected]>
  • Loading branch information
manusa committed Jan 28, 2025
1 parent ffe667d commit 832c138
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#### Bugs
* Fix #6781: Allowing ipv6 entries to work in NO_PROXY
* Fix #6709: VertxHttpClientFactory reuses the same Vertx instance for each VertxHttpClient instance
* Fix #6792: VertxHttpClient uses exclusive Vert.x instance by default

### 6.13.4 (2024-09-25)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class InputStreamReadStream implements ReadStream<Buffer> {
private Handler<Void> endHandler;
private byte[] bytes;

public InputStreamReadStream(VertxHttpRequest vertxHttpRequest, InputStream is, HttpClientRequest request) {
InputStreamReadStream(VertxHttpRequest vertxHttpRequest, InputStream is, HttpClientRequest request) {
this.vertxHttpRequest = vertxHttpRequest;
this.is = is;
this.request = request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,25 @@

public class VertxHttpClient<F extends io.fabric8.kubernetes.client.http.HttpClient.Factory>
extends StandardHttpClient<VertxHttpClient<F>, F, VertxHttpClientBuilder<F>> {

private final Vertx vertx;
private final HttpClient client;

VertxHttpClient(VertxHttpClientBuilder<F> vertxHttpClientBuilder, HttpClient client, AtomicBoolean closed) {
private final boolean closeVertx;

/**
* Create a new VertxHttpClient instance.
*
* @param vertxHttpClientBuilder the builder that created this client.
* @param closed a flag to indicate if the client has been closed.
* @param client the Vert.x HttpClient instance (will be closed alongside the client).
* @param closeVertx whether the Vert.x instance should be closed when the client is closed.
*/
VertxHttpClient(VertxHttpClientBuilder<F> vertxHttpClientBuilder, AtomicBoolean closed, HttpClient client,
boolean closeVertx) {
super(vertxHttpClientBuilder, closed);
this.vertx = vertxHttpClientBuilder.vertx;
this.client = client;
this.closeVertx = closeVertx;
}

HttpClient getClient() {
Expand All @@ -69,7 +81,7 @@ public CompletableFuture<WebSocketResponse> buildWebSocketDirect(StandardWebSock
options.setTimeout(request.getTimeout().toMillis());
}

request.headers().entrySet().stream()
request.headers().entrySet()
.forEach(e -> e.getValue().stream().forEach(v -> options.addHeader(e.getKey(), v)));
options.setAbsoluteURI(WebSocket.toWebSocketUri(request.uri()).toString());

Expand Down Expand Up @@ -121,7 +133,13 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytesDirect(StandardHtt

@Override
public void doClose() {
client.close();
try {
client.close();
} finally {
if (closeVertx) {
vertx.close();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.netty.handler.ssl.IdentityCipherSuiteFilter;
import io.netty.handler.ssl.JdkSslContext;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.file.FileSystemOptions;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.net.JdkSSLEngineOptions;
import io.vertx.core.net.ProxyOptions;
Expand All @@ -37,6 +39,7 @@
import java.util.stream.Stream;

import static io.fabric8.kubernetes.client.utils.HttpClientUtils.decodeBasicCredentials;
import static io.vertx.core.spi.resolver.ResolverProvider.DISABLE_DNS_RESOLVER_PROP_NAME;

public class VertxHttpClientBuilder<F extends HttpClient.Factory>
extends StandardHttpClientBuilder<VertxHttpClient<F>, F, VertxHttpClientBuilder<F>> {
Expand All @@ -46,16 +49,25 @@ public class VertxHttpClientBuilder<F extends HttpClient.Factory>
private static final int MAX_WS_MESSAGE_SIZE = Integer.MAX_VALUE;

final Vertx vertx;
private final boolean closeVertx;

public VertxHttpClientBuilder(F clientFactory, Vertx vertx) {
public VertxHttpClientBuilder(F clientFactory, Vertx sharedVertx) {
this(
clientFactory,
sharedVertx != null ? sharedVertx : createVertxInstance(),
sharedVertx == null);
}

VertxHttpClientBuilder(F clientFactory, Vertx vertx, boolean closeVertx) {
super(clientFactory);
this.vertx = vertx;
this.closeVertx = closeVertx;
}

@Override
public VertxHttpClient<F> build() {
if (this.client != null) {
return new VertxHttpClient<>(this, this.client.getClient(), this.client.getClosed());
return new VertxHttpClient<>(this, this.client.getClosed(), this.client.getClient(), closeVertx);
}

WebClientOptions options = new WebClientOptions();
Expand Down Expand Up @@ -124,12 +136,12 @@ public SslContextFactory sslContextFactory() {
}
});
}
return new VertxHttpClient<>(this, vertx.createHttpClient(options), new AtomicBoolean());
return new VertxHttpClient<>(this, new AtomicBoolean(), vertx.createHttpClient(options), closeVertx);
}

@Override
protected VertxHttpClientBuilder<F> newInstance(F clientFactory) {
return new VertxHttpClientBuilder<>(clientFactory, vertx);
return new VertxHttpClientBuilder<>(clientFactory, vertx, closeVertx);
}

private ProxyType convertProxyType() {
Expand All @@ -145,4 +157,26 @@ private ProxyType convertProxyType() {
}
}

private static Vertx createVertxInstance() {
// We must disable the async DNS resolver as it can cause issues when resolving the Vault instance.
// This is done using the DISABLE_DNS_RESOLVER_PROP_NAME system property.
// The DNS resolver used by vert.x is configured during the (synchronous) initialization.
// So, we just need to disable the async resolver around the Vert.x instance creation.
final String originalValue = System.getProperty(DISABLE_DNS_RESOLVER_PROP_NAME);
Vertx vertx;
try {
System.setProperty(DISABLE_DNS_RESOLVER_PROP_NAME, "true");
vertx = Vertx.vertx(new VertxOptions()
.setFileSystemOptions(new FileSystemOptions().setFileCachingEnabled(false).setClassPathResolvingEnabled(false))
.setUseDaemonThread(true));
} finally {
// Restore the original value
if (originalValue == null) {
System.clearProperty(DISABLE_DNS_RESOLVER_PROP_NAME);
} else {
System.setProperty(DISABLE_DNS_RESOLVER_PROP_NAME, originalValue);
}
}
return vertx;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,60 +17,31 @@

import io.fabric8.kubernetes.client.Config;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.file.FileSystemOptions;
import io.vertx.ext.web.client.WebClientOptions;

import static io.vertx.core.spi.resolver.ResolverProvider.DISABLE_DNS_RESOLVER_PROP_NAME;

public class VertxHttpClientFactory implements io.fabric8.kubernetes.client.http.HttpClient.Factory {

private static final class VertxHolder {

private static final Vertx INSTANCE = createVertxInstance();

private static synchronized Vertx createVertxInstance() {
// We must disable the async DNS resolver as it can cause issues when resolving the Vault instance.
// This is done using the DISABLE_DNS_RESOLVER_PROP_NAME system property.
// The DNS resolver used by vert.x is configured during the (synchronous) initialization.
// So, we just need to disable the async resolver around the Vert.x instance creation.
final String originalValue = System.getProperty(DISABLE_DNS_RESOLVER_PROP_NAME);
Vertx vertx;
try {
System.setProperty(DISABLE_DNS_RESOLVER_PROP_NAME, "true");
vertx = Vertx.vertx(new VertxOptions()
.setFileSystemOptions(new FileSystemOptions().setFileCachingEnabled(false).setClassPathResolvingEnabled(false))
.setUseDaemonThread(true));
} finally {
// Restore the original value
if (originalValue == null) {
System.clearProperty(DISABLE_DNS_RESOLVER_PROP_NAME);
} else {
System.setProperty(DISABLE_DNS_RESOLVER_PROP_NAME, originalValue);
}
}
return vertx;
}
}

private final Vertx vertx;
final Vertx sharedVertx;

public VertxHttpClientFactory() {
this(VertxHolder.INSTANCE);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (vertx != null) {
vertx.close();
}
}));
this(null);
}

public VertxHttpClientFactory(Vertx vertx) {
this.vertx = vertx;
/**
* Create a new instance of the factory that will reuse the provided {@link Vertx} instance.
* <p>
* It's the user's responsibility to manage the lifecycle of the provided Vert.x instance.
* Operations such as close, and so on are left on hands of the user.
*
* @param sharedVertx the Vertx instance to use.
*/
public VertxHttpClientFactory(Vertx sharedVertx) {
this.sharedVertx = sharedVertx;
}

@Override
public VertxHttpClientBuilder<VertxHttpClientFactory> newBuilder() {
return new VertxHttpClientBuilder<>(this, vertx);
return new VertxHttpClientBuilder<>(this, sharedVertx);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -77,7 +76,7 @@ public Optional<HttpResponse<?>> previousResponse() {

final Vertx vertx;
private final RequestOptions options;
private StandardHttpRequest request;
private final StandardHttpRequest request;

public VertxHttpRequest(Vertx vertx, RequestOptions options, StandardHttpRequest request) {
this.vertx = vertx;
Expand Down Expand Up @@ -113,7 +112,7 @@ public void cancel() {
};
resp.handler(buffer -> {
try {
consumer.consume(Arrays.asList(ByteBuffer.wrap(buffer.getBytes())), result);
consumer.consume(List.of(ByteBuffer.wrap(buffer.getBytes())), result);
} catch (Exception e) {
resp.request().reset();
result.done().completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
package io.fabric8.kubernetes.client.vertx;

import io.fabric8.kubernetes.client.http.HttpClient;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxImpl;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;

import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;

class VertxHttpClientBuilderTest {
Expand All @@ -35,4 +39,46 @@ void testZeroTimeouts() {
}
}

@Test
void reusesVertxInstanceWhenSharedVertx() {
Vertx vertx = Vertx.vertx();
try (HttpClient client = new VertxHttpClientFactory(vertx).newBuilder().build()) {
assertThat(client)
.isInstanceOf(VertxHttpClient.class)
.extracting("vertx")
.isSameAs(vertx);
} finally {
vertx.close();
}
}

@Test
void createsVertxInstanceWhenNoSharedVertx() {
try (HttpClient client = new VertxHttpClientFactory().newBuilder().build()) {
assertThat(client)
.isInstanceOf(VertxHttpClient.class)
.extracting("vertx")
.isNotNull();
}
}

@Test
void doesntCloseSharedVertxInstanceWhenClientIsClosed() {
final Vertx vertx = Vertx.vertx();
final var builder = new VertxHttpClientFactory(vertx).newBuilder();
builder.build().close();
assertThat(builder.vertx)
.asInstanceOf(InstanceOfAssertFactories.type(VertxImpl.class))
.returns(false, vi -> vi.closeFuture().isClosed());
vertx.close();
}

@Test
void closesVertxInstanceWhenClientIsClosed() {
final var builder = new VertxHttpClientFactory().newBuilder();
builder.build().close();
assertThat(builder.vertx)
.asInstanceOf(InstanceOfAssertFactories.type(VertxImpl.class))
.returns(true, vi -> vi.closeFuture().isClosed());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ void testCustomExceptionHandler() throws InterruptedException {
}

@Test
void testClientStopClosesInformer() throws InterruptedException {
void testClientStopClosesInformer() throws Exception {
// Given
setupMockServerExpectations(Animal.class, "ns1", this::getList,
r -> new WatchEvent(getAnimal("red-panda", "Carnivora", r), "ADDED"), null, null);
Expand All @@ -1218,6 +1218,8 @@ void testClientStopClosesInformer() throws InterruptedException {

animalSharedIndexInformer.start();

await().atMost(10, TimeUnit.SECONDS).until(animalSharedIndexInformer::hasSynced);

client.close();

await().atMost(60, TimeUnit.SECONDS).until(() -> animalSharedIndexInformer.stopped().toCompletableFuture().isDone());
Expand Down

0 comments on commit 832c138

Please sign in to comment.