Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate CDC data from MySQL/Postgres/MongoDb data source #3313

Merged
merged 30 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7ac1e3c
Add KafkaClusterExtensionWithConfig plugin
wanghd89 Sep 4, 2023
3b4e25e
add unit tests to kafka cluster config extension
Sep 4, 2023
7286de8
integrate with kafka connect
Sep 4, 2023
5e373ed
Modify Config, waiting for final confirm of Config files' change
Sep 5, 2023
587a3d5
update config to make mysql/postgres/mongodb as source
Sep 6, 2023
eabc079
add ExtractNewRecordState transforms to simplify the CDC data
Sep 7, 2023
e4d77d7
fix mysql postgres field
Sep 7, 2023
cd8ce61
update readme
Sep 7, 2023
74102d2
add missing unit test for PluginMetrics.
Sep 8, 2023
2c0adf1
fix unit test and add snapshot_fetch_size for mongoDB to increase sna…
Sep 11, 2023
8219819
address PR comments to use Duration
Sep 12, 2023
5f7f502
fix checkstyle
Sep 12, 2023
27d1693
rename config and exclude zookeeper according to PR review
Sep 14, 2023
f5b66c9
Merge remote-tracking branch 'upstream/main' into feat-cdc
Sep 15, 2023
71ed291
expose connector_rebalance_max_delay config to customers. The default…
Sep 15, 2023
dc9ada6
address comments from PR
Sep 18, 2023
fcd1708
Merge branch 'main' of github.com:opensearch-project/data-prepper int…
Sep 28, 2023
1b20224
fix merge conflicts
Sep 28, 2023
74f0f1d
minimize affects the jetty-boom version
Sep 28, 2023
af526c9
revert the change
Sep 28, 2023
b6d1ad0
Merge remote-tracking branch 'upstream/main' into feat-cdc
Oct 7, 2023
926dd1a
merge mainline into feat-cdc-kafkaconnect branch. Resolved conflicts
Oct 7, 2023
45d97ae
resolve jetty CVE check by using latest 9.x version. revert using pla…
Oct 7, 2023
401acb4
downgrade scoped Jetty dependency to use 9.4.52 version for Kafka-Con…
Oct 7, 2023
3dcb858
remove kotlin-compiler to fix service_map plugin
Oct 9, 2023
845949e
Merge remote-tracking branch 'upstream/main' into feat-cdc
Oct 9, 2023
0323ded
fix checkstyle caused by KK
Oct 10, 2023
8f51c45
Merge remote-tracking branch 'upstream/main' into feat-cdc
Oct 11, 2023
1a1dff5
fix CVE
Oct 11, 2023
a9ebf15
update CVE to use latest jetty9.4.53 version
Oct 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,13 @@ subprojects {
}
implementation('org.eclipse.jetty:jetty-http') {
version {
require '11.0.15'
require '9.4.51.v20230217'
wanghd89 marked this conversation as resolved.
Show resolved Hide resolved
}
because 'CVE from transitive dependencies'
}
implementation('org.eclipse.jetty:jetty-server') {
version {
require '11.0.15'
require '9.4.51.v20230217'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we exclude Jetty from the Debezium project instead?

}
because 'CVE from transitive dependencies'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.model.configuration.PluginSetting;

Expand Down Expand Up @@ -78,8 +79,11 @@ public <T> T gauge(String name, T obj, ToDoubleFunction<T> valueFunction) {
return Metrics.gauge(getMeterName(name), obj, valueFunction);
}

public <T> T gaugeWithTags(String name, Iterable<Tag> tags, T obj, ToDoubleFunction<T> valueFunction) {
return Metrics.gauge(getMeterName(name), tags, obj, valueFunction);
}

private String getMeterName(final String name) {
return new StringJoiner(MetricNames.DELIMITER).add(metricsPrefix).add(name).toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -125,6 +126,18 @@ public void testReferenceGauge() {
assertEquals(3, gauge.length());
}

@Test
public void testReferenceGaugeWithTags() {
final String testString = "abc";
final String gauge = objectUnderTest.gaugeWithTags("gauge", emptyList(), testString, String::length);
assertNotNull(
Metrics.globalRegistry.get(new StringJoiner(MetricNames.DELIMITER)
.add(PIPELINE_NAME).add(PLUGIN_NAME)
.add("gauge").toString()).meter());
assertEquals(3, gauge.length());
}


@Test
public void testEmptyPipelineName() {
assertThrows(
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dependencies {
testImplementation project(':data-prepper-plugins:common').sourceSets.test.output
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation "org.reflections:reflections:0.10.2"
implementation libs.reflections.core
implementation 'io.micrometer:micrometer-core'
implementation 'io.micrometer:micrometer-registry-prometheus'
implementation 'io.micrometer:micrometer-registry-cloudwatch2'
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dependencies {
implementation libs.commons.lang3
implementation libs.bouncycastle.bcprov
implementation libs.bouncycastle.bcpkix
implementation 'org.reflections:reflections:0.10.2'
implementation libs.reflections.core
implementation 'io.micrometer:micrometer-core'
testImplementation testLibs.junit.vintage
implementation 'org.apache.parquet:parquet-common:1.13.1'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.opensearch.dataprepper.plugins.sink;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.Sink;

import java.util.Collection;

@DataPrepperPlugin(name = "noop", pluginType = Sink.class)
public class NoopSink implements Sink<Record<Object>> {
@Override
public void output(Collection<Record<Object>> records) {
// empty by design.
}

@Override
public void shutdown() {
// empty by design.
}

@Override
public void initialize() {
// empty by design.
}

@Override
public boolean isReady() {
return true;
}
}
278 changes: 278 additions & 0 deletions data-prepper-plugins/kafka-connect-plugins/README.md

Large diffs are not rendered by default.

68 changes: 68 additions & 0 deletions data-prepper-plugins/kafka-connect-plugins/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:kafka-plugins')
implementation 'org.apache.kafka:connect-runtime:3.5.1'
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:secretsmanager'
implementation 'javax.validation:validation-api:2.0.1.Final'
implementation libs.reflections.core
implementation 'io.micrometer:micrometer-core'
implementation 'org.eclipse.jetty:jetty-server:9.4.51.v20230217'
implementation 'org.eclipse.jetty:jetty-servlet:9.4.51.v20230217'
implementation 'org.eclipse.jetty:jetty-servlets:9.4.51.v20230217'
implementation 'org.eclipse.jetty:jetty-client:9.4.51.v20230217'
implementation ('io.confluent:kafka-schema-registry:7.5.0') {
exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet'
exclude group: 'org.glassfish.jersey.inject', module: 'jersey-hk2'
exclude group: 'org.glassfish.jersey.ext', module: 'jersey-bean-validation'
}
runtimeOnly 'org.jetbrains.kotlin:kotlin-compiler:1.8.21'
wanghd89 marked this conversation as resolved.
Show resolved Hide resolved
// Common Debezium Connector
implementation 'io.debezium:debezium-api:2.3.0.Final'
implementation 'io.debezium:debezium-core:2.3.0.Final'
wanghd89 marked this conversation as resolved.
Show resolved Hide resolved
implementation 'io.debezium:debezium-storage-kafka:2.3.0.Final'
implementation 'io.debezium:debezium-storage-file:2.3.0.Final'
// Debezium MySQL Connector
implementation 'org.antlr:antlr4-runtime:4.10.1'
implementation 'io.debezium:debezium-connector-mysql:2.3.0.Final'
implementation 'io.debezium:debezium-ddl-parser:2.3.0.Final'
implementation 'com.zendesk:mysql-binlog-connector-java:0.28.1'
implementation 'com.mysql:mysql-connector-j:8.0.33'
implementation 'com.github.luben:zstd-jni:1.5.0-2'
// Debezium Postgres connector
implementation 'io.debezium:debezium-connector-postgres:2.3.0.Final'
implementation 'org.postgresql:postgresql:42.5.1'
implementation 'com.google.protobuf:protobuf-java:3.19.6'
// Debezium Mongodb connector
implementation 'io.debezium:debezium-connector-mongodb:2.3.0.Final'
implementation 'org.mongodb:mongodb-driver-core:4.7.1'
implementation 'org.mongodb:mongodb-driver-sync:4.7.1'
implementation 'org.mongodb:bson:4.7.1'
runtimeOnly 'org.mongodb:bson-record-codec:4.7.1'
// test
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-core')
testImplementation 'org.yaml:snakeyaml:2.0'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
testImplementation testLibs.mockito.inline
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { //in addition to core projects rule
limit {
minimum = 0.90
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafkaconnect.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.plugins.kafkaconnect.util.Connector;

import java.util.List;
import java.util.Properties;

public abstract class ConnectorConfig {
@JsonProperty("force_update")
public Boolean forceUpdate = false;
private String bootstrapServers;
wanghd89 marked this conversation as resolved.
Show resolved Hide resolved
private Properties authProperties;

public abstract List<Connector> buildConnectors();

public Properties getAuthProperties() {
return this.authProperties;
}

public void setAuthProperties(Properties authProperties) {
this.authProperties = authProperties;
}

public String getBootstrapServers() {
return this.bootstrapServers;
}

public void setBootstrapServers(String bootStrapServers) {
this.bootstrapServers = bootStrapServers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafkaconnect.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.plugins.kafkaconnect.util.SecretManagerHelper;

import java.util.Map;

public class CredentialsConfig {
private final String username;
private final String password;

@JsonCreator
public CredentialsConfig(@JsonProperty("plaintext") final PlainText plainText,
@JsonProperty("secret_manager") final SecretManager secretManager) {
if (plainText != null && secretManager != null) {
throw new IllegalArgumentException("plaintext and secret_manager cannot both be set");
}
if (plainText != null) {
if (plainText.username == null || plainText.password == null) {
throw new IllegalArgumentException("user and password must be set for plaintext credentials");
}
this.username = plainText.username;
this.password = plainText.password;
} else if (secretManager != null) {
if (secretManager.secretId == null || secretManager.region == null) {
throw new IllegalArgumentException("secretId and region must be set for aws credential type");
}
final Map<String, String> secretMap = this.getSecretValueMap(secretManager.stsRoleArn, secretManager.region, secretManager.secretId);
if (!secretMap.containsKey("username") || !secretMap.containsKey("password")) {
throw new RuntimeException("username or password missing in secret manager.");
}
this.username = secretMap.get("username");
this.password = secretMap.get("password");
} else {
throw new IllegalArgumentException("plaintext or secret_manager must be set");
}
}

private Map<String, String> getSecretValueMap(String stsRoleArn, String region, String secretId) {
ObjectMapper objectMapper = new ObjectMapper();
try {
final String secretValue = SecretManagerHelper.getSecretValue(stsRoleArn, region, secretId);
return objectMapper.readValue(secretValue, new TypeReference<>() {});
} catch (Exception e) {
throw new RuntimeException("Failed to get credentials.", e);
}
}

public String getUsername() {
return username;
}

public String getPassword() {
return password;
}

static class PlainText {
private String username;
private String password;

@JsonCreator
public PlainText(@JsonProperty("username") String username,
@JsonProperty("password") String password) {
this.username = username;
this.password = password;
}
}

static class SecretManager {
private String region;
private String secretId;
private String stsRoleArn;

@JsonCreator
public SecretManager(@JsonProperty("sts_role_arn") String stsRoleArn,
@JsonProperty("region") String region,
@JsonProperty("secretId") String secretId) {
this.stsRoleArn = stsRoleArn;
this.region = region;
this.secretId = secretId;
}
}
}
Loading