Skip to content

Commit

Permalink
Modified to use a static ack expiry time in Integrated Tests
Browse files Browse the repository at this point in the history
Signed-off-by: Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Nov 28, 2023
1 parent d482a91 commit 35dc759
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import static org.opensearch.dataprepper.plugins.InMemorySource.ACK_EXPIRY_TIME;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.Assert.assertFalse;
import org.slf4j.Logger;
Expand All @@ -38,7 +39,7 @@ class PipelinesWithAcksIT {
private static final String THREE_PIPELINES_MULTI_SINK_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipelines-test-multi-sink.yaml";
private static final String ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-three-sinks.yaml";
private static final String ONE_PIPELINE_ACK_EXPIRY_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-ack-expiry-test.yaml";
private static final int WAIT_TIME = 30000;
private static final long WAIT_TIME_MS = ACK_EXPIRY_TIME.minusMillis(5000L).toMillis();
private DataPrepperTestRunner dataPrepperTestRunner;
private InMemorySourceAccessor inMemorySourceAccessor;
private InMemorySinkAccessor inMemorySinkAccessor;
Expand Down Expand Up @@ -66,7 +67,7 @@ void simple_pipeline_with_single_record() {
final int numRecords = 1;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -82,7 +83,7 @@ void simple_pipeline_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -97,7 +98,7 @@ void two_pipelines_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -112,7 +113,7 @@ void three_pipelines_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -127,7 +128,7 @@ void three_pipelines_with_route_and_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -142,7 +143,7 @@ void two_parallel_pipelines_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -157,7 +158,7 @@ void three_pipelines_multi_sink_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -172,7 +173,7 @@ void one_pipeline_three_sinks_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -187,7 +188,7 @@ void one_pipeline_ack_expiry_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -203,7 +204,7 @@ void one_pipeline_three_sinks_negative_ack_multiple_records() {
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySinkAccessor.setResult(false);

await().atMost(WAIT_TIME, TimeUnit.MILLISECONDS)
await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
@DataPrepperPlugin(name = "in_memory", pluginType = Source.class, pluginConfigurationType = InMemoryConfig.class)
public class InMemorySource implements Source<Record<Event>> {
public static final Duration ACK_EXPIRY_TIME = Duration.ofSeconds(35);
private static final Logger LOG = LoggerFactory.getLogger(InMemorySource.class);

private final String testingKey;
Expand Down Expand Up @@ -123,7 +124,7 @@ public void run() {
{
inMemorySourceAccessor.setAckReceived(result);
},
Duration.ofSeconds(35));
ACK_EXPIRY_TIME);
records.stream().forEach((record) -> { ackSet.add(record.getData()); });
ackSet.complete();
writeToBuffer(records);
Expand Down

0 comments on commit 35dc759

Please sign in to comment.