Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pre aggregate changes #22

Open
wants to merge 15 commits into
base: 2.11
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
* @opensearch.internal
*/
public class Lucene {
public static final String LATEST_CODEC = "Lucene95";
public static final String LATEST_CODEC = "StarTreeCodec"; // TODO : this is a hack

public static final String SOFT_DELETES_FIELD = "__soft_deletes";

Expand Down Expand Up @@ -1077,6 +1077,11 @@ public SortedDocValues getSortedDocValues(String field) {
return null;
}

@Override
public Object getAggregatedDocValues() throws IOException {
return null;
}

public SortedNumericDocValues getSortedNumericDocValues(String field) {
return null;
}
Expand Down
16 changes: 12 additions & 4 deletions server/src/main/java/org/opensearch/index/codec/CodecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.freshstartree.codec.StarTreeCodec;
import org.opensearch.index.mapper.MapperService;

import java.util.Map;
Expand Down Expand Up @@ -68,8 +69,15 @@ public CodecService(@Nullable MapperService mapperService, IndexSettings indexSe
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
assert null != indexSettings;
if (mapperService == null) {
codecs.put(DEFAULT_CODEC, new Lucene95Codec());
codecs.put(LZ4, new Lucene95Codec());
/**
* Todo : currently we don't have a single field to use per field codec to handle aggregation
* So no better way to test the changes then to change the default codec - This should be changed.
*
* There are issues with this as restarting the process and reloading the indices results in errors
* Lucene95Codec is read when reloading the indices ( Solved now by using StarTreeCodec as the latest codec )
*/
codecs.put(DEFAULT_CODEC, new StarTreeCodec());
codecs.put(LZ4, new StarTreeCodec());
codecs.put(BEST_COMPRESSION_CODEC, new Lucene95Codec(Mode.BEST_COMPRESSION));
codecs.put(ZLIB, new Lucene95Codec(Mode.BEST_COMPRESSION));
} else {
Expand All @@ -78,7 +86,7 @@ public CodecService(@Nullable MapperService mapperService, IndexSettings indexSe
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
codecs.put(ZLIB, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
}
codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
codecs.put(LUCENE_DEFAULT_CODEC, new StarTreeCodec());
for (String codec : Codec.availableCodecs()) {
codecs.put(codec, Codec.forName(codec));
}
Expand All @@ -96,7 +104,7 @@ public CodecService(@Nullable MapperService mapperService, Logger logger) {
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
}
codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
codecs.put(LUCENE_DEFAULT_CODEC, new StarTreeCodec());
for (String codec : Codec.availableCodecs()) {
codecs.put(codec, Codec.forName(codec));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.codec.freshstartree.codec.StarTreeCodec;
import org.opensearch.index.codec.freshstartree.codec.StarTreeDocValuesFormat;
import org.opensearch.index.mapper.CompletionFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
Expand All @@ -53,18 +54,18 @@
*
* @opensearch.internal
*/
public class PerFieldMappingPostingFormatCodec extends Lucene95Codec {
public class PerFieldMappingPostingFormatCodec extends StarTreeCodec { // TODO : this is a hack , can't extend startreecodec
private final Logger logger;
private final MapperService mapperService;
private final DocValuesFormat dvFormat = new Lucene90DocValuesFormat();
private final DocValuesFormat dvFormat = new StarTreeDocValuesFormat();

static {
assert Codec.forName(Lucene.LATEST_CODEC).getClass().isAssignableFrom(PerFieldMappingPostingFormatCodec.class)
: "PerFieldMappingPostingFormatCodec must subclass the latest " + "lucene codec: " + Lucene.LATEST_CODEC;
}

public PerFieldMappingPostingFormatCodec(Mode compressionMode, MapperService mapperService, Logger logger) {
super(compressionMode);
public PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode compressionMode, MapperService mapperService, Logger logger) {
super();
this.mapperService = mapperService;
this.logger = logger;
}
Expand All @@ -84,4 +85,9 @@ public PostingsFormat getPostingsFormatForField(String field) {
public DocValuesFormat getDocValuesFormatForField(String field) {
return dvFormat;
}

@Override
public final DocValuesFormat docValuesFormat() {
return dvFormat;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opensearch.index.codec.freshstartree.aggregator;

import java.util.Comparator;

/** Aggregation function, doc values column pair */
public class AggregationFunctionColumnPair implements Comparable<AggregationFunctionColumnPair> {
public static final String DELIMITER = "__";
public static final String STAR = "*";
public static final AggregationFunctionColumnPair COUNT_STAR = new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, STAR);

private final AggregationFunctionType _functionType;
private final String _column;

public AggregationFunctionColumnPair(AggregationFunctionType functionType, String column) {
_functionType = functionType;
if (functionType == AggregationFunctionType.COUNT) {
_column = STAR;
} else {
_column = column;
}
}

public AggregationFunctionType getFunctionType() {
return _functionType;
}

public String getColumn() {
return _column;
}

public String toColumnName() {
return toColumnName(_functionType, _column);
}

public static String toColumnName(AggregationFunctionType functionType, String column) {
return functionType.getName() + DELIMITER + column;
}

public static AggregationFunctionColumnPair fromColumnName(String columnName) {
String[] parts = columnName.split(DELIMITER, 2);
return fromFunctionAndColumnName(parts[0], parts[1]);
}

private static AggregationFunctionColumnPair fromFunctionAndColumnName(String functionName, String columnName) {
AggregationFunctionType functionType = AggregationFunctionType.getAggregationFunctionType(functionName);
if (functionType == AggregationFunctionType.COUNT) {
return COUNT_STAR;
} else {
return new AggregationFunctionColumnPair(functionType, columnName);
}
}

@Override
public int hashCode() {
return 31 * _functionType.hashCode() + _column.hashCode();
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof AggregationFunctionColumnPair) {
AggregationFunctionColumnPair anotherPair = (AggregationFunctionColumnPair) obj;
return _functionType == anotherPair._functionType && _column.equals(anotherPair._column);
}
return false;
}

@Override
public String toString() {
return toColumnName();
}

@Override
public int compareTo(AggregationFunctionColumnPair other) {
return Comparator.comparing((AggregationFunctionColumnPair o) -> o._column)
.thenComparing((AggregationFunctionColumnPair o) -> o._functionType)
.compare(this, other);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opensearch.index.codec.freshstartree.aggregator;

/** Aggregated function type */
public enum AggregationFunctionType {
COUNT("count"),
SUM("sum");
// AVG("avg");

private String name;

AggregationFunctionType(String name) {
this.name = name;
}

public static AggregationFunctionType getAggregationFunctionType(String functionName) {
return AggregationFunctionType.valueOf(functionName);
}

public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opensearch.index.codec.freshstartree.aggregator;

/** Count value aggregator */
public class CountValueAggregator implements ValueAggregator {
public static final DataType AGGREGATED_VALUE_TYPE = DataType.LONG;

@Override
public AggregationFunctionType getAggregationType() {
return AggregationFunctionType.COUNT;
}

@Override
public DataType getAggregatedValueType() {
return AGGREGATED_VALUE_TYPE;
}

@Override
public Long getInitialAggregatedValue(Long rawValue) {
return 1l;
}

@Override
public Long applyRawValue(Long value, Long rawValue) {
return value + 1;
}

@Override
public Long applyAggregatedValue(Long value, Long aggregatedValue) {
return value + aggregatedValue;
}

@Override
public Long cloneAggregatedValue(Long value) {
return value;
}

@Override
public int getMaxAggregatedValueByteSize() {
return Long.BYTES;
}

@Override
public byte[] serializeAggregatedValue(Long value) {
throw new UnsupportedOperationException();
}

@Override
public Long deserializeAggregatedValue(byte[] bytes) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opensearch.index.codec.freshstartree.aggregator;

/** Data type of doc values */
public enum DataType {
INT(Integer.BYTES, true),
LONG(Long.BYTES, true),
FLOAT(Float.BYTES, true),
DOUBLE(Double.BYTES, true);

private final int _size;
private final boolean _numeric;

DataType(int size, boolean numeric) {
_size = size;
_numeric = numeric;
}

/** Returns the number of bytes needed to store the data type. */
public int size() {
if (_size >= 0) {
return _size;
}
throw new IllegalStateException("Cannot get number of bytes for: " + this);
}

/**
* Returns {@code true} if the data type is numeric (INT, LONG, FLOAT, DOUBLE, BIG_DECIMAL),
* {@code false} otherwise.
*/
public boolean isNumeric() {
return _numeric;
}

/** Converts the given string value to the data type. Returns byte[] for BYTES. */
public Object convert(String value) {
try {
switch (this) {
case INT:
return Integer.valueOf(value);
case LONG:
return Long.valueOf(value);
case FLOAT:
return Float.valueOf(value);
case DOUBLE:
return Double.valueOf(value);
default:
throw new IllegalStateException();
}
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}
}
Loading