Skip to content

Commit

Permalink
Trigger notifications for changed schemas (airbytehq#21680)
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew authored Feb 14, 2023
1 parent cee17bd commit fc02b79
Show file tree
Hide file tree
Showing 28 changed files with 221 additions and 69 deletions.
2 changes: 2 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2789,6 +2789,8 @@ components:
format: uuid
disable_cache:
type: boolean
notifySchemaChange:
type: boolean
SourceUpdate:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WebUrlHelper;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
Expand All @@ -81,8 +82,10 @@
import java.util.Optional;
import java.util.UUID;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;

@Singleton
@Slf4j
public class SchedulerHandler {

private static final HashFunction HASH_FUNCTION = Hashing.md5();
Expand All @@ -100,6 +103,7 @@ public class SchedulerHandler {
private final JobConverter jobConverter;
private final EventRunner eventRunner;
private final FeatureFlags envVariableFeatureFlags;
private final WebUrlHelper webUrlHelper;

// TODO: Convert to be fully using micronaut
public SchedulerHandler(final ConfigRepository configRepository,
Expand All @@ -111,7 +115,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final LogConfigs logConfigs,
final EventRunner eventRunner,
final ConnectionsHandler connectionsHandler,
final FeatureFlags envVariableFeatureFlags) {
final FeatureFlags envVariableFeatureFlags,
final WebUrlHelper webUrlHelper) {
this(
configRepository,
secretsRepositoryWriter,
Expand All @@ -122,7 +127,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
eventRunner,
new JobConverter(workerEnvironment, logConfigs),
connectionsHandler,
envVariableFeatureFlags);
envVariableFeatureFlags,
webUrlHelper);
}

@VisibleForTesting
Expand All @@ -135,7 +141,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final EventRunner eventRunner,
final JobConverter jobConverter,
final ConnectionsHandler connectionsHandler,
final FeatureFlags envVariableFeatureFlags) {
final FeatureFlags envVariableFeatureFlags,
final WebUrlHelper webUrlHelper) {
this.configRepository = configRepository;
this.secretsRepositoryWriter = secretsRepositoryWriter;
this.synchronousSchedulerClient = synchronousSchedulerClient;
Expand All @@ -146,6 +153,7 @@ public SchedulerHandler(final ConfigRepository configRepository,
this.jobConverter = jobConverter;
this.connectionsHandler = connectionsHandler;
this.envVariableFeatureFlags = envVariableFeatureFlags;
this.webUrlHelper = webUrlHelper;
}

public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody)
Expand Down Expand Up @@ -272,7 +280,7 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source

if (persistedCatalogId.isSuccess() && discoverSchemaRequestBody.getConnectionId() != null) {
// modify discoveredSchema object to add CatalogDiff, containsBreakingChange, and connectionStatus
generateCatalogDiffsAndDisableConnectionsIfNeeded(discoveredSchema, discoverSchemaRequestBody);
generateCatalogDiffsAndDisableConnectionsIfNeeded(discoveredSchema, discoverSchemaRequestBody, source.getWorkspaceId());
}

return discoveredSchema;
Expand Down Expand Up @@ -394,9 +402,10 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE
// determine whether 1. the source schema change resulted in a broken connection or 2. the user
// wants the connection disabled when non-breaking changes are detected. If so, disable that
// connection. Modify the current discoveredSchema object to add a CatalogDiff,
// containsBreakingChange paramter, and connectionStatus parameter.
// containsBreakingChange parameter, and connectionStatus parameter.
private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDiscoverSchemaRead discoveredSchema,
final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody,
final UUID workspaceId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final ConnectionReadList connectionsForSource = connectionsHandler.listConnectionsForSource(discoverSchemaRequestBody.getSourceId(), false);
for (final ConnectionRead connectionRead : connectionsForSource.getConnections()) {
Expand All @@ -418,12 +427,23 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
}
updateObject.status(connectionStatus);
connectionsHandler.updateConnection(updateObject);
if (shouldNotifySchemaChange(diff, connectionRead, discoverSchemaRequestBody)) {
final String url = webUrlHelper.getConnectionUrl(workspaceId, connectionRead.getConnectionId());
eventRunner.sendSchemaChangeNotification(connectionRead.getConnectionId(), url);
}
if (connectionRead.getConnectionId().equals(discoverSchemaRequestBody.getConnectionId())) {
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus);
}
}
}

private boolean shouldNotifySchemaChange(final CatalogDiff diff,
final ConnectionRead connectionRead,
final SourceDiscoverSchemaRequestBody requestBody) {
return !diff.getTransforms().isEmpty() && connectionRead.getNotifySchemaChanges() && requestBody.getNotifySchemaChange() != null
&& requestBody.getNotifySchemaChange();
}

private boolean shouldDisableConnection(final boolean containsBreakingChange,
final NonBreakingChangesPreference preference,
final CatalogDiff diff) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceI
final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody()
.sourceId(sourceId)
.disableCache(true)
.connectionId(connectionId);
.connectionId(connectionId)
.notifySchemaChange(false);
final SourceDiscoverSchemaRead schemaRead = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq);
return Optional.ofNullable(schemaRead);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ public interface EventRunner {

void update(final UUID connectionId);

void sendSchemaChangeNotification(final UUID connectionId, final String url);

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ public void update(final UUID connectionId) {
temporalClient.update(connectionId);
}

@Override
public void sendSchemaChangeNotification(final UUID connectionId, final String url) {
temporalClient.sendSchemaChangeNotification(connectionId, url);
}

}
Loading

0 comments on commit fc02b79

Please sign in to comment.