From 81c1c844cc6c99beeb6cd16f180e0e5fde7e5f12 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 5 Feb 2025 15:28:47 -0800 Subject: [PATCH] Add cloudwatch logs sink (#5406) * Add cloudwatch logs sink Signed-off-by: Krishna Kondaka * Addressed review comments Signed-off-by: Krishna Kondaka --------- Signed-off-by: Krishna Kondaka --- .../cloudwatch-logs/build.gradle | 38 ++- .../cloudwatch_logs/CouldWatchLogsIT.java | 292 ++++++++++++++++++ .../cloudwatch_logs/CloudWatchLogsSink.java | 6 +- .../client/CloudWatchLogsService.java | 13 + settings.gradle | 2 +- 5 files changed, 344 insertions(+), 7 deletions(-) create mode 100644 data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CouldWatchLogsIT.java diff --git a/data-prepper-plugins/cloudwatch-logs/build.gradle b/data-prepper-plugins/cloudwatch-logs/build.gradle index 3bbb24f443..348275f298 100644 --- a/data-prepper-plugins/cloudwatch-logs/build.gradle +++ b/data-prepper-plugins/cloudwatch-logs/build.gradle @@ -1,8 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + plugins { id 'java' id 'java-library' } +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + dependencies { implementation project(':data-prepper-plugins:aws-plugin-api') implementation project(path: ':data-prepper-plugins:common') @@ -33,4 +53,20 @@ jacocoTestCoverageVerification { test { useJUnitPlatform() -} \ No newline at end of file +} + +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + + useJUnitPlatform() + + classpath = sourceSets.integrationTest.runtimeClasspath + systemProperty 'tests.cloudwatch.log_group', System.getProperty('tests.cloudwatch.log_group') + systemProperty 'tests.cloudwatch.log_stream', System.getProperty('tests.cloudwatch.log_stream') + systemProperty 'tests.aws.region', System.getProperty('tests.aws.region') + systemProperty 'tests.aws.role', System.getProperty('tests.aws.role') + filter { + includeTestsMatching '*IT' + } +} diff --git a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CouldWatchLogsIT.java b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CouldWatchLogsIT.java new file mode 100644 index 0000000000..5158efb18e --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CouldWatchLogsIT.java @@ -0,0 +1,292 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs; + +import io.micrometer.core.instrument.Counter; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory; +import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsResponse; +import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.OutputLogEvent; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamResponse; +import software.amazon.awssdk.services.cloudwatchlogs.model.DeleteLogStreamRequest; +import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.record.Record; +import software.amazon.awssdk.regions.Region; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.lenient; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; + +@ExtendWith(MockitoExtension.class) +public class CouldWatchLogsIT { + static final int NUM_RECORDS = 2; + @Mock + private PluginSetting pluginSetting; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + + @Mock + private AwsConfig awsConfig; + + @Mock + private ThresholdConfig thresholdConfig; + + @Mock + private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig; + + @Mock + private Counter counter; + + private String awsRegion; + private String awsRole; + private String logGroupName; + private String logStreamName; + private CloudWatchLogsSink sink; + private AtomicInteger count; + private CloudWatchLogsClient cloudWatchLogsClient; + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + count = new AtomicInteger(0); + objectMapper = new ObjectMapper(); + pluginSetting = mock(PluginSetting.class); + when(pluginSetting.getPipelineName()).thenReturn("pipeline"); + when(pluginSetting.getName()).thenReturn("name"); + awsRegion = System.getProperty("tests.aws.region"); + awsRole = System.getProperty("tests.aws.role"); + awsConfig = mock(AwsConfig.class); + when(awsConfig.getAwsRegion()).thenReturn(Region.of(awsRegion)); + when(awsConfig.getAwsStsRoleArn()).thenReturn(awsRole); + when(awsConfig.getAwsStsExternalId()).thenReturn(null); + when(awsConfig.getAwsStsHeaderOverrides()).thenReturn(null); + when(awsCredentialsSupplier.getProvider(any())).thenAnswer(options -> DefaultCredentialsProvider.create()); + cloudWatchLogsClient = CloudWatchLogsClientFactory.createCwlClient(awsConfig, awsCredentialsSupplier); + logGroupName = System.getProperty("tests.cloudwatch.log_group"); + logStreamName = createLogStream(logGroupName); + pluginMetrics = mock(PluginMetrics.class); + counter = mock(Counter.class); + lenient().doAnswer((a)-> { + int v = (int)(double)(a.getArgument(0)); + count.addAndGet(v); + return null; + }).when(counter).increment(any(Double.class)); + lenient().doAnswer((a)-> { + count.addAndGet(1); + return null; + }).when(counter).increment(); + when(pluginMetrics.counter(anyString())).thenReturn(counter); + cloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class); + when(cloudWatchLogsSinkConfig.getLogGroup()).thenReturn(logGroupName); + when(cloudWatchLogsSinkConfig.getLogStream()).thenReturn(logStreamName); + when(cloudWatchLogsSinkConfig.getAwsConfig()).thenReturn(awsConfig); + when(cloudWatchLogsSinkConfig.getBufferType()).thenReturn(CloudWatchLogsSinkConfig.DEFAULT_BUFFER_TYPE); + + thresholdConfig = mock(ThresholdConfig.class); + when(thresholdConfig.getBackOffTime()).thenReturn(500L); + when(thresholdConfig.getLogSendInterval()).thenReturn(60L); + when(thresholdConfig.getRetryCount()).thenReturn(10); + when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L); + when(cloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig); + } + + @AfterEach + void tearDown() { + DeleteLogStreamRequest deleteRequest = DeleteLogStreamRequest + .builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .build(); + cloudWatchLogsClient.deleteLogStream(deleteRequest); + } + + private CloudWatchLogsSink createObjectUnderTest() { + return new CloudWatchLogsSink(pluginSetting, pluginMetrics, cloudWatchLogsSinkConfig, awsCredentialsSupplier); + } + + private String createLogStream(final String logGroupName) { + final String newLogStreamName = "CouldWatchLogsIT_"+RandomStringUtils.randomAlphabetic(6); + CreateLogStreamRequest createRequest = CreateLogStreamRequest + .builder() + .logGroupName(logGroupName) + .logStreamName(newLogStreamName) + .build(); + CreateLogStreamResponse response = cloudWatchLogsClient.createLogStream(createRequest); + return newLogStreamName; + + } + + @Test + void TestSinkOperationWithLogSendInterval() throws Exception { + long startTime = Instant.now().toEpochMilli(); + when(thresholdConfig.getBatchSize()).thenReturn(10); + when(thresholdConfig.getLogSendInterval()).thenReturn(10L); + when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L); + + sink = createObjectUnderTest(); + Collection> records = getRecordList(NUM_RECORDS); + sink.doOutput(records); + await().atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> { + sink.doOutput(Collections.emptyList()); + long endTime = Instant.now().toEpochMilli(); + GetLogEventsRequest getRequest = GetLogEventsRequest + .builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .startTime(startTime) + .endTime(endTime) + .build(); + GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest); + List events = response.events(); + assertThat(events.size(), equalTo(NUM_RECORDS)); + for (int i = 0; i < events.size(); i++) { + String message = events.get(i).message(); + Map event = objectMapper.readValue(message, Map.class); + assertThat(event.get("name"), equalTo("Person"+i)); + assertThat(event.get("age"), equalTo(Integer.toString(i))); + } + }); + // NUM_RECORDS success + // 1 request success + assertThat(count.get(), equalTo(NUM_RECORDS+1)); + + } + + @Test + void TestSinkOperationWithBatchSize() throws Exception { + long startTime = Instant.now().toEpochMilli(); + when(thresholdConfig.getBatchSize()).thenReturn(1); + when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L); + when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L); + + sink = createObjectUnderTest(); + Collection> records = getRecordList(NUM_RECORDS); + sink.doOutput(records); + await().atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> { + long endTime = Instant.now().toEpochMilli(); + GetLogEventsRequest getRequest = GetLogEventsRequest + .builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .startTime(startTime) + .endTime(endTime) + .build(); + GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest); + List events = response.events(); + assertThat(events.size(), equalTo(NUM_RECORDS)); + for (int i = 0; i < events.size(); i++) { + String message = events.get(i).message(); + Map event = objectMapper.readValue(message, Map.class); + assertThat(event.get("name"), equalTo("Person"+i)); + assertThat(event.get("age"), equalTo(Integer.toString(i))); + } + }); + // NUM_RECORDS success + // NUM_RECORDS request success + assertThat(count.get(), equalTo(NUM_RECORDS*2)); + + } + + @Test + void TestSinkOperationWithMaxRequestSize() throws Exception { + long startTime = Instant.now().toEpochMilli(); + when(thresholdConfig.getBatchSize()).thenReturn(20); + when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(108L); + + sink = createObjectUnderTest(); + Collection> records = getRecordList(NUM_RECORDS); + sink.doOutput(records); + await().atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> { + long endTime = Instant.now().toEpochMilli(); + GetLogEventsRequest getRequest = GetLogEventsRequest + .builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .startTime(startTime) + .endTime(endTime) + .build(); + GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest); + List events = response.events(); + assertThat(events.size(), equalTo(NUM_RECORDS)); + for (int i = 0; i < events.size(); i++) { + String message = events.get(i).message(); + Map event = objectMapper.readValue(message, Map.class); + assertThat(event.get("name"), equalTo("Person"+i)); + assertThat(event.get("age"), equalTo(Integer.toString(i))); + } + }); + // NUM_RECORDS success + // 1 request success + assertThat(count.get(), equalTo(NUM_RECORDS+1)); + + } + + private Collection> getRecordList(int numberOfRecords) { + final Collection> recordList = new ArrayList<>(); + List records = generateRecords(numberOfRecords); + for (int i = 0; i < numberOfRecords; i++) { + final Event event = JacksonLog.builder().withData(records.get(i)).build(); + recordList.add(new Record<>(event)); + } + return recordList; + } + + private static List generateRecords(int numberOfRecords) { + + List recordList = new ArrayList<>(); + + for (int rows = 0; rows < numberOfRecords; rows++) { + + HashMap eventData = new HashMap<>(); + + eventData.put("name", "Person" + rows); + eventData.put("age", Integer.toString(rows)); + recordList.add((eventData)); + + } + return recordList; + } +} diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java index f17d79c7af..ddc5654d27 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java @@ -87,10 +87,6 @@ public void doInitialize() { @Override public void doOutput(Collection> records) { - if (records.isEmpty()) { - return; - } - cloudWatchLogsService.processLogEvents(records); } @@ -98,4 +94,4 @@ public void doOutput(Collection> records) { public boolean isReady() { return isInitialized; } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java index fc9963ab46..af54d19267 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsService.java @@ -60,6 +60,19 @@ public CloudWatchLogsService(final Buffer buffer, */ public void processLogEvents(final Collection> logs) { sinkStopWatch.startIfNotRunning(); + if (logs.isEmpty() && buffer.getEventCount() > 0) { + processLock.lock(); + try { + if (cloudWatchLogsLimits.isGreaterThanLimitReached(sinkStopWatch.getElapsedTimeInSeconds(), + buffer.getBufferSize(), buffer.getEventCount())) { + stageLogEvents(); + } + } finally { + processLock.unlock(); + } + return; + } + for (Record log : logs) { String logString = log.getData().toJsonString(); int logLength = logString.length(); diff --git a/settings.gradle b/settings.gradle index d2aa09b52c..37a125aaa5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -168,7 +168,7 @@ include 'data-prepper-plugins:aws-sqs-common' include 'data-prepper-plugins:buffer-common' include 'data-prepper-plugins:sqs-source' include 'data-prepper-plugins:sqs-common' -//include 'data-prepper-plugins:cloudwatch-logs' +include 'data-prepper-plugins:cloudwatch-logs' //include 'data-prepper-plugins:http-sink' //include 'data-prepper-plugins:sns-sink' //include 'data-prepper-plugins:prometheus-sink'