Skip to content

Commit

Permalink
Fix pipeline latency to calculate correct latency when persistent buf…
Browse files Browse the repository at this point in the history
…fer is used (#4187)

* Fix pipeline latency to calculate correct latency when persistent buffer is used

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed checkstyle error and addressed comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing tests

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Feb 29, 2024
1 parent 6255784 commit 42e763e
Show file tree
Hide file tree
Showing 35 changed files with 318 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@
import java.io.InputStream;
import java.io.Serializable;
import java.util.function.Consumer;
import java.time.Instant;

public interface ByteDecoder extends Serializable {
/**
* Parses an {@link InputStream}. Implementors should call the {@link Consumer} for each
* {@link Record} loaded from the {@link InputStream}.
*
* @param inputStream The input stream for code to process
* @param timeReceived The time received value to be populated in the Record
* @param eventConsumer The consumer which handles each event from the stream
* @throws IOException throws IOException when invalid input is received or incorrect codec name is provided
*/
void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException;
void parse(InputStream inputStream, Instant timeReceived, Consumer<Record<Event>> eventConsumer) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.model.record.Record;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
Expand All @@ -24,33 +25,36 @@ public class JsonDecoder implements ByteDecoder {
private final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();

public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
public void parse(InputStream inputStream, Instant timeReceived, Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputStream);
Objects.requireNonNull(eventConsumer);

final JsonParser jsonParser = jsonFactory.createParser(inputStream);

while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) {
if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) {
parseRecordsArray(jsonParser, eventConsumer);
parseRecordsArray(jsonParser, timeReceived, eventConsumer);
}
}
}

private void parseRecordsArray(final JsonParser jsonParser, final Consumer<Record<Event>> eventConsumer) throws IOException {
private void parseRecordsArray(final JsonParser jsonParser, final Instant timeReceived, final Consumer<Record<Event>> eventConsumer) throws IOException {
while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class);

final Record<Event> record = createRecord(innerJson);
final Record<Event> record = createRecord(innerJson, timeReceived);
eventConsumer.accept(record);
}
}

private Record<Event> createRecord(final Map<String, Object> json) {
final JacksonEvent event = (JacksonEvent)JacksonLog.builder()
private Record<Event> createRecord(final Map<String, Object> json, final Instant timeReceived) {
final JacksonLog.Builder logBuilder = JacksonLog.builder()
.withData(json)
.getThis()
.build();
.getThis();
if (timeReceived != null) {
logBuilder.withTimeReceived(timeReceived);
}
final JacksonEvent event = (JacksonEvent)logBuilder.build();

return new Record<>(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;

import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -161,6 +162,18 @@ public Builder withAttributes(final Map<String, Object> attributes) {
return getThis();
}

/**
* Sets the time received for populating event origination time in event handle
*
* @param timeReceived time received
* @return the builder
* @since 2.7
*/
@Override
public Builder withTimeReceived(final Instant timeReceived) {
return (Builder)super.withTimeReceived(timeReceived);
}

/**
* Sets the observed time of the log event
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.event.EventType;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -237,6 +238,18 @@ public JacksonExponentialHistogram.Builder withPositiveOffset(int offset) {
return this;
}

/**
* Sets the time received for populating event origination time in event handle
*
* @param timeReceived time received
* @return the builder
* @since 2.7
*/
@Override
public JacksonExponentialHistogram.Builder withTimeReceived(final Instant timeReceived) {
return (JacksonExponentialHistogram.Builder)super.withTimeReceived(timeReceived);
}

/**
* Sets the offset for the negative buckets
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.event.EventType;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -67,6 +68,18 @@ public Builder withValue(final Double value) {
return this;
}

/**
* Sets the time received for populating event origination time in event handle
*
* @param timeReceived time received
* @return the builder
* @since 2.7
*/
@Override
public Builder withTimeReceived(final Instant timeReceived) {
return (Builder)super.withTimeReceived(timeReceived);
}

/**
* Returns a newly created {@link JacksonGauge}
* @return a JacksonGauge
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.event.EventType;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -190,6 +191,18 @@ public JacksonHistogram.Builder withAggregationTemporality(String aggregationTe
return this;
}

/**
* Sets the time received for populating event origination time in event handle
*
* @param timeReceived time received
* @return the builder
* @since 2.7
*/
@Override
public JacksonHistogram.Builder withTimeReceived(final Instant timeReceived) {
return (JacksonHistogram.Builder)super.withTimeReceived(timeReceived);
}

/**
* Sets the buckets for this histogram
* @param buckets a list of buckets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.opensearch.dataprepper.model.event.JacksonEvent;

import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -234,6 +235,19 @@ public T withSchemaUrl(final String schemaUrl) {
return getThis();
}

/**
* Sets the time received for populating event origination time in event handle
*
* @param timeReceived time received
* @return the builder
* @since 2.7
*/
@Override
public T withTimeReceived(final Instant timeReceived) {
return (T)super.withTimeReceived(timeReceived);
}


/**
* Sets the exemplars that are associated with this metric event
* @param exemplars sets the exemplars for this metric
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.event.EventType;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -101,6 +102,18 @@ public Builder withIsMonotonic(final boolean isMonotonic) {
return this;
}

/**
* Sets the time received for populating event origination time in event handle
*
* @param timeReceived time received
* @return the builder
* @since 2.7
*/
@Override
public Builder withTimeReceived(final Instant timeReceived) {
return (Builder)super.withTimeReceived(timeReceived);
}

/**
* Returns a newly created {@link JacksonSum}
* @return a JacksonSum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.event.EventType;

import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -106,6 +107,18 @@ public Builder withSum(double sum) {
return this;
}

/**
* Sets the time received for populating event origination time in event handle
*
* @param timeReceived time received
* @return the builder
* @since 2.7
*/
@Override
public Builder withTimeReceived(final Instant timeReceived) {
return (Builder)super.withTimeReceived(timeReceived);
}

/**
* Sets the count
* @param count the count of this summary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;

import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -444,6 +445,18 @@ public Builder withTraceGroup(final String traceGroup) {
return this;
}

/**
* Sets the time received for populating event origination time in event handle
*
* @param timeReceived time received
* @return the builder
* @since 2.7
*/
@Override
public Builder withTimeReceived(final Instant timeReceived) {
return (Builder)super.withTimeReceived(timeReceived);
}

/**
* Sets the duration of the span
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.record.Record;

import java.io.ByteArrayInputStream;
import java.time.Instant;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
Expand All @@ -18,12 +20,13 @@
public class JsonDecoderTest {
private JsonDecoder jsonDecoder;
private Record<Event> receivedRecord;
private Instant receivedTime;

private JsonDecoder createObjectUnderTest() {
return new JsonDecoder();
}

@BeforeEach
@BeforeEach
void setup() {
jsonDecoder = createObjectUnderTest();
receivedRecord = null;
Expand All @@ -36,7 +39,7 @@ void test_basicJsonDecoder() {
int intValue = r.nextInt();
String inputString = "[{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}]";
try {
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), (record) -> {
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), null, (record) -> {
receivedRecord = record;
});
} catch (Exception e){}
Expand All @@ -47,4 +50,25 @@ void test_basicJsonDecoder() {
assertThat(map.get("key2"), equalTo(intValue));
}

@Test
void test_basicJsonDecoder_withTimeReceived() {
String stringValue = UUID.randomUUID().toString();
Random r = new Random();
int intValue = r.nextInt();
String inputString = "[{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}]";
final Instant now = Instant.now();
try {
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), now, (record) -> {
receivedRecord = record;
receivedTime = ((DefaultEventHandle)(((Event)record.getData()).getEventHandle())).getInternalOriginationTime();
});
} catch (Exception e){}

assertNotEquals(receivedRecord, null);
Map<String, Object> map = receivedRecord.getData().toMap();
assertThat(map.get("key1"), equalTo(stringValue));
assertThat(map.get("key2"), equalTo(intValue));
assertThat(receivedTime, equalTo(now));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.skyscreamer.jsonassert.JSONAssert;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;

import java.time.Instant;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -83,6 +85,14 @@ public void testGetServiceName() {
assertThat(name, is(equalTo(TEST_SERVICE_NAME)));
}

@Test
public void testGetTimeReceived() {
Instant now = Instant.now();
builder.withTimeReceived(now);
log = builder.build();
assertThat(((DefaultEventHandle)log.getEventHandle()).getInternalOriginationTime(), is(now));
}

@Test
public void testGetSchemaUrl() {
final String schemaUrl = log.getSchemaUrl();
Expand Down
Loading

0 comments on commit 42e763e

Please sign in to comment.