connectors-ci: package normalization inside destinations images supporting it (airbytehq#25359)

alafanechere authored Apr 22, 2023
1 parent 0ab82b0 commit 6660432
Showing 4 changed files with 129 additions and 63 deletions.
tools/ci_connector_ops/ci_connector_ops/pipelines/actions/environments.py

@@ -10,7 +10,7 @@
 from typing import TYPE_CHECKING, List, Optional, Tuple
 
 from ci_connector_ops.pipelines.utils import get_file_contents, get_version_from_dockerfile, should_enable_sentry, slugify
-from dagger import CacheSharingMode, CacheVolume, Container, Directory, File, Secret
+from dagger import CacheSharingMode, CacheVolume, Container, Directory, File, Platform, Secret
 
 if TYPE_CHECKING:
     from ci_connector_ops.pipelines.contexts import ConnectorContext, PipelineContext
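Note: dagger's Platform type, newly imported above, is a thin string wrapper that selects a container's target architecture using the OCI "<os>/<arch>" convention. A minimal sketch with illustrative values (not part of the commit):

    from dagger import Platform

    BUILD_PLATFORMS = [Platform("linux/amd64"), Platform("linux/arm64")]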
@@ -474,77 +474,153 @@ def with_poetry_module(context: PipelineContext, parent_dir: Directory, module_p
     )
 
 
-def with_integration_base(context: PipelineContext, build_platform: str, jdk_version: str = "17.0.4") -> Container:
-    """Create an integration base container.
-    Reproduce with Dagger the Dockerfile defined here: airbyte-integrations/bases/base/Dockerfile
-    """
-    base_sh = context.get_repo_dir("airbyte-integrations/bases/base", include=["base.sh"]).file("base.sh")
-
+def with_integration_base(context: PipelineContext, build_platform: Platform) -> Container:
     return (
         context.dagger_client.container(platform=build_platform)
-        .from_(f"amazoncorretto:{jdk_version}")
+        .from_("amazonlinux:2022.0.20220831.1")
         .with_workdir("/airbyte")
-        .with_file("base.sh", base_sh)
+        .with_file("base.sh", context.get_repo_dir("airbyte-integrations/bases/base", include=["base.sh"]).file("base.sh"))
         .with_env_variable("AIRBYTE_ENTRYPOINT", "/airbyte/base.sh")
         .with_label("io.airbyte.version", "0.1.0")
         .with_label("io.airbyte.name", "airbyte/integration-base")
     )
 
 
-async def with_java_base(context: PipelineContext, build_platform: str, jdk_version: str = "17.0.4") -> Container:
-    """Create a java base container.
-    Reproduce with Dagger the Dockerfile defined here: airbyte-integrations/bases/base-java/Dockerfile_
-    """
-    datadog_java_agent = context.dagger_client.http("https://dtdg.co/latest-java-tracer")
-    javabase_sh = context.get_repo_dir("airbyte-integrations/bases/base-java", include=["javabase.sh"]).file("javabase.sh")
-    dockerfile = context.get_repo_dir("airbyte-integrations/bases/base-java", include=["Dockerfile"]).file("Dockerfile")
-    java_base_version = await get_version_from_dockerfile(dockerfile)
-
+def with_integration_base_java(context: PipelineContext, build_platform: Platform, jdk_version: str = "17.0.4") -> Container:
+    integration_base = with_integration_base(context, build_platform)
     return (
-        with_integration_base(context, build_platform, jdk_version)
+        context.dagger_client.container(platform=build_platform)
+        .from_(f"amazoncorretto:{jdk_version}")
+        .with_directory("/airbyte", integration_base.directory("/airbyte"))
         .with_exec(["yum", "install", "-y", "tar", "openssl"])
         .with_exec(["yum", "clean", "all"])
         .with_workdir("/airbyte")
-        .with_file("dd-java-agent.jar", datadog_java_agent)
-        .with_file("javabase.sh", javabase_sh)
+        .with_file("dd-java-agent.jar", context.dagger_client.http("https://dtdg.co/latest-java-tracer"))
+        .with_file("javabase.sh", context.get_repo_dir("airbyte-integrations/bases/base-java", include=["javabase.sh"]).file("javabase.sh"))
         .with_env_variable("AIRBYTE_SPEC_CMD", "/airbyte/javabase.sh --spec")
         .with_env_variable("AIRBYTE_CHECK_CMD", "/airbyte/javabase.sh --check")
         .with_env_variable("AIRBYTE_DISCOVER_CMD", "/airbyte/javabase.sh --discover")
         .with_env_variable("AIRBYTE_READ_CMD", "/airbyte/javabase.sh --read")
         .with_env_variable("AIRBYTE_WRITE_CMD", "/airbyte/javabase.sh --write")
         .with_env_variable("AIRBYTE_ENTRYPOINT", "/airbyte/base.sh")
-        .with_label("io.airbyte.version", java_base_version)
+        .with_label("io.airbyte.version", "0.1.2")
         .with_label("io.airbyte.name", "airbyte/integration-base-java")
     )
 
 
-async def with_airbyte_java_connector(context: ConnectorContext, connector_java_tar_file: File, build_platform: str):
+BASE_DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING = {
+    "destination-clickhouse": "clickhouse.Dockerfile",
+    "destination-duckdb": "duckdb.Dockerfile",
+    "destination-mssql": "mssql.Dockerfile",
+    "destination-mysql": "mysql.Dockerfile",
+    "destination-oracle": "oracle.Dockerfile",
+    "destination-tidb": "tidb.Dockerfile",
+    "destination-bigquery": "Dockerfile",
+    "destination-redshift": "redshift.Dockerfile",
+    "destination-snowflake": "snowflake.Dockerfile",
+}
+
+BASE_DESTINATION_SPECIFIC_NORMALIZATION_ADAPTER_MAPPING = {
+    "destination-clickhouse": "dbt-clickhouse>=1.4.0",
"destination-duckdb": "duckdb.Dockerfile",
"destination-mssql": "dbt-sqlserver==1.0.0",
"destination-mysql": "dbt-mysql==1.0.0",
"destination-oracle": "dbt-oracle==0.4.3",
"destination-tidb": "dbt-tidb==1.0.1",
"destination-bigquery": "dbt-bigquery==1.0.0",
}

DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING = {
**BASE_DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING,
**{f"{k}-strict-encrypt": v for k, v in BASE_DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING.items()},
}

DESTINATION_SPECIFIC_NORMALIZATION_ADAPTER_MAPPING = {
**BASE_DESTINATION_SPECIFIC_NORMALIZATION_ADAPTER_MAPPING,
**{f"{k}-strict-encrypt": v for k, v in BASE_DESTINATION_SPECIFIC_NORMALIZATION_ADAPTER_MAPPING.items()},
}


def with_normalization(context: ConnectorContext) -> Container:
normalization_directory = context.get_repo_dir("airbyte-integrations/bases/base-normalization")
sshtunneling_file = context.get_repo_dir(
"airbyte-connector-test-harnesses/acceptance-test-harness/src/main/resources", include="sshtunneling.sh"
).file("sshtunneling.sh")
normalization_directory_with_build = normalization_directory.with_new_directory("build")
normalization_directory_with_sshtunneling = normalization_directory_with_build.with_file("build/sshtunneling.sh", sshtunneling_file)
normalization_dockerfile_name = DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING.get(
context.connector.technical_name, "Dockerfile"
)
return normalization_directory_with_sshtunneling.docker_build(normalization_dockerfile_name)


def with_integration_base_java_and_normalization(context: PipelineContext, build_platform: Platform) -> Container:
yum_packages_to_install = [
"python3",
"python3-devel",
"jq",
"sshpass",
"git",
]

dbt_adapter_package = DESTINATION_SPECIFIC_NORMALIZATION_ADAPTER_MAPPING.get(context.connector.technical_name, "dbt-bigquery==1.0.0")

pip_cache: CacheVolume = context.dagger_client.cache_volume("pip_cache")

return (
with_integration_base_java(context, build_platform)
.with_exec(["yum", "install", "-y"] + yum_packages_to_install)
.with_exec(["alternatives", "--install", "/usr/bin/python", "python", "/usr/bin/python3", "60"])
.with_mounted_cache("/root/.cache/pip", pip_cache)
.with_exec(["python", "-m", "ensurepip", "--upgrade"])
.with_exec(["pip3", "install", dbt_adapter_package])
.with_directory("airbyte_normalization", with_normalization(context).directory("/airbyte"))
.with_workdir("airbyte_normalization")
.with_exec(["sh", "-c", "mv * .."])
.with_workdir("/airbyte")
.with_exec(["rm", "-rf", "airbyte_normalization"])
.with_workdir("/airbyte/base_python_structs")
.with_exec(["pip3", "install", "."])
.with_workdir("/airbyte/normalization_code")
.with_exec(["pip3", "install", "."])
.with_workdir("/airbyte/normalization_code/dbt-template/")
.with_exec(["dbt", "deps"])
.with_workdir("/airbyte")
)


async def with_airbyte_java_connector(context: ConnectorContext, connector_java_tar_file: File, build_platform: Platform):
dockerfile = context.get_connector_dir(include=["Dockerfile"]).file("Dockerfile")
# TODO find a way to infer this without the Dockerfile. From metadata.yaml?
enable_sentry = await should_enable_sentry(dockerfile)
connector_version = await get_version_from_dockerfile(dockerfile)

application = context.connector.technical_name
java_base = await with_java_base(context, build_platform)
enable_sentry = await should_enable_sentry(dockerfile)

return (
java_base.with_workdir("/airbyte")
.with_env_variable("APPLICATION", application)
build_stage = (
with_integration_base_java(context, build_platform)
.with_workdir("/airbyte")
.with_env_variable("APPLICATION", context.connector.technical_name)
.with_file(f"{application}.tar", connector_java_tar_file)
.with_exec(["tar", "xf", f"{application}.tar", "--strip-components=1"])
.with_exec(["rm", "-rf", f"{application}.tar"])
.with_label("io.airbyte.version", connector_version)
.with_label("io.airbyte.name", f"airbyte/{application}")
.with_env_variable("ENABLE_SENTRY", str(enable_sentry).lower())
.with_entrypoint("/airbyte/base.sh")
)

if context.connector.supports_normalization:
base = with_integration_base_java_and_normalization(context, build_platform)
else:
base = with_integration_base_java(context, build_platform)

def with_normalization(context, normalization_dockerfile_name: str) -> Container:
normalization_directory = context.get_repo_dir("airbyte-integrations/bases/base-normalization")
sshtunneling_file = context.get_repo_dir(
"airbyte-connector-test-harnesses/acceptance-test-harness/src/main/resources", include="sshtunneling.sh"
).file("sshtunneling.sh")
normalization_directory_with_build = normalization_directory.with_new_directory("build")
normalization_directory_with_sshtunneling = normalization_directory_with_build.with_file("build/sshtunneling.sh", sshtunneling_file)
return normalization_directory_with_sshtunneling.docker_build(normalization_dockerfile_name)
return (
base.with_workdir("/airbyte")
.with_env_variable("APPLICATION", application)
.with_env_variable("ENABLE_SENTRY", str(enable_sentry).lower())
.with_directory("builts_artifacts", build_stage.directory("/airbyte"))
.with_exec(["sh", "-c", "mv builts_artifacts/* ."])
.with_exec(["rm", "-rf", "builts_artifacts"])
# TODO get version from metadata
.with_label("io.airbyte.version", connector_version)
.with_label("io.airbyte.name", f"airbyte/{application}")
.with_entrypoint(["/airbyte/base.sh"])
)
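Taken together, the new helpers give connector builds a two-stage shape: a build stage unpacks the connector tarball on top of the Java base, and the final image starts from either the plain Java base or the Java-plus-normalization base, depending on context.connector.supports_normalization. A minimal driver sketch (not part of the commit; the context and tarball objects are assumed to come from the surrounding pipeline):

    from dagger import Platform

    async def build_destination_image(context, connector_java_tar_file):
        # Returns a dagger Container; normalization is packaged in when the
        # connector supports it.
        container = await with_airbyte_java_connector(
            context, connector_java_tar_file, Platform("linux/amd64")
        )
        # One way to materialize the result locally is to export an image tarball.
        await container.export("/tmp/connector-image.tar")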
6 changes: 2 additions & 4 deletions tools/ci_connector_ops/ci_connector_ops/pipelines/bases.py
@@ -49,13 +49,11 @@ def from_exit_code(exit_code: int) -> StepStatus:
"""
if exit_code == 0:
return StepStatus.SUCCESS
if exit_code == 1:
return StepStatus.FAILURE
# pytest returns a 5 exit code when no test is found.
if exit_code == 5:
elif exit_code == 5:
return StepStatus.SKIPPED
else:
raise ValueError(f"No step status is mapped to exit code {exit_code}")
return StepStatus.FAILURE

def get_rich_style(self) -> Style:
"""Match color used in the console output to the step status."""
Expand Down
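The behavioral change is easy to miss: an exit code with no explicit mapping (2, for instance) used to raise a ValueError, while it now falls through to FAILURE. A quick sketch of the new mapping, assuming from_exit_code is exposed as a static method on StepStatus, as the hunk header's context suggests:

    assert StepStatus.from_exit_code(0) is StepStatus.SUCCESS
    assert StepStatus.from_exit_code(5) is StepStatus.SKIPPED  # pytest: no tests collected
    assert StepStatus.from_exit_code(1) is StepStatus.FAILURE
    assert StepStatus.from_exit_code(2) is StepStatus.FAILURE  # previously raised ValueError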
@@ -39,7 +39,10 @@ def image_name(self) -> Tuple:
     async def _run(self) -> StepResult:
         _, exported_tarball_path = await export_container_to_tarball(self.context, self.container)
         client = docker.from_env()
-        with open(exported_tarball_path, "rb") as tarball_content:
-            new_image = client.images.load(tarball_content.read())[0]
-            new_image.tag(self.image_name, tag=self.IMAGE_TAG)
-        return StepResult(self, StepStatus.SUCCESS)
+        try:
+            with open(exported_tarball_path, "rb") as tarball_content:
+                new_image = client.images.load(tarball_content.read())[0]
+                new_image.tag(self.image_name, tag=self.IMAGE_TAG)
+            return StepResult(self, StepStatus.SUCCESS)
+        except ConnectionError:
+            return StepResult(self, StepStatus.FAILURE, stderr="The connection to the local docker host failed.")
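For reference, a standalone sketch of the docker-py calls this step wraps; the tarball path and image name here are hypothetical:

    import docker

    client = docker.from_env()  # connects to the local Docker daemon

    with open("/tmp/connector-image.tar", "rb") as tarball:
        # images.load returns a list, since a tarball may contain several images.
        image = client.images.load(tarball.read())[0]

    image.tag("airbyte/destination-example", tag="dev")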
@@ -6,24 +6,14 @@
 from ci_connector_ops.pipelines.actions import environments
 from ci_connector_ops.pipelines.bases import Step, StepResult, StepStatus
 from ci_connector_ops.pipelines.contexts import ConnectorContext
-from ci_connector_ops.utils import Connector
 from dagger import Container
 
 
+# TODO this class could be deleted
+# if java connectors tests are not relying on an existing local normalization image to run
 class BuildOrPullNormalization(Step):
     """A step to build or pull the normalization image for a connector according to the image name."""
 
-    DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING = {
-        Connector("destination-clickhouse"): "clickhouse.Dockerfile",
-        Connector("destination-duckdb"): "duckdb.Dockerfile",
-        Connector("destination-mssql"): "mssql.Dockerfile",
-        Connector("destination-mysql"): "mysql.Dockerfile",
-        Connector("destination-oracle"): "oracle.Dockerfile",
-        Connector("destination-redshift"): "redshift.Dockerfile",
-        Connector("destination-snowflake"): "snowflake.Dockerfile",
-        Connector("destination-tidb"): "tidb.Dockerfile",
-    }
-
     def __init__(self, context: ConnectorContext, normalization_image: str) -> None:
         """Initialize the step to build or pull the normalization image.
@@ -34,12 +24,11 @@ def __init__(self, context: ConnectorContext, normalization_image: str) -> None:
         super().__init__(context)
         self.use_dev_normalization = normalization_image.endswith(":dev")
         self.normalization_image = normalization_image
-        self.normalization_dockerfile = self.DESTINATION_SPECIFIC_NORMALIZATION_DOCKERFILE_MAPPING.get(context.connector, "Dockerfile")
         self.title = f"Build {self.normalization_image}" if self.use_dev_normalization else f"Pull {self.normalization_image}"
 
     async def _run(self) -> Tuple[StepResult, Container]:
         if self.use_dev_normalization:
-            build_normalization_container = environments.with_normalization(self.context, self.normalization_dockerfile)
+            build_normalization_container = environments.with_normalization(self.context)
         else:
             build_normalization_container = self.context.dagger_client.container().from_(self.normalization_image)
         return StepResult(self, StepStatus.SUCCESS), build_normalization_container
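A usage sketch of the step after this change. The image names are illustrative, and only the ":dev" suffix decides whether normalization is built from the repository or pulled; _run is shown awaited directly, though the pipeline's public entry point may differ:

    # Build normalization from the local repository:
    build_step = BuildOrPullNormalization(context, "airbyte/normalization-snowflake:dev")

    # Pull a published normalization image instead:
    pull_step = BuildOrPullNormalization(context, "airbyte/normalization-snowflake:0.4.0")

    step_result, normalization_container = await build_step._run()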
