Skip to content

Commit

Permalink
feat: improve wait logic to a more elegant solution open-feature#1160
Browse files Browse the repository at this point in the history
Signed-off-by: christian.lutnik <[email protected]>
  • Loading branch information
chrfwow committed Jan 20, 2025
1 parent 969448a commit 4402e66
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.Wait;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
Expand Down Expand Up @@ -35,6 +36,7 @@ public class FlagdProvider extends EventProvider {
private volatile Structure syncMetadata = new ImmutableStructure();
private volatile EvaluationContext enrichedContext = new ImmutableContext();
private final List<Hook> hooks = new ArrayList<>();
private final Wait connectionWait = new Wait();

protected final void finalize() {
// DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
Expand All @@ -55,7 +57,7 @@ public FlagdProvider() {
public FlagdProvider(final FlagdOptions options) {
switch (options.getResolverType().asString()) {
case Config.RESOLVER_IN_PROCESS:
this.flagResolver = new InProcessResolver(options, this::isConnected, this::onConnectionEvent);
this.flagResolver = new InProcessResolver(options, connectionWait, this::onConnectionEvent);
break;
case Config.RESOLVER_RPC:
this.flagResolver = new GrpcResolver(
Expand All @@ -82,6 +84,7 @@ public synchronized void initialize(EvaluationContext evaluationContext) throws

this.flagResolver.init();
this.initialized = this.connected = true;
connectionWait.onFinished();
}

@Override
Expand Down Expand Up @@ -151,13 +154,12 @@ EvaluationContext getEnrichedContext() {
return enrichedContext;
}

private boolean isConnected() {
return this.connected;
}

private void onConnectionEvent(ConnectionEvent connectionEvent) {
final boolean wasConnected = connected;
final boolean isConnected = connected = connectionEvent.isConnected();
if (isConnected) {
connectionWait.onFinished();
}

syncMetadata = connectionEvent.getSyncMetadata();
enrichedContext = contextEnricher.apply(connectionEvent.getSyncMetadata());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package dev.openfeature.contrib.providers.flagd.resolver.common;

import dev.openfeature.sdk.exceptions.GeneralError;

/**
* A helper class to wait for events.
*/
public class Wait {
private volatile boolean isFinished;

/**
* Create a new Wait object.
*/
public Wait() {}

private Wait(boolean isFinished) {
this.isFinished = isFinished;
}

/**
* Blocks the calling thread until either {@link Wait#onFinished()} is called or the deadline is exceeded, whatever
* happens first.
* If the deadline is exceeded, a GeneralError will be thrown.
*
* @param deadline the maximum time in ms to wait
* @throws GeneralError when the deadline is exceeded before {@link Wait#onFinished()} is called on this object
*/
public void waitUntilFinished(long deadline) {
long start = System.currentTimeMillis();
long end = start + deadline;
while (!isFinished) {
long now = System.currentTimeMillis();
// if wait(0) is called, the thread would wait forever, so we abort when this would happen
if (now >= end) {
throw new GeneralError(String.format(
"Deadline exceeded. Condition did not complete within the %d ms deadline", deadline));
}
long remaining = end - now;
synchronized (this) {
if (isFinished) { // might have changed in the meantime
return;
}
try {
this.wait(remaining);
} catch (InterruptedException e) {
// try again. Leave the continue to make PMD happy
continue;
}
}
}
}

/**
* Wake up all threads that have called {@link Wait#waitUntilFinished(long)}.
*/
public void onFinished() {
synchronized (this) {
isFinished = true;
this.notifyAll();
}
}

/**
* Create a new Wait object that is already finished. Calls to {@link Wait#waitUntilFinished(long)} will return
* immediately.
*
* @return an already finished Wait object
*/
public static Wait finished() {
return new Wait(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionState;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
import dev.openfeature.contrib.providers.flagd.resolver.common.Wait;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.FlagStore;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage;
Expand All @@ -28,7 +28,6 @@
import dev.openfeature.sdk.exceptions.TypeMismatchError;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -42,7 +41,7 @@ public class InProcessResolver implements Resolver {
private final Consumer<ConnectionEvent> onConnectionEvent;
private final Operator operator;
private final long deadline;
private final Supplier<Boolean> connectedSupplier;
private final Wait connectionWait;
private final String scope;

/**
Expand All @@ -51,20 +50,17 @@ public class InProcessResolver implements Resolver {
* Flags are evaluated locally.
*
* @param options flagd options
* @param connectedSupplier lambda providing current connection status from
* caller
* @param connectionWait A {@link Wait} object, which waits until a connection is established
* @param onConnectionEvent lambda which handles changes in the
* connection/stream
*/
public InProcessResolver(
FlagdOptions options,
final Supplier<Boolean> connectedSupplier,
Consumer<ConnectionEvent> onConnectionEvent) {
FlagdOptions options, final Wait connectionWait, Consumer<ConnectionEvent> onConnectionEvent) {
this.flagStore = new FlagStore(getConnector(options, onConnectionEvent));
this.deadline = options.getDeadline();
this.onConnectionEvent = onConnectionEvent;
this.operator = new Operator();
this.connectedSupplier = connectedSupplier;
this.connectionWait = connectionWait;
this.scope = options.getSelector();
}

Expand Down Expand Up @@ -97,7 +93,7 @@ public void init() throws Exception {
stateWatcher.start();

// block till ready
Util.busyWaitAndCheck(this.deadline, this.connectedSupplier);
connectionWait.waitUntilFinished(deadline);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package dev.openfeature.contrib.providers.flagd.resolver.common;

import dev.openfeature.sdk.exceptions.GeneralError;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class WaitTest {
private static final long PERMISSIBLE_EPSILON = 20;

@Timeout(2)
@Test
void waitUntilFinished_failsWhenDeadlineElapses() {
final Wait wait = new Wait();
Assertions.assertThrows(GeneralError.class, () -> wait.waitUntilFinished(10));
}

@Timeout(2)
@Test
void waitUntilFinished_WaitsApproxForDeadline() {
final Wait wait = new Wait();
final AtomicLong start = new AtomicLong();
final AtomicLong end = new AtomicLong();
final long deadline = 45;
Assertions.assertThrows(GeneralError.class, () -> {
start.set(System.currentTimeMillis());
try {
wait.waitUntilFinished(deadline);
} catch (Exception e) {
end.set(System.currentTimeMillis());
throw e;
}
});
final long elapsed = end.get() - start.get();
// should wait at least for the deadline
Assertions.assertTrue(elapsed >= deadline);
// should not wait much longer than the deadline
Assertions.assertTrue(elapsed < deadline + PERMISSIBLE_EPSILON);
}

@Timeout(2)
@Test
void interruptingWaitingThread_isIgnored() throws InterruptedException {
final AtomicBoolean isWaiting = new AtomicBoolean();
final Wait wait = new Wait();
final long deadline = 500;
Thread t0 = new Thread(() -> {
long start = System.currentTimeMillis();
isWaiting.set(true);
wait.waitUntilFinished(deadline);
long end = System.currentTimeMillis();
long duration = end - start;
// even though thread was interrupted, it still waited for the deadline
Assertions.assertTrue(duration >= deadline);
Assertions.assertTrue(duration < deadline + PERMISSIBLE_EPSILON);
});
t0.start();

while (!isWaiting.get()) {
Thread.yield();
}

Thread.sleep(10); // t0 should have started waiting in the meantime

for (int i = 0; i < 50; i++) {
t0.interrupt();
Thread.sleep(10);
}

t0.join();
}

@Timeout(2)
@Test
void callingOnFinished_wakesUpWaitingThread() throws InterruptedException {
final AtomicBoolean isWaiting = new AtomicBoolean();
final Wait wait = new Wait();
Thread t0 = new Thread(() -> {
long start = System.currentTimeMillis();
isWaiting.set(true);
wait.waitUntilFinished(10000);
long end = System.currentTimeMillis();
long duration = end - start;
Assertions.assertTrue(duration < PERMISSIBLE_EPSILON);
});
t0.start();

while (!isWaiting.get()) {
Thread.yield();
}

Thread.sleep(10); // t0 should have started waiting in the meantime

wait.onFinished();

t0.join();
}

@Timeout(2)
@Test
void waitingOnFinished_returnsInstantly() {
Wait finished = Wait.finished();
long start = System.currentTimeMillis();
finished.waitUntilFinished(10000);
long end = System.currentTimeMillis();
// do not use PERMISSIBLE_EPSILON here, this should happen faster than that
Assertions.assertTrue(start + 2 > end);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import dev.openfeature.contrib.providers.flagd.Config;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.Wait;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.MockConnector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageState;
Expand Down Expand Up @@ -517,16 +518,16 @@ void flagSetMetadataIsOverwrittenByFlagMetadataToEvaluation() throws Exception {
private InProcessResolver getInProcessResolverWith(final FlagdOptions options, final MockStorage storage)
throws NoSuchFieldException, IllegalAccessException {

final InProcessResolver resolver = new InProcessResolver(options, () -> true, connectionEvent -> {});
final InProcessResolver resolver = new InProcessResolver(options, Wait.finished(), connectionEvent -> {});
return injectFlagStore(resolver, storage);
}

private InProcessResolver getInProcessResolverWith(
final MockStorage storage, final Consumer<ConnectionEvent> onConnectionEvent)
throws NoSuchFieldException, IllegalAccessException {

final InProcessResolver resolver =
new InProcessResolver(FlagdOptions.builder().deadline(1000).build(), () -> true, onConnectionEvent);
final InProcessResolver resolver = new InProcessResolver(
FlagdOptions.builder().deadline(1000).build(), Wait.finished(), onConnectionEvent);
return injectFlagStore(resolver, storage);
}

Expand All @@ -535,7 +536,7 @@ private InProcessResolver getInProcessResolverWith(
throws NoSuchFieldException, IllegalAccessException {

final InProcessResolver resolver = new InProcessResolver(
FlagdOptions.builder().selector(selector).deadline(1000).build(), () -> true, onConnectionEvent);
FlagdOptions.builder().selector(selector).deadline(1000).build(), Wait.finished(), onConnectionEvent);
return injectFlagStore(resolver, storage);
}

Expand Down

0 comments on commit 4402e66

Please sign in to comment.