diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java b/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java index 159d6d512..03673fa38 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/executor/RecipePipelineExecutor.java @@ -30,16 +30,18 @@ import io.cdap.wrangler.api.ReportErrorAndProceed; import io.cdap.wrangler.api.Row; import io.cdap.wrangler.api.TransientVariableScope; +import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction; import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator; import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext; import io.cdap.wrangler.schema.TransientStoreKeys; import io.cdap.wrangler.utils.RecordConvertor; import io.cdap.wrangler.utils.RecordConvertorException; import io.cdap.wrangler.utils.SchemaConverter; +import java.util.HashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.util.ArrayList; +import java.util.Map; import java.util.List; import javax.annotation.Nullable; @@ -56,10 +58,13 @@ public final class RecipePipelineExecutor implements RecipePipeline directives; + private final Map nullabilityMap; - public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context) { + public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context, + Map nullabilityMap) { this.context = context; this.recipeParser = recipeParser; + this.nullabilityMap = new HashMap<>(nullabilityMap); } /** diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java b/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java index a56d12504..3e30d47db 100644 --- a/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java +++ b/wrangler-core/src/test/java/io/cdap/wrangler/TestingRig.java @@ -40,6 +40,7 @@ import io.cdap.wrangler.registry.CompositeDirectiveRegistry; import io.cdap.wrangler.registry.SystemDirectiveRegistry; import io.cdap.wrangler.schema.TransientStoreKeys; +import java.util.Collections; import org.junit.Assert; import java.util.Iterator; @@ -89,7 +90,7 @@ public static List execute(String[] recipe, List rows, ExecutorContext String migrate = new MigrateToV2(recipe).migrate(); RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); - return new RecipePipelineExecutor(parser, context).execute(rows); + return new RecipePipelineExecutor(parser, context, Collections.emptyMap()).execute(rows); } /** @@ -112,7 +113,7 @@ public static Pair, List> executeWithErrors(String[] recipe, List String migrate = new MigrateToV2(recipe).migrate(); RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); - RecipePipeline pipeline = new RecipePipelineExecutor(parser, context); + RecipePipeline pipeline = new RecipePipelineExecutor(parser, context, null); List results = pipeline.execute(rows); List errors = pipeline.errors(); return new Pair<>(results, errors); @@ -126,7 +127,7 @@ public static RecipePipeline execute(String[] recipe) String migrate = new MigrateToV2(recipe).migrate(); RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); - return new RecipePipelineExecutor(parser, new TestingPipelineContext()); + return new RecipePipelineExecutor(parser, new TestingPipelineContext(), null); } public static RecipeParser parse(String[] recipe) throws DirectiveParseException, DirectiveLoadException { diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java index a77ce1f5d..b3105f057 100644 --- a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/DirectiveExecutionRequest.java @@ -17,8 +17,11 @@ package io.cdap.wrangler.proto.workspace.v2; +import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Directive execution request for v2 endpoint @@ -26,10 +29,14 @@ public class DirectiveExecutionRequest { private final List directives; private final int limit; + private final Map nullabilityMap; - public DirectiveExecutionRequest(List directives, int limit) { + + public DirectiveExecutionRequest(List directives, int limit, + Map nullabilityMap) { this.directives = directives; this.limit = limit; + this.nullabilityMap = nullabilityMap; } public int getLimit() { @@ -39,4 +46,9 @@ public int getLimit() { public List getDirectives() { return directives == null ? Collections.emptyList() : directives; } + + public Map getNullabilityMap() { + + return nullabilityMap == null ? Collections.emptyMap() : nullabilityMap; + } } diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/UserDefinedAction.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/UserDefinedAction.java new file mode 100644 index 000000000..99bddc175 --- /dev/null +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/UserDefinedAction.java @@ -0,0 +1,10 @@ +package io.cdap.wrangler.proto.workspace.v2; + +/** + * UserDefinedAction enum. + */ +public enum UserDefinedAction { + FILTER, + SEND_TO_ERROR_COLLECTOR, + ERROR, +} diff --git a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java index 16aac68f8..abcb63fde 100644 --- a/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java +++ b/wrangler-proto/src/main/java/io/cdap/wrangler/proto/workspace/v2/Workspace.java @@ -20,7 +20,10 @@ import com.google.gson.JsonObject; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import javax.annotation.Nullable; @@ -38,9 +41,11 @@ public class Workspace { // this is for insights page in UI private final JsonObject insights; + private final Map nullabilityMap; + private Workspace(String workspaceName, String workspaceId, List directives, long createdTimeMillis, long updatedTimeMillis, @Nullable SampleSpec sampleSpec, - JsonObject insights) { + JsonObject insights, Map nullabilityMap) { this.workspaceName = workspaceName; this.workspaceId = workspaceId; this.directives = directives; @@ -48,6 +53,7 @@ private Workspace(String workspaceName, String workspaceId, List directi this.updatedTimeMillis = updatedTimeMillis; this.sampleSpec = sampleSpec; this.insights = insights; + this.nullabilityMap = Collections.unmodifiableMap(new HashMap(nullabilityMap)); } public String getWorkspaceName() { @@ -79,6 +85,10 @@ public JsonObject getInsights() { return insights; } + public Map getNullabilityMap() { + return nullabilityMap; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -111,7 +121,8 @@ public static Builder builder(Workspace existing) { .setCreatedTimeMillis(existing.getCreatedTimeMillis()) .setUpdatedTimeMillis(existing.getUpdatedTimeMillis()) .setSampleSpec(existing.getSampleSpec()) - .setInsights(existing.getInsights()); + .setInsights(existing.getInsights()) + .setNullabilityMap(existing.getNullabilityMap()); } /** @@ -125,6 +136,7 @@ public static class Builder { private long updatedTimeMillis; private SampleSpec sampleSpec; private JsonObject insights; + private Map nullabilityMap; Builder(String name, String workspaceId) { this.workspaceName = name; @@ -159,9 +171,14 @@ public Builder setInsights(JsonObject insights) { return this; } + public Builder setNullabilityMap(Map nullabilityMap) { + this.nullabilityMap = nullabilityMap; + return this; + } + public Workspace build() { return new Workspace(workspaceName, workspaceId, directives, createdTimeMillis, updatedTimeMillis, sampleSpec, - insights); + insights, nullabilityMap); } } } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java index d7eae7960..99163ac39 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java @@ -46,6 +46,9 @@ import io.cdap.wrangler.proto.workspace.ColumnValidationResult; import io.cdap.wrangler.proto.workspace.WorkspaceValidationResult; import io.cdap.wrangler.proto.workspace.v2.DirectiveExecutionResponse; +import io.cdap.wrangler.proto.workspace.v2.SampleSpec; +import io.cdap.wrangler.proto.workspace.v2.Workspace; +import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction; import io.cdap.wrangler.registry.CompositeDirectiveRegistry; import io.cdap.wrangler.registry.DirectiveRegistry; import io.cdap.wrangler.registry.SystemDirectiveRegistry; @@ -118,7 +121,9 @@ protected List executeDirectives( String namespace, List directives, List sample, - GrammarWalker.Visitor grammarVisitor) throws DirectiveParseException, E, RecipeException { + GrammarWalker.Visitor grammarVisitor, + Map nullabilityMap) + throws DirectiveParseException, E, RecipeException { if (directives.isEmpty()) { return sample; @@ -139,8 +144,11 @@ protected List executeDirectives( new ConfigDirectiveContext(DirectiveConfig.EMPTY)); try (RecipePipelineExecutor executor = new RecipePipelineExecutor(parser, new ServicePipelineContext( - namespace, ExecutorContext.Environment.SERVICE, - getContext(), TRANSIENT_STORE))) { + namespace, + ExecutorContext.Environment.SERVICE, + getContext(), + TRANSIENT_STORE), + nullabilityMap)) { List result = executor.execute(sample); List errors = executor.errors() diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java index a73354cb0..e3b75d321 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java @@ -1095,7 +1095,7 @@ private List executeDirectives(NamespacedId id, List< // Extract rows from the workspace. List rows = fromWorkspace(workspace); return executeDirectives(id.getNamespace().getName(), directives, sample.apply(rows), - grammarVisitor); + grammarVisitor, null); }); } } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java index 27216247f..6d092a4d0 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java @@ -107,7 +107,7 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception { namespace, ExecutorContext.Environment.SERVICE, systemAppContext, - new DefaultTransientStore()))) { + new DefaultTransientStore()), null)) { rows = executor.execute(rows); List errors = executor.errors().stream() .filter(ErrorRecordBase::isShownInWrangler) diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java index 82b45521e..5da2e4967 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java @@ -67,6 +67,7 @@ import io.cdap.wrangler.proto.workspace.v2.ServiceResponse; import io.cdap.wrangler.proto.workspace.v2.StageSpec; import io.cdap.wrangler.proto.workspace.v2.Workspace; +import io.cdap.wrangler.proto.workspace.v2.UserDefinedAction; import io.cdap.wrangler.proto.workspace.v2.WorkspaceCreationRequest; import io.cdap.wrangler.proto.workspace.v2.WorkspaceDetail; import io.cdap.wrangler.proto.workspace.v2.WorkspaceId; @@ -169,7 +170,8 @@ public void createWorkspace(HttpServiceRequest request, HttpServiceResponder res long now = System.currentTimeMillis(); Workspace workspace = Workspace.builder(generateWorkspaceName(wsId, creationRequest.getSampleRequest().getPath()), wsId.getWorkspaceId()) - .setCreatedTimeMillis(now).setUpdatedTimeMillis(now).setSampleSpec(spec).build(); + .setCreatedTimeMillis(now).setUpdatedTimeMillis(now) + .setSampleSpec(spec).setNullabilityMap(new HashMap<>()).build(); wsStore.saveWorkspace(wsId, new WorkspaceDetail(workspace, rows)); responder.sendJson(wsId.getWorkspaceId()); }); @@ -472,6 +474,11 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId); UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector(); + Map nullabilityMap = executionRequest.getNullabilityMap(); + if (!nullabilityMap.isEmpty()) { + //create new workspace object with the new nullabilityMap + changeNullability(nullabilityMap, workspaceId); + } List result = executeDirectives(ns.getName(), directives, detail, userDirectivesCollector); DirectiveExecutionResponse response = generateExecutionResponse(result, @@ -484,6 +491,20 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque return response; } + private void changeNullability(Map nullabilityMap, + WorkspaceId workspaceId) throws Exception { + try { + Workspace workspace = wsStore.getWorkspace(workspaceId); + Workspace newWorkspace = Workspace.builder(workspace) + .setUpdatedTimeMillis(System.currentTimeMillis()) + .setNullabilityMap(nullabilityMap).build(); + wsStore.updateWorkspace(workspaceId, newWorkspace); + } catch (Exception e) { + throw new RuntimeException("Error in setting nullabilityMap of columns ", e); + } + } + + /** * Get source specs, contains some hacky way on dealing with the csv parser */ @@ -580,7 +601,7 @@ private List executeLocally(String namespace, List(detail.getSample()), - grammarVisitor); + grammarVisitor, detail.getWorkspace().getNullabilityMap()); } /** diff --git a/wrangler-test/src/main/java/io/cdap/wrangler/test/TestingRig.java b/wrangler-test/src/main/java/io/cdap/wrangler/test/TestingRig.java index da108cf1b..9a1cd9254 100644 --- a/wrangler-test/src/main/java/io/cdap/wrangler/test/TestingRig.java +++ b/wrangler-test/src/main/java/io/cdap/wrangler/test/TestingRig.java @@ -56,7 +56,7 @@ public static RecipePipeline pipeline(Class directive, Test String migrate = new MigrateToV2(recipe.toArray()).migrate(); RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); - return new RecipePipelineExecutor(parser, null); + return new RecipePipelineExecutor(parser, null, null); } public static RecipeParser parser(Class directive, String[] recipe) diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java index d5e57ae69..0fd83dbf2 100644 --- a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java +++ b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java @@ -67,6 +67,7 @@ import io.cdap.wrangler.parser.NoOpDirectiveContext; import io.cdap.wrangler.parser.RecipeCompiler; import io.cdap.wrangler.proto.Contexts; +import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; import io.cdap.wrangler.registry.CompositeDirectiveRegistry; import io.cdap.wrangler.registry.DirectiveInfo; import io.cdap.wrangler.registry.DirectiveRegistry; @@ -365,7 +366,7 @@ && checkPreconditionNotEmpty(false)) { try { // Create the pipeline executor with context being set. - pipeline = new RecipePipelineExecutor(recipe, ctx); + pipeline = new RecipePipelineExecutor(recipe, ctx, null); } catch (Exception e) { throw new Exception(String.format("Stage:%s - %s", getContext().getStageName(), e.getMessage()), e); }