-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
modifying execute API to get column nullability state #686
base: develop
Are you sure you want to change the base?
Changes from 5 commits
080216e
d9cf5bc
6ae414e
b1fa542
aa0e088
e4a3a43
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright © 2016-2019 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package io.cdap.wrangler.api; | ||
|
||
/** | ||
* A Null Handling specific exception used for communicating issues with Null Handling in a column. | ||
*/ | ||
public class NullHandlingException extends Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see this being thrown or caught anywhere, how are we expecting to use it? Would be better to leave this out of the PR and include it in whatever PR actually uses it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was for future use, removed |
||
public NullHandlingException(Exception e) { | ||
super(e); | ||
} | ||
|
||
public NullHandlingException(String message) { | ||
super(message); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
import io.cdap.wrangler.api.ReportErrorAndProceed; | ||
import io.cdap.wrangler.api.Row; | ||
import io.cdap.wrangler.api.TransientVariableScope; | ||
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; | ||
import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator; | ||
import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext; | ||
import io.cdap.wrangler.schema.TransientStoreKeys; | ||
|
@@ -40,6 +41,8 @@ | |
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import javax.annotation.Nullable; | ||
|
||
|
@@ -56,10 +59,13 @@ public final class RecipePipelineExecutor implements RecipePipeline<Row, Structu | |
private final RecipeParser recipeParser; | ||
private final ExecutorContext context; | ||
private List<Directive> directives; | ||
private HashMap<String, UserDefinedAction> nullabilityMap; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HashMap -> Map. In general, when declaring variables we should be using the interface and not the specific implementation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should also be final. Basically anything passed into the constructor should be final There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context) { | ||
public RecipePipelineExecutor(RecipeParser recipeParser, @Nullable ExecutorContext context, | ||
HashMap<String, UserDefinedAction> nullabilityMap) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HashMap -> Map There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
this.context = context; | ||
this.recipeParser = recipeParser; | ||
this.nullabilityMap = nullabilityMap; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should do a defensive copy to make sure the map cannot change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,7 +89,7 @@ public static List<Row> execute(String[] recipe, List<Row> rows, ExecutorContext | |
|
||
String migrate = new MigrateToV2(recipe).migrate(); | ||
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); | ||
return new RecipePipelineExecutor(parser, context).execute(rows); | ||
return new RecipePipelineExecutor(parser, context, null).execute(rows); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should not pass null for anything that is not annotated as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
|
||
/** | ||
|
@@ -112,7 +112,7 @@ public static Pair<List<Row>, List<Row>> executeWithErrors(String[] recipe, List | |
|
||
String migrate = new MigrateToV2(recipe).migrate(); | ||
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); | ||
RecipePipeline pipeline = new RecipePipelineExecutor(parser, context); | ||
RecipePipeline pipeline = new RecipePipelineExecutor(parser, context, null); | ||
List<Row> results = pipeline.execute(rows); | ||
List<Row> errors = pipeline.errors(); | ||
return new Pair<>(results, errors); | ||
|
@@ -126,7 +126,7 @@ public static RecipePipeline execute(String[] recipe) | |
|
||
String migrate = new MigrateToV2(recipe).migrate(); | ||
RecipeParser parser = new GrammarBasedParser(Contexts.SYSTEM, migrate, registry); | ||
return new RecipePipelineExecutor(parser, new TestingPipelineContext()); | ||
return new RecipePipelineExecutor(parser, new TestingPipelineContext(), null); | ||
} | ||
|
||
public static RecipeParser parse(String[] recipe) throws DirectiveParseException, DirectiveLoadException { | ||
|
harshdeeppruthi marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,9 @@ | |
|
||
package io.cdap.wrangler.proto.workspace.v2; | ||
|
||
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
|
||
/** | ||
|
@@ -26,10 +28,14 @@ | |
public class DirectiveExecutionRequest { | ||
private final List<String> directives; | ||
private final int limit; | ||
private final HashMap<String, UserDefinedAction> nullabilityMap; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is very similar to using send-to-error and filter directives. Would like to understand why we need to introduce a new way to do almost the same thing. |
||
|
||
public DirectiveExecutionRequest(List<String> directives, int limit) { | ||
|
||
public DirectiveExecutionRequest(List<String> directives, int limit, | ||
HashMap<String, UserDefinedAction> nullabilityMap) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HashMap -> Map There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
this.directives = directives; | ||
this.limit = limit; | ||
this.nullabilityMap = nullabilityMap; | ||
} | ||
|
||
public int getLimit() { | ||
|
@@ -39,4 +45,8 @@ public int getLimit() { | |
public List<String> getDirectives() { | ||
return directives == null ? Collections.emptyList() : directives; | ||
} | ||
|
||
public HashMap<String, UserDefinedAction> getNullabilityMap() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HashMap -> Map There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
return nullabilityMap; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need to check for null here, similar to getDirectives(). This object is created by deserializing the HTTP request body, so it's possible the caller is not setting this field. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
import com.google.gson.JsonObject; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import javax.annotation.Nullable; | ||
|
@@ -38,16 +39,20 @@ public class Workspace { | |
// this is for insights page in UI | ||
private final JsonObject insights; | ||
|
||
private HashMap<String, UserDefinedAction> nullabilityMap; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HashMap -> Map (same for the rest of the class) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should also be final There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
private Workspace(String workspaceName, String workspaceId, List<String> directives, | ||
long createdTimeMillis, long updatedTimeMillis, @Nullable SampleSpec sampleSpec, | ||
JsonObject insights) { | ||
JsonObject insights, HashMap<String, UserDefinedAction> nullabilityMap) { | ||
this.workspaceName = workspaceName; | ||
this.workspaceId = workspaceId; | ||
this.directives = directives; | ||
this.createdTimeMillis = createdTimeMillis; | ||
this.updatedTimeMillis = updatedTimeMillis; | ||
this.sampleSpec = sampleSpec; | ||
this.insights = insights; | ||
this.nullabilityMap = nullabilityMap == null || nullabilityMap.isEmpty() ? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. by convention, only variables annotated as
This is to prevent the map from ever changing once the object is created. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
new HashMap<>() : nullabilityMap; | ||
} | ||
|
||
public String getWorkspaceName() { | ||
|
@@ -79,6 +84,15 @@ public JsonObject getInsights() { | |
return insights; | ||
} | ||
|
||
public HashMap<String, UserDefinedAction> getNullabilityMap() { | ||
return nullabilityMap; | ||
} | ||
|
||
public void setNullabilityMap( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should usually not have any setter methods. Making the state mutable makes it very difficult to reason about correctness. If it needs to be changed, a new instance of the Workspace should be created using the Builder. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done, But how does using the Builder instead of directly changing it ensure correctness? |
||
HashMap<String, UserDefinedAction> nullabilityMap) { | ||
this.nullabilityMap = nullabilityMap; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
|
@@ -111,7 +125,8 @@ public static Builder builder(Workspace existing) { | |
.setCreatedTimeMillis(existing.getCreatedTimeMillis()) | ||
.setUpdatedTimeMillis(existing.getUpdatedTimeMillis()) | ||
.setSampleSpec(existing.getSampleSpec()) | ||
.setInsights(existing.getInsights()); | ||
.setInsights(existing.getInsights()) | ||
.setNullabilityMap(existing.getNullabilityMap()); | ||
} | ||
|
||
/** | ||
|
@@ -125,6 +140,7 @@ public static class Builder { | |
private long updatedTimeMillis; | ||
private SampleSpec sampleSpec; | ||
private JsonObject insights; | ||
private HashMap<String, UserDefinedAction> nullabilityMap; | ||
|
||
Builder(String name, String workspaceId) { | ||
this.workspaceName = name; | ||
|
@@ -159,9 +175,23 @@ public Builder setInsights(JsonObject insights) { | |
return this; | ||
} | ||
|
||
public Builder setNullabilityMap (HashMap<String, UserDefinedAction> nullabilityMap) { | ||
this.nullabilityMap = nullabilityMap; | ||
return this; | ||
} | ||
|
||
public Workspace build() { | ||
return new Workspace(workspaceName, workspaceId, directives, createdTimeMillis, updatedTimeMillis, sampleSpec, | ||
insights); | ||
insights, nullabilityMap); | ||
} | ||
} | ||
|
||
/** | ||
* UserDefinedAction enum. | ||
*/ | ||
public enum UserDefinedAction { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should move this to its own class instead of being nested. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
SKIP_ROW, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. a better name would be FILTER There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
SEND_TO_ERROR_COLLECTOR, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ERROR_COLLECTOR is a pipeline specific concept, this can just be SEND_TO_ERROR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But by SEND_TO_ERROR_COLLECTOR, I actually mean the pipeline specific error collector hence the name. |
||
ERROR_PIPELINE, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pipeline is not a great name, as wrangler can be used outside of a pipeline when making HTTP requests. This can just be ERROR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,7 @@ | |
import io.cdap.wrangler.proto.workspace.v2.ServiceResponse; | ||
import io.cdap.wrangler.proto.workspace.v2.StageSpec; | ||
import io.cdap.wrangler.proto.workspace.v2.Workspace; | ||
import io.cdap.wrangler.proto.workspace.v2.Workspace.UserDefinedAction; | ||
import io.cdap.wrangler.proto.workspace.v2.WorkspaceCreationRequest; | ||
import io.cdap.wrangler.proto.workspace.v2.WorkspaceDetail; | ||
import io.cdap.wrangler.proto.workspace.v2.WorkspaceId; | ||
|
@@ -169,7 +170,8 @@ public void createWorkspace(HttpServiceRequest request, HttpServiceResponder res | |
long now = System.currentTimeMillis(); | ||
Workspace workspace = Workspace.builder(generateWorkspaceName(wsId, creationRequest.getSampleRequest().getPath()), | ||
wsId.getWorkspaceId()) | ||
.setCreatedTimeMillis(now).setUpdatedTimeMillis(now).setSampleSpec(spec).build(); | ||
.setCreatedTimeMillis(now).setUpdatedTimeMillis(now) | ||
.setSampleSpec(spec).setNullabilityMap(new HashMap<>()).build(); | ||
wsStore.saveWorkspace(wsId, new WorkspaceDetail(workspace, rows)); | ||
responder.sendJson(wsId.getWorkspaceId()); | ||
}); | ||
|
@@ -472,6 +474,12 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque | |
|
||
WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId); | ||
UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector(); | ||
HashMap<String, UserDefinedAction> nullabilityMap = executionRequest.getNullabilityMap() == null ? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this logic should be in the getNullabilityMap() class There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
new HashMap<>() : executionRequest.getNullabilityMap(); | ||
if (!nullabilityMap.isEmpty()) { | ||
//change nullabilityMap in Workspace Object | ||
changeNullability(nullabilityMap, workspaceId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should update the workspace after executing the directives (this is already happening), not before. Otherwise the execution can fail and now there's a partially updated workspace. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need the updated nullabilityMap before directives are executed. |
||
} | ||
List<Row> result = executeDirectives(ns.getName(), directives, detail, | ||
userDirectivesCollector); | ||
DirectiveExecutionResponse response = generateExecutionResponse(result, | ||
|
@@ -484,6 +492,18 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque | |
return response; | ||
} | ||
|
||
private void changeNullability(HashMap<String, UserDefinedAction> columnMappings, | ||
WorkspaceId workspaceId) throws Exception { | ||
try { | ||
Workspace workspace = wsStore.getWorkspace(workspaceId); | ||
workspace.setNullabilityMap(columnMappings); | ||
wsStore.updateWorkspace(workspaceId, workspace); | ||
} catch (Exception e) { | ||
throw new RuntimeException("Error in setting nullabilityMap of columns ", e); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
|
||
/** | ||
* Get source specs, contains some hacky way on dealing with the csv parser | ||
*/ | ||
|
@@ -580,7 +600,7 @@ private <E extends Exception> List<Row> executeLocally(String namespace, List<St | |
// load the udd | ||
composite.reload(namespace); | ||
return executeDirectives(namespace, directives, new ArrayList<>(detail.getSample()), | ||
grammarVisitor); | ||
grammarVisitor, detail.getWorkspace().getNullabilityMap()); | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2024