Dagster registry generation and persist (airbytehq#25260)
* Define legacy and latest registry at same time

* Fix file persist

* Get persist to work

* Expand generate models to all models

* Add new registry types

* Fix class name

* Get valid registry type definition

* Remove uuid hack

* Fix ids for json schemas

* Resolve issues

* Update legacy assets

* Add typed legacy registry

* Fix icon issue

* Regenerate models

* Update spec mask to use registry type

* Move v1 to v0

* Add json sanitized dict helper

* Fix tests and format

* Ensure we only get latest

* Code review comments

* Fix missing spec error

* Move registry code to helper
bnchrch authored Apr 21, 2023
1 parent 7d75e5e commit de6811f
Showing 45 changed files with 1,372 additions and 207 deletions.
@@ -0,0 +1,32 @@
#!/usr/bin/env bash

set -e


YAML_DIR=metadata_service/models/src
OUTPUT_DIR=metadata_service/models/generated

# Ensure the yaml directory exists
if [ ! -d "$YAML_DIR" ]; then
    echo "The yaml directory does not exist: $YAML_DIR"
    exit 1
fi


rm -rf "$OUTPUT_DIR"/*.py
mkdir -p "$OUTPUT_DIR"

echo "# generated by generate-python-classes" > "$OUTPUT_DIR"/__init__.py

for f in "$YAML_DIR"/*.yaml; do
    filename_wo_ext=$(basename "$f" | cut -d . -f 1)
    echo "from .$filename_wo_ext import *" >> "$OUTPUT_DIR"/__init__.py

    datamodel-codegen \
        --input "$YAML_DIR/$filename_wo_ext.yaml" \
        --output "$OUTPUT_DIR/$filename_wo_ext.py" \
        --use-title-as-name \
        --use-double-quotes \
        --enum-field-as-literal all \
        --disable-timestamp
done
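
For context: the script regenerates one Python module per YAML schema and re-exports every module through the package __init__.py, so callers can import any generated model from a single namespace. A minimal sketch of that import surface, assuming the metadata_service package is importable (pydantic v1 API):

from pydantic import ValidationError

from metadata_service.models.generated import ConnectorMetadataDefinitionV0

# An empty document is missing required fields, so schema validation
# fails immediately with a descriptive error.
try:
    ConnectorMetadataDefinitionV0.parse_obj({})
except ValidationError as e:
    print(e)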
@@ -6,6 +6,7 @@
import click
from metadata_service.gcs_upload import upload_metadata_to_gcs
from metadata_service.validators.metadata_validator import validate_metadata_file
+from metadata_service.constants import METADATA_FILE_NAME
from pydantic import ValidationError


@@ -17,15 +18,15 @@ def metadata_service():
@metadata_service.command(help="Validate a given metadata YAML file.")
@click.argument("file_path", type=click.Path(exists=True, path_type=pathlib.Path))
def validate(file_path: pathlib.Path):
-    file_path = file_path if not file_path.is_dir() else file_path / "metadata.yaml"
+    file_path = file_path if not file_path.is_dir() else file_path / METADATA_FILE_NAME

    click.echo(f"Validating {file_path}...")

    is_valid, error = validate_metadata_file(file_path)
    if is_valid:
-        click.echo(f"{file_path} is a valid ConnectorMetadataDefinitionV1 YAML file.")
+        click.echo(f"{file_path} is a valid ConnectorMetadataDefinitionV0 YAML file.")
    else:
-        click.echo(f"{file_path} is not a valid ConnectorMetadataDefinitionV1 YAML file.")
+        click.echo(f"{file_path} is not a valid ConnectorMetadataDefinitionV0 YAML file.")
        click.echo(str(error))
        exit(1)

@@ -37,7 +38,7 @@ def validate(file_path: pathlib.Path):
    "--service-account-file-path", "-sa", type=click.Path(exists=True, path_type=pathlib.Path), envvar="GOOGLE_APPLICATION_CREDENTIALS"
)
def upload(metadata_file_path: pathlib.Path, bucket_name: str, service_account_file_path: pathlib.Path):
-    metadata_file_path = metadata_file_path if not metadata_file_path.is_dir() else metadata_file_path / "metadata.yaml"
+    metadata_file_path = metadata_file_path if not metadata_file_path.is_dir() else metadata_file_path / METADATA_FILE_NAME
    try:
        uploaded, blob_id = upload_metadata_to_gcs(bucket_name, metadata_file_path, service_account_file_path)
    except (ValidationError, FileNotFoundError) as e:
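
The change above swaps the hard-coded "metadata.yaml" literal for a shared constant. A sketch of what metadata_service/constants.py would need to export for these imports to resolve; METADATA_FILE_NAME's value follows from the literal it replaces, while METADATA_FOLDER's value is an assumption, since this diff never shows it:

# metadata_service/constants.py (sketch, not the actual file contents)
METADATA_FILE_NAME = "metadata.yaml"  # replaces the literal used before this change
METADATA_FOLDER = "metadata"  # assumed value; only the import is visible in the diff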
@@ -7,7 +7,7 @@
import yaml
from google.cloud import storage
from google.oauth2 import service_account
-from metadata_service.models.generated.ConnectorMetadataDefinitionV1 import ConnectorMetadataDefinitionV1
+from metadata_service.models.generated.ConnectorMetadataDefinitionV0 import ConnectorMetadataDefinitionV0
from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER


@@ -38,7 +38,7 @@ def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, service_a
    """
    uploaded = False
    raw_metadata = yaml.safe_load(metadata_file_path.read_text())
-    metadata = ConnectorMetadataDefinitionV1.parse_obj(raw_metadata)
+    metadata = ConnectorMetadataDefinitionV0.parse_obj(raw_metadata)

    credentials = service_account.Credentials.from_service_account_file(service_account_file_path)
    storage_client = storage.Client(credentials=credentials)
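
Note the ordering in upload_metadata_to_gcs: the YAML is parsed and validated against the pydantic model before any GCS client is constructed, so a malformed file fails fast without touching the network. The same pattern in isolation, as a hedged sketch (the helper name load_validated_metadata is hypothetical, not part of the module):

import pathlib

import yaml

from metadata_service.models.generated.ConnectorMetadataDefinitionV0 import ConnectorMetadataDefinitionV0


def load_validated_metadata(metadata_file_path: pathlib.Path) -> ConnectorMetadataDefinitionV0:
    # Parse the YAML, then let pydantic enforce the schema; a
    # ValidationError propagates to the caller (the CLI catches it above).
    raw_metadata = yaml.safe_load(metadata_file_path.read_text())
    return ConnectorMetadataDefinitionV0.parse_obj(raw_metadata)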
@@ -0,0 +1,54 @@
# generated by datamodel-codegen:
#   filename: ActorDefinitionResourceRequirements.yaml

from __future__ import annotations

from typing import List, Optional

from pydantic import BaseModel, Extra, Field
from typing_extensions import Literal


class ResourceRequirements(BaseModel):
    class Config:
        extra = Extra.forbid

    cpu_request: Optional[str] = None
    cpu_limit: Optional[str] = None
    memory_request: Optional[str] = None
    memory_limit: Optional[str] = None


class JobType(BaseModel):
    __root__: Literal[
        "get_spec",
        "check_connection",
        "discover_schema",
        "sync",
        "reset_connection",
        "connection_updater",
        "replicate",
    ] = Field(
        ...,
        description="enum that describes the different types of jobs that the platform runs.",
        title="JobType",
    )


class JobTypeResourceLimit(BaseModel):
    class Config:
        extra = Extra.forbid

    jobType: JobType
    resourceRequirements: ResourceRequirements


class ActorDefinitionResourceRequirements(BaseModel):
    class Config:
        extra = Extra.forbid

    default: Optional[ResourceRequirements] = Field(
        None,
        description="if set, these are the requirements that should be set for ALL jobs run for this actor definition.",
    )
    jobSpecific: Optional[List[JobTypeResourceLimit]] = None
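
Because the generated classes are plain pydantic models, a resource-requirements block can be validated directly; the JobType custom root type accepts the bare literal string. An illustrative snippet with invented values:

from metadata_service.models.generated import ActorDefinitionResourceRequirements

# All values are invented for illustration; "sync" is coerced into the
# JobType custom root type, and anything outside the Literal set is rejected.
reqs = ActorDefinitionResourceRequirements.parse_obj(
    {
        "default": {"cpu_request": "0.25", "memory_limit": "1Gi"},
        "jobSpecific": [
            {"jobType": "sync", "resourceRequirements": {"memory_limit": "2Gi"}}
        ],
    }
)
print(reqs.jobSpecific[0].jobType.__root__)  # -> "sync"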
@@ -0,0 +1,18 @@
# generated by datamodel-codegen:
#   filename: AllowedHosts.yaml

from __future__ import annotations

from typing import List, Optional

from pydantic import BaseModel, Extra, Field


class AllowedHosts(BaseModel):
    class Config:
        extra = Extra.allow

    hosts: Optional[List[str]] = Field(
        None,
        description="An array of hosts that this connector can connect to. AllowedHosts not being present for the source or destination means that access to all hosts is allowed. An empty list here means that no network access is granted.",
    )
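
Unlike the forbid-everything models above, AllowedHosts is declared with Extra.allow, so unrecognized keys are preserved rather than rejected. A quick illustration (the extra key is invented):

from metadata_service.models.generated import AllowedHosts

# Extra.allow keeps unknown keys instead of raising a ValidationError.
hosts = AllowedHosts.parse_obj({"hosts": ["api.example.com"], "note": "kept as-is"})
print(hosts.dict())  # {'hosts': ['api.example.com'], 'note': 'kept as-is'}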
@@ -1,34 +1,13 @@
# generated by datamodel-codegen:
-#   filename: ConnectorMetadataDefinitionV1.yaml
+#   filename: ConnectorMetadataDefinitionV0.yaml

from __future__ import annotations

-from enum import Enum
from typing import List, Optional
from uuid import UUID

from pydantic import AnyUrl, BaseModel, Extra, Field
-
-
-class ConnectorType(Enum):
-    destination = "destination"
-    source = "source"
-
-
-class ConnectorSubtype(Enum):
-    api = "api"
-    database = "database"
-    file = "file"
-    custom = "custom"
-    message_queue = "message_queue"
-    unknown = "unknown"
-
-
-class ReleaseStage(Enum):
-    alpha = "alpha"
-    beta = "beta"
-    generally_available = "generally_available"
-    source = "source"
+from typing_extensions import Literal


class AllowedHosts(BaseModel):
@@ -79,14 +58,20 @@ class Config:
    memory_limit: Optional[str] = None


-class JobType(Enum):
-    get_spec = "get_spec"
-    check_connection = "check_connection"
-    discover_schema = "discover_schema"
-    sync = "sync"
-    reset_connection = "reset_connection"
-    connection_updater = "connection_updater"
-    replicate = "replicate"
+class JobType(BaseModel):
+    __root__: Literal[
+        "get_spec",
+        "check_connection",
+        "discover_schema",
+        "sync",
+        "reset_connection",
+        "connection_updater",
+        "replicate",
+    ] = Field(
+        ...,
+        description="enum that describes the different types of jobs that the platform runs.",
+        title="JobType",
+    )


class JobTypeResourceLimit(BaseModel):
@@ -137,25 +122,28 @@

class Data(BaseModel):
    name: str
+    icon: Optional[str] = None
    definitionId: UUID
-    connectorType: ConnectorType
+    connectorType: Literal["destination", "source"]
    dockerRepository: str
    dockerImageTag: str
    supportsDbt: Optional[bool] = None
    supportsNormalization: Optional[bool] = None
    license: str
    supportUrl: AnyUrl
    githubIssueLabel: str
-    connectorSubtype: ConnectorSubtype
-    releaseStage: ReleaseStage
+    connectorSubtype: Literal[
+        "api", "database", "file", "custom", "message_queue", "unknown"
+    ]
+    releaseStage: Literal["alpha", "beta", "generally_available", "source"]
    registries: Optional[Registry] = None
    allowedHosts: Optional[AllowedHosts] = None
    normalizationConfig: Optional[NormalizationDestinationDefinitionConfig] = None
    suggestedStreams: Optional[SuggestedStreams] = None
    resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None


-class ConnectorMetadataDefinitionV1(BaseModel):
+class ConnectorMetadataDefinitionV0(BaseModel):
    class Config:
        extra = Extra.forbid

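
The practical effect of replacing Enum subclasses with Literal root models is that plain strings flow through validation and serialization unchanged, while invalid values still fail. A small demonstration against the generated JobType (the invalid value is invented):

from pydantic import ValidationError

from metadata_service.models.generated.ConnectorMetadataDefinitionV0 import JobType

# Bare strings validate directly into the custom root type...
print(JobType.parse_obj("sync").__root__)  # -> "sync"

# ...and values outside the Literal set are rejected.
try:
    JobType.parse_obj("not_a_job")
except ValidationError as e:
    print("rejected:", e.errors()[0]["msg"])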
@@ -0,0 +1,131 @@
# generated by datamodel-codegen:
#   filename: ConnectorRegistryDestinationDefinition.yaml

from __future__ import annotations

from datetime import date
from typing import Any, Dict, List, Optional
from uuid import UUID

from pydantic import BaseModel, Extra, Field
from typing_extensions import Literal


class ReleaseStage(BaseModel):
    __root__: Literal["alpha", "beta", "generally_available", "custom"] = Field(
        ...,
        description="enum that describes a connector's release stage",
        title="ReleaseStage",
    )


class ResourceRequirements(BaseModel):
    class Config:
        extra = Extra.forbid

    cpu_request: Optional[str] = None
    cpu_limit: Optional[str] = None
    memory_request: Optional[str] = None
    memory_limit: Optional[str] = None


class JobType(BaseModel):
    __root__: Literal[
        "get_spec",
        "check_connection",
        "discover_schema",
        "sync",
        "reset_connection",
        "connection_updater",
        "replicate",
    ] = Field(
        ...,
        description="enum that describes the different types of jobs that the platform runs.",
        title="JobType",
    )


class NormalizationDestinationDefinitionConfig(BaseModel):
    class Config:
        extra = Extra.allow

    normalizationRepository: str = Field(
        ...,
        description="a field indicating the name of the repository to be used for normalization. If the value of the flag is NULL - normalization is not used.",
    )
    normalizationTag: str = Field(
        ...,
        description="a field indicating the tag of the docker repository to be used for normalization.",
    )
    normalizationIntegrationType: str = Field(
        ...,
        description="a field indicating the type of integration dialect to use for normalization.",
    )


class AllowedHosts(BaseModel):
    class Config:
        extra = Extra.allow

    hosts: Optional[List[str]] = Field(
        None,
        description="An array of hosts that this connector can connect to. AllowedHosts not being present for the source or destination means that access to all hosts is allowed. An empty list here means that no network access is granted.",
    )


class JobTypeResourceLimit(BaseModel):
    class Config:
        extra = Extra.forbid

    jobType: JobType
    resourceRequirements: ResourceRequirements


class ActorDefinitionResourceRequirements(BaseModel):
    class Config:
        extra = Extra.forbid

    default: Optional[ResourceRequirements] = Field(
        None,
        description="if set, these are the requirements that should be set for ALL jobs run for this actor definition.",
    )
    jobSpecific: Optional[List[JobTypeResourceLimit]] = None


class ConnectorRegistryDestinationDefinition(BaseModel):
    class Config:
        extra = Extra.allow

    destinationDefinitionId: UUID
    name: str
    dockerRepository: str
    dockerImageTag: str
    documentationUrl: str
    icon: Optional[str] = None
    spec: Dict[str, Any]
    tombstone: Optional[bool] = Field(
        False,
        description="if false, the configuration is active. if true, then this configuration is permanently off.",
    )
    public: Optional[bool] = Field(
        False,
        description="true if this connector definition is available to all workspaces",
    )
    custom: Optional[bool] = Field(
        False, description="whether this is a custom connector definition"
    )
    releaseStage: Optional[ReleaseStage] = None
    releaseDate: Optional[date] = Field(
        None,
        description="The date when this connector was first released, in yyyy-mm-dd format.",
    )
    resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None
    protocolVersion: Optional[str] = Field(
        None, description="the Airbyte Protocol version supported by the connector"
    )
    normalizationConfig: Optional[NormalizationDestinationDefinitionConfig] = None
    supportsDbt: Optional[bool] = Field(
        None,
        description="an optional flag indicating whether DBT is used in the normalization. If the flag value is NULL - DBT is not used.",
    )
    allowedHosts: Optional[AllowedHosts] = None
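
Since ConnectorRegistryDestinationDefinition uses Extra.allow, registry entries with fields beyond the schema still parse. A hedged end-to-end sketch with entirely invented values:

from metadata_service.models.generated import ConnectorRegistryDestinationDefinition

# Every value below is invented for illustration.
definition = ConnectorRegistryDestinationDefinition.parse_obj(
    {
        "destinationDefinitionId": "00000000-0000-0000-0000-000000000000",
        "name": "Example Destination",
        "dockerRepository": "airbyte/destination-example",
        "dockerImageTag": "0.1.0",
        "documentationUrl": "https://docs.airbyte.com/integrations/destinations/example",
        "spec": {"connectionSpecification": {}},
        "releaseStage": "alpha",
    }
)
print(definition.releaseStage.__root__)  # -> "alpha"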