feat: add hdfs support to storage transfer (#12270)
gurusai-voleti authored Dec 4, 2024
1 parent dc4d975 commit 94d5485
Showing 3 changed files with 159 additions and 0 deletions.
@@ -41,6 +41,7 @@ var (
"transfer_spec.0.http_data_source",
"transfer_spec.0.azure_blob_storage_data_source",
"transfer_spec.0.posix_data_source",
"transfer_spec.0.hdfs_data_source",
}
transferSpecDataSinkKeys = []string{
"transfer_spec.0.gcs_data_sink",
@@ -197,6 +198,14 @@ func ResourceStorageTransferJob() *schema.Resource {
ExactlyOneOf: transferSpecDataSourceKeys,
Description: `An Azure Blob Storage data source.`,
},
"hdfs_data_source": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: hdfsDataSchema(),
ExactlyOneOf: transferSpecDataSourceKeys,
Description: `An HDFS data source.`,
},
},
},
Description: `Transfer specification.`,
@@ -545,6 +554,18 @@ func posixDataSchema() *schema.Resource {
}
}

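// hdfsDataSchema returns the nested schema for the hdfs_data_source block.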
func hdfsDataSchema() *schema.Resource {
return &schema.Resource{
Schema: map[string]*schema.Schema{
"path": {
Type: schema.TypeString,
Required: true,
Description: `Directory path to the filesystem.`,
},
},
}
}

func azureBlobStorageDataSchema() *schema.Resource {
return &schema.Resource{
Schema: map[string]*schema.Schema{
@@ -1105,6 +1126,25 @@ func flattenPosixData(posixData *storagetransfer.PosixFilesystem) []map[string]interface{} {
return []map[string]interface{}{data}
}

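// expandHdfsData converts the hdfs_data_source schema list into the storagetransfer.HdfsData API type.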
func expandHdfsData(hdfsDatas []interface{}) *storagetransfer.HdfsData {
if len(hdfsDatas) == 0 || hdfsDatas[0] == nil {
return nil
}

hdfsData := hdfsDatas[0].(map[string]interface{})
return &storagetransfer.HdfsData{
Path: hdfsData["path"].(string),
}
}

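// flattenHdfsData converts a storagetransfer.HdfsData value back into the single-element schema list.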
func flattenHdfsData(hdfsData *storagetransfer.HdfsData) []map[string]interface{} {
data := map[string]interface{}{
"path": hdfsData.Path,
}

return []map[string]interface{}{data}
}
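
Since `expandHdfsData` and `flattenHdfsData` are intended to be inverses, a round-trip check makes that contract explicit. A minimal sketch of such a unit test (hypothetical, not part of this commit; it assumes the standard `testing` package alongside the functions above):

func TestHdfsDataRoundTrip(t *testing.T) {
	// Schema representation: a single-element list holding the block's fields.
	raw := []interface{}{map[string]interface{}{"path": "/root/dir/"}}

	// Expanding produces the API struct used in the transfer spec.
	hdfs := expandHdfsData(raw)
	if hdfs == nil || hdfs.Path != "/root/dir/" {
		t.Fatalf("expandHdfsData = %+v, want Path %q", hdfs, "/root/dir/")
	}

	// Flattening maps the struct back to the schema representation.
	flat := flattenHdfsData(hdfs)
	if len(flat) != 1 || flat[0]["path"] != "/root/dir/" {
		t.Fatalf("flattenHdfsData = %v, want path %q", flat, "/root/dir/")
	}
}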

func expandAzureCredentials(azureCredentials []interface{}) *storagetransfer.AzureCredentials {
if len(azureCredentials) == 0 || azureCredentials[0] == nil {
return nil
@@ -1232,6 +1272,7 @@ func expandTransferSpecs(transferSpecs []interface{}) *storagetransfer.TransferSpec {
HttpDataSource: expandHttpData(transferSpec["http_data_source"].([]interface{})),
AzureBlobStorageDataSource: expandAzureBlobStorageData(transferSpec["azure_blob_storage_data_source"].([]interface{})),
PosixDataSource: expandPosixData(transferSpec["posix_data_source"].([]interface{})),
HdfsDataSource: expandHdfsData(transferSpec["hdfs_data_source"].([]interface{})),
}
}

@@ -1267,6 +1308,8 @@ func flattenTransferSpec(transferSpec *storagetransfer.TransferSpec, d *schema.ResourceData) []map[string]interface{} {
data["azure_blob_storage_data_source"] = flattenAzureBlobStorageData(transferSpec.AzureBlobStorageDataSource, d)
} else if transferSpec.PosixDataSource != nil {
data["posix_data_source"] = flattenPosixData(transferSpec.PosixDataSource)
} else if transferSpec.HdfsDataSource != nil {
data["hdfs_data_source"] = flattenHdfsData(transferSpec.HdfsDataSource)
}

return []map[string]interface{}{data}
@@ -369,6 +369,39 @@ func TestAccStorageTransferJob_notificationConfig(t *testing.T) {
})
}

func TestAccStorageTransferJob_hdfsSource(t *testing.T) {
t.Parallel()

testDataSinkName := acctest.RandString(t, 10)
otherDataSinkName := acctest.RandString(t, 10)
testTransferJobDescription := acctest.RandString(t, 10)
testSourceAgentPoolName := fmt.Sprintf("tf-test-source-agent-pool-%s", acctest.RandString(t, 10))

acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccStorageTransferJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccStorageTransferJob_hdfsSource(envvar.GetTestProjectFromEnv(), testDataSinkName, testTransferJobDescription, testSourceAgentPoolName, "/root/", ""),
},
{
ResourceName: "google_storage_transfer_job.transfer_job",
ImportState: true,
ImportStateVerify: true,
},
{
Config: testAccStorageTransferJob_hdfsSource(envvar.GetTestProjectFromEnv(), otherDataSinkName, testTransferJobDescription, testSourceAgentPoolName, "/root/dir/", "object/"),
},
{
ResourceName: "google_storage_transfer_job.transfer_job",
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func testAccStorageTransferJobDestroyProducer(t *testing.T) func(s *terraform.State) error {
return func(s *terraform.State) error {
config := acctest.GoogleProviderConfig(t)
@@ -950,6 +983,83 @@ resource "google_storage_transfer_job" "transfer_job" {
`, project, dataSinkBucketName, project, sourceAgentPoolName, transferJobDescription, project)
}

func testAccStorageTransferJob_hdfsSource(project string, dataSinkBucketName string, transferJobDescription string, sourceAgentPoolName string, hdfsPath string, gcsPath string) string {
return fmt.Sprintf(`
data "google_storage_transfer_project_service_account" "default" {
project = "%s"
}
resource "google_storage_bucket" "data_sink" {
name = "%s"
project = "%s"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_iam_member" "data_sink" {
bucket = google_storage_bucket.data_sink.name
role = "roles/storage.admin"
member = "serviceAccount:${data.google_storage_transfer_project_service_account.default.email}"
}
resource "google_project_iam_member" "pubsub" {
project = data.google_storage_transfer_project_service_account.default.project
role = "roles/pubsub.admin"
member = "serviceAccount:${data.google_storage_transfer_project_service_account.default.email}"
}
resource "google_storage_transfer_agent_pool" "foo" {
name = "%s"
bandwidth_limit {
limit_mbps = "120"
}
depends_on = [google_project_iam_member.pubsub]
}
resource "google_storage_transfer_job" "transfer_job" {
description = "%s"
project = "%s"
transfer_spec {
source_agent_pool_name = google_storage_transfer_agent_pool.foo.id
hdfs_data_source {
path = "%s"
}
gcs_data_sink {
bucket_name = google_storage_bucket.data_sink.name
path = "%s"
}
}
schedule {
schedule_start_date {
year = 2018
month = 10
day = 1
}
schedule_end_date {
year = 2019
month = 10
day = 1
}
start_time_of_day {
hours = 0
minutes = 30
seconds = 0
nanos = 0
}
}
depends_on = [
google_storage_bucket_iam_member.data_sink,
google_project_iam_member.pubsub
]
}
`, project, dataSinkBucketName, project, sourceAgentPoolName, transferJobDescription, project, hdfsPath, gcsPath)
}

func testAccStorageTransferJob_posixSink(project string, dataSourceBucketName string, transferJobDescription string, sinkAgentPoolName string) string {
return fmt.Sprintf(`
data "google_storage_transfer_project_service_account" "default" {
@@ -155,6 +155,8 @@ The following arguments are supported:

* `azure_blob_storage_data_source` - (Optional) An Azure Blob Storage data source. Structure [documented below](#nested_azure_blob_storage_data_source).

* `hdfs_data_source` - (Optional) An HDFS data source. Structure [documented below](#nested_hdfs_data_source).

<a name="nested_schedule"></a>The `schedule` block supports:

* `schedule_start_date` - (Required) The first day the recurring transfer is scheduled to run. If `schedule_start_date` is in the past, the transfer will run for the first time on the following day. Structure [documented below](#nested_schedule_start_end_date).
@@ -219,6 +221,10 @@ A duration in seconds with up to nine fractional digits, terminated by 's'. Example

* `root_directory` - (Required) Root directory path to the filesystem.

<a name="nested_hdfs_data_source"></a>The `hdfs_data_source` block supports:

* `path` - (Required) Root directory path to the filesystem.
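
For illustration, a minimal `transfer_spec` using the new block might look like the following sketch (the agent pool reference and bucket name are placeholders, not values from this change):

```hcl
transfer_spec {
  source_agent_pool_name = google_storage_transfer_agent_pool.example.id
  hdfs_data_source {
    path = "/data/incoming/"
  }
  gcs_data_sink {
    bucket_name = "example-sink-bucket"
  }
}
```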

<a name="nested_aws_s3_data_source"></a>The `aws_s3_data_source` block supports:

* `bucket_name` - (Required) S3 Bucket name.
