Skip to content

Commit

Permalink
Merge pull request #18 from blueelvis/develop
Browse files Browse the repository at this point in the history
[PLUGIN-1753] Added support for Named Databases, fixed UI along with several other fixes
  • Loading branch information
itsankit-google authored Feb 27, 2024
2 parents 0918fe3 + 1aa8ee0 commit 63deb9d
Show file tree
Hide file tree
Showing 17 changed files with 588 additions and 97 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,7 @@ DirectivesVisitor.java
release.properties

# Remove dev directory.
dev
dev

# VSCode Files
.vscode
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.Constants;
import io.cdap.plugin.common.IdUtils;
import io.cdap.plugin.gcp.firestore.util.FirestoreConstants;

import java.io.IOException;
import java.util.UUID;
import javax.annotation.Nullable;

/**
Expand All @@ -20,6 +22,7 @@
public class FirestoreConfig extends PluginConfig {
public static final String NAME_PROJECT = "project";
public static final String NAME_SERVICE_ACCOUNT_TYPE = "serviceAccountType";
public static final String NAME_DATABASE = "databaseName";
public static final String NAME_SERVICE_ACCOUNT_FILE_PATH = "serviceFilePath";
public static final String NAME_SERVICE_ACCOUNT_JSON = "serviceAccountJSON";
public static final String AUTO_DETECT = "auto-detect";
Expand All @@ -37,25 +40,31 @@ public class FirestoreConfig extends PluginConfig {
@Nullable
protected String project;

@Name(NAME_DATABASE)
@Description("Name of the Firestore Database. "
+ "If not specified, it will use '(default)'.")
@Macro
@Nullable
protected String databaseName;

@Name(NAME_SERVICE_ACCOUNT_TYPE)
@Description("Service account type, file path where the service account is located or the JSON content of the " +
"service account.")
@Macro
@Nullable
protected String serviceAccountType;

@Name(NAME_SERVICE_ACCOUNT_FILE_PATH)
@Description("Path on the local file system of the service account key used "
+ "for authorization. Can be set to 'auto-detect' when running on a Dataproc cluster. "
+ "When running on other clusters, the file must be present on every node in the cluster.")
@Macro
@Nullable
@Macro
protected String serviceFilePath;

@Name(NAME_SERVICE_ACCOUNT_JSON)
@Description("Content of the service account file.")
@Macro
@Nullable
@Macro
protected String serviceAccountJson;

public String getProject() {
Expand Down Expand Up @@ -103,11 +112,13 @@ public String getServiceAccountType() {
return serviceAccountType;
}

@Nullable
public Boolean isServiceAccountJson() {
String serviceAccountType = getServiceAccountType();
return Strings.isNullOrEmpty(serviceAccountType) ? null : serviceAccountType.equals(SERVICE_ACCOUNT_JSON);
}

@Nullable
public Boolean isServiceAccountFilePath() {
String serviceAccountType = getServiceAccountType();
return Strings.isNullOrEmpty(serviceAccountType) ? null : serviceAccountType.equals(SERVICE_ACCOUNT_FILE_PATH);
Expand All @@ -127,6 +138,16 @@ public String getServiceAccount() {
*/
public void validate(FailureCollector collector) {
IdUtils.validateReferenceName(referenceName, collector);
validateDatabaseName(collector);
}

public String getDatabaseName() {
if (containsMacro(NAME_DATABASE) && Strings.isNullOrEmpty(databaseName)) {
return null;
} else if (Strings.isNullOrEmpty(databaseName)) {
return "(default)";
}
return databaseName;
}

public String getReferenceName() {
Expand All @@ -150,4 +171,69 @@ public boolean autoServiceAccountUnavailable() {
}
return false;
}

/**
* Validates the given database name to consists of characters allowed to represent a dataset.
*/
public void validateDatabaseName(FailureCollector collector) {
if (containsMacro(FirestoreConfig.NAME_DATABASE)) {
return;
}

String databaseName = getDatabaseName();

// Check if the database name is empty or null.
if (Strings.isNullOrEmpty(databaseName)) {
collector.addFailure("Database Name must be specified.", null)
.withConfigProperty(FirestoreConfig.NAME_DATABASE);
}

// Check if database name contains the (default)
if (!databaseName.equals(FirestoreConstants.DEFAULT_DATABASE_NAME)) {

// Ensure database name includes only letters, numbers, and hyphen (-)
// characters.
if (!databaseName.matches("^[a-zA-Z0-9-]+$")) {
collector.addFailure("Database name can only include letters, numbers and hyphen characters.", null)
.withConfigProperty(FirestoreConfig.NAME_DATABASE);
}

// Ensure database name is in lower case.
if (databaseName != databaseName.toLowerCase()) {
collector.addFailure("Database name must be in lowercase.", null)
.withConfigProperty(FirestoreConfig.NAME_DATABASE);
}

// The first character must be a letter.
if (!databaseName.matches("^[a-zA-Z].*")) {
collector.addFailure("Database name's first character can only be an alphabet.", null)
.withConfigProperty(FirestoreConfig.NAME_DATABASE);
}

// The last character must be a letter or number.
if (!databaseName.matches(".*[a-zA-Z0-9]$")) {
collector.addFailure("Database name's last character can only be a letter or a number.", null)
.withConfigProperty(FirestoreConfig.NAME_DATABASE);
}

// Minimum of 4 characters.
if (databaseName.length() < 4) {
collector.addFailure("Database name should be at least 4 letters.", null)
.withConfigProperty(FirestoreConfig.NAME_DATABASE);
}

// Maximum of 63 characters.
if (databaseName.length() > 63) {
collector.addFailure("Database name cannot be more than 63 characters.", null)
.withConfigProperty(FirestoreConfig.NAME_DATABASE);
}

// Should not be a UUID.
try {
UUID.fromString(databaseName);
collector.addFailure("Database name cannot contain a UUID.", null)
.withConfigProperty(FirestoreConfig.NAME_DATABASE);
} catch (IllegalArgumentException e) { }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,31 @@ public class FirestoreOutputFormatProvider implements OutputFormatProvider {
* for {@link FirestoreRecordWriter}.
*
* @param project Firestore project
* @param serviceAccountPath Firestore service account path
* @param databaseName Name of the Firestore Database
* @param serviceAccountFilePath Path to the JSON File containing the service account credentials
* @param serviceAccountJson JSON content of the service account credentials
* @param serviceAccountType The type of the Service account if it is stored in a filePath or JSON
* @param collection Firestore collection name
* @param shouldUseAutoGeneratedId should use auto generated document id
* @param batchSize batch size
*/
public FirestoreOutputFormatProvider(String project, @Nullable String serviceAccountPath,
String collection, String shouldUseAutoGeneratedId, String batchSize) {
public FirestoreOutputFormatProvider(String project, String databaseName, @Nullable String serviceAccountFilePath,
@Nullable String serviceAccountJson, String serviceAccountType,
String collection, String shouldUseAutoGeneratedId,
String batchSize) {
ImmutableMap.Builder<String, String> builder = new ImmutableMap.Builder<String, String>()
.put(FirestoreConfig.NAME_PROJECT, project)
.put(FirestoreConstants.PROPERTY_COLLECTION, collection)
.put(FirestoreConfig.NAME_DATABASE, databaseName)
.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_TYPE, serviceAccountType)
.put(FirestoreConstants.PROPERTY_COLLECTION, Strings.isNullOrEmpty(collection) ? "" : collection)
.put(FirestoreSinkConstants.PROPERTY_ID_TYPE, shouldUseAutoGeneratedId)
.put(FirestoreSinkConstants.PROPERTY_BATCH_SIZE, batchSize);

if (!Strings.isNullOrEmpty(serviceAccountPath)) {
builder.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_FILE_PATH, serviceAccountPath);
if (!Strings.isNullOrEmpty(serviceAccountFilePath)) {
builder.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_FILE_PATH, serviceAccountFilePath);
}
if (!Strings.isNullOrEmpty(serviceAccountJson)) {
builder.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_JSON, serviceAccountJson);
}
this.configMap = builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.firestore.WriteResult;
import com.google.common.base.Strings;
import io.cdap.plugin.gcp.firestore.common.FirestoreConfig;
import io.cdap.plugin.gcp.firestore.exception.FirestoreInitializationException;
import io.cdap.plugin.gcp.firestore.sink.util.FirestoreSinkConstants;
import io.cdap.plugin.gcp.firestore.util.FirestoreConstants;
import io.cdap.plugin.gcp.firestore.util.FirestoreUtil;
Expand Down Expand Up @@ -58,16 +59,38 @@ public class FirestoreRecordWriter extends RecordWriter<NullWritable, Map<String
*/
public FirestoreRecordWriter(TaskAttemptContext taskAttemptContext) {
Configuration config = taskAttemptContext.getConfiguration();

String projectId = config.get(FirestoreConfig.NAME_PROJECT);
String databaseId = config.get(FirestoreConfig.NAME_DATABASE);

// Get Service Account
Boolean isServiceAccountFilePath = false;
String serviceAccountFilePath = config.get(FirestoreConfig.NAME_SERVICE_ACCOUNT_FILE_PATH);

// Get Service Account Type whether JSON or FilePath
String serviceAccountType = config.get(FirestoreConfig.NAME_SERVICE_ACCOUNT_TYPE);

String serviceAccount = "";
if (serviceAccountType.equalsIgnoreCase(FirestoreConfig.SERVICE_ACCOUNT_FILE_PATH)) {
serviceAccount = config.get(FirestoreConfig.NAME_SERVICE_ACCOUNT_FILE_PATH);
isServiceAccountFilePath = true;
} else if (serviceAccountType.equalsIgnoreCase(FirestoreConfig.SERVICE_ACCOUNT_JSON)) {
serviceAccount = config.get(FirestoreConfig.NAME_SERVICE_ACCOUNT_JSON);
isServiceAccountFilePath = false;
} else {
throw new FirestoreInitializationException("Service account type can only be either a File Path or JSON.");
}

String collection = Strings.nullToEmpty(config.get(FirestoreConstants.PROPERTY_COLLECTION)).trim();
this.batchSize = config.getInt(FirestoreSinkConstants.PROPERTY_BATCH_SIZE, 25);
this.useAutogeneratedId = config.getBoolean(FirestoreSinkConstants.PROPERTY_ID_TYPE, false);
LOG.debug("Initialize RecordWriter(projectId={}, collection={}, serviceFilePath={}, batchSize={}, " +
"useAutogeneratedId={})", projectId, collection, serviceAccountFilePath, batchSize,
useAutogeneratedId);

this.db = FirestoreUtil.getFirestore(serviceAccountFilePath, projectId);
LOG.debug("Initialize RecordWriter(projectId={}, databaseId={}, collection={}, " +
"isServiceAccountFilePath={}, serviceFilePath={}, " +
"batchSize={}, useAutogeneratedId={}", projectId, databaseId, collection, isServiceAccountFilePath,
serviceAccountFilePath, batchSize, useAutogeneratedId);

this.db = FirestoreUtil.getFirestore(serviceAccount, isServiceAccountFilePath, projectId, databaseId);
this.collectionRef = db.collection(collection);
this.batch = db.batch();
this.totalCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,17 @@ public void prepareRun(BatchSinkContext batchSinkContext) throws Exception {
collector.getOrThrowException();

String project = config.getProject();
String serviceAccountFile = config.getServiceAccountFilePath();
String databaseName = config.getDatabaseName();
String serviceAccountFilePath = config.getServiceAccountFilePath();
String serviceAccountJson = config.getServiceAccountJson();
String serviceAccountType = config.getServiceAccountType();
String collection = config.getCollection();
String shouldAutoGenerateId = Boolean.toString(config.shouldUseAutoGeneratedId());
String batchSize = Integer.toString(config.getBatchSize());

batchSinkContext.addOutput(Output.of(config.getReferenceName(),
new FirestoreOutputFormatProvider(project, serviceAccountFile, collection, shouldAutoGenerateId,
batchSize)));
new FirestoreOutputFormatProvider(project, databaseName, serviceAccountFilePath, serviceAccountJson,
serviceAccountType, collection, shouldAutoGenerateId, batchSize)));

LineageRecorder lineageRecorder = new LineageRecorder(batchSinkContext, config.getReferenceName());
lineageRecorder.createExternalDataset(inputSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,19 @@ public FirestoreSinkConfig() {
* @param referenceName the reference name
* @param project the project id
* @param serviceFilePath the service file path
* @param databaseName the name of the database
* @param collection the collection
* @param idType the id type
* @param idAlias the id alias
* @param batchSize the batch size
*/
@VisibleForTesting
public FirestoreSinkConfig(String referenceName, String project, String serviceFilePath,
public FirestoreSinkConfig(String referenceName, String project, String serviceFilePath, String databaseName,
String collection, String idType, String idAlias, int batchSize) {
this.referenceName = referenceName;
this.project = project;
this.serviceFilePath = serviceFilePath;
this.databaseName = databaseName;
this.collection = collection;
this.idType = idType;
this.idAlias = idAlias;
Expand Down Expand Up @@ -148,7 +150,6 @@ public boolean shouldUseAutoGeneratedId() {
*/
public void validate(@Nullable Schema schema, FailureCollector collector) {
super.validate(collector);

validateBatchSize(collector);
validateFirestoreConnection(collector);

Expand All @@ -164,13 +165,15 @@ void validateFirestoreConnection(FailureCollector collector) {
return;
}
try {
Firestore db = FirestoreUtil.getFirestore(getServiceAccountFilePath(), getProject());
Firestore db = FirestoreUtil.getFirestore(getServiceAccount(), isServiceAccountFilePath(),
getProject(), getDatabaseName());
db.close();
} catch (Exception e) {
collector.addFailure(e.getMessage(), "Ensure properties like project, service account " +
"file path are correct.")
"file path, database name are correct.")
.withConfigProperty(NAME_SERVICE_ACCOUNT_FILE_PATH)
.withConfigProperty(NAME_PROJECT)
.withConfigProperty(NAME_DATABASE)
.withStacktrace(e.getStackTrace());
}
}
Expand All @@ -185,7 +188,7 @@ private void validateSchema(Schema schema, FailureCollector collector) {
}

/**
* Validates given field schema to be complaint with Firestore types.
* Validates given field schema to be compliant with Firestore types.
* Will throw {@link IllegalArgumentException} if schema contains unsupported type.
*
* @param fieldName field name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ public class FirestoreInputFormatProvider implements InputFormatProvider {
/**
* Constructor for FirestoreInputFormatProvider object.
* @param project the project of Firestore DB
* @param serviceAccountPath the service account path of Firestore DB
* @param databaseName Name of the Firestore database
* @param serviceAccountFilePath the service account path of Firestore DB
* @param serviceAccountJson JSON content of the service account credentials
* @param serviceAccountType The type of the Service account if it is stored in a filePath or JSON
* @param collection the collection
* @param mode there are two modes (basic and advanced)
* @param pullDocuments the list of documents to pull
Expand All @@ -48,18 +51,24 @@ public class FirestoreInputFormatProvider implements InputFormatProvider {
* @param fields the fields of collection
*/
public FirestoreInputFormatProvider(
String project, @Nullable String serviceAccountPath, String collection, String mode,
String project, String databaseName, @Nullable String serviceAccountFilePath, @Nullable String serviceAccountJson,
String serviceAccountType, String collection, String mode,
String pullDocuments, String skipDocuments, String filters, List<String> fields) {
ImmutableMap.Builder<String, String> builder = new ImmutableMap.Builder<String, String>()
.put(FirestoreConfig.NAME_PROJECT, project)
.put(FirestoreConfig.NAME_DATABASE, databaseName)
.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_TYPE, serviceAccountType)
.put(FirestoreConstants.PROPERTY_COLLECTION, Strings.isNullOrEmpty(collection) ? "" : collection)
.put(FirestoreSourceConstants.PROPERTY_QUERY_MODE, mode)
.put(FirestoreSourceConstants.PROPERTY_PULL_DOCUMENTS, Strings.isNullOrEmpty(pullDocuments) ? "" : pullDocuments)
.put(FirestoreSourceConstants.PROPERTY_SKIP_DOCUMENTS, Strings.isNullOrEmpty(skipDocuments) ? "" : skipDocuments)
.put(FirestoreSourceConstants.PROPERTY_CUSTOM_QUERY, Strings.isNullOrEmpty(filters) ? "" : filters)
.put(FirestoreSourceConstants.PROPERTY_SCHEMA, Joiner.on(",").join(fields));
if (Objects.nonNull(serviceAccountPath)) {
builder.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_FILE_PATH, serviceAccountPath);
if (Objects.nonNull(serviceAccountFilePath)) {
builder.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_FILE_PATH, serviceAccountFilePath);
}
if (Objects.nonNull(serviceAccountJson)) {
builder.put(FirestoreConfig.NAME_SERVICE_ACCOUNT_JSON, serviceAccountJson);
}
this.configMap = builder.build();
}
Expand Down
Loading

0 comments on commit 63deb9d

Please sign in to comment.