Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cloudwatch logs sink #5406

Merged
merged 2 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion data-prepper-plugins/cloudwatch-logs/build.gradle
Original file line number Diff line number Diff line change
@@ -1,8 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

// Applies the Gradle 'java' and 'java-library' plugins to this module.
plugins {
id 'java'
id 'java-library'
}

// Declares the 'integrationTest' source set: its Java sources live under
// src/integrationTest/java, and the outputs of the 'main' and 'test' source sets are
// appended to both its compile and runtime classpaths.
sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
}
}

// Lets the integrationTest source set reuse the dependencies declared for unit tests.
// The runtime side must use the '*RuntimeOnly' configurations: the legacy 'testRuntime'
// configuration was removed in Gradle 7, so extending from it wires nothing into the
// integration-test runtime classpath.
configurations {
    integrationTestImplementation.extendsFrom testImplementation
    integrationTestRuntimeOnly.extendsFrom testRuntimeOnly
}

dependencies {
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-plugins:common')
Expand Down Expand Up @@ -33,4 +53,20 @@ jacocoTestCoverageVerification {

// Runs the unit tests on the JUnit Platform.
test {
useJUnitPlatform()
}
}

// Runs the classes matching '*IT' from the integrationTest source set on the JUnit Platform.
// The 'tests.*' system properties given to the Gradle JVM are forwarded to the test JVM so
// the tests can read the target log group, log stream, region and role.
task integrationTest(type: Test) {
    group = 'verification'
    useJUnitPlatform()

    testClassesDirs = sourceSets.integrationTest.output.classesDirs
    classpath = sourceSets.integrationTest.runtimeClasspath

    [
        'tests.cloudwatch.log_group',
        'tests.cloudwatch.log_stream',
        'tests.aws.region',
        'tests.aws.role'
    ].each { propertyName ->
        systemProperty propertyName, System.getProperty(propertyName)
    }

    filter {
        includeTestsMatching '*IT'
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs;

import io.micrometer.core.instrument.Counter;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory;
import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsResponse;
import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.OutputLogEvent;
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamResponse;
import software.amazon.awssdk.services.cloudwatchlogs.model.DeleteLogStreamRequest;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import software.amazon.awssdk.regions.Region;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.lenient;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;

@ExtendWith(MockitoExtension.class)
public class CouldWatchLogsIT {
static final int NUM_RECORDS = 2;
@Mock
private PluginSetting pluginSetting;

@Mock
private PluginMetrics pluginMetrics;

@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;

@Mock
private AwsConfig awsConfig;

@Mock
private ThresholdConfig thresholdConfig;

@Mock
private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig;

@Mock
private Counter counter;

private String awsRegion;
private String awsRole;
private String logGroupName;
private String logStreamName;
private CloudWatchLogsSink sink;
private AtomicInteger count;
private CloudWatchLogsClient cloudWatchLogsClient;
private ObjectMapper objectMapper;

@BeforeEach
void setUp() {
count = new AtomicInteger(0);
objectMapper = new ObjectMapper();
pluginSetting = mock(PluginSetting.class);
when(pluginSetting.getPipelineName()).thenReturn("pipeline");
when(pluginSetting.getName()).thenReturn("name");
awsRegion = System.getProperty("tests.aws.region");
awsRole = System.getProperty("tests.aws.role");
awsConfig = mock(AwsConfig.class);
when(awsConfig.getAwsRegion()).thenReturn(Region.of(awsRegion));
when(awsConfig.getAwsStsRoleArn()).thenReturn(awsRole);
when(awsConfig.getAwsStsExternalId()).thenReturn(null);
when(awsConfig.getAwsStsHeaderOverrides()).thenReturn(null);
when(awsCredentialsSupplier.getProvider(any())).thenAnswer(options -> DefaultCredentialsProvider.create());
cloudWatchLogsClient = CloudWatchLogsClientFactory.createCwlClient(awsConfig, awsCredentialsSupplier);
logGroupName = System.getProperty("tests.cloudwatch.log_group");
logStreamName = createLogStream(logGroupName);
pluginMetrics = mock(PluginMetrics.class);
counter = mock(Counter.class);
lenient().doAnswer((a)-> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see you validating this. So you can probably just remove these lines.

int v = (int)(double)(a.getArgument(0));
count.addAndGet(v);
return null;
}).when(counter).increment(any(Double.class));
lenient().doAnswer((a)-> {
count.addAndGet(1);
return null;
}).when(counter).increment();
when(pluginMetrics.counter(anyString())).thenReturn(counter);
cloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class);
when(cloudWatchLogsSinkConfig.getLogGroup()).thenReturn(logGroupName);
when(cloudWatchLogsSinkConfig.getLogStream()).thenReturn(logStreamName);
when(cloudWatchLogsSinkConfig.getAwsConfig()).thenReturn(awsConfig);
when(cloudWatchLogsSinkConfig.getBufferType()).thenReturn(CloudWatchLogsSinkConfig.DEFAULT_BUFFER_TYPE);

thresholdConfig = mock(ThresholdConfig.class);
when(thresholdConfig.getBackOffTime()).thenReturn(500L);
when(thresholdConfig.getLogSendInterval()).thenReturn(60L);
when(thresholdConfig.getRetryCount()).thenReturn(10);
when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L);
when(cloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig);
}

@AfterEach
void tearDown() {
DeleteLogStreamRequest deleteRequest = DeleteLogStreamRequest
.builder()
.logGroupName(logGroupName)
.logStreamName(logStreamName)
.build();
cloudWatchLogsClient.deleteLogStream(deleteRequest);
}

private CloudWatchLogsSink createObjectUnderTest() {
return new CloudWatchLogsSink(pluginSetting, pluginMetrics, cloudWatchLogsSinkConfig, awsCredentialsSupplier);
}

private String createLogStream(final String logGroupName) {
final String newLogStreamName = "CouldWatchLogsIT_"+RandomStringUtils.randomAlphabetic(6);
CreateLogStreamRequest createRequest = CreateLogStreamRequest
.builder()
.logGroupName(logGroupName)
.logStreamName(newLogStreamName)
.build();
CreateLogStreamResponse response = cloudWatchLogsClient.createLogStream(createRequest);
return newLogStreamName;

}

@Test
void TestSinkOperationWithLogSendInterval() throws Exception {
long startTime = Instant.now().toEpochMilli();
when(thresholdConfig.getBatchSize()).thenReturn(10);
when(thresholdConfig.getLogSendInterval()).thenReturn(10L);
when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L);

sink = createObjectUnderTest();
Collection<Record<Event>> records = getRecordList(NUM_RECORDS);
sink.doOutput(records);
await().atMost(Duration.ofSeconds(30))
.untilAsserted(() -> {
sink.doOutput(Collections.emptyList());
long endTime = Instant.now().toEpochMilli();
GetLogEventsRequest getRequest = GetLogEventsRequest
.builder()
.logGroupName(logGroupName)
.logStreamName(logStreamName)
.startTime(startTime)
.endTime(endTime)
.build();
GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest);
List<OutputLogEvent> events = response.events();
assertThat(events.size(), equalTo(NUM_RECORDS));
for (int i = 0; i < events.size(); i++) {
String message = events.get(i).message();
Map<String, Object> event = objectMapper.readValue(message, Map.class);
assertThat(event.get("name"), equalTo("Person"+i));
assertThat(event.get("age"), equalTo(Integer.toString(i)));
}
});
// NUM_RECORDS success
// 1 request success
assertThat(count.get(), equalTo(NUM_RECORDS+1));

}

@Test
void TestSinkOperationWithBatchSize() throws Exception {
long startTime = Instant.now().toEpochMilli();
when(thresholdConfig.getBatchSize()).thenReturn(1);
when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L);
when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L);

sink = createObjectUnderTest();
Collection<Record<Event>> records = getRecordList(NUM_RECORDS);
sink.doOutput(records);
await().atMost(Duration.ofSeconds(30))
.untilAsserted(() -> {
long endTime = Instant.now().toEpochMilli();
GetLogEventsRequest getRequest = GetLogEventsRequest
.builder()
.logGroupName(logGroupName)
.logStreamName(logStreamName)
.startTime(startTime)
.endTime(endTime)
.build();
GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest);
List<OutputLogEvent> events = response.events();
assertThat(events.size(), equalTo(NUM_RECORDS));
for (int i = 0; i < events.size(); i++) {
String message = events.get(i).message();
Map<String, Object> event = objectMapper.readValue(message, Map.class);
assertThat(event.get("name"), equalTo("Person"+i));
assertThat(event.get("age"), equalTo(Integer.toString(i)));
}
});
// NUM_RECORDS success
// NUM_RECORDS request success
assertThat(count.get(), equalTo(NUM_RECORDS*2));

}

@Test
void TestSinkOperationWithMaxRequestSize() throws Exception {
long startTime = Instant.now().toEpochMilli();
when(thresholdConfig.getBatchSize()).thenReturn(20);
when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(108L);

sink = createObjectUnderTest();
Collection<Record<Event>> records = getRecordList(NUM_RECORDS);
sink.doOutput(records);
await().atMost(Duration.ofSeconds(30))
.untilAsserted(() -> {
long endTime = Instant.now().toEpochMilli();
GetLogEventsRequest getRequest = GetLogEventsRequest
.builder()
.logGroupName(logGroupName)
.logStreamName(logStreamName)
.startTime(startTime)
.endTime(endTime)
.build();
GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest);
List<OutputLogEvent> events = response.events();
assertThat(events.size(), equalTo(NUM_RECORDS));
for (int i = 0; i < events.size(); i++) {
String message = events.get(i).message();
Map<String, Object> event = objectMapper.readValue(message, Map.class);
assertThat(event.get("name"), equalTo("Person"+i));
assertThat(event.get("age"), equalTo(Integer.toString(i)));
}
});
// NUM_RECORDS success
// 1 request success
assertThat(count.get(), equalTo(NUM_RECORDS+1));

}

private Collection<Record<Event>> getRecordList(int numberOfRecords) {
final Collection<Record<Event>> recordList = new ArrayList<>();
List<HashMap> records = generateRecords(numberOfRecords);
for (int i = 0; i < numberOfRecords; i++) {
final Event event = JacksonLog.builder().withData(records.get(i)).build();
recordList.add(new Record<>(event));
}
return recordList;
}

private static List<HashMap> generateRecords(int numberOfRecords) {

List<HashMap> recordList = new ArrayList<>();

for (int rows = 0; rows < numberOfRecords; rows++) {

HashMap<String, String> eventData = new HashMap<>();

eventData.put("name", "Person" + rows);
dlvenable marked this conversation as resolved.
Show resolved Hide resolved
eventData.put("age", Integer.toString(rows));
recordList.add((eventData));

}
return recordList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,11 @@ public void doInitialize() {

/**
 * Hands the given records to the CloudWatch Logs service.
 * <p>
 * Empty collections are forwarded too: {@code processLogEvents} treats an empty batch as the
 * cue to check whether already-buffered events have crossed a flush limit. Returning early
 * here would make that check unreachable and leave buffered events waiting until more
 * records arrive.
 *
 * @param records the records to publish; may be empty
 */
@Override
public void doOutput(Collection<Record<Event>> records) {
    cloudWatchLogsService.processLogEvents(records);
}

/**
 * Reports whether this sink is ready, as indicated by the {@code isInitialized} flag.
 *
 * @return the current value of {@code isInitialized}
 */
@Override
public boolean isReady() {
return isInitialized;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ public CloudWatchLogsService(final Buffer buffer,
*/
public void processLogEvents(final Collection<Record<Event>> logs) {
sinkStopWatch.startIfNotRunning();
if (logs.isEmpty() && buffer.getEventCount() > 0) {
processLock.lock();
try {
if (cloudWatchLogsLimits.isGreaterThanLimitReached(sinkStopWatch.getElapsedTimeInSeconds(),
buffer.getBufferSize(), buffer.getEventCount())) {
stageLogEvents();
}
} finally {
processLock.unlock();
}
return;
}

for (Record<Event> log : logs) {
String logString = log.getData().toJsonString();
int logLength = logString.length();
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ include 'data-prepper-plugins:aws-sqs-common'
include 'data-prepper-plugins:buffer-common'
include 'data-prepper-plugins:sqs-source'
include 'data-prepper-plugins:sqs-common'
//include 'data-prepper-plugins:cloudwatch-logs'
include 'data-prepper-plugins:cloudwatch-logs'
//include 'data-prepper-plugins:http-sink'
//include 'data-prepper-plugins:sns-sink'
//include 'data-prepper-plugins:prometheus-sink'
Expand Down
Loading