From be35f8ffbdb0d932eed0e35175224bb89f3e3a49 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Tue, 6 Feb 2024 16:38:54 +0530 Subject: [PATCH] fix in reduce flow Signed-off-by: Bharathwaj G --- .../builder/BaseSingleTreeBuilder.java | 18 +++++ .../bucket/startree/InternalStarTree.java | 78 ++++++++++++------- 2 files changed, 67 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java index 47c660a7e2936..2af2d5eb3e38d 100644 --- a/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/codec/freshstartree/builder/BaseSingleTreeBuilder.java @@ -563,6 +563,24 @@ Record getNextSegmentRecord() throws IOException { return new Record(dimensions, metrics); } + private long getTimeStampVal2(final String fieldName, final long val) { + + switch (fieldName) { + case "minute": + return val / MINUTE; + case "hour": + return val / HOUR; + case "day": + return val / DAY; + case "month": + return val / (DAY * 30); // TODO + case "year": + return val / YEAR; + default: + return val; + } + } + private long getTimeStampVal(final String fieldName, final long val) { switch (fieldName) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/startree/InternalStarTree.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/startree/InternalStarTree.java index d97502850274e..a5b928028cbc9 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/startree/InternalStarTree.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/startree/InternalStarTree.java @@ -8,6 +8,9 @@ package org.opensearch.search.aggregations.bucket.startree; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.XContentBuilder; @@ -16,6 +19,7 @@ import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.InternalMultiBucketAggregation; +import org.opensearch.search.aggregations.bucket.adjacency.InternalAdjacencyMatrix; import org.opensearch.search.aggregations.support.CoreValuesSourceType; import org.opensearch.search.aggregations.support.ValueType; import org.opensearch.search.aggregations.support.ValuesSourceType; @@ -32,13 +36,13 @@ public class InternalStarTree aggregations, ReduceContext reduceContext) { - reduceContext.consumeBucketsAndMaybeBreak(ranges.size()); - List[] rangeList = new List[ranges.size()]; - for (int i = 0; i < rangeList.length; ++i) { - rangeList[i] = new ArrayList<>(); - } + Map> bucketsMap = new HashMap<>(); + for (InternalAggregation aggregation : aggregations) { - InternalStarTree ranges = (InternalStarTree) aggregation; + InternalStarTree filters = (InternalStarTree) aggregation; int i = 0; - for (B range : ranges.ranges) { - rangeList[i++].add(range); + for (B bucket : filters.ranges) { + String key = bucket.getKey(); + List sameRangeList = bucketsMap.get(key); + if (sameRangeList == null) { + sameRangeList = new ArrayList<>(aggregations.size()); + bucketsMap.put(key, sameRangeList); + } + sameRangeList.add(bucket); } } - final List ranges = new ArrayList<>(); - for (int i = 0; i < this.ranges.size(); ++i) { - ranges.add((B) reduceBucket(rangeList[i], reduceContext)); + ArrayList reducedBuckets = new ArrayList<>(bucketsMap.size()); + + for(List sameRangeList : bucketsMap.values()) { + B reducedBucket = reduceBucket(sameRangeList, reduceContext); + if (reducedBucket.getDocCount() >= 1) { + reducedBuckets.add(reducedBucket); + } } - return getFactory().create(name, ranges, getMetadata()); + reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); + Collections.sort(reducedBuckets, Comparator.comparing(InternalStarTree.Bucket::getKey)); + + return getFactory().create(name, reducedBuckets, getMetadata()); } @Override protected B reduceBucket(List buckets, ReduceContext context) { assert buckets.size() > 0; - long docCount = 0; + + + B reduced = null; List aggregationsList = new ArrayList<>(buckets.size()); - for (InternalStarTree.Bucket bucket : buckets) { - docCount += bucket.docCount; - aggregationsList.add(bucket.aggregations); + for (B bucket : buckets) { + if (reduced == null) { + reduced = (B) new Bucket(bucket.getKey(), bucket.getDocCount(), bucket.getAggregations()); + } else { + reduced.sum += bucket.sum; + } + aggregationsList.add(bucket.getAggregations()); } - final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); - InternalStarTree.Bucket prototype = buckets.get(0); - return getFactory().createBucket(prototype.key, docCount, aggs); + reduced.aggregations = InternalAggregations.reduce(aggregationsList, context); + return reduced; } @Override