diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/registry/UserDirectiveRegistry.java b/wrangler-core/src/main/java/io/cdap/wrangler/registry/UserDirectiveRegistry.java index 214e8565a..fab075021 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/registry/UserDirectiveRegistry.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/registry/UserDirectiveRegistry.java @@ -129,8 +129,8 @@ public DirectiveInfo get(String namespace, String name) throws DirectiveLoadExce } if (directive == null) { throw new DirectiveLoadException( - String.format("10-5 - Unable to load the user defined directive '%s'. " + - "Please check if the artifact containing UDD is still present.", name) + String.format("Directive '%s' not found. Check if directive is spelled corrected. If the directive is " + + "user defined, make sure its artifact has been uploaded.", name) ); } DirectiveInfo directiveInfo = new DirectiveInfo(DirectiveInfo.Scope.USER, directive); diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/utils/Json2SchemaTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/utils/Json2SchemaTest.java index 76bd48931..54104ce80 100644 --- a/wrangler-core/src/test/java/io/cdap/wrangler/utils/Json2SchemaTest.java +++ b/wrangler-core/src/test/java/io/cdap/wrangler/utils/Json2SchemaTest.java @@ -102,7 +102,7 @@ public void testLogicalType() throws Exception { Json2Schema json2Schema = new Json2Schema(); Schema actual = json2Schema.toSchema("testRecord", testRow); - Schema expected = Schema.recordOf("expectedRecord", + Schema expected = Schema.recordOf("testRecord", Schema.Field.of("id", Schema.nullableOf(Schema.of(Schema.Type.INT))), Schema.Field.of("name", Schema.nullableOf(Schema.of(Schema.Type.STRING))), Schema.Field.of("date", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))), @@ -134,7 +134,7 @@ public void testArrayType() throws Exception { Json2Schema json2Schema = new Json2Schema(); Schema actual = json2Schema.toSchema("testRecord", testRow); - Schema expected = Schema.recordOf("expectedRecord", + Schema expected = Schema.recordOf("testRecord", Schema.Field.of("id", Schema.nullableOf(Schema.of(Schema.Type.INT))), Schema.Field.of("name", Schema.nullableOf(Schema.of(Schema.Type.STRING))), Schema.Field.of("date", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))), diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java index 2945d0fab..ec90ee195 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/DirectivesHandler.java @@ -55,6 +55,7 @@ import io.cdap.wrangler.api.Row; import io.cdap.wrangler.api.TokenGroup; import io.cdap.wrangler.api.TransientStore; +import io.cdap.wrangler.api.parser.SyntaxError; import io.cdap.wrangler.api.parser.Token; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.datamodel.DataModelGlossary; @@ -431,7 +432,7 @@ public void upload(HttpServiceRequest request, HttpServiceResponder responder, // Extract content. byte[] content = handler.getContent(); if (content == null) { - throw new BadRequestException("Body not present, please post the file containing the " + throw new BadRequestException("Body is not present, upload file containing " + "records to be wrangled."); } @@ -629,7 +630,7 @@ public void execute(HttpServiceRequest request, HttpServiceResponder responder, if ((object.getClass().getMethod("toString").getDeclaringClass() != Object.class)) { value.put(fieldName, object.toString()); } else { - value.put(fieldName, "Non-displayable object"); + value.put(fieldName, "Non displayable object"); } } else { value.put(fieldName, null); @@ -663,6 +664,18 @@ private String addLoadablePragmaDirectives(String namespace, Request request) { // Compile the directive extracting the loadable plugins (a.k.a // Directives in this context). CompileStatus status = compiler.compile(new MigrateToV2(request.getRecipe().getDirectives()).migrate()); + if (!status.isSuccess()) { + StringBuilder eStr = new StringBuilder(); + Iterator errors = status.getErrors(); + while (errors.hasNext()) { + eStr.append(errors.next().getMessage()); + if (errors.hasNext()) { + eStr.append(","); + } + } + throw new DirectiveParseException(eStr.toString()); + } + RecipeSymbol symbols = status.getSymbols(); Iterator iterator = symbols.iterator(); List userDirectives = new ArrayList<>(); @@ -1294,6 +1307,7 @@ private List executeDirectives(NamespacedId id, @Nullable Request user, String migrate = migrator.migrate(); RecipeParser recipe = new GrammarBasedParser(id.getNamespace().getName(), migrate, composite); recipe.initialize(new ConfigDirectiveContext(configStore.getConfig())); + try { executor.initialize(recipe, context); rows = executor.execute(sample.apply(rows)); @@ -1305,6 +1319,7 @@ private List executeDirectives(NamespacedId id, @Nullable Request user, .stream() .filter(ErrorRecordBase::isShownInWrangler) .collect(Collectors.toList()); + if (errors.size() > 0) { throw new ErrorRecordsException(errors); } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/gcp/GCPUtils.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/gcp/GCPUtils.java index eb4c8ed06..37c8e2d67 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/gcp/GCPUtils.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/gcp/GCPUtils.java @@ -32,6 +32,9 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Map; /** @@ -42,12 +45,17 @@ public final class GCPUtils { public static final String SERVICE_ACCOUNT_KEYFILE = "service-account-keyfile"; public static ServiceAccountCredentials loadLocalFile(String path) throws IOException { + return loadLocalFile(path, Collections.emptyList()); + } + + public static ServiceAccountCredentials loadLocalFile(String path, List scopes) throws IOException { File credentialsPath = new File(path); if (!credentialsPath.exists()) { throw new FileNotFoundException("Service account file " + credentialsPath.getName() + " does not exist."); } try (FileInputStream serviceAccountStream = new FileInputStream(credentialsPath)) { - return ServiceAccountCredentials.fromStream(serviceAccountStream); + ServiceAccountCredentials serviceAccountCredentials = ServiceAccountCredentials.fromStream(serviceAccountStream); + return (ServiceAccountCredentials) serviceAccountCredentials.createScoped(scopes); } catch (FileNotFoundException e) { throw new IOException( String.format("Unable to find service account file '%s'.", path), e); @@ -62,7 +70,7 @@ public static ServiceAccountCredentials loadLocalFile(String path) throws IOExce */ public static Storage getStorageService(ConnectionMeta connection) throws IOException { StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder(); - setProperties(connection, storageOptionsBuilder); + setProperties(connection, storageOptionsBuilder, Collections.emptyList()); return storageOptionsBuilder.build().getService(); } @@ -71,7 +79,8 @@ public static Storage getStorageService(ConnectionMeta connection) throws IOExce */ public static BigQuery getBigQueryService(ConnectionMeta connection) throws IOException { BigQueryOptions.Builder bigQueryOptionsBuilder = BigQueryOptions.newBuilder(); - setProperties(connection, bigQueryOptionsBuilder); + setProperties(connection, bigQueryOptionsBuilder, Arrays.asList("https://www.googleapis.com/auth/drive", + "https://www.googleapis.com/auth/bigquery")); return bigQueryOptionsBuilder.build().getService(); } @@ -101,10 +110,10 @@ public static Spanner getSpannerService(ConnectionMeta connection) throws IOExce * set credentials and project_id if those are provided in the input connection */ private static void setProperties(ConnectionMeta connection, - ServiceOptions.Builder serviceOptions) throws IOException { + ServiceOptions.Builder serviceOptions, List scopes) throws IOException { if (connection.getProperties().containsKey(SERVICE_ACCOUNT_KEYFILE)) { String path = connection.getProperties().get(SERVICE_ACCOUNT_KEYFILE); - serviceOptions.setCredentials(loadLocalFile(path)); + serviceOptions.setCredentials(loadLocalFile(path, scopes)); } if (connection.getProperties().containsKey(PROJECT_ID)) { String projectId = connection.getProperties().get(PROJECT_ID);