diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 31c248e309276..cc72a6ea53e40 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2789,6 +2789,8 @@ components: format: uuid disable_cache: type: boolean + notifySchemaChange: + type: boolean SourceUpdate: type: object required: diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/SchedulerHandler.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/SchedulerHandler.java index 8da81b5335281..f9e479df97bd2 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/SchedulerHandler.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/SchedulerHandler.java @@ -70,6 +70,7 @@ import io.airbyte.config.persistence.SecretsRepositoryReader; import io.airbyte.config.persistence.SecretsRepositoryWriter; import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.WebUrlHelper; import io.airbyte.persistence.job.models.Job; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; @@ -81,8 +82,10 @@ import java.util.Optional; import java.util.UUID; import javax.validation.constraints.NotNull; +import lombok.extern.slf4j.Slf4j; @Singleton +@Slf4j public class SchedulerHandler { private static final HashFunction HASH_FUNCTION = Hashing.md5(); @@ -100,6 +103,7 @@ public class SchedulerHandler { private final JobConverter jobConverter; private final EventRunner eventRunner; private final FeatureFlags envVariableFeatureFlags; + private final WebUrlHelper webUrlHelper; // TODO: Convert to be fully using micronaut public SchedulerHandler(final ConfigRepository configRepository, @@ -111,7 +115,8 @@ public SchedulerHandler(final ConfigRepository configRepository, final LogConfigs logConfigs, final EventRunner eventRunner, final ConnectionsHandler connectionsHandler, - final FeatureFlags envVariableFeatureFlags) { + final FeatureFlags envVariableFeatureFlags, + final WebUrlHelper webUrlHelper) { this( configRepository, secretsRepositoryWriter, @@ -122,7 +127,8 @@ public SchedulerHandler(final ConfigRepository configRepository, eventRunner, new JobConverter(workerEnvironment, logConfigs), connectionsHandler, - envVariableFeatureFlags); + envVariableFeatureFlags, + webUrlHelper); } @VisibleForTesting @@ -135,7 +141,8 @@ public SchedulerHandler(final ConfigRepository configRepository, final EventRunner eventRunner, final JobConverter jobConverter, final ConnectionsHandler connectionsHandler, - final FeatureFlags envVariableFeatureFlags) { + final FeatureFlags envVariableFeatureFlags, + final WebUrlHelper webUrlHelper) { this.configRepository = configRepository; this.secretsRepositoryWriter = secretsRepositoryWriter; this.synchronousSchedulerClient = synchronousSchedulerClient; @@ -146,6 +153,7 @@ public SchedulerHandler(final ConfigRepository configRepository, this.jobConverter = jobConverter; this.connectionsHandler = connectionsHandler; this.envVariableFeatureFlags = envVariableFeatureFlags; + this.webUrlHelper = webUrlHelper; } public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody) @@ -272,7 +280,7 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source if (persistedCatalogId.isSuccess() && discoverSchemaRequestBody.getConnectionId() != null) { // modify discoveredSchema object to add CatalogDiff, containsBreakingChange, and connectionStatus - generateCatalogDiffsAndDisableConnectionsIfNeeded(discoveredSchema, discoverSchemaRequestBody); + generateCatalogDiffsAndDisableConnectionsIfNeeded(discoveredSchema, discoverSchemaRequestBody, source.getWorkspaceId()); } return discoveredSchema; @@ -394,9 +402,10 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE // determine whether 1. the source schema change resulted in a broken connection or 2. the user // wants the connection disabled when non-breaking changes are detected. If so, disable that // connection. Modify the current discoveredSchema object to add a CatalogDiff, - // containsBreakingChange paramter, and connectionStatus parameter. + // containsBreakingChange parameter, and connectionStatus parameter. private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDiscoverSchemaRead discoveredSchema, - final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) + final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody, + final UUID workspaceId) throws JsonValidationException, ConfigNotFoundException, IOException { final ConnectionReadList connectionsForSource = connectionsHandler.listConnectionsForSource(discoverSchemaRequestBody.getSourceId(), false); for (final ConnectionRead connectionRead : connectionsForSource.getConnections()) { @@ -418,12 +427,23 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco } updateObject.status(connectionStatus); connectionsHandler.updateConnection(updateObject); + if (shouldNotifySchemaChange(diff, connectionRead, discoverSchemaRequestBody)) { + final String url = webUrlHelper.getConnectionUrl(workspaceId, connectionRead.getConnectionId()); + eventRunner.sendSchemaChangeNotification(connectionRead.getConnectionId(), url); + } if (connectionRead.getConnectionId().equals(discoverSchemaRequestBody.getConnectionId())) { discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus); } } } + private boolean shouldNotifySchemaChange(final CatalogDiff diff, + final ConnectionRead connectionRead, + final SourceDiscoverSchemaRequestBody requestBody) { + return !diff.getTransforms().isEmpty() && connectionRead.getNotifySchemaChanges() && requestBody.getNotifySchemaChange() != null + && requestBody.getNotifySchemaChange(); + } + private boolean shouldDisableConnection(final boolean containsBreakingChange, final NonBreakingChangesPreference preference, final CatalogDiff diff) { diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/WebBackendConnectionsHandler.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/WebBackendConnectionsHandler.java index 63b43bbd77fe3..b863e56cdedbd 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/WebBackendConnectionsHandler.java @@ -413,7 +413,8 @@ private Optional getRefreshedSchema(final UUID sourceI final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody() .sourceId(sourceId) .disableCache(true) - .connectionId(connectionId); + .connectionId(connectionId) + .notifySchemaChange(false); final SourceDiscoverSchemaRead schemaRead = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq); return Optional.ofNullable(schemaRead); } diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/scheduler/EventRunner.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/scheduler/EventRunner.java index 932202b8fcbd9..59ce6f4394f16 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/scheduler/EventRunner.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/scheduler/EventRunner.java @@ -28,4 +28,6 @@ public interface EventRunner { void update(final UUID connectionId); + void sendSchemaChangeNotification(final UUID connectionId, final String url); + } diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/scheduler/TemporalEventRunner.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/scheduler/TemporalEventRunner.java index 989f66c8fdea4..9e6ed0068e51a 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/scheduler/TemporalEventRunner.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/scheduler/TemporalEventRunner.java @@ -54,4 +54,9 @@ public void update(final UUID connectionId) { temporalClient.update(connectionId); } + @Override + public void sendSchemaChangeNotification(final UUID connectionId, final String url) { + temporalClient.sendSchemaChangeNotification(connectionId, url); + } + } diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/SchedulerHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/SchedulerHandlerTest.java index a5a292dace0c4..50b8f99820427 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/SchedulerHandlerTest.java @@ -19,9 +19,11 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.Lists; +import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.model.generated.CatalogDiff; import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.ConnectionIdRequestBody; @@ -77,6 +79,7 @@ import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.SecretsRepositoryWriter; import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.WebUrlHelper; import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobStatus; import io.airbyte.protocol.models.AirbyteCatalog; @@ -113,6 +116,7 @@ class SchedulerHandlerTest { private static final String DOGS = "dogs"; private static final String SHOES = "shoes"; private static final String SKU = "sku"; + private static final String CONNECTION_URL = "connection_url"; private static final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog(SHOES, Field.of(SKU, JsonSchemaType.STRING)); @@ -153,6 +157,7 @@ class SchedulerHandlerTest { private JobConverter jobConverter; private ConnectionsHandler connectionsHandler; private EnvVariableFeatureFlags envVariableFeatureFlags; + private WebUrlHelper webUrlHelper; @BeforeEach void setup() { @@ -176,6 +181,7 @@ void setup() { eventRunner = mock(EventRunner.class); connectionsHandler = mock(ConnectionsHandler.class); envVariableFeatureFlags = mock(EnvVariableFeatureFlags.class); + webUrlHelper = mock(WebUrlHelper.class); jobConverter = spy(new JobConverter(WorkerEnvironment.DOCKER, LogConfigs.EMPTY)); @@ -189,7 +195,8 @@ void setup() { eventRunner, jobConverter, connectionsHandler, - envVariableFeatureFlags); + envVariableFeatureFlags, + webUrlHelper); } @Test @@ -561,13 +568,14 @@ void testDiscoverSchemaForSourceFromSourceIdFailed() throws IOException, JsonVal } @Test - void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreaking() throws IOException, JsonValidationException, ConfigNotFoundException { + void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreaking() + throws IOException, JsonValidationException, ConfigNotFoundException, InterruptedException, ApiException { final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); final UUID connectionId = UUID.randomUUID(); final UUID discoveredCatalogId = UUID.randomUUID(); final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SourceDiscoverSchemaRequestBody request = - new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true).notifySchemaChange(true); final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.REMOVE_STREAM) .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)); final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); @@ -582,6 +590,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreaking() throws IOExcept when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION), false)) .thenReturn(discoverResponse); + when(webUrlHelper.getConnectionUrl(source.getWorkspaceId(), connectionId)).thenReturn(CONNECTION_URL); when(discoverResponse.isSuccess()).thenReturn(true); when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); @@ -591,7 +600,8 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreaking() throws IOExcept CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); final ConnectionRead connectionRead = - new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent, sourceDef)).connectionId(connectionId); + new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent, sourceDef)).connectionId(connectionId) + .notifySchemaChanges(true); when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); final ConnectionReadList connectionReadList = new ConnectionReadList().connections(List.of(connectionRead)); @@ -610,17 +620,18 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreaking() throws IOExcept final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); assertEquals(actual.getCatalogDiff(), catalogDiff); assertEquals(actual.getCatalog(), expectedActorCatalog); + verify(eventRunner).sendSchemaChangeNotification(connectionId, CONNECTION_URL); } @Test void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionPreferenceNoFeatureFlag() - throws IOException, JsonValidationException, ConfigNotFoundException { + throws IOException, JsonValidationException, ConfigNotFoundException, InterruptedException, ApiException { final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); final UUID connectionId = UUID.randomUUID(); final UUID discoveredCatalogId = UUID.randomUUID(); final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SourceDiscoverSchemaRequestBody request = - new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true).notifySchemaChange(true); final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.REMOVE_STREAM) .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)); final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); @@ -636,6 +647,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionP when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION), false)) .thenReturn(discoverResponse); + when(webUrlHelper.getConnectionUrl(source.getWorkspaceId(), connectionId)).thenReturn(CONNECTION_URL); when(discoverResponse.isSuccess()).thenReturn(true); when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); @@ -646,7 +658,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionP final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent, sourceDef)).nonBreakingChangesPreference( - NonBreakingChangesPreference.DISABLE).status(ConnectionStatus.ACTIVE).connectionId(connectionId); + NonBreakingChangesPreference.DISABLE).status(ConnectionStatus.ACTIVE).connectionId(connectionId).notifySchemaChanges(true); when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); final ConnectionReadList connectionReadList = new ConnectionReadList().connections(List.of(connectionRead)); @@ -666,6 +678,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionP assertEquals(actual.getCatalogDiff(), catalogDiff); assertEquals(actual.getCatalog(), expectedActorCatalog); assertEquals(actual.getConnectionStatus(), ConnectionStatus.ACTIVE); + verify(eventRunner).sendSchemaChangeNotification(connectionId, CONNECTION_URL); } @Test @@ -676,7 +689,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionP final UUID discoveredCatalogId = UUID.randomUUID(); final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SourceDiscoverSchemaRequestBody request = - new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true).notifySchemaChange(true); final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.REMOVE_STREAM) .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)); final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); @@ -702,7 +715,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionP final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent, sourceDef)).nonBreakingChangesPreference( - NonBreakingChangesPreference.DISABLE).connectionId(connectionId); + NonBreakingChangesPreference.DISABLE).connectionId(connectionId).notifySchemaChanges(false); when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); final ConnectionReadList connectionReadList = new ConnectionReadList().connections(List.of(connectionRead)); @@ -722,16 +735,18 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionP assertEquals(actual.getCatalogDiff(), catalogDiff); assertEquals(actual.getCatalog(), expectedActorCatalog); assertEquals(actual.getConnectionStatus(), ConnectionStatus.INACTIVE); + verifyNoInteractions(eventRunner); } @Test - void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException, JsonValidationException, ConfigNotFoundException { + void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() + throws IOException, JsonValidationException, ConfigNotFoundException, InterruptedException, ApiException { final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); final UUID connectionId = UUID.randomUUID(); final UUID discoveredCatalogId = UUID.randomUUID(); final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SourceDiscoverSchemaRequestBody request = - new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true).notifySchemaChange(true); final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.UPDATE_STREAM) .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)).addUpdateStreamItem(new FieldTransform().transformType( FieldTransform.TransformTypeEnum.REMOVE_FIELD).breaking(true)); @@ -747,6 +762,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION), false)) .thenReturn(discoverResponse); + when(webUrlHelper.getConnectionUrl(source.getWorkspaceId(), connectionId)).thenReturn(CONNECTION_URL); when(discoverResponse.isSuccess()).thenReturn(true); when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); @@ -757,7 +773,8 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent, sourceDef)).status(ConnectionStatus.ACTIVE) - .connectionId(connectionId); + .connectionId(connectionId) + .notifySchemaChanges(true); when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); final ConnectionReadList connectionReadList = new ConnectionReadList().connections(List.of(connectionRead)); @@ -780,16 +797,18 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException assertEquals(actual.getCatalog(), expectedActorCatalog); assertEquals(actual.getConnectionStatus(), ConnectionStatus.ACTIVE); verify(connectionsHandler).updateConnection(expectedConnectionUpdate); + verify(eventRunner).sendSchemaChangeNotification(connectionId, CONNECTION_URL); } @Test - void testDiscoverSchemaFromSourceIdWithConnectionIdBreakingFeatureFlagOn() throws IOException, JsonValidationException, ConfigNotFoundException { + void testDiscoverSchemaFromSourceIdWithConnectionIdBreakingFeatureFlagOn() + throws IOException, JsonValidationException, ConfigNotFoundException, InterruptedException, ApiException { final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); final UUID connectionId = UUID.randomUUID(); final UUID discoveredCatalogId = UUID.randomUUID(); final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SourceDiscoverSchemaRequestBody request = - new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true).notifySchemaChange(true); final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.UPDATE_STREAM) .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)).addUpdateStreamItem(new FieldTransform().transformType( FieldTransform.TransformTypeEnum.REMOVE_FIELD).breaking(true)); @@ -806,6 +825,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreakingFeatureFlagOn() throw when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION), false)) .thenReturn(discoverResponse); + when(webUrlHelper.getConnectionUrl(source.getWorkspaceId(), connectionId)).thenReturn(CONNECTION_URL); when(discoverResponse.isSuccess()).thenReturn(true); when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); @@ -815,7 +835,8 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreakingFeatureFlagOn() throw CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); final ConnectionRead connectionRead = - new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent, sourceDef)).connectionId(connectionId); + new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent, sourceDef)).connectionId(connectionId) + .notifySchemaChanges(true); when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); final ConnectionReadList connectionReadList = new ConnectionReadList().connections(List.of(connectionRead)); @@ -838,6 +859,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreakingFeatureFlagOn() throw assertEquals(actual.getCatalog(), expectedActorCatalog); assertEquals(actual.getConnectionStatus(), ConnectionStatus.INACTIVE); verify(connectionsHandler).updateConnection(expectedConnectionUpdate); + verify(eventRunner).sendSchemaChangeNotification(connectionId, CONNECTION_URL); } @Test @@ -848,7 +870,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionP final UUID discoveredCatalogId = UUID.randomUUID(); final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SourceDiscoverSchemaRequestBody request = - new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true).notifySchemaChange(true); final CatalogDiff catalogDiff = new CatalogDiff(); final StandardSourceDefinition sourceDef = new StandardSourceDefinition() .withDockerRepository(SOURCE_DOCKER_REPO) @@ -872,7 +894,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionP final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent, sourceDef)).nonBreakingChangesPreference( - NonBreakingChangesPreference.DISABLE).status(ConnectionStatus.INACTIVE).connectionId(connectionId); + NonBreakingChangesPreference.DISABLE).status(ConnectionStatus.INACTIVE).connectionId(connectionId).notifySchemaChanges(false); when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); final ConnectionReadList connectionReadList = new ConnectionReadList().connections(List.of(connectionRead)); @@ -892,10 +914,13 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionP assertEquals(actual.getCatalogDiff(), catalogDiff); assertEquals(actual.getCatalog(), expectedActorCatalog); assertEquals(actual.getConnectionStatus(), ConnectionStatus.INACTIVE); + // notification preferences are turned on, but there is no schema diff detected + verifyNoInteractions(eventRunner); } @Test - void testDiscoverSchemaForSourceMultipleConnectionsFeatureFlagOn() throws IOException, JsonValidationException, ConfigNotFoundException { + void testDiscoverSchemaForSourceMultipleConnectionsFeatureFlagOn() + throws IOException, JsonValidationException, ConfigNotFoundException, InterruptedException, ApiException { final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); final UUID connectionId = UUID.randomUUID(); final UUID connectionId2 = UUID.randomUUID(); @@ -903,7 +928,7 @@ void testDiscoverSchemaForSourceMultipleConnectionsFeatureFlagOn() throws IOExce final UUID discoveredCatalogId = UUID.randomUUID(); final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; final SourceDiscoverSchemaRequestBody request = - new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); + new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true).notifySchemaChange(true); // 3 connections use the same source. 2 will generate catalog diffs that are non-breaking, 1 will // generate a breaking catalog diff @@ -930,6 +955,9 @@ void testDiscoverSchemaForSourceMultipleConnectionsFeatureFlagOn() throws IOExce when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION), false)) .thenReturn(discoverResponse); + when(webUrlHelper.getConnectionUrl(source.getWorkspaceId(), connectionId)).thenReturn(CONNECTION_URL); + when(webUrlHelper.getConnectionUrl(source.getWorkspaceId(), connectionId2)).thenReturn(CONNECTION_URL); + when(webUrlHelper.getConnectionUrl(source.getWorkspaceId(), connectionId3)).thenReturn(CONNECTION_URL); when(discoverResponse.isSuccess()).thenReturn(true); when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); @@ -940,15 +968,15 @@ void testDiscoverSchemaForSourceMultipleConnectionsFeatureFlagOn() throws IOExce final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent, sourceDef)).nonBreakingChangesPreference( - NonBreakingChangesPreference.IGNORE).status(ConnectionStatus.ACTIVE).connectionId(connectionId); + NonBreakingChangesPreference.IGNORE).status(ConnectionStatus.ACTIVE).connectionId(connectionId).notifySchemaChanges(true); final ConnectionRead connectionRead2 = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent, sourceDef)).nonBreakingChangesPreference( - NonBreakingChangesPreference.IGNORE).status(ConnectionStatus.ACTIVE).connectionId(connectionId2); + NonBreakingChangesPreference.IGNORE).status(ConnectionStatus.ACTIVE).connectionId(connectionId2).notifySchemaChanges(true); final ConnectionRead connectionRead3 = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent, sourceDef)).nonBreakingChangesPreference( - NonBreakingChangesPreference.DISABLE).status(ConnectionStatus.ACTIVE).connectionId(connectionId3); + NonBreakingChangesPreference.DISABLE).status(ConnectionStatus.ACTIVE).connectionId(connectionId3).notifySchemaChanges(false); when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead, connectionRead2, connectionRead3); when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff1, catalogDiff2, catalogDiff3); @@ -976,13 +1004,16 @@ void testDiscoverSchemaForSourceMultipleConnectionsFeatureFlagOn() throws IOExce assertEquals(ConnectionStatus.ACTIVE, connectionUpdateValues.get(0).getStatus()); assertEquals(ConnectionStatus.ACTIVE, connectionUpdateValues.get(1).getStatus()); assertEquals(ConnectionStatus.INACTIVE, connectionUpdateValues.get(2).getStatus()); + verify(eventRunner).sendSchemaChangeNotification(connectionId, CONNECTION_URL); + verify(eventRunner).sendSchemaChangeNotification(connectionId2, CONNECTION_URL); + verify(eventRunner, times(0)).sendSchemaChangeNotification(connectionId3, CONNECTION_URL); } @Test void testDiscoverSchemaFromSourceIdWithConnectionUpdateNonSuccessResponse() throws IOException, JsonValidationException, ConfigNotFoundException { final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()) - .connectionId(UUID.randomUUID()); + .connectionId(UUID.randomUUID()).notifySchemaChange(true); // Mock the source definition. when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/NotificationUtils.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/NotificationUtils.java new file mode 100644 index 0000000000000..3af59a0214a4b --- /dev/null +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/NotificationUtils.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.temporal; + +import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.commons.temporal.scheduling.ConnectionNotificationWorkflow; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.validation.json.JsonValidationException; +import io.temporal.client.WorkflowClient; +import jakarta.inject.Singleton; +import java.io.IOException; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; + +@Singleton +@Slf4j +public class NotificationUtils { + + public NotificationUtils() {} + + public void sendSchemaChangeNotification(final WorkflowClient client, final UUID connectionId, final String url) { + final ConnectionNotificationWorkflow notificationWorkflow = + client.newWorkflowStub(ConnectionNotificationWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.NOTIFY)); + try { + notificationWorkflow.sendSchemaChangeNotification(connectionId, url); + } catch (IOException | RuntimeException | InterruptedException | ApiException | ConfigNotFoundException | JsonValidationException e) { + log.error("There was an error while sending a Schema Change Notification", e); + } + } + +} diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java index 4334ef4228017..18409f12dd6d8 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java @@ -75,6 +75,7 @@ public class TemporalClient { private final WorkflowServiceStubs service; private final StreamResetPersistence streamResetPersistence; private final ConnectionManagerUtils connectionManagerUtils; + private final NotificationUtils notificationUtils; private final StreamResetRecordsHelper streamResetRecordsHelper; public TemporalClient(@Named("workspaceRootTemporal") final Path workspaceRoot, @@ -82,12 +83,14 @@ public TemporalClient(@Named("workspaceRootTemporal") final Path workspaceRoot, final WorkflowServiceStubs service, final StreamResetPersistence streamResetPersistence, final ConnectionManagerUtils connectionManagerUtils, + final NotificationUtils notificationUtils, final StreamResetRecordsHelper streamResetRecordsHelper) { this.workspaceRoot = workspaceRoot; this.client = client; this.service = service; this.streamResetPersistence = streamResetPersistence; this.connectionManagerUtils = connectionManagerUtils; + this.notificationUtils = notificationUtils; this.streamResetRecordsHelper = streamResetRecordsHelper; } @@ -508,6 +511,10 @@ public void forceDeleteWorkflow(final UUID connectionId) { connectionManagerUtils.deleteWorkflowIfItExist(client, connectionId); } + public void sendSchemaChangeNotification(final UUID connectionId, final String url) { + notificationUtils.sendSchemaChangeNotification(client, connectionId, url); + } + public void update(final UUID connectionId) { final ConnectionManagerWorkflow connectionManagerWorkflow; try { diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionNotificationWorkflow.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionNotificationWorkflow.java index 6005696fa9621..f9035ee86d85e 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionNotificationWorkflow.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionNotificationWorkflow.java @@ -16,7 +16,7 @@ public interface ConnectionNotificationWorkflow { @WorkflowMethod - boolean sendSchemaChangeNotification(UUID connectionId) + boolean sendSchemaChangeNotification(UUID connectionId, String url) throws IOException, InterruptedException, ApiException, ConfigNotFoundException, JsonValidationException; } diff --git a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java index 3b7aa16425002..f1401b79bf41a 100644 --- a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java +++ b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java @@ -107,6 +107,7 @@ public class TemporalClientTest { private WorkflowServiceBlockingStub workflowServiceBlockingStub; private StreamResetPersistence streamResetPersistence; private ConnectionManagerUtils connectionManagerUtils; + private NotificationUtils notificationUtils; private StreamResetRecordsHelper streamResetRecordsHelper; private Path workspaceRoot; @@ -123,9 +124,10 @@ void setup() throws IOException { streamResetPersistence = mock(StreamResetPersistence.class); mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING); connectionManagerUtils = spy(new ConnectionManagerUtils()); + notificationUtils = spy(new NotificationUtils()); streamResetRecordsHelper = mock(StreamResetRecordsHelper.class); temporalClient = - spy(new TemporalClient(workspaceRoot, workflowClient, workflowServiceStubs, streamResetPersistence, connectionManagerUtils, + spy(new TemporalClient(workspaceRoot, workflowClient, workflowServiceStubs, streamResetPersistence, connectionManagerUtils, notificationUtils, streamResetRecordsHelper)); } @@ -133,13 +135,15 @@ void setup() throws IOException { class RestartPerStatus { private ConnectionManagerUtils mConnectionManagerUtils; + private NotificationUtils mNotificationUtils; @BeforeEach void init() { mConnectionManagerUtils = mock(ConnectionManagerUtils.class); + mNotificationUtils = mock(NotificationUtils.class); temporalClient = spy( - new TemporalClient(workspaceRoot, workflowClient, workflowServiceStubs, streamResetPersistence, mConnectionManagerUtils, + new TemporalClient(workspaceRoot, workflowClient, workflowServiceStubs, streamResetPersistence, mConnectionManagerUtils, mNotificationUtils, streamResetRecordsHelper)); } diff --git a/airbyte-notification/src/main/java/io/airbyte/notification/CustomerioNotificationClient.java b/airbyte-notification/src/main/java/io/airbyte/notification/CustomerioNotificationClient.java index ae284e80f86b7..279625c280ae8 100644 --- a/airbyte-notification/src/main/java/io/airbyte/notification/CustomerioNotificationClient.java +++ b/airbyte-notification/src/main/java/io/airbyte/notification/CustomerioNotificationClient.java @@ -7,6 +7,7 @@ import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.Notification; +import io.airbyte.config.SlackNotificationConfiguration; import java.io.IOException; import java.net.URI; import java.net.http.HttpClient; @@ -122,7 +123,10 @@ public boolean notifyFailure(final String message) throws IOException, Interrupt } @Override - public boolean notifySchemaChange(final UUID connectionId, final boolean isBreaking) { + public boolean notifySchemaChange(final UUID connectionId, + final boolean isBreaking, + final SlackNotificationConfiguration config, + final String url) { throw new NotImplementedException(); } diff --git a/airbyte-notification/src/main/java/io/airbyte/notification/NotificationClient.java b/airbyte-notification/src/main/java/io/airbyte/notification/NotificationClient.java index 6d6850feb459c..b723efe45fc84 100644 --- a/airbyte-notification/src/main/java/io/airbyte/notification/NotificationClient.java +++ b/airbyte-notification/src/main/java/io/airbyte/notification/NotificationClient.java @@ -6,6 +6,7 @@ import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.Notification; +import io.airbyte.config.SlackNotificationConfiguration; import java.io.IOException; import java.util.UUID; @@ -55,7 +56,11 @@ public abstract boolean notifyConnectionDisableWarning(String receiverEmail, public abstract boolean notifyFailure(String message) throws IOException, InterruptedException; - public abstract boolean notifySchemaChange(UUID connectionId, boolean isBreaking) throws IOException, InterruptedException; + public abstract boolean notifySchemaChange(final UUID connectionId, + final boolean isBreaking, + final SlackNotificationConfiguration config, + final String url) + throws IOException, InterruptedException; public static NotificationClient createNotificationClient(final Notification notification) { return switch (notification.getNotificationType()) { diff --git a/airbyte-notification/src/main/java/io/airbyte/notification/SlackNotificationClient.java b/airbyte-notification/src/main/java/io/airbyte/notification/SlackNotificationClient.java index 98616fd0613c5..a939027135925 100644 --- a/airbyte-notification/src/main/java/io/airbyte/notification/SlackNotificationClient.java +++ b/airbyte-notification/src/main/java/io/airbyte/notification/SlackNotificationClient.java @@ -121,10 +121,12 @@ public boolean notifyConnectionDisableWarning(final String receiverEmail, } @Override - public boolean notifySchemaChange(UUID connectionId, boolean isBreaking) throws IOException, InterruptedException { + public boolean notifySchemaChange(UUID connectionId, boolean isBreaking, SlackNotificationConfiguration config, String url) + throws IOException, InterruptedException { final String message = renderTemplate( - isBreaking ? "slack/breaking_schema_change_notification_template.txt" : "slack/non_breaking_schema_change_notification_template.txt", - connectionId.toString()); + isBreaking ? "slack/breaking_schema_change_slack_notification_template.txt" + : "slack/non_breaking_schema_change_slack_notification_template.txt", + connectionId.toString(), url); final String webhookUrl = config.getWebhook(); if (!Strings.isEmpty(webhookUrl)) { return notify(message); diff --git a/airbyte-notification/src/main/resources/slack/breaking_schema_change_notification_template.txt b/airbyte-notification/src/main/resources/slack/breaking_schema_change_notification_template.txt deleted file mode 100644 index 925089685ffc3..0000000000000 --- a/airbyte-notification/src/main/resources/slack/breaking_schema_change_notification_template.txt +++ /dev/null @@ -1,3 +0,0 @@ -Your source schema has changed for connection ID: %s - -Airbyte has disabled this connection because this source schema change will cause broken syncs. Visit your connection page, refresh your source schema, and reset your data in order to fix this connection. diff --git a/airbyte-notification/src/main/resources/slack/breaking_schema_change_slack_notification_template.txt b/airbyte-notification/src/main/resources/slack/breaking_schema_change_slack_notification_template.txt new file mode 100644 index 0000000000000..a20974f3fa551 --- /dev/null +++ b/airbyte-notification/src/main/resources/slack/breaking_schema_change_slack_notification_template.txt @@ -0,0 +1,7 @@ +Your source schema has changed for connection ID: %s + +Airbyte has disabled this connection because this source schema change will cause broken syncs. + +Visit your connection page here: %s + +Refresh your source schema and reset your data in order to fix this connection. diff --git a/airbyte-notification/src/main/resources/slack/non_breaking_schema_change_notification_template.txt b/airbyte-notification/src/main/resources/slack/non_breaking_schema_change_notification_template.txt deleted file mode 100644 index 79c8447abbd30..0000000000000 --- a/airbyte-notification/src/main/resources/slack/non_breaking_schema_change_notification_template.txt +++ /dev/null @@ -1,3 +0,0 @@ -Your source schema has changed for connection ID: %s - -Visit your connection page, refresh your source schema, and reset your data in order to update this connection. diff --git a/airbyte-notification/src/main/resources/slack/non_breaking_schema_change_slack_notification_template.txt b/airbyte-notification/src/main/resources/slack/non_breaking_schema_change_slack_notification_template.txt new file mode 100644 index 0000000000000..4f32a9a30a5d8 --- /dev/null +++ b/airbyte-notification/src/main/resources/slack/non_breaking_schema_change_slack_notification_template.txt @@ -0,0 +1,5 @@ +Your source schema has changed for connection ID: %s + +Visit your connection page here: %s + +Refresh your source schema in order to update this connection. diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 035fa682cab0f..6cb04bf40dc97 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -18,6 +18,7 @@ import io.airbyte.commons.server.scheduler.TemporalEventRunner; import io.airbyte.commons.server.services.AirbyteGithubStore; import io.airbyte.commons.temporal.ConnectionManagerUtils; +import io.airbyte.commons.temporal.NotificationUtils; import io.airbyte.commons.temporal.StreamResetRecordsHelper; import io.airbyte.commons.temporal.TemporalClient; import io.airbyte.commons.temporal.TemporalUtils; @@ -49,6 +50,7 @@ import io.airbyte.persistence.job.tracker.JobTracker; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.workers.helper.ConnectionHelper; +import io.temporal.client.WorkflowClient; import io.temporal.serviceclient.WorkflowServiceStubs; import java.net.http.HttpClient; import java.util.Map; @@ -213,14 +215,17 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final StreamResetPersistence streamResetPersistence = new StreamResetPersistence(configsDatabase); final WorkflowServiceStubs temporalService = temporalUtils.createTemporalService(); final ConnectionManagerUtils connectionManagerUtils = new ConnectionManagerUtils(); + final NotificationUtils notificationUtils = new NotificationUtils(); final StreamResetRecordsHelper streamResetRecordsHelper = new StreamResetRecordsHelper(jobPersistence, streamResetPersistence); + final WorkflowClient workflowClient = TemporalWorkflowUtils.createWorkflowClient(temporalService, temporalUtils.getNamespace()); final TemporalClient temporalClient = new TemporalClient( configs.getWorkspaceRoot(), - TemporalWorkflowUtils.createWorkflowClient(temporalService, temporalUtils.getNamespace()), + workflowClient, temporalService, streamResetPersistence, connectionManagerUtils, + notificationUtils, streamResetRecordsHelper); final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, trackingClient); @@ -265,7 +270,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, configs.getLogConfigs(), eventRunner, connectionsHandler, - envVariableFeatureFlags); + envVariableFeatureFlags, + webUrlHelper); final AirbyteProtocolVersionRange airbyteProtocolVersionRange = new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax()); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowImpl.java index 842154263deb5..ca3d59a69cec7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowImpl.java @@ -6,11 +6,8 @@ import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.commons.temporal.scheduling.ConnectionNotificationWorkflow; -import io.airbyte.config.Notification; -import io.airbyte.config.Notification.NotificationType; import io.airbyte.config.SlackNotificationConfiguration; import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.notification.SlackNotificationClient; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.temporal.annotations.TemporalActivityStub; import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity; @@ -36,7 +33,7 @@ public class ConnectionNotificationWorkflowImpl implements ConnectionNotificatio private ConfigFetchActivity configFetchActivity; @Override - public boolean sendSchemaChangeNotification(final UUID connectionId) + public boolean sendSchemaChangeNotification(final UUID connectionId, final String url) throws IOException, InterruptedException, ApiException, ConfigNotFoundException, JsonValidationException { final int getBreakingChangeVersion = Workflow.getVersion(GET_BREAKING_CHANGE_TAG, Workflow.DEFAULT_VERSION, GET_BREAKING_CHANGE_VERSION); @@ -44,11 +41,7 @@ public boolean sendSchemaChangeNotification(final UUID connectionId) final Optional breakingChange = configFetchActivity.getBreakingChange(connectionId); final Optional slackConfig = slackConfigActivity.fetchSlackConfiguration(connectionId); if (slackConfig.isPresent() && breakingChange.isPresent()) { - final Notification notification = - new Notification().withNotificationType(NotificationType.SLACK).withSendOnFailure(false).withSendOnSuccess(false) - .withSlackConfiguration(slackConfig.get()); - final SlackNotificationClient notificationClient = new SlackNotificationClient(notification); - return notifySchemaChangeActivity.notifySchemaChange(notificationClient, connectionId, breakingChange.get()); + return notifySchemaChangeActivity.notifySchemaChange(connectionId, breakingChange.get(), slackConfig.get(), url); } else { return false; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/NotifySchemaChangeActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/NotifySchemaChangeActivity.java index 24e9c62661fab..3fd35e0aeee91 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/NotifySchemaChangeActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/NotifySchemaChangeActivity.java @@ -4,7 +4,7 @@ package io.airbyte.workers.temporal.scheduling.activities; -import io.airbyte.notification.SlackNotificationClient; +import io.airbyte.config.SlackNotificationConfiguration; import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; import java.io.IOException; @@ -14,7 +14,7 @@ public interface NotifySchemaChangeActivity { @ActivityMethod - public boolean notifySchemaChange(SlackNotificationClient notificationClient, UUID connectionId, boolean isBreaking) + public boolean notifySchemaChange(UUID connectionId, boolean isBreaking, SlackNotificationConfiguration config, String url) throws IOException, InterruptedException; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/NotifySchemaChangeActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/NotifySchemaChangeActivityImpl.java index bc5707bdf0ae9..fdd012ec0c183 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/NotifySchemaChangeActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/NotifySchemaChangeActivityImpl.java @@ -4,6 +4,9 @@ package io.airbyte.workers.temporal.scheduling.activities; +import io.airbyte.config.Notification; +import io.airbyte.config.Notification.NotificationType; +import io.airbyte.config.SlackNotificationConfiguration; import io.airbyte.notification.SlackNotificationClient; import jakarta.inject.Singleton; import java.io.IOException; @@ -13,9 +16,20 @@ public class NotifySchemaChangeActivityImpl implements NotifySchemaChangeActivity { @Override - public boolean notifySchemaChange(SlackNotificationClient notificationClient, UUID connectionId, boolean isBreaking) + public boolean notifySchemaChange(UUID connectionId, boolean isBreaking, SlackNotificationConfiguration slackConfig, String url) throws IOException, InterruptedException { - return notificationClient.notifySchemaChange(connectionId, isBreaking); + final Notification notification = createNotification(slackConfig); + final SlackNotificationClient notificationClient = createNotificationClient(notification); + return notificationClient.notifySchemaChange(connectionId, isBreaking, slackConfig, url); + } + + Notification createNotification(SlackNotificationConfiguration slackConfig) { + return new Notification().withNotificationType(NotificationType.SLACK).withSendOnFailure(false).withSendOnSuccess(false) + .withSlackConfiguration(slackConfig); + } + + SlackNotificationClient createNotificationClient(Notification notification) { + return new SlackNotificationClient(notification); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java index f6731d438c43c..dc30237d498c3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java @@ -56,7 +56,7 @@ public void refreshSchema(final UUID sourceCatalogId, final UUID connectionId) { ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, SOURCE_ID_KEY, sourceCatalogId)); final SourceDiscoverSchemaRequestBody requestBody = - new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true).connectionId(connectionId); + new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true).connectionId(connectionId).notifySchemaChange(true); try { sourceApi.discoverSchemaForSource(requestBody); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowTest.java index 65d554e760c71..9687f584a4c3c 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowTest.java @@ -14,7 +14,6 @@ import io.airbyte.commons.temporal.scheduling.ConnectionNotificationWorkflow; import io.airbyte.config.SlackNotificationConfiguration; import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.notification.SlackNotificationClient; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivityImpl; import io.airbyte.workers.temporal.scheduling.activities.NotifySchemaChangeActivityImpl; @@ -67,7 +66,9 @@ void setUp() throws IOException, InterruptedException, ApiException, JsonValidat .build(); mNotifySchemaChangeActivity = mock(NotifySchemaChangeActivityImpl.class); - when(mNotifySchemaChangeActivity.notifySchemaChange(any(SlackNotificationClient.class), any(UUID.class), any(boolean.class))).thenReturn(true); + when(mNotifySchemaChangeActivity.notifySchemaChange(any(UUID.class), any(boolean.class), any(SlackNotificationConfiguration.class), + any(String.class))) + .thenReturn(true); mSlackConfigActivity = mock(SlackConfigActivityImpl.class); when(mSlackConfigActivity.fetchSlackConfiguration(any(UUID.class))).thenReturn( @@ -100,11 +101,13 @@ void sendSchemaChangeNotificationNonBreakingChangeTest() client.newWorkflowStub(ConnectionNotificationWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(NOTIFICATIONS_QUEUE).build()); final UUID connectionId = UUID.randomUUID(); + final String connectionUrl = "connection_url"; when(mConfigFetchActivity.getBreakingChange(connectionId)).thenReturn(Optional.of(false)); - workflow.sendSchemaChangeNotification(connectionId); + workflow.sendSchemaChangeNotification(connectionId, connectionUrl); - verify(mNotifySchemaChangeActivity, times(1)).notifySchemaChange(any(SlackNotificationClient.class), any(UUID.class), any(boolean.class)); + verify(mNotifySchemaChangeActivity, times(1)).notifySchemaChange(any(UUID.class), any(boolean.class), + any(SlackNotificationConfiguration.class), any(String.class)); } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/NotifySchemaChangeActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/NotifySchemaChangeActivityTest.java index ded19de0a6c26..3e464713edb61 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/NotifySchemaChangeActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/NotifySchemaChangeActivityTest.java @@ -5,32 +5,44 @@ package io.airbyte.workers.temporal.scheduling.activities; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import io.airbyte.config.Notification; +import io.airbyte.config.SlackNotificationConfiguration; import io.airbyte.notification.SlackNotificationClient; import java.io.IOException; import java.util.UUID; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +@Slf4j class NotifySchemaChangeActivityTest { static private SlackNotificationClient mNotificationClient; static private NotifySchemaChangeActivityImpl notifySchemaChangeActivity; + static private Notification mNotification; @BeforeEach void setUp() { mNotificationClient = mock(SlackNotificationClient.class); - notifySchemaChangeActivity = new NotifySchemaChangeActivityImpl(); + mNotification = mock(Notification.class); + notifySchemaChangeActivity = spy(new NotifySchemaChangeActivityImpl()); } @Test void testNotifySchemaChange() throws IOException, InterruptedException { UUID connectionId = UUID.randomUUID(); + String connectionUrl = "connection_url"; boolean isBreaking = false; - notifySchemaChangeActivity.notifySchemaChange(mNotificationClient, connectionId, isBreaking); - verify(mNotificationClient, times(1)).notifySchemaChange(connectionId, isBreaking); + SlackNotificationConfiguration config = new SlackNotificationConfiguration(); + when(notifySchemaChangeActivity.createNotification(config)).thenReturn(mNotification); + when(notifySchemaChangeActivity.createNotificationClient(mNotification)).thenReturn(mNotificationClient); + notifySchemaChangeActivity.notifySchemaChange(connectionId, isBreaking, config, connectionUrl); + verify(mNotificationClient, times(1)).notifySchemaChange(connectionId, isBreaking, config, connectionUrl); } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java index e517bebd087d0..870f622891bea 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java @@ -71,7 +71,7 @@ void testRefreshSchema() throws ApiException { UUID connectionId = UUID.randomUUID(); refreshSchemaActivity.refreshSchema(sourceId, connectionId); SourceDiscoverSchemaRequestBody requestBody = - new SourceDiscoverSchemaRequestBody().sourceId(sourceId).disableCache(true).connectionId(connectionId); + new SourceDiscoverSchemaRequestBody().sourceId(sourceId).disableCache(true).connectionId(connectionId).notifySchemaChange(true); verify(mSourceApi, times(1)).discoverSchemaForSource(requestBody); } diff --git a/charts/airbyte/templates/env-configmap.yaml b/charts/airbyte/templates/env-configmap.yaml index 3bf7d9ff3113b..b47332518c816 100644 --- a/charts/airbyte/templates/env-configmap.yaml +++ b/charts/airbyte/templates/env-configmap.yaml @@ -72,5 +72,4 @@ data: WORKER_STATE_STORAGE_TYPE: {{ .Values.global.state.storage.type | quote }} SHOULD_RUN_NOTIFY_WORKFLOWS: "false" MAX_NOTIFY_WORKERS: {{ .Values.worker.maxNotifyWorkers | default "5" | quote }} - {{- end }} diff --git a/docker-compose.yaml b/docker-compose.yaml index a8baa3c3ecbf1..bc91b95e1a48d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -158,6 +158,8 @@ services: - GITHUB_STORE_BRANCH=${GITHUB_STORE_BRANCH} - MICRONAUT_ENVIRONMENTS=${WORKERS_MICRONAUT_ENVIRONMENTS} - AUTO_DETECT_SCHEMA=${AUTO_DETECT_SCHEMA} + - MAX_NOTIFY_WORKERS=5 + - SHOULD_RUN_NOTIFY_WORKFLOWS=false ports: - "8001" configs: diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index feb39b876cdc4..62089cd1e2ffe 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -12065,6 +12065,7 @@

SourceDiscoverSchemaRequestB
sourceId
UUID format: uuid
connectionId (optional)
UUID format: uuid
disable_cache (optional)
+
notifySchemaChange (optional)