Skip to content

Commit

Permalink
Add test with jdbc catalog (#469)
Browse files Browse the repository at this point in the history
* Add jdbc catalog test

* Add jdbc catalog test

* Add jdbc catalog test

* Add jdbc catalog test
  • Loading branch information
ismailsimsek authored Jan 3, 2025
1 parent 83afc6a commit 6e8909b
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public static Optional<Table> loadIcebergTable(Catalog icebergCatalog, TableIden
Table table = icebergCatalog.loadTable(tableId);
return Optional.of(table);
} catch (NoSuchTableException e) {
LOGGER.warn("Table not found: {}", tableId.toString());
LOGGER.debug("Table not found: {}", tableId.toString());
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg;

import com.google.common.collect.Lists;
import io.debezium.server.iceberg.testresources.BaseTest;
import io.debezium.server.iceberg.testresources.CatalogJdbc;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

import java.time.Duration;

/**
* Integration test that verifies basic reading from PostgreSQL database and writing to iceberg destination.
*
* @author Ismail Simsek
*/
@QuarkusTest
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
@QuarkusTestResource(value = CatalogJdbc.class, restrictToAnnotatedClass = true)
public class IcebergChangeConsumerJdbcCatalogTest extends BaseTest {

@Test
public void testSimpleUpload() {
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
CloseableIterable<Record> result = getTableDataV2("testc.inventory.customers");
return Lists.newArrayList(result).size() >= 3;
} catch (Exception e) {
return false;
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ public class TestConfigSource implements ConfigSource {

public static final String S3_REGION = "us-east-1";
public static final String S3_BUCKET_NAME = "test-bucket";
public static final String CATALOG_TABLE_NAMESPACE = "debeziumevents";
public static final String S3_MINIO_ACCESS_KEY = "admin";
public static final String S3_MINIO_SECRET_KEY = "12345678";
public static final String ICEBERG_CATALOG_TABLE_NAMESPACE = "debeziumevents";
public static final String ICEBERG_CATALOG_NAME = "iceberg";
public static final String ICEBERG_CATALOG_FILEIO = "org.apache.iceberg.aws.s3.S3FileIO";
public static final String S3_BUCKET = "s3a://" + S3_BUCKET_NAME + "/iceberg_warehouse";
public static final String ICEBERG_FILEIO = "org.apache.iceberg.aws.s3.S3FileIO";
public static final String ICEBERG_WAREHOUSE_S3A = "s3a://" + S3_BUCKET_NAME + "/iceberg_warehouse";
protected Map<String, String> config = new HashMap<>();


Expand All @@ -34,7 +36,7 @@ public TestConfigSource() {
config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");

// iceberg config
config.put("debezium.sink.iceberg.warehouse", S3_BUCKET);
config.put("debezium.sink.iceberg.warehouse", ICEBERG_WAREHOUSE_S3A);

// ==== configure batch behaviour/size ====
// Positive integer value that specifies the maximum size of each batch of events that should be processed during
Expand All @@ -45,7 +47,7 @@ public TestConfigSource() {
config.put("debezium.source.poll.interval.ms", "10000"); // 5 seconds!
// iceberg
config.put("debezium.sink.iceberg.table-prefix", "debeziumcdc_");
config.put("debezium.sink.iceberg.table-namespace", CATALOG_TABLE_NAMESPACE);
config.put("debezium.sink.iceberg.table-namespace", ICEBERG_CATALOG_TABLE_NAMESPACE);
config.put("debezium.sink.iceberg.catalog-name", ICEBERG_CATALOG_NAME);
// use hadoop catalog for tests
config.put("debezium.sink.iceberg.type", "hadoop");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.HashMap;
import java.util.Map;

import static io.debezium.server.iceberg.TestConfigSource.CATALOG_TABLE_NAMESPACE;
import static io.debezium.server.iceberg.TestConfigSource.ICEBERG_CATALOG_TABLE_NAMESPACE;
import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.fromByteBuffer;
import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.toByteBuffer;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -91,7 +91,7 @@ public void testGetSet() throws Exception {
assertEquals(("value"), fromByteBuffer(values.get(toByteBuffer("key"))));
Assertions.assertNull(values.get(toByteBuffer("bad")));

CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of(CATALOG_TABLE_NAMESPACE, "debezium_offset_storage"));
CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of(ICEBERG_CATALOG_TABLE_NAMESPACE, "debezium_offset_storage"));
Assertions.assertEquals(1, Lists.newArrayList(d).size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import java.util.HashMap;
import java.util.Map;

import static io.debezium.server.iceberg.TestConfigSource.CATALOG_TABLE_NAMESPACE;
import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET;
import static io.debezium.server.iceberg.TestConfigSource.ICEBERG_CATALOG_TABLE_NAMESPACE;
import static io.debezium.server.iceberg.TestConfigSource.ICEBERG_WAREHOUSE_S3A;

/**
* Integration test that uses spark to consumer data is consumed.
Expand All @@ -48,17 +48,17 @@ static void setup() {
.set("spark.eventLog.enabled", "false")
// enable iceberg SQL Extensions and Catalog
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.set("spark.sql.warehouse.dir", S3_BUCKET)
.set("spark.sql.warehouse.dir", ICEBERG_WAREHOUSE_S3A)
// hadoop catalog
.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.set("spark.sql.catalog.spark_catalog.type", "hadoop")
.set("spark.sql.catalog.spark_catalog.warehouse", S3_BUCKET)
.set("spark.sql.catalog.spark_catalog.default-namespaces", CATALOG_TABLE_NAMESPACE)
.set("spark.sql.catalog.spark_catalog.io-impl", TestConfigSource.ICEBERG_CATALOG_FILEIO)
.set("spark.sql.catalog.spark_catalog.warehouse", ICEBERG_WAREHOUSE_S3A)
.set("spark.sql.catalog.spark_catalog.default-namespaces", ICEBERG_CATALOG_TABLE_NAMESPACE)
.set("spark.sql.catalog.spark_catalog.io-impl", TestConfigSource.ICEBERG_FILEIO)
.set("spark.sql.catalog.spark_catalog.s3.endpoint", S3Minio.container.getS3URL())
.set("spark.sql.catalog.spark_catalog.s3.path-style-access", "true")
.set("spark.sql.catalog.spark_catalog.s3.access-key-id", S3Minio.MINIO_ACCESS_KEY)
.set("spark.sql.catalog.spark_catalog.s3.secret-access-key", S3Minio.MINIO_SECRET_KEY)
.set("spark.sql.catalog.spark_catalog.s3.access-key-id", TestConfigSource.S3_MINIO_ACCESS_KEY)
.set("spark.sql.catalog.spark_catalog.s3.secret-access-key", TestConfigSource.S3_MINIO_SECRET_KEY)
.set("spark.sql.catalog.spark_catalog.client.region", TestConfigSource.S3_REGION)
;

Expand Down Expand Up @@ -140,7 +140,7 @@ protected HadoopCatalog getIcebergCatalog() {
}

public Dataset<Row> getTableData(String table) {
table = CATALOG_TABLE_NAMESPACE + ".debeziumcdc_" + table.replace(".", "_");
table = ICEBERG_CATALOG_TABLE_NAMESPACE + ".debeziumcdc_" + table.replace(".", "_");
return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM " + table);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class BaseTest {

@Inject
IcebergChangeConsumer consumer;
public IcebergChangeConsumer consumer;

public CloseableIterable<Record> getTableDataV2(String table) {
return getTableDataV2("debeziumevents", table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,25 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.iceberg.jdbc.JdbcCatalog;
import org.testcontainers.containers.MySQLContainer;

public class JdbcCatalogDB implements QuarkusTestResourceLifecycleManager {
public static MySQLContainer<?> container = new MySQLContainer<>();
public class CatalogJdbc implements QuarkusTestResourceLifecycleManager {
public static final String ICEBERG_CATALOG_TABLE_NAMESPACE = "debeziumevents";
public static final String ICEBERG_CATALOG_NAME = "iceberg";
public static final MySQLContainer<?> container = new MySQLContainer<>("mysql:8");

@Override
public Map<String, String> start() {
container.start();

Map<String, String> config = new ConcurrentHashMap<>();

config.put("debezium.sink.iceberg.catalog-impl", JdbcCatalog.class.getName());
config.put("debezium.sink.iceberg.type", "jdbc");
config.put("debezium.sink.iceberg.uri", container.getJdbcUrl());
config.put("debezium.sink.iceberg.jdbc.user", container.getUsername());
config.put("debezium.sink.iceberg.jdbc.password", container.getPassword());
config.put("debezium.sink.iceberg.table-namespace", ICEBERG_CATALOG_TABLE_NAMESPACE);
config.put("debezium.sink.iceberg.catalog-name", ICEBERG_CATALOG_NAME);

return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,15 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET;
import static io.debezium.server.iceberg.TestConfigSource.S3_BUCKET_NAME;

public class S3Minio implements QuarkusTestResourceLifecycleManager {

public static final String MINIO_ACCESS_KEY = "admin";
public static final String MINIO_SECRET_KEY = "12345678";
protected static final Logger LOGGER = LoggerFactory.getLogger(S3Minio.class);
static final String DEFAULT_IMAGE = "minio/minio:latest";
public static MinioClient client;

static public final MinIOContainer container = new MinIOContainer(DockerImageName.parse(DEFAULT_IMAGE))
.withUserName(MINIO_ACCESS_KEY)
.withPassword(MINIO_SECRET_KEY);
.withUserName(TestConfigSource.S3_MINIO_ACCESS_KEY)
.withPassword(TestConfigSource.S3_MINIO_SECRET_KEY);

public static void listFiles() {
LOGGER.info("-----------------------------------------------------------------");
Expand Down Expand Up @@ -77,7 +72,7 @@ public Map<String, String> start() {
client.ignoreCertCheck();
client.makeBucket(MakeBucketArgs.builder()
.region(TestConfigSource.S3_REGION)
.bucket(S3_BUCKET_NAME)
.bucket(TestConfigSource.S3_BUCKET_NAME)
.build());
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -87,17 +82,16 @@ public Map<String, String> start() {
// FOR JDBC CATALOG
config.put("debezium.sink.iceberg.s3.endpoint", container.getS3URL());
config.put("debezium.sink.iceberg.s3.path-style-access", "true");
config.put("debezium.sink.iceberg.s3.access-key-id", S3Minio.MINIO_ACCESS_KEY);
config.put("debezium.sink.iceberg.s3.secret-access-key", S3Minio.MINIO_SECRET_KEY);
config.put("debezium.sink.iceberg.s3.access-key-id", TestConfigSource.S3_MINIO_ACCESS_KEY);
config.put("debezium.sink.iceberg.s3.secret-access-key", TestConfigSource.S3_MINIO_SECRET_KEY);
config.put("debezium.sink.iceberg.client.region", TestConfigSource.S3_REGION);
config.put("debezium.sink.iceberg.io-impl", TestConfigSource.ICEBERG_CATALOG_FILEIO);
config.put("debezium.sink.iceberg.warehouse", S3_BUCKET);
config.put("debezium.sink.iceberg.io-impl", TestConfigSource.ICEBERG_FILEIO);
// FOR HADOOP CATALOG
config.put("debezium.sink.iceberg.fs.s3a.endpoint", container.getS3URL());
config.put("debezium.sink.iceberg.fs.s3a.access.key", S3Minio.MINIO_ACCESS_KEY);
config.put("debezium.sink.iceberg.fs.s3a.secret.key", S3Minio.MINIO_SECRET_KEY);
config.put("debezium.sink.iceberg.fs.s3a.access.key", TestConfigSource.S3_MINIO_ACCESS_KEY);
config.put("debezium.sink.iceberg.fs.s3a.secret.key", TestConfigSource.S3_MINIO_SECRET_KEY);
config.put("debezium.sink.iceberg.fs.s3a.path.style.access", "true");
config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + S3_BUCKET_NAME);
config.put("debezium.sink.iceberg.fs.defaultFS", "s3a://" + TestConfigSource.S3_BUCKET_NAME);

return config;
}
Expand Down

0 comments on commit 6e8909b

Please sign in to comment.