Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Internal and external latency to OpenSearch and S3 sinks. #3583

Merged
merged 16 commits into from
Nov 7, 2023
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.time.Duration;
import java.time.Instant;

public class LatencyMetrics {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are able to move the tracking of this into data-prepper-core, please move this class into data-prepper-core. There should be no need to expose it in that situation.

public static final String INTERNAL_LATENCY = "internalLatency";
public static final String EXTERNAL_LATENCY = "externalLatency";

// Records milliseconds between an event's internal origination time and the update call.
private final DistributionSummary internalLatencySummary;
// Records milliseconds between an event's external origination time (when present) and the update call.
private final DistributionSummary externalLatencySummary;

/**
 * Creates the internal and external latency distribution summaries in the
 * owning plugin's metric scope.
 *
 * @param pluginMetrics metrics factory of the sink plugin; used to register
 *                      the {@value #INTERNAL_LATENCY} and {@value #EXTERNAL_LATENCY}
 *                      summaries
 */
public LatencyMetrics(final PluginMetrics pluginMetrics) {
    internalLatencySummary = pluginMetrics.summary(INTERNAL_LATENCY);
    externalLatencySummary = pluginMetrics.summary(EXTERNAL_LATENCY);
}
public void update(final EventHandle eventHandle) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to think that this should take the result passed into the EventHandle and make the decision as to whether or not to record here.

public void update(final EventHandle eventHandle, final boolean result) {
  if(result == false)
    return;
  ...
}

if (eventHandle == null) {
return;
}
Instant now = Instant.now();
internalLatencySummary.record(Duration.between(eventHandle.getInternalOriginationTime(), now).toMillis());
if (eventHandle.getExternalOriginationTime() == null) {
return;
}
externalLatencySummary.record(Duration.between(eventHandle.getExternalOriginationTime(), now).toMillis());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.EventHandle;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.core.instrument.DistributionSummary;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

import java.time.Instant;

/**
 * Unit tests for {@code LatencyMetrics}: verifies that the internal and
 * external latency distribution summaries are recorded from an event
 * handle's origination timestamps.
 */
class LatencyMetricsTest {

    private PluginMetrics pluginMetrics;
    private EventHandle eventHandle;
    private LatencyMetrics latencyMetrics;
    private DistributionSummary internalLatencySummary;
    private DistributionSummary externalLatencySummary;

    public LatencyMetrics createObjectUnderTest() {
        return new LatencyMetrics(pluginMetrics);
    }

    // Registers a millisecond-based distribution summary on the given registry.
    private static DistributionSummary buildSummary(final String name, final SimpleMeterRegistry registry) {
        return DistributionSummary.builder(name)
                .baseUnit("milliseconds")
                .register(registry);
    }

    @BeforeEach
    void setup() {
        final SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
        internalLatencySummary = buildSummary("internalLatency", meterRegistry);
        externalLatencySummary = buildSummary("externalLatency", meterRegistry);

        pluginMetrics = mock(PluginMetrics.class);
        when(pluginMetrics.summary(LatencyMetrics.INTERNAL_LATENCY)).thenReturn(internalLatencySummary);
        when(pluginMetrics.summary(LatencyMetrics.EXTERNAL_LATENCY)).thenReturn(externalLatencySummary);

        eventHandle = mock(EventHandle.class);
        when(eventHandle.getInternalOriginationTime()).thenReturn(Instant.now());

        latencyMetrics = createObjectUnderTest();
    }

    @Test
    public void testInternalOriginationTime() {
        // Only the internal origination time is stubbed, so just the internal summary records.
        latencyMetrics.update(eventHandle);
        assertThat(internalLatencySummary.count(), equalTo(1L));
    }

    @Test
    public void testExternalOriginationTime() {
        // With an external origination time 10 ms in the past, both summaries record,
        // and the external latency is at least that offset.
        when(eventHandle.getExternalOriginationTime()).thenReturn(Instant.now().minusMillis(10));
        latencyMetrics.update(eventHandle);
        assertThat(internalLatencySummary.count(), equalTo(1L));
        assertThat(externalLatencySummary.count(), equalTo(1L));
        assertThat(externalLatencySummary.max(), greaterThanOrEqualTo(10.0));
    }
}


2 changes: 2 additions & 0 deletions data-prepper-plugins/date-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ processor:
* Type: String
* Default: `Locale.ROOT`

* `to_origination_metadata` (Optional): When this option is used, matched time is put into the event's metadata as an instance of `Instant`.

## Metrics

* `dateProcessingMatchSuccessCounter`: Number of records that match with at least one pattern specified in match configuration option.
Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/date-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ dependencies {
implementation project(':data-prepper-test-common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'io.micrometer:micrometer-core'
implementation libs.commons.lang3
testImplementation libs.commons.lang3
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;

@DataPrepperPlugin(name = "date", pluginType = Processor.class, pluginConfigurationType = DateProcessorConfig.class)
public class DateProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
Expand Down Expand Up @@ -71,7 +72,16 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
zonedDateTime = getDateTimeFromTimeReceived(record);

else if (keyToParse != null && !keyToParse.isEmpty()) {
zonedDateTime = getDateTimeFromMatch(record);
Pair<String, Instant> result = getDateTimeFromMatch(record);
if (result != null) {
zonedDateTime = result.getLeft();
Instant timeStamp = result.getRight();
if (dateProcessorConfig.getToOriginationMetadata()) {
Event event = (Event)record.getData();
event.getMetadata().setExternalOriginationTime(timeStamp);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have EventMetadata call setExternalOriginationTime with the value when it is set? This way we can avoid making two calls here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not do that because I am not sure if we want to tie them together. What if we don't want external latency metric but set metadata only? This way, we could introduce another flag in date processor in future if such requirement arises.

event.getEventHandle().setExternalOriginationTime(timeStamp);
}
}
populateDateProcessorMetrics(zonedDateTime);
}

Expand Down Expand Up @@ -119,7 +129,7 @@ private String getDateTimeFromTimeReceived(final Record<Event> record) {
return timeReceived.atZone(dateProcessorConfig.getDestinationZoneId()).format(getOutputFormatter());
}

private String getDateTimeFromMatch(final Record<Event> record) {
private Pair<String, Instant> getDateTimeFromMatch(final Record<Event> record) {
final String sourceTimestamp = getSourceTimestamp(record);
if (sourceTimestamp == null)
return null;
Expand All @@ -136,12 +146,12 @@ private String getSourceTimestamp(final Record<Event> record) {
}
}

private String getFormattedDateTimeString(final String sourceTimestamp) {
private Pair<String, Instant> getFormattedDateTimeString(final String sourceTimestamp) {
for (DateTimeFormatter formatter : dateTimeFormatters) {
try {
return ZonedDateTime.parse(sourceTimestamp, formatter).format(getOutputFormatter().withZone(dateProcessorConfig.getDestinationZoneId()));
ZonedDateTime tmp = ZonedDateTime.parse(sourceTimestamp, formatter);
return Pair.of(tmp.format(getOutputFormatter().withZone(dateProcessorConfig.getDestinationZoneId())), tmp.toInstant());
} catch (Exception ignored) {

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

public class DateProcessorConfig {
static final Boolean DEFAULT_FROM_TIME_RECEIVED = false;
static final Boolean DEFAULT_TO_ORIGINATION_METADATA = false;
static final String DEFAULT_DESTINATION = "@timestamp";
static final String DEFAULT_SOURCE_TIMEZONE = ZoneId.systemDefault().toString();
static final String DEFAULT_DESTINATION_TIMEZONE = ZoneId.systemDefault().toString();
Expand Down Expand Up @@ -45,6 +46,9 @@ public List<String> getPatterns() {
@JsonProperty("from_time_received")
private Boolean fromTimeReceived = DEFAULT_FROM_TIME_RECEIVED;

@JsonProperty("to_origination_metadata")
private Boolean toOriginationMetadata = DEFAULT_TO_ORIGINATION_METADATA;

@JsonProperty("match")
private List<DateMatch> match;

Expand Down Expand Up @@ -76,6 +80,10 @@ public Boolean getFromTimeReceived() {
return fromTimeReceived;
}

public Boolean getToOriginationMetadata() {
return toOriginationMetadata;
}

public List<DateMatch> getMatch() {
return match;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ void isValidMatchAndFromTimestampReceived_should_return_true_if_from_time_receiv
}

@Test
void isValidMatchAndFromTimestampReceived_should_return_false_if_from_time_received_and_match_are_not_configured() {
assertThat(dateProcessorConfig.isValidMatchAndFromTimestampReceived(), equalTo(false));
void testToOriginationMetadata_should_return_true() throws NoSuchFieldException, IllegalAccessException {
reflectivelySetField(dateProcessorConfig, "toOriginationMetadata", true);
assertThat(dateProcessorConfig.getToOriginationMetadata(), equalTo(true));
}

@Test
Expand Down Expand Up @@ -178,4 +179,4 @@ private void reflectivelySetField(final DateProcessorConfig dateProcessorConfig,
field.setAccessible(false);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ void setup() {
lenient().when(pluginMetrics.counter(DateProcessor.DATE_PROCESSING_MATCH_SUCCESS)).thenReturn(dateProcessingMatchSuccessCounter);
lenient().when(pluginMetrics.counter(DateProcessor.DATE_PROCESSING_MATCH_FAILURE)).thenReturn(dateProcessingMatchFailureCounter);
when(mockDateProcessorConfig.getDateWhen()).thenReturn(null);
expectedDateTime = LocalDateTime.now();
expectedInstant = Instant.now();
expectedDateTime = LocalDateTime.ofInstant(expectedInstant, ZoneId.systemDefault());
}

@AfterEach
Expand Down Expand Up @@ -361,6 +362,33 @@ void match_with_different_year_formats_test(String pattern) {
verify(dateProcessingMatchSuccessCounter, times(1)).increment();
}

@ParameterizedTest
@ValueSource(strings = {"yyyy MM dd HH mm ss"})
void match_with_to_origination_metadata(String pattern) {
    // Stub a single match on the "logDate" key with the supplied pattern, and
    // enable copying the matched timestamp into the event's origination metadata.
    when(mockDateMatch.getKey()).thenReturn("logDate");
    when(mockDateMatch.getPatterns()).thenReturn(Collections.singletonList(pattern));
    when(mockDateProcessorConfig.getMatch()).thenReturn(Collections.singletonList(mockDateMatch));
    when(mockDateProcessorConfig.getSourceZoneId()).thenReturn(ZoneId.systemDefault());
    when(mockDateProcessorConfig.getDestinationZoneId()).thenReturn(ZoneId.systemDefault());
    when(mockDateProcessorConfig.getSourceLocale()).thenReturn(Locale.ROOT);
    when(mockDateProcessorConfig.getToOriginationMetadata()).thenReturn(true);

    dateProcessor = createObjectUnderTest();

    final Map<String, Object> eventData = getTestData();
    eventData.put("logDate", expectedDateTime.format(DateTimeFormatter.ofPattern(pattern)));
    final Record<Event> inputRecord = buildRecordWithEvent(eventData);

    final List<Record<Event>> processedRecords =
            (List<Record<Event>>) dateProcessor.doExecute(Collections.singletonList(inputRecord));

    // Both the event metadata and the event handle should carry the external origination time.
    final Event processedEvent = (Event) processedRecords.get(0).getData();
    Assertions.assertNotNull(processedEvent.getMetadata().getExternalOriginationTime());
    Assertions.assertNotNull(processedEvent.getEventHandle().getExternalOriginationTime());
    verify(dateProcessingMatchSuccessCounter, times(1)).increment();
}

@ParameterizedTest
@ValueSource(strings = {"MMM/dd", "MM dd"})
void match_without_year_test(String pattern) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,22 @@ public class BulkOperationWrapper {
private final EventHandle eventHandle;
private final BulkOperation bulkOperation;
private final SerializedJson jsonNode;
private final OpenSearchSink sink;

public BulkOperationWrapper(final BulkOperation bulkOperation) {
this.bulkOperation = bulkOperation;
this.eventHandle = null;
this.jsonNode = null;
public BulkOperationWrapper(final OpenSearchSink sink, final BulkOperation bulkOperation) {
this(sink, bulkOperation, null, null);
}

public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle, final SerializedJson jsonNode) {
public BulkOperationWrapper(final OpenSearchSink sink, final BulkOperation bulkOperation, final EventHandle eventHandle, final SerializedJson jsonNode) {
checkNotNull(bulkOperation);
this.sink = sink;
this.bulkOperation = bulkOperation;
this.eventHandle = eventHandle;
this.jsonNode = jsonNode;
}

public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle) {
checkNotNull(bulkOperation);
this.bulkOperation = bulkOperation;
this.eventHandle = eventHandle;
this.jsonNode = null;
public BulkOperationWrapper(final OpenSearchSink sink, final BulkOperation bulkOperation, final EventHandle eventHandle) {
this(sink, bulkOperation, eventHandle, null);
}

public BulkOperation getBulkOperation() {
Expand All @@ -76,6 +73,7 @@ public EventHandle getEventHandle() {

public void releaseEventHandle(boolean result) {
if (eventHandle != null) {
sink.updateLatencyMetrics(eventHandle);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimately we want this called because release is called. Instead of doubling up calls, how about we configure the EventHandle to provide this type of callback?

interface EventHandle {
  ...
  void onRelease(BiConsumer<EventHandle, Boolean> releaseConsumer);
  ...
}

Then, in OpenSearchSink, be sure to make this call.

event.getHandle().onRelease((handle, result) -> updateLatencyMetrics(handle));

The current design results in too many classes having to be aware of these interactions.

eventHandle.release(result);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapperFactory;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy;
import org.opensearch.dataprepper.plugins.sink.LatencyMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.opensearchserverless.OpenSearchServerlessClient;
Expand Down Expand Up @@ -91,6 +93,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class);
private static final int INITIALIZE_RETRY_WAIT_TIME_MS = 5000;
private final AwsCredentialsSupplier awsCredentialsSupplier;
private final LatencyMetrics latencyMetrics;

private DlqWriter dlqWriter;
private BufferedWriter dlqFileWriter;
Expand Down Expand Up @@ -141,6 +144,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.sinkContext = sinkContext != null ? sinkContext : new SinkContext(null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
this.expressionEvaluator = expressionEvaluator;
this.latencyMetrics = new LatencyMetrics(pluginMetrics);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there really no way to give the EventHandle class a copy of PluginMetrics, and then to just have the releaseEventHandle call automatically populate the metric so we don't have to pass them to every sink, and we keep to just one thing that everyone has to call (releaseEventHandle). You could even add to the function, and make it releaseEventHandle(final EventHandle eventHandle, final String pluginId). The pluginId could just be the plugin name for now, but it can be used in the metric.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we do that? Metrics are sink specific, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but we could "fake" that the metrics are owned by the sink by adding the name of the sink here (since you can name the metrics anything and they can still follow our metric naming pattern).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think that's the clean way because an event handle can only have one name for metrics. How can it have multiple names? I think this is way cleaner.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every sink will need to provide this latency and we want to make it as easy as possible for sinks to use this feature.

I think that data-prepper-core could hold a LatencyMetrics per Sink. Just before calling Sink::output, data-prepper-core registers an onRelease method similar to what I suggested in another comment. Then it receives the callback when release is called.

One thing we would need to be sure to do here is ensure that sinks always call release. This is possible now that #3546 is merged.

bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY);
bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS);
invalidActionErrorsCounter = pluginMetrics.counter(INVALID_ACTION_ERRORS);
Expand Down Expand Up @@ -316,6 +320,10 @@ private BulkOperation getBulkOperationForAction(final String action, final Seria
return bulkOperation;
}

/**
 * Records this sink's internal and external latency for the given event handle.
 * Invoked by {@code BulkOperationWrapper#releaseEventHandle} just before the
 * handle is released, and for DLQ-bound events before their handles are released.
 *
 * @param eventHandle the handle whose origination timestamps are measured; may be
 *                    {@code null}, in which case no latency is recorded
 */
public void updateLatencyMetrics(final EventHandle eventHandle) {
latencyMetrics.update(eventHandle);
}

@Override
public void doOutput(final Collection<Record<Event>> records) {
final long threadId = Thread.currentThread().getId();
Expand Down Expand Up @@ -376,7 +384,7 @@ public void doOutput(final Collection<Record<Event>> records) {
}
BulkOperation bulkOperation = getBulkOperationForAction(eventAction, document, indexName, event.getJsonNode());

BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(bulkOperation, event.getEventHandle(), serializedJsonNode);
BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(this, bulkOperation, event.getEventHandle(), serializedJsonNode);
final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(bulkOperationWrapper);
if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest.getOperationsCount() > 0) {
flushBatch(bulkRequest);
Expand Down Expand Up @@ -449,6 +457,7 @@ private void logFailureForDlqObjects(final List<DlqObject> dlqObjects, final Thr
try {
dlqFileWriter.write(String.format("{\"Document\": [%s], \"failure\": %s}\n",
BulkOperationWriter.dlqObjectToString(dlqObject), message));
updateLatencyMetrics(dlqObject.getEventHandle());
dlqObject.releaseEventHandle(true);
} catch (final IOException e) {
LOG.error("Failed to write a document to the DLQ", e);
Expand All @@ -459,6 +468,7 @@ private void logFailureForDlqObjects(final List<DlqObject> dlqObjects, final Thr
try {
dlqWriter.write(dlqObjects, pluginSetting.getPipelineName(), pluginSetting.getName());
dlqObjects.forEach((dlqObject) -> {
updateLatencyMetrics(dlqObject.getEventHandle());
dlqObject.releaseEventHandle(true);
});
} catch (final IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ public class BulkOperationWrapperTests {
private static final String DOCUMENT = UUID.randomUUID().toString();

private BulkOperation bulkOperation;
private OpenSearchSink sink;

BulkOperationWrapper createObjectUnderTest(final EventHandle eventHandle, BulkOperation aBulkOperation) {
bulkOperation = Objects.isNull(aBulkOperation) ? mock(BulkOperation.class) : aBulkOperation;
if (eventHandle == null) {
return new BulkOperationWrapper(bulkOperation);
return new BulkOperationWrapper(sink, bulkOperation);
}
return new BulkOperationWrapper(bulkOperation, eventHandle);
return new BulkOperationWrapper(sink, bulkOperation, eventHandle);
}

@Test
Expand Down
Loading