diff --git a/airbyte-ci/connectors/metadata_service/lib/bin/generate-python-classes.sh b/airbyte-ci/connectors/metadata_service/lib/bin/generate-python-classes.sh new file mode 100755 index 0000000000000..4bab5ffea1f21 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/bin/generate-python-classes.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +set -e + + +YAML_DIR=metadata_service/models/src +OUTPUT_DIR=metadata_service/models/generated + +# Ensure the yaml directory exists +if [ ! -d "$YAML_DIR" ]; then + echo "The yaml directory does not exist: $YAML_DIR" + exit 1 +fi + + +rm -rf "$OUTPUT_DIR"/*.py +mkdir -p "$OUTPUT_DIR" + +echo "# generated by generate-python-classes" > "$OUTPUT_DIR"/__init__.py + +for f in "$YAML_DIR"/*.yaml; do + filename_wo_ext=$(basename "$f" | cut -d . -f 1) + echo "from .$filename_wo_ext import *" >> "$OUTPUT_DIR"/__init__.py + + datamodel-codegen \ + --input "$YAML_DIR/$filename_wo_ext.yaml" \ + --output "$OUTPUT_DIR/$filename_wo_ext.py" \ + --use-title-as-name \ + --use-double-quotes \ + --enum-field-as-literal all \ + --disable-timestamp +done diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/commands.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/commands.py index a382b59792812..17abf4b61e594 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/commands.py +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/commands.py @@ -6,6 +6,7 @@ import click from metadata_service.gcs_upload import upload_metadata_to_gcs from metadata_service.validators.metadata_validator import validate_metadata_file +from metadata_service.constants import METADATA_FILE_NAME from pydantic import ValidationError @@ -17,15 +18,15 @@ def metadata_service(): @metadata_service.command(help="Validate a given metadata YAML file.") @click.argument("file_path", type=click.Path(exists=True, path_type=pathlib.Path)) def validate(file_path: pathlib.Path): - file_path = file_path if not file_path.is_dir() else file_path / "metadata.yaml" + file_path = file_path if not file_path.is_dir() else file_path / METADATA_FILE_NAME click.echo(f"Validating {file_path}...") is_valid, error = validate_metadata_file(file_path) if is_valid: - click.echo(f"{file_path} is a valid ConnectorMetadataDefinitionV1 YAML file.") + click.echo(f"{file_path} is a valid ConnectorMetadataDefinitionV0 YAML file.") else: - click.echo(f"{file_path} is not a valid ConnectorMetadataDefinitionV1 YAML file.") + click.echo(f"{file_path} is not a valid ConnectorMetadataDefinitionV0 YAML file.") click.echo(str(error)) exit(1) @@ -37,7 +38,7 @@ def validate(file_path: pathlib.Path): "--service-account-file-path", "-sa", type=click.Path(exists=True, path_type=pathlib.Path), envvar="GOOGLE_APPLICATION_CREDENTIALS" ) def upload(metadata_file_path: pathlib.Path, bucket_name: str, service_account_file_path: pathlib.Path): - metadata_file_path = metadata_file_path if not metadata_file_path.is_dir() else metadata_file_path / "metadata.yaml" + metadata_file_path = metadata_file_path if not metadata_file_path.is_dir() else metadata_file_path / METADATA_FILE_NAME try: uploaded, blob_id = upload_metadata_to_gcs(bucket_name, metadata_file_path, service_account_file_path) except (ValidationError, FileNotFoundError) as e: diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/gcs_upload.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/gcs_upload.py index e99e110490ffa..90304d619ee2b 100644 --- 
a/airbyte-ci/connectors/metadata_service/lib/metadata_service/gcs_upload.py +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/gcs_upload.py @@ -7,7 +7,7 @@ import yaml from google.cloud import storage from google.oauth2 import service_account -from metadata_service.models.generated.ConnectorMetadataDefinitionV1 import ConnectorMetadataDefinitionV1 +from metadata_service.models.generated.ConnectorMetadataDefinitionV0 import ConnectorMetadataDefinitionV0 from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER @@ -38,7 +38,7 @@ def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, service_a """ uploaded = False raw_metadata = yaml.safe_load(metadata_file_path.read_text()) - metadata = ConnectorMetadataDefinitionV1.parse_obj(raw_metadata) + metadata = ConnectorMetadataDefinitionV0.parse_obj(raw_metadata) credentials = service_account.Credentials.from_service_account_file(service_account_file_path) storage_client = storage.Client(credentials=credentials) diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ActorDefinitionResourceRequirements.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ActorDefinitionResourceRequirements.py new file mode 100644 index 0000000000000..1f6e484eef731 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ActorDefinitionResourceRequirements.py @@ -0,0 +1,54 @@ +# generated by datamodel-codegen: +# filename: ActorDefinitionResourceRequirements.yaml + +from __future__ import annotations + +from typing import List, Optional + +from pydantic import BaseModel, Extra, Field +from typing_extensions import Literal + + +class ResourceRequirements(BaseModel): + class Config: + extra = Extra.forbid + + cpu_request: Optional[str] = None + cpu_limit: Optional[str] = None + memory_request: Optional[str] = None + memory_limit: Optional[str] = None + + +class JobType(BaseModel): + __root__: Literal[ + "get_spec", + "check_connection", + "discover_schema", + "sync", + "reset_connection", + "connection_updater", + "replicate", + ] = Field( + ..., + description="enum that describes the different types of jobs that the platform runs.", + title="JobType", + ) + + +class JobTypeResourceLimit(BaseModel): + class Config: + extra = Extra.forbid + + jobType: JobType + resourceRequirements: ResourceRequirements + + +class ActorDefinitionResourceRequirements(BaseModel): + class Config: + extra = Extra.forbid + + default: Optional[ResourceRequirements] = Field( + None, + description="if set, these are the requirements that should be set for ALL jobs run for this actor definition.", + ) + jobSpecific: Optional[List[JobTypeResourceLimit]] = None diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/AllowedHosts.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/AllowedHosts.py new file mode 100644 index 0000000000000..ce4534f3adcaf --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/AllowedHosts.py @@ -0,0 +1,18 @@ +# generated by datamodel-codegen: +# filename: AllowedHosts.yaml + +from __future__ import annotations + +from typing import List, Optional + +from pydantic import BaseModel, Extra, Field + + +class AllowedHosts(BaseModel): + class Config: + extra = Extra.allow + + hosts: Optional[List[str]] = Field( + None, + description="An array of hosts that this connector can connect to. 
AllowedHosts not being present for the source or destination means that access to all hosts is allowed. An empty list here means that no network access is granted.", + ) diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorMetadataDefinitionV1.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorMetadataDefinitionV0.py similarity index 81% rename from airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorMetadataDefinitionV1.py rename to airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorMetadataDefinitionV0.py index ce4366a528b2d..895cfc68f1ce4 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorMetadataDefinitionV1.py +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorMetadataDefinitionV0.py @@ -1,34 +1,13 @@ # generated by datamodel-codegen: -# filename: ConnectorMetadataDefinitionV1.yaml +# filename: ConnectorMetadataDefinitionV0.yaml from __future__ import annotations -from enum import Enum from typing import List, Optional from uuid import UUID from pydantic import AnyUrl, BaseModel, Extra, Field - - -class ConnectorType(Enum): - destination = "destination" - source = "source" - - -class ConnectorSubtype(Enum): - api = "api" - database = "database" - file = "file" - custom = "custom" - message_queue = "message_queue" - unknown = "unknown" - - -class ReleaseStage(Enum): - alpha = "alpha" - beta = "beta" - generally_available = "generally_available" - source = "source" +from typing_extensions import Literal class AllowedHosts(BaseModel): @@ -79,14 +58,20 @@ class Config: memory_limit: Optional[str] = None -class JobType(Enum): - get_spec = "get_spec" - check_connection = "check_connection" - discover_schema = "discover_schema" - sync = "sync" - reset_connection = "reset_connection" - connection_updater = "connection_updater" - replicate = "replicate" +class JobType(BaseModel): + __root__: Literal[ + "get_spec", + "check_connection", + "discover_schema", + "sync", + "reset_connection", + "connection_updater", + "replicate", + ] = Field( + ..., + description="enum that describes the different types of jobs that the platform runs.", + title="JobType", + ) class JobTypeResourceLimit(BaseModel): @@ -137,8 +122,9 @@ class Config: class Data(BaseModel): name: str + icon: Optional[str] = None definitionId: UUID - connectorType: ConnectorType + connectorType: Literal["destination", "source"] dockerRepository: str dockerImageTag: str supportsDbt: Optional[bool] = None @@ -146,8 +132,10 @@ class Data(BaseModel): license: str supportUrl: AnyUrl githubIssueLabel: str - connectorSubtype: ConnectorSubtype - releaseStage: ReleaseStage + connectorSubtype: Literal[ + "api", "database", "file", "custom", "message_queue", "unknown" + ] + releaseStage: Literal["alpha", "beta", "generally_available", "source"] registries: Optional[Registry] = None allowedHosts: Optional[AllowedHosts] = None normalizationConfig: Optional[NormalizationDestinationDefinitionConfig] = None @@ -155,7 +143,7 @@ class Data(BaseModel): resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None -class ConnectorMetadataDefinitionV1(BaseModel): +class ConnectorMetadataDefinitionV0(BaseModel): class Config: extra = Extra.forbid diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryDestinationDefinition.py 
b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryDestinationDefinition.py new file mode 100644 index 0000000000000..fccf41c148558 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryDestinationDefinition.py @@ -0,0 +1,131 @@ +# generated by datamodel-codegen: +# filename: ConnectorRegistryDestinationDefinition.yaml + +from __future__ import annotations + +from datetime import date +from typing import Any, Dict, List, Optional +from uuid import UUID + +from pydantic import BaseModel, Extra, Field +from typing_extensions import Literal + + +class ReleaseStage(BaseModel): + __root__: Literal["alpha", "beta", "generally_available", "custom"] = Field( + ..., + description="enum that describes a connector's release stage", + title="ReleaseStage", + ) + + +class ResourceRequirements(BaseModel): + class Config: + extra = Extra.forbid + + cpu_request: Optional[str] = None + cpu_limit: Optional[str] = None + memory_request: Optional[str] = None + memory_limit: Optional[str] = None + + +class JobType(BaseModel): + __root__: Literal[ + "get_spec", + "check_connection", + "discover_schema", + "sync", + "reset_connection", + "connection_updater", + "replicate", + ] = Field( + ..., + description="enum that describes the different types of jobs that the platform runs.", + title="JobType", + ) + + +class NormalizationDestinationDefinitionConfig(BaseModel): + class Config: + extra = Extra.allow + + normalizationRepository: str = Field( + ..., + description="a field indicating the name of the repository to be used for normalization. If the value of the flag is NULL - normalization is not used.", + ) + normalizationTag: str = Field( + ..., + description="a field indicating the tag of the docker repository to be used for normalization.", + ) + normalizationIntegrationType: str = Field( + ..., + description="a field indicating the type of integration dialect to use for normalization.", + ) + + +class AllowedHosts(BaseModel): + class Config: + extra = Extra.allow + + hosts: Optional[List[str]] = Field( + None, + description="An array of hosts that this connector can connect to. AllowedHosts not being present for the source or destination means that access to all hosts is allowed. An empty list here means that no network access is granted.", + ) + + +class JobTypeResourceLimit(BaseModel): + class Config: + extra = Extra.forbid + + jobType: JobType + resourceRequirements: ResourceRequirements + + +class ActorDefinitionResourceRequirements(BaseModel): + class Config: + extra = Extra.forbid + + default: Optional[ResourceRequirements] = Field( + None, + description="if set, these are the requirements that should be set for ALL jobs run for this actor definition.", + ) + jobSpecific: Optional[List[JobTypeResourceLimit]] = None + + +class ConnectorRegistryDestinationDefinition(BaseModel): + class Config: + extra = Extra.allow + + destinationDefinitionId: UUID + name: str + dockerRepository: str + dockerImageTag: str + documentationUrl: str + icon: Optional[str] = None + spec: Dict[str, Any] + tombstone: Optional[bool] = Field( + False, + description="if false, the configuration is active. 
if true, then this configuration is permanently off.", + ) + public: Optional[bool] = Field( + False, + description="true if this connector definition is available to all workspaces", + ) + custom: Optional[bool] = Field( + False, description="whether this is a custom connector definition" + ) + releaseStage: Optional[ReleaseStage] = None + releaseDate: Optional[date] = Field( + None, + description="The date when this connector was first released, in yyyy-mm-dd format.", + ) + resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None + protocolVersion: Optional[str] = Field( + None, description="the Airbyte Protocol version supported by the connector" + ) + normalizationConfig: Optional[NormalizationDestinationDefinitionConfig] = None + supportsDbt: Optional[bool] = Field( + None, + description="an optional flag indicating whether DBT is used in the normalization. If the flag value is NULL - DBT is not used.", + ) + allowedHosts: Optional[AllowedHosts] = None diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistrySourceDefinition.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistrySourceDefinition.py new file mode 100644 index 0000000000000..00468bb4943b8 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistrySourceDefinition.py @@ -0,0 +1,124 @@ +# generated by datamodel-codegen: +# filename: ConnectorRegistrySourceDefinition.yaml + +from __future__ import annotations + +from datetime import date +from typing import Any, Dict, List, Optional +from uuid import UUID + +from pydantic import BaseModel, Extra, Field +from typing_extensions import Literal + + +class ReleaseStage(BaseModel): + __root__: Literal["alpha", "beta", "generally_available", "custom"] = Field( + ..., + description="enum that describes a connector's release stage", + title="ReleaseStage", + ) + + +class ResourceRequirements(BaseModel): + class Config: + extra = Extra.forbid + + cpu_request: Optional[str] = None + cpu_limit: Optional[str] = None + memory_request: Optional[str] = None + memory_limit: Optional[str] = None + + +class JobType(BaseModel): + __root__: Literal[ + "get_spec", + "check_connection", + "discover_schema", + "sync", + "reset_connection", + "connection_updater", + "replicate", + ] = Field( + ..., + description="enum that describes the different types of jobs that the platform runs.", + title="JobType", + ) + + +class AllowedHosts(BaseModel): + class Config: + extra = Extra.allow + + hosts: Optional[List[str]] = Field( + None, + description="An array of hosts that this connector can connect to. AllowedHosts not being present for the source or destination means that access to all hosts is allowed. An empty list here means that no network access is granted.", + ) + + +class SuggestedStreams(BaseModel): + class Config: + extra = Extra.allow + + streams: Optional[List[str]] = Field( + None, + description="An array of streams that this connector suggests the average user will want. SuggestedStreams not being present for the source means that all streams are suggested. 
An empty list here means that no streams are suggested.", + ) + + +class JobTypeResourceLimit(BaseModel): + class Config: + extra = Extra.forbid + + jobType: JobType + resourceRequirements: ResourceRequirements + + +class ActorDefinitionResourceRequirements(BaseModel): + class Config: + extra = Extra.forbid + + default: Optional[ResourceRequirements] = Field( + None, + description="if set, these are the requirements that should be set for ALL jobs run for this actor definition.", + ) + jobSpecific: Optional[List[JobTypeResourceLimit]] = None + + +class ConnectorRegistrySourceDefinition(BaseModel): + class Config: + extra = Extra.allow + + sourceDefinitionId: UUID + name: str + dockerRepository: str + dockerImageTag: str + documentationUrl: str + icon: Optional[str] = None + sourceType: Optional[Literal["api", "file", "database", "custom"]] = None + spec: Dict[str, Any] + tombstone: Optional[bool] = Field( + False, + description="if false, the configuration is active. if true, then this configuration is permanently off.", + ) + public: Optional[bool] = Field( + False, + description="true if this connector definition is available to all workspaces", + ) + custom: Optional[bool] = Field( + False, description="whether this is a custom connector definition" + ) + releaseStage: Optional[ReleaseStage] = None + releaseDate: Optional[date] = Field( + None, + description="The date when this connector was first released, in yyyy-mm-dd format.", + ) + resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None + protocolVersion: Optional[str] = Field( + None, description="the Airbyte Protocol version supported by the connector" + ) + allowedHosts: Optional[AllowedHosts] = None + suggestedStreams: Optional[SuggestedStreams] = None + maxSecondsBetweenMessages: Optional[int] = Field( + None, + description="Number of seconds allowed between 2 airbyte protocol messages. 
The source will time out if this delay is reached", + ) diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryV0.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryV0.py new file mode 100644 index 0000000000000..e657dc6fb2768 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryV0.py @@ -0,0 +1,186 @@ +# generated by datamodel-codegen: +# filename: ConnectorRegistryV0.yaml + +from __future__ import annotations + +from datetime import date +from typing import Any, Dict, List, Optional +from uuid import UUID + +from pydantic import BaseModel, Extra, Field +from typing_extensions import Literal + + +class ReleaseStage(BaseModel): + __root__: Literal["alpha", "beta", "generally_available", "custom"] = Field( + ..., + description="enum that describes a connector's release stage", + title="ReleaseStage", + ) + + +class ResourceRequirements(BaseModel): + class Config: + extra = Extra.forbid + + cpu_request: Optional[str] = None + cpu_limit: Optional[str] = None + memory_request: Optional[str] = None + memory_limit: Optional[str] = None + + +class JobType(BaseModel): + __root__: Literal[ + "get_spec", + "check_connection", + "discover_schema", + "sync", + "reset_connection", + "connection_updater", + "replicate", + ] = Field( + ..., + description="enum that describes the different types of jobs that the platform runs.", + title="JobType", + ) + + +class NormalizationDestinationDefinitionConfig(BaseModel): + class Config: + extra = Extra.allow + + normalizationRepository: str = Field( + ..., + description="a field indicating the name of the repository to be used for normalization. If the value of the flag is NULL - normalization is not used.", + ) + normalizationTag: str = Field( + ..., + description="a field indicating the tag of the docker repository to be used for normalization.", + ) + normalizationIntegrationType: str = Field( + ..., + description="a field indicating the type of integration dialect to use for normalization.", + ) + + +class AllowedHosts(BaseModel): + class Config: + extra = Extra.allow + + hosts: Optional[List[str]] = Field( + None, + description="An array of hosts that this connector can connect to. AllowedHosts not being present for the source or destination means that access to all hosts is allowed. An empty list here means that no network access is granted.", + ) + + +class SuggestedStreams(BaseModel): + class Config: + extra = Extra.allow + + streams: Optional[List[str]] = Field( + None, + description="An array of streams that this connector suggests the average user will want. SuggestedStreams not being present for the source means that all streams are suggested. 
An empty list here means that no streams are suggested.", + ) + + +class JobTypeResourceLimit(BaseModel): + class Config: + extra = Extra.forbid + + jobType: JobType + resourceRequirements: ResourceRequirements + + +class ActorDefinitionResourceRequirements(BaseModel): + class Config: + extra = Extra.forbid + + default: Optional[ResourceRequirements] = Field( + None, + description="if set, these are the requirements that should be set for ALL jobs run for this actor definition.", + ) + jobSpecific: Optional[List[JobTypeResourceLimit]] = None + + +class ConnectorRegistrySourceDefinition(BaseModel): + class Config: + extra = Extra.allow + + sourceDefinitionId: UUID + name: str + dockerRepository: str + dockerImageTag: str + documentationUrl: str + icon: Optional[str] = None + sourceType: Optional[Literal["api", "file", "database", "custom"]] = None + spec: Dict[str, Any] + tombstone: Optional[bool] = Field( + False, + description="if false, the configuration is active. if true, then this configuration is permanently off.", + ) + public: Optional[bool] = Field( + False, + description="true if this connector definition is available to all workspaces", + ) + custom: Optional[bool] = Field( + False, description="whether this is a custom connector definition" + ) + releaseStage: Optional[ReleaseStage] = None + releaseDate: Optional[date] = Field( + None, + description="The date when this connector was first released, in yyyy-mm-dd format.", + ) + resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None + protocolVersion: Optional[str] = Field( + None, description="the Airbyte Protocol version supported by the connector" + ) + allowedHosts: Optional[AllowedHosts] = None + suggestedStreams: Optional[SuggestedStreams] = None + maxSecondsBetweenMessages: Optional[int] = Field( + None, + description="Number of seconds allowed between 2 airbyte protocol messages. The source will time out if this delay is reached", + ) + + +class ConnectorRegistryDestinationDefinition(BaseModel): + class Config: + extra = Extra.allow + + destinationDefinitionId: UUID + name: str + dockerRepository: str + dockerImageTag: str + documentationUrl: str + icon: Optional[str] = None + spec: Dict[str, Any] + tombstone: Optional[bool] = Field( + False, + description="if false, the configuration is active. if true, then this configuration is permanently off.", + ) + public: Optional[bool] = Field( + False, + description="true if this connector definition is available to all workspaces", + ) + custom: Optional[bool] = Field( + False, description="whether this is a custom connector definition" + ) + releaseStage: Optional[ReleaseStage] = None + releaseDate: Optional[date] = Field( + None, + description="The date when this connector was first released, in yyyy-mm-dd format.", + ) + resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None + protocolVersion: Optional[str] = Field( + None, description="the Airbyte Protocol version supported by the connector" + ) + normalizationConfig: Optional[NormalizationDestinationDefinitionConfig] = None + supportsDbt: Optional[bool] = Field( + None, + description="an optional flag indicating whether DBT is used in the normalization. 
If the flag value is NULL - DBT is not used.", + ) + allowedHosts: Optional[AllowedHosts] = None + + +class ConnectorRegistryV0(BaseModel): + destinations: List[ConnectorRegistryDestinationDefinition] + sources: List[ConnectorRegistrySourceDefinition] diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/JobType.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/JobType.py new file mode 100644 index 0000000000000..aef4f7ad5f999 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/JobType.py @@ -0,0 +1,23 @@ +# generated by datamodel-codegen: +# filename: JobType.yaml + +from __future__ import annotations + +from pydantic import BaseModel, Field +from typing_extensions import Literal + + +class JobType(BaseModel): + __root__: Literal[ + "get_spec", + "check_connection", + "discover_schema", + "sync", + "reset_connection", + "connection_updater", + "replicate", + ] = Field( + ..., + description="enum that describes the different types of jobs that the platform runs.", + title="JobType", + ) diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/NormalizationDestinationDefinitionConfig.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/NormalizationDestinationDefinitionConfig.py new file mode 100644 index 0000000000000..00a642bfaeb10 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/NormalizationDestinationDefinitionConfig.py @@ -0,0 +1,24 @@ +# generated by datamodel-codegen: +# filename: NormalizationDestinationDefinitionConfig.yaml + +from __future__ import annotations + +from pydantic import BaseModel, Extra, Field + + +class NormalizationDestinationDefinitionConfig(BaseModel): + class Config: + extra = Extra.allow + + normalizationRepository: str = Field( + ..., + description="a field indicating the name of the repository to be used for normalization. If the value of the flag is NULL - normalization is not used.", + ) + normalizationTag: str = Field( + ..., + description="a field indicating the tag of the docker repository to be used for normalization.", + ) + normalizationIntegrationType: str = Field( + ..., + description="a field indicating the type of integration dialect to use for normalization.", + ) diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/RegistryOverrides.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/RegistryOverrides.py new file mode 100644 index 0000000000000..56ca0ea767a15 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/RegistryOverrides.py @@ -0,0 +1,111 @@ +# generated by datamodel-codegen: +# filename: RegistryOverrides.yaml + +from __future__ import annotations + +from typing import List, Optional + +from pydantic import AnyUrl, BaseModel, Extra, Field +from typing_extensions import Literal + + +class AllowedHosts(BaseModel): + class Config: + extra = Extra.allow + + hosts: Optional[List[str]] = Field( + None, + description="An array of hosts that this connector can connect to. AllowedHosts not being present for the source or destination means that access to all hosts is allowed. 
An empty list here means that no network access is granted.", + ) + + +class NormalizationDestinationDefinitionConfig(BaseModel): + class Config: + extra = Extra.allow + + normalizationRepository: str = Field( + ..., + description="a field indicating the name of the repository to be used for normalization. If the value of the flag is NULL - normalization is not used.", + ) + normalizationTag: str = Field( + ..., + description="a field indicating the tag of the docker repository to be used for normalization.", + ) + normalizationIntegrationType: str = Field( + ..., + description="a field indicating the type of integration dialect to use for normalization.", + ) + + +class SuggestedStreams(BaseModel): + class Config: + extra = Extra.allow + + streams: Optional[List[str]] = Field( + None, + description="An array of streams that this connector suggests the average user will want. SuggestedStreams not being present for the source means that all streams are suggested. An empty list here means that no streams are suggested.", + ) + + +class ResourceRequirements(BaseModel): + class Config: + extra = Extra.forbid + + cpu_request: Optional[str] = None + cpu_limit: Optional[str] = None + memory_request: Optional[str] = None + memory_limit: Optional[str] = None + + +class JobType(BaseModel): + __root__: Literal[ + "get_spec", + "check_connection", + "discover_schema", + "sync", + "reset_connection", + "connection_updater", + "replicate", + ] = Field( + ..., + description="enum that describes the different types of jobs that the platform runs.", + title="JobType", + ) + + +class JobTypeResourceLimit(BaseModel): + class Config: + extra = Extra.forbid + + jobType: JobType + resourceRequirements: ResourceRequirements + + +class ActorDefinitionResourceRequirements(BaseModel): + class Config: + extra = Extra.forbid + + default: Optional[ResourceRequirements] = Field( + None, + description="if set, these are the requirements that should be set for ALL jobs run for this actor definition.", + ) + jobSpecific: Optional[List[JobTypeResourceLimit]] = None + + +class RegistryOverrides(BaseModel): + class Config: + extra = Extra.forbid + + enabled: bool + name: Optional[str] = None + dockerRepository: Optional[str] = None + dockerImageTag: Optional[str] = None + supportsDbt: Optional[bool] = None + supportsNormalization: Optional[bool] = None + license: Optional[str] = None + supportUrl: Optional[AnyUrl] = None + connectorSubtype: Optional[str] = None + allowedHosts: Optional[AllowedHosts] = None + normalizationConfig: Optional[NormalizationDestinationDefinitionConfig] = None + suggestedStreams: Optional[SuggestedStreams] = None + resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ReleaseStage.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ReleaseStage.py new file mode 100644 index 0000000000000..cb7c9b909b0ba --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ReleaseStage.py @@ -0,0 +1,15 @@ +# generated by datamodel-codegen: +# filename: ReleaseStage.yaml + +from __future__ import annotations + +from pydantic import BaseModel, Field +from typing_extensions import Literal + + +class ReleaseStage(BaseModel): + __root__: Literal["alpha", "beta", "generally_available", "custom"] = Field( + ..., + description="enum that describes a connector's release stage", + title="ReleaseStage", + ) diff --git 
a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ResourceRequirements.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ResourceRequirements.py new file mode 100644 index 0000000000000..abc7e6173d054 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ResourceRequirements.py @@ -0,0 +1,18 @@ +# generated by datamodel-codegen: +# filename: ResourceRequirements.yaml + +from __future__ import annotations + +from typing import Optional + +from pydantic import BaseModel, Extra + + +class ResourceRequirements(BaseModel): + class Config: + extra = Extra.forbid + + cpu_request: Optional[str] = None + cpu_limit: Optional[str] = None + memory_request: Optional[str] = None + memory_limit: Optional[str] = None diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/SuggestedStreams.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/SuggestedStreams.py new file mode 100644 index 0000000000000..9a3d7cdf4012e --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/SuggestedStreams.py @@ -0,0 +1,18 @@ +# generated by datamodel-codegen: +# filename: SuggestedStreams.yaml + +from __future__ import annotations + +from typing import List, Optional + +from pydantic import BaseModel, Extra, Field + + +class SuggestedStreams(BaseModel): + class Config: + extra = Extra.allow + + streams: Optional[List[str]] = Field( + None, + description="An array of streams that this connector suggests the average user will want. SuggestedStreams not being present for the source means that all streams are suggested. An empty list here means that no streams are suggested.", + ) diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/__init__.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/__init__.py new file mode 100644 index 0000000000000..1091c7cf5fcf4 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/__init__.py @@ -0,0 +1,13 @@ +# generated by generate-python-classes +from .ActorDefinitionResourceRequirements import * +from .AllowedHosts import * +from .ConnectorMetadataDefinitionV0 import * +from .ConnectorRegistryDestinationDefinition import * +from .ConnectorRegistrySourceDefinition import * +from .ConnectorRegistryV0 import * +from .JobType import * +from .NormalizationDestinationDefinitionConfig import * +from .RegistryOverrides import * +from .ReleaseStage import * +from .ResourceRequirements import * +from .SuggestedStreams import * diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ActorDefinitionResourceRequirements.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ActorDefinitionResourceRequirements.yaml index f4cc1bcfb0fc6..f9ea5817c1ca8 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ActorDefinitionResourceRequirements.yaml +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ActorDefinitionResourceRequirements.yaml @@ -1,6 +1,6 @@ --- "$schema": http://json-schema.org/draft-07/schema# -"$id": https://github.com/airbytehq/airbyte-types/blob/master/models/src/main/resources/ActorDefinitionResourceRequirements.yaml +"$id": https://github.com/airbytehq/airbyte/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ActorDefinitionResourceRequirements.yaml 
title: ActorDefinitionResourceRequirements description: actor definition specific resource requirements type: object diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/AllowedHosts.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/AllowedHosts.yaml index 670cacfea193e..e796573835e39 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/AllowedHosts.yaml +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/AllowedHosts.yaml @@ -1,6 +1,6 @@ --- "$schema": http://json-schema.org/draft-07/schema# -"$id": https://github.com/airbytehq/airbyte-types/blob/master/models/src/main/resources/AllowedHosts.yaml +"$id": https://github.com/airbytehq/airbyte/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/AllowedHosts.yaml title: AllowedHosts description: A connector's allowed hosts. If present, the platform will limit communication to only hosts which are listed in `AllowedHosts.hosts`. type: object diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorMetadataDefinitionV1.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorMetadataDefinitionV0.yaml similarity index 96% rename from airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorMetadataDefinitionV1.yaml rename to airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorMetadataDefinitionV0.yaml index 4ef072f598a1a..f2c35845adf1d 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorMetadataDefinitionV1.yaml +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorMetadataDefinitionV0.yaml @@ -1,8 +1,8 @@ --- "$schema": http://json-schema.org/draft-07/schema# -"$id": https://github.com/airbytehq/airbyte/airbyte-ci/connectors_ci/metadata_service/lib/models/src/ConnectorMetadataDefinitionV1.yml +"$id": https://github.com/airbytehq/airbyte/airbyte-ci/connectors_ci/metadata_service/lib/models/src/ConnectorMetadataDefinitionV0.yml -title: ConnectorMetadataDefinitionV1 +title: ConnectorMetadataDefinitionV0 description: describes the metadata of a connector type: object required: @@ -16,7 +16,6 @@ properties: type: object required: - name - - icon - definitionId - connectorType - dockerRepository @@ -29,6 +28,8 @@ properties: properties: name: type: string + icon: + type: string definitionId: type: string format: uuid diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryDestinationDefinition.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryDestinationDefinition.yaml new file mode 100644 index 0000000000000..41dc52029186a --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryDestinationDefinition.yaml @@ -0,0 +1,61 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte-platform/blob/main/airbyte-config/config-models/src/main/resources/types/ConnectorRegistryDestinationDefinition.yaml +title: ConnectorRegistryDestinationDefinition +description: describes a destination +type: object +required: + - destinationDefinitionId + - name + - dockerRepository + - dockerImageTag + - documentationUrl + - spec +additionalProperties: true +properties: + destinationDefinitionId: + type: string + format: uuid + name: + type: string + dockerRepository: + 
type: string + dockerImageTag: + type: string + documentationUrl: + type: string + icon: + type: string + spec: + type: object + tombstone: + description: if false, the configuration is active. if true, then this + configuration is permanently off. + type: boolean + default: false + public: + description: true if this connector definition is available to all workspaces + type: boolean + default: false + custom: + description: whether this is a custom connector definition + type: boolean + default: false + releaseStage: + "$ref": ReleaseStage.yaml + releaseDate: + description: The date when this connector was first released, in yyyy-mm-dd format. + type: string + format: date + resourceRequirements: + "$ref": ActorDefinitionResourceRequirements.yaml + protocolVersion: + type: string + description: the Airbyte Protocol version supported by the connector + normalizationConfig: + "$ref": NormalizationDestinationDefinitionConfig.yaml + supportsDbt: + type: boolean + description: an optional flag indicating whether DBT is used in the normalization. If the flag value is NULL - DBT is not used. + allowedHosts: + "$ref": AllowedHosts.yaml diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistrySourceDefinition.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistrySourceDefinition.yaml new file mode 100644 index 0000000000000..a742bc6a8ce07 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistrySourceDefinition.yaml @@ -0,0 +1,68 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte-platform/blob/main/airbyte-config/config-models/src/main/resources/types/ConnectorRegistrySourceDefinition.yaml +title: ConnectorRegistrySourceDefinition +description: describes a source +type: object +required: + - sourceDefinitionId + - name + - dockerRepository + - dockerImageTag + - documentationUrl + - spec +additionalProperties: true +properties: + sourceDefinitionId: + type: string + format: uuid + name: + type: string + dockerRepository: + type: string + dockerImageTag: + type: string + documentationUrl: + type: string + icon: + type: string + sourceType: + type: string + enum: + - api + - file + - database + - custom + spec: + type: object + tombstone: + description: if false, the configuration is active. if true, then this + configuration is permanently off. + type: boolean + default: false + public: + description: true if this connector definition is available to all workspaces + type: boolean + default: false + custom: + description: whether this is a custom connector definition + type: boolean + default: false + releaseStage: + "$ref": ReleaseStage.yaml + releaseDate: + description: The date when this connector was first released, in yyyy-mm-dd format. + type: string + format: date + resourceRequirements: + "$ref": ActorDefinitionResourceRequirements.yaml + protocolVersion: + type: string + description: the Airbyte Protocol version supported by the connector + allowedHosts: + "$ref": AllowedHosts.yaml + suggestedStreams: + "$ref": SuggestedStreams.yaml + maxSecondsBetweenMessages: + description: Number of seconds allowed between 2 airbyte protocol messages. 
The source will time out if this delay is reached + type: integer diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryV0.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryV0.yaml new file mode 100644 index 0000000000000..8e3620816d653 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryV0.yaml @@ -0,0 +1,18 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte-platform/blob/main/airbyte-config/config-models/src/main/resources/types/ConnectorRegistryV0.yaml +title: ConnectorRegistryV0 +description: describes the collection of connectors retrieved from a registry +type: object +required: + - destinations + - sources +properties: + destinations: + type: array + items: + $ref: ConnectorRegistryDestinationDefinition.yaml + sources: + type: array + items: + $ref: ConnectorRegistrySourceDefinition.yaml diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/JobType.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/JobType.yaml index 83b180a8d805d..5cac4337e2bf8 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/JobType.yaml +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/JobType.yaml @@ -1,6 +1,6 @@ --- "$schema": http://json-schema.org/draft-07/schema# -"$id": https://github.com/airbytehq/airbyte-types/blob/master/models/src/main/resources/JobType.yaml +"$id": https://github.com/airbytehq/airbyte/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/JobType.yaml title: JobType description: enum that describes the different types of jobs that the platform runs. 
type: string diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/NormalizationDestinationDefinitionConfig.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/NormalizationDestinationDefinitionConfig.yaml index 9eccbb73c6581..caf3d79e9c159 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/NormalizationDestinationDefinitionConfig.yaml +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/NormalizationDestinationDefinitionConfig.yaml @@ -1,6 +1,6 @@ --- "$schema": http://json-schema.org/draft-07/schema# -"$id": https://github.com/airbytehq/airbyte-types/blob/master/models/src/main/resources/NormalizationDestinationDefinitionConfig.yaml +"$id": https://github.com/airbytehq/airbyte/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/NormalizationDestinationDefinitionConfig.yaml title: NormalizationDestinationDefinitionConfig description: describes a normalization config for destination definition type: object diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ReleaseStage.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ReleaseStage.yaml new file mode 100644 index 0000000000000..0a0767efd5efa --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ReleaseStage.yaml @@ -0,0 +1,11 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte-platform/blob/main/airbyte-config/config-models/src/main/resources/types/ReleaseStage.yaml +title: ReleaseStage +description: enum that describes a connector's release stage +type: string +enum: + - alpha + - beta + - generally_available + - custom diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ResourceRequirements.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ResourceRequirements.yaml index f321aeb9dc2a1..aef3e6f6c9f08 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ResourceRequirements.yaml +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ResourceRequirements.yaml @@ -1,6 +1,6 @@ --- "$schema": http://json-schema.org/draft-07/schema# -"$id": https://github.com/airbytehq/airbyte-types/blob/master/models/src/main/resources/ResourceRequirements.yaml +"$id": https://github.com/airbytehq/airbyte/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ResourceRequirements.yaml title: ResourceRequirements description: generic configuration for pod source requirements type: object diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/SuggestedStreams.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/SuggestedStreams.yaml index cf35bc37c2ad5..4c540ba7fe982 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/SuggestedStreams.yaml +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/SuggestedStreams.yaml @@ -1,6 +1,6 @@ --- "$schema": http://json-schema.org/draft-07/schema# -"$id": https://github.com/airbytehq/airbyte-types/blob/master/models/src/main/resources/SuggestedStreams.yaml +"$id": https://github.com/airbytehq/airbyte/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/SuggestedStreams.yaml title: SuggestedStreams description: A source's suggested streams. 
These will be suggested by default for new connections using this source. Otherwise, all streams will be selected. This is useful for when your source has a lot of streams, but the average user will only want a subset of them synced. type: object diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/validators/metadata_validator.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/validators/metadata_validator.py index 7f34f2bfcf71f..3d4efbbaf9998 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/validators/metadata_validator.py +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/validators/metadata_validator.py @@ -1,7 +1,7 @@ import yaml import pathlib from pydantic import ValidationError -from metadata_service.models.generated.ConnectorMetadataDefinitionV1 import ConnectorMetadataDefinitionV1 +from metadata_service.models.generated.ConnectorMetadataDefinitionV0 import ConnectorMetadataDefinitionV0 def validate_metadata_file(file_path: pathlib.Path): @@ -10,7 +10,7 @@ def validate_metadata_file(file_path: pathlib.Path): """ try: metadata = yaml.safe_load(file_path.read_text()) - ConnectorMetadataDefinitionV1.parse_obj(metadata) + ConnectorMetadataDefinitionV0.parse_obj(metadata) return True, None except ValidationError as e: return False, e diff --git a/airbyte-ci/connectors/metadata_service/lib/pyproject.toml b/airbyte-ci/connectors/metadata_service/lib/pyproject.toml index 24b1b9d811c0d..a5aa2b212bbbe 100644 --- a/airbyte-ci/connectors/metadata_service/lib/pyproject.toml +++ b/airbyte-ci/connectors/metadata_service/lib/pyproject.toml @@ -28,7 +28,7 @@ pytest-mock = "^3.10.0" metadata_service = "metadata_service.commands:metadata_service" [tool.poe.tasks] -generate-models = "datamodel-codegen --input metadata_service/models/src/ConnectorMetadataDefinitionV1.yaml --output metadata_service/models/generated/ConnectorMetadataDefinitionV1.py --use-title-as-name --use-double-quotes --disable-timestamp" +generate-models = { shell = "./bin/generate-python-classes.sh" } [build-system] requires = ["poetry-core"] diff --git a/airbyte-ci/connectors/metadata_service/lib/tests/test_gcs_upload.py b/airbyte-ci/connectors/metadata_service/lib/tests/test_gcs_upload.py index 4fca471782923..12aa8622f1f55 100644 --- a/airbyte-ci/connectors/metadata_service/lib/tests/test_gcs_upload.py +++ b/airbyte-ci/connectors/metadata_service/lib/tests/test_gcs_upload.py @@ -6,8 +6,9 @@ import pytest import yaml from metadata_service import gcs_upload -from metadata_service.models.generated.ConnectorMetadataDefinitionV1 import ConnectorMetadataDefinitionV1 +from metadata_service.models.generated.ConnectorMetadataDefinitionV0 import ConnectorMetadataDefinitionV0 from pydantic import ValidationError +from metadata_service.constants import METADATA_FILE_NAME @pytest.mark.parametrize( @@ -28,9 +29,9 @@ ) def test_upload_metadata_to_gcs_valid_metadata(mocker, valid_metadata_yaml_files, version_blob_exists, version_blob_etag, latest_blob_etag): metadata_file_path = pathlib.Path(valid_metadata_yaml_files[0]) - metadata = ConnectorMetadataDefinitionV1.parse_obj(yaml.safe_load(metadata_file_path.read_text())) - expected_version_key = f"metadata/{metadata.data.dockerRepository}/{metadata.data.dockerImageTag}/metadata.yaml" - expected_latest_key = f"metadata/{metadata.data.dockerRepository}/latest/metadata.yaml" + metadata = ConnectorMetadataDefinitionV0.parse_obj(yaml.safe_load(metadata_file_path.read_text())) + expected_version_key = 
f"metadata/{metadata.data.dockerRepository}/{metadata.data.dockerImageTag}/{METADATA_FILE_NAME}" + expected_latest_key = f"metadata/{metadata.data.dockerRepository}/latest/{METADATA_FILE_NAME}" mock_credentials = mocker.Mock() mock_storage_client = mocker.Mock() diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py index d7242aaba2372..39a52881036d3 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py @@ -3,7 +3,6 @@ # from dagster import Definitions -from orchestrator.jobs.registry import generate_registry, generate_registry_markdown, generate_local_metadata_files from orchestrator.resources.gcp import gcp_gcs_client, gcs_bucket_manager, gcs_directory_blobs, gcs_file_blob, gcs_file_manager from orchestrator.resources.github import github_client, github_connector_repo, github_connectors_directory from orchestrator.resources.local import simple_local_file_manager @@ -23,13 +22,19 @@ cloud_destinations_dataframe, oss_sources_dataframe, cloud_sources_dataframe, - latest_oss_registry_dict, - latest_cloud_registry_dict, oss_registry_from_metadata, cloud_registry_from_metadata, + legacy_cloud_sources_dataframe, + legacy_oss_sources_dataframe, + legacy_cloud_destinations_dataframe, + legacy_oss_destinations_dataframe, + legacy_cloud_registry_dict, + legacy_cloud_registry, + legacy_oss_registry_dict, + legacy_oss_registry, ) from orchestrator.assets.metadata import ( - registry_derived_metadata_definitions, + legacy_registry_derived_metadata_definitions, valid_metadata_report_dataframe, metadata_definitions, ) @@ -42,6 +47,8 @@ cloud_registry_diff_dataframe, oss_registry_diff_dataframe, metadata_directory_report, + oss_registry_diff_report, + cloud_registry_diff_report, ) from orchestrator.jobs.registry import generate_registry_markdown, generate_local_metadata_files, generate_registry @@ -52,32 +59,40 @@ from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER ASSETS = [ - oss_destinations_dataframe, + all_destinations_dataframe, + all_sources_dataframe, + all_specs_secrets, + cached_specs, cloud_destinations_dataframe, - oss_sources_dataframe, + cloud_registry_diff_dataframe, + cloud_registry_diff_report, + cloud_registry_diff, + cloud_registry_from_metadata, cloud_sources_dataframe, - latest_oss_registry_dict, - latest_cloud_registry_dict, - all_sources_dataframe, - all_destinations_dataframe, - connector_registry_location_markdown, connector_registry_location_html, + connector_registry_location_markdown, github_connector_folders, - registry_derived_metadata_definitions, - valid_metadata_report_dataframe, - persist_metadata_definitions, - overrode_metadata_definitions, - cached_specs, + legacy_cloud_destinations_dataframe, + legacy_cloud_registry_dict, + legacy_cloud_registry, + legacy_cloud_sources_dataframe, + legacy_oss_destinations_dataframe, + legacy_oss_registry_dict, + legacy_oss_registry, + legacy_oss_sources_dataframe, + legacy_registry_derived_metadata_definitions, + metadata_definitions, + metadata_directory_report, + oss_destinations_dataframe, + oss_registry_diff_dataframe, + oss_registry_diff_report, oss_registry_diff, oss_registry_from_metadata, - cloud_registry_diff, - cloud_registry_from_metadata, - cloud_registry_diff_dataframe, - oss_registry_diff_dataframe, - all_specs_secrets, + oss_sources_dataframe, + 
overrode_metadata_definitions, + persist_metadata_definitions, specs_secrets_mask_yaml, - metadata_directory_report, - metadata_definitions, + valid_metadata_report_dataframe, ] RESOURCES = { @@ -93,9 +108,11 @@ "gcs_bucket_manager": gcs_bucket_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}}), "registry_directory_manager": gcs_file_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REGISTRIES_FOLDER}), "registry_report_directory_manager": gcs_file_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REPORT_FOLDER}), - "metadata_file_blobs": gcs_directory_blobs.configured({"prefix": METADATA_FOLDER, "suffix": METADATA_FILE_NAME}), - "latest_oss_registry_gcs_file": gcs_file_blob.configured({"prefix": REGISTRIES_FOLDER, "gcs_filename": "oss_registry.json"}), - "latest_cloud_registry_gcs_file": gcs_file_blob.configured({"prefix": REGISTRIES_FOLDER, "gcs_filename": "cloud_registry.json"}), + "latest_metadata_file_blobs": gcs_directory_blobs.configured({"prefix": METADATA_FOLDER, "suffix": f"latest/{METADATA_FILE_NAME}"}), + "legacy_oss_registry_gcs_blob": gcs_file_blob.configured({"prefix": "", "gcs_filename": "oss_catalog.json"}), + "legacy_cloud_registry_gcs_blob": gcs_file_blob.configured({"prefix": "", "gcs_filename": "cloud_catalog.json"}), + "latest_oss_registry_gcs_blob": gcs_file_blob.configured({"prefix": REGISTRIES_FOLDER, "gcs_filename": "oss_registry.json"}), + "latest_cloud_registry_gcs_blob": gcs_file_blob.configured({"prefix": REGISTRIES_FOLDER, "gcs_filename": "cloud_registry.json"}), } SENSORS = [ diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/dev.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/dev.py index af3da1a809cca..cd89476f88168 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/dev.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/dev.py @@ -1,12 +1,16 @@ import pandas as pd import yaml +import json from pydash.collections import key_by from deepdiff import DeepDiff -from dagster import Output, asset, OpExecutionContext +from dagster import Output, asset, OpExecutionContext, MetadataValue from typing import List + from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe from orchestrator.models.metadata import PartialMetadataDefinition +from metadata_service.models.generated.ConnectorRegistryV0 import ConnectorRegistryV0 + """ NOTE: This file is temporary and will be removed once we have metadata files checked into source control. @@ -190,14 +194,14 @@ def diff_registries(registry_dict_1: dict, registry_dict_2: dict) -> DeepDiff: @asset(group_name=GROUP_NAME) def overrode_metadata_definitions( - registry_derived_metadata_definitions: List[PartialMetadataDefinition], + legacy_registry_derived_metadata_definitions: List[PartialMetadataDefinition], ) -> List[PartialMetadataDefinition]: """ Overrides the metadata definitions with the values in the OVERRIDES dictionary. This is useful for ensuring all connectors are passing validation when we go live. 
""" overrode_definitions = [] - for metadata_definition in registry_derived_metadata_definitions: + for metadata_definition in legacy_registry_derived_metadata_definitions: definition_id = metadata_definition["data"]["definitionId"] if definition_id in OVERRIDES: metadata_definition["data"].update(OVERRIDES[definition_id]) @@ -230,19 +234,23 @@ def persist_metadata_definitions(context: OpExecutionContext, overrode_metadata_ @asset(group_name=GROUP_NAME) -def cloud_registry_diff(cloud_registry_from_metadata: dict, latest_cloud_registry_dict: dict) -> dict: +def cloud_registry_diff(cloud_registry_from_metadata: ConnectorRegistryV0, legacy_cloud_registry: ConnectorRegistryV0) -> dict: """ Compares the cloud registry from the metadata with the latest OSS registry. """ - return diff_registries(latest_cloud_registry_dict, cloud_registry_from_metadata).to_dict() + cloud_registry_from_metadata_dict = json.loads(cloud_registry_from_metadata.json()) + legacy_cloud_registry_dict = json.loads(legacy_cloud_registry.json()) + return diff_registries(legacy_cloud_registry_dict, cloud_registry_from_metadata_dict).to_dict() @asset(group_name=GROUP_NAME) -def oss_registry_diff(oss_registry_from_metadata: dict, latest_oss_registry_dict: dict) -> dict: +def oss_registry_diff(oss_registry_from_metadata: ConnectorRegistryV0, legacy_oss_registry: ConnectorRegistryV0) -> dict: """ Compares the OSS registry from the metadata with the latest OSS registry. """ - return diff_registries(latest_oss_registry_dict, oss_registry_from_metadata).to_dict() + oss_registry_from_metadata_dict = json.loads(oss_registry_from_metadata.json()) + legacy_oss_registry_dict = json.loads(legacy_oss_registry.json()) + return diff_registries(legacy_oss_registry_dict, oss_registry_from_metadata_dict).to_dict() @asset(group_name=GROUP_NAME) @@ -257,10 +265,38 @@ def oss_registry_diff_dataframe(oss_registry_diff: dict) -> OutputDataFrame: return output_dataframe(diff_df) -@asset(required_resource_keys={"metadata_file_blobs"}, group_name=GROUP_NAME) -def metadata_directory_report(context): - metadata_file_blobs = context.resources.metadata_file_blobs - blobs = [blob.name for blob in metadata_file_blobs if blob.name.endswith("metadata.yaml")] +@asset(required_resource_keys={"latest_metadata_file_blobs"}, group_name=GROUP_NAME) +def metadata_directory_report(context: OpExecutionContext): + latest_metadata_file_blobs = context.resources.latest_metadata_file_blobs + blobs = [blob.name for blob in latest_metadata_file_blobs] blobs_df = pd.DataFrame(blobs) return output_dataframe(blobs_df) + + +@asset(required_resource_keys={"registry_report_directory_manager"}, group_name=GROUP_NAME) +def oss_registry_diff_report(context: OpExecutionContext, oss_registry_diff_dataframe: pd.DataFrame): + markdown = oss_registry_diff_dataframe.to_markdown() + + registry_report_directory_manager = context.resources.registry_report_directory_manager + file_handle = registry_report_directory_manager.write_data(markdown.encode(), ext="md", key="dev/oss_registry_diff_report") + + metadata = { + "preview": MetadataValue.md(markdown), + "gcs_path": MetadataValue.url(file_handle.gcs_path), + } + return Output(metadata=metadata, value=file_handle) + + +@asset(required_resource_keys={"registry_report_directory_manager"}, group_name=GROUP_NAME) +def cloud_registry_diff_report(context: OpExecutionContext, cloud_registry_diff_dataframe: pd.DataFrame): + markdown = cloud_registry_diff_dataframe.to_markdown() + + registry_report_directory_manager = 
diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/metadata.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/metadata.py index 9b95b861f6998..65ac8a46ea975 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/metadata.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/metadata.py @@ -1,14 +1,14 @@ import pandas as pd import numpy as np from typing import List -from dagster import Output, asset +from dagster import Output, asset, OpExecutionContext import yaml -from metadata_service.models.generated.ConnectorMetadataDefinitionV1 import ConnectorMetadataDefinitionV1 +from metadata_service.models.generated.ConnectorMetadataDefinitionV0 import ConnectorMetadataDefinitionV0 from orchestrator.utils.object_helpers import are_values_equal, merge_values from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe -from orchestrator.models.metadata import PartialMetadataDefinition +from orchestrator.models.metadata import PartialMetadataDefinition, MetadataDefinition GROUP_NAME = "metadata" @@ -164,7 +164,7 @@ def build_metadata(connector_registry_entry: dict) -> PartialMetadataDefinition: def validate_metadata(metadata: PartialMetadataDefinition) -> tuple[bool, str]: try: - ConnectorMetadataDefinitionV1.parse_obj(metadata) + ConnectorMetadataDefinitionV0.parse_obj(metadata) return True, None except Exception as e: return False, str(e) @@ -199,28 +199,28 @@ def valid_metadata_report_dataframe(overrode_metadata_definitions: List[PartialM @asset(group_name=GROUP_NAME) -def registry_derived_metadata_definitions( - cloud_sources_dataframe, cloud_destinations_dataframe, oss_sources_dataframe, oss_destinations_dataframe +def legacy_registry_derived_metadata_definitions( + legacy_cloud_sources_dataframe, legacy_cloud_destinations_dataframe, legacy_oss_sources_dataframe, legacy_oss_destinations_dataframe ) -> Output[List[PartialMetadataDefinition]]: - sources_metadata_list = merge_into_metadata_definitions("sourceDefinitionId", "source", oss_sources_dataframe, cloud_sources_dataframe) + sources_metadata_list = merge_into_metadata_definitions( + "sourceDefinitionId", "source", legacy_oss_sources_dataframe, legacy_cloud_sources_dataframe + ) destinations_metadata_list = merge_into_metadata_definitions( - "destinationDefinitionId", "destination", oss_destinations_dataframe, cloud_destinations_dataframe + "destinationDefinitionId", "destination", legacy_oss_destinations_dataframe, legacy_cloud_destinations_dataframe ) all_definitions = sources_metadata_list + destinations_metadata_list return Output(all_definitions, metadata={"count": len(all_definitions)}) -@asset(required_resource_keys={"metadata_file_blobs"}, group_name=GROUP_NAME) -def metadata_definitions(context): - metadata_file_blobs = context.resources.metadata_file_blobs +@asset(required_resource_keys={"latest_metadata_file_blobs"}, group_name=GROUP_NAME) +def metadata_definitions(context: OpExecutionContext) -> List[MetadataDefinition]: + latest_metadata_file_blobs = context.resources.latest_metadata_file_blobs metadata_definitions = [] - for blob in metadata_file_blobs: + for blob in latest_metadata_file_blobs: yaml_string = blob.download_as_string().decode("utf-8") metadata_dict = yaml.safe_load(yaml_string) - metadata_def = ConnectorMetadataDefinitionV1.parse_obj(metadata_dict) + metadata_def = MetadataDefinition.parse_obj(metadata_dict) metadata_definitions.append(metadata_def) - metadata_definitions_df = pd.DataFrame(metadata_definitions) - - return output_dataframe(metadata_definitions_df) + return metadata_definitions
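The blob-to-model step above reduces to a YAML parse followed by parse_obj; a self-contained sketch (the two-field model is a hypothetical stand-in for the generated metadata class):

import yaml
from pydantic import BaseModel

class Data(BaseModel):  # stand-in for the generated data model
    name: str
    dockerRepository: str

class Definition(BaseModel):  # stand-in for MetadataDefinition
    data: Data

yaml_string = "data:\n  name: PokeAPI\n  dockerRepository: airbyte/source-pokeapi\n"
definition = Definition.parse_obj(yaml.safe_load(yaml_string))
print(definition.data.dockerRepository)  # airbyte/source-pokeapi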
diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry.py index b576bfb05c452..eb753800717fd 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry.py @@ -6,11 +6,18 @@ from typing import List import pandas as pd -from dagster import OpExecutionContext, asset +from dagster import asset, OpExecutionContext, MetadataValue, Output + from metadata_service.spec_cache import get_cached_spec -from orchestrator.models.metadata import PartialMetadataDefinition + +from orchestrator.models.metadata import MetadataDefinition from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe -from orchestrator.utils.object_helpers import deep_copy_params +from orchestrator.utils.object_helpers import deep_copy_params, to_json_sanitized_dict + +from dagster_gcp.gcs.file_manager import GCSFileManager, GCSFileHandle + +from metadata_service.models.generated.ConnectorRegistryV0 import ConnectorRegistryV0 + GROUP_NAME = "registry" @@ -37,6 +44,10 @@ def apply_overrides_from_registry(metadata_data: dict, override_registry_key: st """ override_registry = metadata_data["registries"][override_registry_key] del override_registry["enabled"] + + # remove any None values from the override registry + override_registry = {k: v for k, v in override_registry.items() if v is not None} + metadata_data.update(override_registry) return metadata_data
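The None filter added above keeps an explicit null in a registry override from clobbering the base value; a minimal illustration of the merge:

def apply_overrides(metadata_data: dict, override_registry: dict) -> dict:
    # drop None values so nulls in the override never erase base fields
    override_registry = {k: v for k, v in override_registry.items() if v is not None}
    metadata_data.update(override_registry)
    return metadata_data

base = {"name": "Postgres", "dockerImageTag": "1.0.0"}
print(apply_overrides(base, {"dockerImageTag": "1.0.1", "name": None}))
# {'name': 'Postgres', 'dockerImageTag': '1.0.1'}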
""" + registry_derived_metadata_dicts = [metadata_definition.dict() for metadata_definition in legacy_registry_derived_metadata_definitions] registry_sources = [ metadata_to_registry_entry(metadata, "source", registry_name) - for metadata in registry_derived_metadata_definitions + for metadata in registry_derived_metadata_dicts if is_metadata_registry_enabled(metadata, registry_name) and is_metadata_connector_type(metadata, "source") ] registry_destinations = [ metadata_to_registry_entry(metadata, "destination", registry_name) - for metadata in registry_derived_metadata_definitions + for metadata in registry_derived_metadata_dicts if is_metadata_registry_enabled(metadata, registry_name) and is_metadata_connector_type(metadata, "destination") ] - registry = {"sources": registry_sources, "destinations": registry_destinations} - - return registry + return {"sources": registry_sources, "destinations": registry_destinations} def construct_registry_with_spec_from_registry(registry: dict, cached_specs: OutputDataFrame) -> dict: @@ -144,74 +156,173 @@ def construct_registry_with_spec_from_registry(registry: dict, cached_specs: Out else: registry_with_specs["destinations"].append(entry_with_spec) except KeyError: - raise MissingCachedSpecError(f"No cached spec found for {entry['dockerRegistry']:{entry['dockerImageTag']}}") + raise MissingCachedSpecError(f"No cached spec found for {entry['dockerRepository']:{entry['dockerImageTag']}}") return registry_with_specs -# ASSETS +def persist_registry_to_json( + registry: ConnectorRegistryV0, registry_name: str, registry_directory_manager: GCSFileManager +) -> GCSFileHandle: + """Persist the registry to a json file on GCS bucket + Args: + registry (ConnectorRegistryV0): The registry. + registry_name (str): The name of the registry. One of "cloud" or "oss". + registry_directory_manager (OutputDataFrame): The registry directory manager. -@asset(group_name=GROUP_NAME) + Returns: + OutputDataFrame: The registry directory manager. + """ + registry_file_name = f"{registry_name}_registry" + registry_json = registry.json() + file_handle = registry_directory_manager.write_data(registry_json.encode("utf-8"), ext="json", key=registry_file_name) + return file_handle + +def generate_and_persist_registry( + metadata_definitions: List[MetadataDefinition], + cached_specs: OutputDataFrame, + registry_directory_manager: GCSFileManager, + registry_name: str, +) -> Output[ConnectorRegistryV0]: + """Generate the selected registry from the metadata files, and persist it to GCS. + + Args: + context (OpExecutionContext): The execution context. + metadata_definitions (List[MetadataDefinition]): The metadata definitions. + cached_specs (OutputDataFrame): The cached specs. + + Returns: + Output[ConnectorRegistryV0]: The registry. 
+ """ + + from_metadata = construct_registry_from_metadata(metadata_definitions, registry_name) + registry_dict = construct_registry_with_spec_from_registry(from_metadata, cached_specs) + registry_model = ConnectorRegistryV0.parse_obj(registry_dict) + + file_handle = persist_registry_to_json(registry_model, registry_name, registry_directory_manager) + + metadata = { + "gcs_path": MetadataValue.url(file_handle.gcs_path), + } + + return Output(metadata=metadata, value=registry_model) + + +# New Registry + + +@asset(required_resource_keys={"registry_directory_manager"}, group_name=GROUP_NAME) def cloud_registry_from_metadata( - registry_derived_metadata_definitions: List[PartialMetadataDefinition], cached_specs: OutputDataFrame -) -> dict: + context: OpExecutionContext, metadata_definitions: List[MetadataDefinition], cached_specs: OutputDataFrame +) -> Output[ConnectorRegistryV0]: """ This asset is used to generate the cloud registry from the metadata definitions. - - TODO (ben): This asset should be updated to use the GCS metadata definitions once available. """ - from_metadata = construct_registry_from_metadata(registry_derived_metadata_definitions, "cloud") - from_metadata_and_spec = construct_registry_with_spec_from_registry(from_metadata, cached_specs) - return from_metadata_and_spec + registry_name = "cloud" + registry_directory_manager = context.resources.registry_directory_manager + return generate_and_persist_registry( + metadata_definitions=metadata_definitions, + cached_specs=cached_specs, + registry_directory_manager=registry_directory_manager, + registry_name=registry_name, + ) -@asset(group_name=GROUP_NAME) -def oss_registry_from_metadata(registry_derived_metadata_definitions: List[PartialMetadataDefinition], cached_specs: OutputDataFrame) -> dict: + +@asset(required_resource_keys={"registry_directory_manager"}, group_name=GROUP_NAME) +def oss_registry_from_metadata( + context: OpExecutionContext, metadata_definitions: List[MetadataDefinition], cached_specs: OutputDataFrame +) -> Output[ConnectorRegistryV0]: """ This asset is used to generate the oss registry from the metadata definitions. - - TODO (ben): This asset should be updated to use the GCS metadata definitions once available. 
""" - from_metadata = construct_registry_from_metadata(registry_derived_metadata_definitions, "oss") - from_metadata_and_spec = construct_registry_with_spec_from_registry(from_metadata, cached_specs) - return from_metadata_and_spec + registry_name = "oss" + registry_directory_manager = context.resources.registry_directory_manager + + return generate_and_persist_registry( + metadata_definitions=metadata_definitions, + cached_specs=cached_specs, + registry_directory_manager=registry_directory_manager, + registry_name=registry_name, + ) + + +@asset(group_name=GROUP_NAME) +def cloud_sources_dataframe(cloud_registry_from_metadata: ConnectorRegistryV0) -> OutputDataFrame: + cloud_registry_from_metadata_dict = to_json_sanitized_dict(cloud_registry_from_metadata) + sources = cloud_registry_from_metadata_dict["sources"] + return output_dataframe(pd.DataFrame(sources)) + + +@asset(group_name=GROUP_NAME) +def oss_sources_dataframe(oss_registry_from_metadata: ConnectorRegistryV0) -> OutputDataFrame: + oss_registry_from_metadata_dict = to_json_sanitized_dict(oss_registry_from_metadata) + sources = oss_registry_from_metadata_dict["sources"] + return output_dataframe(pd.DataFrame(sources)) + + +@asset(group_name=GROUP_NAME) +def cloud_destinations_dataframe(cloud_registry_from_metadata: ConnectorRegistryV0) -> OutputDataFrame: + cloud_registry_from_metadata_dict = to_json_sanitized_dict(cloud_registry_from_metadata) + destinations = cloud_registry_from_metadata_dict["destinations"] + return output_dataframe(pd.DataFrame(destinations)) @asset(group_name=GROUP_NAME) -def cloud_sources_dataframe(latest_cloud_registry_dict: dict) -> OutputDataFrame: - sources = latest_cloud_registry_dict["sources"] +def oss_destinations_dataframe(oss_registry_from_metadata: ConnectorRegistryV0) -> OutputDataFrame: + oss_registry_from_metadata_dict = to_json_sanitized_dict(oss_registry_from_metadata) + destinations = oss_registry_from_metadata_dict["destinations"] + return output_dataframe(pd.DataFrame(destinations)) + + +# Old Registry + + +@asset(group_name=GROUP_NAME) +def legacy_cloud_sources_dataframe(legacy_cloud_registry_dict: dict) -> OutputDataFrame: + sources = legacy_cloud_registry_dict["sources"] return output_dataframe(pd.DataFrame(sources)) @asset(group_name=GROUP_NAME) -def oss_sources_dataframe(latest_oss_registry_dict: dict) -> OutputDataFrame: - sources = latest_oss_registry_dict["sources"] +def legacy_oss_sources_dataframe(legacy_oss_registry_dict: dict) -> OutputDataFrame: + sources = legacy_oss_registry_dict["sources"] return output_dataframe(pd.DataFrame(sources)) @asset(group_name=GROUP_NAME) -def cloud_destinations_dataframe(latest_cloud_registry_dict: dict) -> OutputDataFrame: - destinations = latest_cloud_registry_dict["destinations"] +def legacy_cloud_destinations_dataframe(legacy_cloud_registry_dict: dict) -> OutputDataFrame: + destinations = legacy_cloud_registry_dict["destinations"] return output_dataframe(pd.DataFrame(destinations)) @asset(group_name=GROUP_NAME) -def oss_destinations_dataframe(latest_oss_registry_dict: dict) -> OutputDataFrame: - destinations = latest_oss_registry_dict["destinations"] +def legacy_oss_destinations_dataframe(legacy_oss_registry_dict: dict) -> OutputDataFrame: + destinations = legacy_oss_registry_dict["destinations"] return output_dataframe(pd.DataFrame(destinations)) -@asset(required_resource_keys={"latest_cloud_registry_gcs_file"}, group_name=GROUP_NAME) -def latest_cloud_registry_dict(context: OpExecutionContext) -> dict: - oss_registry_file = 
context.resources.latest_cloud_registry_gcs_file +@asset(required_resource_keys={"legacy_cloud_registry_gcs_blob"}, group_name=GROUP_NAME) +def legacy_cloud_registry(legacy_cloud_registry_dict: dict) -> ConnectorRegistryV0: + return ConnectorRegistryV0.parse_obj(legacy_cloud_registry_dict) + + +@asset(required_resource_keys={"legacy_oss_registry_gcs_blob"}, group_name=GROUP_NAME) +def legacy_oss_registry(legacy_oss_registry_dict: dict) -> ConnectorRegistryV0: + return ConnectorRegistryV0.parse_obj(legacy_oss_registry_dict) + + +@asset(required_resource_keys={"legacy_cloud_registry_gcs_blob"}, group_name=GROUP_NAME) +def legacy_cloud_registry_dict(context: OpExecutionContext) -> dict: + cloud_registry_file = context.resources.legacy_cloud_registry_gcs_blob - json_string = oss_registry_file.download_as_string().decode("utf-8") - oss_registry_dict = json.loads(json_string) - return oss_registry_dict + json_string = cloud_registry_file.download_as_string().decode("utf-8") + cloud_registry_dict = json.loads(json_string) + return cloud_registry_dict -@asset(required_resource_keys={"latest_oss_registry_gcs_file"}, group_name=GROUP_NAME) -def latest_oss_registry_dict(context: OpExecutionContext) -> dict: - oss_registry_file = context.resources.latest_oss_registry_gcs_file +@asset(required_resource_keys={"legacy_oss_registry_gcs_blob"}, group_name=GROUP_NAME) +def legacy_oss_registry_dict(context: OpExecutionContext) -> dict: + oss_registry_file = context.resources.legacy_oss_registry_gcs_blob json_string = oss_registry_file.download_as_string().decode("utf-8") oss_registry_dict = json.loads(json_string) return oss_registry_dict diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/specs_secrets_mask.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/specs_secrets_mask.py index 5df1b234d4e3f..156912ff2b39f 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/specs_secrets_mask.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/specs_secrets_mask.py @@ -6,6 +6,7 @@ import dpath.util import yaml from dagster import MetadataValue, Output, asset +from metadata_service.models.generated.ConnectorRegistryV0 import ConnectorRegistryV0 GROUP_NAME = "specs_secrets_mask" @@ -45,13 +46,16 @@ def get_secrets_properties_from_registry_entry(registry_entry: dict) -> List[str @asset(group_name=GROUP_NAME) -def all_specs_secrets(oss_registry_from_metadata: dict, cloud_registry_from_metadata: dict) -> Set[str]: +def all_specs_secrets(oss_registry_from_metadata: ConnectorRegistryV0, cloud_registry_from_metadata: ConnectorRegistryV0) -> Set[str]: + oss_registry_from_metadata_dict = oss_registry_from_metadata.dict() + cloud_registry_from_metadata_dict = cloud_registry_from_metadata.dict() + all_secret_properties = [] all_entries = ( - oss_registry_from_metadata["sources"] - + cloud_registry_from_metadata["sources"] - + oss_registry_from_metadata["destinations"] - + cloud_registry_from_metadata["destinations"] + oss_registry_from_metadata_dict["sources"] + + cloud_registry_from_metadata_dict["sources"] + + oss_registry_from_metadata_dict["destinations"] + + cloud_registry_from_metadata_dict["destinations"] ) for registry_entry in all_entries: all_secret_properties += get_secrets_properties_from_registry_entry(registry_entry) diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/config.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/config.py index 2454936ee30ec..7d67db03e461a 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/config.py +++ 
b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/config.py @@ -1,4 +1,4 @@ -REGISTRIES_FOLDER = "registry" +REGISTRIES_FOLDER = "registries/v0" REPORT_FOLDER = "generated_reports" CONNECTOR_REPO_NAME = "airbytehq/airbyte" diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/jobs/registry.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/jobs/registry.py index 7dfd5d59d5658..51a2a25bd87a6 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/jobs/registry.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/jobs/registry.py @@ -1,9 +1,11 @@ from dagster import define_asset_job, AssetSelection -metadata_definitions_inclusive = AssetSelection.keys("metadata_directory_report", "metadata_definitions").upstream() +registries_inclusive = AssetSelection.keys( + "metadata_directory_report", "cloud_registry_from_metadata", "oss_registry_from_metadata" +).upstream() registry_reports_inclusive = AssetSelection.keys("connector_registry_location_html", "connector_registry_location_markdown").upstream() -generate_registry = define_asset_job(name="generate_registry", selection=metadata_definitions_inclusive) +generate_registry = define_asset_job(name="generate_registry", selection=registries_inclusive) generate_registry_markdown = define_asset_job(name="generate_registry_markdown", selection=registry_reports_inclusive) # TODO Change to an Asset selection diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/models/metadata.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/models/metadata.py index 16df8ddc918ad..55fa3c7cbeba8 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/models/metadata.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/models/metadata.py @@ -1,4 +1,4 @@ -from metadata_service.models.generated.ConnectorMetadataDefinitionV1 import ConnectorMetadataDefinitionV1 +from metadata_service.models.generated.ConnectorMetadataDefinitionV0 import ConnectorMetadataDefinitionV0 from pydantic import ValidationError @@ -25,6 +25,8 @@ def is_valid(self) -> Tuple[bool, Optional[Any]]: except ValidationError as e: return (False, e) + +class PydanticDictMixin: def __getitem__(self, key: str): return self.__dict__[key] @@ -32,5 +34,9 @@ def __setitem__(self, key: str, value: Any): self.__dict__[key] = value -class PartialMetadataDefinition(PydanticDelayValidationMixin, ConnectorMetadataDefinitionV1): +class PartialMetadataDefinition(PydanticDelayValidationMixin, PydanticDictMixin, ConnectorMetadataDefinitionV0): + pass + + +class MetadataDefinition(PydanticDictMixin, ConnectorMetadataDefinitionV0): pass
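PydanticDictMixin just layers dict-style access over the model's __dict__; a self-contained sketch of the behavior (the Definition model is a stand-in for the generated class, assuming pydantic v1 semantics as used throughout this PR):

from typing import Any
from pydantic import BaseModel

class PydanticDictMixin:
    def __getitem__(self, key: str) -> Any:
        return self.__dict__[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self.__dict__[key] = value

class Definition(PydanticDictMixin, BaseModel):  # stand-in for the generated model
    name: str = "source-pokeapi"

d = Definition()
d["name"] = "source-faker"  # writes go straight to __dict__, bypassing validation
print(d["name"], d.name)  # source-faker source-faker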
diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/resources/gcp.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/resources/gcp.py index 3cc0bb62e5317..504e36774b1e9 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/resources/gcp.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/resources/gcp.py @@ -3,7 +3,7 @@ from google.cloud import storage from google.oauth2 import service_account -from dagster import StringSource, InitResourceContext, resource +from dagster import StringSource, InitResourceContext, Noneable, resource from dagster_gcp.gcs.file_manager import GCSFileManager @@ -37,10 +37,7 @@ def gcs_bucket_manager(resource_context: InitResourceContext) -> storage.Bucket: @resource( required_resource_keys={"gcp_gcs_client"}, - config_schema={ - "gcs_bucket": StringSource, - "prefix": StringSource, - }, + config_schema={"gcs_bucket": StringSource, "prefix": StringSource}, ) def gcs_file_manager(resource_context) -> GCSFileManager: """FileManager that provides abstract access to GCS. @@ -60,7 +57,7 @@ def gcs_file_manager(resource_context) -> GCSFileManager: @resource( required_resource_keys={"gcs_bucket_manager"}, config_schema={ - "prefix": StringSource, + "prefix": Noneable(StringSource), "gcs_filename": StringSource, }, ) @@ -75,12 +72,12 @@ def gcs_file_blob(resource_context: InitResourceContext) -> storage.Blob: prefix = resource_context.resource_config["prefix"] gcs_filename = resource_context.resource_config["gcs_filename"] - gcs_file_path = f"{prefix}/{gcs_filename}" + gcs_file_path = f"{prefix}/{gcs_filename}" if prefix else gcs_filename resource_context.log.info(f"retrieving gcs file blob for {gcs_file_path}") gcs_file_blob = bucket.get_blob(gcs_file_path) - if not gcs_file_blob.exists(): + if not gcs_file_blob or not gcs_file_blob.exists(): raise Exception(f"File does not exist at path: {gcs_file_path}") return gcs_file_blob
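The Noneable prefix plus the guarded f-string lets blobs at the bucket root (the legacy catalogs) resolve without a leading slash:

def resolve_blob_path(prefix, gcs_filename):
    # an empty or None prefix maps to the bucket root
    return f"{prefix}/{gcs_filename}" if prefix else gcs_filename

assert resolve_blob_path("", "oss_catalog.json") == "oss_catalog.json"
assert resolve_blob_path(None, "cloud_catalog.json") == "cloud_catalog.json"
assert resolve_blob_path("registries/v0", "oss_registry.json") == "registries/v0/oss_registry.json"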
diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/sensors/metadata.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/sensors/metadata.py index 52c5e2044dcf1..7ad010ad57721 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/sensors/metadata.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/sensors/metadata.py @@ -26,8 +26,8 @@ def metadata_updated_sensor_definition(context: SensorEvaluationContext): context.log.info(f"Old etag cursor: {etags_cursor}") - metadata_file_blobs = resources.metadata_file_blobs - new_etags_cursor_set = {blob.etag for blob in metadata_file_blobs} + latest_metadata_file_blobs = resources.latest_metadata_file_blobs + new_etags_cursor_set = {blob.etag for blob in latest_metadata_file_blobs} context.log.info(f"New etag cursor: {new_etags_cursor_set}") # Note: ETAGs are GCS's way of providing a version number for a file diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/sensors/registry.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/sensors/registry.py index 54f87b7f7765f..b817777d2a9d2 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/sensors/registry.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/sensors/registry.py @@ -25,7 +25,7 @@ def registry_updated_sensor_definition(context: SensorEvaluationContext): context.log.info(f"Old etag cursor: {etag_cursor}") new_etag_cursor = serialize_composite_etags_cursor( - [resources.latest_oss_registry_gcs_file.etag, resources.latest_cloud_registry_gcs_file.etag] + [resources.latest_oss_registry_gcs_blob.etag, resources.latest_cloud_registry_gcs_blob.etag] ) context.log.info(f"New etag cursor: {new_etag_cursor}") diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/utils/object_helpers.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/utils/object_helpers.py index a6ca2b2b3c653..8285b38b14db2 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/utils/object_helpers.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/utils/object_helpers.py @@ -1,6 +1,8 @@ import mergedeep +import json from deepdiff import DeepDiff from typing import TypeVar +from pydantic import BaseModel import copy T = TypeVar("T") @@ -28,3 +30,17 @@ def f(*args, **kwargs): return to_call(*copy.deepcopy(args), **copy.deepcopy(kwargs)) return f + + +def to_json_sanitized_dict(pydantic_model_obj: BaseModel) -> dict: + """A helper function to convert a pydantic model to a sanitized dict. + + Without this step, the dict produced by pydantic's .dict() may contain values that are not JSON serializable (e.g. enums and datetimes). + + Args: + pydantic_model_obj (BaseModel): a pydantic model + + Returns: + dict: a sanitized dictionary + """ + return json.loads(pydantic_model_obj.json()) diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_debug.py b/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_debug.py index 3aa33fc4989dd..f7905fbad0518 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_debug.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_debug.py @@ -1,6 +1,6 @@ from dagster import build_op_context -from orchestrator.resources.gcp import gcp_gcs_client, gcs_bucket_manager, gcs_file_manager, gcs_file_blob +from orchestrator.resources.gcp import gcp_gcs_client, gcs_bucket_manager, gcs_file_manager, gcs_file_blob, gcs_directory_blobs from orchestrator.resources.github import github_client, github_connector_repo, github_connectors_directory from orchestrator.assets.registry import ( @@ -8,15 +8,22 @@ cloud_destinations_dataframe, oss_sources_dataframe, cloud_sources_dataframe, - latest_oss_registry_dict, - latest_cloud_registry_dict, + oss_registry_from_metadata, + cloud_registry_from_metadata, + legacy_oss_registry_dict, + legacy_oss_registry, ) from orchestrator.assets.metadata import ( - registry_derived_metadata_definitions, + legacy_registry_derived_metadata_definitions, + metadata_definitions, +) +from orchestrator.assets.dev import ( + oss_registry_diff, ) from orchestrator.config import REPORT_FOLDER, REGISTRIES_FOLDER, CONNECTORS_PATH, CONNECTOR_REPO_NAME +from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER def debug_registry_projection(): @@ -41,25 +48,57 @@ def debug_registry_projection(): "registry_report_directory_manager": gcs_file_manager.configured( {"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REPORT_FOLDER} ), - "latest_oss_registry_gcs_file": gcs_file_blob.configured({"prefix": REGISTRIES_FOLDER, "gcs_filename": "oss_registry.json"}), - "latest_cloud_registry_gcs_file": gcs_file_blob.configured({"prefix": REGISTRIES_FOLDER, "gcs_filename": "cloud_registry.json"}), + "latest_oss_registry_gcs_blob": gcs_file_blob.configured({"prefix": REGISTRIES_FOLDER, "gcs_filename": "oss_registry.json"}), + "latest_cloud_registry_gcs_blob": gcs_file_blob.configured({"prefix": REGISTRIES_FOLDER, "gcs_filename": "cloud_registry.json"}), } context = build_op_context(resources=resources) - cloud_registry_dict = latest_cloud_registry_dict(context) + cloud_registry_dict = cloud_registry_from_metadata(context) cloud_destinations_df = cloud_destinations_dataframe(cloud_registry_dict).value cloud_sources_df = cloud_sources_dataframe(cloud_registry_dict).value - oss_registry_dict = latest_oss_registry_dict(context) + oss_registry_dict = oss_registry_from_metadata(context) oss_destinations_df = oss_destinations_dataframe(oss_registry_dict).value oss_sources_df = oss_sources_dataframe(oss_registry_dict).value - # github_connector_folders_list = github_connector_folders(context).value - registry_derived_metadata_definitions(context, cloud_sources_df, cloud_destinations_df, oss_sources_df, oss_destinations_df).value - # 
valid_metadata_report_dataframe_df = valid_metadata_report_dataframe(metadata_definitions_df).value + legacy_registry_derived_metadata_definitions( + context, cloud_sources_df, cloud_destinations_df, oss_sources_df, oss_destinations_df + ).value + + +def debug_registry_generation(): + resources = { + "gcp_gcs_client": gcp_gcs_client.configured( + { + "gcp_gcs_cred_string": {"env": "GCS_CREDENTIALS"}, + } + ), + "gcs_bucket_manager": gcs_bucket_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}}), + "registry_directory_manager": gcs_file_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REGISTRIES_FOLDER}), + "latest_metadata_file_blobs": gcs_directory_blobs.configured({"prefix": METADATA_FOLDER, "suffix": METADATA_FILE_NAME}), + } - # all_sources_df = all_sources_dataframe(cloud_sources_df, oss_sources_df, github_connector_folders_list, valid_metadata_report_dataframe_df) - # all_destinations_df = all_destinations_dataframe(cloud_destinations_df, oss_destinations_df) + context = build_op_context(resources=resources) + metadata_definitions_asset = metadata_definitions(context) + oss_registry_from_metadata(context, metadata_definitions_asset).value - # connector_registry_location_html(context, all_sources_df, all_destinations_df) - # connector_registry_location_markdown(context, all_sources_df, all_destinations_df) + +def debug_registry_diff(): + resources = { + "gcp_gcs_client": gcp_gcs_client.configured( + { + "gcp_gcs_cred_string": {"env": "GCS_CREDENTIALS"}, + } + ), + "gcs_bucket_manager": gcs_bucket_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}}), + "registry_directory_manager": gcs_file_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REGISTRIES_FOLDER}), + "latest_metadata_file_blobs": gcs_directory_blobs.configured({"prefix": METADATA_FOLDER, "suffix": METADATA_FILE_NAME}), + "legacy_oss_registry_gcs_blob": gcs_file_blob.configured({"prefix": "", "gcs_filename": "oss_catalog.json"}), + } + + context = build_op_context(resources=resources) + metadata_definitions_asset = metadata_definitions(context) + oss_registry_from_metadata_value = oss_registry_from_metadata(context, metadata_definitions_asset).value + legacy_oss_registry_dict_value = legacy_oss_registry_dict(context) + legacy_oss_registry_value = legacy_oss_registry(legacy_oss_registry_dict_value) + oss_registry_diff(oss_registry_from_metadata_value, legacy_oss_registry_value) diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_metadata.py b/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_metadata.py index f4f0630d0a911..b4f5abcb5e053 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_metadata.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_metadata.py @@ -7,16 +7,23 @@ cloud_sources_dataframe, ) -from orchestrator.assets.metadata import registry_derived_metadata_definitions +from orchestrator.assets.metadata import legacy_registry_derived_metadata_definitions + +from metadata_service.models.generated.ConnectorRegistryV0 import ConnectorRegistryV0 def test_no_missing_ids(oss_registry_dict, cloud_registry_dict): - cloud_destinations_df = cloud_destinations_dataframe(cloud_registry_dict).value - cloud_sources_df = cloud_sources_dataframe(cloud_registry_dict).value - oss_destinations_df = oss_destinations_dataframe(oss_registry_dict).value - oss_sources_df = oss_sources_dataframe(oss_registry_dict).value + oss_registry_model = ConnectorRegistryV0.parse_obj(oss_registry_dict) + 
cloud_registry_model = ConnectorRegistryV0.parse_obj(cloud_registry_dict) + + cloud_destinations_df = cloud_destinations_dataframe(cloud_registry_model).value + cloud_sources_df = cloud_sources_dataframe(cloud_registry_model).value + oss_destinations_df = oss_destinations_dataframe(oss_registry_model).value + oss_sources_df = oss_sources_dataframe(oss_registry_model).value - metadata_def = registry_derived_metadata_definitions(cloud_sources_df, cloud_destinations_df, oss_sources_df, oss_destinations_df).value + metadata_def = legacy_registry_derived_metadata_definitions( + cloud_sources_df, cloud_destinations_df, oss_sources_df, oss_destinations_df + ).value metadata_definition_ids = [definition["data"]["definitionId"] for definition in metadata_def] unique_metadata_definition_ids = set(metadata_definition_ids) @@ -55,12 +62,17 @@ def assert_in_expected_registry(metadata_def, oss_registry_dict, cloud_registry_ def test_in_correct_registry(oss_registry_dict, cloud_registry_dict): - cloud_destinations_df = cloud_destinations_dataframe(cloud_registry_dict).value - cloud_sources_df = cloud_sources_dataframe(cloud_registry_dict).value - oss_destinations_df = oss_destinations_dataframe(oss_registry_dict).value - oss_sources_df = oss_sources_dataframe(oss_registry_dict).value + oss_registry_model = ConnectorRegistryV0.parse_obj(oss_registry_dict) + cloud_registry_model = ConnectorRegistryV0.parse_obj(cloud_registry_dict) - metadata_def = registry_derived_metadata_definitions(cloud_sources_df, cloud_destinations_df, oss_sources_df, oss_destinations_df).value + cloud_destinations_df = cloud_destinations_dataframe(cloud_registry_model).value + cloud_sources_df = cloud_sources_dataframe(cloud_registry_model).value + oss_destinations_df = oss_destinations_dataframe(oss_registry_model).value + oss_sources_df = oss_sources_dataframe(oss_registry_model).value + + metadata_def = legacy_registry_derived_metadata_definitions( + cloud_sources_df, cloud_destinations_df, oss_sources_df, oss_destinations_df + ).value for definition in metadata_def: assert_in_expected_registry(definition, oss_registry_dict, cloud_registry_dict) @@ -92,11 +104,16 @@ def has_correct_cloud_docker_image(metadata_def, cloud_registry_dict): def test_registry_override(oss_registry_dict, cloud_registry_dict): - cloud_destinations_df = cloud_destinations_dataframe(cloud_registry_dict).value - cloud_sources_df = cloud_sources_dataframe(cloud_registry_dict).value - oss_destinations_df = oss_destinations_dataframe(oss_registry_dict).value - oss_sources_df = oss_sources_dataframe(oss_registry_dict).value + oss_registry_model = ConnectorRegistryV0.parse_obj(oss_registry_dict) + cloud_registry_model = ConnectorRegistryV0.parse_obj(cloud_registry_dict) + + cloud_destinations_df = cloud_destinations_dataframe(cloud_registry_model).value + cloud_sources_df = cloud_sources_dataframe(cloud_registry_model).value + oss_destinations_df = oss_destinations_dataframe(oss_registry_model).value + oss_sources_df = oss_sources_dataframe(oss_registry_model).value - metadata_def = registry_derived_metadata_definitions(cloud_sources_df, cloud_destinations_df, oss_sources_df, oss_destinations_df).value + metadata_def = legacy_registry_derived_metadata_definitions( + cloud_sources_df, cloud_destinations_df, oss_sources_df, oss_destinations_df + ).value for definition in metadata_def: assert has_correct_cloud_docker_image(definition, cloud_registry_dict)
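All three tests start by lifting the raw fixture dicts into the generated model; that round trip can be sanity-checked in isolation (a sketch, assuming the schema accepts empty source and destination lists):

import json
from metadata_service.models.generated.ConnectorRegistryV0 import ConnectorRegistryV0

def test_registry_model_roundtrip():
    registry_dict = {"sources": [], "destinations": []}
    model = ConnectorRegistryV0.parse_obj(registry_dict)
    sanitized = json.loads(model.json(exclude_none=True))
    assert sanitized["sources"] == [] and sanitized["destinations"] == []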
diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_registry.py b/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_registry.py index 916ffb7033fa5..d0331ef74523a 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_registry.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/tests/test_registry.py @@ -1,5 +1,6 @@ import pandas as pd from metadata_service.spec_cache import CachedSpec +from metadata_service.models.generated.ConnectorRegistryV0 import ConnectorRegistryV0 from orchestrator.assets.registry_report import ( @@ -15,6 +16,9 @@ def test_merged_registry_dataframes(oss_registry_dict, cloud_registry_dict): + oss_registry_model = ConnectorRegistryV0.parse_obj(oss_registry_dict) + cloud_registry_model = ConnectorRegistryV0.parse_obj(cloud_registry_dict) + github_connector_folders = [] valid_metadata_report_dataframe = pd.DataFrame([{"definitionId": "test", "is_metadata_valid": True}]) cached_specs = pd.DataFrame( @@ -32,16 +36,16 @@ def test_merged_registry_dataframes(oss_registry_dict, cloud_registry_dict): num_cloud_sources = len(cloud_registry_dict["sources"]) num_oss_sources = len(oss_registry_dict["sources"]) - cloud_destinations_df = cloud_destinations_dataframe(cloud_registry_dict).value + cloud_destinations_df = cloud_destinations_dataframe(cloud_registry_model).value assert len(cloud_destinations_df) == num_cloud_destinations - cloud_sources_df = cloud_sources_dataframe(cloud_registry_dict).value + cloud_sources_df = cloud_sources_dataframe(cloud_registry_model).value assert len(cloud_sources_df) == num_cloud_sources - oss_destinations_df = oss_destinations_dataframe(oss_registry_dict).value + oss_destinations_df = oss_destinations_dataframe(oss_registry_model).value assert len(oss_destinations_df) == num_oss_destinations - oss_sources_df = oss_sources_dataframe(oss_registry_dict).value + oss_sources_df = oss_sources_dataframe(oss_registry_model).value assert len(oss_sources_df) == num_oss_sources all_sources_df = all_sources_dataframe(