diff --git a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CouldWatchLogsIT.java b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CouldWatchLogsIT.java index 4bfedc5b41..5158efb18e 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CouldWatchLogsIT.java +++ b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CouldWatchLogsIT.java @@ -51,9 +51,11 @@ import java.util.List; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; @ExtendWith(MockitoExtension.class) public class CouldWatchLogsIT { + static final int NUM_RECORDS = 2; @Mock private PluginSetting pluginSetting; @@ -70,7 +72,7 @@ public class CouldWatchLogsIT { private ThresholdConfig thresholdConfig; @Mock - CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig; + private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig; @Mock private Counter counter; @@ -80,13 +82,13 @@ public class CouldWatchLogsIT { private String logGroupName; private String logStreamName; private CloudWatchLogsSink sink; - private int count; + private AtomicInteger count; private CloudWatchLogsClient cloudWatchLogsClient; private ObjectMapper objectMapper; @BeforeEach void setUp() { - count = 0; + count = new AtomicInteger(0); objectMapper = new ObjectMapper(); pluginSetting = mock(PluginSetting.class); when(pluginSetting.getPipelineName()).thenReturn("pipeline"); @@ -105,9 +107,14 @@ void setUp() { pluginMetrics = mock(PluginMetrics.class); counter = mock(Counter.class); lenient().doAnswer((a)-> { - count++; + int v = (int)(double)(a.getArgument(0)); + count.addAndGet(v); return null; }).when(counter).increment(any(Double.class)); + lenient().doAnswer((a)-> { + count.addAndGet(1); + return null; + }).when(counter).increment(); when(pluginMetrics.counter(anyString())).thenReturn(counter); cloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class); when(cloudWatchLogsSinkConfig.getLogGroup()).thenReturn(logGroupName); @@ -157,7 +164,7 @@ void TestSinkOperationWithLogSendInterval() throws Exception { when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L); sink = createObjectUnderTest(); - Collection> records = getRecordList(2); + Collection> records = getRecordList(NUM_RECORDS); sink.doOutput(records); await().atMost(Duration.ofSeconds(30)) .untilAsserted(() -> { @@ -172,7 +179,7 @@ void TestSinkOperationWithLogSendInterval() throws Exception { .build(); GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest); List events = response.events(); - assertThat(events.size(), equalTo(2)); + assertThat(events.size(), equalTo(NUM_RECORDS)); for (int i = 0; i < events.size(); i++) { String message = events.get(i).message(); Map event = objectMapper.readValue(message, Map.class); @@ -180,6 +187,9 @@ void TestSinkOperationWithLogSendInterval() throws Exception { assertThat(event.get("age"), equalTo(Integer.toString(i))); } }); + // NUM_RECORDS success + // 1 request success + assertThat(count.get(), equalTo(NUM_RECORDS+1)); } @@ -191,7 +201,7 @@ void TestSinkOperationWithBatchSize() throws Exception { when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L); sink = createObjectUnderTest(); - Collection> records = getRecordList(2); + Collection> records = getRecordList(NUM_RECORDS); sink.doOutput(records); await().atMost(Duration.ofSeconds(30)) .untilAsserted(() -> { @@ -205,7 +215,7 @@ void TestSinkOperationWithBatchSize() throws Exception { .build(); GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest); List events = response.events(); - assertThat(events.size(), equalTo(2)); + assertThat(events.size(), equalTo(NUM_RECORDS)); for (int i = 0; i < events.size(); i++) { String message = events.get(i).message(); Map event = objectMapper.readValue(message, Map.class); @@ -213,6 +223,9 @@ void TestSinkOperationWithBatchSize() throws Exception { assertThat(event.get("age"), equalTo(Integer.toString(i))); } }); + // NUM_RECORDS success + // NUM_RECORDS request success + assertThat(count.get(), equalTo(NUM_RECORDS*2)); } @@ -223,7 +236,7 @@ void TestSinkOperationWithMaxRequestSize() throws Exception { when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(108L); sink = createObjectUnderTest(); - Collection> records = getRecordList(2); + Collection> records = getRecordList(NUM_RECORDS); sink.doOutput(records); await().atMost(Duration.ofSeconds(30)) .untilAsserted(() -> { @@ -237,7 +250,7 @@ void TestSinkOperationWithMaxRequestSize() throws Exception { .build(); GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest); List events = response.events(); - assertThat(events.size(), equalTo(2)); + assertThat(events.size(), equalTo(NUM_RECORDS)); for (int i = 0; i < events.size(); i++) { String message = events.get(i).message(); Map event = objectMapper.readValue(message, Map.class); @@ -245,6 +258,9 @@ void TestSinkOperationWithMaxRequestSize() throws Exception { assertThat(event.get("age"), equalTo(Integer.toString(i))); } }); + // NUM_RECORDS success + // 1 request success + assertThat(count.get(), equalTo(NUM_RECORDS+1)); }