From 666043290f555b543f9e18262c903c1a3571f2b0 Mon Sep 17 00:00:00 2001
From: Augustin
Date: Sat, 22 Apr 2023 10:45:45 +0200
Subject: [PATCH] connectors-ci: package normalization inside destination images that support it (#25359)

---
 .../pipelines/actions/environments.py         | 158 +++++++++++++-----
 .../ci_connector_ops/pipelines/bases.py       |   6 +-
 .../pipelines/builds/common.py                |  11 +-
 .../pipelines/builds/normalization.py         |  17 +-
 4 files changed, 129 insertions(+), 63 deletions(-)

diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py
index f100ab0f819b2..dff4eec3987d3 100644
--- a/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py
@@ -10,7 +10,7 @@
 from typing import TYPE_CHECKING, List, Optional, Tuple
 
 from ci_connector_ops.pipelines.utils import get_file_contents, get_version_from_dockerfile, should_enable_sentry, slugify
-from dagger import CacheSharingMode, CacheVolume, Container, Directory, File, Secret
+from dagger import CacheSharingMode, CacheVolume, Container, Directory, File, Platform, Secret
 
 if TYPE_CHECKING:
     from ci_connector_ops.pipelines.contexts import ConnectorContext, PipelineContext
@@ -474,77 +474,153 @@ def with_poetry_module(context: PipelineContext, parent_dir: Directory, module_p
     )
 
 
-def with_integration_base(context: PipelineContext, build_platform: str, jdk_version: str = "17.0.4") -> Container:
-    """Create an integration base container.
-
-    Reproduce with Dagger the Dockerfile defined here: airbyte-integrations/bases/base/Dockerfile
-    """
-    base_sh = context.get_repo_dir("airbyte-integrations/bases/base", include=["base.sh"]).file("base.sh")
-
+def with_integration_base(context: PipelineContext, build_platform: Platform) -> Container:
+    """Create an integration base container, reproducing airbyte-integrations/bases/base/Dockerfile with Dagger."""
     return (
         context.dagger_client.container(platform=build_platform)
-        .from_(f"amazoncorretto:{jdk_version}")
+        .from_("amazonlinux:2022.0.20220831.1")
         .with_workdir("/airbyte")
-        .with_file("base.sh", base_sh)
+        .with_file("base.sh", context.get_repo_dir("airbyte-integrations/bases/base", include=["base.sh"]).file("base.sh"))
         .with_env_variable("AIRBYTE_ENTRYPOINT", "/airbyte/base.sh")
         .with_label("io.airbyte.version", "0.1.0")
         .with_label("io.airbyte.name", "airbyte/integration-base")
     )
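+
+# Illustrative use of the integration base (a sketch with hypothetical values, not
+# part of the pipeline itself; `env_variable` is the Dagger container accessor):
+#
+#   base = with_integration_base(context, Platform("linux/amd64"))
+#   entrypoint = await base.env_variable("AIRBYTE_ENTRYPOINT")  # -> "/airbyte/base.sh"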
 
 
-async def with_java_base(context: PipelineContext, build_platform: str, jdk_version: str = "17.0.4") -> Container:
-    """Create a java base container.
-
-    Reproduce with Dagger the Dockerfile defined here: airbyte-integrations/bases/base-java/Dockerfile
-    """
-    datadog_java_agent = context.dagger_client.http("https://dtdg.co/latest-java-tracer")
-    javabase_sh = context.get_repo_dir("airbyte-integrations/bases/base-java", include=["javabase.sh"]).file("javabase.sh")
-    dockerfile = context.get_repo_dir("airbyte-integrations/bases/base-java", include=["Dockerfile"]).file("Dockerfile")
-    java_base_version = await get_version_from_dockerfile(dockerfile)
+def with_integration_base_java(context: PipelineContext, build_platform: Platform, jdk_version: str = "17.0.4") -> Container:
+    """Create a java base container, reproducing airbyte-integrations/bases/base-java/Dockerfile with Dagger."""
+    integration_base = with_integration_base(context, build_platform)
     return (
-        with_integration_base(context, build_platform, jdk_version)
+        context.dagger_client.container(platform=build_platform)
+        .from_(f"amazoncorretto:{jdk_version}")
+        .with_directory("/airbyte", integration_base.directory("/airbyte"))
         .with_exec(["yum", "install", "-y", "tar", "openssl"])
         .with_exec(["yum", "clean", "all"])
         .with_workdir("/airbyte")
-        .with_file("dd-java-agent.jar", datadog_java_agent)
-        .with_file("javabase.sh", javabase_sh)
+        .with_file("dd-java-agent.jar", context.dagger_client.http("https://dtdg.co/latest-java-tracer"))
+        .with_file("javabase.sh", context.get_repo_dir("airbyte-integrations/bases/base-java", include=["javabase.sh"]).file("javabase.sh"))
         .with_env_variable("AIRBYTE_SPEC_CMD", "/airbyte/javabase.sh --spec")
         .with_env_variable("AIRBYTE_CHECK_CMD", "/airbyte/javabase.sh --check")
         .with_env_variable("AIRBYTE_DISCOVER_CMD", "/airbyte/javabase.sh --discover")
         .with_env_variable("AIRBYTE_READ_CMD", "/airbyte/javabase.sh --read")
         .with_env_variable("AIRBYTE_WRITE_CMD", "/airbyte/javabase.sh --write")
         .with_env_variable("AIRBYTE_ENTRYPOINT", "/airbyte/base.sh")
-        .with_label("io.airbyte.version", java_base_version)
+        .with_label("io.airbyte.version", "0.1.2")
         .with_label("io.airbyte.name", "airbyte/integration-base-java")
     )
 
 
-async def with_airbyte_java_connector(context: ConnectorContext, connector_java_tar_file: File, build_platform: str):
+BASE_DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING = {
+    "destination-clickhouse": "clickhouse.Dockerfile",
+    "destination-duckdb": "duckdb.Dockerfile",
+    "destination-mssql": "mssql.Dockerfile",
+    "destination-mysql": "mysql.Dockerfile",
+    "destination-oracle": "oracle.Dockerfile",
+    "destination-tidb": "tidb.Dockerfile",
+    "destination-bigquery": "Dockerfile",
+    "destination-redshift": "redshift.Dockerfile",
+    "destination-snowflake": "snowflake.Dockerfile",
+}
+
+BASE_DESTINATION_SPECIFIC_NORMALIZATION_ADAPTER_MAPPING = {
+    "destination-clickhouse": "dbt-clickhouse>=1.4.0",
+    "destination-duckdb": "dbt-duckdb==1.0.1",
+    "destination-mssql": "dbt-sqlserver==1.0.0",
+    "destination-mysql": "dbt-mysql==1.0.0",
+    "destination-oracle": "dbt-oracle==0.4.3",
+    "destination-tidb": "dbt-tidb==1.0.1",
+    "destination-bigquery": "dbt-bigquery==1.0.0",
+}
+
+DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING = {
+    **BASE_DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING,
+    **{f"{k}-strict-encrypt": v for k, v in BASE_DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING.items()},
+}
+
+DESTINATION_SPECIFIC_NORMALIZATION_ADAPTER_MAPPING = {
+    **BASE_DESTINATION_SPECIFIC_NORMALIZATION_ADAPTER_MAPPING,
+    **{f"{k}-strict-encrypt": v for k, v in BASE_DESTINATION_SPECIFIC_NORMALIZATION_ADAPTER_MAPPING.items()},
+}
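+
+# The dict expansions above give every "-strict-encrypt" connector variant the same
+# normalization build configuration as its base connector, e.g.:
+#   DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING["destination-mysql-strict-encrypt"] == "mysql.Dockerfile"
+#   DESTINATION_SPECIFIC_NORMALIZATION_ADAPTER_MAPPING["destination-mysql-strict-encrypt"] == "dbt-mysql==1.0.0"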
+
+
+def with_normalization(context: ConnectorContext) -> Container:
+    """Build the destination-specific normalization image from airbyte-integrations/bases/base-normalization."""
+    normalization_directory = context.get_repo_dir("airbyte-integrations/bases/base-normalization")
+    sshtunneling_file = context.get_repo_dir(
+        "airbyte-connector-test-harnesses/acceptance-test-harness/src/main/resources", include="sshtunneling.sh"
+    ).file("sshtunneling.sh")
+    normalization_directory_with_build = normalization_directory.with_new_directory("build")
+    normalization_directory_with_sshtunneling = normalization_directory_with_build.with_file("build/sshtunneling.sh", sshtunneling_file)
+    normalization_dockerfile_name = DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING.get(
+        context.connector.technical_name, "Dockerfile"
+    )
+    return normalization_directory_with_sshtunneling.docker_build(normalization_dockerfile_name)
+
+
+def with_integration_base_java_and_normalization(context: ConnectorContext, build_platform: Platform) -> Container:
+    yum_packages_to_install = [
+        "python3",
+        "python3-devel",
+        "jq",
+        "sshpass",
+        "git",
+    ]
+
+    dbt_adapter_package = DESTINATION_SPECIFIC_NORMALIZATION_ADAPTER_MAPPING.get(context.connector.technical_name, "dbt-bigquery==1.0.0")
+
+    pip_cache: CacheVolume = context.dagger_client.cache_volume("pip_cache")
+
+    return (
+        with_integration_base_java(context, build_platform)
+        .with_exec(["yum", "install", "-y"] + yum_packages_to_install)
+        .with_exec(["alternatives", "--install", "/usr/bin/python", "python", "/usr/bin/python3", "60"])
+        .with_mounted_cache("/root/.cache/pip", pip_cache)
+        .with_exec(["python", "-m", "ensurepip", "--upgrade"])
+        .with_exec(["pip3", "install", dbt_adapter_package])
+        # Stage the normalization image content, then merge it into /airbyte.
+        .with_directory("airbyte_normalization", with_normalization(context).directory("/airbyte"))
+        .with_workdir("airbyte_normalization")
+        .with_exec(["sh", "-c", "mv * .."])
+        .with_workdir("/airbyte")
+        .with_exec(["rm", "-rf", "airbyte_normalization"])
+        .with_workdir("/airbyte/base_python_structs")
+        .with_exec(["pip3", "install", "."])
+        .with_workdir("/airbyte/normalization_code")
+        .with_exec(["pip3", "install", "."])
+        .with_workdir("/airbyte/normalization_code/dbt-template/")
+        # Pre-fetch the dbt packages declared by the template project.
+        .with_exec(["dbt", "deps"])
+        .with_workdir("/airbyte")
+    )
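+
+# Sketch of the resulting image layout (derived from the chained steps above): /airbyte
+# holds base.sh, javabase.sh and dd-java-agent.jar from the java base, plus the
+# normalization sources (base_python_structs and normalization_code, both pip-installed),
+# with dbt dependencies already fetched under /airbyte/normalization_code/dbt-template.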
+
+
+async def with_airbyte_java_connector(context: ConnectorContext, connector_java_tar_file: File, build_platform: Platform) -> Container:
     dockerfile = context.get_connector_dir(include=["Dockerfile"]).file("Dockerfile")
+    # TODO: find a way to infer this without the Dockerfile. From metadata.yaml?
+    enable_sentry = await should_enable_sentry(dockerfile)
     connector_version = await get_version_from_dockerfile(dockerfile)
+    application = context.connector.technical_name
-    java_base = await with_java_base(context, build_platform)
-    enable_sentry = await should_enable_sentry(dockerfile)
 
-    return (
-        java_base.with_workdir("/airbyte")
+    build_stage = (
+        with_integration_base_java(context, build_platform)
+        .with_workdir("/airbyte")
         .with_env_variable("APPLICATION", application)
         .with_file(f"{application}.tar", connector_java_tar_file)
         .with_exec(["tar", "xf", f"{application}.tar", "--strip-components=1"])
         .with_exec(["rm", "-rf", f"{application}.tar"])
-        .with_label("io.airbyte.version", connector_version)
-        .with_label("io.airbyte.name", f"airbyte/{application}")
-        .with_env_variable("ENABLE_SENTRY", str(enable_sentry).lower())
-        .with_entrypoint("/airbyte/base.sh")
     )
 
+    if context.connector.supports_normalization:
+        base = with_integration_base_java_and_normalization(context, build_platform)
+    else:
+        base = with_integration_base_java(context, build_platform)
 
-def with_normalization(context, normalization_dockerfile_name: str) -> Container:
-    normalization_directory = context.get_repo_dir("airbyte-integrations/bases/base-normalization")
-    sshtunneling_file = context.get_repo_dir(
-        "airbyte-connector-test-harnesses/acceptance-test-harness/src/main/resources", include="sshtunneling.sh"
-    ).file("sshtunneling.sh")
-    normalization_directory_with_build = normalization_directory.with_new_directory("build")
-    normalization_directory_with_sshtunneling = normalization_directory_with_build.with_file("build/sshtunneling.sh", sshtunneling_file)
-    return normalization_directory_with_sshtunneling.docker_build(normalization_dockerfile_name)
+    return (
+        base.with_workdir("/airbyte")
+        .with_env_variable("APPLICATION", application)
+        .with_env_variable("ENABLE_SENTRY", str(enable_sentry).lower())
+        .with_directory("built_artifacts", build_stage.directory("/airbyte"))
+        .with_exec(["sh", "-c", "mv built_artifacts/* ."])
+        .with_exec(["rm", "-rf", "built_artifacts"])
+        # TODO: get the version from metadata.yaml instead of the Dockerfile.
+        .with_label("io.airbyte.version", connector_version)
+        .with_label("io.airbyte.name", f"airbyte/{application}")
+        .with_entrypoint(["/airbyte/base.sh"])
+    )
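+
+# Note: the construction above mirrors a multi-stage Dockerfile build. `build_stage`
+# only unpacks the connector tar on the plain java base; its /airbyte output is then
+# copied into the final base, which bundles normalization when the destination
+# supports it.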
diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py
index 5c43407d6f81b..e4bd7819a06f0 100644
--- a/tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py
@@ -49,13 +49,11 @@ def from_exit_code(exit_code: int) -> StepStatus:
         """
         if exit_code == 0:
             return StepStatus.SUCCESS
-        if exit_code == 1:
-            return StepStatus.FAILURE
         # pytest returns a 5 exit code when no test is found.
-        if exit_code == 5:
+        elif exit_code == 5:
             return StepStatus.SKIPPED
         else:
-            raise ValueError(f"No step status is mapped to exit code {exit_code}")
+            return StepStatus.FAILURE
 
     def get_rich_style(self) -> Style:
         """Match color used in the console output to the step status."""
diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/common.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/common.py
index a6c070419d762..27535d14a5795 100644
--- a/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/common.py
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/common.py
@@ -39,7 +39,10 @@ def image_name(self) -> Tuple:
     async def _run(self) -> StepResult:
         _, exported_tarball_path = await export_container_to_tarball(self.context, self.container)
-        client = docker.from_env()
-        with open(exported_tarball_path, "rb") as tarball_content:
-            new_image = client.images.load(tarball_content.read())[0]
-            new_image.tag(self.image_name, tag=self.IMAGE_TAG)
-        return StepResult(self, StepStatus.SUCCESS)
+        try:
+            client = docker.from_env()
+            with open(exported_tarball_path, "rb") as tarball_content:
+                new_image = client.images.load(tarball_content.read())[0]
+                new_image.tag(self.image_name, tag=self.IMAGE_TAG)
+            return StepResult(self, StepStatus.SUCCESS)
+        except (ConnectionError, docker.errors.DockerException):
+            return StepResult(self, StepStatus.FAILURE, stderr="The connection to the local docker host failed.")
diff --git a/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/normalization.py b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/normalization.py
index 767fb028bb75a..833803329afab 100644
--- a/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/normalization.py
+++ b/tools/ci_connector_ops/ci_connector_ops/pipelines/builds/normalization.py
@@ -6,24 +6,14 @@
 from ci_connector_ops.pipelines.actions import environments
 from ci_connector_ops.pipelines.bases import Step, StepResult, StepStatus
 from ci_connector_ops.pipelines.contexts import ConnectorContext
-from ci_connector_ops.utils import Connector
 from dagger import Container
 
 
+# TODO: this class can be deleted once java connector tests no longer rely on an
+# existing local normalization image to run.
 class BuildOrPullNormalization(Step):
     """A step to build or pull the normalization image for a connector according to the image name."""
 
-    DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING = {
-        Connector("destination-clickhouse"): "clickhouse.Dockerfile",
-        Connector("destination-duckdb"): "duckdb.Dockerfile",
-        Connector("destination-mssql"): "mssql.Dockerfile",
-        Connector("destination-mysql"): "mysql.Dockerfile",
-        Connector("destination-oracle"): "oracle.Dockerfile",
-        Connector("destination-redshift"): "redshift.Dockerfile",
-        Connector("destination-snowflake"): "snowflake.Dockerfile",
-        Connector("destination-tidb"): "tidb.Dockerfile",
-    }
-
     def __init__(self, context: ConnectorContext, normalization_image: str) -> None:
         """Initialize the step to build or pull the normalization image.
@@ -34,12 +24,11 @@ def __init__(self, context: ConnectorContext, normalization_image: str) -> None:
         super().__init__(context)
         self.use_dev_normalization = normalization_image.endswith(":dev")
         self.normalization_image = normalization_image
-        self.normalization_dockerfile = self.DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING.get(context.connector, "Dockerfile")
         self.title = f"Build {self.normalization_image}" if self.use_dev_normalization else f"Pull {self.normalization_image}"
 
     async def _run(self) -> Tuple[StepResult, Container]:
         if self.use_dev_normalization:
-            build_normalization_container = environments.with_normalization(self.context, self.normalization_dockerfile)
+            build_normalization_container = environments.with_normalization(self.context)
         else:
             build_normalization_container = self.context.dagger_client.container().from_(self.normalization_image)
         return StepResult(self, StepStatus.SUCCESS), build_normalization_container