Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Feb 5, 2025
1 parent 1915995 commit eb01f42
Showing 1 changed file with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@
import java.util.List;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;

@ExtendWith(MockitoExtension.class)
public class CouldWatchLogsIT {
static final int NUM_RECORDS = 2;
@Mock
private PluginSetting pluginSetting;

Expand All @@ -70,7 +72,7 @@ public class CouldWatchLogsIT {
private ThresholdConfig thresholdConfig;

@Mock
CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig;
private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig;

@Mock
private Counter counter;
Expand All @@ -80,13 +82,13 @@ public class CouldWatchLogsIT {
private String logGroupName;
private String logStreamName;
private CloudWatchLogsSink sink;
private int count;
private AtomicInteger count;
private CloudWatchLogsClient cloudWatchLogsClient;
private ObjectMapper objectMapper;

@BeforeEach
void setUp() {
count = 0;
count = new AtomicInteger(0);
objectMapper = new ObjectMapper();
pluginSetting = mock(PluginSetting.class);
when(pluginSetting.getPipelineName()).thenReturn("pipeline");
Expand All @@ -105,9 +107,14 @@ void setUp() {
pluginMetrics = mock(PluginMetrics.class);
counter = mock(Counter.class);
lenient().doAnswer((a)-> {
count++;
int v = (int)(double)(a.getArgument(0));
count.addAndGet(v);
return null;
}).when(counter).increment(any(Double.class));
lenient().doAnswer((a)-> {
count.addAndGet(1);
return null;
}).when(counter).increment();
when(pluginMetrics.counter(anyString())).thenReturn(counter);
cloudWatchLogsSinkConfig = mock(CloudWatchLogsSinkConfig.class);
when(cloudWatchLogsSinkConfig.getLogGroup()).thenReturn(logGroupName);
Expand Down Expand Up @@ -157,7 +164,7 @@ void TestSinkOperationWithLogSendInterval() throws Exception {
when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L);

sink = createObjectUnderTest();
Collection<Record<Event>> records = getRecordList(2);
Collection<Record<Event>> records = getRecordList(NUM_RECORDS);
sink.doOutput(records);
await().atMost(Duration.ofSeconds(30))
.untilAsserted(() -> {
Expand All @@ -172,14 +179,17 @@ void TestSinkOperationWithLogSendInterval() throws Exception {
.build();
GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest);
List<OutputLogEvent> events = response.events();
assertThat(events.size(), equalTo(2));
assertThat(events.size(), equalTo(NUM_RECORDS));
for (int i = 0; i < events.size(); i++) {
String message = events.get(i).message();
Map<String, Object> event = objectMapper.readValue(message, Map.class);
assertThat(event.get("name"), equalTo("Person"+i));
assertThat(event.get("age"), equalTo(Integer.toString(i)));
}
});
// NUM_RECORDS success
// 1 request success
assertThat(count.get(), equalTo(NUM_RECORDS+1));

}

Expand All @@ -191,7 +201,7 @@ void TestSinkOperationWithBatchSize() throws Exception {
when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L);

sink = createObjectUnderTest();
Collection<Record<Event>> records = getRecordList(2);
Collection<Record<Event>> records = getRecordList(NUM_RECORDS);
sink.doOutput(records);
await().atMost(Duration.ofSeconds(30))
.untilAsserted(() -> {
Expand All @@ -205,14 +215,17 @@ void TestSinkOperationWithBatchSize() throws Exception {
.build();
GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest);
List<OutputLogEvent> events = response.events();
assertThat(events.size(), equalTo(2));
assertThat(events.size(), equalTo(NUM_RECORDS));
for (int i = 0; i < events.size(); i++) {
String message = events.get(i).message();
Map<String, Object> event = objectMapper.readValue(message, Map.class);
assertThat(event.get("name"), equalTo("Person"+i));
assertThat(event.get("age"), equalTo(Integer.toString(i)));
}
});
// NUM_RECORDS success
// NUM_RECORDS request success
assertThat(count.get(), equalTo(NUM_RECORDS*2));

}

Expand All @@ -223,7 +236,7 @@ void TestSinkOperationWithMaxRequestSize() throws Exception {
when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(108L);

sink = createObjectUnderTest();
Collection<Record<Event>> records = getRecordList(2);
Collection<Record<Event>> records = getRecordList(NUM_RECORDS);
sink.doOutput(records);
await().atMost(Duration.ofSeconds(30))
.untilAsserted(() -> {
Expand All @@ -237,14 +250,17 @@ void TestSinkOperationWithMaxRequestSize() throws Exception {
.build();
GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest);
List<OutputLogEvent> events = response.events();
assertThat(events.size(), equalTo(2));
assertThat(events.size(), equalTo(NUM_RECORDS));
for (int i = 0; i < events.size(); i++) {
String message = events.get(i).message();
Map<String, Object> event = objectMapper.readValue(message, Map.class);
assertThat(event.get("name"), equalTo("Person"+i));
assertThat(event.get("age"), equalTo(Integer.toString(i)));
}
});
// NUM_RECORDS success
// 1 request success
assertThat(count.get(), equalTo(NUM_RECORDS+1));

}

Expand Down

0 comments on commit eb01f42

Please sign in to comment.