Skip to content

Commit

Permalink
Add support for metadata in Events for the total time spent in grok (#…
Browse files Browse the repository at this point in the history
…4230)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Mar 4, 2024
1 parent 42e763e commit ebd2d47
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.file.Files;
import java.nio.file.NotDirectoryException;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -54,6 +55,7 @@

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessorConfig.TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY;
import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessorConfig.TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY;


@SingleThread
Expand Down Expand Up @@ -121,6 +123,8 @@ public GrokProcessor(final PluginSetting pluginSetting, final ExpressionEvaluato
@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for (final Record<Event> record : records) {

final Instant startTime = Instant.now();
final Event event = record.getData();
try {
if (Objects.nonNull(grokProcessorConfig.getGrokWhen()) && !expressionEvaluator.evaluateConditional(grokProcessorConfig.getGrokWhen(), event)) {
Expand All @@ -142,6 +146,16 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
LOG.error(EVENT, "An exception occurred when matching record [{}]", record.getData(), e);
grokProcessingErrorsCounter.increment();
}

final Instant endTime = Instant.now();

Long totalEventTimeInGrok = (Long) event.getMetadata().getAttribute(TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY);
if (totalEventTimeInGrok == null) {
totalEventTimeInGrok = 0L;
}

final long timeSpentInThisGrok = endTime.toEpochMilli() - startTime.toEpochMilli();
event.getMetadata().setAttribute(TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY, totalEventTimeInGrok + timeSpentInThisGrok);
}
return records;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class GrokProcessorConfig {

static final String TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY = "_total_grok_patterns_attempted";

static final String TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY = "_total_grok_processing_time";

static final String BREAK_ON_MATCH = "break_on_match";
static final String KEEP_EMPTY_CAPTURES = "keep_empty_captures";
static final String MATCH = "match";
Expand All @@ -28,7 +30,7 @@ public class GrokProcessorConfig {
static final String TAGS_ON_MATCH_FAILURE = "tags_on_match_failure";
static final String TAGS_ON_TIMEOUT = "tags_on_timeout";

static final String INCLUDE_PERFORMANCE_METADATA = "include_performance_metadata";
static final String INCLUDE_PERFORMANCE_METADATA = "performance_metadata";

static final boolean DEFAULT_BREAK_ON_MATCH = true;
static final boolean DEFAULT_KEEP_EMPTY_CAPTURES = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -59,6 +60,7 @@
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessor.EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT;
import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessorConfig.TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY;
import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessorConfig.TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY;
import static org.opensearch.dataprepper.test.matcher.MapEquals.isEqualWithoutTimestamp;


Expand Down Expand Up @@ -544,6 +546,7 @@ public void testMatchOnSecondPattern() throws JsonProcessingException {
assertThat(grokkedRecords.get(0).getData(), notNullValue());
assertThat(grokkedRecords.get(0).getData().getMetadata(), notNullValue());
assertThat(grokkedRecords.get(0).getData().getMetadata().getAttribute(TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY), equalTo(2));
assertThat((Long) grokkedRecords.get(0).getData().getMetadata().getAttribute(TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY), greaterThan(0L));
assertRecordsAreEqual(grokkedRecords.get(0), record);
verify(grokProcessingMismatchCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
Expand All @@ -565,6 +568,7 @@ public void testMatchOnSecondPatternWithExistingMetadataForTotalPatternMatches()
final Record<Event> record = buildRecordWithEvent(testData);

record.getData().getMetadata().setAttribute(TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY, 1);
record.getData().getMetadata().setAttribute(TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY, 300L);

final List<Record<Event>> grokkedRecords = (List<Record<Event>>) grokProcessor.doExecute(Collections.singletonList(record));

Expand All @@ -573,6 +577,7 @@ public void testMatchOnSecondPatternWithExistingMetadataForTotalPatternMatches()
assertThat(grokkedRecords.get(0).getData(), notNullValue());
assertThat(grokkedRecords.get(0).getData().getMetadata(), notNullValue());
assertThat(grokkedRecords.get(0).getData().getMetadata().getAttribute(TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY), equalTo(3));
assertThat((Long) grokkedRecords.get(0).getData().getMetadata().getAttribute(TOTAL_TIME_SPENT_IN_GROK_METADATA_KEY), greaterThan(300L));
assertRecordsAreEqual(grokkedRecords.get(0), record);
verify(grokProcessingMismatchCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
Expand Down

0 comments on commit ebd2d47

Please sign in to comment.