From ebd2d470a84228170f2a4f6978e88419a8f653da Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Mon, 4 Mar 2024 15:36:58 -0600 Subject: [PATCH] Add support for metadata in Events for the total time spent in grok (#4230) Signed-off-by: Taylor Gray --- .../plugins/processor/grok/GrokProcessor.java | 14 ++++++++++++++ .../processor/grok/GrokProcessorConfig.java | 4 +++- .../plugins/processor/grok/GrokProcessorTests.java | 5 +++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java b/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java index 16a9ce5b24..391d50f603 100644 --- a/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java +++ b/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessor.java @@ -33,6 +33,7 @@ import java.nio.file.Files; import java.nio.file.NotDirectoryException; import java.nio.file.Path; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -54,6 +55,7 @@ import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessorConfig.TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY; +import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessorConfig.TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY; @SingleThread @@ -121,6 +123,8 @@ public GrokProcessor(final PluginSetting pluginSetting, final ExpressionEvaluato @Override public Collection> doExecute(final Collection> records) { for (final Record record : records) { + + final Instant startTime = Instant.now(); final Event event = record.getData(); try { if (Objects.nonNull(grokProcessorConfig.getGrokWhen()) && !expressionEvaluator.evaluateConditional(grokProcessorConfig.getGrokWhen(), event)) { @@ -142,6 +146,16 @@ public Collection> doExecute(final Collection> recor LOG.error(EVENT, "An exception occurred when matching record [{}]", record.getData(), e); grokProcessingErrorsCounter.increment(); } + + final Instant endTime = Instant.now(); + + Long totalEventTimeInGrok = (Long) event.getMetadata().getAttribute(TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY); + if (totalEventTimeInGrok == null) { + totalEventTimeInGrok = 0L; + } + + final long timeSpentInThisGrok = endTime.toEpochMilli() - startTime.toEpochMilli(); + event.getMetadata().setAttribute(TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY, totalEventTimeInGrok + timeSpentInThisGrok); } return records; } diff --git a/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfig.java b/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfig.java index 1311728366..e611099589 100644 --- a/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfig.java +++ b/data-prepper-plugins/grok-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorConfig.java @@ -14,6 +14,8 @@ public class GrokProcessorConfig { static final String TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY = "_total_grok_patterns_attempted"; + static final String TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY = "_total_grok_processing_time"; + static final String BREAK_ON_MATCH = "break_on_match"; static final String KEEP_EMPTY_CAPTURES = "keep_empty_captures"; static final String MATCH = "match"; @@ -28,7 +30,7 @@ public class GrokProcessorConfig { static final String TAGS_ON_MATCH_FAILURE = "tags_on_match_failure"; static final String TAGS_ON_TIMEOUT = "tags_on_timeout"; - static final String INCLUDE_PERFORMANCE_METADATA = "include_performance_metadata"; + static final String INCLUDE_PERFORMANCE_METADATA = "performance_metadata"; static final boolean DEFAULT_BREAK_ON_MATCH = true; static final boolean DEFAULT_KEEP_EMPTY_CAPTURES = false; diff --git a/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java b/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java index 2eb9db2592..d2ff11c3e0 100644 --- a/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java +++ b/data-prepper-plugins/grok-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/grok/GrokProcessorTests.java @@ -46,6 +46,7 @@ import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.not; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; @@ -59,6 +60,7 @@ import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessor.EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT; import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessorConfig.TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY; +import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessorConfig.TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY; import static org.opensearch.dataprepper.test.matcher.MapEquals.isEqualWithoutTimestamp; @@ -544,6 +546,7 @@ public void testMatchOnSecondPattern() throws JsonProcessingException { assertThat(grokkedRecords.get(0).getData(), notNullValue()); assertThat(grokkedRecords.get(0).getData().getMetadata(), notNullValue()); assertThat(grokkedRecords.get(0).getData().getMetadata().getAttribute(TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY), equalTo(2)); + assertThat((Long) grokkedRecords.get(0).getData().getMetadata().getAttribute(TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY), greaterThan(0L)); assertRecordsAreEqual(grokkedRecords.get(0), record); verify(grokProcessingMismatchCounter, times(1)).increment(); verify(grokProcessingTime, times(1)).record(any(Runnable.class)); @@ -565,6 +568,7 @@ public void testMatchOnSecondPatternWithExistingMetadataForTotalPatternMatches() final Record record = buildRecordWithEvent(testData); record.getData().getMetadata().setAttribute(TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY, 1); + record.getData().getMetadata().setAttribute(TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY, 300L); final List> grokkedRecords = (List>) grokProcessor.doExecute(Collections.singletonList(record)); @@ -573,6 +577,7 @@ public void testMatchOnSecondPatternWithExistingMetadataForTotalPatternMatches() assertThat(grokkedRecords.get(0).getData(), notNullValue()); assertThat(grokkedRecords.get(0).getData().getMetadata(), notNullValue()); assertThat(grokkedRecords.get(0).getData().getMetadata().getAttribute(TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY), equalTo(3)); + assertThat((Long) grokkedRecords.get(0).getData().getMetadata().getAttribute(TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY), greaterThan(300L)); assertRecordsAreEqual(grokkedRecords.get(0), record); verify(grokProcessingMismatchCounter, times(1)).increment(); verify(grokProcessingTime, times(1)).record(any(Runnable.class));