diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index d0051568c4..fd53f9c064 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -55,6 +55,7 @@ import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Kmeans; import org.opensearch.sql.ast.tree.Limit; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; @@ -72,6 +73,7 @@ import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.data.model.ExprMissingValue; import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.DSL; @@ -95,6 +97,7 @@ import org.opensearch.sql.planner.logical.LogicalFetchCursor; import org.opensearch.sql.planner.logical.LogicalFilter; import org.opensearch.sql.planner.logical.LogicalLimit; +import org.opensearch.sql.planner.logical.LogicalLookup; import org.opensearch.sql.planner.logical.LogicalML; import org.opensearch.sql.planner.logical.LogicalMLCommons; import org.opensearch.sql.planner.logical.LogicalPaginate; @@ -498,6 +501,113 @@ public LogicalPlan visitDedupe(Dedupe node, AnalysisContext context) { consecutive); } + /** Build {@link LogicalLookup}. */ + @Override + public LogicalPlan visitLookup(Lookup node, AnalysisContext queryContext) { + LogicalPlan child = node.getChild().get(0).accept(this, queryContext); + List options = node.getOptions(); + // Todo, refactor the option. + Boolean overwrite = (Boolean) options.get(0).getValue().getValue(); + + Table table = + dataSourceService + .getDataSource(DEFAULT_DATASOURCE_NAME) + .getStorageEngine() + .getTable(null, node.getIndexName()); + + if (table == null || !table.exists()) { + throw new SemanticCheckException( + String.format("no such lookup index %s", node.getIndexName())); + } + + AnalysisContext lookupTableContext = new AnalysisContext(); + TypeEnvironment lookupTableEnvironment = lookupTableContext.peek(); + table + .getFieldTypes() + .forEach( + (name, type) -> + lookupTableEnvironment.define(new Symbol(Namespace.FIELD_NAME, name), type)); + ImmutableMap matchFieldMap = + analyzeLookupMatchFields(node.getMatchFieldList(), queryContext, lookupTableContext); + + return new LogicalLookup( + child, + node.getIndexName(), + matchFieldMap, + overwrite, + analyzeLookupCopyFields(node.getCopyFieldList(), queryContext, table)); + } + + private ImmutableMap analyzeLookupMatchFields( + List inputMap, AnalysisContext queryContext, AnalysisContext lookupTableContext) { + ImmutableMap.Builder copyMapBuilder = + new ImmutableMap.Builder<>(); + for (Map resultMap : inputMap) { + Expression origin = expressionAnalyzer.analyze(resultMap.getOrigin(), lookupTableContext); + if (resultMap.getTarget() instanceof Field) { + Expression targetExpression = + expressionAnalyzer.analyze(resultMap.getTarget(), queryContext); + ReferenceExpression targetReference = + DSL.ref(targetExpression.toString(), targetExpression.type()); + ReferenceExpression originReference = DSL.ref(origin.toString(), origin.type()); + TypeEnvironment curEnv = queryContext.peek(); + curEnv.remove(originReference); + curEnv.define(targetReference); + copyMapBuilder.put(originReference, targetReference); + } else { + throw new SemanticCheckException( + String.format("the target expected to be field, but is %s", resultMap.getTarget())); + } + } + + return copyMapBuilder.build(); + } + + private ImmutableMap analyzeLookupCopyFields( + List inputMap, AnalysisContext context, Table table) { + + TypeEnvironment curEnv = context.peek(); + java.util.Map fieldTypes = table.getFieldTypes(); + + if (inputMap.isEmpty()) { + fieldTypes.forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v)); + return ImmutableMap.builder().build(); + } + + ImmutableMap.Builder copyMapBuilder = + new ImmutableMap.Builder<>(); + for (Map resultMap : inputMap) { + if (resultMap.getOrigin() instanceof Field && resultMap.getTarget() instanceof Field) { + String fieldName = ((Field) resultMap.getOrigin()).getField().toString(); + ExprType ex = fieldTypes.get(fieldName); + + if (ex == null) { + throw new SemanticCheckException(String.format("no such field %s", fieldName)); + } + + ReferenceExpression origin = new ReferenceExpression(fieldName, ex); + + if (resultMap.getTarget().equals(resultMap.getOrigin())) { + + curEnv.define(origin); + copyMapBuilder.put(origin, origin); + } else { + ReferenceExpression target = + new ReferenceExpression(((Field) resultMap.getTarget()).getField().toString(), ex); + curEnv.define(target); + copyMapBuilder.put(origin, target); + } + } else { + throw new SemanticCheckException( + String.format( + "the origin and target expected to be field, but is %s/%s", + resultMap.getOrigin(), resultMap.getTarget())); + } + } + + return copyMapBuilder.build(); + } + /** Logical head is identical to {@link LogicalLimit}. */ public LogicalPlan visitHead(Head node, AnalysisContext context) { LogicalPlan child = node.getChild().get(0).accept(this, context); diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index f27260dd5f..cd3e130597 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -50,6 +50,7 @@ import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Kmeans; import org.opensearch.sql.ast.tree.Limit; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.Parse; @@ -227,6 +228,10 @@ public T visitDedupe(Dedupe node, C context) { return visitChildren(node, context); } + public T visitLookup(Lookup node, C context) { + return visitChildren(node, context); + } + public T visitHead(Head node, C context) { return visitChildren(node, context); } diff --git a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java index d9956609ec..e6f2a68427 100644 --- a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java @@ -6,6 +6,7 @@ package org.opensearch.sql.ast.dsl; import com.google.common.collect.ImmutableList; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -53,6 +54,7 @@ import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Limit; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Project; import org.opensearch.sql.ast.tree.RareTopN; @@ -446,6 +448,25 @@ public static Dedupe dedupe(UnresolvedPlan input, List options, Field. return new Dedupe(input, options, Arrays.asList(fields)); } + public static Lookup lookup( + UnresolvedPlan input, + String indexName, + List matchFieldList, + List options, + List copyFieldList) { + return new Lookup(input, indexName, matchFieldList, options, copyFieldList); + } + + public static List fieldMap(String field, String asField, String... more) { + assert more == null || more.length % 2 == 0; + List list = new ArrayList(); + list.add(map(field, asField)); + for (int i = 0; i < more.length; i = i + 2) { + list.add(map(more[i], more[i + 1])); + } + return list; + } + public static Head head(UnresolvedPlan input, Integer size, Integer from) { return new Head(input, size, from); } diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Lookup.java b/core/src/main/java/org/opensearch/sql/ast/tree/Lookup.java new file mode 100644 index 0000000000..d9b22cbd26 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Lookup.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.Argument; +import org.opensearch.sql.ast.expression.Map; + +/** AST node represent Lookup operation. */ +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor +@AllArgsConstructor +public class Lookup extends UnresolvedPlan { + private UnresolvedPlan child; + private final String indexName; + private final List matchFieldList; + private final List options; + private final List copyFieldList; + + @Override + public Lookup attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return ImmutableList.of(this.child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitLookup(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/Explain.java b/core/src/main/java/org/opensearch/sql/executor/Explain.java index 31890a8090..e8fb8a1218 100644 --- a/core/src/main/java/org/opensearch/sql/executor/Explain.java +++ b/core/src/main/java/org/opensearch/sql/executor/Explain.java @@ -24,6 +24,7 @@ import org.opensearch.sql.planner.physical.EvalOperator; import org.opensearch.sql.planner.physical.FilterOperator; import org.opensearch.sql.planner.physical.LimitOperator; +import org.opensearch.sql.planner.physical.LookupOperator; import org.opensearch.sql.planner.physical.NestedOperator; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor; @@ -174,6 +175,20 @@ public ExplainResponseNode visitDedupe(DedupeOperator node, Object context) { "consecutive", node.getConsecutive()))); } + @Override + public ExplainResponseNode visitLookup(LookupOperator node, Object context) { + return explain( + node, + context, + explainNode -> + explainNode.setDescription( + ImmutableMap.of( + "copyfields", node.getCopyFieldMap().toString(), + "matchfields", node.getMatchFieldMap().toString(), + "indexname", node.getIndexName(), + "overwrite", node.getOverwrite()))); + } + @Override public ExplainResponseNode visitRareTopN(RareTopNOperator node, Object context) { return explain( diff --git a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java index c988084d1b..7e2cbced5d 100644 --- a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java +++ b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java @@ -134,7 +134,8 @@ public PhysicalPlan visitValues(LogicalValues node, C context) { public PhysicalPlan visitLimit(LogicalLimit node, C context) { PhysicalPlan child = visitChild(node, context); // Optimize sort + limit to take ordered operator - if (child instanceof SortOperator sortChild) { + if (child instanceof SortOperator) { + SortOperator sortChild = (SortOperator) child; return new TakeOrderedOperator( sortChild.getInput(), node.getLimit(), node.getOffset(), sortChild.getSortList()); } diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalLookup.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalLookup.java new file mode 100644 index 0000000000..7b0db5bc72 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalLookup.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.logical; + +import java.util.Arrays; +import java.util.Map; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.opensearch.sql.expression.ReferenceExpression; + +/** Logical Lookup Plan. */ +@Getter +@ToString +@EqualsAndHashCode(callSuper = true) +public class LogicalLookup extends LogicalPlan { + + private final String indexName; + private final Map matchFieldMap; + private final Map copyFieldMap; + private final Boolean overwrite; + + /** Constructor of LogicalLookup. */ + public LogicalLookup( + LogicalPlan child, + String indexName, + Map matchFieldMap, + Boolean overwrite, + Map copyFieldMap) { + super(Arrays.asList(child)); + this.indexName = indexName; + this.copyFieldMap = copyFieldMap; + this.matchFieldMap = matchFieldMap; + this.overwrite = overwrite; + } + + @Override + public R accept(LogicalPlanNodeVisitor visitor, C context) { + return visitor.visitLookup(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java index 13c6d7a979..c5b96dc877 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java @@ -145,4 +145,13 @@ public LogicalPlan values(List... values) { public static LogicalPlan limit(LogicalPlan input, Integer limit, Integer offset) { return new LogicalLimit(input, limit, offset); } + + public static LogicalPlan lookup( + LogicalPlan input, + String indexName, + Map matchFieldMap, + Boolean overwrite, + Map copyFields) { + return new LogicalLookup(input, indexName, matchFieldMap, overwrite, copyFields); + } } diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java index c9eedd8efc..7324bbcef5 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java @@ -52,6 +52,10 @@ public R visitDedupe(LogicalDedupe plan, C context) { return visitNode(plan, context); } + public R visitLookup(LogicalLookup plan, C context) { + return visitNode(plan, context); + } + public R visitRename(LogicalRename plan, C context) { return visitNode(plan, context); } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/LookupOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/LookupOperator.java new file mode 100644 index 0000000000..4c9e3bfa65 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/physical/LookupOperator.java @@ -0,0 +1,132 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.physical; + +import static org.opensearch.sql.data.type.ExprCoreType.STRUCT; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.expression.ReferenceExpression; + +/** Lookup operator. Perform lookup on another OpenSearch index and enrich the results. */ +@Getter +@ToString +@EqualsAndHashCode(callSuper = false) +public class LookupOperator extends PhysicalPlan { + @Getter private final PhysicalPlan input; + @Getter private final String indexName; + @Getter private final Map matchFieldMap; + @Getter private final Map copyFieldMap; + @Getter private final Boolean overwrite; + + @EqualsAndHashCode.Exclude + private final BiFunction, Map> lookup; + + /** Lookup Constructor. */ + @NonNull + public LookupOperator( + PhysicalPlan input, + String indexName, + Map matchFieldMap, + Boolean overwrite, + Map copyFieldMap, + BiFunction, Map> lookup) { + this.input = input; + this.indexName = indexName; + this.matchFieldMap = matchFieldMap; + this.overwrite = overwrite; + this.copyFieldMap = copyFieldMap; + this.lookup = lookup; + } + + @Override + public R accept(PhysicalPlanNodeVisitor visitor, C context) { + return visitor.visitLookup(this, context); + } + + @Override + public List getChild() { + return Collections.singletonList(input); + } + + @Override + public boolean hasNext() { + return input.hasNext(); + } + + @Override + public ExprValue next() { + ExprValue inputValue = input.next(); + + if (STRUCT == inputValue.type()) { + Map matchMap = new HashMap<>(); + Map finalMap = new HashMap<>(); + + for (Map.Entry matchField : + matchFieldMap.entrySet()) { + Object val = inputValue.bindingTuples().resolve(matchField.getValue()).value(); + if (val != null) { + matchMap.put(matchField.getKey().toString(), val); + } else { + // No value in search results, so we just return the input + return inputValue; + } + } + + finalMap.put("_match", matchMap); + + Map copyMap = new HashMap<>(); + + if (!copyFieldMap.isEmpty()) { + + for (Map.Entry copyField : + copyFieldMap.entrySet()) { + copyMap.put(String.valueOf(copyField.getKey()), String.valueOf(copyField.getValue())); + } + + finalMap.put("_copy", copyMap.keySet()); + } + + Map lookupResult = lookup.apply(indexName, finalMap); + + if (lookupResult == null || lookupResult.isEmpty()) { + // no lookup found or lookup is empty, so we just return the original input value + return inputValue; + } + + Map tupleInputValue = ExprValueUtils.getTupleValue(inputValue); + Map resultTupleBuilder = new LinkedHashMap<>(); + resultTupleBuilder.putAll(tupleInputValue); + for (Map.Entry sourceOfAdditionalField : lookupResult.entrySet()) { + String lookedUpFieldName = sourceOfAdditionalField.getKey(); + Object lookedUpFieldValue = sourceOfAdditionalField.getValue(); + String finalFieldName = copyMap.getOrDefault(lookedUpFieldName, lookedUpFieldName); + ExprValue value = ExprValueUtils.fromObjectValue(lookedUpFieldValue); + if (overwrite) { + resultTupleBuilder.putIfAbsent(finalFieldName, value); + } else { + resultTupleBuilder.put(finalFieldName, value); + } + } + + return ExprTupleValue.fromExprValueMap(resultTupleBuilder); + + } else { + return inputValue; + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java index 0c2764112d..ed95cd8dbd 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanDSL.java @@ -11,6 +11,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiFunction; import lombok.experimental.UtilityClass; import org.apache.commons.lang3.tuple.Pair; import org.opensearch.sql.ast.tree.RareTopN.CommandType; @@ -83,6 +84,17 @@ public static DedupeOperator dedupe( input, Arrays.asList(expressions), allowedDuplication, keepEmpty, consecutive); } + public static LookupOperator lookup( + PhysicalPlan input, + String indexName, + Map matchFieldMap, + Boolean overwrite, + Map copyFieldMap, + BiFunction, Map> lookupFunction) { + return new LookupOperator( + input, indexName, matchFieldMap, overwrite, copyFieldMap, lookupFunction); + } + public WindowOperator window( PhysicalPlan input, NamedExpression windowFunction, WindowDefinition windowDefinition) { return new WindowOperator(input, windowFunction, windowDefinition); diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java index 66c7219e39..3b8caf3674 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java @@ -64,6 +64,10 @@ public R visitDedupe(DedupeOperator node, C context) { return visitNode(node, context); } + public R visitLookup(LookupOperator node, C context) { + return visitNode(node, context); + } + public R visitValues(ValuesOperator node, C context) { return visitNode(node, context); } diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java index d6cb0544d8..f15989f987 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java @@ -73,6 +73,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.sql.ast.dsl.AstDSL; import org.opensearch.sql.ast.expression.Argument; import org.opensearch.sql.ast.expression.DataType; @@ -87,12 +89,14 @@ import org.opensearch.sql.ast.tree.FetchCursor; import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Kmeans; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.RareTopN.CommandType; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.DSL; @@ -111,10 +115,94 @@ import org.opensearch.sql.planner.logical.LogicalPlanDSL; import org.opensearch.sql.planner.logical.LogicalProject; import org.opensearch.sql.planner.logical.LogicalRelation; +import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.datasource.DataSourceTable; +import org.opensearch.sql.storage.StorageEngine; +import org.opensearch.sql.storage.Table; class AnalyzerTest extends AnalyzerTestBase { + private static final String LOOKUP_TABLE_DEVICE_NAMES = "device_names"; + public static final String NO_SUCH_TABLE = "no_such_table"; + public static final String TABLE_DOES_NOT_EXIST = "does_not_exist"; + + public static Map lookupTableFieldTypes = + new ImmutableMap.Builder() + .put("id", INTEGER) + .put("dev_id", STRING) + .put("serial_number", LONG) + .put("vendor", STRING) + .put("ip_v4", STRING) + .put("firmware_version", LONG) + .put("reliability_factor", DOUBLE) + .put("comment", ExprCoreType.STRING) + .build(); + + private final Table deviceNamesLookupTable; + private Table tableDoesNotExist; + + public AnalyzerTest() { + this.deviceNamesLookupTable = + new Table() { + @Override + public boolean exists() { + return true; + } + + @Override + public void create(Map schema) { + throw new UnsupportedOperationException("Create table is not supported"); + } + + @Override + public Map getFieldTypes() { + return lookupTableFieldTypes; + } + + @Override + public PhysicalPlan implement(LogicalPlan plan) { + throw new UnsupportedOperationException(); + } + + public Map getReservedFieldTypes() { + return ImmutableMap.of("_test", STRING); + } + }; + this.tableDoesNotExist = + new Table() { + + @Override + public boolean exists() { + return false; + } + + @Override + public Map getFieldTypes() { + return Map.of(); + } + + @Override + public PhysicalPlan implement(LogicalPlan plan) { + throw new UnsupportedOperationException(); + } + }; + } + + protected StorageEngine storageEngine() { + return (dataSourceSchemaName, tableName) -> { + switch (tableName) { + case NO_SUCH_TABLE: + return null; + case TABLE_DOES_NOT_EXIST: + return tableDoesNotExist; + case LOOKUP_TABLE_DEVICE_NAMES: + return deviceNamesLookupTable; + default: + return table; + } + }; + } + @Test public void filter_relation() { assertAnalyzeEqual( @@ -1871,4 +1959,263 @@ public void visit_close_cursor() { () -> assertEquals("pewpew", ((LogicalFetchCursor) analyzed.getChild().get(0)).getCursor())); } + + @Test + public void visit_lookup_same_field_name_in_query_and_lookup_index() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), + new ReferenceExpression("field_value2", STRING)), + false, + Collections.emptyMap()), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(AstDSL.argument("overwrite", AstDSL.booleanLiteral(false))), + Collections.emptyList())); + } + + @Test + public void visit_lookup_use_multiple_fields() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("comment", STRING), + new ReferenceExpression("comment", STRING), + new ReferenceExpression("dev_id", STRING), + new ReferenceExpression("field_value1", STRING)), + false, + Collections.emptyMap()), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of( + AstDSL.map("comment", "comment"), AstDSL.map("dev_id", "field_value1")), + ImmutableList.of(AstDSL.argument("overwrite", AstDSL.booleanLiteral(false))), + Collections.emptyList())); + } + + @Test + public void visit_lookup_various_field_name_in_query_and_lookup_index() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("dev_id", STRING), + new ReferenceExpression("comment", STRING)), + true, + Collections.emptyMap()), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("dev_id", "comment")), + ImmutableList.of(AstDSL.argument("overwrite", AstDSL.booleanLiteral(true))), + Collections.emptyList())); + } + + @ParameterizedTest + @ValueSource(strings = {NO_SUCH_TABLE, TABLE_DOES_NOT_EXIST}) + public void visit_lookup_should_report_error_when_lookup_table_does_not_exist(String tableName) { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + tableName, + ImmutableList.of(AstDSL.map("string_value", "comment")), + ImmutableList.of(argument("overwrite", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_join_field_does_not_exist() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("no_such_field", "comment")), + ImmutableList.of(argument("overwrite", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_join_field_does_not_exist_in_lookup_table() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("no_such_field", "comment")), + ImmutableList.of(argument("overwrite", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_join_field_does_not_exist_in_query_table() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "no_such_field")), + ImmutableList.of(argument("overwrite", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void + visit_lookup_should_error_when_join_field_does_not_exist_in_query_table_but_exist_lookup_table() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "dev_id")), + ImmutableList.of(argument("overwrite", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void + visit_lookup_should_error_when_join_field_does_not_exist_in_lookup_table_but_exist_query_table() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("string_value", "field_value2")), + ImmutableList.of(argument("overwrite", booleanLiteral(true))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_target_expression_point_out_function() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map(field("ip_v4"), intLiteral(7))), + ImmutableList.of(argument("overwrite", booleanLiteral(false))), + emptyList()); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_copy_field() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), + new ReferenceExpression("field_value2", STRING)), + false, + ImmutableMap.of( + new ReferenceExpression("vendor", STRING), + new ReferenceExpression("maker", STRING))), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(AstDSL.argument("overwrite", AstDSL.booleanLiteral(false))), + ImmutableList.of(AstDSL.map("vendor", "maker")))); + } + + @Test + public void visit_lookup_copy_multiple_fields() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), + new ReferenceExpression("field_value2", STRING)), + false, + ImmutableMap.of( + new ReferenceExpression("vendor", STRING), + new ReferenceExpression("maker", STRING), + new ReferenceExpression("serial_number", LONG), + new ReferenceExpression("serial", LONG))), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(AstDSL.argument("overwrite", AstDSL.booleanLiteral(false))), + ImmutableList.of( + AstDSL.map("vendor", "maker"), AstDSL.map("serial_number", "serial")))); + } + + @Test + public void visit_lookup_copy_field_when_origin_and_target_is_the_same() { + assertAnalyzeEqual( + LogicalPlanDSL.lookup( + LogicalPlanDSL.relation("schema", table), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), + new ReferenceExpression("field_value2", STRING)), + false, + ImmutableMap.of( + new ReferenceExpression("vendor", STRING), + new ReferenceExpression("vendor", STRING))), + AstDSL.lookup( + AstDSL.relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(AstDSL.argument("overwrite", AstDSL.booleanLiteral(false))), + ImmutableList.of(AstDSL.map("vendor", "vendor")))); + } + + @Test + public void visit_lookup_should_report_error_when_copy_field_does_not_exist() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(argument("overwrite", booleanLiteral(false))), + ImmutableList.of(AstDSL.map("no_such_field", "maker"))); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_copy_target_is_not_a_field() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(argument("overwrite", booleanLiteral(false))), + ImmutableList.of(AstDSL.map(AstDSL.field("vendor"), AstDSL.intLiteral(8)))); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } + + @Test + public void visit_lookup_should_report_error_when_copy_source_is_not_a_field() { + Lookup lookup = + AstDSL.lookup( + relation("schema"), + LOOKUP_TABLE_DEVICE_NAMES, + ImmutableList.of(AstDSL.map("ip_v4", "field_value2")), + ImmutableList.of(argument("overwrite", booleanLiteral(false))), + ImmutableList.of(AstDSL.map(AstDSL.booleanLiteral(true), AstDSL.field("maker")))); + + assertThrows(SemanticCheckException.class, () -> analyze(lookup)); + } } diff --git a/core/src/test/java/org/opensearch/sql/executor/ExplainTest.java b/core/src/test/java/org/opensearch/sql/executor/ExplainTest.java index febf662843..31ac677bb9 100644 --- a/core/src/test/java/org/opensearch/sql/executor/ExplainTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/ExplainTest.java @@ -22,6 +22,7 @@ import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.eval; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.filter; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.limit; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.lookup; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.nested; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.project; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.rareTopN; @@ -299,6 +300,33 @@ void can_explain_trendline() { explain.apply(plan)); } + @Test + void can_explain_lookup() { + PhysicalPlan plan = + lookup( + tableScan, + "lookup_index_name", + Map.of(ref("lookup_index_field", STRING), ref("query_index_field", STRING)), + true, + Map.of(ref("lookup_index_field_name", STRING), ref("renamed_field", STRING)), + null); + assertEquals( + new ExplainResponse( + new ExplainResponseNode( + "LookupOperator", + Map.of( + "copyfields", + "{lookup_index_field_name=renamed_field}", + "matchfields", + "{lookup_index_field=query_index_field}", + "indexname", + "lookup_index_name", + "overwrite", + true), + singletonList(tableScan.explainNode()))), + explain.apply(plan)); + } + private static class FakeTableScan extends TableScanOperator { @Override public boolean hasNext() { diff --git a/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java b/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java index 43ce23ed56..a303d1c738 100644 --- a/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java @@ -128,6 +128,7 @@ public TableWriteOperator build(PhysicalPlan child) { LogicalPlan ad = new LogicalAD(relation, Map.of()); LogicalPlan ml = new LogicalML(relation, Map.of()); LogicalPlan paginate = new LogicalPaginate(42, List.of(relation)); + LogicalPlan lookup = new LogicalLookup(relation, "lookup_index", Map.of(), true, Map.of()); List> nestedArgs = List.of( @@ -175,6 +176,7 @@ public TableWriteOperator build(PhysicalPlan child) { nested, cursor, closeCursor, + lookup, trendline) .map(Arguments::of); } @@ -226,5 +228,14 @@ public Integer visitRareTopN(LogicalRareTopN plan, Object context) { .mapToInt(Integer::intValue) .sum(); } + + @Override + public Integer visitLookup(LogicalLookup plan, Object context) { + return 1 + + plan.getChild().stream() + .map(child -> child.accept(this, context)) + .mapToInt(Integer::intValue) + .sum(); + } } } diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/LookupOperatorTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/LookupOperatorTest.java new file mode 100644 index 0000000000..382028216c --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/physical/LookupOperatorTest.java @@ -0,0 +1,673 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.sql.planner.physical; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; +import static org.opensearch.sql.utils.MatcherUtils.containsNull; +import static org.opensearch.sql.utils.MatcherUtils.containsValue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; +import org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.expression.ReferenceExpression; + +@ExtendWith(MockitoExtension.class) +public class LookupOperatorTest extends PhysicalPlanTestBase { + + public static final String LOOKUP_INDEX = "lookup_index"; + + public static final ImmutableList> LOOKUP_TABLE = + ImmutableList.of( + ImmutableMap.of( + "id", 1, "ip", "v4", "ip_v4", "112.111.162.4", "region", "USA", "class", "A"), + ImmutableMap.of( + "id", 2, "ip", "4", "ip_v4", "74.125.19.106", "region", "EU", "class", "A")); + + public static final ImmutableList> LOOKUP_TABLE_WITH_NULLS = + ImmutableList.of( + new HashMap<>( + ImmutableMap.of( + "id", 1, "ip", "v4", "ip_v4", "112.111.162.4", "region", "USA", "class", "A")) { + { + put("class", null); + } + }, + ImmutableMap.of( + "id", 2, "ip", "4", "ip_v4", "74.125.19.106", "region", "EU", "class", "A")); + + @Mock private BiFunction, Map> lookupFunction; + + @Test + public void lookup_empty_table() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip", Collections.emptyList())); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "112.111.162.4", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "74.125.19.106", + "action", + "POST", + "response", + 200, + "referer", + "www.google.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of("ip", "74.125.19.106", "action", "POST", "response", 500)))); + } + + @Test + public void lookup_append_only_true() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + } + + @Test + public void lookup_append_only_false() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + false, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "v4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "4"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "4"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU")))); + } + + @Test + public void lookup_copy_one_field() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of( + new ReferenceExpression("class", STRING), + new ReferenceExpression("ip_address_class", STRING)), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("ip_address_class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("ip_address_class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("ip_address_class", "A")))); + } + + @Test + public void lookup_copy_multiple_fields() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of( + new ReferenceExpression("class", STRING), + new ReferenceExpression("class", STRING), + new ReferenceExpression("id", INTEGER), + new ReferenceExpression("address_id", INTEGER)), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("class", "A"), + containsValue("address_id", 1)))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("class", "A"), + containsValue("address_id", 2)))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("class", "A"), + containsValue("address_id", 2)))); + } + + @Test + public void lookup_empty_join_field() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + List queryResults = + new ImmutableList.Builder() + .addAll(inputs) + .add( + ExprValueUtils.tupleValue( + ImmutableMap.of("action", "GET", "response", 200, "referer", "www.amazon.com"))) + .build(); + PhysicalPlan plan = + new LookupOperator( + testScan(queryResults), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(6)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of("action", "GET", "response", 200, "referer", "www.amazon.com")))); + } + + @Test + public void lookup_ignore_non_struct_input() { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE)); + ExprStringValue stringExpression = + new ExprStringValue("Expression of string type should be ignored"); + List queryResults = + new ImmutableList.Builder().addAll(inputs).add(stringExpression).build(); + PhysicalPlan plan = + new LookupOperator( + testScan(queryResults), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + true, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(6)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip", "112.111.162.4"), + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip", "74.125.19.106"), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat(result, hasItem(stringExpression)); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void lookup_table_with_nulls(boolean overwrite) { + when(lookupFunction.apply(eq(LOOKUP_INDEX), anyMap())) + .thenAnswer(lookupTableQueryResults("ip_v4", LOOKUP_TABLE_WITH_NULLS)); + PhysicalPlan plan = + new LookupOperator( + new TestScan(), + LOOKUP_INDEX, + ImmutableMap.of( + new ReferenceExpression("ip_v4", STRING), new ReferenceExpression("ip", STRING)), + overwrite, + ImmutableMap.of(), + lookupFunction); + + List result = execute(plan); + + assertThat(result, hasSize(5)); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 200, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + ExprValueUtils.tupleValue( + ImmutableMap.of( + "ip", + "209.160.24.63", + "action", + "GET", + "response", + 404, + "referer", + "www.amazon.com")))); + assertThat( + result, + hasItem( + allOf( + containsValue("ip_v4", "112.111.162.4"), + containsValue("region", "USA"), + containsNull("class")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 200), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + assertThat( + result, + hasItem( + allOf( + containsValue("response", 500), + containsValue("ip_v4", "74.125.19.106"), + containsValue("region", "EU"), + containsValue("class", "A")))); + } + + private static @NotNull Answer> lookupTableQueryResults( + String lookupTableFieldName, List> lookupTableContent) { + return invocationOnMock -> { + String lookupTableName = invocationOnMock.getArgument(0); + if (!LOOKUP_INDEX.equals(lookupTableName)) { + return ImmutableMap.of(); + } + HashMap> parameters = invocationOnMock.getArgument(1); + String valueOfJoinFieldInLookupTable = + (String) parameters.get("_match").get(lookupTableFieldName); + if (Objects.isNull(valueOfJoinFieldInLookupTable)) { + return null; + } + return lookupTableContent.stream() + .filter(map -> valueOfJoinFieldInLookupTable.equals(map.get(lookupTableFieldName))) + .findAny() + .orElse(ImmutableMap.of()); + }; + } +} diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java index 26f288e6b6..06350aeb58 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java @@ -18,6 +18,7 @@ import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.eval; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.filter; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.limit; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.lookup; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.project; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.rareTopN; import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.remove; @@ -68,6 +69,16 @@ public void print_physical_plan() { agg( rareTopN( filter( + limit( + lookup( + new TestScan(), + "lookup_index", + Map.of(), + true, + Map.of(), + null), + 1, + 1), limit( new TrendlineOperator( new TestScan(), @@ -99,6 +110,8 @@ public void print_physical_plan() { + "\t\t\t\t\tFilter->\n" + "\t\t\t\t\t\tLimit->\n" + "\t\t\t\t\t\t\tTrendline->", + + "\t\t\t\t\t\tLimit->\n" + + "\t\t\t\t\t\t\tLookup->", printer.print(plan)); } @@ -153,6 +166,8 @@ public static Stream getPhysicalPlanForTest() { Collections.singletonList( Pair.of(AstDSL.computation(1, AstDSL.field("field"), "alias", SMA), DOUBLE))); + PhysicalPlan lookup = lookup(plan, "lookup_index", Map.of(), false, Map.of(), null); + return Stream.of( Arguments.of(filter, "filter"), Arguments.of(aggregation, "aggregation"), @@ -169,7 +184,8 @@ public static Stream getPhysicalPlanForTest() { Arguments.of(limit, "limit"), Arguments.of(nested, "nested"), Arguments.of(cursorClose, "cursorClose"), - Arguments.of(trendline, "trendline")); + Arguments.of(trendline, "trendline"), + Arguments.of(lookup, "Lookup")); } @ParameterizedTest(name = "{1}") @@ -243,6 +259,11 @@ public String visitLimit(LimitOperator node, Integer tabs) { return name(node, "Limit->", tabs); } + @Override + public String visitLookup(LookupOperator node, Integer tabs) { + return name(node, "Lookup->", tabs); + } + @Override public String visitTrendline(TrendlineOperator node, Integer tabs) { return name(node, "Trendline->", tabs); diff --git a/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java b/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java index 206f05a38a..6b0d5e2ac7 100644 --- a/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java +++ b/core/src/test/java/org/opensearch/sql/utils/MatcherUtils.java @@ -5,7 +5,9 @@ package org.opensearch.sql.utils; +import java.util.Objects; import org.hamcrest.Description; +import org.hamcrest.TypeSafeDiagnosingMatcher; import org.hamcrest.TypeSafeMatcher; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.type.ExprCoreType; @@ -41,4 +43,77 @@ protected boolean matchesSafely(ExprValue value) { } }; } + + public static TypeSafeDiagnosingMatcher containsValue(String key, Object value) { + Objects.requireNonNull(key, "Key is required"); + Objects.requireNonNull(value, "Value is required"); + return new TypeSafeDiagnosingMatcher<>() { + + @Override + protected boolean matchesSafely(ExprValue item, Description mismatchDescription) { + ExprValue expressionForKey = item.keyValue(key); + if (Objects.isNull(expressionForKey)) { + mismatchDescription + .appendValue(item) + .appendText(" does not contain key ") + .appendValue(key); + return false; + } + Object givenValue = expressionForKey.value(); + if (value.equals(givenValue)) { + return true; + } + mismatchDescription + .appendText(" value for key ") + .appendValue(key) + .appendText(" was ") + .appendValue(givenValue); + return false; + } + + @Override + public void describeTo(Description description) { + description + .appendText("ExprValue should contain key ") + .appendValue(key) + .appendText(" with string value ") + .appendValue(value); + } + }; + } + + public static TypeSafeDiagnosingMatcher containsNull(String key) { + Objects.requireNonNull(key, "Key is required"); + return new TypeSafeDiagnosingMatcher<>() { + + @Override + protected boolean matchesSafely(ExprValue item, Description mismatchDescription) { + ExprValue expressionForKey = item.keyValue(key); + if (Objects.isNull(expressionForKey)) { + mismatchDescription + .appendValue(item) + .appendText(" does not contain key ") + .appendValue(key); + return false; + } + if (expressionForKey.isNull()) { + return true; + } + mismatchDescription + .appendText(" value for key ") + .appendValue(key) + .appendText(" was ") + .appendValue(expressionForKey.value()); + return false; + } + + @Override + public void describeTo(Description description) { + description + .appendText("ExprValue should contain key ") + .appendValue(key) + .appendText(" with null value "); + } + }; + } } diff --git a/docs/category.json b/docs/category.json index 32f56cfb46..16d993960d 100644 --- a/docs/category.json +++ b/docs/category.json @@ -9,6 +9,7 @@ "ppl_cli": [ "user/ppl/cmd/ad.rst", "user/ppl/cmd/dedup.rst", + "user/ppl/cmd/lookup.rst", "user/ppl/cmd/describe.rst", "user/ppl/cmd/showdatasources.rst", "user/ppl/cmd/information_schema.rst", diff --git a/docs/user/ppl/cmd/lookup.rst b/docs/user/ppl/cmd/lookup.rst new file mode 100644 index 0000000000..75ae380421 --- /dev/null +++ b/docs/user/ppl/cmd/lookup.rst @@ -0,0 +1,153 @@ +============= +lookup +============= + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 2 + + +Description +============ +| Use the ``lookup`` command to do a lookup from another index and add the fields and values from the lookup document to the search result. + +Syntax +============ +lookup [AS ] ["," [AS ]]... [overwrite=] [ [AS ]] ["," [AS ]]... + +* lookup-index: mandatory. the name of the lookup index. If more than one is provided, all of them must match. +* lookup-field: mandatory. the name of the lookup field. Must be existing in the lookup-index. It is used to match to a local field (in the current search) to get the lookup document. When there is no lookup document matching it is a no-op. If there is more than one an exception is thrown. +* local-lookup-field: optional. the name of a field in the current search to match against the lookup-field. **Default:** value of lookup-field. +* overwrite: optional. indicates if the values to copy over to the search result from the lookup document should overwrite existing values. If true no existing values are overwritten. **Default:** false. +* source-field: optional. the fields to copy over from the lookup document to the search result. If no such fields are given all fields are copied. **Default:** all fields +* local-source-field: optional. the final name of the field in the search result (if different from the field name in the lookup document). **Default:** value of source-field. + +Note: To check if there is a match between the lookup index and the current search result a term and a match query for the field value of lookup-field is performed. + +Example 1: Simple lookup +============================= + +The example shows a simple lookup to add the name of a person from a lookup index. + +PPL query:: + + os> source=account_data; + fetched rows / total rows = 2/2 + +------------------+----------+ + | account_number | gender | + |------------------+----------| + | 1 | M | + | 13 | F | + +------------------+----------+ + + os> source=hr; + fetched rows / total rows = 2/2 + +------------------+----------+ + | account_number | name | + |------------------+----------| + | 1 | John | + | 13 | Alice | + +------------------+----------+ + + os> source=account_data | lookup hr account_number; + fetched rows / total rows = 2/2 + +------------------+----------+----------+ + | account_number | gender | name | + |------------------+----------|----------| + | 1 | M | John | + | 13 | F | Alice | + +------------------+----------+----------+ + + +Example 2: Lookup with different field names +============================================ + +The example show a lookup to add the name of a person from a lookup index with different field names. + +PPL query:: + + os> source=account_data; + fetched rows / total rows = 2/2 + +------------------+----------+ + | account_number | gender | + |------------------+----------| + | 1 | M | + | 13 | F | + +------------------+----------+ + + os> source=hr; + fetched rows / total rows = 2/2 + +------------------+----------+ + | employee_number | name | + |------------------+----------| + | 1 | John | + | 13 | Alice | + +------------------+----------+ + + os> source=account_data | lookup hr employee_number AS account_number name AS given_name; + fetched rows / total rows = 2/2 + +------------------+----------+----------------+ + | account_number | gender | given_name | + |------------------+----------|----------------| + | 1 | M | John | + | 13 | F | Alice | + +------------------+----------+----------------+ + +Example 3: Full lookup example +============================== + +The example show a lookup to add the name of a person from a lookup index with different field names. + +PPL query:: + + os> source=account_data; + fetched rows / total rows = 4/4 + +------------------+----------+------------+------------------+ + | account_number | gender | department | name | + |------------------+----------+------------+------------------| + | 1 | M | finance | John Miller | + | 13 | F | it | Melinda Williams | + | 20 | M | it | NULL | + | 21 | F | finance | Mandy Smith | + +------------------+----------+------------+------------------+ + + os> source=hr; + fetched rows / total rows = 5/5 + +------------------+--------------+------------+--------+ + | employee_number | name | dep | active | + |------------------+--------------|------------|--------| + | 1 | John n/a | finance | true | + | 13 | Alice n/a | finance | false | + | 13 | Melinda n/a | it | true | + | 19 | Jack n/a | finance | true | + | 21 | NULL | finance | false | + +------------------+--------------+------------+--------+ + + os> source=account_data | lookup hr employee_number AS account_number, dep AS department overwrite=true; + fetched rows / total rows = 4/4 + +------------------+----------+------------------+------------+-----------+---------+-----------------+ + | account_number | gender | name | department | active | dep | employee_number | + |------------------+----------|------------------|------------|-----------|---------|-----------------| + | 1 | M | John Miller | finance | true | finance | 1 | + | 13 | F | Melinda Williams | it | true | it | 13 | + | 20 | M | NULL | it | NULL | NULL | NULL | + | 21 | F | Mandy Smith | it | NULL | it | 21 | + +------------------+----------+------------------+------------+-----------+---------+-----------------+ + + os> source=account_data | lookup hr employee_number AS account_number, dep AS department overwrite=false; + fetched rows / total rows = 4/4 + +------------------+----------+------------------+------------+-----------+---------+-----------------+ + | account_number | gender | name | department | active | dep | employee_number | + |------------------+----------|------------------|------------|-----------|---------|-----------------| + | 1 | M | John n/a | finance | true | finance | 1 | + | 13 | F | Melinda /na | it | true | it | 13 | + | 20 | M | NULL | it | NULL | NULL | NULL | + | 21 | F | Mandy Smith | it | NULL | it | 21 | + +------------------+----------+------------------+------------+-----------+---------+-----------------+ + + +Limitation +========== +The ``lookup`` command is not rewritten to OpenSearch DSL, it is only executed on the coordination node. diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 5b956fb5d3..4075ab8c1d 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -741,6 +741,16 @@ public enum Index { "multi_nested", getNestedTypeIndexMapping(), "src/test/resources/nested_with_nulls.json"), + IOT_READINGS( + TestsConstants.TEST_INDEX_IOT_READINGS, + "iot_readings", + getMappingFile("iot_readings_index_mapping.json"), + "src/test/resources/iot_readings.json"), + IOT_SENSORS( + TestsConstants.TEST_INDEX_IOT_SENSORS, + "iot_sensors", + getMappingFile("iot_sensors_index_mapping.json"), + "src/test/resources/iot_sensors.json"), GEOPOINTS( TestsConstants.TEST_INDEX_GEOPOINT, "dates", diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java index 73838feb4f..02b916d07e 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java @@ -58,6 +58,8 @@ public class TestsConstants { public static final String TEST_INDEX_MULTI_NESTED_TYPE = TEST_INDEX + "_multi_nested"; public static final String TEST_INDEX_NESTED_WITH_NULLS = TEST_INDEX + "_nested_with_nulls"; public static final String TEST_INDEX_GEOPOINT = TEST_INDEX + "_geopoint"; + public static final String TEST_INDEX_IOT_READINGS = TEST_INDEX + "_iot_readings"; + public static final String TEST_INDEX_IOT_SENSORS = TEST_INDEX + "_iot_sensors"; public static final String DATASOURCES = ".ql-datasources"; public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/LookupCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/LookupCommandIT.java new file mode 100644 index 0000000000..1353cf7ddf --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/LookupCommandIT.java @@ -0,0 +1,267 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_IOT_READINGS; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_IOT_SENSORS; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; + +import java.io.IOException; +import java.math.BigDecimal; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; + +public class LookupCommandIT extends PPLIntegTestCase { + + @Override + public void init() throws IOException { + loadIndex(Index.IOT_READINGS); + loadIndex(Index.IOT_SENSORS); + } + + @Test + public void testLookup() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | lookup %s did as device-id | sort @timestamp", + TEST_INDEX_IOT_READINGS, TEST_INDEX_IOT_SENSORS)); + verifyDataRows( + result, + rows( + new BigDecimal("28.1"), + 255, + "2015-01-20 15:31:32.406431", + "temperature-basement", + "meter", + 255, + "VendorOne"), + rows( + new BigDecimal("27.8"), + 256, + "2016-01-20 15:31:33.509334", + "temperature-living-room", + "temperature meter", + 256, + "VendorTwo"), + rows( + new BigDecimal("27.4"), + 257, + "2017-01-20 15:31:35.732436", + "temperature-bedroom", + "camcorder", + 257, + "VendorThree"), + rows( + new BigDecimal("28.5"), + 255, + "2018-01-20 15:32:32.406431", + "temperature-basement", + "meter", + 255, + "VendorOne"), + rows( + new BigDecimal("27.9"), + 256, + "2019-01-20 15:32:33.509334", + "temperature-living-room", + "temperature meter", + 256, + "VendorTwo"), + rows( + new BigDecimal("27.4"), + 257, + "2020-01-20 15:32:35.732436", + "temperature-bedroom", + "camcorder", + 257, + "VendorThree")); + } + + @Test + public void testLookupSelectedAttribute() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | lookup %s did as device-id type, vendor | sort @timestamp", + TEST_INDEX_IOT_READINGS, TEST_INDEX_IOT_SENSORS)); + verifyDataRows( + result, + rows(new BigDecimal("28.1"), 255, "2015-01-20 15:31:32.406431", "meter", "VendorOne"), + rows( + new BigDecimal("27.8"), + 256, + "2016-01-20 15:31:33.509334", + "temperature meter", + "VendorTwo"), + rows(new BigDecimal("27.4"), 257, "2017-01-20 15:31:35.732436", "camcorder", "VendorThree"), + rows(new BigDecimal("28.5"), 255, "2018-01-20 15:32:32.406431", "meter", "VendorOne"), + rows( + new BigDecimal("27.9"), + 256, + "2019-01-20 15:32:33.509334", + "temperature meter", + "VendorTwo"), + rows( + new BigDecimal("27.4"), 257, "2020-01-20 15:32:35.732436", "camcorder", "VendorThree")); + } + + @Test + public void testLookupRenameSelectedAttributes() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | lookup %s did as device-id did as dev_id, type as kind, vendor | sort" + + " @timestamp", + TEST_INDEX_IOT_READINGS, TEST_INDEX_IOT_SENSORS)); + verifyDataRows( + result, + rows(new BigDecimal("28.1"), 255, "2015-01-20 15:31:32.406431", 255, "meter", "VendorOne"), + rows( + new BigDecimal("27.8"), + 256, + "2016-01-20 15:31:33.509334", + 256, + "temperature meter", + "VendorTwo"), + rows( + new BigDecimal("27.4"), + 257, + "2017-01-20 15:31:35.732436", + 257, + "camcorder", + "VendorThree"), + rows(new BigDecimal("28.5"), 255, "2018-01-20 15:32:32.406431", 255, "meter", "VendorOne"), + rows( + new BigDecimal("27.9"), + 256, + "2019-01-20 15:32:33.509334", + 256, + "temperature meter", + "VendorTwo"), + rows( + new BigDecimal("27.4"), + 257, + "2020-01-20 15:32:35.732436", + 257, + "camcorder", + "VendorThree")); + } + + @Test + public void testLookupSelectedMultipleAttributes() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | lookup %s did as device-id type | sort @timestamp", + TEST_INDEX_IOT_READINGS, TEST_INDEX_IOT_SENSORS)); + verifyDataRows( + result, + rows(new BigDecimal("28.1"), 255, "2015-01-20 15:31:32.406431", "meter"), + rows(new BigDecimal("27.8"), 256, "2016-01-20 15:31:33.509334", "temperature meter"), + rows(new BigDecimal("27.4"), 257, "2017-01-20 15:31:35.732436", "camcorder"), + rows(new BigDecimal("28.5"), 255, "2018-01-20 15:32:32.406431", "meter"), + rows(new BigDecimal("27.9"), 256, "2019-01-20 15:32:33.509334", "temperature meter"), + rows(new BigDecimal("27.4"), 257, "2020-01-20 15:32:35.732436", "camcorder")); + } + + @Test + public void testLookupShouldOverwriteShouldBeFalseByDefault() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | rename temperature as vendor | lookup %s did as device-id | sort" + + " @timestamp", + TEST_INDEX_IOT_READINGS, TEST_INDEX_IOT_SENSORS)); + verifyDataRows( + result, + rows(255, "2015-01-20 15:31:32.406431", "VendorOne", "temperature-basement", "meter", 255), + rows( + 256, + "2016-01-20 15:31:33.509334", + "VendorTwo", + "temperature-living-room", + "temperature meter", + 256), + rows( + 257, + "2017-01-20 15:31:35.732436", + "VendorThree", + "temperature-bedroom", + "camcorder", + 257), + rows(255, "2018-01-20 15:32:32.406431", "VendorOne", "temperature-basement", "meter", 255), + rows( + 256, + "2019-01-20 15:32:33.509334", + "VendorTwo", + "temperature-living-room", + "temperature meter", + 256), + rows( + 257, + "2020-01-20 15:32:35.732436", + "VendorThree", + "temperature-bedroom", + "camcorder", + 257)); + } + + @Test + public void testLookupWithOverwriteFalse() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s | rename temperature as vendor | lookup %s did as device-id overwrite =" + + " true | sort @timestamp", + TEST_INDEX_IOT_READINGS, TEST_INDEX_IOT_SENSORS)); + verifyDataRows( + result, + rows( + 255, + "2015-01-20 15:31:32.406431", + new BigDecimal("28.1"), + "temperature-basement", + "meter", + 255), + rows( + 256, + "2016-01-20 15:31:33.509334", + new BigDecimal("27.8"), + "temperature-living-room", + "temperature meter", + 256), + rows( + 257, + "2017-01-20 15:31:35.732436", + new BigDecimal("27.4"), + "temperature-bedroom", + "camcorder", + 257), + rows( + 255, + "2018-01-20 15:32:32.406431", + new BigDecimal("28.5"), + "temperature-basement", + "meter", + 255), + rows( + 256, + "2019-01-20 15:32:33.509334", + new BigDecimal("27.9"), + "temperature-living-room", + "temperature meter", + 256), + rows( + 257, + "2020-01-20 15:32:35.732436", + new BigDecimal("27.4"), + "temperature-bedroom", + "camcorder", + 257)); + } +} diff --git a/integ-test/src/test/resources/indexDefinitions/iot_readings_index_mappings.json b/integ-test/src/test/resources/indexDefinitions/iot_readings_index_mappings.json new file mode 100644 index 0000000000..a97c6c170e --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/iot_readings_index_mappings.json @@ -0,0 +1,24 @@ +{ + "mappings": { + "properties": { + "device-id": { + "type": "long" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "temperature": { + "type": "float" + }, + "timestamp": { + "type": "date" + } + } + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/indexDefinitions/iot_sensors_index_mappings.json b/integ-test/src/test/resources/indexDefinitions/iot_sensors_index_mappings.json new file mode 100644 index 0000000000..e9682a390e --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/iot_sensors_index_mappings.json @@ -0,0 +1,36 @@ +{ + "mappings": { + "properties": { + "did": { + "type": "long" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "type": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "vendor": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/iot_readings.json b/integ-test/src/test/resources/iot_readings.json new file mode 100644 index 0000000000..be7e542e87 --- /dev/null +++ b/integ-test/src/test/resources/iot_readings.json @@ -0,0 +1,13 @@ +{ "index" : { "_id" : "1" } } +{ "device-id":255, "temperature":28.1, "@timestamp":"2015-01-20T15:31:32.406431+00:00" } +{ "index" : { "_id" : "2" } } +{ "device-id":256, "temperature":27.8, "@timestamp":"2016-01-20T15:31:33.509334+00:00" } +{ "index" : { "_id" : "3" } } +{ "device-id":257, "temperature":27.4, "@timestamp":"2017-01-20T15:31:35.732436+00:00" } +{ "index" : { "_id" : "4" } } +{ "device-id":255, "temperature":28.5, "@timestamp":"2018-01-20T15:32:32.406431+00:00" } +{ "index" : { "_id" : "5" } } +{ "device-id":256, "temperature":27.9, "@timestamp":"2019-01-20T15:32:33.509334+00:00" } +{ "index" : { "_id" : "6" } } +{ "device-id":257, "temperature":27.4, "@timestamp":"2020-01-20T15:32:35.732436+00:00" } +{ "index" : { "_id" : "7" } } diff --git a/integ-test/src/test/resources/iot_sensors.json b/integ-test/src/test/resources/iot_sensors.json new file mode 100644 index 0000000000..a36dd0aaa4 --- /dev/null +++ b/integ-test/src/test/resources/iot_sensors.json @@ -0,0 +1,6 @@ +{ "index" : { "_id" : "1" } } +{ "did" : 255, "name":"temperature-basement", "vendor":"VendorOne", "type":"meter"} +{ "index" : { "_id" : "2" } } +{ "did" : 256, "name":"temperature-living-room", "vendor":"VendorTwo", "type":"temperature meter" } +{ "index" : { "_id" : "3" } } +{ "did" : 257, "name":"temperature-bedroom", "vendor":"VendorThree", "type":"camcorder"} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java index 358bc10ab4..7fd2908bc3 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java @@ -16,6 +16,7 @@ import org.opensearch.sql.planner.physical.EvalOperator; import org.opensearch.sql.planner.physical.FilterOperator; import org.opensearch.sql.planner.physical.LimitOperator; +import org.opensearch.sql.planner.physical.LookupOperator; import org.opensearch.sql.planner.physical.NestedOperator; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.ProjectOperator; @@ -118,6 +119,17 @@ public PhysicalPlan visitDedupe(DedupeOperator node, Object context) { node.getConsecutive()); } + @Override + public PhysicalPlan visitLookup(LookupOperator node, Object context) { + return new LookupOperator( + visitInput(node.getInput(), context), + node.getIndexName(), + node.getMatchFieldMap(), + node.getOverwrite(), + node.getCopyFieldMap(), + node.getLookup()); + } + @Override public PhysicalPlan visitWindow(WindowOperator node, Object context) { return new WindowOperator( diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index b8822cd1e8..d742996c73 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -9,9 +9,21 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Function; import lombok.RequiredArgsConstructor; +import org.jetbrains.annotations.Nullable; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.node.NodeClient; import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.MatchQueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.data.type.ExprType; @@ -28,9 +40,11 @@ import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexScanBuilder; import org.opensearch.sql.planner.DefaultImplementor; import org.opensearch.sql.planner.logical.LogicalAD; +import org.opensearch.sql.planner.logical.LogicalLookup; import org.opensearch.sql.planner.logical.LogicalML; import org.opensearch.sql.planner.logical.LogicalMLCommons; import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.physical.LookupOperator; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.Table; import org.opensearch.sql.storage.read.TableScanBuilder; @@ -209,5 +223,82 @@ public PhysicalPlan visitAD(LogicalAD node, OpenSearchIndexScan context) { public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan context) { return new MLOperator(visitChild(node, context), node.getArguments(), client.getNodeClient()); } + + @Override + public PhysicalPlan visitLookup(LogicalLookup node, OpenSearchIndexScan context) { + SingleRowQuery singleRowQuery = new SingleRowQuery(client); + return new LookupOperator( + visitChild(node, context), + node.getIndexName(), + node.getMatchFieldMap(), + node.getOverwrite(), + node.getCopyFieldMap(), + lookup(singleRowQuery)); + } + + BiFunction, Map> lookup( + SingleRowQuery singleRowQuery) { + Objects.requireNonNull(singleRowQuery, "SingleRowQuery is required to perform lookup"); + return (indexName, inputMap) -> { + Map matchMap = (Map) inputMap.get("_match"); + Set copySet = (Set) inputMap.get("_copy"); + return singleRowQuery.executeQuery(indexName, matchMap, copySet); + }; + } + } + + static class SingleRowQuery { + + private final NodeClient nodeClient; + + public SingleRowQuery(OpenSearchClient openSearchClient) { + Objects.requireNonNull(openSearchClient, "Opensearch client is required to perform lookup"); + this.nodeClient = + Objects.requireNonNull( + openSearchClient.getNodeClient(), + "Can not perform lookup because openSearchClient was null. This is likely a bug."); + } + + public @Nullable Map executeQuery( + String indexName, Map matchMap, Set copySet) { + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + + for (Map.Entry f : matchMap.entrySet()) { + BoolQueryBuilder orQueryBuilder = new BoolQueryBuilder(); + + // Todo: Search with term and a match query? Or terms only? + orQueryBuilder.should(new TermQueryBuilder(f.getKey(), f.getValue().toString())); + orQueryBuilder.should(new MatchQueryBuilder(f.getKey(), f.getValue().toString())); + orQueryBuilder.minimumShouldMatch(1); + + // filter is the same as "must" but ignores scoring + boolQueryBuilder.filter(orQueryBuilder); + } + + SearchResponse result = + nodeClient + .search( + new SearchRequest(indexName) + .source( + SearchSourceBuilder.searchSource() + .fetchSource( + copySet == null ? null : copySet.toArray(new String[0]), null) + .query(boolQueryBuilder) + .size(2))) + .actionGet(); + + SearchHit[] searchHits = result.getHits().getHits(); + int hits = searchHits.length; + if (hits == 0) { + // null indicates no hits for the lookup found + return null; + } + + if (hits != 1) { + throw new RuntimeException("too many hits for " + indexName + " (" + hits + ")"); + } + + return searchHits[0].getSourceAsMap(); + } } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java index 724178bd34..f97cd80512 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiFunction; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; @@ -85,6 +86,8 @@ class OpenSearchExecutionProtectorTest { @Mock private OpenSearchSettings settings; + @Mock private BiFunction, Map> lookupFunction; + private OpenSearchExecutionProtector executionProtector; @BeforeEach @@ -127,59 +130,71 @@ void test_protect_indexScan() { client); assertEquals( PhysicalPlanDSL.project( - PhysicalPlanDSL.limit( - PhysicalPlanDSL.dedupe( - PhysicalPlanDSL.rareTopN( - resourceMonitor( - PhysicalPlanDSL.sort( - PhysicalPlanDSL.eval( - PhysicalPlanDSL.remove( - PhysicalPlanDSL.rename( - PhysicalPlanDSL.agg( - filter( - resourceMonitor( - new OpenSearchIndexScan( - client, maxResultWindow, request)), - filterExpr), - aggregators, - groupByExprs), - mappings), - exclude), - newEvalField), - sortField)), - CommandType.TOP, - topExprs, - topField), - dedupeField), - limit, - offset), - include), - executionProtector.protect( - PhysicalPlanDSL.project( + PhysicalPlanDSL.lookup( PhysicalPlanDSL.limit( PhysicalPlanDSL.dedupe( PhysicalPlanDSL.rareTopN( - PhysicalPlanDSL.sort( - PhysicalPlanDSL.eval( - PhysicalPlanDSL.remove( - PhysicalPlanDSL.rename( - PhysicalPlanDSL.agg( - filter( - new OpenSearchIndexScan( - client, maxResultWindow, request), - filterExpr), - aggregators, - groupByExprs), - mappings), - exclude), - newEvalField), - sortField), + resourceMonitor( + PhysicalPlanDSL.sort( + PhysicalPlanDSL.eval( + PhysicalPlanDSL.remove( + PhysicalPlanDSL.rename( + PhysicalPlanDSL.agg( + filter( + resourceMonitor( + new OpenSearchIndexScan( + client, maxResultWindow, request)), + filterExpr), + aggregators, + groupByExprs), + mappings), + exclude), + newEvalField), + sortField)), CommandType.TOP, topExprs, topField), dedupeField), limit, offset), + "lookup_index_name", + Map.of(), + false, + Map.of(), + lookupFunction), + include), + executionProtector.protect( + PhysicalPlanDSL.project( + PhysicalPlanDSL.lookup( + PhysicalPlanDSL.limit( + PhysicalPlanDSL.dedupe( + PhysicalPlanDSL.rareTopN( + PhysicalPlanDSL.sort( + PhysicalPlanDSL.eval( + PhysicalPlanDSL.remove( + PhysicalPlanDSL.rename( + PhysicalPlanDSL.agg( + filter( + new OpenSearchIndexScan( + client, maxResultWindow, request), + filterExpr), + aggregators, + groupByExprs), + mappings), + exclude), + newEvalField), + sortField), + CommandType.TOP, + topExprs, + topField), + dedupeField), + limit, + offset), + "lookup_index_name", + Map.of(), + false, + Map.of(), + lookupFunction), include))); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java index 3f8a07f495..fcc49545a7 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java @@ -29,8 +29,11 @@ import static org.opensearch.sql.planner.logical.LogicalPlanDSL.sort; import com.google.common.collect.ImmutableMap; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -38,7 +41,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.node.NodeClient; import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.ast.tree.Sort; import org.opensearch.sql.common.setting.Settings; @@ -54,6 +59,8 @@ import org.opensearch.sql.opensearch.mapping.IndexMapping; import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; +import org.opensearch.sql.opensearch.storage.OpenSearchIndex.OpenSearchDefaultImplementor; +import org.opensearch.sql.opensearch.storage.OpenSearchIndex.SingleRowQuery; import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexScan; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.logical.LogicalPlanDSL; @@ -66,6 +73,9 @@ class OpenSearchIndexTest { public static final TimeValue SCROLL_TIMEOUT = new TimeValue(1); public static final OpenSearchRequest.IndexName INDEX_NAME = new OpenSearchRequest.IndexName("test"); + public static final String LOOKUP_INDEX_NAME = "lookup-index-name"; + public static final String LOOKUP_TABLE_FIELD = "lookup_table_field"; + public static final String QUERY_FIELD = "query_field"; @Mock private OpenSearchClient client; @@ -75,6 +85,8 @@ class OpenSearchIndexTest { @Mock private IndexMapping mapping; + @Mock private NodeClient nodeClient; + private OpenSearchIndex index; @BeforeEach @@ -229,6 +241,7 @@ void implementRelationOperatorWithOptimization() { void implementOtherLogicalOperators() { when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(client.getNodeClient()).thenReturn(nodeClient); NamedExpression include = named("age", ref("age", INTEGER)); ReferenceExpression exclude = ref("name", STRING); ReferenceExpression dedupeField = ref("name", STRING); @@ -241,34 +254,63 @@ void implementOtherLogicalOperators() { LogicalPlan plan = project( - LogicalPlanDSL.dedupe( - sort( - eval( - remove(rename(index.createScanBuilder(), mappings), exclude), newEvalField), - sortField), - dedupeField), + LogicalPlanDSL.lookup( + LogicalPlanDSL.dedupe( + sort( + eval( + remove(rename(index.createScanBuilder(), mappings), exclude), + newEvalField), + sortField), + dedupeField), + LOOKUP_INDEX_NAME, + Map.of( + new ReferenceExpression(LOOKUP_TABLE_FIELD, STRING), + new ReferenceExpression(QUERY_FIELD, STRING)), + true, + Collections.emptyMap()), include); Integer maxResultWindow = index.getMaxResultWindow(); + BiFunction, Map> anyBifunction = + new BiFunction<>() { + @Override + public Map apply(String s, Map stringObjectMap) { + return Map.of(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof BiFunction; + } + }; + final var requestBuilder = new OpenSearchRequestBuilder(QUERY_SIZE_LIMIT, exprValueFactory, settings); assertEquals( PhysicalPlanDSL.project( - PhysicalPlanDSL.dedupe( - PhysicalPlanDSL.sort( - PhysicalPlanDSL.eval( - PhysicalPlanDSL.remove( - PhysicalPlanDSL.rename( - new OpenSearchIndexScan( - client, - QUERY_SIZE_LIMIT, - requestBuilder.build( - INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT, client)), - mappings), - exclude), - newEvalField), - sortField), - dedupeField), + PhysicalPlanDSL.lookup( + PhysicalPlanDSL.dedupe( + PhysicalPlanDSL.sort( + PhysicalPlanDSL.eval( + PhysicalPlanDSL.remove( + PhysicalPlanDSL.rename( + new OpenSearchIndexScan( + client, + QUERY_SIZE_LIMIT, + requestBuilder.build( + INDEX_NAME, maxResultWindow, SCROLL_TIMEOUT, client)), + mappings), + exclude), + newEvalField), + sortField), + dedupeField), + LOOKUP_INDEX_NAME, + Map.of( + new ReferenceExpression(LOOKUP_TABLE_FIELD, STRING), + new ReferenceExpression(QUERY_FIELD, STRING)), + true, + Collections.emptyMap(), + anyBifunction), include), index.implement(plan)); } @@ -281,4 +323,21 @@ void isFieldTypeTolerance() { assertTrue(index.isFieldTypeTolerance()); assertFalse(index.isFieldTypeTolerance()); } + + @Test + public void lookupShouldExecuteQuery() { + OpenSearchDefaultImplementor implementor = new OpenSearchDefaultImplementor(client); + Map matchMap = Map.of("column name", "required value"); + Set copySet = Set.of("column_1", "column_2"); + Map parameters = Map.of("_match", matchMap, "_copy", copySet); + SingleRowQuery singleRowQuery = Mockito.mock(SingleRowQuery.class); + Map resultRow = Map.of("column_1", 1, "column_2", 2); + when(singleRowQuery.executeQuery("lookup_index_name", matchMap, copySet)).thenReturn(resultRow); + BiFunction, Map> lookup = + implementor.lookup(singleRowQuery); + + Map givenResult = lookup.apply("lookup_index_name", parameters); + + assertEquals(resultRow, givenResult); + } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/SingleRowQueryTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/SingleRowQueryTest.java new file mode 100644 index 0000000000..bd8369878d --- /dev/null +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/SingleRowQueryTest.java @@ -0,0 +1,251 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.emptyArray; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.MatchQueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.opensearch.storage.OpenSearchIndex.SingleRowQuery; + +@ExtendWith(MockitoExtension.class) +class SingleRowQueryTest { + + @Mock private OpenSearchClient openSearchClient; + @Mock private NodeClient nodeClient; + @Mock private ActionFuture searchFuture; + @Mock private SearchResponse response; + @Mock private SearchHits searchHits; + @Mock private SearchHit hit; + @Captor private ArgumentCaptor searchRequestCaptor; + + private SingleRowQuery singleRowQuery; + + @BeforeEach + public void beforeEach() { + when(openSearchClient.getNodeClient()).thenReturn(nodeClient); + singleRowQuery = new SingleRowQuery(openSearchClient); + } + + @AfterEach + public void shouldUseExactlyOneSearch() { + verify(nodeClient, times(1)).search(any()); + } + + @Test + void shouldReturnNullWhenRowDoesNotExist() { + Map predicates = Map.of("column_name", "value 1"); + Set projection = Set.of("returned column name"); + mockSearch(new SearchHit[0]); + + Map row = singleRowQuery.executeQuery("index_name", predicates, projection); + + assertThat(row, nullValue()); + } + + @Test + void shouldThrowExceptionWhenMoreThanOneRowIsFound() { + Map predicates = Map.of("column_name", "value 1"); + Set projection = Set.of("returned column name"); + mockSearch(new SearchHit[2]); + + RuntimeException ex = + assertThrows( + RuntimeException.class, + () -> singleRowQuery.executeQuery("index_name", predicates, projection)); + + assertThat(ex.getMessage(), Matchers.containsString("too many hits")); + } + + @ParameterizedTest + @ValueSource(strings = {"table_1", "other_table", "yet_another_table"}) + void shouldQueryCorrectTable(String tableName) { + Map predicates = Map.of("column_name", "value row criteria"); + Set projection = Set.of("returned column name"); + + mockSearch(new SearchHit[0]); + + Map row = singleRowQuery.executeQuery(tableName, predicates, projection); + + SearchRequest searchRequest = searchRequestCaptor.getValue(); + assertThat(searchRequest.indices(), equalTo(new String[] {tableName})); + assertThat(row, nullValue()); + } + + @ParameterizedTest + @CsvSource({ + "field,value row criteria,fetched_column_name_one", + "column,another value row criteria,another_fetched_column_name", + "attribute,yet another value row criteria,third_fetched_column_name", + "regular_name,one_2_three,this_is_the_fetched_column_name", + "extra_ordinary_column_name,I am an expected value,my_name_is_the_fetched_column", + "nice_column_name,last value row criteria,my_nice_fetched_column", + }) + public void shouldBuildOpenSearchQueryForOnePredicate( + String columnName, String valuePredicate, String projection) { + Map predicates = Map.of(columnName, valuePredicate); + mockSearch(new SearchHit[0]); + + Map row = + singleRowQuery.executeQuery("index_name", predicates, Set.of(projection)); + + SearchRequest searchRequest = searchRequestCaptor.getValue(); + BoolQueryBuilder filterQueryForSinglePredicate = new BoolQueryBuilder(); + filterQueryForSinglePredicate.should(new TermQueryBuilder(columnName, valuePredicate)); + filterQueryForSinglePredicate.should(new MatchQueryBuilder(columnName, valuePredicate)); + filterQueryForSinglePredicate.minimumShouldMatch(1); + BoolQueryBuilder expectedQuery = new BoolQueryBuilder(); + expectedQuery.filter(filterQueryForSinglePredicate); + + assertThat(searchRequest.source().query(), equalTo(expectedQuery)); + assertThat(searchRequest.source().size(), equalTo(2)); + assertThat(searchRequest.source().fetchSource().includes(), equalTo(new String[] {projection})); + assertThat(searchRequest.source().fetchSource().excludes(), emptyArray()); + assertThat(row, nullValue()); + } + + @ParameterizedTest + @CsvSource({ + "columnName1,columnName2,columnName3", + "columnName4,columnName5,columnName6", + "columnName,columnName8,columnName9", + "extraOrdinaryOne,eXtraOrdinaryTwo,extraOrdinaryThree" + }) + void shouldFetchVariousColumns(String columnOne, String columnTwo, String columnThree) { + Map predicates = Map.of("find_only_row", "with_value_abc"); + mockSearch(new SearchHit[0]); + + Map row = + singleRowQuery.executeQuery( + "index_name", predicates, Set.of(columnOne, columnTwo, columnThree)); + + assertThat(row, nullValue()); + } + + @ParameterizedTest + @MethodSource("variousPredicates") + void shouldUseComplexPredicate(Map predicates) { + Set projection = Set.of("returned column name"); + mockSearch(new SearchHit[0]); + + Map row = singleRowQuery.executeQuery("index_name", predicates, projection); + + SearchRequest searchRequest = searchRequestCaptor.getValue(); + BoolQueryBuilder expectedQuery = new BoolQueryBuilder(); + predicates.entrySet().stream() + .map( + entry -> { + String columnName = entry.getKey(); + Object value = entry.getValue(); + BoolQueryBuilder filterQueryForSinglePredicate = new BoolQueryBuilder(); + filterQueryForSinglePredicate.should(new TermQueryBuilder(columnName, value)); + filterQueryForSinglePredicate.should(new MatchQueryBuilder(columnName, value)); + filterQueryForSinglePredicate.minimumShouldMatch(1); + return filterQueryForSinglePredicate; + }) + .forEach(expectedQuery::filter); + + assertThat(searchRequest.source().query(), equalTo(expectedQuery)); + assertThat(row, nullValue()); + } + + static Stream variousPredicates() { + return Stream.of( + Arguments.of(Map.of("column_name_1", "value row criteria_12")), + Arguments.of( + Map.of( + "column_name_2", "value row criteria_23", "another_column_5", "another value_8")), + Arguments.of( + Map.of( + "column_name_3", + "value row criteria_34", + "another_column_6", + "another value_8", + "yet_another_column_11", + "yet another value_13")), + Arguments.of( + Map.of( + "column_name_4", + "value row criteria_45", + "another_column_7", + "another value_10", + "yet_another_column_12", + "yet another value_14", + "extra_column_15", + "extra value_16"))); + } + + @Test + public void shouldReturnRow() { + Map predicates = Map.of("column_name", "value row criteria"); + Set projection = Set.of("returned column name"); + Map searchResult = + Map.of("column_name", "value row criteria", "returned column name", "value 2"); + mockSearch(searchResult); + + Map row = singleRowQuery.executeQuery("index_name", predicates, projection); + + assertThat(row, equalTo(searchResult)); + } + + @Test + public void shouldTreatProjectionAsOptionalParameter() { + Map predicates = Map.of("column_name", "value row criteria"); + Set projection = null; + Map searchResult = + Map.of("column_name", "value row criteria", "returned column name", "value 2"); + mockSearch(searchResult); + + Map row = singleRowQuery.executeQuery("index_name", predicates, projection); + + assertThat(row, equalTo(searchResult)); + } + + private void mockSearch(Map searchResult) { + when(hit.getSourceAsMap()).thenReturn(searchResult); + mockSearch(new SearchHit[] {hit}); + } + + private void mockSearch(SearchHit[] searchResult) { + when(nodeClient.search(searchRequestCaptor.capture())).thenReturn(searchFuture); + when(searchFuture.actionGet()).thenReturn(response); + when(response.getHits()).thenReturn(searchHits); + when(searchHits.getHits()).thenReturn(searchResult); + } +} diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 4a883fa656..3b9cbc4158 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -64,6 +64,7 @@ SMA: 'SMA'; // ARGUMENT KEYWORDS KEEPEMPTY: 'KEEPEMPTY'; CONSECUTIVE: 'CONSECUTIVE'; +OVERWRITE: 'OVERWRITE'; DEDUP_SPLITVALUES: 'DEDUP_SPLITVALUES'; PARTITIONS: 'PARTITIONS'; ALLNUM: 'ALLNUM'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index c9d0f2e110..04417c1511 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -38,6 +38,7 @@ commands | renameCommand | statsCommand | dedupCommand + | lookupCommand | sortCommand | evalCommand | headCommand @@ -87,6 +88,18 @@ dedupCommand : DEDUP (number = integerLiteral)? fieldList (KEEPEMPTY EQUAL keepempty = booleanLiteral)? (CONSECUTIVE EQUAL consecutive = booleanLiteral)? ; +matchFieldWithOptAs + : orignalMatchField = fieldExpression (AS asMatchField = fieldExpression)? + ; + +copyFieldWithOptAs + : orignalCopyField = fieldExpression (AS asCopyField = fieldExpression)? + ; + +lookupCommand + : LOOKUP tableSource matchFieldWithOptAs (COMMA matchFieldWithOptAs)* (OVERWRITE EQUAL overwrite = booleanLiteral)? (copyFieldWithOptAs (COMMA copyFieldWithOptAs)*)* + ; + sortCommand : SORT sortbyClause ; @@ -872,6 +885,7 @@ keywordsCanBeId | RENAME | STATS | DEDUP + | LOOKUP | SORT | EVAL | FILLNULL diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index c3c31ee2e1..7390d419e3 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -12,6 +12,7 @@ import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FieldsCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FromClauseContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.HeadCommandContext; +import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.LookupCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.RareCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.RenameCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SearchFilterFromContext; @@ -55,6 +56,7 @@ import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Kmeans; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.Parse; import org.opensearch.sql.ast.tree.Project; @@ -214,6 +216,47 @@ public UnresolvedPlan visitDedupCommand(DedupCommandContext ctx) { return new Dedupe(ArgumentFactory.getArgumentList(ctx), getFieldList(ctx.fieldList())); } + /** Lookup command */ + @Override + public UnresolvedPlan visitLookupCommand(LookupCommandContext ctx) { + ArgumentFactory.getArgumentList(ctx); + ctx.tableSource(); + ctx.copyFieldWithOptAs(); + ctx.matchFieldWithOptAs(); + return new Lookup( + ctx.tableSource().tableQualifiedName().getText(), + ctx.matchFieldWithOptAs().stream() + .map( + ct -> + new Map( + evaluateFieldExpressionContext(ct.orignalMatchField), + evaluateFieldExpressionContext(ct.asMatchField, ct.orignalMatchField))) + .collect(Collectors.toList()), + ArgumentFactory.getArgumentList(ctx), + ctx.copyFieldWithOptAs().stream() + .map( + ct -> + new Map( + evaluateFieldExpressionContext(ct.orignalCopyField), + evaluateFieldExpressionContext(ct.asCopyField, ct.orignalCopyField))) + .collect(Collectors.toList())); + } + + private UnresolvedExpression evaluateFieldExpressionContext( + OpenSearchPPLParser.FieldExpressionContext f) { + return internalVisitExpression(f); + } + + private UnresolvedExpression evaluateFieldExpressionContext( + OpenSearchPPLParser.FieldExpressionContext f0, + OpenSearchPPLParser.FieldExpressionContext f1) { + if (f0 == null) { + return internalVisitExpression(f1); + } else { + return internalVisitExpression(f0); + } + } + /** Head command visitor. */ @Override public UnresolvedPlan visitHeadCommand(HeadCommandContext ctx) { diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java index f89ecf9c6e..44b150fd3d 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/ArgumentFactory.java @@ -9,6 +9,7 @@ import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.DedupCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.FieldsCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.IntegerLiteralContext; +import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.LookupCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.RareCommandContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SortFieldContext; import static org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.StatsCommandContext; @@ -80,6 +81,13 @@ public static List getArgumentList(DedupCommandContext ctx) { : new Argument("consecutive", new Literal(false, DataType.BOOLEAN))); } + public static List getArgumentList(LookupCommandContext ctx) { + return Arrays.asList( + ctx.overwrite != null + ? new Argument("overwrite", getArgumentValue(ctx.overwrite)) + : new Argument("overwrite", new Literal(false, DataType.BOOLEAN))); + } + /** * Get list of {@link Argument}. * diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 96e21eafcd..358ad9c847 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -38,6 +38,7 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; +import org.opensearch.sql.ast.tree.Lookup; import org.opensearch.sql.ast.tree.Project; import org.opensearch.sql.ast.tree.RareTopN; import org.opensearch.sql.ast.tree.Relation; @@ -216,6 +217,40 @@ public String visitDedupe(Dedupe node, String context) { child, fields, allowedDuplication, keepEmpty, consecutive); } + @Override + public String visitLookup(Lookup node, String context) { + String child = node.getChild().get(0).accept(this, context); + String lookupIndexName = node.getIndexName(); + ImmutableMap.Builder matchMapBuilder = new ImmutableMap.Builder<>(); + for (Map matchMap : node.getMatchFieldList()) { + matchMapBuilder.put( + visitExpression(matchMap.getOrigin()), + ((Field) matchMap.getTarget()).getField().toString()); + } + String matches = + matchMapBuilder.build().entrySet().stream() + .map(entry -> StringUtils.format("%s as %s", entry.getKey(), entry.getValue())) + .collect(Collectors.joining(",")); + + ImmutableMap.Builder copyMapBuilder = new ImmutableMap.Builder<>(); + for (Map copyMap : node.getCopyFieldList()) { + copyMapBuilder.put( + visitExpression(copyMap.getOrigin()), + ((Field) copyMap.getTarget()).getField().toString()); + } + String copies = + copyMapBuilder.build().entrySet().stream() + .map(entry -> StringUtils.format("%s as %s", entry.getKey(), entry.getValue())) + .collect(Collectors.joining(",")); + + List options = node.getOptions(); + Boolean overwrite = (Boolean) options.get(0).getValue().getValue(); + + return StringUtils.format( + "%s | lookup %s %s overwrite=%b %s", child, lookupIndexName, matches, overwrite, copies) + .trim(); + } + @Override public String visitHead(Head node, String context) { String child = node.getChild().get(0).accept(this, context); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index c6f4ed2044..7ba68a7df1 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -23,11 +23,13 @@ import static org.opensearch.sql.ast.dsl.AstDSL.eval; import static org.opensearch.sql.ast.dsl.AstDSL.exprList; import static org.opensearch.sql.ast.dsl.AstDSL.field; +import static org.opensearch.sql.ast.dsl.AstDSL.fieldMap; import static org.opensearch.sql.ast.dsl.AstDSL.filter; import static org.opensearch.sql.ast.dsl.AstDSL.function; import static org.opensearch.sql.ast.dsl.AstDSL.head; import static org.opensearch.sql.ast.dsl.AstDSL.intLiteral; import static org.opensearch.sql.ast.dsl.AstDSL.let; +import static org.opensearch.sql.ast.dsl.AstDSL.lookup; import static org.opensearch.sql.ast.dsl.AstDSL.map; import static org.opensearch.sql.ast.dsl.AstDSL.nullLiteral; import static org.opensearch.sql.ast.dsl.AstDSL.parse; @@ -50,6 +52,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Arrays; import java.util.Optional; +import java.util.Collections; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -402,6 +405,18 @@ public void testDedupCommandWithSortby() { defaultDedupArgs())); } + @Test + public void testLookupCommand() { + assertEqual( + "source=t | lookup a field", + lookup( + relation("t"), + "a", + fieldMap("field", "field"), + exprList(argument("overwrite", booleanLiteral(false))), + Collections.emptyList())); + } + @Test public void testHeadCommand() { assertEqual("source=t | head", head(relation("t"), 10, 0)); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstExpressionBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstExpressionBuilderTest.java index fbb25549ab..b20074a4c3 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstExpressionBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstExpressionBuilderTest.java @@ -733,10 +733,9 @@ public void functionNameCanBeUsedAsIdentifier() { + " | LOG10 | LOG2 | MOD | PI |POW | POWER | RAND | ROUND | SIGN | SQRT | TRUNCATE " + "| ACOS | ASIN | ATAN | ATAN2 | COS | COT | DEGREES | RADIANS | SIN | TAN"); assertFunctionNameCouldBeId( - "SEARCH | DESCRIBE | SHOW | FROM | WHERE | FIELDS | RENAME | STATS " - + "| DEDUP | SORT | EVAL | HEAD | TOP | RARE | PARSE | METHOD | REGEX | PUNCT | GROK " - + "| PATTERN | PATTERNS | NEW_FIELD | KMEANS | AD | ML | SOURCE | INDEX | D | DESC " - + "| DATASOURCES"); + "SEARCH | DESCRIBE | SHOW | FROM | WHERE | FIELDS | RENAME | STATS | DEDUP | LOOKUP | SORT" + + " | EVAL | HEAD | TOP | RARE | PARSE | METHOD | REGEX | PUNCT | GROK | PATTERN |" + + " PATTERNS | NEW_FIELD | KMEANS | AD | ML | SOURCE | INDEX | D | DESC | DATASOURCES"); } void assertFunctionNameCouldBeId(String antlrFunctionName) { diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/ArgumentFactoryTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/ArgumentFactoryTest.java index 761dbe2997..c0c71f862a 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/ArgumentFactoryTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/ArgumentFactoryTest.java @@ -14,12 +14,15 @@ import static org.opensearch.sql.ast.dsl.AstDSL.dedupe; import static org.opensearch.sql.ast.dsl.AstDSL.exprList; import static org.opensearch.sql.ast.dsl.AstDSL.field; +import static org.opensearch.sql.ast.dsl.AstDSL.fieldMap; import static org.opensearch.sql.ast.dsl.AstDSL.intLiteral; +import static org.opensearch.sql.ast.dsl.AstDSL.lookup; import static org.opensearch.sql.ast.dsl.AstDSL.projectWithArg; import static org.opensearch.sql.ast.dsl.AstDSL.relation; import static org.opensearch.sql.ast.dsl.AstDSL.sort; import static org.opensearch.sql.ast.dsl.AstDSL.stringLiteral; +import java.util.Collections; import org.junit.Test; import org.opensearch.sql.ppl.parser.AstBuilderTest; @@ -102,4 +105,53 @@ public void testSortFieldArgument() { public void testNoArgConstructorForArgumentFactoryShouldPass() { new ArgumentFactory(); } + + @Test + public void testLookupCommandRequiredArguments() { + assertEqual( + "source=t | lookup a field", + lookup( + relation("t"), + "a", + fieldMap("field", "field"), + exprList(argument("overwrite", booleanLiteral(false))), + Collections.emptyList())); + } + + @Test + public void testLookupCommandFieldArguments() { + assertEqual( + "source=t | lookup a field AS field1,field2 AS field3 destfield AS destfield1, destfield2" + + " AS destfield3", + lookup( + relation("t"), + "a", + fieldMap("field", "field1", "field2", "field3"), + exprList(argument("overwrite", booleanLiteral(false))), + fieldMap("destfield", "destfield1", "destfield2", "destfield3"))); + } + + @Test + public void testLookupCommandAppendTrueArgument() { + assertEqual( + "source=t | lookup a field overwrite=true", + lookup( + relation("t"), + "a", + fieldMap("field", "field"), + exprList(argument("overwrite", booleanLiteral(true))), + Collections.emptyList())); + } + + @Test + public void testLookupCommandAppendFalseArgument() { + assertEqual( + "source=t | lookup a field overwrite=false", + lookup( + relation("t"), + "a", + fieldMap("field", "field"), + exprList(argument("overwrite", booleanLiteral(false))), + Collections.emptyList())); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 06f8fbb061..cbe6febd8e 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -96,6 +96,22 @@ public void testTrendlineCommand() { anonymize("source=t | trendline sma(2, date) as date_alias sma(3, time) as time_alias")); } + @Test + public void testLookupCommand() { + assertEquals( + "source=t | lookup index field1 as field1,field2 as field2 overwrite=false", + anonymize("source=t | lookup index field1,field2")); + assertEquals( + "source=t | lookup index field1 as field1,field2 as field2 overwrite=true", + anonymize("source=t | lookup index field1,field2 overwrite=true")); + assertEquals( + "source=t | lookup index field1 as field12,field2 as field22 overwrite=false copyfield1 as" + + " copyfield1,copyfield2 as copyfield22", + anonymize( + "source=t | lookup index field1 as field12, field2 AS field22 copyfield1, copyfield2 as" + + " copyfield22")); + } + @Test public void testHeadCommandWithNumber() { assertEquals("source=t | head 3", anonymize("source=t | head 3"));