From 06262819e125bd9f91e8c86a79be80bcee204df7 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Mon, 27 Nov 2023 10:09:58 -0800 Subject: [PATCH] adjusted wait times Signed-off-by: Kondaka --- .../aggregate/AggregateProcessorIT.java | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java index 6f6e60f6b4..48d1b40196 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java @@ -159,7 +159,7 @@ void aggregateWithNoConcludingGroupsReturnsExpectedResult() throws InterruptedEx }); } - boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS); + boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); assertThat(aggregatedResult.size(), equalTo(NUM_UNIQUE_EVENTS_PER_BATCH)); @@ -185,7 +185,7 @@ void aggregateWithConcludingGroupsOnceReturnsExpectedResult() throws Interrupted final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); for (int i = 0; i < NUM_THREADS; i++) { executorService.execute(() -> { @@ -198,7 +198,7 @@ void aggregateWithConcludingGroupsOnceReturnsExpectedResult() throws Interrupted }); } - boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS); + boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); assertThat(aggregatedResult.size(), equalTo(NUM_UNIQUE_EVENTS_PER_BATCH)); @@ -236,7 +236,7 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); for (int i = 0; i < NUM_THREADS; i++) { executorService.execute(() -> { @@ -249,7 +249,7 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException { }); } - boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS); + boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); assertThat(aggregatedResult.size(), equalTo(NUM_UNIQUE_EVENTS_PER_BATCH/2)); @@ -274,7 +274,7 @@ void aggregateWithPercentSamplerAction(double testPercent) throws InterruptedExc final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); AtomicInteger allowedEventsCount = new AtomicInteger(0); for (int i = 0; i < NUM_THREADS; i++) { @@ -307,7 +307,7 @@ void aggregateWithRateLimiterAction() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); for (int i = 0; i < NUM_THREADS; i++) { executorService.execute(() -> { @@ -320,7 +320,7 @@ void aggregateWithRateLimiterAction() throws InterruptedException { }); } - boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS); + boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); // Expect less number of events to be received, because of rate limiting @@ -342,7 +342,7 @@ void aggregateWithRateLimiterActionNoDrops() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); for (int i = 0; i < NUM_THREADS; i++) { executorService.execute(() -> { @@ -355,7 +355,7 @@ void aggregateWithRateLimiterActionNoDrops() throws InterruptedException { }); } - boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS); + boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); // Expect all events to be received even with rate limiting because no events are dropped @@ -383,9 +383,9 @@ void aggregateWithCountAggregateAction() throws InterruptedException, NoSuchFiel countDownLatch.countDown(); }); } - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); - boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS); + boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); Collection> results = objectUnderTest.doExecute(new ArrayList>()); @@ -429,9 +429,9 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio countDownLatch.countDown(); }); } - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); - boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS); + boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); Collection> results = objectUnderTest.doExecute(new ArrayList>()); @@ -489,9 +489,9 @@ void aggregateWithHistogramAggregateAction() throws InterruptedException, NoSuch countDownLatch.countDown(); }); } - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); - boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS); + boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); Collection> results = objectUnderTest.doExecute(new ArrayList>()); @@ -535,7 +535,7 @@ void aggregateWithTailSamplerAction(final int testPercent) throws InterruptedExc final int numberOfSpans = 5; eventBatch = getBatchOfEventsForTailSampling(numberOfErrorTraces, numberOfSpans); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); for (int i = 0; i < NUM_THREADS; i++) { @@ -544,8 +544,8 @@ void aggregateWithTailSamplerAction(final int testPercent) throws InterruptedExc countDownLatch.countDown(); }); } - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); - boolean allThreadsFinished = countDownLatch.await(10L, TimeUnit.SECONDS); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); List errorEventList = eventBatch.stream().map(Record::getData).filter(event -> { Event ev = ((Event)event);