Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lookup - ppl command #3181

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
039949e
Initial commit for PPL lookup
salyh Jun 13, 2024
036e07a
Tests related to lookup and Analyzer class.
lukasz-soszynski-eliatra Jun 18, 2024
1c2a8f6
Removed System.out leftover
salyh Jun 25, 2024
1de0bdf
Added implementation in QueryDataAnonymizer and fixed related tests
salyh Jun 25, 2024
cda7a80
Add documentation
salyh Jun 25, 2024
488c9af
Added more lookup operator tests
lukasz-soszynski-eliatra Jun 25, 2024
c2edb51
Fix typo
salyh Jun 28, 2024
9da10e3
One unit test and integration tests for the lookup operator.
lukasz-soszynski-eliatra Jul 2, 2024
e9d8389
Added "appendonly" example to docs
salyh Jul 9, 2024
0c4e488
Move implementation from ExecutionProtector to OpenSearchIndex class
salyh Jul 16, 2024
ad96d22
Corrections related to LookupCommandIT and tests related to OpenSearc…
lukasz-soszynski-eliatra Jul 19, 2024
f2410e8
Typo corrected.
lukasz-soszynski-eliatra Jul 19, 2024
127f22b
Enable doc test
salyh Jul 23, 2024
b0ccb79
Merge branch 'main' into lookup
YANG-DB Dec 2, 2024
0326c6c
update spotless issue
Dec 2, 2024
1d2be37
update spotless issue
YANG-DB Dec 2, 2024
92e2abb
rename appendonly into overwrite
YANG-DB Dec 2, 2024
bc74c0c
add missing header licenses
YANG-DB Dec 2, 2024
8e6ec1b
update appendonly with overwrite for test names
YANG-DB Dec 2, 2024
c13192e
update :spotlessApply
YANG-DB Dec 3, 2024
37fec28
replace lookup.rst usage of `accounts` index with `account_data` inde…
YANG-DB Dec 3, 2024
639ee8a
Merge branch 'main' into lookup
YANG-DB Dec 4, 2024
08c7352
Merge branch 'main' into lookup
YANG-DB Dec 5, 2024
0889306
Merge branch 'main' into lookup
YANG-DB Dec 7, 2024
6469151
Merge branch 'main' into lookup
YANG-DB Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
Expand All @@ -72,6 +73,7 @@
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.DSL;
Expand All @@ -95,6 +97,7 @@
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalLookup;
import org.opensearch.sql.planner.logical.LogicalML;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
import org.opensearch.sql.planner.logical.LogicalPaginate;
Expand Down Expand Up @@ -498,6 +501,113 @@ public LogicalPlan visitDedupe(Dedupe node, AnalysisContext context) {
consecutive);
}

/** Build {@link LogicalLookup}. */
@Override
public LogicalPlan visitLookup(Lookup node, AnalysisContext queryContext) {
LogicalPlan child = node.getChild().get(0).accept(this, queryContext);
List<Argument> options = node.getOptions();
// Todo, refactor the option.
Boolean overwrite = (Boolean) options.get(0).getValue().getValue();

Table table =
dataSourceService
.getDataSource(DEFAULT_DATASOURCE_NAME)
.getStorageEngine()
.getTable(null, node.getIndexName());

if (table == null || !table.exists()) {
throw new SemanticCheckException(
String.format("no such lookup index %s", node.getIndexName()));
}

AnalysisContext lookupTableContext = new AnalysisContext();
TypeEnvironment lookupTableEnvironment = lookupTableContext.peek();
table
.getFieldTypes()
.forEach(
(name, type) ->
lookupTableEnvironment.define(new Symbol(Namespace.FIELD_NAME, name), type));
ImmutableMap<ReferenceExpression, ReferenceExpression> matchFieldMap =
analyzeLookupMatchFields(node.getMatchFieldList(), queryContext, lookupTableContext);

return new LogicalLookup(
child,
node.getIndexName(),
matchFieldMap,
overwrite,
analyzeLookupCopyFields(node.getCopyFieldList(), queryContext, table));
}

private ImmutableMap<ReferenceExpression, ReferenceExpression> analyzeLookupMatchFields(
List<Map> inputMap, AnalysisContext queryContext, AnalysisContext lookupTableContext) {
ImmutableMap.Builder<ReferenceExpression, ReferenceExpression> copyMapBuilder =
new ImmutableMap.Builder<>();
for (Map resultMap : inputMap) {
Expression origin = expressionAnalyzer.analyze(resultMap.getOrigin(), lookupTableContext);
if (resultMap.getTarget() instanceof Field) {
Expression targetExpression =
expressionAnalyzer.analyze(resultMap.getTarget(), queryContext);
ReferenceExpression targetReference =
DSL.ref(targetExpression.toString(), targetExpression.type());
ReferenceExpression originReference = DSL.ref(origin.toString(), origin.type());
TypeEnvironment curEnv = queryContext.peek();
curEnv.remove(originReference);
curEnv.define(targetReference);
copyMapBuilder.put(originReference, targetReference);
} else {
throw new SemanticCheckException(
String.format("the target expected to be field, but is %s", resultMap.getTarget()));
}
}

return copyMapBuilder.build();
}

private ImmutableMap<ReferenceExpression, ReferenceExpression> analyzeLookupCopyFields(
List<Map> inputMap, AnalysisContext context, Table table) {

TypeEnvironment curEnv = context.peek();
java.util.Map<String, ExprType> fieldTypes = table.getFieldTypes();

if (inputMap.isEmpty()) {
fieldTypes.forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));
return ImmutableMap.<ReferenceExpression, ReferenceExpression>builder().build();
}

ImmutableMap.Builder<ReferenceExpression, ReferenceExpression> copyMapBuilder =
new ImmutableMap.Builder<>();
for (Map resultMap : inputMap) {
if (resultMap.getOrigin() instanceof Field && resultMap.getTarget() instanceof Field) {
String fieldName = ((Field) resultMap.getOrigin()).getField().toString();
ExprType ex = fieldTypes.get(fieldName);

if (ex == null) {
throw new SemanticCheckException(String.format("no such field %s", fieldName));
}

ReferenceExpression origin = new ReferenceExpression(fieldName, ex);

if (resultMap.getTarget().equals(resultMap.getOrigin())) {

curEnv.define(origin);
copyMapBuilder.put(origin, origin);
} else {
ReferenceExpression target =
new ReferenceExpression(((Field) resultMap.getTarget()).getField().toString(), ex);
curEnv.define(target);
copyMapBuilder.put(origin, target);
}
} else {
throw new SemanticCheckException(
String.format(
"the origin and target expected to be field, but is %s/%s",
resultMap.getOrigin(), resultMap.getTarget()));
}
}

return copyMapBuilder.build();
}

/** Logical head is identical to {@link LogicalLimit}. */
public LogicalPlan visitHead(Head node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
Expand Down Expand Up @@ -227,6 +228,10 @@ public T visitDedupe(Dedupe node, C context) {
return visitChildren(node, context);
}

public T visitLookup(Lookup node, C context) {
return visitChildren(node, context);
}

public T visitHead(Head node, C context) {
return visitChildren(node, context);
}
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.ast.dsl;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -446,6 +448,25 @@ public static Dedupe dedupe(UnresolvedPlan input, List<Argument> options, Field.
return new Dedupe(input, options, Arrays.asList(fields));
}

public static Lookup lookup(
UnresolvedPlan input,
String indexName,
List<Map> matchFieldList,
List<Argument> options,
List<Map> copyFieldList) {
return new Lookup(input, indexName, matchFieldList, options, copyFieldList);
}

public static List<Map> fieldMap(String field, String asField, String... more) {
assert more == null || more.length % 2 == 0;
List list = new ArrayList();
list.add(map(field, asField));
for (int i = 0; i < more.length; i = i + 2) {
list.add(map(more[i], more[i + 1]));
}
return list;
}

public static Head head(UnresolvedPlan input, Integer size, Integer from) {
return new Head(input, size, from);
}
Expand Down
49 changes: 49 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Lookup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.Map;

/** AST node represent Lookup operation. */
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
@AllArgsConstructor
public class Lookup extends UnresolvedPlan {
private UnresolvedPlan child;
private final String indexName;
private final List<Map> matchFieldList;
private final List<Argument> options;
private final List<Map> copyFieldList;

@Override
public Lookup attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitLookup(this, context);
}
}
15 changes: 15 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/Explain.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.sql.planner.physical.EvalOperator;
import org.opensearch.sql.planner.physical.FilterOperator;
import org.opensearch.sql.planner.physical.LimitOperator;
import org.opensearch.sql.planner.physical.LookupOperator;
import org.opensearch.sql.planner.physical.NestedOperator;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor;
Expand Down Expand Up @@ -174,6 +175,20 @@ public ExplainResponseNode visitDedupe(DedupeOperator node, Object context) {
"consecutive", node.getConsecutive())));
}

@Override
public ExplainResponseNode visitLookup(LookupOperator node, Object context) {
return explain(
node,
context,
explainNode ->
explainNode.setDescription(
ImmutableMap.of(
"copyfields", node.getCopyFieldMap().toString(),
"matchfields", node.getMatchFieldMap().toString(),
"indexname", node.getIndexName(),
"overwrite", node.getOverwrite())));
}

@Override
public ExplainResponseNode visitRareTopN(RareTopNOperator node, Object context) {
return explain(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public PhysicalPlan visitValues(LogicalValues node, C context) {
public PhysicalPlan visitLimit(LogicalLimit node, C context) {
PhysicalPlan child = visitChild(node, context);
// Optimize sort + limit to take ordered operator
if (child instanceof SortOperator sortChild) {
if (child instanceof SortOperator) {
SortOperator sortChild = (SortOperator) child;
return new TakeOrderedOperator(
sortChild.getInput(), node.getLimit(), node.getOffset(), sortChild.getSortList());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.logical;

import java.util.Arrays;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.expression.ReferenceExpression;

/** Logical Lookup Plan. */
@Getter
@ToString
@EqualsAndHashCode(callSuper = true)
public class LogicalLookup extends LogicalPlan {

private final String indexName;
private final Map<ReferenceExpression, ReferenceExpression> matchFieldMap;
private final Map<ReferenceExpression, ReferenceExpression> copyFieldMap;
private final Boolean overwrite;

/** Constructor of LogicalLookup. */
public LogicalLookup(
LogicalPlan child,
String indexName,
Map<ReferenceExpression, ReferenceExpression> matchFieldMap,
Boolean overwrite,
Map<ReferenceExpression, ReferenceExpression> copyFieldMap) {
super(Arrays.asList(child));
this.indexName = indexName;
this.copyFieldMap = copyFieldMap;
this.matchFieldMap = matchFieldMap;
this.overwrite = overwrite;
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitLookup(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,13 @@ public LogicalPlan values(List<LiteralExpression>... values) {
public static LogicalPlan limit(LogicalPlan input, Integer limit, Integer offset) {
return new LogicalLimit(input, limit, offset);
}

public static LogicalPlan lookup(
LogicalPlan input,
String indexName,
Map<ReferenceExpression, ReferenceExpression> matchFieldMap,
Boolean overwrite,
Map<ReferenceExpression, ReferenceExpression> copyFields) {
return new LogicalLookup(input, indexName, matchFieldMap, overwrite, copyFields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public R visitDedupe(LogicalDedupe plan, C context) {
return visitNode(plan, context);
}

public R visitLookup(LogicalLookup plan, C context) {
return visitNode(plan, context);
}

public R visitRename(LogicalRename plan, C context) {
return visitNode(plan, context);
}
Expand Down
Loading
Loading