From 83180414f004e8e9dbece891391dec2a18ec6109 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Wed, 15 Jan 2025 17:17:05 -0600 Subject: [PATCH] Report SQS message delay immediately after message is received, and as 0 when there are no messages found in queue (#5333) Signed-off-by: Taylor Gray --- .../plugins/source/s3/SqsWorker.java | 11 +++++++---- .../plugins/source/s3/SqsWorkerTest.java | 19 +++++++++++++++++-- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index 2861ffa6d7..08c643b39c 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -156,6 +156,9 @@ private List getMessagesFromSqs() { final ReceiveMessageRequest receiveMessageRequest = createReceiveMessageRequest(); final List messages = sqsClient.receiveMessage(receiveMessageRequest).messages(); failedAttemptCount = 0; + if (messages.isEmpty()) { + sqsMessageDelayTimer.record(Duration.ZERO); + } return messages; } catch (final SqsException | StsException e) { LOG.error("Error reading from SQS: {}. Retrying with exponential backoff.", e.getMessage()); @@ -228,6 +231,10 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { LOG.info("Received {} messages from SQS. Processing {} messages.", s3EventNotificationRecords.size(), parsedMessagesToRead.size()); for (ParsedMessage parsedMessage : parsedMessagesToRead) { + sqsMessageDelayTimer.record(Duration.between( + Instant.ofEpochMilli(parsedMessage.getEventTime().toInstant().getMillis()), + Instant.now() + )); List waitingForAcknowledgements = new ArrayList<>(); AcknowledgementSet acknowledgementSet = null; final int visibilityTimeout = (int)sqsOptions.getVisibilityTimeout().getSeconds(); @@ -318,10 +325,6 @@ private Optional processS3Object( // SQS messages won't be deleted if we are unable to process S3Objects because of an exception try { s3Service.addS3Object(s3ObjectReference, acknowledgementSet); - sqsMessageDelayTimer.record(Duration.between( - Instant.ofEpochMilli(parsedMessage.getEventTime().toInstant().getMillis()), - Instant.now() - )); return Optional.of(buildDeleteMessageBatchRequestEntry(parsedMessage.getMessage())); } catch (final Exception e) { LOG.error("Error processing from S3: {}. Retrying with exponential backoff.", e.getMessage()); diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java index ada789cea6..bed9437f85 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -67,6 +68,7 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -700,7 +702,10 @@ void processSqsMessages_should_stop_updating_visibility_timeout_after_stop() thr objectUnderTest.stop(); assertThat(messagesProcessed, equalTo(1)); - verify(s3Service).addS3Object(any(S3ObjectReference.class), any()); + + final InOrder inOrder = inOrder(s3Service, sqsMessageDelayTimer); + inOrder.verify(sqsMessageDelayTimer).record(any(Duration.class)); + inOrder.verify(s3Service).addS3Object(any(S3ObjectReference.class), any()); verify(acknowledgementSetManager).create(any(), any(Duration.class)); ArgumentCaptor> progressConsumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class); @@ -711,7 +716,17 @@ void processSqsMessages_should_stop_updating_visibility_timeout_after_stop() thr verify(sqsClient, never()).changeMessageVisibility(any(ChangeMessageVisibilityRequest.class)); verify(sqsMessagesReceivedCounter).increment(1); - verify(sqsMessageDelayTimer).record(any(Duration.class)); + } + + @Test + void processSqsMessages_should_record_zero_message_delay_when_no_messages_are_found_on_poll() { + final ReceiveMessageResponse receiveMessageResponse = mock(ReceiveMessageResponse.class); + when(receiveMessageResponse.messages()).thenReturn(Collections.emptyList()); + + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResponse); + final int messagesProcessed = createObjectUnderTest().processSqsMessages(); + assertThat(messagesProcessed, equalTo(0)); + verify(sqsMessageDelayTimer).record(Duration.ZERO); } private static String createPutNotification(final Instant startTime) {