diff --git a/build.gradle b/build.gradle index 147d744..5060393 100644 --- a/build.gradle +++ b/build.gradle @@ -132,7 +132,7 @@ protobuf { } ext { - retrofit_version = "2.9.0" + retrofit_version = "2.11.0" jackson_version = "2.15.3" swagger_annotations_version = "2.2.18" lombok_version = "1.18.30" @@ -142,6 +142,7 @@ ext { mockito_core_version = "5.6.0" protobuf_version = "3.24.4" openfeature_version = "1.7.0" + eventsource_version = "4.1.1" } dependencies { @@ -163,6 +164,7 @@ dependencies { implementation("com.google.protobuf:protobuf-java:$protobuf_version") implementation("dev.openfeature:sdk:$openfeature_version") + implementation("com.launchdarkly:okhttp-eventsource:$eventsource_version") compileOnly("org.projectlombok:lombok:$lombok_version") @@ -191,17 +193,17 @@ configurations { task runLocalExample(type: JavaExec) { description = "Run the local bucketing example" classpath = sourceSets.examples.runtimeClasspath - main = 'com.devcycle.examples.LocalExample' + mainClass = 'com.devcycle.examples.LocalExample' } task runCloudExample(type: JavaExec) { description = "Run the cloud bucketing example" classpath = sourceSets.examples.runtimeClasspath - main = 'com.devcycle.examples.CloudExample' + mainClass = 'com.devcycle.examples.CloudExample' } task runOpenFeatureExample(type: JavaExec) { description = "Run the OpenFeature example" classpath = sourceSets.examples.runtimeClasspath - main = 'com.devcycle.examples.OpenFeatureExample' + mainClass = 'com.devcycle.examples.OpenFeatureExample' } diff --git a/src/examples/java/com/devcycle/examples/LocalExample.java b/src/examples/java/com/devcycle/examples/LocalExample.java index fad2665..ccb0953 100644 --- a/src/examples/java/com/devcycle/examples/LocalExample.java +++ b/src/examples/java/com/devcycle/examples/LocalExample.java @@ -1,5 +1,6 @@ package com.devcycle.examples; +import com.devcycle.sdk.server.common.logging.SimpleDevCycleLogger; import com.devcycle.sdk.server.common.model.DevCycleUser; import com.devcycle.sdk.server.local.api.DevCycleLocalClient; import com.devcycle.sdk.server.local.model.DevCycleLocalOptions; @@ -22,8 +23,11 @@ public static void main(String[] args) throws InterruptedException { // The default value can be of type string, boolean, number, or JSON Boolean defaultValue = false; - DevCycleLocalOptions options = DevCycleLocalOptions.builder().configPollingIntervalMs(60000) - .disableAutomaticEventLogging(false).disableCustomEventLogging(false).build(); + DevCycleLocalOptions options = DevCycleLocalOptions.builder() + .configPollingIntervalMS(60000) + .customLogger(new SimpleDevCycleLogger(SimpleDevCycleLogger.Level.DEBUG)) + .enableBetaRealtimeUpdates(true) + .build(); // Initialize DevCycle Client DevCycleLocalClient client = new DevCycleLocalClient(server_sdk_key, options); @@ -46,5 +50,6 @@ public static void main(String[] args) throws InterruptedException { } else { System.out.println("feature is NOT enabled"); } + Thread.sleep(10000); } } diff --git a/src/main/java/com/devcycle/sdk/server/common/model/ProjectConfig.java b/src/main/java/com/devcycle/sdk/server/common/model/ProjectConfig.java index 32ff1ca..9e96458 100644 --- a/src/main/java/com/devcycle/sdk/server/common/model/ProjectConfig.java +++ b/src/main/java/com/devcycle/sdk/server/common/model/ProjectConfig.java @@ -13,6 +13,7 @@ @NoArgsConstructor @JsonIgnoreProperties(ignoreUnknown = true) public class ProjectConfig { + @Schema(description = "Project Settings") private Object project; @@ -30,4 +31,8 @@ public class ProjectConfig { @Schema(description = "Variable Hashes for all Variables in this Project") private Object variableHashes; -} \ No newline at end of file + + @Schema(description = "SSE Configuration") + private SSE sse; +} + diff --git a/src/main/java/com/devcycle/sdk/server/common/model/SSE.java b/src/main/java/com/devcycle/sdk/server/common/model/SSE.java new file mode 100644 index 0000000..20f3b74 --- /dev/null +++ b/src/main/java/com/devcycle/sdk/server/common/model/SSE.java @@ -0,0 +1,18 @@ +package com.devcycle.sdk.server.common.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class SSE { + private String hostname; + private String path; +} + diff --git a/src/main/java/com/devcycle/sdk/server/common/model/SSEMessage.java b/src/main/java/com/devcycle/sdk/server/common/model/SSEMessage.java new file mode 100644 index 0000000..735fef9 --- /dev/null +++ b/src/main/java/com/devcycle/sdk/server/common/model/SSEMessage.java @@ -0,0 +1,18 @@ +package com.devcycle.sdk.server.common.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class SSEMessage { + private String etag; + private double lastModified; + private String type; +} diff --git a/src/main/java/com/devcycle/sdk/server/local/managers/EnvironmentConfigManager.java b/src/main/java/com/devcycle/sdk/server/local/managers/EnvironmentConfigManager.java index e1edb2f..9ee497c 100644 --- a/src/main/java/com/devcycle/sdk/server/local/managers/EnvironmentConfigManager.java +++ b/src/main/java/com/devcycle/sdk/server/local/managers/EnvironmentConfigManager.java @@ -3,19 +3,22 @@ import com.devcycle.sdk.server.common.api.IDevCycleApi; import com.devcycle.sdk.server.common.exception.DevCycleException; import com.devcycle.sdk.server.common.logging.DevCycleLogger; -import com.devcycle.sdk.server.common.model.ErrorResponse; -import com.devcycle.sdk.server.common.model.HttpResponseCode; -import com.devcycle.sdk.server.common.model.ProjectConfig; +import com.devcycle.sdk.server.common.model.*; import com.devcycle.sdk.server.local.api.DevCycleLocalApiClient; import com.devcycle.sdk.server.local.bucketing.LocalBucketing; import com.devcycle.sdk.server.local.model.DevCycleLocalOptions; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.launchdarkly.eventsource.FaultEvent; +import com.launchdarkly.eventsource.MessageEvent; +import com.launchdarkly.eventsource.StartedEvent; import retrofit2.Call; import retrofit2.Response; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.Executors; @@ -26,9 +29,12 @@ public final class EnvironmentConfigManager { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final int DEFAULT_POLL_INTERVAL_MS = 30000; private static final int MIN_INTERVALS_MS = 1000; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory()); + private ScheduledExecutorService scheduler; private final IDevCycleApi configApiClient; private final LocalBucketing localBucketing; + private SSEManager sseManager; + private boolean isSSEConnected = false; + private final DevCycleLocalOptions options; private ProjectConfig config; private String configETag = ""; @@ -36,11 +42,13 @@ public final class EnvironmentConfigManager { private final String sdkKey; private final int pollingIntervalMS; + private static final int pollingIntervalSSEMS = 15 * 60 * 60 * 1000; private boolean pollingEnabled = true; public EnvironmentConfigManager(String sdkKey, LocalBucketing localBucketing, DevCycleLocalOptions options) { this.sdkKey = sdkKey; this.localBucketing = localBucketing; + this.options = options; configApiClient = new DevCycleLocalApiClient(sdkKey, options).initialize(); @@ -48,24 +56,25 @@ public EnvironmentConfigManager(String sdkKey, LocalBucketing localBucketing, De pollingIntervalMS = configPollingIntervalMS >= MIN_INTERVALS_MS ? configPollingIntervalMS : DEFAULT_POLL_INTERVAL_MS; - setupScheduler(); + scheduler = setupScheduler(); + scheduler.scheduleAtFixedRate(getConfigRunnable, 0, this.pollingIntervalMS, TimeUnit.MILLISECONDS); } - private void setupScheduler() { - Runnable getConfigRunnable = new Runnable() { - public void run() { - try { - if (pollingEnabled) { - getConfig(); - } - } catch (DevCycleException e) { - DevCycleLogger.error("Failed to load config: " + e.getMessage()); + private ScheduledExecutorService setupScheduler() { + return Executors.newScheduledThreadPool(1, new DaemonThreadFactory()); + } + + private final Runnable getConfigRunnable = new Runnable() { + public void run() { + try { + if (pollingEnabled) { + getConfig(); } + } catch (DevCycleException e) { + DevCycleLogger.error("Failed to load config: " + e.getMessage()); } - }; - - scheduler.scheduleAtFixedRate(getConfigRunnable, 0, this.pollingIntervalMS, TimeUnit.MILLISECONDS); - } + } + }; public boolean isConfigInitialized() { return config != null; @@ -74,9 +83,57 @@ public boolean isConfigInitialized() { private ProjectConfig getConfig() throws DevCycleException { Call config = this.configApiClient.getConfig(this.sdkKey, this.configETag, this.configLastModified); this.config = getResponseWithRetries(config, 1); + if (this.options.isEnableBetaRealtimeUpdates()) { + try { + URI uri = new URI(this.config.getSse().getHostname() + this.config.getSse().getPath()); + if (sseManager == null) { + sseManager = new SSEManager(uri); + } + sseManager.restart(uri, this::handleSSEMessage, this::handleSSEError, this::handleSSEStarted); + } catch (URISyntaxException e) { + DevCycleLogger.warning("Failed to create SSEManager: " + e.getMessage()); + } + } return this.config; } + private Void handleSSEMessage(MessageEvent messageEvent) { + DevCycleLogger.debug("Received message: " + messageEvent.getData()); + if (!isSSEConnected) + { + handleSSEStarted(null); + } + + String data = messageEvent.getData(); + if (data == null || data.isEmpty() || data.equals("keepalive")) { + return null; + } + try { + SSEMessage message = OBJECT_MAPPER.readValue(data, SSEMessage.class); + if (message.getType() == null || message.getType().equals("refetchConfig") || message.getType().isEmpty()) { + DevCycleLogger.debug("Received refetchConfig message, fetching new config"); + getConfigRunnable.run(); + } + } catch (JsonProcessingException e) { + DevCycleLogger.warning("Failed to parse SSE message: " + e.getMessage()); + } + return null; + } + + private Void handleSSEError(FaultEvent faultEvent) { + DevCycleLogger.warning("Received error: " + faultEvent.getCause()); + return null; + } + + private Void handleSSEStarted(StartedEvent startedEvent) { + isSSEConnected = true; + DevCycleLogger.debug("SSE Connected - setting polling interval to " + pollingIntervalSSEMS); + scheduler.shutdown(); + scheduler = setupScheduler(); + scheduler.scheduleAtFixedRate(getConfigRunnable, 0, pollingIntervalSSEMS, TimeUnit.MILLISECONDS); + return null; + } + private ProjectConfig getResponseWithRetries(Call call, int maxRetries) throws DevCycleException { // attempt 0 is the initial request, attempt > 0 are all retries int attempt = 0; @@ -206,6 +263,9 @@ private void stopPolling() { } public void cleanup() { + if (sseManager != null) { + sseManager.close(); + } stopPolling(); } } \ No newline at end of file diff --git a/src/main/java/com/devcycle/sdk/server/local/managers/SSEManager.java b/src/main/java/com/devcycle/sdk/server/local/managers/SSEManager.java new file mode 100644 index 0000000..fb3581e --- /dev/null +++ b/src/main/java/com/devcycle/sdk/server/local/managers/SSEManager.java @@ -0,0 +1,107 @@ +package com.devcycle.sdk.server.local.managers; + +import com.devcycle.sdk.server.common.logging.DevCycleLogger; +import com.launchdarkly.eventsource.*; + +import java.net.URI; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +public class SSEManager { + + private EventSource eventSource; + private static Thread messageHandlerThread; + private URI uri; + + public SSEManager(URI uri) { + this.eventSource = buildEventSource(uri); + + this.uri = uri; + } + + public void close() { + eventSource.close(); + } + + public void restart(URI uri, Function messageHandler, Function errorHandler, Function stateHandler) { + if (this.uri.equals(uri) && eventSource != null && (eventSource.getState() == ReadyState.OPEN || eventSource.getState() == ReadyState.CONNECTING || eventSource.getState() == ReadyState.CLOSED)) { + return; + } + this.uri = uri; + if (eventSource != null && eventSource.getState() == ReadyState.OPEN) { + eventSource.close(); + } + if (messageHandlerThread != null) { + messageHandlerThread.interrupt(); + } + eventSource = buildEventSource(uri); + start(messageHandler, errorHandler, stateHandler); + } + + private EventSource buildEventSource(URI uri) { + return new EventSource.Builder(ConnectStrategy.http(uri).clientBuilderActions(clientBuilder -> + clientBuilder + .connectTimeout(100 * 60, TimeUnit.SECONDS) + .readTimeout(100 * 60, TimeUnit.SECONDS) + .writeTimeout(100 * 60, TimeUnit.SECONDS) + )).build(); + } + + private boolean start(Function messageHandler, Function errorHandler, Function stateHandler) { + switch (eventSource.getState()) { + case CONNECTING: + case OPEN: + break; + case CLOSED: + case RAW: + try { + eventSource.start(); + } catch (StreamException e) { + DevCycleLogger.error("Error starting event source", e); + return false; + } + break; + } + messageHandlerThread = new Thread(new SSEMessageHandler(eventSource, messageHandler, errorHandler, stateHandler)); + messageHandlerThread.start(); + return true; + } + + private static class SSEMessageHandler implements Runnable { + + private final Function messageHandler; + private final Function errorHandler; + private final Function stateHandler; + private final EventSource sse; + + public SSEMessageHandler(EventSource sse, Function messageHandler, Function errorHandler, Function stateHandler) { + this.messageHandler = messageHandler; + this.errorHandler = errorHandler; + this.stateHandler = stateHandler; + this.sse = sse; + } + + @Override + public void run() { + while (true) { + try { + StreamEvent event = sse.readAnyEvent(); + if (event instanceof MessageEvent) { + messageHandler.apply((MessageEvent) event); + } else if (event instanceof FaultEvent) { + errorHandler.apply((FaultEvent) event); + } else if (event instanceof StartedEvent) { + stateHandler.apply((StartedEvent) event); + } else if (event instanceof CommentEvent) { + messageHandler.apply(new MessageEvent(((CommentEvent) event).getText())); + } else { + DevCycleLogger.warning("Unknown event type: " + event.getClass().getName()); + } + } catch (StreamException e) { + DevCycleLogger.warning("Error reading event"); + DevCycleLogger.warning(e.getMessage()); + } + } + } + } +} diff --git a/src/main/java/com/devcycle/sdk/server/local/model/DevCycleLocalOptions.java b/src/main/java/com/devcycle/sdk/server/local/model/DevCycleLocalOptions.java index 23cd030..e6e36cc 100644 --- a/src/main/java/com/devcycle/sdk/server/local/model/DevCycleLocalOptions.java +++ b/src/main/java/com/devcycle/sdk/server/local/model/DevCycleLocalOptions.java @@ -28,6 +28,8 @@ public class DevCycleLocalOptions implements IDevCycleOptions { private boolean disableAutomaticEventLogging = false; + private boolean enableBetaRealtimeUpdates = false; + @JsonIgnore private IDevCycleLogger customLogger = null; private boolean disableCustomEventLogging = false; @@ -49,7 +51,8 @@ public DevCycleLocalOptions( boolean disableAutomaticEventLogging, boolean disableCustomEventLogging, IDevCycleLogger customLogger, - IRestOptions restOptions + IRestOptions restOptions, + boolean enableBetaRealtimeUpdates ) { this.configRequestTimeoutMs = configRequestTimeoutMs > 0 ? configRequestTimeoutMs : this.configRequestTimeoutMs; this.configPollingIntervalMS = getConfigPollingIntervalMS(configPollingIntervalMs, configPollingIntervalMS); @@ -63,6 +66,7 @@ public DevCycleLocalOptions( this.disableCustomEventLogging = disableCustomEventLogging; this.customLogger = customLogger; this.restOptions = restOptions; + this.enableBetaRealtimeUpdates = enableBetaRealtimeUpdates; if (this.flushEventQueueSize >= this.maxEventQueueSize) { DevCycleLogger.warning("flushEventQueueSize: " + this.flushEventQueueSize + " must be smaller than maxEventQueueSize: " + this.maxEventQueueSize); diff --git a/src/main/resources/bucketing-lib.release.wasm b/src/main/resources/bucketing-lib.release.wasm index 2c28ab2..348b8bd 100644 Binary files a/src/main/resources/bucketing-lib.release.wasm and b/src/main/resources/bucketing-lib.release.wasm differ