Skip to content

Commit

Permalink
rebased to latest
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Jun 19, 2024
1 parent a85e05e commit 9ec93c3
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class CountAggregateAction implements AggregateAction {
private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
private static final String exemplarKey = "__exemplar";
static final String EVENT_TYPE = "event";
static final String SUM_METRIC_NAME = "count";
static final String SUM_METRIC_DESCRIPTION = "Number of events";
static final String SUM_METRIC_UNIT = "1";
static final boolean SUM_METRIC_IS_MONOTONIC = true;
Expand All @@ -49,13 +48,15 @@ public class CountAggregateAction implements AggregateAction {
public final String endTimeKey;
public final String outputFormat;
private long startTimeNanos;
private final String name;

@DataPrepperPluginConstructor
public CountAggregateAction(final CountAggregateActionConfig countAggregateActionConfig) {
this.countKey = countAggregateActionConfig.getCountKey();
this.startTimeKey = countAggregateActionConfig.getStartTimeKey();
this.endTimeKey = countAggregateActionConfig.getEndTimeKey();
this.outputFormat = countAggregateActionConfig.getOutputFormat();
this.name = countAggregateActionConfig.getName();
}

public Exemplar createExemplar(final Event event) {
Expand Down Expand Up @@ -133,7 +134,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
Map<String, Object> attr = new HashMap<String, Object>();
groupState.forEach((k, v) -> attr.put((String)k, v));
JacksonSum sum = JacksonSum.builder()
.withName(SUM_METRIC_NAME)
.withName(this.name)
.withDescription(SUM_METRIC_DESCRIPTION)
.withTime(OTelProtoCodec.convertUnixNanosToISO8601(endTimeNanos))
.withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(startTimeNanos))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;

public class CountAggregateActionConfig {
static final String SUM_METRIC_NAME = "count";
public static final String DEFAULT_COUNT_KEY = "aggr._count";
public static final String DEFAULT_START_TIME_KEY = "aggr._start_time";
public static final String DEFAULT_END_TIME_KEY = "aggr._end_time";
Expand All @@ -18,6 +19,9 @@ public class CountAggregateActionConfig {
@JsonProperty("count_key")
String countKey = DEFAULT_COUNT_KEY;

@JsonProperty("name")
String name = SUM_METRIC_NAME;

@JsonProperty("start_time_key")
String startTimeKey = DEFAULT_START_TIME_KEY;

Expand All @@ -27,6 +31,10 @@ public class CountAggregateActionConfig {
@JsonProperty("output_format")
String outputFormat = OutputFormat.OTEL_METRICS.toString();

public String getName() {
return name;
}

public String getCountKey() {
return countKey;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
public class HistogramAggregateAction implements AggregateAction {
private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
private static final String EVENT_TYPE = "event";
public static final String HISTOGRAM_METRIC_NAME = "histogram";
private final String countKey;
private final String bucketCountsKey;
private final String bucketsKey;
Expand All @@ -62,6 +61,7 @@ public class HistogramAggregateAction implements AggregateAction {
private Event maxEvent;
private double minValue;
private double maxValue;
private final String name;

private long startTimeNanos;
private double[] buckets;
Expand All @@ -72,6 +72,7 @@ public HistogramAggregateAction(final HistogramAggregateActionConfig histogramAg
List<Number> bucketList = histogramAggregateActionConfig.getBuckets();
this.buckets = new double[bucketList.size()+2];
int bucketIdx = 0;
this.name = histogramAggregateActionConfig.getName();
this.buckets[bucketIdx++] = -Float.MAX_VALUE;
for (int i = 0; i < bucketList.size(); i++) {
this.buckets[bucketIdx++] = convertToDouble(bucketList.get(i));
Expand Down Expand Up @@ -212,7 +213,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
Instant endTime = (Instant)groupState.get(endTimeKey);
long startTimeNanos = getTimeNanos(startTime);
long endTimeNanos = getTimeNanos(endTime);
String histogramKey = HISTOGRAM_METRIC_NAME + "_key";
String histogramKey = this.name + "_key";
List<Exemplar> exemplarList = new ArrayList<>();
exemplarList.add(createExemplar("min", minEvent, minValue));
exemplarList.add(createExemplar("max", maxEvent, maxValue));
Expand Down Expand Up @@ -245,7 +246,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
Integer count = (Integer)groupState.get(countKey);
String description = String.format("Histogram of %s in the events", key);
JacksonHistogram histogram = JacksonHistogram.builder()
.withName(HISTOGRAM_METRIC_NAME)
.withName(this.name)
.withDescription(description)
.withTime(OTelProtoCodec.convertUnixNanosToISO8601(endTimeNanos))
.withStartTime(OTelProtoCodec.convertUnixNanosToISO8601(startTimeNanos))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import jakarta.validation.constraints.NotNull;

public class HistogramAggregateActionConfig {
public static final String HISTOGRAM_METRIC_NAME = "histogram";
public static final String DEFAULT_GENERATED_KEY_PREFIX = "aggr._";
public static final String SUM_KEY = "sum";
public static final String COUNT_KEY = "count";
Expand All @@ -32,6 +33,9 @@ public class HistogramAggregateActionConfig {
@NotNull
String units;

@JsonProperty("name")
String name = HISTOGRAM_METRIC_NAME;

@JsonProperty("generated_key_prefix")
String generatedKeyPrefix = DEFAULT_GENERATED_KEY_PREFIX;

Expand All @@ -45,6 +49,10 @@ public class HistogramAggregateActionConfig {
@JsonProperty("record_minmax")
boolean recordMinMax = false;

public String getName() {
return name;
}

public boolean getRecordMinMax() {
return recordMinMax;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ void aggregateWithHistogramAggregateAction() throws InterruptedException, NoSuch
countDownLatch.countDown();
});
}
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000);
Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1500);

boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS);
assertThat(allThreadsFinished, equalTo(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void testDefault() {
assertThat(countAggregateActionConfig.getCountKey(), equalTo(DEFAULT_COUNT_KEY));
assertThat(countAggregateActionConfig.getStartTimeKey(), equalTo(DEFAULT_START_TIME_KEY));
assertThat(countAggregateActionConfig.getOutputFormat(), equalTo(OutputFormat.OTEL_METRICS.toString()));
assertThat(countAggregateActionConfig.getName(), equalTo(CountAggregateActionConfig.SUM_METRIC_NAME));
}

@Test
Expand All @@ -51,6 +52,9 @@ void testValidConfig() throws NoSuchFieldException, IllegalAccessException {
final String testOutputFormat = OutputFormat.OTEL_METRICS.toString();
setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", testOutputFormat);
assertThat(countAggregateActionConfig.getOutputFormat(), equalTo(OutputFormat.OTEL_METRICS.toString()));
final String testName = UUID.randomUUID().toString();
setField(CountAggregateActionConfig.class, countAggregateActionConfig, "name", testName);
assertThat(countAggregateActionConfig.getName(), equalTo(testName));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ private AggregateAction createObjectUnderTest(CountAggregateActionConfig config)
@ParameterizedTest
@ValueSource(ints = {1, 2, 10, 100})
void testCountAggregate(int testCount) throws NoSuchFieldException, IllegalAccessException {
final String testName = UUID.randomUUID().toString();
CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig();
setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW.toString());
countAggregateAction = createObjectUnderTest(countAggregateActionConfig);
Expand Down Expand Up @@ -79,8 +80,10 @@ void testCountAggregate(int testCount) throws NoSuchFieldException, IllegalAcces

@ParameterizedTest
@ValueSource(ints = {1, 2, 10, 100})
void testCountAggregateOTelFormat(int testCount) {
void testCountAggregateOTelFormat(int testCount) throws NoSuchFieldException, IllegalAccessException {
CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig();
final String testName = UUID.randomUUID().toString();
setField(CountAggregateActionConfig.class, countAggregateActionConfig, "name", testName);
countAggregateAction = createObjectUnderTest(countAggregateActionConfig);
final String key1 = "key-"+UUID.randomUUID().toString();
final String value1 = UUID.randomUUID().toString();
Expand Down Expand Up @@ -119,6 +122,7 @@ void testCountAggregateOTelFormat(int testCount) {
expectedEventMap.put("isMonotonic", true);
expectedEventMap.put("aggregationTemporality", "AGGREGATION_TEMPORALITY_DELTA");
expectedEventMap.put("unit", "1");
expectedEventMap.put("name", testName);
expectedEventMap.forEach((k, v) -> assertThat(result.get(0).toMap(), hasEntry(k,v)));
assertThat(result.get(0).toMap().get("attributes"), equalTo(eventMap));
JacksonMetric metric = (JacksonMetric) result.get(0);
Expand Down Expand Up @@ -149,6 +153,8 @@ void testCountAggregateOTelFormat(int testCount) {
void testCountAggregateOTelFormatWithStartAndEndTimesInTheEvent(int testCount) {
CountAggregateActionConfig mockConfig = mock(CountAggregateActionConfig.class);
when(mockConfig.getCountKey()).thenReturn(CountAggregateActionConfig.DEFAULT_COUNT_KEY);
final String testName = UUID.randomUUID().toString();
when(mockConfig.getName()).thenReturn(testName);
String startTimeKey = UUID.randomUUID().toString();
String endTimeKey = UUID.randomUUID().toString();
when(mockConfig.getStartTimeKey()).thenReturn(startTimeKey);
Expand Down Expand Up @@ -195,7 +201,7 @@ void testCountAggregateOTelFormatWithStartAndEndTimesInTheEvent(int testCount) {
assertThat(result.size(), equalTo(1));
Map<String, Object> expectedEventMap = new HashMap<>();
expectedEventMap.put("value", (double)testCount);
expectedEventMap.put("name", "count");
expectedEventMap.put("name", testName);
expectedEventMap.put("description", "Number of events");
expectedEventMap.put("isMonotonic", true);
expectedEventMap.put("aggregationTemporality", "AGGREGATION_TEMPORALITY_DELTA");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.apache.commons.lang3.RandomStringUtils;

import java.util.UUID;

import static org.opensearch.dataprepper.plugins.processor.aggregate.actions.HistogramAggregateActionConfig.DEFAULT_GENERATED_KEY_PREFIX;

import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -41,6 +43,7 @@ void testDefault() {
assertThat(histogramAggregateActionConfig.getGeneratedKeyPrefix(), equalTo(DEFAULT_GENERATED_KEY_PREFIX));
assertThat(histogramAggregateActionConfig.getRecordMinMax(), equalTo(false));
assertThat(histogramAggregateActionConfig.getOutputFormat(), equalTo(OutputFormat.OTEL_METRICS.toString()));
assertThat(histogramAggregateActionConfig.getName(), equalTo(HistogramAggregateActionConfig.HISTOGRAM_METRIC_NAME));
}

@Test
Expand Down Expand Up @@ -106,6 +109,9 @@ void testValidConfig() throws NoSuchFieldException, IllegalAccessException {
longBuckets.add(longValue2);
setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "buckets", longBuckets);
assertThat(histogramAggregateActionConfig.getBuckets(), containsInAnyOrder(longBuckets.toArray()));
final String testName = UUID.randomUUID().toString();
setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "name", testName);
assertThat(histogramAggregateActionConfig.getName(), equalTo(testName));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ void testHistogramAggregateOTelFormat(int testCount) throws NoSuchFieldException
final String expectedStartTimeKey = histogramAggregateActionConfig.getStartTimeKey();
Map<String, Object> expectedEventMap = new HashMap<>(Collections.singletonMap("count", (long)testCount));
expectedEventMap.put("unit", testUnits);
expectedEventMap.put("name", HistogramAggregateAction.HISTOGRAM_METRIC_NAME);
expectedEventMap.put("name", HistogramAggregateActionConfig.HISTOGRAM_METRIC_NAME);
expectedEventMap.put("sum", expectedSum);
expectedEventMap.put("min", expectedMin);
expectedEventMap.put("max", expectedMax);
Expand All @@ -212,7 +212,7 @@ void testHistogramAggregateOTelFormat(int testCount) throws NoSuchFieldException
for (int i = 0; i < expectedBucketCounts.length; i++) {
assertThat(expectedBucketCounts[i], equalTo(bucketCountsFromResult.get(i)));
}
assertThat(((Map<String, String>)result.get(0).toMap().get("attributes")), hasEntry(HistogramAggregateAction.HISTOGRAM_METRIC_NAME+"_key", testKey));
assertThat(((Map<String, String>)result.get(0).toMap().get("attributes")), hasEntry(HistogramAggregateActionConfig.HISTOGRAM_METRIC_NAME+"_key", testKey));
List<Exemplar> exemplars = (List <Exemplar>)result.get(0).toMap().get("exemplars");
assertThat(exemplars.size(), equalTo(2));
assertThat(((Map<String, String>)result.get(0).toMap().get("attributes")), hasEntry(dataKey, dataValue));
Expand Down Expand Up @@ -250,6 +250,8 @@ void testHistogramAggregateOTelFormatWithStartAndEndTimesInTheEvent(int testCoun
final String testKeyPrefix = RandomStringUtils.randomAlphabetic(5)+"_";
when(mockConfig.getStartTimeKey()).thenReturn(startTimeKey);
when(mockConfig.getEndTimeKey()).thenReturn(endTimeKey);
final String testName = UUID.randomUUID().toString();
when(mockConfig.getName()).thenReturn(testName);
when(mockConfig.getOutputFormat()).thenReturn(OutputFormat.OTEL_METRICS.toString());
String keyPrefix = UUID.randomUUID().toString();
final String testUnits = "ms";
Expand Down Expand Up @@ -323,7 +325,7 @@ void testHistogramAggregateOTelFormatWithStartAndEndTimesInTheEvent(int testCoun
final String expectedStartTimeKey = mockConfig.getStartTimeKey();
Map<String, Object> expectedEventMap = new HashMap<>(Collections.singletonMap("count", (long)testCount));
expectedEventMap.put("unit", testUnits);
expectedEventMap.put("name", HistogramAggregateAction.HISTOGRAM_METRIC_NAME);
expectedEventMap.put("name", testName);
expectedEventMap.put("sum", expectedSum);
expectedEventMap.put("min", expectedMin);
expectedEventMap.put("max", expectedMax);
Expand All @@ -337,7 +339,7 @@ void testHistogramAggregateOTelFormatWithStartAndEndTimesInTheEvent(int testCoun
for (int i = 0; i < expectedBucketCounts.length; i++) {
assertThat(expectedBucketCounts[i], equalTo(bucketCountsFromResult.get(i)));
}
assertThat(((Map<String, String>)result.get(0).toMap().get("attributes")), hasEntry(HistogramAggregateAction.HISTOGRAM_METRIC_NAME+"_key", testKey));
assertThat(((Map<String, String>)result.get(0).toMap().get("attributes")), hasEntry(testName+"_key", testKey));
List<Exemplar> exemplars = (List <Exemplar>)result.get(0).toMap().get("exemplars");
assertThat(exemplars.size(), equalTo(2));
assertThat(((Map<String, String>)result.get(0).toMap().get("attributes")), hasEntry(dataKey, dataValue));
Expand Down

0 comments on commit 9ec93c3

Please sign in to comment.