diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java index 48d6859ba4..0431b39372 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java @@ -105,7 +105,7 @@ public void start(Buffer> buffer) { if (sourceConfig.isStreamEnabled()) { BinaryLogClient binaryLogClient = new BinlogClientFactory(sourceConfig, rdsClient, dbMetadata).create(); - if (sourceConfig.getTlsConfig() == null || !sourceConfig.getTlsConfig().isInsecure()) { + if (sourceConfig.isTlsEnabled()) { binaryLogClient.setSSLMode(SSLMode.REQUIRED); } else { binaryLogClient.setSSLMode(SSLMode.DISABLED); @@ -146,7 +146,7 @@ private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final dbMetadata.getPort(), sourceConfig.getAuthenticationConfig().getUsername(), sourceConfig.getAuthenticationConfig().getPassword(), - !sourceConfig.getTlsConfig().isInsecure()); + sourceConfig.isTlsEnabled()); return new SchemaManager(connectionManager); } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java index a1cb8c7e2f..65a65a4fde 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceConfig.java @@ -85,6 +85,9 @@ public class RdsSourceConfig { @JsonProperty("tls") private TlsConfig tlsConfig; + @JsonProperty("disable_s3_read_for_leader") + private boolean disableS3ReadForLeader = false; + public String getDbIdentifier() { return dbIdentifier; } @@ -153,6 +156,14 @@ public TlsConfig getTlsConfig() { return tlsConfig; } + public boolean isTlsEnabled() { + return tlsConfig == null || !tlsConfig.isInsecure(); + } + + public boolean isDisableS3ReadForLeader() { + return disableS3ReadForLeader; + } + public AuthenticationConfig getAuthenticationConfig() { return this.authenticationConfig; } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverter.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverter.java index 51cce6541e..cc1f897bea 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverter.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverter.java @@ -16,6 +16,7 @@ import java.nio.ByteBuffer; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.time.Instant; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -63,6 +64,11 @@ public Event convert(final Map rowData, EventMetadata eventMetadata = event.getMetadata(); + // Only set external origination time for stream events, not export + final Instant externalOriginationTime = Instant.ofEpochMilli(eventCreateTimeEpochMillis); + event.getEventHandle().setExternalOriginationTime(externalOriginationTime); + eventMetadata.setExternalOriginationTime(externalOriginationTime); + eventMetadata.setAttribute(EVENT_DATABASE_NAME_METADATA_ATTRIBUTE, databaseName); eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableName); eventMetadata.setAttribute(BULK_ACTION_METADATA_ATTRIBUTE, bulkAction.toString()); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java index c69dcc7651..feba4555b2 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java @@ -36,7 +36,9 @@ public static ExportObjectKey fromString(final String objectKeyString) { final String prefix = parts[0]; final String exportTaskId = parts[1]; final String databaseName = parts[2]; - final String tableName = parts[3]; + // fullTableName is in the format of "databaseName.tableName" + final String fullTableName = parts[3]; + final String tableName = fullTableName.split("\\.")[1]; final String numberedFolder = parts[4]; final String fileName = parts[5]; return new ExportObjectKey(prefix, exportTaskId, databaseName, tableName, numberedFolder, fileName); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java index 1886bba451..14d61e6626 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java @@ -22,6 +22,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.opensearch.dataprepper.model.source.s3.S3ScanEnvironmentVariables.STOP_S3_SCAN_PROCESSING_PROPERTY; + public class StreamScheduler implements Runnable { @@ -57,18 +59,24 @@ public StreamScheduler(final EnhancedSourceCoordinator sourceCoordinator, @Override public void run() { LOG.debug("Start running Stream Scheduler"); + StreamPartition streamPartition = null; while (!shutdownRequested && !Thread.currentThread().isInterrupted()) { try { final Optional sourcePartition = sourceCoordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE); if (sourcePartition.isPresent()) { LOG.info("Acquired partition to read from stream"); - final StreamPartition streamPartition = (StreamPartition) sourcePartition.get(); + if (sourceConfig.isDisableS3ReadForLeader()) { + // Primary node that acquires the stream partition will not perform work on the S3 buffer + System.setProperty(STOP_S3_SCAN_PROCESSING_PROPERTY, "true"); + } + + streamPartition = (StreamPartition) sourcePartition.get(); final StreamCheckpointer streamCheckpointer = new StreamCheckpointer(sourceCoordinator, streamPartition, pluginMetrics); binaryLogClient.registerEventListener(new BinlogEventListener( buffer, sourceConfig, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager)); final StreamWorker streamWorker = StreamWorker.create(sourceCoordinator, binaryLogClient, pluginMetrics); - executorService.submit(() -> streamWorker.processStream(streamPartition)); + executorService.submit(() -> streamWorker.processStream((StreamPartition) sourcePartition.get())); } try { @@ -81,6 +89,13 @@ public void run() { } catch (Exception e) { LOG.error("Received an exception during stream processing, backing off and retrying", e); + if (streamPartition != null) { + if (sourceConfig.isDisableS3ReadForLeader()) { + System.clearProperty(STOP_S3_SCAN_PROCESSING_PROPERTY); + } + sourceCoordinator.giveUpPartition(streamPartition); + } + try { Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS); } catch (final InterruptedException ex) { diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java index f5036e8890..43f08ff3fc 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java @@ -124,7 +124,7 @@ void test_given_export_partition_and_export_task_id_then_complete_export() throw String tableName = UUID.randomUUID().toString(); // objectKey needs to have this structure: "{prefix}/{export task ID}/{database name}/{table name}/{numbered folder}/{file name}" S3Object s3Object = S3Object.builder() - .key("prefix/" + exportTaskId + "/my_db/" + tableName + "/1/file1" + PARQUET_SUFFIX) + .key("prefix/" + exportTaskId + "/my_db/my_db." + tableName + "/1/file1" + PARQUET_SUFFIX) .build(); when(listObjectsV2Response.contents()).thenReturn(List.of(s3Object)); when(listObjectsV2Response.isTruncated()).thenReturn(false); @@ -185,7 +185,7 @@ void test_given_export_partition_without_export_task_id_then_start_and_complete_ String tableName = UUID.randomUUID().toString(); // objectKey needs to have this structure: "{prefix}/{export task ID}/{database name}/{table name}/{numbered folder}/{file name}" S3Object s3Object = S3Object.builder() - .key("prefix/" + exportTaskId + "/my_db/" + tableName + "/1/file1" + PARQUET_SUFFIX) + .key("prefix/" + exportTaskId + "/my_db/my_db." + tableName + "/1/file1" + PARQUET_SUFFIX) .build(); when(listObjectsV2Response.contents()).thenReturn(List.of(s3Object)); when(listObjectsV2Response.isTruncated()).thenReturn(false); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java index 7056114572..697d721c9b 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java @@ -16,7 +16,7 @@ class ExportObjectKeyTest { @Test void test_fromString_with_valid_input_string() { - final String objectKeyString = "prefix/export-task-id/db-name/table-name/1/file-name.parquet"; + final String objectKeyString = "prefix/export-task-id/db-name/db-name.table-name/1/file-name.parquet"; final ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKeyString); assertThat(exportObjectKey.getPrefix(), equalTo("prefix"));