Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout and metrics on delta chunk sends. #73

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import com.uid2.shared.Utils;
import com.uid2.shared.optout.OptOutEntry;
import com.uid2.shared.optout.OptOutUtils;
import io.micrometer.core.instrument.Metrics;
import io.netty.handler.codec.http.HttpMethod;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.micrometer.core.instrument.Timer;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -16,6 +18,7 @@
import java.net.URISyntaxException;
import java.net.http.HttpRequest;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

public class OptOutPartnerEndpoint implements IOptOutPartnerEndpoint {
Expand All @@ -30,9 +33,14 @@ public class OptOutPartnerEndpoint implements IOptOutPartnerEndpoint {

private final EndpointConfig config;
private final RetryingWebClient retryingClient;
private final Timer timer;

public OptOutPartnerEndpoint(Vertx vertx, EndpointConfig config) {
this.config = config;
this.timer = Timer.builder("uid2.optout.deltasend_successfulchunktime_ms")
.description("Timer for each HTTP connection that successfully transfers part of a delta to a partner")
.tag("remote_partner", this.name())
.register(Metrics.globalRegistry);
this.retryingClient = new RetryingWebClient(vertx, config.url(), config.method(), config.retryCount(), config.retryBackoffMs());
}

Expand All @@ -43,6 +51,7 @@ public String name() {

@Override
public Future<Void> send(OptOutEntry entry) {
long startTimeMs = System.currentTimeMillis();
return this.retryingClient.send(
(URI uri, HttpMethod method) -> {
URIBuilder uriBuilder = new URIBuilder(uri);
Expand Down Expand Up @@ -85,6 +94,8 @@ public Future<Void> send(OptOutEntry entry) {
}

if (SUCCESS_STATUS_CODES.contains(resp.statusCode())) {
long finishTimeMs = System.currentTimeMillis();
timer.record(finishTimeMs - startTimeMs, TimeUnit.MILLISECONDS);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently only recording successful sends - failures could happen for a range of reasons that might affect the time they take, so having a timer on "any failed request" probably isn't useful.

return true;
}

Expand Down
9 changes: 8 additions & 1 deletion src/main/java/com/uid2/optout/web/RetryingWebClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,28 @@
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.BiFunction;

public class RetryingWebClient {
private static final Logger LOGGER = LoggerFactory.getLogger(RetryingWebClient.class);
private final URI uri;
private final HttpMethod method;
private final long resultTimeoutMs;
private final int retryCount;
private final int retryBackoffMs;
private final HttpClient httpClient;
private Vertx vertx;

public RetryingWebClient(Vertx vertx, String uri, HttpMethod method, int retryCount, int retryBackoffMs) {
this(vertx, uri, method, retryCount, retryBackoffMs, 5*60*1000);
}
public RetryingWebClient(Vertx vertx, String uri, HttpMethod method, int retryCount, int retryBackoffMs, long resultTimeoutMs) {
this.vertx = vertx;
this.uri = URI.create(uri);
this.method = method;
this.resultTimeoutMs = resultTimeoutMs;
this.httpClient = HttpClient.newHttpClient();

this.retryCount = retryCount;
Expand All @@ -42,7 +48,8 @@ public Future<Void> send(BiFunction<URI, HttpMethod, HttpRequest> requestCreator
Promise<Void> promise = Promise.promise();

HttpRequest request = requestCreator.apply(this.uri, this.method);
CompletableFuture<HttpResponse<String>> asyncResponse = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());
CompletableFuture<HttpResponse<String>> asyncResponse = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.orTimeout(this.resultTimeoutMs, TimeUnit.MILLISECONDS);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout. This won't hit the retry logic - the retries only kick in if the overall network request succeeds and the server returns a failure (i.e. the future succeeds with a response indicating failure). This is consistent with that behavior, but maybe we want to think about whether exceptions should also trigger the retry logic.


asyncResponse.thenAccept(response -> {
try {
Expand Down
46 changes: 46 additions & 0 deletions src/test/java/com/uid2/optout/web/RetryingWebClientTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package com.uid2.optout.web;

import io.netty.handler.codec.http.HttpMethod;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.After;
Expand All @@ -11,8 +15,11 @@

import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

@RunWith(VertxUnitRunner.class)
public class RetryingWebClientTest {
Expand All @@ -34,6 +41,13 @@ public void setup(TestContext ctx) {
// pick a random code and respond with it
int statusCode = Integer.valueOf(statusCodes[rand.nextInt(statusCodes.length)]);
req.response().setStatusCode(statusCode).end();
} else if (subPath.startsWith("delayed")) {
vertx.setTimer(1000, id -> {
try {
req.response().setStatusCode(200).end();
}
catch (Exception ex) {}
});
} else {
int statusCode = Integer.valueOf(subPath);
req.response().setStatusCode(statusCode).end();
Expand Down Expand Up @@ -175,4 +189,36 @@ private void expectImmediateFailure_withNonRetryErrors(TestContext ctx, HttpMeth
}));
}
}

public Function<HttpResponse, Boolean> assertStatusCodeFactory(TestContext ctx, int code) {
return result -> {
ctx.assertEquals(code, result.statusCode());
return code == result.statusCode();
};
}
public Handler<AsyncResult<Void>> ensureAsyncExceptionFactory(TestContext ctx, Class<? extends Exception> exceptionClass) {
return ctx.asyncAssertFailure(cause -> {
ctx.assertTrue(cause.getClass() == exceptionClass, "Expected a " + exceptionClass.toString() + " but got a " + cause);
});
}

@Test
public void longRequest_longerTimeout_expectSuccess(TestContext ctx) {
testDelayedResponse(ctx, assertStatusCodeFactory(ctx, 200), 1500)
.onComplete(ctx.asyncAssertSuccess());
}

@Test
public void longRequest_shorterTimeout_expectFailure(TestContext ctx) {
testDelayedResponse(ctx, req -> true, 500)
.onComplete(ensureAsyncExceptionFactory(ctx, TimeoutException.class));
}

private Future<Void> testDelayedResponse(TestContext ctx, Function<HttpResponse, Boolean> assertion, int resultTimeoutMs) {
Async async = ctx.async();

RetryingWebClient c = new RetryingWebClient(vertx, "http://localhost:18082/delayed", HttpMethod.GET, 0, 0, resultTimeoutMs);
return c.send((URI uri, HttpMethod method) -> HttpRequest.newBuilder().uri(uri).method(method.toString(), HttpRequest.BodyPublishers.noBody()).build(), assertion)
.andThen(r -> async.complete());
}
}
Loading