Skip to content

Commit

Permalink
Add timeout to delta chunk send operation.
Browse files Browse the repository at this point in the history
Add a metric for how long this operation takes so we can reduce the timeout once we have more data.
  • Loading branch information
lionell-pack-ttd committed Feb 1, 2024
1 parent 2abb5ea commit e852463
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 51 deletions.
111 changes: 61 additions & 50 deletions src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import com.uid2.shared.Utils;
import com.uid2.shared.optout.OptOutEntry;
import com.uid2.shared.optout.OptOutUtils;
import io.micrometer.core.instrument.Metrics;
import io.netty.handler.codec.http.HttpMethod;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.micrometer.core.instrument.Timer;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -16,6 +18,7 @@
import java.net.URISyntaxException;
import java.net.http.HttpRequest;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

public class OptOutPartnerEndpoint implements IOptOutPartnerEndpoint {
Expand All @@ -30,9 +33,14 @@ public class OptOutPartnerEndpoint implements IOptOutPartnerEndpoint {

private final EndpointConfig config;
private final RetryingWebClient retryingClient;
private final Timer timer;

public OptOutPartnerEndpoint(Vertx vertx, EndpointConfig config) {
this.config = config;
this.timer = Timer.builder("uid2.optout.deltasend_successfulchunktime_ms")
.description("Timer for each HTTP connection that successfully transfers part of a delta to a partner")
.tag("remote_partner", this.name())
.register(Metrics.globalRegistry);
this.retryingClient = new RetryingWebClient(vertx, config.url(), config.method(), config.retryCount(), config.retryBackoffMs());
}

Expand All @@ -43,58 +51,61 @@ public String name() {

@Override
public Future<Void> send(OptOutEntry entry) {
long startTimeMs = System.currentTimeMillis();
return this.retryingClient.send(
(URI uri, HttpMethod method) -> {
URIBuilder uriBuilder = new URIBuilder(uri);

for (String queryParam : config.queryParams()) {
int indexOfEqualSign = queryParam.indexOf('=');
String paramName = queryParam.substring(0, indexOfEqualSign);
String paramValue = queryParam.substring(indexOfEqualSign + 1);
String replacedValue = replaceValueReferences(entry, paramValue);

uriBuilder.addParameter(paramName, replacedValue);
}

URI uriWithParams;
try {
uriWithParams = uriBuilder.build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}

HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(uriWithParams)
.method(method.toString(), HttpRequest.BodyPublishers.noBody());

for (String additionalHeader : this.config.additionalHeaders()) {
int indexOfColonSign = additionalHeader.indexOf(':');
String headerName = additionalHeader.substring(0, indexOfColonSign);
String headerValue = additionalHeader.substring(indexOfColonSign + 1);
String replacedValue = replaceValueReferences(entry, headerValue);
builder.header(headerName, replacedValue);
}

LOGGER.info("replaying optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp);

return builder.build();
},
resp -> {
if (resp == null) {
throw new RuntimeException("response is null");
}

if (SUCCESS_STATUS_CODES.contains(resp.statusCode())) {
return true;
}

LOGGER.info("received non-200 response: " + resp.statusCode() + "-" + resp.body() + " for optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp);
if (RETRYABLE_STATUS_CODES.contains(resp.statusCode())) {
return false;
} else {
throw new UnexpectedStatusCodeException(resp.statusCode());
(URI uri, HttpMethod method) -> {
URIBuilder uriBuilder = new URIBuilder(uri);

for (String queryParam : config.queryParams()) {
int indexOfEqualSign = queryParam.indexOf('=');
String paramName = queryParam.substring(0, indexOfEqualSign);
String paramValue = queryParam.substring(indexOfEqualSign + 1);
String replacedValue = replaceValueReferences(entry, paramValue);

uriBuilder.addParameter(paramName, replacedValue);
}

URI uriWithParams;
try {
uriWithParams = uriBuilder.build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}

HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(uriWithParams)
.method(method.toString(), HttpRequest.BodyPublishers.noBody());

for (String additionalHeader : this.config.additionalHeaders()) {
int indexOfColonSign = additionalHeader.indexOf(':');
String headerName = additionalHeader.substring(0, indexOfColonSign);
String headerValue = additionalHeader.substring(indexOfColonSign + 1);
String replacedValue = replaceValueReferences(entry, headerValue);
builder.header(headerName, replacedValue);
}

LOGGER.info("replaying optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp);

return builder.build();
},
resp -> {
if (resp == null) {
throw new RuntimeException("response is null");
}

if (SUCCESS_STATUS_CODES.contains(resp.statusCode())) {
long finishTimeMs = System.currentTimeMillis();
timer.record(finishTimeMs - startTimeMs, TimeUnit.MILLISECONDS);
return true;
}

LOGGER.info("received non-200 response: " + resp.statusCode() + "-" + resp.body() + " for optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp);
if (RETRYABLE_STATUS_CODES.contains(resp.statusCode())) {
return false;
} else {
throw new UnexpectedStatusCodeException(resp.statusCode());
}
}
}
);
}

Expand Down
9 changes: 8 additions & 1 deletion src/main/java/com/uid2/optout/web/RetryingWebClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,28 @@
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.BiFunction;

public class RetryingWebClient {
private static final Logger LOGGER = LoggerFactory.getLogger(RetryingWebClient.class);
private final URI uri;
private final HttpMethod method;
private final long resultTimeoutMs;
private final int retryCount;
private final int retryBackoffMs;
private final HttpClient httpClient;
private Vertx vertx;

public RetryingWebClient(Vertx vertx, String uri, HttpMethod method, int retryCount, int retryBackoffMs) {
this(vertx, uri, method, retryCount, retryBackoffMs, 5*60*1000);
}
public RetryingWebClient(Vertx vertx, String uri, HttpMethod method, int retryCount, int retryBackoffMs, long resultTimeoutMs) {
this.vertx = vertx;
this.uri = URI.create(uri);
this.method = method;
this.resultTimeoutMs = resultTimeoutMs;
this.httpClient = HttpClient.newHttpClient();

this.retryCount = retryCount;
Expand All @@ -42,7 +48,8 @@ public Future<Void> send(BiFunction<URI, HttpMethod, HttpRequest> requestCreator
Promise<Void> promise = Promise.promise();

HttpRequest request = requestCreator.apply(this.uri, this.method);
CompletableFuture<HttpResponse<String>> asyncResponse = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());
CompletableFuture<HttpResponse<String>> asyncResponse = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.orTimeout(this.resultTimeoutMs, TimeUnit.MILLISECONDS);

asyncResponse.thenAccept(response -> {
try {
Expand Down
46 changes: 46 additions & 0 deletions src/test/java/com/uid2/optout/web/RetryingWebClientTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package com.uid2.optout.web;

import io.netty.handler.codec.http.HttpMethod;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.After;
Expand All @@ -11,8 +15,11 @@

import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

@RunWith(VertxUnitRunner.class)
public class RetryingWebClientTest {
Expand All @@ -34,6 +41,13 @@ public void setup(TestContext ctx) {
// pick a random code and respond with it
int statusCode = Integer.valueOf(statusCodes[rand.nextInt(statusCodes.length)]);
req.response().setStatusCode(statusCode).end();
} else if (subPath.startsWith("delayed")) {
vertx.setTimer(1000, id -> {
try {
req.response().setStatusCode(200).end();
}
catch (Exception ex) {}
});
} else {
int statusCode = Integer.valueOf(subPath);
req.response().setStatusCode(statusCode).end();
Expand Down Expand Up @@ -175,4 +189,36 @@ private void expectImmediateFailure_withNonRetryErrors(TestContext ctx, HttpMeth
}));
}
}

public Function<HttpResponse, Boolean> assertStatusCodeFactory(TestContext ctx, int code) {
return result -> {
ctx.assertEquals(code, result.statusCode());
return code == result.statusCode();
};
}
public Handler<AsyncResult<Void>> ensureAsyncExceptionFactory(TestContext ctx, Class<? extends Exception> exceptionClass) {
return ctx.asyncAssertFailure(cause -> {
ctx.assertTrue(cause.getClass() == exceptionClass, "Expected a " + exceptionClass.toString() + " but got a " + cause);
});
}

@Test
public void longRequest_longerTimeout_expectSuccess(TestContext ctx) {
testDelayedResponse(ctx, assertStatusCodeFactory(ctx, 200), 1500)
.onComplete(ctx.asyncAssertSuccess());
}

@Test
public void longRequest_shorterTimeout_expectFailure(TestContext ctx) {
testDelayedResponse(ctx, req -> true, 500)
.onComplete(ensureAsyncExceptionFactory(ctx, TimeoutException.class));
}

private Future<Void> testDelayedResponse(TestContext ctx, Function<HttpResponse, Boolean> assertion, int resultTimeoutMs) {
Async async = ctx.async();

RetryingWebClient c = new RetryingWebClient(vertx, "http://localhost:18082/delayed", HttpMethod.GET, 0, 0, resultTimeoutMs);
return c.send((URI uri, HttpMethod method) -> HttpRequest.newBuilder().uri(uri).method(method.toString(), HttpRequest.BodyPublishers.noBody()).build(), assertion)
.andThen(r -> async.complete());
}
}

0 comments on commit e852463

Please sign in to comment.