diff --git a/.gitignore b/.gitignore index 7377350..8888011 100644 --- a/.gitignore +++ b/.gitignore @@ -50,4 +50,7 @@ DirectivesVisitor.java release.properties # Remove dev directory. -dev \ No newline at end of file +dev + +# VSCode Files +.vscode \ No newline at end of file diff --git a/src/main/java/io/cdap/plugin/gcp/firestore/common/FirestoreConfig.java b/src/main/java/io/cdap/plugin/gcp/firestore/common/FirestoreConfig.java index 597545f..45e45b9 100644 --- a/src/main/java/io/cdap/plugin/gcp/firestore/common/FirestoreConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/firestore/common/FirestoreConfig.java @@ -10,8 +10,10 @@ import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.plugin.common.Constants; import io.cdap.plugin.common.IdUtils; +import io.cdap.plugin.gcp.firestore.util.FirestoreConstants; import java.io.IOException; +import java.util.UUID; import javax.annotation.Nullable; /** @@ -20,6 +22,7 @@ public class FirestoreConfig extends PluginConfig { public static final String NAME_PROJECT = "project"; public static final String NAME_SERVICE_ACCOUNT_TYPE = "serviceAccountType"; + public static final String NAME_DATABASE = "databaseName"; public static final String NAME_SERVICE_ACCOUNT_FILE_PATH = "serviceFilePath"; public static final String NAME_SERVICE_ACCOUNT_JSON = "serviceAccountJSON"; public static final String AUTO_DETECT = "auto-detect"; @@ -37,25 +40,31 @@ public class FirestoreConfig extends PluginConfig { @Nullable protected String project; + @Name(NAME_DATABASE) + @Description("Name of the Firestore Database. " + + "If not specified, it will use '(default)'.") + @Macro + @Nullable + protected String databaseName; + @Name(NAME_SERVICE_ACCOUNT_TYPE) @Description("Service account type, file path where the service account is located or the JSON content of the " + "service account.") @Macro - @Nullable protected String serviceAccountType; @Name(NAME_SERVICE_ACCOUNT_FILE_PATH) @Description("Path on the local file system of the service account key used " + "for authorization. Can be set to 'auto-detect' when running on a Dataproc cluster. " + "When running on other clusters, the file must be present on every node in the cluster.") - @Macro @Nullable + @Macro protected String serviceFilePath; @Name(NAME_SERVICE_ACCOUNT_JSON) @Description("Content of the service account file.") - @Macro @Nullable + @Macro protected String serviceAccountJson; public String getProject() { @@ -103,11 +112,13 @@ public String getServiceAccountType() { return serviceAccountType; } + @Nullable public Boolean isServiceAccountJson() { String serviceAccountType = getServiceAccountType(); return Strings.isNullOrEmpty(serviceAccountType) ? null : serviceAccountType.equals(SERVICE_ACCOUNT_JSON); } + @Nullable public Boolean isServiceAccountFilePath() { String serviceAccountType = getServiceAccountType(); return Strings.isNullOrEmpty(serviceAccountType) ? null : serviceAccountType.equals(SERVICE_ACCOUNT_FILE_PATH); @@ -127,6 +138,16 @@ public String getServiceAccount() { */ public void validate(FailureCollector collector) { IdUtils.validateReferenceName(referenceName, collector); + validateDatabaseName(collector); + } + + public String getDatabaseName() { + if (containsMacro(NAME_DATABASE) && Strings.isNullOrEmpty(databaseName)) { + return null; + } else if (Strings.isNullOrEmpty(databaseName)) { + return "(default)"; + } + return databaseName; } public String getReferenceName() { @@ -150,4 +171,69 @@ public boolean autoServiceAccountUnavailable() { } return false; } + + /** + * Validates the given database name to consists of characters allowed to represent a dataset. + */ + public void validateDatabaseName(FailureCollector collector) { + if (containsMacro(FirestoreConfig.NAME_DATABASE)) { + return; + } + + String databaseName = getDatabaseName(); + + // Check if the database name is empty or null. + if (Strings.isNullOrEmpty(databaseName)) { + collector.addFailure("Database Name must be specified.", null) + .withConfigProperty(FirestoreConfig.NAME_DATABASE); + } + + // Check if database name contains the (default) + if (!databaseName.equals(FirestoreConstants.DEFAULT_DATABASE_NAME)) { + + // Ensure database name includes only letters, numbers, and hyphen (-) + // characters. + if (!databaseName.matches("^[a-zA-Z0-9-]+$")) { + collector.addFailure("Database name can only include letters, numbers and hyphen characters.", null) + .withConfigProperty(FirestoreConfig.NAME_DATABASE); + } + + // Ensure database name is in lower case. + if (databaseName != databaseName.toLowerCase()) { + collector.addFailure("Database name must be in lowercase.", null) + .withConfigProperty(FirestoreConfig.NAME_DATABASE); + } + + // The first character must be a letter. + if (!databaseName.matches("^[a-zA-Z].*")) { + collector.addFailure("Database name's first character can only be an alphabet.", null) + .withConfigProperty(FirestoreConfig.NAME_DATABASE); + } + + // The last character must be a letter or number. + if (!databaseName.matches(".*[a-zA-Z0-9]$")) { + collector.addFailure("Database name's last character can only be a letter or a number.", null) + .withConfigProperty(FirestoreConfig.NAME_DATABASE); + } + + // Minimum of 4 characters. + if (databaseName.length() < 4) { + collector.addFailure("Database name should be at least 4 letters.", null) + .withConfigProperty(FirestoreConfig.NAME_DATABASE); + } + + // Maximum of 63 characters. + if (databaseName.length() > 63) { + collector.addFailure("Database name cannot be more than 63 characters.", null) + .withConfigProperty(FirestoreConfig.NAME_DATABASE); + } + + // Should not be a UUID. + try { + UUID.fromString(databaseName); + collector.addFailure("Database name cannot contain a UUID.", null) + .withConfigProperty(FirestoreConfig.NAME_DATABASE); + } catch (IllegalArgumentException e) { } + } + } } diff --git a/src/main/java/io/cdap/plugin/gcp/firestore/sink/FirestoreOutputFormatProvider.java b/src/main/java/io/cdap/plugin/gcp/firestore/sink/FirestoreOutputFormatProvider.java index 6d9a7d9..87c92eb 100644 --- a/src/main/java/io/cdap/plugin/gcp/firestore/sink/FirestoreOutputFormatProvider.java +++ b/src/main/java/io/cdap/plugin/gcp/firestore/sink/FirestoreOutputFormatProvider.java @@ -38,21 +38,31 @@ public class FirestoreOutputFormatProvider implements OutputFormatProvider { * for {@link FirestoreRecordWriter}. * * @param project Firestore project - * @param serviceAccountPath Firestore service account path + * @param databaseName Name of the Firestore Database + * @param serviceAccountFilePath Path to the JSON File containing the service account credentials + * @param serviceAccountJson JSON content of the service account credentials + * @param serviceAccountType The type of the Service account if it is stored in a filePath or JSON * @param collection Firestore collection name * @param shouldUseAutoGeneratedId should use auto generated document id * @param batchSize batch size */ - public FirestoreOutputFormatProvider(String project, @Nullable String serviceAccountPath, - String collection, String shouldUseAutoGeneratedId, String batchSize) { + public FirestoreOutputFormatProvider(String project, String databaseName, @Nullable String serviceAccountFilePath, + @Nullable String serviceAccountJson, String serviceAccountType, + String collection, String shouldUseAutoGeneratedId, + String batchSize) { ImmutableMap.Builder builder = new ImmutableMap.Builder() .put(FirestoreConfig.NAME_PROJECT, project) - .put(FirestoreConstants.PROPERTY_COLLECTION, collection) + .put(FirestoreConfig.NAME_DATABASE, databaseName) + .put(FirestoreConfig.NAME_SERVICE_ACCOUNT_TYPE, serviceAccountType) + .put(FirestoreConstants.PROPERTY_COLLECTION, Strings.isNullOrEmpty(collection) ? "" : collection) .put(FirestoreSinkConstants.PROPERTY_ID_TYPE, shouldUseAutoGeneratedId) .put(FirestoreSinkConstants.PROPERTY_BATCH_SIZE, batchSize); - if (!Strings.isNullOrEmpty(serviceAccountPath)) { - builder.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_FILE_PATH, serviceAccountPath); + if (!Strings.isNullOrEmpty(serviceAccountFilePath)) { + builder.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_FILE_PATH, serviceAccountFilePath); + } + if (!Strings.isNullOrEmpty(serviceAccountJson)) { + builder.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_JSON, serviceAccountJson); } this.configMap = builder.build(); } diff --git a/src/main/java/io/cdap/plugin/gcp/firestore/sink/FirestoreRecordWriter.java b/src/main/java/io/cdap/plugin/gcp/firestore/sink/FirestoreRecordWriter.java index 699f997..919a510 100644 --- a/src/main/java/io/cdap/plugin/gcp/firestore/sink/FirestoreRecordWriter.java +++ b/src/main/java/io/cdap/plugin/gcp/firestore/sink/FirestoreRecordWriter.java @@ -24,6 +24,7 @@ import com.google.cloud.firestore.WriteResult; import com.google.common.base.Strings; import io.cdap.plugin.gcp.firestore.common.FirestoreConfig; +import io.cdap.plugin.gcp.firestore.exception.FirestoreInitializationException; import io.cdap.plugin.gcp.firestore.sink.util.FirestoreSinkConstants; import io.cdap.plugin.gcp.firestore.util.FirestoreConstants; import io.cdap.plugin.gcp.firestore.util.FirestoreUtil; @@ -58,16 +59,38 @@ public class FirestoreRecordWriter extends RecordWriter fields) { ImmutableMap.Builder builder = new ImmutableMap.Builder() .put(FirestoreConfig.NAME_PROJECT, project) + .put(FirestoreConfig.NAME_DATABASE, databaseName) + .put(FirestoreConfig.NAME_SERVICE_ACCOUNT_TYPE, serviceAccountType) .put(FirestoreConstants.PROPERTY_COLLECTION, Strings.isNullOrEmpty(collection) ? "" : collection) .put(FirestoreSourceConstants.PROPERTY_QUERY_MODE, mode) .put(FirestoreSourceConstants.PROPERTY_PULL_DOCUMENTS, Strings.isNullOrEmpty(pullDocuments) ? "" : pullDocuments) .put(FirestoreSourceConstants.PROPERTY_SKIP_DOCUMENTS, Strings.isNullOrEmpty(skipDocuments) ? "" : skipDocuments) .put(FirestoreSourceConstants.PROPERTY_CUSTOM_QUERY, Strings.isNullOrEmpty(filters) ? "" : filters) .put(FirestoreSourceConstants.PROPERTY_SCHEMA, Joiner.on(",").join(fields)); - if (Objects.nonNull(serviceAccountPath)) { - builder.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_FILE_PATH, serviceAccountPath); + if (Objects.nonNull(serviceAccountFilePath)) { + builder.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_FILE_PATH, serviceAccountFilePath); + } + if (Objects.nonNull(serviceAccountJson)) { + builder.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_JSON, serviceAccountJson); } this.configMap = builder.build(); } diff --git a/src/main/java/io/cdap/plugin/gcp/firestore/source/FirestoreRecordReader.java b/src/main/java/io/cdap/plugin/gcp/firestore/source/FirestoreRecordReader.java index abc408c..ae4d6dd 100644 --- a/src/main/java/io/cdap/plugin/gcp/firestore/source/FirestoreRecordReader.java +++ b/src/main/java/io/cdap/plugin/gcp/firestore/source/FirestoreRecordReader.java @@ -22,7 +22,9 @@ import com.google.cloud.firestore.QueryDocumentSnapshot; import com.google.cloud.firestore.QuerySnapshot; import com.google.common.base.Splitter; +import com.google.common.base.Strings; import io.cdap.plugin.gcp.firestore.common.FirestoreConfig; +import io.cdap.plugin.gcp.firestore.exception.FirestoreInitializationException; import io.cdap.plugin.gcp.firestore.source.util.FilterInfo; import io.cdap.plugin.gcp.firestore.source.util.FilterInfoParser; import io.cdap.plugin.gcp.firestore.source.util.FirestoreQueryBuilder; @@ -49,7 +51,7 @@ */ public class FirestoreRecordReader extends RecordReader { private static final Logger LOG = LoggerFactory.getLogger(FirestoreRecordReader.class); - private Configuration conf; + private Configuration config; private Firestore db; private List items; // Map key that represents the item index. @@ -63,19 +65,42 @@ public class FirestoreRecordReader extends RecordReader fields = Splitter.on(',').trimResults() - .splitToList(conf.get(FirestoreSourceConstants.PROPERTY_SCHEMA, "")); + .splitToList(config.get(FirestoreSourceConstants.PROPERTY_SCHEMA, "")); List pullDocuments = Splitter.on(',').trimResults().omitEmptyStrings() - .splitToList(conf.get(FirestoreSourceConstants.PROPERTY_PULL_DOCUMENTS, "")); + .splitToList(config.get(FirestoreSourceConstants.PROPERTY_PULL_DOCUMENTS, "")); List skipDocuments = Splitter.on(',').trimResults().omitEmptyStrings() - .splitToList(conf.get(FirestoreSourceConstants.PROPERTY_SKIP_DOCUMENTS, "")); - String customQuery = conf.get(FirestoreSourceConstants.PROPERTY_CUSTOM_QUERY, ""); + .splitToList(config.get(FirestoreSourceConstants.PROPERTY_SKIP_DOCUMENTS, "")); + String customQuery = config.get(FirestoreSourceConstants.PROPERTY_CUSTOM_QUERY, ""); - db = FirestoreUtil.getFirestore(serviceAccountFilePath, projectId); + db = FirestoreUtil.getFirestore(serviceAccount, isServiceAccountFilePath, projectId, databaseId); try { List filters = getParsedFilters(customQuery); diff --git a/src/main/java/io/cdap/plugin/gcp/firestore/source/FirestoreSource.java b/src/main/java/io/cdap/plugin/gcp/firestore/source/FirestoreSource.java index 41426e6..0b14ef8 100644 --- a/src/main/java/io/cdap/plugin/gcp/firestore/source/FirestoreSource.java +++ b/src/main/java/io/cdap/plugin/gcp/firestore/source/FirestoreSource.java @@ -110,7 +110,10 @@ public void prepareRun(BatchSourceContext context) throws Exception { collector.getOrThrowException(); String project = config.getProject(); - String serviceAccountFile = config.getServiceAccountFilePath(); + String databaseName = config.getDatabaseName(); + String serviceAccountFilePath = config.getServiceAccountFilePath(); + String serviceAccountJson = config.getServiceAccountJson(); + String serviceAccountType = config.getServiceAccountType(); String collection = config.getCollection(); String mode = config.getQueryMode().getValue(); String pullDocuments = config.getPullDocuments(); @@ -119,8 +122,9 @@ public void prepareRun(BatchSourceContext context) throws Exception { List fields = fetchSchemaFields(config.getSchema(collector)); - context.setInput(Input.of(config.getReferenceName(), new FirestoreInputFormatProvider(project, serviceAccountFile, - collection, mode, pullDocuments, skipDocuments, filters, fields))); + context.setInput(Input.of(config.getReferenceName(), new FirestoreInputFormatProvider(project, databaseName, + serviceAccountFilePath, serviceAccountJson, serviceAccountType, collection, mode, pullDocuments, skipDocuments, + filters, fields))); emitLineage(context); } @@ -168,7 +172,8 @@ private Schema getSchema(FailureCollector collector) { List items = null; try { - Firestore db = FirestoreUtil.getFirestore(config.getServiceAccountFilePath(), config.getProject()); + Firestore db = FirestoreUtil.getFirestore(config.getServiceAccount(), config.isServiceAccountFilePath(), + config.getProject(), config.getDatabaseName()); ApiFuture query = db.collection(config.getCollection()).limit(1).get(); QuerySnapshot querySnapshot = query.get(); @@ -176,10 +181,11 @@ private Schema getSchema(FailureCollector collector) { } catch (Exception e) { collector.addFailure(e.getMessage(), "Ensure properties like project, service account " + - "file path, collection are correct.") + "file path, collection, database name are correct.") .withConfigProperty(FirestoreConfig.NAME_SERVICE_ACCOUNT_FILE_PATH) .withConfigProperty(FirestoreConfig.NAME_PROJECT) .withConfigProperty(FirestoreConstants.PROPERTY_COLLECTION) + .withConfigProperty(FirestoreConfig.NAME_DATABASE) .withStacktrace(e.getStackTrace()); collector.getOrThrowException(); } diff --git a/src/main/java/io/cdap/plugin/gcp/firestore/source/FirestoreSourceConfig.java b/src/main/java/io/cdap/plugin/gcp/firestore/source/FirestoreSourceConfig.java index 2f13985..a010742 100644 --- a/src/main/java/io/cdap/plugin/gcp/firestore/source/FirestoreSourceConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/firestore/source/FirestoreSourceConfig.java @@ -104,25 +104,28 @@ public class FirestoreSourceConfig extends FirestoreConfig { /** * Constructor for FirestoreSourceConfig object. - * @param referenceName the reference name - * @param project the project id - * @param serviceFilePath the service file path - * @param collection the collection id - * @param queryMode the query mode (basic or advanced) - * @param pullDocuments the list of documents to pull - * @param skipDocuments the list of documents to skip - * @param filters the filter for given field as well as value + * + * @param referenceName the reference name + * @param project the project id + * @param serviceFilePath the service file path + * @param databaseName the name of the database + * @param collection the collection id + * @param queryMode the query mode (basic or advanced) + * @param pullDocuments the list of documents to pull + * @param skipDocuments the list of documents to skip + * @param filters the filter for given field as well as value * @param includeDocumentId the included document id - * @param idAlias the id alias - * @param schema the schema + * @param idAlias the id alias + * @param schema the schema */ public FirestoreSourceConfig( - String referenceName, String project, String serviceFilePath, String collection, + String referenceName, String project, String serviceFilePath, String databaseName, String collection, String queryMode, String pullDocuments, String skipDocuments, String filters, String includeDocumentId, String idAlias, String schema) { this.referenceName = referenceName; this.project = project; this.serviceFilePath = serviceFilePath; + this.databaseName = databaseName; this.collection = collection; this.queryMode = queryMode; this.pullDocuments = pullDocuments; @@ -197,6 +200,7 @@ public String getIdAlias() { /** * Return the Schema. + * * @param collector the FailureCollector * @return The Schema */ @@ -209,7 +213,8 @@ public Schema getSchema(FailureCollector collector) { } catch (IOException e) { collector.addFailure("Invalid schema: " + e.getMessage(), null) .withConfigProperty(FirestoreSourceConstants.PROPERTY_SCHEMA); - // if there was an error that was added, it will throw an exception, otherwise, this statement will + // if there was an error that was added, it will throw an exception, otherwise, + // this statement will // not be executed collector.getOrThrowException(); return null; @@ -243,33 +248,37 @@ void validateFirestoreConnection(FailureCollector collector) { } Firestore db = null; try { - db = FirestoreUtil.getFirestore(getServiceAccountFilePath(), getProject()); + db = FirestoreUtil.getFirestore(getServiceAccount(), isServiceAccountFilePath(), getProject(), getDatabaseName()); if (db != null) { db.close(); } } catch (FirestoreInitializationException e) { collector.addFailure(e.getMessage(), "Ensure properties like project, service account " + - "file path are correct.") + "file path, database name are correct.") .withConfigProperty(NAME_SERVICE_ACCOUNT_FILE_PATH) .withConfigProperty(NAME_PROJECT) + .withConfigProperty(NAME_DATABASE) .withStacktrace(e.getStackTrace()); } catch (IllegalArgumentException e) { - collector.addFailure(e.getMessage(), "Ensure collection name exists in Firestore.") + collector.addFailure(e.getMessage(), "Ensure database name & collection name exists in Firestore.") .withConfigProperty(FirestoreConstants.PROPERTY_COLLECTION) + .withConfigProperty(FirestoreConfig.NAME_DATABASE) .withStacktrace(e.getStackTrace()); } catch (Exception e) { - collector.addFailure("Error while connecting to Firestoe - " + e.getMessage(), - "Ensure Firestore connection params are correct.") - .withConfigProperty(FirestoreConstants.PROPERTY_COLLECTION) - .withStacktrace(e.getStackTrace()); + collector.addFailure("Error while connecting to Firestore - " + e.getMessage(), + "Ensure Firestore connection params are correct.") + .withConfigProperty(FirestoreConstants.PROPERTY_COLLECTION) + .withConfigProperty(FirestoreConfig.NAME_DATABASE) + .withStacktrace(e.getStackTrace()); LOG.error("Error", e); } collector.getOrThrowException(); } /** - * Validates the given referenceName to consists of characters allowed to represent a dataset. + * Validates the given collection name to consists of characters allowed to + * represent a dataset. */ public void validateCollection(FailureCollector collector) { if (containsMacro(FirestoreConstants.PROPERTY_COLLECTION)) { @@ -295,9 +304,10 @@ private void validateSchema(Schema schema, FailureCollector collector) { /** * Validates given field schema to be compliant with Firestore types. * - * @param fieldName field name + * @param fieldName field name * @param fieldSchema schema for CDAP field - * @param collector failure collector to collect failures if schema contains unsupported type. + * @param collector failure collector to collect failures if schema contains + * unsupported type. */ private void validateFieldSchema(String fieldName, Schema fieldSchema, FailureCollector collector) { Schema.LogicalType logicalType = fieldSchema.getLogicalType(); @@ -342,8 +352,7 @@ private void validateFieldSchema(String fieldName, Schema fieldSchema, FailureCo return; case UNION: - fieldSchema.getUnionSchemas().forEach(unionSchema -> - validateFieldSchema(fieldName, unionSchema, collector)); + fieldSchema.getUnionSchemas().forEach(unionSchema -> validateFieldSchema(fieldName, unionSchema, collector)); return; default: collector.addFailure(String.format("Field '%s' is of unsupported type '%s'", @@ -360,11 +369,12 @@ private void validateFieldSchema(String fieldName, Schema fieldSchema, FailureCo public boolean shouldConnect() { return !containsMacro(FirestoreSourceConstants.PROPERTY_SCHEMA) && !containsMacro(NAME_SERVICE_ACCOUNT_FILE_PATH) && + !containsMacro(NAME_SERVICE_ACCOUNT_JSON) && !containsMacro(NAME_PROJECT) && tryGetProject() != null && !autoServiceAccountUnavailable(); } - + private void validateDocumentLists(FailureCollector collector) { if (Strings.isNullOrEmpty(getPullDocuments()) || Strings.isNullOrEmpty(getSkipDocuments()) || @@ -416,7 +426,9 @@ private void validateFilters(FailureCollector collector) { } /** - * Returns the empty list if filters contains a macro. Otherwise, the list returned can never be empty. + * Returns the empty list if filters contains a macro. Otherwise, the list + * returned can never be empty. + * * @param collector the FailureCollector * @return the this of FilterInfo */ diff --git a/src/main/java/io/cdap/plugin/gcp/firestore/util/FirestoreConstants.java b/src/main/java/io/cdap/plugin/gcp/firestore/util/FirestoreConstants.java index 1c45433..15a98e4 100644 --- a/src/main/java/io/cdap/plugin/gcp/firestore/util/FirestoreConstants.java +++ b/src/main/java/io/cdap/plugin/gcp/firestore/util/FirestoreConstants.java @@ -35,4 +35,9 @@ public interface FirestoreConstants { * Default column name to be used when document ids to be included in output schema. */ String ID_PROPERTY_NAME = "__id__"; + + /** + * Default name of the database to use when it is not specified. + */ + String DEFAULT_DATABASE_NAME = "(default)"; } diff --git a/src/main/java/io/cdap/plugin/gcp/firestore/util/FirestoreUtil.java b/src/main/java/io/cdap/plugin/gcp/firestore/util/FirestoreUtil.java index 242a42f..ee49577 100644 --- a/src/main/java/io/cdap/plugin/gcp/firestore/util/FirestoreUtil.java +++ b/src/main/java/io/cdap/plugin/gcp/firestore/util/FirestoreUtil.java @@ -15,8 +15,7 @@ */ package io.cdap.plugin.gcp.firestore.util; - -import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.firestore.Firestore; import com.google.cloud.firestore.FirestoreOptions; import com.google.common.base.Strings; @@ -24,9 +23,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import javax.annotation.Nullable; /** @@ -38,31 +39,82 @@ public class FirestoreUtil { /** * Connects to Firestore instance using given credentials in JSON file, project ID and optional database Id. * - * @param serviceAccountFilePath path to credentials defined in JSON file + * @param serviceAccount Actual credentials in JSON or a path to the file containing them + * @param isServiceAccountFilePath If the serviceAccount parameter is a file path or not * @param projectId Google Cloud project ID + * @param databaseId Cloud Firestore Database Id * @return Firestore service */ - public static Firestore getFirestore(@Nullable String serviceAccountFilePath, String projectId) { + public static Firestore getFirestore(@Nullable String serviceAccount, + @Nullable Boolean isServiceAccountFilePath, + String projectId, + String databaseId) { try { FirestoreOptions.Builder optionsBuilder = FirestoreOptions.newBuilder() - .setProjectId(projectId); - - LOG.debug("serviceAccount={}, project={}...", serviceAccountFilePath, projectId); - - if (!Strings.isNullOrEmpty(serviceAccountFilePath)) { - optionsBuilder.setCredentials(loadServiceAccountCredentials(serviceAccountFilePath)); - } - + .setProjectId(projectId) + .setDatabaseId(databaseId); + LOG.debug("isServiceAccountFilePath={}, project={}, databaseId={}...", + isServiceAccountFilePath, projectId, databaseId); + final GoogleCredentials credential = getCredential(serviceAccount, isServiceAccountFilePath); + optionsBuilder.setCredentials(credential); return optionsBuilder.build().getService(); } catch (IOException e) { throw new FirestoreInitializationException("Unable to connect to Firestore", e); } } - public static ServiceAccountCredentials loadServiceAccountCredentials(String path) throws IOException { + public static GoogleCredentials loadServiceAccountCredentials(String path) throws IOException { File credentialsPath = new File(path); try (FileInputStream serviceAccountStream = new FileInputStream(credentialsPath)) { - return ServiceAccountCredentials.fromStream(serviceAccountStream); + return GoogleCredentials.fromStream(serviceAccountStream); + } + } + + /** + * Obtains Google Cloud Service Account credential from given JSON file. + * If give path is null or empty, obtains application default credentials. + * + * @param serviceAccount path to credentials defined in JSON file + * @param isServiceAccountFilePath indicator whether service account is file path or JSON + * @return Google Cloud Service Cloud credential + * @throws IOException if the credential cannot be created in the current environment + */ + private static GoogleCredentials getCredential(@Nullable String serviceAccount, + @Nullable Boolean isServiceAccountFilePath) throws IOException { + GoogleCredentials credential; + if (!Strings.isNullOrEmpty(serviceAccount)) { + if (!isServiceAccountFilePath) { + credential = loadCredentialFromStream(serviceAccount); + } else { + credential = loadCredentialFromFile(serviceAccount); + } + } else { + credential = GoogleCredentials.getApplicationDefault(); + } + return credential; + } + + /** + * Generate credentials from JSON key + * @param serviceAccountFilePath path to service account file + * @return Google Cloud credential + * @throws IOException if the credential cannot be created in the current environment + */ + private static GoogleCredentials loadCredentialFromFile(@Nullable String serviceAccountFilePath) throws IOException { + try (InputStream inputStream = new FileInputStream(serviceAccountFilePath)) { + return GoogleCredentials.fromStream(inputStream); + } + } + + /** + * Generate credentials from JSON key + * @param serviceAccount contents of service account JSON + * @return Google Cloud credential + * @throws IOException if the credential cannot be created in the current environment + */ + private static GoogleCredentials loadCredentialFromStream(@Nullable String serviceAccount) throws IOException { + try (InputStream jsonInputStream = new ByteArrayInputStream(serviceAccount.getBytes())) { + return GoogleCredentials.fromStream(jsonInputStream); } } } diff --git a/src/test/java/io/cdap/plugin/gcp/firestore/sink/FirestoreSinkConfigHelper.java b/src/test/java/io/cdap/plugin/gcp/firestore/sink/FirestoreSinkConfigHelper.java index 70b0be5..2aec7ac 100644 --- a/src/test/java/io/cdap/plugin/gcp/firestore/sink/FirestoreSinkConfigHelper.java +++ b/src/test/java/io/cdap/plugin/gcp/firestore/sink/FirestoreSinkConfigHelper.java @@ -23,7 +23,7 @@ public class FirestoreSinkConfigHelper { public static final String TEST_REF_NAME = "TestRefName"; public static final String TEST_PROJECT = "test-project"; - public static final String TEST_DATABASE = "TestDatabase"; + public static final String TEST_DATABASE = "testdatabase"; public static final String TEST_COLLECTION = "TestCollection"; public static final String TEST_ID_ALIAS = "testIdAlias"; @@ -35,7 +35,7 @@ public static class ConfigBuilder { private String referenceName = TEST_REF_NAME; private String project = TEST_PROJECT; private String serviceFilePath = "/path/to/file"; - private String database = TEST_DATABASE; + private String databaseName = TEST_DATABASE; private String collection = TEST_COLLECTION; private String idType; private String idAlias; @@ -57,7 +57,7 @@ public ConfigBuilder setServiceFilePath(String serviceFilePath) { } public ConfigBuilder setDatabase(String database) { - this.database = database; + this.databaseName = database; return this; } @@ -82,7 +82,7 @@ public ConfigBuilder setBatchSize(int batchSize) { } public FirestoreSinkConfig build() { - return new FirestoreSinkConfig(referenceName, project, serviceFilePath, collection, idType, idAlias, + return new FirestoreSinkConfig(referenceName, project, serviceFilePath, databaseName, collection, idType, idAlias, batchSize); } } diff --git a/src/test/java/io/cdap/plugin/gcp/firestore/source/FirestoreSourceConfigHelper.java b/src/test/java/io/cdap/plugin/gcp/firestore/source/FirestoreSourceConfigHelper.java index 9fc6fdd..abb271f 100644 --- a/src/test/java/io/cdap/plugin/gcp/firestore/source/FirestoreSourceConfigHelper.java +++ b/src/test/java/io/cdap/plugin/gcp/firestore/source/FirestoreSourceConfigHelper.java @@ -23,7 +23,7 @@ public class FirestoreSourceConfigHelper { public static final String TEST_REF_NAME = "TestRefName"; public static final String TEST_PROJECT = "test-project"; - public static final String TEST_DATABASE = "TestDatabase"; + public static final String TEST_DATABASE = "testdatabase"; public static final String TEST_COLLECTION = "TestCollection"; public static ConfigBuilder newConfigBuilder() { @@ -34,7 +34,7 @@ public static class ConfigBuilder { private String referenceName = TEST_REF_NAME; private String project = TEST_PROJECT; private String serviceFilePath = "/path/to/file"; - private String database = TEST_DATABASE; + private String databaseName = TEST_DATABASE; private String collection = TEST_COLLECTION; private String includeDocumentId = "true"; private String idAlias = "__id__"; @@ -56,7 +56,7 @@ public ConfigBuilder setServiceFilePath(String serviceFilePath) { } public ConfigBuilder setDatabase(String database) { - this.database = database; + this.databaseName = database; return this; } @@ -81,7 +81,7 @@ public ConfigBuilder setSchema(String schema) { } public FirestoreSourceConfig build() { - return new FirestoreSourceConfig(referenceName, project, serviceFilePath, collection, + return new FirestoreSourceConfig(referenceName, project, serviceFilePath, databaseName, collection, "Basic", "", "", "", includeDocumentId, idAlias, schema); } } diff --git a/src/test/java/io/cdap/plugin/gcp/firestore/source/FirestoreSourceConfigTest.java b/src/test/java/io/cdap/plugin/gcp/firestore/source/FirestoreSourceConfigTest.java index 2d52b4c..442280c 100644 --- a/src/test/java/io/cdap/plugin/gcp/firestore/source/FirestoreSourceConfigTest.java +++ b/src/test/java/io/cdap/plugin/gcp/firestore/source/FirestoreSourceConfigTest.java @@ -19,6 +19,7 @@ import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.validation.CauseAttributes; import io.cdap.cdap.etl.mock.validation.MockFailureCollector; +import io.cdap.plugin.gcp.firestore.common.FirestoreConfig; import io.cdap.plugin.gcp.firestore.util.FirestoreConstants; import org.junit.Assert; import org.junit.Rule; @@ -47,6 +48,143 @@ public void testValidateCollectionNull() { .getCauses().get(0).getAttribute(CauseAttributes.STAGE_CONFIG)); } + @Test + public void testValidateWithDatabaseNameWithSpecialCharacters() { + MockFailureCollector collector = new MockFailureCollector(); + FirestoreSourceConfig config = withFirestoreValidationMock(FirestoreSourceConfigHelper.newConfigBuilder() + .setCollection(FirestoreSourceConfigHelper.TEST_COLLECTION) + .setDatabase("a!!!==--zz") + .build(), collector); + + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals(FirestoreConfig.NAME_DATABASE, collector.getValidationFailures().get(0) + .getCauses().get(0).getAttribute(CauseAttributes.STAGE_CONFIG)); + Assert.assertEquals("Database name can only include letters, numbers and hyphen characters.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateWithDatabaseNameWithUppercaseCharacters() { + MockFailureCollector collector = new MockFailureCollector(); + FirestoreSourceConfig config = withFirestoreValidationMock(FirestoreSourceConfigHelper.newConfigBuilder() + .setCollection(FirestoreSourceConfigHelper.TEST_COLLECTION) + .setDatabase(FirestoreSourceConfigHelper.TEST_DATABASE.toUpperCase()) + .build(), collector); + + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals(FirestoreConfig.NAME_DATABASE, collector.getValidationFailures().get(0) + .getCauses().get(0).getAttribute(CauseAttributes.STAGE_CONFIG)); + Assert.assertEquals("Database name must be in lowercase.", collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateWithDatabaseNameWithoutFirstLetterCharacter() { + MockFailureCollector collector = new MockFailureCollector(); + FirestoreSourceConfig config = withFirestoreValidationMock(FirestoreSourceConfigHelper.newConfigBuilder() + .setCollection(FirestoreSourceConfigHelper.TEST_COLLECTION) + .setDatabase("4testdatabase") + .build(), collector); + + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals(FirestoreConfig.NAME_DATABASE, collector.getValidationFailures().get(0) + .getCauses().get(0).getAttribute(CauseAttributes.STAGE_CONFIG)); + Assert.assertEquals("Database name's first character can only be an alphabet.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateWithDatabaseNameWithLastCharacterHyphen() { + MockFailureCollector collector = new MockFailureCollector(); + FirestoreSourceConfig config = withFirestoreValidationMock(FirestoreSourceConfigHelper.newConfigBuilder() + .setCollection(FirestoreSourceConfigHelper.TEST_COLLECTION) + .setDatabase("testdatabase-") + .build(), collector); + + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals(FirestoreConfig.NAME_DATABASE, collector.getValidationFailures().get(0) + .getCauses().get(0).getAttribute(CauseAttributes.STAGE_CONFIG)); + Assert.assertEquals("Database name's last character can only be a letter or a number.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateWithDatabaseNameWithLessThanFourCharacters() { + MockFailureCollector collector = new MockFailureCollector(); + FirestoreSourceConfig config = withFirestoreValidationMock(FirestoreSourceConfigHelper.newConfigBuilder() + .setCollection(FirestoreSourceConfigHelper.TEST_COLLECTION) + .setDatabase("tes") + .build(), collector); + + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals(FirestoreConfig.NAME_DATABASE, collector.getValidationFailures().get(0) + .getCauses().get(0).getAttribute(CauseAttributes.STAGE_CONFIG)); + Assert.assertEquals("Database name should be at least 4 letters.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateWithDatabaseNameWithMoreThanSixtyThreeCharacters() { + MockFailureCollector collector = new MockFailureCollector(); + FirestoreSourceConfig config = withFirestoreValidationMock(FirestoreSourceConfigHelper.newConfigBuilder() + .setCollection(FirestoreSourceConfigHelper.TEST_COLLECTION) + .setDatabase("testdatabase11233aaasssssssssssssssssssssssssssssssssssssssssssssssss") + .build(), collector); + + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals(FirestoreConfig.NAME_DATABASE, collector.getValidationFailures().get(0) + .getCauses().get(0).getAttribute(CauseAttributes.STAGE_CONFIG)); + Assert.assertEquals("Database name cannot be more than 63 characters.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateWithDatabaseNameShouldNotBeUUID() { + MockFailureCollector collector = new MockFailureCollector(); + FirestoreSourceConfig config = withFirestoreValidationMock(FirestoreSourceConfigHelper.newConfigBuilder() + .setCollection(FirestoreSourceConfigHelper.TEST_COLLECTION) + .setDatabase("b793f8c6-e52c-43c7-8ac4-d0cdfaecbb8e") + .build(), collector); + + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals(FirestoreConfig.NAME_DATABASE, collector.getValidationFailures().get(0) + .getCauses().get(0).getAttribute(CauseAttributes.STAGE_CONFIG)); + Assert.assertEquals("Database name cannot contain a UUID.", + collector.getValidationFailures().get(0).getMessage()); + } + + @Test + public void testValidateWithDatabase() { + MockFailureCollector collector = new MockFailureCollector(); + FirestoreSourceConfig config = withFirestoreValidationMock(FirestoreSourceConfigHelper.newConfigBuilder() + .setCollection(FirestoreSourceConfigHelper.TEST_COLLECTION) + .setDatabase(FirestoreSourceConfigHelper.TEST_DATABASE) + .build(), collector); + + config.validate(collector); + Assert.assertEquals(0, collector.getValidationFailures().size()); + Assert.assertEquals(FirestoreSourceConfigHelper.TEST_DATABASE, config.getDatabaseName()); + } + + @Test + public void testValidateWithEmptyDatabase() { + MockFailureCollector collector = new MockFailureCollector(); + FirestoreSourceConfig config = withFirestoreValidationMock(FirestoreSourceConfigHelper.newConfigBuilder() + .setCollection(FirestoreSourceConfigHelper.TEST_COLLECTION) + .setDatabase("") + .build(), collector); + + config.validate(collector); + Assert.assertEquals(0, collector.getValidationFailures().size()); + Assert.assertEquals("(default)", config.getDatabaseName()); + } + @Test public void testValidateCollectionEmpty() { MockFailureCollector collector = new MockFailureCollector(); diff --git a/widgets/Firestore-batchsink.json b/widgets/Firestore-batchsink.json index a4c0a6f..303f57a 100644 --- a/widgets/Firestore-batchsink.json +++ b/widgets/Firestore-batchsink.json @@ -15,6 +15,15 @@ "placeholder": "Name used to identify this sink for lineage" } }, + { + "widget-type": "textbox", + "label": "Database Name/ID", + "name": "databaseName", + "widget-attributes": { + "placeholder": "Name of the Database. If not specified, will use '(default)'", + "default": "(default)" + } + }, { "widget-type": "textbox", "label": "Collection", @@ -47,19 +56,43 @@ "properties": [ { "widget-type": "textbox", - "label": "Service Account File Path", - "name": "serviceFilePath", + "label": "Project ID", + "name": "project", "widget-attributes" : { "default": "auto-detect" } }, + { + "name": "serviceAccountType", + "label": "Service Account Type", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "filePath", + "options": [ + { + "id": "filePath", + "label": "File Path" + }, + { + "id": "JSON", + "label": "JSON" + } + ] + } + }, { "widget-type": "textbox", - "label": "Project ID", - "name": "project", + "label": "Service Account File Path", + "name": "serviceFilePath", "widget-attributes" : { "default": "auto-detect" } + }, + { + "widget-type": "textbox", + "label": "Service Account JSON", + "name": "serviceAccountJSON" } ] }, @@ -80,6 +113,32 @@ } ], "outputs": [ ], + "filters": [ + { + "name": "ServiceAuthenticationTypeFilePath", + "condition": { + "expression": "serviceAccountType == 'filePath'" + }, + "show": [ + { + "type": "property", + "name": "serviceFilePath" + } + ] + }, + { + "name": "ServiceAuthenticationTypeJSON", + "condition": { + "expression": "serviceAccountType == 'JSON'" + }, + "show": [ + { + "type": "property", + "name": "serviceAccountJSON" + } + ] + } + ], "jump-config": { "datasets": [ { diff --git a/widgets/Firestore-batchsource.json b/widgets/Firestore-batchsource.json index a9a0b98..fa2cd8c 100644 --- a/widgets/Firestore-batchsource.json +++ b/widgets/Firestore-batchsource.json @@ -15,6 +15,15 @@ "placeholder": "Name used to identify this source for lineage" } }, + { + "widget-type": "textbox", + "label": "Database Name/ID", + "name": "databaseName", + "widget-attributes": { + "placeholder": "Name of the Database. If not specified, will use '(default)'", + "default": "(default)" + } + }, { "widget-type": "textbox", "label": "Collection", @@ -78,19 +87,43 @@ "properties": [ { "widget-type": "textbox", - "label": "Service Account File Path", - "name": "serviceFilePath", + "label": "Project ID", + "name": "project", "widget-attributes" : { "default": "auto-detect" } }, + { + "name": "serviceAccountType", + "label": "Service Account Type", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "filePath", + "options": [ + { + "id": "filePath", + "label": "File Path" + }, + { + "id": "JSON", + "label": "JSON" + } + ] + } + }, { "widget-type": "textbox", - "label": "Project ID", - "name": "project", + "label": "Service Account File Path", + "name": "serviceFilePath", "widget-attributes" : { "default": "auto-detect" } + }, + { + "widget-type": "textbox", + "label": "Service Account JSON", + "name": "serviceAccountJSON" } ] }, @@ -177,6 +210,30 @@ "name": "customQuery" } ] + }, + { + "name": "ServiceAuthenticationTypeFilePath", + "condition": { + "expression": "serviceAccountType == 'filePath'" + }, + "show": [ + { + "type": "property", + "name": "serviceFilePath" + } + ] + }, + { + "name": "ServiceAuthenticationTypeJSON", + "condition": { + "expression": "serviceAccountType == 'JSON'" + }, + "show": [ + { + "type": "property", + "name": "serviceAccountJSON" + } + ] } ] }