Skip to content

Commit

Permalink
Merge remote-tracking branch 'StevenReitsma/feature/clickhouse-cluster-support'
Browse files Browse the repository at this point in the history
  • Loading branch information
marlon-costa-dc committed Oct 19, 2024
2 parents 7b70a70 + 789b977 commit ad43637
Show file tree
Hide file tree
Showing 12 changed files with 439 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class TransformCatalog:
To run this transformation:
```
python3 main_dev_transform_catalog.py \
--integration-type <postgres|bigquery|redshift|snowflake>
--integration-type <postgres|bigquery|redshift|snowflake|clickhouse>
--profile-config-dir . \
--catalog integration_tests/catalog.json \
--out dir \
Expand Down Expand Up @@ -53,6 +53,10 @@ def parse(self, args) -> None:
"profile_config_dir": parsed_args.profile_config_dir,
}

# Add support for Clickhouse Replicated* Engine for self-hosted-cluster
if parsed_args.integration_type == "clickhouse":
self.config["engine"] = profiles_yml.get("engine")

def process_catalog(self) -> None:
destination_type = DestinationType.from_string(self.config["integration_type"])
schema = self.config["schema"]
Expand All @@ -68,6 +72,10 @@ def update_dbt_project_vars(self, **vars_config: Dict[str, Any]):
# Load the dbt_project.yml that lives in the profile config directory and
# merge the provided vars into its "vars" section (new values win).
filename = os.path.join(self.config["profile_config_dir"], self.DBT_PROJECT)
config = read_yaml_config(filename)
config["vars"] = {**config.get("vars", {}), **vars_config}

# Propagate a ClickHouse engine override (stored on self.config by parse()
# when --integration-type is "clickhouse") into the dbt model config so
# generated models are created with the requested table engine.
# NOTE(review): this indexes config["models"]["airbyte_utils"] directly —
# verify those keys always exist in dbt_project.yml or this raises KeyError.
if self.config.get("engine"):
config["models"]["airbyte_utils"]["+engine"] = self.config.get("engine")

write_yaml_config(config, filename)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,16 @@ def transform_mssql(config: Dict[str, Any]):
@staticmethod
def transform_clickhouse(config: Dict[str, Any]):
"""Map the ClickHouse connector config onto a dbt profile dict.

Supports the self-hosted-cluster deployment flavor: when the nested
"deploy_type" object selects it, the cluster name is forwarded and,
if replication is requested, the table engine gets a "Replicated"
prefix (e.g. MergeTree -> ReplicatedMergeTree).
"""
print("transform_clickhouse")

# "deploy_type" is a nested object in the connector spec; its inner
# "deploy_type" key names the deployment flavor (see the unit test input).
deploy_type = config.get("deploy_type", {})
engine = config.get("engine", "MergeTree")
is_cluster_mode = deploy_type.get("deploy_type") == "self-hosted-cluster"
cluster = ""
if is_cluster_mode:
cluster = deploy_type.get("cluster", "")
# Replication only applies in cluster mode; the engine name is prefixed
# rather than replaced, so any *MergeTree variant is supported.
if deploy_type.get("replication", False):
engine = f"Replicated{engine}"

# https://docs.getdbt.com/reference/warehouse-profiles/clickhouse-profile
dbt_config = {
"type": "clickhouse",
Expand All @@ -325,6 +335,9 @@ def transform_clickhouse(config: Dict[str, Any]):
"port": config["port"],
"schema": config["database"],
"user": config["username"],
"cluster": cluster,
"cluster_mode": is_cluster_mode,
"engine": engine,
}
# Password is optional in the connector spec; only forward it when present.
if "password" in config:
dbt_config["password"] = config["password"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,20 @@ def test_transform_mssql(self):
assert extract_schema(actual) == "my_db"

def test_transform_clickhouse(self):
"""transform_clickhouse maps a self-hosted-cluster config to a dbt profile.

With deploy_type=self-hosted-cluster and replication=True, the output
must carry cluster_mode/cluster and the "Replicated"-prefixed engine.
"""
input = {"host": "airbyte.io", "port": 9440, "database": "default", "username": "ch", "password": "password1234", "ssl": True}
input = {
"host": "airbyte.io",
"port": 9440,
"database": "default",
"username": "ch",
"password": "password1234",
"ssl": True,
"deploy_type": {
"deploy_type": "self-hosted-cluster",
"cluster": "my-cluster",
"replication": True
},
"engine": "MergeTree"
}

actual = TransformConfig().transform_clickhouse(input)
expected = {
Expand All @@ -448,6 +461,9 @@ def test_transform_clickhouse(self):
"user": "ch",
"password": "password1234",
"secure": True,
"cluster_mode": True,
"cluster": "my-cluster",
# "MergeTree" becomes "ReplicatedMergeTree" because replication=True.
"engine": "ReplicatedMergeTree"
}

assert actual == expected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ application {
dependencies {

implementation 'com.clickhouse:clickhouse-jdbc:0.3.2-patch10:all'
implementation 'org.apache.commons:commons-text:1.10.0'

// https://mvnrepository.com/artifact/org.testcontainers/clickhouse
testImplementation 'org.testcontainers:clickhouse:1.19.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,17 @@ data:
1.0.0:
upgradeDeadline: "2024-03-15"
message: >
This version removes the option to use "normalization" with clickhouse. It also changes
the schema and database of Airbyte's "raw" tables to be compatible with the new
[Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2)
format. These changes will likely require updates to downstream dbt / SQL models.
Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync.
This version removes the option to use "normalization" with clickhouse. It also changes the schema and database of Airbyte's "raw" tables to be compatible with the new [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2) format. These changes will likely require updates to downstream dbt / SQL models. Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync.
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/destinations/clickhouse
supportsDbt: false
tags:
- language:java
- language:java
ab_internal:
sl: 100
ql: 200
supportLevel: community
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
- suite: unitTests
- suite: integrationTests
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.dbs.factory.DataSourceFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
Expand All @@ -19,11 +19,29 @@
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.RawOnlySqlGenerator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.jdbc.JdbcBufferedConsumerFactory;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,10 +54,17 @@ public class ClickhouseDestination extends AbstractJdbcDestination implements De

public static final String HTTPS_PROTOCOL = "https";
public static final String HTTP_PROTOCOL = "http";
public static final String SSL_VERIFICATION_METHOD_KEY = "ssl_mode";
public static final String DEFAULT_SSL_VERIFICATION_METHOD = "none";
public static final String CA_CERT_FILENAME = "ca.crt";

// Create an extra SqlOperations object because the one in the superclass is
// private and we need to access it
private final ClickhouseSqlOperations configurableSqlOperations;

static final List<String> SSL_PARAMETERS = ImmutableList.of(
"socket_timeout=3000000",
"sslmode=NONE");
"ssl=true");
static final List<String> DEFAULT_PARAMETERS = ImmutableList.of(
"socket_timeout=3000000");

Expand All @@ -49,11 +74,32 @@ public static Destination sshWrappedDestination() {

// Wires up the ClickHouse naming transformer and SQL operations, and keeps a
// second, locally-held ClickhouseSqlOperations because the superclass field is
// private (see the comment on configurableSqlOperations) and we must call
// setConfig(...) on it before use.
// NOTE(review): the instance passed to super(...) is NOT the same object as
// configurableSqlOperations, so any superclass code path that uses its own
// private copy will never see the engine/cluster config — confirm intended.
public ClickhouseDestination() {
super(DRIVER_CLASS, new ClickhouseSQLNameTransformer(), new ClickhouseSqlOperations());
configurableSqlOperations = new ClickhouseSqlOperations();
}

// Returns the locally-held, configurable SqlOperations so subclass code paths
// operate on the instance that receives setConfig(...) (engine/cluster info).
@Override
protected SqlOperations getSqlOperations() {
return configurableSqlOperations;
}

// Writes the CA certificate material to a local file so the JDBC URL can
// reference it via the "sslrootcert" parameter.
//
// Fix: the previous implementation used PrintWriter.print(), which never
// throws — write failures are only observable via checkError() — so a failed
// write could silently leave a truncated/empty certificate on disk.
// Files.writeString creates/truncates the file the same way but surfaces any
// failure as an IOException, which this method already declares.
private static void createCertificateFile(String fileName, String fileValue) throws IOException {
    java.nio.file.Files.writeString(java.nio.file.Path.of(fileName), fileValue, StandardCharsets.UTF_8);
  }

// Builds the JDBC config from the connector config: chooses http/https based
// on the ssl flag, appends the ssl verification mode, and — for "strict"
// verification with a CA certificate — materializes the cert to a local file
// referenced via "sslrootcert".
@Override
public JsonNode toJdbcConfig(final JsonNode config) {
final boolean isSsl = JdbcUtils.useSsl(config);
// "ssl_mode.mode" falls back to DEFAULT_SSL_VERIFICATION_METHOD ("none").
final String sslVerificationMethod = config.has(SSL_VERIFICATION_METHOD_KEY)
&& config.get(SSL_VERIFICATION_METHOD_KEY).has("mode")
? config.get(SSL_VERIFICATION_METHOD_KEY).get("mode").asText()
: DEFAULT_SSL_VERIFICATION_METHOD;
// Optional CA certificate, only meaningful for the "strict" mode below.
final String caCertificate = config.has(SSL_VERIFICATION_METHOD_KEY)
&& config.get(SSL_VERIFICATION_METHOD_KEY).has("ca_certificate")
? config.get(SSL_VERIFICATION_METHOD_KEY).get("ca_certificate").asText()
: null;

final StringBuilder jdbcUrl = new StringBuilder(
String.format(DatabaseDriver.CLICKHOUSE.getUrlFormatString(),
isSsl ? HTTPS_PROTOCOL : HTTP_PROTOCOL,
Expand All @@ -63,6 +109,16 @@ public JsonNode toJdbcConfig(final JsonNode config) {

if (isSsl) {
jdbcUrl.append("?").append(String.join("&", SSL_PARAMETERS));
jdbcUrl.append(String.format("&sslmode=%s", sslVerificationMethod));

if (sslVerificationMethod.equals("strict") && caCertificate != null && !caCertificate.equals("")) {
try {
createCertificateFile(CA_CERT_FILENAME, caCertificate);
} catch (final IOException e) {
// NOTE(review): consider chaining the cause — new RuntimeException(msg, e)
// — so the root I/O failure isn't lost from the stack trace.
throw new RuntimeException("Failed to create encryption file");
}
jdbcUrl.append(String.format("&sslrootcert=%s", CA_CERT_FILENAME));
}
} else {
jdbcUrl.append("?").append(String.join("&", DEFAULT_PARAMETERS));
}
Expand All @@ -89,7 +145,8 @@ public AirbyteConnectionStatus check(final JsonNode config) {
final JdbcDatabase database = getDatabase(dataSource);
final NamingConventionTransformer namingResolver = getNamingResolver();
final String outputSchema = namingResolver.getIdentifier(config.get(JdbcUtils.DATABASE_KEY).asText());
attemptTableOperations(outputSchema, database, namingResolver, getSqlOperations(), false);
configurableSqlOperations.setConfig(ClickhouseDestinationConfig.get(config));
attemptTableOperations(outputSchema, database, namingResolver, configurableSqlOperations, false);
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception while checking connection: ", e);
Expand Down Expand Up @@ -130,6 +187,16 @@ public boolean isV2Destination() {
@Override
protected String getConfigSchemaKey() {
return "database";


@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
configurableSqlOperations.setConfig(ClickhouseDestinationConfig.get(config));
return JdbcBufferedConsumerFactory.create(outputRecordCollector, getDatabase(getDataSource(config)),
configurableSqlOperations, getNamingResolver(), config,
catalog);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
 * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.clickhouse;

import com.fasterxml.jackson.databind.JsonNode;

// Immutable view of the ClickHouse-specific connector options: the table
// engine and the nested deployment settings. Default JDBC configuration
// parameters are intentionally not mirrored here.
public record ClickhouseDestinationConfig(String engine,
                                          ClickhouseDestinationDeployTypeConfig deploy_config) {

  public final static String DEFAULT_ENGINE = "MergeTree";

  // Builds a config from the raw connector JSON, falling back to the default
  // engine and default deploy-type settings when the keys are absent.
  public static ClickhouseDestinationConfig get(final JsonNode config) {
    final String engine = config.has("engine") ? config.get("engine").asText() : DEFAULT_ENGINE;
    final ClickhouseDestinationDeployTypeConfig deployConfig;
    if (config.has("deploy_type")) {
      deployConfig = ClickhouseDestinationDeployTypeConfig.get(config.get("deploy_type"));
    } else {
      deployConfig = ClickhouseDestinationDeployTypeConfig.defaultConfig();
    }
    return new ClickhouseDestinationConfig(engine, deployConfig);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
 * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.clickhouse;

import com.fasterxml.jackson.databind.JsonNode;

// Deployment-related options for the ClickHouse destination: deployment
// flavor, target cluster name, and whether replicated engines are requested.
public record ClickhouseDestinationDeployTypeConfig(String type, String cluster, boolean replication) {

  public static final String DEFAULT_DEPLOY_TYPE = "clickhouse-cloud";
  public static final String DEFAULT_CLUSTER_NAME = "default";
  public static final boolean DEFAULT_REPLICATION = false;

  // Parses the nested "deploy_type" object of the connector config, applying
  // the defaults above for every missing field.
  public static ClickhouseDestinationDeployTypeConfig get(final JsonNode config) {
    final String type =
        config.has("deploy_type") ? config.get("deploy_type").asText() : DEFAULT_DEPLOY_TYPE;
    final String cluster =
        config.has("cluster") ? config.get("cluster").asText() : DEFAULT_CLUSTER_NAME;
    final boolean replication =
        config.has("replication") ? config.get("replication").asBoolean() : DEFAULT_REPLICATION;
    return new ClickhouseDestinationDeployTypeConfig(type, cluster, replication);
  }

  // All-defaults instance used when the connector config omits "deploy_type".
  public static ClickhouseDestinationDeployTypeConfig defaultConfig() {
    return new ClickhouseDestinationDeployTypeConfig(DEFAULT_DEPLOY_TYPE, DEFAULT_CLUSTER_NAME, DEFAULT_REPLICATION);
  }

}
Loading

0 comments on commit ad43637

Please sign in to comment.