From 080216e853d2a1f20e53aa69531ccfc1a567f874 Mon Sep 17 00:00:00 2001 From: minurajeeve Date: Fri, 24 Nov 2023 11:58:51 +0530 Subject: [PATCH 1/6] modifying execute API to get column nullability state --- .../wrangler/api/NullHandlingException.java | 31 +++++++++++++++++++ .../executor/RecipePipelineExecutor.java | 8 ++++- .../java/io/cdap/wrangler/TestingRig.java | 6 ++-- .../v2/DirectiveExecutionRequest.java | 12 ++++++- .../proto/workspace/v2/Workspace.java | 24 ++++++++++++++ .../directive/AbstractDirectiveHandler.java | 13 ++++++-- .../service/directive/DirectivesHandler.java | 2 +- .../directive/RemoteExecutionTask.java | 2 +- .../service/directive/WorkspaceHandler.java | 21 ++++++++++++- .../io/cdap/wrangler/test/TestingRig.java | 2 +- .../main/java/io/cdap/wrangler/Wrangler.java | 3 +- 11 files changed, 111 insertions(+), 13 deletions(-) create mode 100644 wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java new file mode 100644 index 000000000..0d13c6711 --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2016-2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.wrangler.api; + +/** + * A Null Handling specific exception used for communicating issues with Null Handling in a column. + */ +public class NullHandlingException extends Exception { + public NullHandlingException(Exception e) { + super(e); + } + + public NullHandlingException(String message) { + super(message); + } + +} diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java b/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java index 159d6d512..a09301f5a 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java @@ -30,6 +30,7 @@ import io.cdap.wrangler.api.ReportErrorAndProceed; import io.cdap.wrangler.api.Row; import io.cdap.wrangler.api.TransientVariableScope; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator; import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext; import io.cdap.wrangler.schema.TransientStoreKeys; @@ -40,6 +41,8 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; @@ -56,10 +59,13 @@ public final class RecipePipelineExecutor implements RecipePipeline directives; + private HashMap nullabilityMap; - public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context) { + public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context, + HashMap nullabilityMap) { this.context = context; this.recipeParser = recipeParser; + this.nullabilityMap = nullabilityMap; } /** diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java b/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java index a56d12504..5507f0770 100644 --- a/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java +++ b/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java @@ -89,7 +89,7 @@ public static List execute(String[] recipe, List rows, ExecutorContext String migrate = new MigrateToV2(recipe).migrate(); RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); - return new RecipePipelineExecutor(parser, context).execute(rows); + return new RecipePipelineExecutor(parser, context, null).execute(rows); } /** @@ -112,7 +112,7 @@ public static Pair, List> executeWithErrors(String[] recipe, List String migrate = new MigrateToV2(recipe).migrate(); RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); - RecipePipeline pipeline = new RecipePipelineExecutor(parser, context); + RecipePipeline pipeline = new RecipePipelineExecutor(parser, context, null); List results = pipeline.execute(rows); List errors = pipeline.errors(); return new Pair<>(results, errors); @@ -126,7 +126,7 @@ public static RecipePipeline execute(String[] recipe) String migrate = new MigrateToV2(recipe).migrate(); RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); - return new RecipePipelineExecutor(parser, new TestingPipelineContext()); + return new RecipePipelineExecutor(parser, new TestingPipelineContext(), null); } public static RecipeParser parse(String[] recipe) throws DirectiveParseException, DirectiveLoadException { diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java index a77ce1f5d..ffd96e946 100644 --- a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java @@ -17,7 +17,9 @@ package io.cdap.wrangler.proto.workspace.v2; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import java.util.Collections; +import java.util.HashMap; import java.util.List; /** @@ -26,10 +28,14 @@ public class DirectiveExecutionRequest { private final List directives; private final int limit; + private final HashMap nullabilityMap; - public DirectiveExecutionRequest(List directives, int limit) { + + public DirectiveExecutionRequest(List directives, int limit, + HashMap nullabilityMap) { this.directives = directives; this.limit = limit; + this.nullabilityMap = nullabilityMap; } public int getLimit() { @@ -39,4 +45,8 @@ public int getLimit() { public List getDirectives() { return directives == null ? Collections.emptyList() : directives; } + + public HashMap getNullabilityMap() { + return nullabilityMap; + } } diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java index 16aac68f8..79d7aa2ab 100644 --- a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java @@ -20,6 +20,7 @@ import com.google.gson.JsonObject; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Objects; import javax.annotation.Nullable; @@ -38,6 +39,8 @@ public class Workspace { // this is for insights page in UI private final JsonObject insights; + private HashMap nullabilityMap; + private Workspace(String workspaceName, String workspaceId, List directives, long createdTimeMillis, long updatedTimeMillis, @Nullable SampleSpec sampleSpec, JsonObject insights) { @@ -48,6 +51,7 @@ private Workspace(String workspaceName, String workspaceId, List directi this.updatedTimeMillis = updatedTimeMillis; this.sampleSpec = sampleSpec; this.insights = insights; + this.nullabilityMap = new HashMap<>(); } public String getWorkspaceName() { @@ -79,6 +83,15 @@ public JsonObject getInsights() { return insights; } + public HashMap getColumnMappings() { + return nullabilityMap; + } + + public void setColumnMappings( + HashMap nullabilityMap) { + this.nullabilityMap = nullabilityMap; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -164,4 +177,15 @@ public Workspace build() { insights); } } + + /** + * UserDefinedAction enum. + */ + public enum UserDefinedAction { + NO_ACTION, + SKIP_ROW, + SEND_TO_ERROR_COLLECTOR, + ERROR_PIPELINE, + NULLABLE + } } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java index d7eae7960..74818e131 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java @@ -46,6 +46,9 @@ import io.cdap.wrangler.proto.workspace.ColumnValidationResult; import io.cdap.wrangler.proto.workspace.WorkspaceValidationResult; import io.cdap.wrangler.proto.workspace.v2.DirectiveExecutionResponse; +import io.cdap.wrangler.proto.workspace.v2.SampleSpec; +import io.cdap.wrangler.proto.workspace.v2.Workspace; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import io.cdap.wrangler.registry.CompositeDirectiveRegistry; import io.cdap.wrangler.registry.DirectiveRegistry; import io.cdap.wrangler.registry.SystemDirectiveRegistry; @@ -118,7 +121,8 @@ protected List executeDirectives( String namespace, List directives, List sample, - GrammarWalker.Visitor grammarVisitor) throws DirectiveParseException, E, RecipeException { + GrammarWalker.Visitor grammarVisitor, + Workspace workspace) throws DirectiveParseException, E, RecipeException { if (directives.isEmpty()) { return sample; @@ -139,8 +143,11 @@ protected List executeDirectives( new ConfigDirectiveContext(DirectiveConfig.EMPTY)); try (RecipePipelineExecutor executor = new RecipePipelineExecutor(parser, new ServicePipelineContext( - namespace, ExecutorContext.Environment.SERVICE, - getContext(), TRANSIENT_STORE))) { + namespace, + ExecutorContext.Environment.SERVICE, + getContext(), + TRANSIENT_STORE), + workspace.getColumnMappings())) { List result = executor.execute(sample); List errors = executor.errors() diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java index a73354cb0..e3b75d321 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java @@ -1095,7 +1095,7 @@ private List executeDirectives(NamespacedId id, List< // Extract rows from the workspace. List rows = fromWorkspace(workspace); return executeDirectives(id.getNamespace().getName(), directives, sample.apply(rows), - grammarVisitor); + grammarVisitor, null); }); } } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java index 27216247f..6d092a4d0 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java @@ -107,7 +107,7 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception { namespace, ExecutorContext.Environment.SERVICE, systemAppContext, - new DefaultTransientStore()))) { + new DefaultTransientStore()), null)) { rows = executor.execute(rows); List errors = executor.errors().stream() .filter(ErrorRecordBase::isShownInWrangler) diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java index 82b45521e..bb39790ea 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java @@ -67,6 +67,7 @@ import io.cdap.wrangler.proto.workspace.v2.ServiceResponse; import io.cdap.wrangler.proto.workspace.v2.StageSpec; import io.cdap.wrangler.proto.workspace.v2.Workspace; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import io.cdap.wrangler.proto.workspace.v2.WorkspaceCreationRequest; import io.cdap.wrangler.proto.workspace.v2.WorkspaceDetail; import io.cdap.wrangler.proto.workspace.v2.WorkspaceId; @@ -472,6 +473,12 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId); UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector(); + HashMap nullabilityMap = executionRequest.getNullabilityMap() == null ? + new HashMap<>() : executionRequest.getNullabilityMap(); + if (!nullabilityMap.isEmpty()) { + //change nullabilityMap in Workspace Object + changeNullability(nullabilityMap, workspaceId); + } List result = executeDirectives(ns.getName(), directives, detail, userDirectivesCollector); DirectiveExecutionResponse response = generateExecutionResponse(result, @@ -484,6 +491,18 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque return response; } + private void changeNullability(HashMap columnMappings, + WorkspaceId workspaceId) throws Exception { + try { + Workspace workspace = wsStore.getWorkspace(workspaceId); + workspace.setColumnMappings(columnMappings); + wsStore.updateWorkspace(workspaceId, workspace); + } catch (Exception e) { + throw new RuntimeException("Error in setting nullabilityMap of columns ", e); + } + } + + /** * Get source specs, contains some hacky way on dealing with the csv parser */ @@ -580,7 +599,7 @@ private List executeLocally(String namespace, List(detail.getSample()), - grammarVisitor); + grammarVisitor, detail.getWorkspace()); } /** diff --git a/wrangler-test/src/main/java/io/cdap/wrangler/test/TestingRig.java b/wrangler-test/src/main/java/io/cdap/wrangler/test/TestingRig.java index da108cf1b..9a1cd9254 100644 --- a/wrangler-test/src/main/java/io/cdap/wrangler/test/TestingRig.java +++ b/wrangler-test/src/main/java/io/cdap/wrangler/test/TestingRig.java @@ -56,7 +56,7 @@ public static RecipePipeline pipeline(Class directive, Test String migrate = new MigrateToV2(recipe.toArray()).migrate(); RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); - return new RecipePipelineExecutor(parser, null); + return new RecipePipelineExecutor(parser, null, null); } public static RecipeParser parser(Class directive, String[] recipe) diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java index d5e57ae69..0fd83dbf2 100644 --- a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java +++ b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java @@ -67,6 +67,7 @@ import io.cdap.wrangler.parser.NoOpDirectiveContext; import io.cdap.wrangler.parser.RecipeCompiler; import io.cdap.wrangler.proto.Contexts; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import io.cdap.wrangler.registry.CompositeDirectiveRegistry; import io.cdap.wrangler.registry.DirectiveInfo; import io.cdap.wrangler.registry.DirectiveRegistry; @@ -365,7 +366,7 @@ && checkPreconditionNotEmpty(false)) { try { // Create the pipeline executor with context being set. - pipeline = new RecipePipelineExecutor(recipe, ctx); + pipeline = new RecipePipelineExecutor(recipe, ctx, null); } catch (Exception e) { throw new Exception(String.format("Stage:%s - %s", getContext().getStageName(), e.getMessage()), e); } From d9cf5bcc0455a8578fdf78ec4483f1bdcc76fdb1 Mon Sep 17 00:00:00 2001 From: minurajeeve Date: Fri, 1 Dec 2023 15:07:36 +0530 Subject: [PATCH 2/6] modifying getter and setter names for NullabilityMap --- .../java/io/cdap/wrangler/proto/workspace/v2/Workspace.java | 4 ++-- .../wrangler/service/directive/AbstractDirectiveHandler.java | 2 +- .../io/cdap/wrangler/service/directive/WorkspaceHandler.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java index 79d7aa2ab..d8f09307e 100644 --- a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java @@ -83,11 +83,11 @@ public JsonObject getInsights() { return insights; } - public HashMap getColumnMappings() { + public HashMap getNullabilityMap() { return nullabilityMap; } - public void setColumnMappings( + public void setNullabilityMap( HashMap nullabilityMap) { this.nullabilityMap = nullabilityMap; } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java index 74818e131..b78cfb4d6 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java @@ -147,7 +147,7 @@ protected List executeDirectives( ExecutorContext.Environment.SERVICE, getContext(), TRANSIENT_STORE), - workspace.getColumnMappings())) { + workspace.getNullabilityMap())) { List result = executor.execute(sample); List errors = executor.errors() diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java index bb39790ea..654d4f190 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java @@ -495,7 +495,7 @@ private void changeNullability(HashMap columnMappings WorkspaceId workspaceId) throws Exception { try { Workspace workspace = wsStore.getWorkspace(workspaceId); - workspace.setColumnMappings(columnMappings); + workspace.setNullabilityMap(columnMappings); wsStore.updateWorkspace(workspaceId, workspace); } catch (Exception e) { throw new RuntimeException("Error in setting nullabilityMap of columns ", e); From 6ae414ef6d6c0fc12d657854df7e45cdc531ae3a Mon Sep 17 00:00:00 2001 From: minurajeeve Date: Tue, 5 Dec 2023 21:40:10 +0530 Subject: [PATCH 3/6] setting nullabilityMap in workspace --- .../wrangler/proto/workspace/v2/Workspace.java | 16 ++++++++++++---- .../service/directive/WorkspaceHandler.java | 3 ++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java index d8f09307e..f4c240a3b 100644 --- a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java @@ -43,7 +43,7 @@ public class Workspace { private Workspace(String workspaceName, String workspaceId, List directives, long createdTimeMillis, long updatedTimeMillis, @Nullable SampleSpec sampleSpec, - JsonObject insights) { + JsonObject insights, HashMap nullabilityMap) { this.workspaceName = workspaceName; this.workspaceId = workspaceId; this.directives = directives; @@ -51,7 +51,8 @@ private Workspace(String workspaceName, String workspaceId, List directi this.updatedTimeMillis = updatedTimeMillis; this.sampleSpec = sampleSpec; this.insights = insights; - this.nullabilityMap = new HashMap<>(); + this.nullabilityMap = nullabilityMap == null || nullabilityMap.isEmpty() ? + new HashMap<>() : nullabilityMap; } public String getWorkspaceName() { @@ -124,7 +125,8 @@ public static Builder builder(Workspace existing) { .setCreatedTimeMillis(existing.getCreatedTimeMillis()) .setUpdatedTimeMillis(existing.getUpdatedTimeMillis()) .setSampleSpec(existing.getSampleSpec()) - .setInsights(existing.getInsights()); + .setInsights(existing.getInsights()) + .setNullabilityMap(existing.getNullabilityMap()); } /** @@ -138,6 +140,7 @@ public static class Builder { private long updatedTimeMillis; private SampleSpec sampleSpec; private JsonObject insights; + private HashMap nullabilityMap; Builder(String name, String workspaceId) { this.workspaceName = name; @@ -172,9 +175,14 @@ public Builder setInsights(JsonObject insights) { return this; } + public Builder setNullabilityMap (HashMap nullabilityMap) { + this.nullabilityMap = nullabilityMap; + return this; + } + public Workspace build() { return new Workspace(workspaceName, workspaceId, directives, createdTimeMillis, updatedTimeMillis, sampleSpec, - insights); + insights, nullabilityMap); } } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java index 654d4f190..b4952b828 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java @@ -170,7 +170,8 @@ public void createWorkspace(HttpServiceRequest request, HttpServiceResponder res long now = System.currentTimeMillis(); Workspace workspace = Workspace.builder(generateWorkspaceName(wsId, creationRequest.getSampleRequest().getPath()), wsId.getWorkspaceId()) - .setCreatedTimeMillis(now).setUpdatedTimeMillis(now).setSampleSpec(spec).build(); + .setCreatedTimeMillis(now).setUpdatedTimeMillis(now) + .setSampleSpec(spec).setNullabilityMap(new HashMap<>()).build(); wsStore.saveWorkspace(wsId, new WorkspaceDetail(workspace, rows)); responder.sendJson(wsId.getWorkspaceId()); }); From b1fa542dbddedf09c2ddd220d51d9482383b7d20 Mon Sep 17 00:00:00 2001 From: minurajeeve Date: Tue, 12 Dec 2023 16:06:30 +0530 Subject: [PATCH 4/6] modifying executeDirectives definition to intake nullabilityMap instead of Workspace object --- .../wrangler/service/directive/AbstractDirectiveHandler.java | 4 ++-- .../io/cdap/wrangler/service/directive/WorkspaceHandler.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java index b78cfb4d6..056c9da0c 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java @@ -122,7 +122,7 @@ protected List executeDirectives( List directives, List sample, GrammarWalker.Visitor grammarVisitor, - Workspace workspace) throws DirectiveParseException, E, RecipeException { + HashMap nullabilityMap) throws DirectiveParseException, E, RecipeException { if (directives.isEmpty()) { return sample; @@ -147,7 +147,7 @@ protected List executeDirectives( ExecutorContext.Environment.SERVICE, getContext(), TRANSIENT_STORE), - workspace.getNullabilityMap())) { + nullabilityMap)) { List result = executor.execute(sample); List errors = executor.errors() diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java index b4952b828..4f4a677cd 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java @@ -600,7 +600,7 @@ private List executeLocally(String namespace, List(detail.getSample()), - grammarVisitor, detail.getWorkspace()); + grammarVisitor, detail.getWorkspace().getNullabilityMap()); } /** From aa0e08865e14e540a56fc1c379974aa8fa097065 Mon Sep 17 00:00:00 2001 From: minurajeeve Date: Tue, 12 Dec 2023 16:10:51 +0530 Subject: [PATCH 5/6] removing NO_ACTION and NULLABLE options from UserDefinedAction as it's not required --- .../java/io/cdap/wrangler/proto/workspace/v2/Workspace.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java index f4c240a3b..33b422e53 100644 --- a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java @@ -190,10 +190,8 @@ public Workspace build() { * UserDefinedAction enum. */ public enum UserDefinedAction { - NO_ACTION, SKIP_ROW, SEND_TO_ERROR_COLLECTOR, ERROR_PIPELINE, - NULLABLE } } From e4a3a43b6be816b88e0d77906f814c7ac5bd16bd Mon Sep 17 00:00:00 2001 From: minurajeeve Date: Tue, 13 Feb 2024 10:56:03 +0530 Subject: [PATCH 6/6] addressing review comments -1 --- .../wrangler/api/NullHandlingException.java | 31 ------------------- .../executor/RecipePipelineExecutor.java | 13 ++++---- .../java/io/cdap/wrangler/TestingRig.java | 3 +- .../v2/DirectiveExecutionRequest.java | 12 ++++--- .../proto/workspace/v2/UserDefinedAction.java | 10 ++++++ .../proto/workspace/v2/Workspace.java | 29 +++++------------ .../directive/AbstractDirectiveHandler.java | 5 +-- .../service/directive/WorkspaceHandler.java | 19 ++++++------ 8 files changed, 46 insertions(+), 76 deletions(-) delete mode 100644 wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java create mode 100644 wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/UserDefinedAction.java diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java deleted file mode 100644 index 0d13c6711..000000000 --- a/wrangler-api/src/main/java/io/cdap/wrangler/api/NullHandlingException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright © 2016-2019 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.wrangler.api; - -/** - * A Null Handling specific exception used for communicating issues with Null Handling in a column. - */ -public class NullHandlingException extends Exception { - public NullHandlingException(Exception e) { - super(e); - } - - public NullHandlingException(String message) { - super(message); - } - -} diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java b/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java index a09301f5a..03673fa38 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java @@ -30,19 +30,18 @@ import io.cdap.wrangler.api.ReportErrorAndProceed; import io.cdap.wrangler.api.Row; import io.cdap.wrangler.api.TransientVariableScope; -import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; +import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction; import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator; import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext; import io.cdap.wrangler.schema.TransientStoreKeys; import io.cdap.wrangler.utils.RecordConvertor; import io.cdap.wrangler.utils.RecordConvertorException; import io.cdap.wrangler.utils.SchemaConverter; +import java.util.HashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; +import java.util.Map; import java.util.List; import javax.annotation.Nullable; @@ -59,13 +58,13 @@ public final class RecipePipelineExecutor implements RecipePipeline directives; - private HashMap nullabilityMap; + private final Map nullabilityMap; public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context, - HashMap nullabilityMap) { + Map nullabilityMap) { this.context = context; this.recipeParser = recipeParser; - this.nullabilityMap = nullabilityMap; + this.nullabilityMap = new HashMap<>(nullabilityMap); } /** diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java b/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java index 5507f0770..3e30d47db 100644 --- a/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java +++ b/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java @@ -40,6 +40,7 @@ import io.cdap.wrangler.registry.CompositeDirectiveRegistry; import io.cdap.wrangler.registry.SystemDirectiveRegistry; import io.cdap.wrangler.schema.TransientStoreKeys; +import java.util.Collections; import org.junit.Assert; import java.util.Iterator; @@ -89,7 +90,7 @@ public static List execute(String[] recipe, List rows, ExecutorContext String migrate = new MigrateToV2(recipe).migrate(); RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); - return new RecipePipelineExecutor(parser, context, null).execute(rows); + return new RecipePipelineExecutor(parser, context, Collections.emptyMap()).execute(rows); } /** diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java index ffd96e946..b3105f057 100644 --- a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java @@ -17,10 +17,11 @@ package io.cdap.wrangler.proto.workspace.v2; -import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; +import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Directive execution request for v2 endpoint @@ -28,11 +29,11 @@ public class DirectiveExecutionRequest { private final List directives; private final int limit; - private final HashMap nullabilityMap; + private final Map nullabilityMap; public DirectiveExecutionRequest(List directives, int limit, - HashMap nullabilityMap) { + Map nullabilityMap) { this.directives = directives; this.limit = limit; this.nullabilityMap = nullabilityMap; @@ -46,7 +47,8 @@ public List getDirectives() { return directives == null ? Collections.emptyList() : directives; } - public HashMap getNullabilityMap() { - return nullabilityMap; + public Map getNullabilityMap() { + + return nullabilityMap == null ? Collections.emptyMap() : nullabilityMap; } } diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/UserDefinedAction.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/UserDefinedAction.java new file mode 100644 index 000000000..99bddc175 --- /dev/null +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/UserDefinedAction.java @@ -0,0 +1,10 @@ +package io.cdap.wrangler.proto.workspace.v2; + +/** + * UserDefinedAction enum. + */ +public enum UserDefinedAction { + FILTER, + SEND_TO_ERROR_COLLECTOR, + ERROR, +} diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java index 33b422e53..abcb63fde 100644 --- a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java @@ -20,8 +20,10 @@ import com.google.gson.JsonObject; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import javax.annotation.Nullable; @@ -39,11 +41,11 @@ public class Workspace { // this is for insights page in UI private final JsonObject insights; - private HashMap nullabilityMap; + private final Map nullabilityMap; private Workspace(String workspaceName, String workspaceId, List directives, long createdTimeMillis, long updatedTimeMillis, @Nullable SampleSpec sampleSpec, - JsonObject insights, HashMap nullabilityMap) { + JsonObject insights, Map nullabilityMap) { this.workspaceName = workspaceName; this.workspaceId = workspaceId; this.directives = directives; @@ -51,8 +53,7 @@ private Workspace(String workspaceName, String workspaceId, List directi this.updatedTimeMillis = updatedTimeMillis; this.sampleSpec = sampleSpec; this.insights = insights; - this.nullabilityMap = nullabilityMap == null || nullabilityMap.isEmpty() ? - new HashMap<>() : nullabilityMap; + this.nullabilityMap = Collections.unmodifiableMap(new HashMap(nullabilityMap)); } public String getWorkspaceName() { @@ -84,15 +85,10 @@ public JsonObject getInsights() { return insights; } - public HashMap getNullabilityMap() { + public Map getNullabilityMap() { return nullabilityMap; } - public void setNullabilityMap( - HashMap nullabilityMap) { - this.nullabilityMap = nullabilityMap; - } - @Override public boolean equals(Object o) { if (this == o) { @@ -140,7 +136,7 @@ public static class Builder { private long updatedTimeMillis; private SampleSpec sampleSpec; private JsonObject insights; - private HashMap nullabilityMap; + private Map nullabilityMap; Builder(String name, String workspaceId) { this.workspaceName = name; @@ -175,7 +171,7 @@ public Builder setInsights(JsonObject insights) { return this; } - public Builder setNullabilityMap (HashMap nullabilityMap) { + public Builder setNullabilityMap(Map nullabilityMap) { this.nullabilityMap = nullabilityMap; return this; } @@ -185,13 +181,4 @@ public Workspace build() { insights, nullabilityMap); } } - - /** - * UserDefinedAction enum. - */ - public enum UserDefinedAction { - SKIP_ROW, - SEND_TO_ERROR_COLLECTOR, - ERROR_PIPELINE, - } } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java index 056c9da0c..99163ac39 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java @@ -48,7 +48,7 @@ import io.cdap.wrangler.proto.workspace.v2.DirectiveExecutionResponse; import io.cdap.wrangler.proto.workspace.v2.SampleSpec; import io.cdap.wrangler.proto.workspace.v2.Workspace; -import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; +import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction; import io.cdap.wrangler.registry.CompositeDirectiveRegistry; import io.cdap.wrangler.registry.DirectiveRegistry; import io.cdap.wrangler.registry.SystemDirectiveRegistry; @@ -122,7 +122,8 @@ protected List executeDirectives( List directives, List sample, GrammarWalker.Visitor grammarVisitor, - HashMap nullabilityMap) throws DirectiveParseException, E, RecipeException { + Map nullabilityMap) + throws DirectiveParseException, E, RecipeException { if (directives.isEmpty()) { return sample; diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java index 4f4a677cd..5da2e4967 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java @@ -67,7 +67,7 @@ import io.cdap.wrangler.proto.workspace.v2.ServiceResponse; import io.cdap.wrangler.proto.workspace.v2.StageSpec; import io.cdap.wrangler.proto.workspace.v2.Workspace; -import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; +import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction; import io.cdap.wrangler.proto.workspace.v2.WorkspaceCreationRequest; import io.cdap.wrangler.proto.workspace.v2.WorkspaceDetail; import io.cdap.wrangler.proto.workspace.v2.WorkspaceId; @@ -474,10 +474,9 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId); UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector(); - HashMap nullabilityMap = executionRequest.getNullabilityMap() == null ? - new HashMap<>() : executionRequest.getNullabilityMap(); + Map nullabilityMap = executionRequest.getNullabilityMap(); if (!nullabilityMap.isEmpty()) { - //change nullabilityMap in Workspace Object + //create new workspace object with the new nullabilityMap changeNullability(nullabilityMap, workspaceId); } List result = executeDirectives(ns.getName(), directives, detail, @@ -492,16 +491,18 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque return response; } - private void changeNullability(HashMap columnMappings, - WorkspaceId workspaceId) throws Exception { + private void changeNullability(Map nullabilityMap, + WorkspaceId workspaceId) throws Exception { try { Workspace workspace = wsStore.getWorkspace(workspaceId); - workspace.setNullabilityMap(columnMappings); - wsStore.updateWorkspace(workspaceId, workspace); + Workspace newWorkspace = Workspace.builder(workspace) + .setUpdatedTimeMillis(System.currentTimeMillis()) + .setNullabilityMap(nullabilityMap).build(); + wsStore.updateWorkspace(workspaceId, newWorkspace); } catch (Exception e) { throw new RuntimeException("Error in setting nullabilityMap of columns ", e); } - } + } /**