From 57fd8e17f38bda54a2cc6c0c0107adcf2c32e556 Mon Sep 17 00:00:00 2001 From: RachelTucker Date: Mon, 25 Nov 2019 14:23:32 -0700 Subject: [PATCH 01/11] Generated 5.1.x BP SDK from commit 1664144 (#95) --- ds3/ds3Gets.go | 1 + ds3/ds3Posts.go | 27 +++++++++++++++ ds3/ds3Puts.go | 2 ++ ds3/models/requests.go | 46 +++++++++++++++++++++++++ ds3/models/responseModels.go | 65 ++++++++++++++++++++++++++++++++++++ ds3/models/responses.go | 19 +++++++++++ 6 files changed, 160 insertions(+) diff --git a/ds3/ds3Gets.go b/ds3/ds3Gets.go index fef38b6..3cb584a 100644 --- a/ds3/ds3Gets.go +++ b/ds3/ds3Gets.go @@ -4118,6 +4118,7 @@ func (client *Client) GetS3TargetsSpectraS3(request *models.GetS3TargetsSpectraS WithOptionalQueryParam("https", networking.BoolPtrToStrPtr(request.Https)). WithOptionalVoidQueryParam("last_page", request.LastPage). WithOptionalQueryParam("name", request.Name). + WithOptionalQueryParam("naming_mode", networking.InterfaceToStrPtr(request.NamingMode)). WithOptionalQueryParam("page_length", networking.IntPtrToStrPtr(request.PageLength)). WithOptionalQueryParam("page_offset", networking.IntPtrToStrPtr(request.PageOffset)). WithOptionalQueryParam("page_start_marker", request.PageStartMarker). diff --git a/ds3/ds3Posts.go b/ds3/ds3Posts.go index bbb87d0..c7f9f7d 100644 --- a/ds3/ds3Posts.go +++ b/ds3/ds3Posts.go @@ -18,6 +18,31 @@ import ( "github.com/SpectraLogic/ds3_go_sdk/ds3/networking" ) +func (client *Client) CompleteBlob(request *models.CompleteBlobRequest) (*models.CompleteBlobResponse, error) { + // Build the http request + httpRequest, err := networking.NewHttpRequestBuilder(). + WithHttpVerb(HTTP_VERB_POST). + WithPath("/" + request.BucketName + "/" + request.ObjectName). + WithQueryParam("blob", request.Blob). + WithQueryParam("job", request.Job). 
+ Build(client.connectionInfo) + + if err != nil { + return nil, err + } + + networkRetryDecorator := networking.NewNetworkRetryDecorator(client.sendNetwork, client.clientPolicy.maxRetries) + + // Invoke the HTTP request. + response, requestErr := networkRetryDecorator.Invoke(httpRequest) + if requestErr != nil { + return nil, requestErr + } + + // Create a response object based on the result. + return models.NewCompleteBlobResponse(response) +} + func (client *Client) CompleteMultiPartUpload(request *models.CompleteMultiPartUploadRequest) (*models.CompleteMultiPartUploadResponse, error) { // Build the http request httpRequest, err := networking.NewHttpRequestBuilder(). @@ -1276,6 +1301,7 @@ func (client *Client) RegisterS3TargetSpectraS3(request *models.RegisterS3Target WithOptionalQueryParam("data_path_end_point", request.DataPathEndPoint). WithOptionalQueryParam("default_read_preference", networking.InterfaceToStrPtr(request.DefaultReadPreference)). WithOptionalQueryParam("https", networking.BoolPtrToStrPtr(request.Https)). + WithOptionalQueryParam("naming_mode", networking.InterfaceToStrPtr(request.NamingMode)). WithOptionalQueryParam("offline_data_staging_window_in_tb", networking.IntPtrToStrPtr(request.OfflineDataStagingWindowInTb)). WithOptionalQueryParam("permit_going_out_of_sync", networking.BoolPtrToStrPtr(request.PermitGoingOutOfSync)). WithOptionalQueryParam("proxy_domain", request.ProxyDomain). @@ -1309,6 +1335,7 @@ func (client *Client) DelegateCreateUserSpectraS3(request *models.DelegateCreate WithHttpVerb(HTTP_VERB_POST). WithPath("/_rest_/user"). WithQueryParam("name", request.Name). + WithOptionalQueryParam("default_data_policy_id", request.DefaultDataPolicyId). WithOptionalQueryParam("id", request.Id). WithOptionalQueryParam("max_buckets", networking.IntPtrToStrPtr(request.MaxBuckets)). WithOptionalQueryParam("secret_key", request.SecretKey). 
diff --git a/ds3/ds3Puts.go b/ds3/ds3Puts.go index d7d775f..d3a2479 100644 --- a/ds3/ds3Puts.go +++ b/ds3/ds3Puts.go @@ -613,6 +613,7 @@ func (client *Client) PutBulkJobSpectraS3(request *models.PutBulkJobSpectraS3Req WithOptionalQueryParam("max_upload_size", networking.Int64PtrToStrPtr(request.MaxUploadSize)). WithOptionalQueryParam("minimize_spanning_across_media", networking.BoolPtrToStrPtr(request.MinimizeSpanningAcrossMedia)). WithOptionalQueryParam("name", request.Name). + WithOptionalVoidQueryParam("pre_allocate_job_space", request.PreAllocateJobSpace). WithOptionalQueryParam("priority", networking.InterfaceToStrPtr(request.Priority)). WithOptionalQueryParam("verify_after_write", networking.BoolPtrToStrPtr(request.VerifyAfterWrite)). WithQueryParam("operation", "start_bulk_put"). @@ -2584,6 +2585,7 @@ func (client *Client) ModifyS3TargetSpectraS3(request *models.ModifyS3TargetSpec WithOptionalQueryParam("default_read_preference", networking.InterfaceToStrPtr(request.DefaultReadPreference)). WithOptionalQueryParam("https", networking.BoolPtrToStrPtr(request.Https)). WithOptionalQueryParam("name", request.Name). + WithOptionalQueryParam("naming_mode", networking.InterfaceToStrPtr(request.NamingMode)). WithOptionalQueryParam("offline_data_staging_window_in_tb", networking.IntPtrToStrPtr(request.OfflineDataStagingWindowInTb)). WithOptionalQueryParam("permit_going_out_of_sync", networking.BoolPtrToStrPtr(request.PermitGoingOutOfSync)). WithOptionalQueryParam("proxy_domain", request.ProxyDomain). 
diff --git a/ds3/models/requests.go b/ds3/models/requests.go index 737fd82..cbca3ea 100644 --- a/ds3/models/requests.go +++ b/ds3/models/requests.go @@ -32,6 +32,22 @@ func NewAbortMultiPartUploadRequest(bucketName string, objectName string, upload } } +type CompleteBlobRequest struct { + BucketName string + ObjectName string + Blob string + Job string +} + +func NewCompleteBlobRequest(bucketName string, objectName string, blob string, job string) *CompleteBlobRequest { + return &CompleteBlobRequest{ + BucketName: bucketName, + ObjectName: objectName, + Blob: blob, + Job: job, + } +} + type CompleteMultiPartUploadRequest struct { BucketName string ObjectName string @@ -3054,6 +3070,7 @@ type PutBulkJobSpectraS3Request struct { MinimizeSpanningAcrossMedia *bool Name *string Objects []Ds3PutObject + PreAllocateJobSpace bool Priority Priority VerifyAfterWrite *bool } @@ -3100,6 +3117,11 @@ func (putBulkJobSpectraS3Request *PutBulkJobSpectraS3Request) WithName(name stri return putBulkJobSpectraS3Request } +func (putBulkJobSpectraS3Request *PutBulkJobSpectraS3Request) WithPreAllocateJobSpace() *PutBulkJobSpectraS3Request { + putBulkJobSpectraS3Request.PreAllocateJobSpace = true + return putBulkJobSpectraS3Request +} + func (putBulkJobSpectraS3Request *PutBulkJobSpectraS3Request) WithPriority(priority Priority) *PutBulkJobSpectraS3Request { putBulkJobSpectraS3Request.Priority = priority return putBulkJobSpectraS3Request @@ -9013,6 +9035,7 @@ type GetS3TargetsSpectraS3Request struct { Https *bool LastPage bool Name *string + NamingMode CloudNamingMode PageLength *int PageOffset *int PageStartMarker *string @@ -9057,6 +9080,11 @@ func (getS3TargetsSpectraS3Request *GetS3TargetsSpectraS3Request) WithName(name return getS3TargetsSpectraS3Request } +func (getS3TargetsSpectraS3Request *GetS3TargetsSpectraS3Request) WithNamingMode(namingMode CloudNamingMode) *GetS3TargetsSpectraS3Request { + getS3TargetsSpectraS3Request.NamingMode = namingMode + return 
getS3TargetsSpectraS3Request +} + func (getS3TargetsSpectraS3Request *GetS3TargetsSpectraS3Request) WithPageLength(pageLength int) *GetS3TargetsSpectraS3Request { getS3TargetsSpectraS3Request.PageLength = &pageLength return getS3TargetsSpectraS3Request @@ -9141,6 +9169,7 @@ type ModifyS3TargetSpectraS3Request struct { DefaultReadPreference TargetReadPreferenceType Https *bool Name *string + NamingMode CloudNamingMode OfflineDataStagingWindowInTb *int PermitGoingOutOfSync *bool ProxyDomain *string @@ -9201,6 +9230,11 @@ func (modifyS3TargetSpectraS3Request *ModifyS3TargetSpectraS3Request) WithName(n return modifyS3TargetSpectraS3Request } +func (modifyS3TargetSpectraS3Request *ModifyS3TargetSpectraS3Request) WithNamingMode(namingMode CloudNamingMode) *ModifyS3TargetSpectraS3Request { + modifyS3TargetSpectraS3Request.NamingMode = namingMode + return modifyS3TargetSpectraS3Request +} + func (modifyS3TargetSpectraS3Request *ModifyS3TargetSpectraS3Request) WithOfflineDataStagingWindowInTb(offlineDataStagingWindowInTb int) *ModifyS3TargetSpectraS3Request { modifyS3TargetSpectraS3Request.OfflineDataStagingWindowInTb = &offlineDataStagingWindowInTb return modifyS3TargetSpectraS3Request @@ -9265,6 +9299,7 @@ type RegisterS3TargetSpectraS3Request struct { DefaultReadPreference TargetReadPreferenceType Https *bool Name string + NamingMode CloudNamingMode OfflineDataStagingWindowInTb *int PermitGoingOutOfSync *bool ProxyDomain *string @@ -9315,6 +9350,11 @@ func (registerS3TargetSpectraS3Request *RegisterS3TargetSpectraS3Request) WithHt return registerS3TargetSpectraS3Request } +func (registerS3TargetSpectraS3Request *RegisterS3TargetSpectraS3Request) WithNamingMode(namingMode CloudNamingMode) *RegisterS3TargetSpectraS3Request { + registerS3TargetSpectraS3Request.NamingMode = namingMode + return registerS3TargetSpectraS3Request +} + func (registerS3TargetSpectraS3Request *RegisterS3TargetSpectraS3Request) WithOfflineDataStagingWindowInTb(offlineDataStagingWindowInTb int) 
*RegisterS3TargetSpectraS3Request { registerS3TargetSpectraS3Request.OfflineDataStagingWindowInTb = &offlineDataStagingWindowInTb return registerS3TargetSpectraS3Request @@ -9377,6 +9417,7 @@ func (verifyS3TargetSpectraS3Request *VerifyS3TargetSpectraS3Request) WithFullDe } type DelegateCreateUserSpectraS3Request struct { + DefaultDataPolicyId *string Id *string MaxBuckets *int Name string @@ -9389,6 +9430,11 @@ func NewDelegateCreateUserSpectraS3Request(name string) *DelegateCreateUserSpect } } +func (delegateCreateUserSpectraS3Request *DelegateCreateUserSpectraS3Request) WithDefaultDataPolicyId(defaultDataPolicyId string) *DelegateCreateUserSpectraS3Request { + delegateCreateUserSpectraS3Request.DefaultDataPolicyId = &defaultDataPolicyId + return delegateCreateUserSpectraS3Request +} + func (delegateCreateUserSpectraS3Request *DelegateCreateUserSpectraS3Request) WithId(id string) *DelegateCreateUserSpectraS3Request { delegateCreateUserSpectraS3Request.Id = &id return delegateCreateUserSpectraS3Request diff --git a/ds3/models/responseModels.go b/ds3/models/responseModels.go index 45ca227..e1307d4 100644 --- a/ds3/models/responseModels.go +++ b/ds3/models/responseModels.go @@ -436,6 +436,53 @@ func (capacitySummaryContainer *CapacitySummaryContainer) parse(xmlNode *XmlNode } } +type CloudNamingMode Enum + +const ( + CLOUD_NAMING_MODE_BLACK_PEARL CloudNamingMode = 1 + iota + CLOUD_NAMING_MODE_AWS_S3 CloudNamingMode = 1 + iota +) + +func (cloudNamingMode *CloudNamingMode) UnmarshalText(text []byte) error { + var str string = string(bytes.ToUpper(text)) + switch str { + case "": *cloudNamingMode = UNDEFINED + case "BLACK_PEARL": *cloudNamingMode = CLOUD_NAMING_MODE_BLACK_PEARL + case "AWS_S3": *cloudNamingMode = CLOUD_NAMING_MODE_AWS_S3 + default: + *cloudNamingMode = UNDEFINED + return errors.New(fmt.Sprintf("Cannot marshal '%s' into CloudNamingMode", str)) + } + return nil +} + +func (cloudNamingMode CloudNamingMode) String() string { + switch cloudNamingMode { + 
case CLOUD_NAMING_MODE_BLACK_PEARL: return "BLACK_PEARL" + case CLOUD_NAMING_MODE_AWS_S3: return "AWS_S3" + default: + log.Printf("Error: invalid CloudNamingMode represented by '%d'", cloudNamingMode) + return "" + } +} + +func (cloudNamingMode CloudNamingMode) StringPtr() *string { + if cloudNamingMode == UNDEFINED { + return nil + } + result := cloudNamingMode.String() + return &result +} + +func newCloudNamingModeFromContent(content []byte, aggErr *AggregateError) *CloudNamingMode { + if len(content) == 0 { + // no value + return nil + } + result := new(CloudNamingMode) + parseEnum(content, result, aggErr) + return result +} type CompletedJob struct { BucketId string CachedSizeInBytes int64 @@ -1659,9 +1706,11 @@ type S3Region Enum const ( S3_REGION_GOV_CLOUD S3Region = 1 + iota S3_REGION_US_EAST_1 S3Region = 1 + iota + S3_REGION_US_EAST_2 S3Region = 1 + iota S3_REGION_US_WEST_1 S3Region = 1 + iota S3_REGION_US_WEST_2 S3Region = 1 + iota S3_REGION_EU_WEST_1 S3Region = 1 + iota + S3_REGION_EU_WEST_2 S3Region = 1 + iota S3_REGION_EU_CENTRAL_1 S3Region = 1 + iota S3_REGION_AP_SOUTH_1 S3Region = 1 + iota S3_REGION_AP_SOUTHEAST_1 S3Region = 1 + iota @@ -1670,6 +1719,7 @@ const ( S3_REGION_AP_NORTHEAST_2 S3Region = 1 + iota S3_REGION_SA_EAST_1 S3Region = 1 + iota S3_REGION_CN_NORTH_1 S3Region = 1 + iota + S3_REGION_CA_CENTRAL_1 S3Region = 1 + iota ) func (s3Region *S3Region) UnmarshalText(text []byte) error { @@ -1678,9 +1728,11 @@ func (s3Region *S3Region) UnmarshalText(text []byte) error { case "": *s3Region = UNDEFINED case "GOV_CLOUD": *s3Region = S3_REGION_GOV_CLOUD case "US_EAST_1": *s3Region = S3_REGION_US_EAST_1 + case "US_EAST_2": *s3Region = S3_REGION_US_EAST_2 case "US_WEST_1": *s3Region = S3_REGION_US_WEST_1 case "US_WEST_2": *s3Region = S3_REGION_US_WEST_2 case "EU_WEST_1": *s3Region = S3_REGION_EU_WEST_1 + case "EU_WEST_2": *s3Region = S3_REGION_EU_WEST_2 case "EU_CENTRAL_1": *s3Region = S3_REGION_EU_CENTRAL_1 case "AP_SOUTH_1": *s3Region = 
S3_REGION_AP_SOUTH_1 case "AP_SOUTHEAST_1": *s3Region = S3_REGION_AP_SOUTHEAST_1 @@ -1689,6 +1741,7 @@ func (s3Region *S3Region) UnmarshalText(text []byte) error { case "AP_NORTHEAST_2": *s3Region = S3_REGION_AP_NORTHEAST_2 case "SA_EAST_1": *s3Region = S3_REGION_SA_EAST_1 case "CN_NORTH_1": *s3Region = S3_REGION_CN_NORTH_1 + case "CA_CENTRAL_1": *s3Region = S3_REGION_CA_CENTRAL_1 default: *s3Region = UNDEFINED return errors.New(fmt.Sprintf("Cannot marshal '%s' into S3Region", str)) @@ -1700,9 +1753,11 @@ func (s3Region S3Region) String() string { switch s3Region { case S3_REGION_GOV_CLOUD: return "GOV_CLOUD" case S3_REGION_US_EAST_1: return "US_EAST_1" + case S3_REGION_US_EAST_2: return "US_EAST_2" case S3_REGION_US_WEST_1: return "US_WEST_1" case S3_REGION_US_WEST_2: return "US_WEST_2" case S3_REGION_EU_WEST_1: return "EU_WEST_1" + case S3_REGION_EU_WEST_2: return "EU_WEST_2" case S3_REGION_EU_CENTRAL_1: return "EU_CENTRAL_1" case S3_REGION_AP_SOUTH_1: return "AP_SOUTH_1" case S3_REGION_AP_SOUTHEAST_1: return "AP_SOUTHEAST_1" @@ -1711,6 +1766,7 @@ func (s3Region S3Region) String() string { case S3_REGION_AP_NORTHEAST_2: return "AP_NORTHEAST_2" case S3_REGION_SA_EAST_1: return "SA_EAST_1" case S3_REGION_CN_NORTH_1: return "CN_NORTH_1" + case S3_REGION_CA_CENTRAL_1: return "CA_CENTRAL_1" default: log.Printf("Error: invalid S3Region represented by '%d'", s3Region) return "" @@ -3828,6 +3884,7 @@ const ( TAPE_DRIVE_TYPE_TS1140 TapeDriveType = 1 + iota TAPE_DRIVE_TYPE_TS1150 TapeDriveType = 1 + iota TAPE_DRIVE_TYPE_TS1155 TapeDriveType = 1 + iota + TAPE_DRIVE_TYPE_TS1160 TapeDriveType = 1 + iota ) func (tapeDriveType *TapeDriveType) UnmarshalText(text []byte) error { @@ -3842,6 +3899,7 @@ func (tapeDriveType *TapeDriveType) UnmarshalText(text []byte) error { case "TS1140": *tapeDriveType = TAPE_DRIVE_TYPE_TS1140 case "TS1150": *tapeDriveType = TAPE_DRIVE_TYPE_TS1150 case "TS1155": *tapeDriveType = TAPE_DRIVE_TYPE_TS1155 + case "TS1160": *tapeDriveType = 
TAPE_DRIVE_TYPE_TS1160 default: *tapeDriveType = UNDEFINED return errors.New(fmt.Sprintf("Cannot marshal '%s' into TapeDriveType", str)) @@ -3859,6 +3917,7 @@ func (tapeDriveType TapeDriveType) String() string { case TAPE_DRIVE_TYPE_TS1140: return "TS1140" case TAPE_DRIVE_TYPE_TS1150: return "TS1150" case TAPE_DRIVE_TYPE_TS1155: return "TS1155" + case TAPE_DRIVE_TYPE_TS1160: return "TS1160" default: log.Printf("Error: invalid TapeDriveType represented by '%d'", tapeDriveType) return "" @@ -4395,6 +4454,7 @@ type AzureTarget struct { Id string LastFullyVerified *string Name *string + NamingMode CloudNamingMode PermitGoingOutOfSync bool Quiesced Quiesced State TargetState @@ -4425,6 +4485,8 @@ func (azureTarget *AzureTarget) parse(xmlNode *XmlNode, aggErr *AggregateError) azureTarget.LastFullyVerified = parseNullableString(child.Content) case "Name": azureTarget.Name = parseNullableString(child.Content) + case "NamingMode": + parseEnum(child.Content, &azureTarget.NamingMode, aggErr) case "PermitGoingOutOfSync": azureTarget.PermitGoingOutOfSync = parseBool(child.Content, aggErr) case "Quiesced": @@ -4720,6 +4782,7 @@ type S3Target struct { Id string LastFullyVerified *string Name *string + NamingMode CloudNamingMode OfflineDataStagingWindowInTb int PermitGoingOutOfSync bool ProxyDomain *string @@ -4759,6 +4822,8 @@ func (s3Target *S3Target) parse(xmlNode *XmlNode, aggErr *AggregateError) { s3Target.LastFullyVerified = parseNullableString(child.Content) case "Name": s3Target.Name = parseNullableString(child.Content) + case "NamingMode": + parseEnum(child.Content, &s3Target.NamingMode, aggErr) case "OfflineDataStagingWindowInTb": s3Target.OfflineDataStagingWindowInTb = parseInt(child.Content, aggErr) case "PermitGoingOutOfSync": diff --git a/ds3/models/responses.go b/ds3/models/responses.go index 4b3d9a3..eb6eb1c 100644 --- a/ds3/models/responses.go +++ b/ds3/models/responses.go @@ -37,6 +37,25 @@ func NewAbortMultiPartUploadResponse(webResponse WebResponse) 
(*AbortMultiPartUp } } +type CompleteBlobResponse struct { + + Headers *http.Header +} + + + +func NewCompleteBlobResponse(webResponse WebResponse) (*CompleteBlobResponse, error) { + defer webResponse.Body().Close() + expectedStatusCodes := []int { 200 } + + switch code := webResponse.StatusCode(); code { + case 200: + return &CompleteBlobResponse{Headers: webResponse.Header()}, nil + default: + return nil, buildBadStatusCodeError(webResponse, expectedStatusCodes) + } +} + type CompleteMultiPartUploadResponse struct { CompleteMultipartUploadResult CompleteMultipartUploadResult Headers *http.Header From 3c9fa2a8f6e1f0a143a212d6c75c46e63da32527 Mon Sep 17 00:00:00 2001 From: Rachel Tucker Date: Wed, 27 Nov 2019 14:39:48 -0700 Subject: [PATCH 02/11] OTHER: updating CompleteBlob request to have optional metadata and checksum. Also used latest 5.1 API from commit 1710735. --- ds3/ds3Posts.go | 3 +++ ds3/models/requests.go | 25 +++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/ds3/ds3Posts.go b/ds3/ds3Posts.go index c7f9f7d..5157c2c 100644 --- a/ds3/ds3Posts.go +++ b/ds3/ds3Posts.go @@ -25,6 +25,9 @@ func (client *Client) CompleteBlob(request *models.CompleteBlobRequest) (*models WithPath("/" + request.BucketName + "/" + request.ObjectName). WithQueryParam("blob", request.Blob). WithQueryParam("job", request.Job). + WithOptionalQueryParam("size", networking.Int64PtrToStrPtr(request.Size)). + WithChecksum(request.Checksum). + WithHeaders(request.Metadata). 
Build(client.connectionInfo) if err != nil { diff --git a/ds3/models/requests.go b/ds3/models/requests.go index cbca3ea..79fee96 100644 --- a/ds3/models/requests.go +++ b/ds3/models/requests.go @@ -36,7 +36,10 @@ type CompleteBlobRequest struct { BucketName string ObjectName string Blob string + Checksum Checksum Job string + Metadata map[string]string + Size *int64 } func NewCompleteBlobRequest(bucketName string, objectName string, blob string, job string) *CompleteBlobRequest { @@ -45,9 +48,31 @@ func NewCompleteBlobRequest(bucketName string, objectName string, blob string, j ObjectName: objectName, Blob: blob, Job: job, + Checksum: NewNoneChecksum(), + Metadata: make(map[string]string), } } +func (completeBlobRequest *CompleteBlobRequest) WithSize(size int64) *CompleteBlobRequest { + completeBlobRequest.Size = &size + return completeBlobRequest +} + + +func (completeBlobRequest *CompleteBlobRequest) WithChecksum(contentHash string, checksumType ChecksumType) *CompleteBlobRequest { + completeBlobRequest.Checksum.ContentHash = contentHash + completeBlobRequest.Checksum.Type = checksumType + return completeBlobRequest +} + +func (completeBlobRequest *CompleteBlobRequest) WithMetaData(key string, values ...string) *CompleteBlobRequest { + if strings.HasPrefix(strings.ToLower(key), AMZ_META_HEADER) { + completeBlobRequest.Metadata[key] = strings.Join(values, ",") + } else { + completeBlobRequest.Metadata[strings.ToLower(AMZ_META_HEADER + key)] = strings.Join(values, ",") + } + return completeBlobRequest +} type CompleteMultiPartUploadRequest struct { BucketName string ObjectName string From b11e23f1e19829513541db90dc9828aec76a33b2 Mon Sep 17 00:00:00 2001 From: RachelTucker Date: Fri, 6 Dec 2019 11:13:11 -0700 Subject: [PATCH 03/11] GOSDK-30: Add a way to skip cert verification (#98) --- ds3/ds3Client.go | 5 +++++ ds3/networking/net.go | 18 +++++++++++++----- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/ds3/ds3Client.go b/ds3/ds3Client.go index 
0b82ef3..401ab3c 100644 --- a/ds3/ds3Client.go +++ b/ds3/ds3Client.go @@ -65,6 +65,11 @@ func (clientBuilder *ClientBuilder) WithNetworkRetryCount(count int) *ClientBuil return clientBuilder } +func (clientBuilder *ClientBuilder) WithIgnoreServerCertificate(ignoreServerCert bool) *ClientBuilder { + clientBuilder.connectionInfo.IgnoreServerCertificate = ignoreServerCert + return clientBuilder +} + func (clientBuilder *ClientBuilder) WithLogger(logger sdk_log.Logger) *ClientBuilder { clientBuilder.logger = logger return clientBuilder diff --git a/ds3/networking/net.go b/ds3/networking/net.go index 7472bbd..1aede6b 100644 --- a/ds3/networking/net.go +++ b/ds3/networking/net.go @@ -1,15 +1,17 @@ package networking import ( + "crypto/tls" "net/http" "net/url" "github.com/SpectraLogic/ds3_go_sdk/ds3/models" ) type ConnectionInfo struct { - Endpoint *url.URL - Credentials *Credentials - Proxy *url.URL + Endpoint *url.URL + Credentials *Credentials + Proxy *url.URL + IgnoreServerCertificate bool } type Credentials struct { @@ -27,9 +29,15 @@ type SendNetwork struct { } func NewSendNetwork(connectionInfo *ConnectionInfo) Network { - return &SendNetwork{ - transport: &http.Transport{Proxy: http.ProxyURL(connectionInfo.Proxy)}, + sendNetwork := &SendNetwork{ + transport: &http.Transport{ + Proxy: http.ProxyURL(connectionInfo.Proxy), + }, } + if connectionInfo.IgnoreServerCertificate { + sendNetwork.transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + } + return sendNetwork } func (sendNetwork *SendNetwork) Invoke(httpRequest *http.Request) (models.WebResponse, error) { From 04239b17d0f465fb2368c20f11937d2945257e04 Mon Sep 17 00:00:00 2001 From: RachelTucker Date: Wed, 8 Jan 2020 15:08:39 -0700 Subject: [PATCH 04/11] GOSDK-32: System shutdown due to too many go routines when putting lots of files to BP. Removed done channel which was causing go routines in the consumer to be left open longer than necessary. 
(#100) --- ds3_integration/utils/testUtils.go | 2 +- helpers/conditionalBool.go | 43 ++++++++ helpers/consumer.go | 34 +++---- helpers/consumer_test.go | 15 +-- helpers/getProducer.go | 130 +++++++++--------------- helpers/getTransfernator.go | 7 +- helpers/putProducer.go | 128 ++++++++++------------- helpers/putTransceiver.go | 7 +- helpers_integration/helpersImpl_test.go | 118 +++++++++++++++++++++ 9 files changed, 286 insertions(+), 198 deletions(-) create mode 100644 helpers/conditionalBool.go diff --git a/ds3_integration/utils/testUtils.go b/ds3_integration/utils/testUtils.go index ad275bd..5b7c388 100644 --- a/ds3_integration/utils/testUtils.go +++ b/ds3_integration/utils/testUtils.go @@ -375,7 +375,7 @@ func DeleteBucketLogError(t *testing.T, client *ds3.Client, bucketName string) ( // Deletes the specified bucket and returns an error if one occurs func DeleteBucket(client *ds3.Client, bucketName string) (error) { - deleteBucket, deleteErr := client.DeleteBucket(models.NewDeleteBucketRequest(bucketName)) + deleteBucket, deleteErr := client.DeleteBucketSpectraS3(models.NewDeleteBucketSpectraS3Request(bucketName).WithForce()) if deleteErr != nil { return deleteErr } diff --git a/helpers/conditionalBool.go b/helpers/conditionalBool.go new file mode 100644 index 0000000..fdb8b28 --- /dev/null +++ b/helpers/conditionalBool.go @@ -0,0 +1,43 @@ +package helpers + +import "sync" + +type NotifyBlobDone interface { + // Waits for at least one done signal. + Wait() + + // Sends a done signal. Multiple signals have no additional effect. 
+ SignalDone() +} + + +func NewConditionalBool() *ConditionalBool { + conditional :=sync.NewCond(&sync.Mutex{}) + return &ConditionalBool{ + conditional: *conditional, + Done: false, + } +} + +type ConditionalBool struct { + conditional sync.Cond + Done bool +} + +func (conditionalBool *ConditionalBool) Wait() { + conditionalBool.conditional.L.Lock() + // wait for a done signal to be received + for !conditionalBool.Done { + conditionalBool.conditional.Wait() + } + // reset done notifier + conditionalBool.Done = false + conditionalBool.conditional.L.Unlock() +} + +func (conditionalBool *ConditionalBool) SignalDone() { + conditionalBool.conditional.L.Lock() + conditionalBool.Done = true + conditionalBool.conditional.Broadcast() + conditionalBool.conditional.L.Unlock() +} \ No newline at end of file diff --git a/helpers/consumer.go b/helpers/consumer.go index a03bfa9..0756a4f 100644 --- a/helpers/consumer.go +++ b/helpers/consumer.go @@ -12,50 +12,42 @@ type consumerImpl struct { queue *chan TransferOperation waitGroup *sync.WaitGroup maxConcurrentOperations uint - blobDoneChannel chan<- struct{} + + // Conditional value that gets triggered when a blob has finished being transferred + doneNotifier NotifyBlobDone } -func newConsumer(queue *chan TransferOperation, blobDoneChannel chan<- struct{}, waitGroup *sync.WaitGroup, maxConcurrentOperations uint) Consumer { +func newConsumer(queue *chan TransferOperation, waitGroup *sync.WaitGroup, maxConcurrentOperations uint, doneNotifier NotifyBlobDone) Consumer { return &consumerImpl{ queue: queue, waitGroup: waitGroup, maxConcurrentOperations: maxConcurrentOperations, - blobDoneChannel: blobDoneChannel, + doneNotifier: doneNotifier, } } -func performTransfer(operation TransferOperation, semaphore *chan int, blobDoneChannel chan<- struct{}, jobWaitGroup *sync.WaitGroup, childWaitGroup *sync.WaitGroup) { - defer func() { - // per operation that finishes, send a done message to the producer - blobDoneChannel <-struct {}{} - 
jobWaitGroup.Done() - childWaitGroup.Done() - }() +func performTransfer(operation TransferOperation, semaphore *chan int, waitGroup *sync.WaitGroup, doneNotifier NotifyBlobDone) { + defer waitGroup.Done() operation() + + // send done signal + doneNotifier.SignalDone() + <- *semaphore } func (consumer *consumerImpl) run() { - // Defer closing the blob done channel. This will signal to the producer that it can shut down. - defer func() {close(consumer.blobDoneChannel)}() - // semaphore for controlling max number of transfer operations in flight per job semaphore := make(chan int, consumer.maxConcurrentOperations + 1) - var childWaitGroup sync.WaitGroup for { nextOp, ok := <- *consumer.queue if ok { semaphore <- 1 - childWaitGroup.Add(1) - go performTransfer(nextOp, &semaphore, consumer.blobDoneChannel, consumer.waitGroup, &childWaitGroup) + go performTransfer(nextOp, &semaphore, consumer.waitGroup, consumer.doneNotifier) } else { consumer.waitGroup.Done() - break + return } } - - // Wait for all child transfer operations to finish before shutting down. - // This is to stop the done channel from being close prematurely - childWaitGroup.Wait() } diff --git a/helpers/consumer_test.go b/helpers/consumer_test.go index 4b6c612..f7dca89 100644 --- a/helpers/consumer_test.go +++ b/helpers/consumer_test.go @@ -40,10 +40,9 @@ func TestProducerConsumerModel(t *testing.T) { queue := make(chan TransferOperation, 5) - // make the blob done channel larger than the number of transfer operations queued. 
- blobDoneChannel := make(chan struct{}, numOperations+1) + doneNotifier := NewConditionalBool() - consumer := newConsumer(&queue, blobDoneChannel, &wg, 5) + consumer := newConsumer(&queue, &wg, 5, doneNotifier) go producer(&queue) go consumer.run() @@ -51,13 +50,5 @@ func TestProducerConsumerModel(t *testing.T) { wg.Wait() ds3Testing.AssertInt(t, "Executed Transfer Operations", numOperations, resultCount) - - // verify that 10 done messages were sent - ds3Testing.AssertInt(t, "Done signals sent", numOperations, len(blobDoneChannel)) - for len(blobDoneChannel) > 0 { - _, ok := <-blobDoneChannel - ds3Testing.AssertBool(t, "expected channel not to be closed", true, ok) - } - _, ok := <- blobDoneChannel - ds3Testing.AssertBool(t, "expected channel to be closed", false, ok) + ds3Testing.AssertBool(t, "received done notification", true, doneNotifier.Done) } diff --git a/helpers/getProducer.go b/helpers/getProducer.go index fb84926..d9227bd 100644 --- a/helpers/getProducer.go +++ b/helpers/getProducer.go @@ -24,13 +24,8 @@ type getProducer struct { rangeFinder ranges.BlobRangeFinder sdk_log.Logger - // Channel that represents blobs that have finished being process. - // This will be written to once a get object operation has completed regardless of error or success. - // This is used to notify the runner to re-check if any blobs are now ready to be retrieved. 
- blobDoneChannel <-chan struct{} - - // Used to track if we are done queuing blobs - continueQueuingBlobs bool + // Conditional value that gets triggered when a blob has finished being transferred + doneNotifier NotifyBlobDone } func newGetProducer( @@ -38,9 +33,9 @@ func newGetProducer( getObjects *[]helperModels.GetObject, queue *chan TransferOperation, strategy *ReadTransferStrategy, - blobDoneChannel <-chan struct{}, client *ds3.Client, - waitGroup *sync.WaitGroup) *getProducer { + waitGroup *sync.WaitGroup, + doneNotifier NotifyBlobDone) *getProducer { return &getProducer{ JobMasterObjectList: jobMasterObjectList, @@ -54,8 +49,7 @@ func newGetProducer( deferredBlobQueue: NewBlobDescriptionQueue(), rangeFinder: ranges.NewBlobRangeFinder(getObjects), Logger: client.Logger, //use the same logger as the client - blobDoneChannel: blobDoneChannel, - continueQueuingBlobs: true, + doneNotifier: doneNotifier, } } @@ -75,15 +69,20 @@ func toReadObjectMap(getObjects *[]helperModels.GetObject) map[string]helperMode } // Processes all the blobs in a chunk that are ready for transfer from BP -func (producer *getProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) { +// Returns the number of blobs queued for process +func (producer *getProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) int { producer.Debugf("begin chunk processing %s", curChunk.ChunkId) + processedCount := 0 // transfer blobs that are ready, and queue those that are waiting for channel for _, curObj := range curChunk.Objects { producer.Debugf("queuing object in waiting to be processed %s offset=%d length=%d", *curObj.Name, curObj.Offset, curObj.Length) blob := helperModels.NewBlobDescription(*curObj.Name, curObj.Offset, curObj.Length) - producer.queueBlobForTransfer(&blob, bucketName, jobId) + if producer.queueBlobForTransfer(&blob, bucketName, jobId) { + processedCount++ + } } + return processedCount } // Information required to perform a 
get operation of a blob with BP as data source and channelBuilder as destination @@ -174,9 +173,10 @@ func writeRangeToDestination(channelBuilder helperModels.WriteChannelBuilder, bl // Attempts to transfer a single blob from the BP to the client. If the blob is not ready for transfer, // then it is added to the waiting to transfer queue -func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) { +// Returns whether or not the blob was queued for transfer +func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) bool { if producer.processedBlobTracker.IsProcessed(*blob) { - return + return false // already been processed } curReadObj := producer.readObjectMap[blob.Name()] @@ -185,13 +185,13 @@ func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip // a fatal error happened on a previous blob for this file, skip processing producer.Warningf("fatal error occurred while transferring previous blob on this file, skipping blob '%s' offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length()) producer.processedBlobTracker.MarkProcessed(*blob) - return + return false // not going to process } if !curReadObj.ChannelBuilder.IsChannelAvailable(blob.Offset()) { producer.Debugf("channel is not currently available for getting blob '%s' offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length()) producer.deferredBlobQueue.Push(blob) - return + return false // not ready to be processed } producer.Debugf("channel is available for getting blob '%s' offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length()) @@ -212,11 +212,16 @@ func (producer *getProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip // Mark blob as processed producer.processedBlobTracker.MarkProcessed(*blob) + + return true } // Attempts to process all blobs whose channels were not available for transfer. 
// Blobs whose channels are still not available are placed back on the queue. -func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string) { +// Returns the number of blobs queued for processing. +func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string) int { + processedCount := 0 + // attempt to process all blobs in waiting to be transferred waitingBlobs := producer.deferredBlobQueue.Size() for i := 0; i < waitingBlobs; i++ { @@ -228,87 +233,54 @@ func (producer *getProducer) processWaitingBlobs(bucketName string, jobId string producer.Errorf("failure during blob transfer '%s' at offset %d: %s", curBlob.Name(), curBlob.Offset(), err.Error()) break } - producer.queueBlobForTransfer(curBlob, bucketName, jobId) + if producer.queueBlobForTransfer(curBlob, bucketName, jobId) { + processedCount++ + } } + return processedCount } // This initiates the production of the transfer operations which will be consumed by a consumer running in a separate go routine. // Each transfer operation will retrieve one blob of content from the BP. // Once all blobs have been queued to be transferred, the producer will finish, even if all operations have not been consumed yet. 
func (producer *getProducer) run() error { + defer close(*producer.queue) + // determine number of blobs to be processed var totalBlobCount int64 = producer.totalBlobCount() producer.Debugf("job status totalBlobs=%d processedBlobs=%d", totalBlobCount, producer.processedBlobTracker.NumberOfProcessedBlobs()) - // initiate first set of blob transfers - err := producer.queueBlobsReadyForTransfer(totalBlobCount) - if err != nil { - return err - } + // process all chunks and make sure all blobs are queued for transfer + for producer.hasMoreToProcess(totalBlobCount) { + processedCount, err := producer.queueBlobsReadyForTransfer(totalBlobCount) + if err != nil { + return err + } - // wait for either a timer or for at least one blob to finish before attempting to queue more items for transfer - ticker := time.NewTicker(producer.strategy.BlobStrategy.delay()) - var fatalErr error - for { - select { - case _, ok := <- producer.blobDoneChannel: - if ok { - // reset the timer - ticker.Stop() - ticker = time.NewTicker(producer.strategy.BlobStrategy.delay()) - - err = producer.queueBlobsReadyForTransfer(totalBlobCount) - if err != nil { - // A fatal error has occurred, stop queuing blobs for processing and - // close processing queue to signal consumer we won't be sending any more blobs. - producer.continueQueuingBlobs = false - fatalErr = err - close(*producer.queue) - } - } else { - // The consumer closed the channel, signaling completion. - return fatalErr - } - case <- ticker.C: - err = producer.queueBlobsReadyForTransfer(totalBlobCount) - if err != nil { - // A fatal error has occurred, stop queuing blobs for processing and - // close processing queue to signal consumer we won't be sending any more blobs. 
- producer.continueQueuingBlobs = false - fatalErr = err - close(*producer.queue) - } + // If the last operation processed blobs, then wait for something to finish + if processedCount > 0 { + producer.doneNotifier.Wait() + } else if producer.hasMoreToProcess(totalBlobCount) { + // nothing could be processed, cache is probably full, wait a bit before trying again + time.Sleep(producer.strategy.BlobStrategy.delay()) } } - return fatalErr + return nil } func (producer *getProducer) hasMoreToProcess(totalBlobCount int64) bool { return producer.processedBlobTracker.NumberOfProcessedBlobs() < totalBlobCount || producer.deferredBlobQueue.Size() > 0 } -func (producer *getProducer) queueBlobsReadyForTransfer(totalBlobCount int64) error { - if !producer.continueQueuingBlobs { - // We've queued up all the blobs we are going to for this job. - return nil - } - - // check if there is anything left to be queued - if !producer.hasMoreToProcess(totalBlobCount) { - // Everything has been queued for processing. - producer.continueQueuingBlobs = false - // close processing queue to signal consumer we won't be sending any more blobs. - close(*producer.queue) - return nil - } - +// Returns the number of blobs that have been queued for transfer +func (producer *getProducer) queueBlobsReadyForTransfer(totalBlobCount int64) (int, error) { // Attempt to transfer waiting blobs - producer.processWaitingBlobs(*producer.JobMasterObjectList.BucketName, producer.JobMasterObjectList.JobId) + processedCount := producer.processWaitingBlobs(*producer.JobMasterObjectList.BucketName, producer.JobMasterObjectList.JobId) // Check if we need to query the BP for allocated blobs, or if we already know everything is allocated. 
if int64(producer.deferredBlobQueue.Size()) + producer.processedBlobTracker.NumberOfProcessedBlobs() >= totalBlobCount { // Everything is already allocated, no need to query BP for allocated chunks - return nil + return processedCount, nil } // Get the list of available chunks that the server can receive. The server may @@ -318,7 +290,7 @@ func (producer *getProducer) queueBlobsReadyForTransfer(totalBlobCount int64) er chunksReadyResponse, err := producer.client.GetJobChunksReadyForClientProcessingSpectraS3(chunksReady) if err != nil { producer.Errorf("unrecoverable error: %v", err) - return err + return processedCount, err } // Check to see if any chunks can be processed @@ -327,14 +299,10 @@ func (producer *getProducer) queueBlobsReadyForTransfer(totalBlobCount int64) er // Loop through all the chunks that are available for processing, and send // the files that are contained within them. for _, curChunk := range chunksReadyResponse.MasterObjectList.Objects { - producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId) + processedCount += producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId) } - } else { - // When no chunks are returned we need to sleep to allow for cache space to - // be freed. - producer.strategy.BlobStrategy.delay() } - return nil + return processedCount, nil } // Determines the number of blobs to be transferred. 
diff --git a/helpers/getTransfernator.go b/helpers/getTransfernator.go index a9d7773..528e77b 100644 --- a/helpers/getTransfernator.go +++ b/helpers/getTransfernator.go @@ -78,10 +78,11 @@ func (transceiver *getTransceiver) transfer() (string, error) { // init queue, producer and consumer var waitGroup sync.WaitGroup - blobDoneChannel := make(chan struct{}, 10) + doneNotifier := NewConditionalBool() + queue := newOperationQueue(transceiver.Strategy.BlobStrategy.maxWaitingTransfers(), transceiver.Client.Logger) - producer := newGetProducer(&bulkGetResponse.MasterObjectList, transceiver.ReadObjects, &queue, transceiver.Strategy, blobDoneChannel, transceiver.Client, &waitGroup) - consumer := newConsumer(&queue, blobDoneChannel, &waitGroup, transceiver.Strategy.BlobStrategy.maxConcurrentTransfers()) + producer := newGetProducer(&bulkGetResponse.MasterObjectList, transceiver.ReadObjects, &queue, transceiver.Strategy, transceiver.Client, &waitGroup, doneNotifier) + consumer := newConsumer(&queue, &waitGroup, transceiver.Strategy.BlobStrategy.maxConcurrentTransfers(), doneNotifier) // Wait for completion of producer-consumer goroutines var aggErr ds3Models.AggregateError diff --git a/helpers/putProducer.go b/helpers/putProducer.go index 3c639fb..0c3bd92 100644 --- a/helpers/putProducer.go +++ b/helpers/putProducer.go @@ -21,13 +21,8 @@ type putProducer struct { deferredBlobQueue BlobDescriptionQueue // queue of blobs whose channels are not yet ready for transfer sdk_log.Logger - // Channel that represents blobs that have finished being process. - // This will be written to once a put object operation has completed regardless of error or success. - // This is used to notify the runner to re-check if any blobs are now ready to be sent. 
- blobDoneChannel <-chan struct{} - - // Used to track if we are done queuing blobs - continueQueuingBlobs bool + // Conditional value that gets triggered when a blob has finished being transferred + doneNotifier NotifyBlobDone } func newPutProducer( @@ -35,9 +30,9 @@ func newPutProducer( putObjects *[]helperModels.PutObject, queue *chan TransferOperation, strategy *WriteTransferStrategy, - blobDoneChannel <-chan struct{}, client *ds3.Client, - waitGroup *sync.WaitGroup) *putProducer { + waitGroup *sync.WaitGroup, + doneNotifier NotifyBlobDone) *putProducer { return &putProducer{ JobMasterObjectList: jobMasterObjectList, @@ -50,8 +45,7 @@ func newPutProducer( deferredBlobQueue: NewBlobDescriptionQueue(), processedBlobTracker: newProcessedBlobTracker(), Logger: client.Logger, // use the same logger as the client - blobDoneChannel: blobDoneChannel, - continueQueuingBlobs: true, + doneNotifier: doneNotifier, } } @@ -141,20 +135,28 @@ func (producer *putProducer) metadataFrom(info putObjectInfo) map[string]string // Processes all the blobs in a chunk and attempts to add them to the transfer queue. // If a blob is not ready for transfer, then it is added to the waiting to be transferred queue. -func (producer *putProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) { +// Returns the number of blobs added to queue. 
+func (producer *putProducer) processChunk(curChunk *ds3Models.Objects, bucketName string, jobId string) int { + processedCount := 0 producer.Debugf("begin chunk processing %s", curChunk.ChunkId) // transfer blobs that are ready, and queue those that are waiting for channel for _, curObj := range curChunk.Objects { producer.Debugf("queuing object in waiting to be processed %s offset=%d length=%d", *curObj.Name, curObj.Offset, curObj.Length) blob := helperModels.NewBlobDescription(*curObj.Name, curObj.Offset, curObj.Length) - producer.queueBlobForTransfer(&blob, bucketName, jobId) + if producer.queueBlobForTransfer(&blob, bucketName, jobId) { + processedCount++ + } } + return processedCount } // Iterates through blobs that are waiting to be transferred and attempts to transfer. // If successful, blob is removed from queue. Else, it is re-queued. -func (producer *putProducer) processWaitingBlobs(bucketName string, jobId string) { +// Returns the number of blobs added to queue. +func (producer *putProducer) processWaitingBlobs(bucketName string, jobId string) int { + processedCount := 0 + // attempt to process all blobs in waiting to be transferred waitingBlobs := producer.deferredBlobQueue.Size() for i := 0; i < waitingBlobs; i++ { @@ -166,15 +168,20 @@ func (producer *putProducer) processWaitingBlobs(bucketName string, jobId string break } producer.Debugf("attempting to process %s offset=%d length=%d", curBlob.Name(), curBlob.Offset(), curBlob.Length()) - producer.queueBlobForTransfer(curBlob, bucketName, jobId) + if producer.queueBlobForTransfer(curBlob, bucketName, jobId) { + processedCount++ + } } + + return processedCount } // Attempts to transfer a single blob. If the blob is not ready for transfer, // it is added to the waiting to transfer queue. -func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) { +// Returns whether or not the blob was queued for transfer. 
+func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescription, bucketName string, jobId string) bool { if producer.processedBlobTracker.IsProcessed(*blob) { - return + return false // this was already processed } curWriteObj := producer.writeObjectMap[blob.Name()] @@ -183,14 +190,14 @@ func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip // a fatal error happened on a previous blob for this file, skip processing producer.Warningf("fatal error occurred while transferring previous blob on this file, skipping blob %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length()) producer.processedBlobTracker.MarkProcessed(*blob) - return + return false // not actually transferring this blob } if !curWriteObj.ChannelBuilder.IsChannelAvailable(blob.Offset()) { producer.Debugf("channel is not currently available for blob %s offset=%d length=%d", blob.Name(), blob.Offset(), blob.Length()) // Not ready to be transferred producer.deferredBlobQueue.Push(blob) - return + return false // not ready to be sent } producer.Debugf("channel is available for blob %s offset=%d length=%d", curWriteObj.PutObject.Name, blob.Offset(), blob.Length()) @@ -212,85 +219,52 @@ func (producer *putProducer) queueBlobForTransfer(blob *helperModels.BlobDescrip // Mark blob as processed producer.processedBlobTracker.MarkProcessed(*blob) + + return true } // This initiates the production of the transfer operations which will be consumed by a consumer running in a separate go routine. // Each transfer operation will put one blob of content to the BP. // Once all blobs have been queued to be transferred, the producer will finish, even if all operations have not been consumed yet. 
func (producer *putProducer) run() error { + defer close(*producer.queue) + // determine number of blobs to be processed var totalBlobCount int64 = producer.totalBlobCount() producer.Debugf("job status totalBlobs=%d processedBlobs=%d", totalBlobCount, producer.processedBlobTracker.NumberOfProcessedBlobs()) - // initiate first set of blob transfers - err := producer.queueBlobsReadyForTransfer(totalBlobCount) - if err != nil { - return err - } + // process all chunks and make sure all blobs are queued for transfer + for producer.hasMoreToProcess(totalBlobCount) { + processedCount, err := producer.queueBlobsReadyForTransfer(totalBlobCount) + if err != nil { + return err + } - // wait for either a timer or for at least one blob to finish before attempting to queue more items for transfer - ticker := time.NewTicker(producer.strategy.BlobStrategy.delay()) - var fatalErr error - for { - select { - case _, ok := <- producer.blobDoneChannel: - if ok { - // reset the timer - ticker.Stop() - ticker = time.NewTicker(producer.strategy.BlobStrategy.delay()) - - err = producer.queueBlobsReadyForTransfer(totalBlobCount) - if err != nil { - // A fatal error has occurred, stop queuing blobs for processing and - // close processing queue to signal consumer we won't be sending any more blobs. - producer.continueQueuingBlobs = false - fatalErr = err - close(*producer.queue) - } - } else { - // The consumer closed the channel, signaling completion. - return fatalErr - } - case <- ticker.C: - err = producer.queueBlobsReadyForTransfer(totalBlobCount) - if err != nil { - // A fatal error has occurred, stop queuing blobs for processing and - // close processing queue to signal consumer we won't be sending any more blobs. 
- producer.continueQueuingBlobs = false - fatalErr = err - close(*producer.queue) - } + // If the last operation processed blobs, then wait for something to finish + if processedCount > 0 { + // wait for a done signal to be received + producer.doneNotifier.Wait() + } else if producer.hasMoreToProcess(totalBlobCount) { + // nothing could be processed, cache is probably full, wait a bit before trying again + time.Sleep(producer.strategy.BlobStrategy.delay()) } } - return fatalErr + return nil } func (producer *putProducer) hasMoreToProcess(totalBlobCount int64) bool { return producer.processedBlobTracker.NumberOfProcessedBlobs() < totalBlobCount || producer.deferredBlobQueue.Size() > 0 } -func (producer *putProducer) queueBlobsReadyForTransfer(totalBlobCount int64) error { - if !producer.continueQueuingBlobs { - // We've queued up all the blobs we are going to for this job. - return nil - } - - // check if there is anything left to be processed - if !producer.hasMoreToProcess(totalBlobCount) { - // Everything has been queued for processing. - producer.continueQueuingBlobs = false - // close processing queue to signal consumer we won't be sending any more blobs. - close(*producer.queue) - return nil - } - +// Returns the number of items queued for work. +func (producer *putProducer) queueBlobsReadyForTransfer(totalBlobCount int64) (int, error) { // Attempt to transfer waiting blobs - producer.processWaitingBlobs(*producer.JobMasterObjectList.BucketName, producer.JobMasterObjectList.JobId) + processedCount := producer.processWaitingBlobs(*producer.JobMasterObjectList.BucketName, producer.JobMasterObjectList.JobId) // Check if we need to query the BP for allocated blobs, or if we already know everything is allocated. 
if int64(producer.deferredBlobQueue.Size()) + producer.processedBlobTracker.NumberOfProcessedBlobs() >= totalBlobCount { // Everything is already allocated, no need to query BP for allocated chunks - return nil + return processedCount, nil } // Get the list of available chunks that the server can receive. The server may @@ -300,7 +274,7 @@ func (producer *putProducer) queueBlobsReadyForTransfer(totalBlobCount int64) er chunksReadyResponse, err := producer.client.GetJobChunksReadyForClientProcessingSpectraS3(chunksReady) if err != nil { producer.Errorf("unrecoverable error: %v", err) - return err + return processedCount, err } // Check to see if any chunks can be processed @@ -309,10 +283,10 @@ func (producer *putProducer) queueBlobsReadyForTransfer(totalBlobCount int64) er // Loop through all the chunks that are available for processing, and send // the files that are contained within them. for _, curChunk := range chunksReadyResponse.MasterObjectList.Objects { - producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId) + processedCount += producer.processChunk(&curChunk, *chunksReadyResponse.MasterObjectList.BucketName, chunksReadyResponse.MasterObjectList.JobId) } } - return nil + return processedCount, nil } // Determines the number of blobs to be transferred. 
diff --git a/helpers/putTransceiver.go b/helpers/putTransceiver.go index 73b51a9..3c8b7dc 100644 --- a/helpers/putTransceiver.go +++ b/helpers/putTransceiver.go @@ -73,10 +73,11 @@ func (transceiver *putTransceiver) transfer() (string, error) { // init queue, producer and consumer var waitGroup sync.WaitGroup - blobDoneChannel := make(chan struct{}, 10) + doneNotifier := NewConditionalBool() + queue := newOperationQueue(transceiver.Strategy.BlobStrategy.maxWaitingTransfers(), transceiver.Client.Logger) - producer := newPutProducer(&bulkPutResponse.MasterObjectList, transceiver.WriteObjects, &queue, transceiver.Strategy, blobDoneChannel, transceiver.Client, &waitGroup) - consumer := newConsumer(&queue, blobDoneChannel, &waitGroup, transceiver.Strategy.BlobStrategy.maxConcurrentTransfers()) + producer := newPutProducer(&bulkPutResponse.MasterObjectList, transceiver.WriteObjects, &queue, transceiver.Strategy, transceiver.Client, &waitGroup, doneNotifier) + consumer := newConsumer(&queue, &waitGroup, transceiver.Strategy.BlobStrategy.maxConcurrentTransfers(), doneNotifier) // Wait for completion of producer-consumer goroutines waitGroup.Add(1) // adding producer and consumer goroutines to wait group diff --git a/helpers_integration/helpersImpl_test.go b/helpers_integration/helpersImpl_test.go index d328184..d93155c 100644 --- a/helpers_integration/helpersImpl_test.go +++ b/helpers_integration/helpersImpl_test.go @@ -1,6 +1,8 @@ package helpers_integration import ( + "bytes" + "fmt" "github.com/SpectraLogic/ds3_go_sdk/ds3" ds3Models "github.com/SpectraLogic/ds3_go_sdk/ds3/models" "github.com/SpectraLogic/ds3_go_sdk/ds3_integration/utils" @@ -15,6 +17,7 @@ import ( "os" "sync" "testing" + "time" ) var client *ds3.Client @@ -386,3 +389,118 @@ func TestPutObjectDoesNotExist(t *testing.T) { ds3Testing.AssertBool(t, "error callback was called", true, errorCallbackCalled) } + +type closeWrapper struct { + io.Reader +} + +func (wrapper *closeWrapper) Close() error { + return 
nil +} + +type emptyReadChannelBuilder struct { + channels.FatalErrorHandler +} + +func (builder *emptyReadChannelBuilder) GetChannel(_ int64) (io.ReadCloser, error) { + reader := bytes.NewReader([]byte{}) + return &closeWrapper{Reader: reader}, nil +} + +func (builder *emptyReadChannelBuilder) IsChannelAvailable(_ int64) bool { + return true +} + +func (builder *emptyReadChannelBuilder) OnDone(reader io.ReadCloser) { + reader.Close() +} + +type emptyWriteCloser struct {} + +func (writer *emptyWriteCloser) Write(p []byte) (n int, err error) { + return len(p), nil +} + +func (writer *emptyWriteCloser) Close() error { + return nil +} + +type emptyWriteChannelBuilder struct { + channels.FatalErrorHandler +} + +func (builder *emptyWriteChannelBuilder) GetChannel(_ int64) (io.WriteCloser, error) { + return &emptyWriteCloser{}, nil +} + +func (builder *emptyWriteChannelBuilder) IsChannelAvailable(_ int64) bool { + return true +} + +func (builder *emptyWriteChannelBuilder) OnDone(writer io.WriteCloser) { + writer.Close() +} + +func TestBulkPutAndGetLotsOfFiles(t *testing.T) { + defer func() { + // force delete the bucket because its faster than deleting all the files + testutils.DeleteBucket(client, testBucket) + // re-creating the bucket after force delete because other tests expect it to exist + testutils.PutBucket(client, testBucket) + }() + helper := helpers.NewHelpers(client) + + // put a bunch of files + var writeObjects []helperModels.PutObject + + const numObjects = 18650 + for i := 0; i < numObjects; i++ { + objName := fmt.Sprintf("file-%d.txt", i) + curWriteObj := helperModels.PutObject{ + PutObject: ds3Models.Ds3PutObject{Name:objName, Size:0}, + ChannelBuilder: &emptyReadChannelBuilder{}, + } + writeObjects = append(writeObjects, curWriteObj) + } + + writeStrategy := helpers.WriteTransferStrategy{ + BlobStrategy: &helpers.SimpleBlobStrategy{ + Delay:time.Second * 30, + MaxConcurrentTransfers:10, + }, + Options: helpers.WriteBulkJobOptions{MaxUploadSize: 
&helpers.MinUploadSize}, + Listeners: newErrorOnErrorListenerStrategy(t), + } + + jobId, err := helper.PutObjects(testBucket, writeObjects, writeStrategy) + ds3Testing.AssertNilError(t, err) + if jobId == "" { + t.Error("expected to get a BP job ID, but instead got nothing") + } + + // retrieve said files + var readObjects []helperModels.GetObject + for _, writeObj := range writeObjects { + curGetObj := helperModels.GetObject{ + Name: writeObj.PutObject.Name, + ChannelBuilder: &emptyWriteChannelBuilder{}, + } + readObjects = append(readObjects, curGetObj) + } + + readStrategy := helpers.ReadTransferStrategy{ + Options: helpers.ReadBulkJobOptions{}, // use default job options + BlobStrategy: &helpers.SimpleBlobStrategy{ + Delay:time.Second * 30, + MaxConcurrentTransfers:10, + }, + Listeners: newErrorOnErrorListenerStrategy(t), + } + + jobId, err = helper.GetObjects(testBucket, readObjects, readStrategy) + ds3Testing.AssertNilError(t, err) + + if jobId == "" { + t.Errorf("expected to get a BP job ID, but instead got nothing") + } +} From 5d5a4d23cd7a5e14070ff6c463f1e5e35732eeec Mon Sep 17 00:00:00 2001 From: Rachel Tucker Date: Thu, 7 May 2020 15:16:08 -0600 Subject: [PATCH 05/11] Generate BP SDK for API 5.2.x from commit 1734756 --- ds3/ds3Deletes.go | 23 +++++ ds3/ds3Gets.go | 84 ++++++++++++++++ ds3/ds3Heads.go | 1 + ds3/ds3Posts.go | 28 ++++++ ds3/ds3Puts.go | 1 + ds3/models/requests.go | 154 ++++++++++++++++++++++++++++++ ds3/models/responseModels.go | 179 +++++++++++++++++++++++++++++++++++ ds3/models/responses.go | 123 ++++++++++++++++++++++++ 8 files changed, 593 insertions(+) diff --git a/ds3/ds3Deletes.go b/ds3/ds3Deletes.go index 1069dc1..6d661d5 100644 --- a/ds3/ds3Deletes.go +++ b/ds3/ds3Deletes.go @@ -710,6 +710,29 @@ func (client *Client) DeleteAzureTargetFailureNotificationRegistrationSpectraS3( return models.NewDeleteAzureTargetFailureNotificationRegistrationSpectraS3Response(response) } +func (client *Client) 
DeleteBucketChangesNotificationRegistrationSpectraS3(request *models.DeleteBucketChangesNotificationRegistrationSpectraS3Request) (*models.DeleteBucketChangesNotificationRegistrationSpectraS3Response, error) { + // Build the http request + httpRequest, err := networking.NewHttpRequestBuilder(). + WithHttpVerb(HTTP_VERB_DELETE). + WithPath("/_rest_/bucket_changes_notification_registration/" + request.BucketChangesNotificationRegistration). + Build(client.connectionInfo) + + if err != nil { + return nil, err + } + + networkRetryDecorator := networking.NewNetworkRetryDecorator(client.sendNetwork, client.clientPolicy.maxRetries) + + // Invoke the HTTP request. + response, requestErr := networkRetryDecorator.Invoke(httpRequest) + if requestErr != nil { + return nil, requestErr + } + + // Create a response object based on the result. + return models.NewDeleteBucketChangesNotificationRegistrationSpectraS3Response(response) +} + func (client *Client) DeleteDs3TargetFailureNotificationRegistrationSpectraS3(request *models.DeleteDs3TargetFailureNotificationRegistrationSpectraS3Request) (*models.DeleteDs3TargetFailureNotificationRegistrationSpectraS3Response, error) { // Build the http request httpRequest, err := networking.NewHttpRequestBuilder(). diff --git a/ds3/ds3Gets.go b/ds3/ds3Gets.go index 3cb584a..967c4e7 100644 --- a/ds3/ds3Gets.go +++ b/ds3/ds3Gets.go @@ -1800,6 +1800,89 @@ func (client *Client) GetAzureTargetFailureNotificationRegistrationsSpectraS3(re return models.NewGetAzureTargetFailureNotificationRegistrationsSpectraS3Response(response) } +func (client *Client) GetBucketChangesNotificationRegistrationSpectraS3(request *models.GetBucketChangesNotificationRegistrationSpectraS3Request) (*models.GetBucketChangesNotificationRegistrationSpectraS3Response, error) { + // Build the http request + httpRequest, err := networking.NewHttpRequestBuilder(). + WithHttpVerb(HTTP_VERB_GET). 
+ WithPath("/_rest_/bucket_changes_notification_registration/" + request.BucketChangesNotificationRegistration). + Build(client.connectionInfo) + + if err != nil { + return nil, err + } + + networkRetryDecorator := networking.NewNetworkRetryDecorator(client.sendNetwork, client.clientPolicy.maxRetries) + httpRedirectDecorator := networking.NewHttpTempRedirectDecorator(networkRetryDecorator, client.clientPolicy.maxRedirect) + + // Invoke the HTTP request. + response, requestErr := httpRedirectDecorator.Invoke(httpRequest) + if requestErr != nil { + return nil, requestErr + } + + // Create a response object based on the result. + return models.NewGetBucketChangesNotificationRegistrationSpectraS3Response(response) +} + +func (client *Client) GetBucketChangesNotificationRegistrationsSpectraS3(request *models.GetBucketChangesNotificationRegistrationsSpectraS3Request) (*models.GetBucketChangesNotificationRegistrationsSpectraS3Response, error) { + // Build the http request + httpRequest, err := networking.NewHttpRequestBuilder(). + WithHttpVerb(HTTP_VERB_GET). + WithPath("/_rest_/bucket_changes_notification_registration"). + WithOptionalVoidQueryParam("last_page", request.LastPage). + WithOptionalQueryParam("page_length", networking.IntPtrToStrPtr(request.PageLength)). + WithOptionalQueryParam("page_offset", networking.IntPtrToStrPtr(request.PageOffset)). + WithOptionalQueryParam("page_start_marker", request.PageStartMarker). + WithOptionalQueryParam("user_id", request.UserId). + Build(client.connectionInfo) + + if err != nil { + return nil, err + } + + networkRetryDecorator := networking.NewNetworkRetryDecorator(client.sendNetwork, client.clientPolicy.maxRetries) + httpRedirectDecorator := networking.NewHttpTempRedirectDecorator(networkRetryDecorator, client.clientPolicy.maxRedirect) + + // Invoke the HTTP request. 
+ response, requestErr := httpRedirectDecorator.Invoke(httpRequest) + if requestErr != nil { + return nil, requestErr + } + + // Create a response object based on the result. + return models.NewGetBucketChangesNotificationRegistrationsSpectraS3Response(response) +} + +func (client *Client) GetBucketHistorySpectraS3(request *models.GetBucketHistorySpectraS3Request) (*models.GetBucketHistorySpectraS3Response, error) { + // Build the http request + httpRequest, err := networking.NewHttpRequestBuilder(). + WithHttpVerb(HTTP_VERB_GET). + WithPath("/_rest_/bucket_history"). + WithOptionalQueryParam("bucket_id", request.BucketId). + WithOptionalVoidQueryParam("last_page", request.LastPage). + WithOptionalQueryParam("min_sequence_number", networking.Int64PtrToStrPtr(request.MinSequenceNumber)). + WithOptionalQueryParam("page_length", networking.IntPtrToStrPtr(request.PageLength)). + WithOptionalQueryParam("page_offset", networking.IntPtrToStrPtr(request.PageOffset)). + WithOptionalQueryParam("page_start_marker", request.PageStartMarker). + Build(client.connectionInfo) + + if err != nil { + return nil, err + } + + networkRetryDecorator := networking.NewNetworkRetryDecorator(client.sendNetwork, client.clientPolicy.maxRetries) + httpRedirectDecorator := networking.NewHttpTempRedirectDecorator(networkRetryDecorator, client.clientPolicy.maxRedirect) + + // Invoke the HTTP request. + response, requestErr := httpRedirectDecorator.Invoke(httpRequest) + if requestErr != nil { + return nil, requestErr + } + + // Create a response object based on the result. + return models.NewGetBucketHistorySpectraS3Response(response) +} + func (client *Client) GetDs3TargetFailureNotificationRegistrationSpectraS3(request *models.GetDs3TargetFailureNotificationRegistrationSpectraS3Request) (*models.GetDs3TargetFailureNotificationRegistrationSpectraS3Response, error) { // Build the http request httpRequest, err := networking.NewHttpRequestBuilder(). 
@@ -3209,6 +3292,7 @@ func (client *Client) GetTapeDrivesSpectraS3(request *models.GetTapeDrivesSpectr WithHttpVerb(HTTP_VERB_GET). WithPath("/_rest_/tape_drive"). WithOptionalVoidQueryParam("last_page", request.LastPage). + WithOptionalQueryParam("minimum_task_priority", networking.InterfaceToStrPtr(request.MinimumTaskPriority)). WithOptionalQueryParam("page_length", networking.IntPtrToStrPtr(request.PageLength)). WithOptionalQueryParam("page_offset", networking.IntPtrToStrPtr(request.PageOffset)). WithOptionalQueryParam("page_start_marker", request.PageStartMarker). diff --git a/ds3/ds3Heads.go b/ds3/ds3Heads.go index 189bff0..2dd438e 100644 --- a/ds3/ds3Heads.go +++ b/ds3/ds3Heads.go @@ -46,6 +46,7 @@ func (client *Client) HeadObject(request *models.HeadObjectRequest) (*models.Hea httpRequest, err := networking.NewHttpRequestBuilder(). WithHttpVerb(HTTP_VERB_HEAD). WithPath("/" + request.BucketName + "/" + request.ObjectName). + WithOptionalQueryParam("version_id", request.VersionId). Build(client.connectionInfo) if err != nil { diff --git a/ds3/ds3Posts.go b/ds3/ds3Posts.go index 5157c2c..8985793 100644 --- a/ds3/ds3Posts.go +++ b/ds3/ds3Posts.go @@ -598,6 +598,34 @@ func (client *Client) PutAzureTargetFailureNotificationRegistrationSpectraS3(req return models.NewPutAzureTargetFailureNotificationRegistrationSpectraS3Response(response) } +func (client *Client) PutBucketChangesNotificationRegistrationSpectraS3(request *models.PutBucketChangesNotificationRegistrationSpectraS3Request) (*models.PutBucketChangesNotificationRegistrationSpectraS3Response, error) { + // Build the http request + httpRequest, err := networking.NewHttpRequestBuilder(). + WithHttpVerb(HTTP_VERB_POST). + WithPath("/_rest_/bucket_changes_notification_registration"). + WithQueryParam("notification_end_point", request.NotificationEndPoint). + WithOptionalQueryParam("bucket_id", request.BucketId). + WithOptionalQueryParam("format", networking.InterfaceToStrPtr(request.Format)). 
+ WithOptionalQueryParam("naming_convention", networking.InterfaceToStrPtr(request.NamingConvention)). + WithOptionalQueryParam("notification_http_method", networking.InterfaceToStrPtr(request.NotificationHttpMethod)). + Build(client.connectionInfo) + + if err != nil { + return nil, err + } + + networkRetryDecorator := networking.NewNetworkRetryDecorator(client.sendNetwork, client.clientPolicy.maxRetries) + + // Invoke the HTTP request. + response, requestErr := networkRetryDecorator.Invoke(httpRequest) + if requestErr != nil { + return nil, requestErr + } + + // Create a response object based on the result. + return models.NewPutBucketChangesNotificationRegistrationSpectraS3Response(response) +} + func (client *Client) PutDs3TargetFailureNotificationRegistrationSpectraS3(request *models.PutDs3TargetFailureNotificationRegistrationSpectraS3Request) (*models.PutDs3TargetFailureNotificationRegistrationSpectraS3Response, error) { // Build the http request httpRequest, err := networking.NewHttpRequestBuilder(). diff --git a/ds3/ds3Puts.go b/ds3/ds3Puts.go index d3a2479..0156476 100644 --- a/ds3/ds3Puts.go +++ b/ds3/ds3Puts.go @@ -2040,6 +2040,7 @@ func (client *Client) ModifyTapeDriveSpectraS3(request *models.ModifyTapeDriveSp httpRequest, err := networking.NewHttpRequestBuilder(). WithHttpVerb(HTTP_VERB_PUT). WithPath("/_rest_/tape_drive/" + request.TapeDriveId). + WithOptionalQueryParam("minimum_task_priority", networking.InterfaceToStrPtr(request.MinimumTaskPriority)). WithOptionalQueryParam("quiesced", networking.InterfaceToStrPtr(request.Quiesced)). WithOptionalQueryParam("reserved_task_type", networking.InterfaceToStrPtr(request.ReservedTaskType)). 
Build(client.connectionInfo) diff --git a/ds3/models/requests.go b/ds3/models/requests.go index 79fee96..114e328 100644 --- a/ds3/models/requests.go +++ b/ds3/models/requests.go @@ -317,6 +317,7 @@ func NewHeadBucketRequest(bucketName string) *HeadBucketRequest { type HeadObjectRequest struct { BucketName string ObjectName string + VersionId *string } func NewHeadObjectRequest(bucketName string, objectName string) *HeadObjectRequest { @@ -326,6 +327,11 @@ func NewHeadObjectRequest(bucketName string, objectName string) *HeadObjectReque } } +func (headObjectRequest *HeadObjectRequest) WithVersionId(versionId string) *HeadObjectRequest { + headObjectRequest.VersionId = &versionId + return headObjectRequest +} + type InitiateMultiPartUploadRequest struct { BucketName string ObjectName string @@ -3829,6 +3835,40 @@ func (putAzureTargetFailureNotificationRegistrationSpectraS3Request *PutAzureTar return putAzureTargetFailureNotificationRegistrationSpectraS3Request } +type PutBucketChangesNotificationRegistrationSpectraS3Request struct { + BucketId *string + Format HttpResponseFormatType + NamingConvention NamingConventionType + NotificationEndPoint string + NotificationHttpMethod RequestType +} + +func NewPutBucketChangesNotificationRegistrationSpectraS3Request(notificationEndPoint string) *PutBucketChangesNotificationRegistrationSpectraS3Request { + return &PutBucketChangesNotificationRegistrationSpectraS3Request{ + NotificationEndPoint: notificationEndPoint, + } +} + +func (putBucketChangesNotificationRegistrationSpectraS3Request *PutBucketChangesNotificationRegistrationSpectraS3Request) WithBucketId(bucketId string) *PutBucketChangesNotificationRegistrationSpectraS3Request { + putBucketChangesNotificationRegistrationSpectraS3Request.BucketId = &bucketId + return putBucketChangesNotificationRegistrationSpectraS3Request +} + +func (putBucketChangesNotificationRegistrationSpectraS3Request *PutBucketChangesNotificationRegistrationSpectraS3Request) WithFormat(format 
HttpResponseFormatType) *PutBucketChangesNotificationRegistrationSpectraS3Request { + putBucketChangesNotificationRegistrationSpectraS3Request.Format = format + return putBucketChangesNotificationRegistrationSpectraS3Request +} + +func (putBucketChangesNotificationRegistrationSpectraS3Request *PutBucketChangesNotificationRegistrationSpectraS3Request) WithNamingConvention(namingConvention NamingConventionType) *PutBucketChangesNotificationRegistrationSpectraS3Request { + putBucketChangesNotificationRegistrationSpectraS3Request.NamingConvention = namingConvention + return putBucketChangesNotificationRegistrationSpectraS3Request +} + +func (putBucketChangesNotificationRegistrationSpectraS3Request *PutBucketChangesNotificationRegistrationSpectraS3Request) WithNotificationHttpMethod(notificationHttpMethod RequestType) *PutBucketChangesNotificationRegistrationSpectraS3Request { + putBucketChangesNotificationRegistrationSpectraS3Request.NotificationHttpMethod = notificationHttpMethod + return putBucketChangesNotificationRegistrationSpectraS3Request +} + type PutDs3TargetFailureNotificationRegistrationSpectraS3Request struct { Format HttpResponseFormatType NamingConvention NamingConventionType @@ -4221,6 +4261,16 @@ func NewDeleteAzureTargetFailureNotificationRegistrationSpectraS3Request(azureTa } } +type DeleteBucketChangesNotificationRegistrationSpectraS3Request struct { + BucketChangesNotificationRegistration string +} + +func NewDeleteBucketChangesNotificationRegistrationSpectraS3Request(bucketChangesNotificationRegistration string) *DeleteBucketChangesNotificationRegistrationSpectraS3Request { + return &DeleteBucketChangesNotificationRegistrationSpectraS3Request{ + BucketChangesNotificationRegistration: bucketChangesNotificationRegistration, + } +} + type DeleteDs3TargetFailureNotificationRegistrationSpectraS3Request struct { NotificationId string } @@ -4399,6 +4449,98 @@ func (getAzureTargetFailureNotificationRegistrationsSpectraS3Request *GetAzureTa return 
getAzureTargetFailureNotificationRegistrationsSpectraS3Request } +type GetBucketChangesNotificationRegistrationSpectraS3Request struct { + BucketChangesNotificationRegistration string +} + +func NewGetBucketChangesNotificationRegistrationSpectraS3Request(bucketChangesNotificationRegistration string) *GetBucketChangesNotificationRegistrationSpectraS3Request { + return &GetBucketChangesNotificationRegistrationSpectraS3Request{ + BucketChangesNotificationRegistration: bucketChangesNotificationRegistration, + } +} + +type GetBucketChangesNotificationRegistrationsSpectraS3Request struct { + LastPage bool + PageLength *int + PageOffset *int + PageStartMarker *string + UserId *string +} + +func NewGetBucketChangesNotificationRegistrationsSpectraS3Request() *GetBucketChangesNotificationRegistrationsSpectraS3Request { + return &GetBucketChangesNotificationRegistrationsSpectraS3Request{ + } +} + +func (getBucketChangesNotificationRegistrationsSpectraS3Request *GetBucketChangesNotificationRegistrationsSpectraS3Request) WithLastPage() *GetBucketChangesNotificationRegistrationsSpectraS3Request { + getBucketChangesNotificationRegistrationsSpectraS3Request.LastPage = true + return getBucketChangesNotificationRegistrationsSpectraS3Request +} + +func (getBucketChangesNotificationRegistrationsSpectraS3Request *GetBucketChangesNotificationRegistrationsSpectraS3Request) WithPageLength(pageLength int) *GetBucketChangesNotificationRegistrationsSpectraS3Request { + getBucketChangesNotificationRegistrationsSpectraS3Request.PageLength = &pageLength + return getBucketChangesNotificationRegistrationsSpectraS3Request +} + +func (getBucketChangesNotificationRegistrationsSpectraS3Request *GetBucketChangesNotificationRegistrationsSpectraS3Request) WithPageOffset(pageOffset int) *GetBucketChangesNotificationRegistrationsSpectraS3Request { + getBucketChangesNotificationRegistrationsSpectraS3Request.PageOffset = &pageOffset + return getBucketChangesNotificationRegistrationsSpectraS3Request +} + 
+func (getBucketChangesNotificationRegistrationsSpectraS3Request *GetBucketChangesNotificationRegistrationsSpectraS3Request) WithPageStartMarker(pageStartMarker string) *GetBucketChangesNotificationRegistrationsSpectraS3Request { + getBucketChangesNotificationRegistrationsSpectraS3Request.PageStartMarker = &pageStartMarker + return getBucketChangesNotificationRegistrationsSpectraS3Request +} + +func (getBucketChangesNotificationRegistrationsSpectraS3Request *GetBucketChangesNotificationRegistrationsSpectraS3Request) WithUserId(userId string) *GetBucketChangesNotificationRegistrationsSpectraS3Request { + getBucketChangesNotificationRegistrationsSpectraS3Request.UserId = &userId + return getBucketChangesNotificationRegistrationsSpectraS3Request +} + +type GetBucketHistorySpectraS3Request struct { + BucketId *string + LastPage bool + MinSequenceNumber *int64 + PageLength *int + PageOffset *int + PageStartMarker *string +} + +func NewGetBucketHistorySpectraS3Request() *GetBucketHistorySpectraS3Request { + return &GetBucketHistorySpectraS3Request{ + } +} + +func (getBucketHistorySpectraS3Request *GetBucketHistorySpectraS3Request) WithBucketId(bucketId string) *GetBucketHistorySpectraS3Request { + getBucketHistorySpectraS3Request.BucketId = &bucketId + return getBucketHistorySpectraS3Request +} + +func (getBucketHistorySpectraS3Request *GetBucketHistorySpectraS3Request) WithLastPage() *GetBucketHistorySpectraS3Request { + getBucketHistorySpectraS3Request.LastPage = true + return getBucketHistorySpectraS3Request +} + +func (getBucketHistorySpectraS3Request *GetBucketHistorySpectraS3Request) WithMinSequenceNumber(minSequenceNumber int64) *GetBucketHistorySpectraS3Request { + getBucketHistorySpectraS3Request.MinSequenceNumber = &minSequenceNumber + return getBucketHistorySpectraS3Request +} + +func (getBucketHistorySpectraS3Request *GetBucketHistorySpectraS3Request) WithPageLength(pageLength int) *GetBucketHistorySpectraS3Request { + 
getBucketHistorySpectraS3Request.PageLength = &pageLength + return getBucketHistorySpectraS3Request +} + +func (getBucketHistorySpectraS3Request *GetBucketHistorySpectraS3Request) WithPageOffset(pageOffset int) *GetBucketHistorySpectraS3Request { + getBucketHistorySpectraS3Request.PageOffset = &pageOffset + return getBucketHistorySpectraS3Request +} + +func (getBucketHistorySpectraS3Request *GetBucketHistorySpectraS3Request) WithPageStartMarker(pageStartMarker string) *GetBucketHistorySpectraS3Request { + getBucketHistorySpectraS3Request.PageStartMarker = &pageStartMarker + return getBucketHistorySpectraS3Request +} + type GetDs3TargetFailureNotificationRegistrationSpectraS3Request struct { NotificationId string } @@ -6911,6 +7053,7 @@ func NewGetTapeDriveSpectraS3Request(tapeDriveId string) *GetTapeDriveSpectraS3R type GetTapeDrivesSpectraS3Request struct { LastPage bool + MinimumTaskPriority Priority PageLength *int PageOffset *int PageStartMarker *string @@ -6931,6 +7074,11 @@ func (getTapeDrivesSpectraS3Request *GetTapeDrivesSpectraS3Request) WithLastPage return getTapeDrivesSpectraS3Request } +func (getTapeDrivesSpectraS3Request *GetTapeDrivesSpectraS3Request) WithMinimumTaskPriority(minimumTaskPriority Priority) *GetTapeDrivesSpectraS3Request { + getTapeDrivesSpectraS3Request.MinimumTaskPriority = minimumTaskPriority + return getTapeDrivesSpectraS3Request +} + func (getTapeDrivesSpectraS3Request *GetTapeDrivesSpectraS3Request) WithPageLength(pageLength int) *GetTapeDrivesSpectraS3Request { getTapeDrivesSpectraS3Request.PageLength = &pageLength return getTapeDrivesSpectraS3Request @@ -7574,6 +7722,7 @@ func NewModifyAllTapePartitionsSpectraS3Request(quiesced Quiesced) *ModifyAllTap } type ModifyTapeDriveSpectraS3Request struct { + MinimumTaskPriority Priority Quiesced Quiesced ReservedTaskType ReservedTaskType TapeDriveId string @@ -7585,6 +7734,11 @@ func NewModifyTapeDriveSpectraS3Request(tapeDriveId string) *ModifyTapeDriveSpec } } +func 
(modifyTapeDriveSpectraS3Request *ModifyTapeDriveSpectraS3Request) WithMinimumTaskPriority(minimumTaskPriority Priority) *ModifyTapeDriveSpectraS3Request { + modifyTapeDriveSpectraS3Request.MinimumTaskPriority = minimumTaskPriority + return modifyTapeDriveSpectraS3Request +} + func (modifyTapeDriveSpectraS3Request *ModifyTapeDriveSpectraS3Request) WithQuiesced(quiesced Quiesced) *ModifyTapeDriveSpectraS3Request { modifyTapeDriveSpectraS3Request.Quiesced = quiesced return modifyTapeDriveSpectraS3Request diff --git a/ds3/models/responseModels.go b/ds3/models/responseModels.go index e1307d4..741bd91 100644 --- a/ds3/models/responseModels.go +++ b/ds3/models/responseModels.go @@ -2397,6 +2397,144 @@ func (azureTargetFailureNotificationRegistration *AzureTargetFailureNotification } } +type BucketChangesNotificationRegistration struct { + BucketId *string + CreationDate string + Format HttpResponseFormatType + Id string + LastFailure *string + LastHttpResponseCode *int + LastNotification *string + LastSequenceNumber *int64 + NamingConvention NamingConventionType + NotificationEndPoint *string + NotificationHttpMethod RequestType + NumberOfFailuresSinceLastSuccess int + UserId *string +} + +func (bucketChangesNotificationRegistration *BucketChangesNotificationRegistration) parse(xmlNode *XmlNode, aggErr *AggregateError) { + + // Parse Child Nodes + for _, child := range xmlNode.Children { + switch child.XMLName.Local { + case "BucketId": + bucketChangesNotificationRegistration.BucketId = parseNullableString(child.Content) + case "CreationDate": + bucketChangesNotificationRegistration.CreationDate = parseString(child.Content) + case "Format": + parseEnum(child.Content, &bucketChangesNotificationRegistration.Format, aggErr) + case "Id": + bucketChangesNotificationRegistration.Id = parseString(child.Content) + case "LastFailure": + bucketChangesNotificationRegistration.LastFailure = parseNullableString(child.Content) + case "LastHttpResponseCode": + 
bucketChangesNotificationRegistration.LastHttpResponseCode = parseNullableInt(child.Content, aggErr) + case "LastNotification": + bucketChangesNotificationRegistration.LastNotification = parseNullableString(child.Content) + case "LastSequenceNumber": + bucketChangesNotificationRegistration.LastSequenceNumber = parseNullableInt64(child.Content, aggErr) + case "NamingConvention": + parseEnum(child.Content, &bucketChangesNotificationRegistration.NamingConvention, aggErr) + case "NotificationEndPoint": + bucketChangesNotificationRegistration.NotificationEndPoint = parseNullableString(child.Content) + case "NotificationHttpMethod": + parseEnum(child.Content, &bucketChangesNotificationRegistration.NotificationHttpMethod, aggErr) + case "NumberOfFailuresSinceLastSuccess": + bucketChangesNotificationRegistration.NumberOfFailuresSinceLastSuccess = parseInt(child.Content, aggErr) + case "UserId": + bucketChangesNotificationRegistration.UserId = parseNullableString(child.Content) + default: + log.Printf("WARNING: unable to parse unknown xml tag '%s' while parsing BucketChangesNotificationRegistration.", child.XMLName.Local) + } + } +} + +type BucketHistoryEvent struct { + BucketId string + Id string + ObjectName *string + SequenceNumber *int64 + Type BucketHistoryEventType + VersionId string +} + +func (bucketHistoryEvent *BucketHistoryEvent) parse(xmlNode *XmlNode, aggErr *AggregateError) { + + // Parse Child Nodes + for _, child := range xmlNode.Children { + switch child.XMLName.Local { + case "BucketId": + bucketHistoryEvent.BucketId = parseString(child.Content) + case "Id": + bucketHistoryEvent.Id = parseString(child.Content) + case "ObjectName": + bucketHistoryEvent.ObjectName = parseNullableString(child.Content) + case "SequenceNumber": + bucketHistoryEvent.SequenceNumber = parseNullableInt64(child.Content, aggErr) + case "Type": + parseEnum(child.Content, &bucketHistoryEvent.Type, aggErr) + case "VersionId": + bucketHistoryEvent.VersionId = parseString(child.Content) + 
default: + log.Printf("WARNING: unable to parse unknown xml tag '%s' while parsing BucketHistoryEvent.", child.XMLName.Local) + } + } +} + +type BucketHistoryEventType Enum + +const ( + BUCKET_HISTORY_EVENT_TYPE_DELETE BucketHistoryEventType = 1 + iota + BUCKET_HISTORY_EVENT_TYPE_MARK_LATEST BucketHistoryEventType = 1 + iota + BUCKET_HISTORY_EVENT_TYPE_UNMARK_LATEST BucketHistoryEventType = 1 + iota + BUCKET_HISTORY_EVENT_TYPE_CREATE BucketHistoryEventType = 1 + iota +) + +func (bucketHistoryEventType *BucketHistoryEventType) UnmarshalText(text []byte) error { + var str string = string(bytes.ToUpper(text)) + switch str { + case "": *bucketHistoryEventType = UNDEFINED + case "DELETE": *bucketHistoryEventType = BUCKET_HISTORY_EVENT_TYPE_DELETE + case "MARK_LATEST": *bucketHistoryEventType = BUCKET_HISTORY_EVENT_TYPE_MARK_LATEST + case "UNMARK_LATEST": *bucketHistoryEventType = BUCKET_HISTORY_EVENT_TYPE_UNMARK_LATEST + case "CREATE": *bucketHistoryEventType = BUCKET_HISTORY_EVENT_TYPE_CREATE + default: + *bucketHistoryEventType = UNDEFINED + return errors.New(fmt.Sprintf("Cannot marshal '%s' into BucketHistoryEventType", str)) + } + return nil +} + +func (bucketHistoryEventType BucketHistoryEventType) String() string { + switch bucketHistoryEventType { + case BUCKET_HISTORY_EVENT_TYPE_DELETE: return "DELETE" + case BUCKET_HISTORY_EVENT_TYPE_MARK_LATEST: return "MARK_LATEST" + case BUCKET_HISTORY_EVENT_TYPE_UNMARK_LATEST: return "UNMARK_LATEST" + case BUCKET_HISTORY_EVENT_TYPE_CREATE: return "CREATE" + default: + log.Printf("Error: invalid BucketHistoryEventType represented by '%d'", bucketHistoryEventType) + return "" + } +} + +func (bucketHistoryEventType BucketHistoryEventType) StringPtr() *string { + if bucketHistoryEventType == UNDEFINED { + return nil + } + result := bucketHistoryEventType.String() + return &result +} + +func newBucketHistoryEventTypeFromContent(content []byte, aggErr *AggregateError) *BucketHistoryEventType { + if len(content) == 0 { + // no 
value + return nil + } + result := new(BucketHistoryEventType) + parseEnum(content, result, aggErr) + return result +} type Ds3TargetFailureNotificationRegistration struct { CreationDate string Format HttpResponseFormatType @@ -3774,6 +3912,7 @@ type TapeDrive struct { Id string LastCleaned *string MfgSerialNumber *string + MinimumTaskPriority *Priority PartitionId string Quiesced Quiesced ReservedTaskType ReservedTaskType @@ -3800,6 +3939,8 @@ func (tapeDrive *TapeDrive) parse(xmlNode *XmlNode, aggErr *AggregateError) { tapeDrive.LastCleaned = parseNullableString(child.Content) case "MfgSerialNumber": tapeDrive.MfgSerialNumber = parseNullableString(child.Content) + case "MinimumTaskPriority": + tapeDrive.MinimumTaskPriority = newPriorityFromContent(child.Content, aggErr) case "PartitionId": tapeDrive.PartitionId = parseString(child.Content) case "Quiesced": @@ -7387,6 +7528,44 @@ func (azureTargetFailureNotificationRegistrationList *AzureTargetFailureNotifica } } +type BucketChangesNotificationRegistrationList struct { + BucketChangesNotificationRegistrations []BucketChangesNotificationRegistration +} + +func (bucketChangesNotificationRegistrationList *BucketChangesNotificationRegistrationList) parse(xmlNode *XmlNode, aggErr *AggregateError) { + + // Parse Child Nodes + for _, child := range xmlNode.Children { + switch child.XMLName.Local { + case "BucketChangesNotificationRegistration": + var model BucketChangesNotificationRegistration + model.parse(&child, aggErr) + bucketChangesNotificationRegistrationList.BucketChangesNotificationRegistrations = append(bucketChangesNotificationRegistrationList.BucketChangesNotificationRegistrations, model) + default: + log.Printf("WARNING: unable to parse unknown xml tag '%s' while parsing BucketChangesNotificationRegistrationList.", child.XMLName.Local) + } + } +} + +type BucketHistoryEventList struct { + BucketHistoryEvents []BucketHistoryEvent +} + +func (bucketHistoryEventList *BucketHistoryEventList) parse(xmlNode 
*XmlNode, aggErr *AggregateError) { + + // Parse Child Nodes + for _, child := range xmlNode.Children { + switch child.XMLName.Local { + case "BucketHistoryEvent": + var model BucketHistoryEvent + model.parse(&child, aggErr) + bucketHistoryEventList.BucketHistoryEvents = append(bucketHistoryEventList.BucketHistoryEvents, model) + default: + log.Printf("WARNING: unable to parse unknown xml tag '%s' while parsing BucketHistoryEventList.", child.XMLName.Local) + } + } +} + type Ds3TargetFailureNotificationRegistrationList struct { Ds3TargetFailureNotificationRegistrations []Ds3TargetFailureNotificationRegistration } diff --git a/ds3/models/responses.go b/ds3/models/responses.go index eb6eb1c..f6dde66 100644 --- a/ds3/models/responses.go +++ b/ds3/models/responses.go @@ -3454,6 +3454,32 @@ func NewPutAzureTargetFailureNotificationRegistrationSpectraS3Response(webRespon } } +type PutBucketChangesNotificationRegistrationSpectraS3Response struct { + BucketChangesNotificationRegistration BucketChangesNotificationRegistration + Headers *http.Header +} + +func (putBucketChangesNotificationRegistrationSpectraS3Response *PutBucketChangesNotificationRegistrationSpectraS3Response) parse(webResponse WebResponse) error { + return parseResponsePayload(webResponse, &putBucketChangesNotificationRegistrationSpectraS3Response.BucketChangesNotificationRegistration) +} + +func NewPutBucketChangesNotificationRegistrationSpectraS3Response(webResponse WebResponse) (*PutBucketChangesNotificationRegistrationSpectraS3Response, error) { + defer webResponse.Body().Close() + expectedStatusCodes := []int { 201 } + + switch code := webResponse.StatusCode(); code { + case 201: + var body PutBucketChangesNotificationRegistrationSpectraS3Response + if err := body.parse(webResponse); err != nil { + return nil, err + } + body.Headers = webResponse.Header() + return &body, nil + default: + return nil, buildBadStatusCodeError(webResponse, expectedStatusCodes) + } +} + type 
PutDs3TargetFailureNotificationRegistrationSpectraS3Response struct { Ds3TargetFailureNotificationRegistration Ds3TargetFailureNotificationRegistration Headers *http.Header @@ -3811,6 +3837,25 @@ func NewDeleteAzureTargetFailureNotificationRegistrationSpectraS3Response(webRes } } +type DeleteBucketChangesNotificationRegistrationSpectraS3Response struct { + + Headers *http.Header +} + + + +func NewDeleteBucketChangesNotificationRegistrationSpectraS3Response(webResponse WebResponse) (*DeleteBucketChangesNotificationRegistrationSpectraS3Response, error) { + defer webResponse.Body().Close() + expectedStatusCodes := []int { 204 } + + switch code := webResponse.StatusCode(); code { + case 204: + return &DeleteBucketChangesNotificationRegistrationSpectraS3Response{Headers: webResponse.Header()}, nil + default: + return nil, buildBadStatusCodeError(webResponse, expectedStatusCodes) + } +} + type DeleteDs3TargetFailureNotificationRegistrationSpectraS3Response struct { Headers *http.Header @@ -4110,6 +4155,84 @@ func NewGetAzureTargetFailureNotificationRegistrationsSpectraS3Response(webRespo } } +type GetBucketChangesNotificationRegistrationSpectraS3Response struct { + BucketChangesNotificationRegistration BucketChangesNotificationRegistration + Headers *http.Header +} + +func (getBucketChangesNotificationRegistrationSpectraS3Response *GetBucketChangesNotificationRegistrationSpectraS3Response) parse(webResponse WebResponse) error { + return parseResponsePayload(webResponse, &getBucketChangesNotificationRegistrationSpectraS3Response.BucketChangesNotificationRegistration) +} + +func NewGetBucketChangesNotificationRegistrationSpectraS3Response(webResponse WebResponse) (*GetBucketChangesNotificationRegistrationSpectraS3Response, error) { + defer webResponse.Body().Close() + expectedStatusCodes := []int { 200 } + + switch code := webResponse.StatusCode(); code { + case 200: + var body GetBucketChangesNotificationRegistrationSpectraS3Response + if err := body.parse(webResponse); 
err != nil { + return nil, err + } + body.Headers = webResponse.Header() + return &body, nil + default: + return nil, buildBadStatusCodeError(webResponse, expectedStatusCodes) + } +} + +type GetBucketChangesNotificationRegistrationsSpectraS3Response struct { + BucketChangesNotificationRegistrationList BucketChangesNotificationRegistrationList + Headers *http.Header +} + +func (getBucketChangesNotificationRegistrationsSpectraS3Response *GetBucketChangesNotificationRegistrationsSpectraS3Response) parse(webResponse WebResponse) error { + return parseResponsePayload(webResponse, &getBucketChangesNotificationRegistrationsSpectraS3Response.BucketChangesNotificationRegistrationList) +} + +func NewGetBucketChangesNotificationRegistrationsSpectraS3Response(webResponse WebResponse) (*GetBucketChangesNotificationRegistrationsSpectraS3Response, error) { + defer webResponse.Body().Close() + expectedStatusCodes := []int { 200 } + + switch code := webResponse.StatusCode(); code { + case 200: + var body GetBucketChangesNotificationRegistrationsSpectraS3Response + if err := body.parse(webResponse); err != nil { + return nil, err + } + body.Headers = webResponse.Header() + return &body, nil + default: + return nil, buildBadStatusCodeError(webResponse, expectedStatusCodes) + } +} + +type GetBucketHistorySpectraS3Response struct { + BucketHistoryEventList BucketHistoryEventList + Headers *http.Header +} + +func (getBucketHistorySpectraS3Response *GetBucketHistorySpectraS3Response) parse(webResponse WebResponse) error { + return parseResponsePayload(webResponse, &getBucketHistorySpectraS3Response.BucketHistoryEventList) +} + +func NewGetBucketHistorySpectraS3Response(webResponse WebResponse) (*GetBucketHistorySpectraS3Response, error) { + defer webResponse.Body().Close() + expectedStatusCodes := []int { 200 } + + switch code := webResponse.StatusCode(); code { + case 200: + var body GetBucketHistorySpectraS3Response + if err := body.parse(webResponse); err != nil { + return nil, err + 
} + body.Headers = webResponse.Header() + return &body, nil + default: + return nil, buildBadStatusCodeError(webResponse, expectedStatusCodes) + } +} + type GetDs3TargetFailureNotificationRegistrationSpectraS3Response struct { Ds3TargetFailureNotificationRegistration Ds3TargetFailureNotificationRegistration Headers *http.Header From f3eedf12ec717ec8e7c838bd8bd081fa5fdd76eb Mon Sep 17 00:00:00 2001 From: RachelTucker Date: Thu, 11 Jun 2020 15:34:30 -0600 Subject: [PATCH 06/11] OTHER: fixing race condition queueing things for processing. This only happens in streaming strategy combined with very small blob size on the BP bucket. There existed the possibility that a blob would be added to the queue twice. This explicitly prevents the double queuing of blobs. (#101) (#104) --- helpers/blobQueue.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/helpers/blobQueue.go b/helpers/blobQueue.go index 0f54135..e1f9a3e 100644 --- a/helpers/blobQueue.go +++ b/helpers/blobQueue.go @@ -3,6 +3,7 @@ package helpers import ( "errors" helperModels "github.com/SpectraLogic/ds3_go_sdk/helpers/models" + "reflect" ) // A queue that manages descriptions of blobs @@ -25,6 +26,12 @@ func NewBlobDescriptionQueue() BlobDescriptionQueue { } func (queue *blobDescriptionQueueImpl) Push(description *helperModels.BlobDescription) { + // verify that this blob isn't already in the queue before adding it + for _, existingBlob := range queue.queue { + if reflect.DeepEqual(*existingBlob, *description) { + return + } + } queue.queue = append(queue.queue, description) } From 83231e91baf2214a08e15dc1705cb5cb36b93fb3 Mon Sep 17 00:00:00 2001 From: RachelTucker Date: Fri, 12 Jun 2020 09:19:21 -0600 Subject: [PATCH 07/11] Updating 5.2 API to commit 1742847 (#105) --- ds3/models/responseModels.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ds3/models/responseModels.go b/ds3/models/responseModels.go index 741bd91..3bfb950 100644 --- a/ds3/models/responseModels.go +++ 
b/ds3/models/responseModels.go @@ -1576,6 +1576,7 @@ const ( S3_INITIAL_DATA_PLACEMENT_POLICY_REDUCED_REDUNDANCY S3InitialDataPlacementPolicy = 1 + iota S3_INITIAL_DATA_PLACEMENT_POLICY_STANDARD_IA S3InitialDataPlacementPolicy = 1 + iota S3_INITIAL_DATA_PLACEMENT_POLICY_GLACIER S3InitialDataPlacementPolicy = 1 + iota + S3_INITIAL_DATA_PLACEMENT_POLICY_DEEP_ARCHIVE S3InitialDataPlacementPolicy = 1 + iota ) func (s3InitialDataPlacementPolicy *S3InitialDataPlacementPolicy) UnmarshalText(text []byte) error { @@ -1586,6 +1587,7 @@ func (s3InitialDataPlacementPolicy *S3InitialDataPlacementPolicy) UnmarshalText( case "REDUCED_REDUNDANCY": *s3InitialDataPlacementPolicy = S3_INITIAL_DATA_PLACEMENT_POLICY_REDUCED_REDUNDANCY case "STANDARD_IA": *s3InitialDataPlacementPolicy = S3_INITIAL_DATA_PLACEMENT_POLICY_STANDARD_IA case "GLACIER": *s3InitialDataPlacementPolicy = S3_INITIAL_DATA_PLACEMENT_POLICY_GLACIER + case "DEEP_ARCHIVE": *s3InitialDataPlacementPolicy = S3_INITIAL_DATA_PLACEMENT_POLICY_DEEP_ARCHIVE default: *s3InitialDataPlacementPolicy = UNDEFINED return errors.New(fmt.Sprintf("Cannot marshal '%s' into S3InitialDataPlacementPolicy", str)) @@ -1599,6 +1601,7 @@ func (s3InitialDataPlacementPolicy S3InitialDataPlacementPolicy) String() string case S3_INITIAL_DATA_PLACEMENT_POLICY_REDUCED_REDUNDANCY: return "REDUCED_REDUNDANCY" case S3_INITIAL_DATA_PLACEMENT_POLICY_STANDARD_IA: return "STANDARD_IA" case S3_INITIAL_DATA_PLACEMENT_POLICY_GLACIER: return "GLACIER" + case S3_INITIAL_DATA_PLACEMENT_POLICY_DEEP_ARCHIVE: return "DEEP_ARCHIVE" default: log.Printf("Error: invalid S3InitialDataPlacementPolicy represented by '%d'", s3InitialDataPlacementPolicy) return "" @@ -2453,6 +2456,7 @@ func (bucketChangesNotificationRegistration *BucketChangesNotificationRegistrati type BucketHistoryEvent struct { BucketId string Id string + ObjectCreationDate *string ObjectName *string SequenceNumber *int64 Type BucketHistoryEventType @@ -2468,6 +2472,8 @@ func (bucketHistoryEvent 
*BucketHistoryEvent) parse(xmlNode *XmlNode, aggErr *Ag bucketHistoryEvent.BucketId = parseString(child.Content) case "Id": bucketHistoryEvent.Id = parseString(child.Content) + case "ObjectCreationDate": + bucketHistoryEvent.ObjectCreationDate = parseNullableString(child.Content) case "ObjectName": bucketHistoryEvent.ObjectName = parseNullableString(child.Content) case "SequenceNumber": From 336bdbb941ca8a7cd36e6e7b198a5204d8d9d8ec Mon Sep 17 00:00:00 2001 From: Rachel Tucker Date: Mon, 22 Jun 2020 15:30:57 -0600 Subject: [PATCH 08/11] OTHER: improving error message when retrieving a file from the BP ends up with less data than expected. Originally this returned an Unexpected EOF error, which is not very descriptive. Now it mentions how much data was retrieved, and on which blob the data failed to be retrieved from. --- helpers/getProducer.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/helpers/getProducer.go b/helpers/getProducer.go index d9227bd..0591b2f 100644 --- a/helpers/getProducer.go +++ b/helpers/getProducer.go @@ -1,6 +1,7 @@ package helpers import ( + "fmt" "github.com/SpectraLogic/ds3_go_sdk/ds3" ds3Models "github.com/SpectraLogic/ds3_go_sdk/ds3/models" helperModels "github.com/SpectraLogic/ds3_go_sdk/helpers/models" @@ -137,8 +138,15 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo) Transf return } defer info.channelBuilder.OnDone(writer) - _, err = io.Copy(writer, getObjResponse.Content) //copy all content from response reader to destination writer - if err != nil { + bytesWritten, err := io.Copy(writer, getObjResponse.Content) //copy all content from response reader to destination writer + if err != nil && err != io.ErrUnexpectedEOF { + producer.strategy.Listeners.Errored(info.blob.Name(), err) + info.channelBuilder.SetFatalError(err) + producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error()) + 
return + } + if bytesWritten != info.blob.Length() { + err = fmt.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", info.blob.Name(), info.blob.Offset(), bytesWritten, info.blob.Length()) producer.strategy.Listeners.Errored(info.blob.Name(), err) info.channelBuilder.SetFatalError(err) producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error()) From a15eecfb2605cbd2679ec45c128402d6a83ae3a6 Mon Sep 17 00:00:00 2001 From: RachelTucker Date: Wed, 8 Jul 2020 16:49:00 -0600 Subject: [PATCH 09/11] GOSDK-23: The SDK breaks canonical MIME encoding of http headers. Removing the to lower call on all headers which was messing with the MIME encoding. (#110) --- ds3/models/ds3Response.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/ds3/models/ds3Response.go b/ds3/models/ds3Response.go index 0e70865..aaf2880 100644 --- a/ds3/models/ds3Response.go +++ b/ds3/models/ds3Response.go @@ -2,7 +2,6 @@ package models import ( "io" - "strings" "net/http" ) @@ -29,13 +28,10 @@ func (wrappedHttpResponse *WrappedHttpResponse) Body() io.ReadCloser { } func (wrappedHttpResponse *WrappedHttpResponse) Header() *http.Header { - // The HTTP spec says headers keys are case insensitive, so we'll just - // to lower them before processing the response so we can always get the - // right thing. result := make(http.Header) header := wrappedHttpResponse.rawResponse.Header for k, v := range header { - result[strings.ToLower(k)] = v + result[k] = v } return &result } From fc238ea10c14d55739665aad0adb866bb58670b3 Mon Sep 17 00:00:00 2001 From: Rachel Tucker Date: Fri, 26 Jun 2020 16:00:50 -0600 Subject: [PATCH 10/11] OTHER: adding retry around bulk get when a blob only partially transfers. If the blob is not retrieved in its entirety, then ranged naked gets are used to retrieve the remaining portion of the blob. 
This retry only applies to blobs where no range was specified. --- ds3_integration/utils/testUtils.go | 4 +- helpers/getProducer.go | 59 +++++++++++- helpers_integration/helpersImpl_test.go | 117 ++++++++++++++++++++++++ 3 files changed, 174 insertions(+), 6 deletions(-) diff --git a/ds3_integration/utils/testUtils.go b/ds3_integration/utils/testUtils.go index 5b7c388..a30b521 100644 --- a/ds3_integration/utils/testUtils.go +++ b/ds3_integration/utils/testUtils.go @@ -60,7 +60,7 @@ func VerifyBookContent(t *testing.T, bookName string, actual io.ReadCloser) { verifyContent(t, expected, actual) } -func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64, actual io.ReadCloser) { +func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64, actual io.Reader) { f, err := os.Open(filePath) ds3Testing.AssertNilError(t, err) @@ -73,7 +73,7 @@ func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64 verifyPartialContent(t, *expected, actual, length) } -func verifyPartialContent(t *testing.T, expected []byte, actual io.ReadCloser, length int64) { +func verifyPartialContent(t *testing.T, expected []byte, actual io.Reader, length int64) { content, err := getNBytesFromReader(actual, length) ds3Testing.AssertNilError(t, err) diff --git a/helpers/getProducer.go b/helpers/getProducer.go index 0591b2f..bb4d31b 100644 --- a/helpers/getProducer.go +++ b/helpers/getProducer.go @@ -12,6 +12,8 @@ import ( "time" ) +const timesToRetryGettingPartialBlob = 5 + type getProducer struct { JobMasterObjectList *ds3Models.MasterObjectList //MOL from put bulk job creation GetObjects *[]helperModels.GetObject @@ -146,10 +148,13 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo) Transf return } if bytesWritten != info.blob.Length() { - err = fmt.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", info.blob.Name(), info.blob.Offset(), bytesWritten, 
info.blob.Length()) - producer.strategy.Listeners.Errored(info.blob.Name(), err) - info.channelBuilder.SetFatalError(err) - producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error()) + producer.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", info.blob.Name(), info.blob.Offset(), bytesWritten, info.blob.Length()) + err := GetRemainingBlob(producer.client, info.bucketName, info.blob, bytesWritten, writer, producer.Logger) + if err != nil { + producer.strategy.Listeners.Errored(info.blob.Name(), err) + info.channelBuilder.SetFatalError(err) + producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error()) + } } return } @@ -166,6 +171,52 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo) Transf } } +func GetRemainingBlob(client *ds3.Client, bucketName string, blob *helperModels.BlobDescription, amountAlreadyRetrieved int64, writer io.Writer, logger sdk_log.Logger) error { + logger.Debugf("starting retry for fetching partial blob '%s' at offset '%d': amount to retrieve %d", blob.Name(), blob.Offset(), blob.Length() - amountAlreadyRetrieved) + bytesRetrievedSoFar := amountAlreadyRetrieved + timesRetried := 0 + rangeEnd := blob.Offset() + blob.Length() -1 + for bytesRetrievedSoFar < blob.Length() && timesRetried < timesToRetryGettingPartialBlob { + rangeStart := blob.Offset() + bytesRetrievedSoFar + bytesRetrievedThisRound, err := RetryGettingBlobRange(client, bucketName, blob.Name(), blob.Offset(), rangeStart, rangeEnd, writer, logger) + if err != nil { + logger.Errorf("failed to get object '%s' at offset '%d', range %d=%d attempt %d: %s", blob.Name(), blob.Offset(), rangeStart, rangeEnd, timesRetried, err.Error()) + } + bytesRetrievedSoFar+= bytesRetrievedThisRound + timesRetried++ + } + + if bytesRetrievedSoFar < 
blob.Length() { + return fmt.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", blob.Name(), blob.Offset(), bytesRetrievedSoFar, blob.Length()) + } + return nil +} + +func RetryGettingBlobRange(client *ds3.Client, bucketName string, objectName string, blobOffset int64, rangeStart int64, rangeEnd int64, writer io.Writer, logger sdk_log.Logger) (int64, error) { + // perform a naked get call for the rest of the blob that we originally failed to get + partOfBlobToFetch := ds3Models.Range{ + Start: rangeStart, + End: rangeEnd, + } + getObjRequest := ds3Models.NewGetObjectRequest(bucketName, objectName). + WithOffset(blobOffset). + WithRanges(partOfBlobToFetch) + + getObjResponse, err := client.GetObject(getObjRequest) + if err != nil { + return 0, err + } + defer func() { + err := getObjResponse.Content.Close() + if err != nil { + logger.Warningf("failed to close response body for get object '%s' with range %d-%d: %v", objectName, rangeStart, rangeEnd, err) + } + }() + + bytesWritten, err := io.Copy(writer, getObjResponse.Content) //copy all content from response reader to destination writer + return bytesWritten, err +} + // Writes a range of a blob to its destination channel func writeRangeToDestination(channelBuilder helperModels.WriteChannelBuilder, blobRange ds3Models.Range, content io.Reader) error { writer, err := channelBuilder.GetChannel(blobRange.Start) diff --git a/helpers_integration/helpersImpl_test.go b/helpers_integration/helpersImpl_test.go index d93155c..65abf88 100644 --- a/helpers_integration/helpersImpl_test.go +++ b/helpers_integration/helpersImpl_test.go @@ -504,3 +504,120 @@ func TestBulkPutAndGetLotsOfFiles(t *testing.T) { t.Errorf("expected to get a BP job ID, but instead got nothing") } } + +func TestRetryGettingBlobRange(t *testing.T) { + defer testutils.DeleteBucketContents(client, testBucket) + + helper := helpers.NewHelpers(client) + strategy := newTestTransferStrategy(t) + + // Put a blobbed 
object to BP + const bigFilePath = LargeBookPath + LargeBookTitle + writeObj, err := getTestWriteObjectRandomAccess(LargeBookTitle, bigFilePath) + ds3Testing.AssertNilError(t, err) + + var writeObjects []helperModels.PutObject + writeObjects = append(writeObjects, *writeObj) + + putJobId, err := helper.PutObjects(testBucket, writeObjects, strategy) + ds3Testing.AssertNilError(t, err) + if putJobId == "" { + t.Error("expected to get a BP job ID, but instead got nothing") + } + + // Try to get some data from each blob + getJob, err := client.GetJobSpectraS3(ds3Models.NewGetJobSpectraS3Request(putJobId)) + ds3Testing.AssertNilError(t, err) + + blobsChecked := 0 + for _, curObj := range getJob.MasterObjectList.Objects { + for _, blob := range curObj.Objects { + func() { + // create a temp file for writing the blob to + tempFile, err := ioutil.TempFile("", "go-sdk-test-") + ds3Testing.AssertNilError(t, err) + defer func() { + tempFile.Close() + os.Remove(tempFile.Name()) + }() + + // get a range of the blob + startRange := blob.Offset+10 // retrieve subset of blob + endRange := blob.Length+blob.Offset-1 + bytesWritten, err := helpers.RetryGettingBlobRange(client, testBucket, writeObj.PutObject.Name, blob.Offset, startRange, endRange, tempFile, client.Logger) + ds3Testing.AssertNilError(t, err) + ds3Testing.AssertInt64(t, "bytes written", endRange-startRange+1, bytesWritten) + + // verify that retrieved partial blob is correct + err = tempFile.Sync() + ds3Testing.AssertNilError(t, err) + + tempFile.Seek(0, 0) + length := endRange-startRange + testutils.VerifyPartialFile(t, bigFilePath, length, startRange, tempFile) + }() + blobsChecked++ + } + } + if blobsChecked == 0 { + t.Fatalf("didn't verify any blobs") + } +} + +func TestGetRemainingBlob(t *testing.T) { + defer testutils.DeleteBucketContents(client, testBucket) + + helper := helpers.NewHelpers(client) + strategy := newTestTransferStrategy(t) + + // Put a blobbed object to BP + const bigFilePath = LargeBookPath + 
LargeBookTitle + writeObj, err := getTestWriteObjectRandomAccess(LargeBookTitle, bigFilePath) + ds3Testing.AssertNilError(t, err) + + var writeObjects []helperModels.PutObject + writeObjects = append(writeObjects, *writeObj) + + putJobId, err := helper.PutObjects(testBucket, writeObjects, strategy) + ds3Testing.AssertNilError(t, err) + if putJobId == "" { + t.Error("expected to get a BP job ID, but instead got nothing") + } + + // Try to get some data from each blob + getJob, err := client.GetJobSpectraS3(ds3Models.NewGetJobSpectraS3Request(putJobId)) + ds3Testing.AssertNilError(t, err) + + blobsChecked := 0 + for _, curObj := range getJob.MasterObjectList.Objects { + for _, blob := range curObj.Objects { + func() { + // create a temp file for writing the blob to + tempFile, err := ioutil.TempFile("", "go-sdk-test-") + ds3Testing.AssertNilError(t, err) + defer func() { + tempFile.Close() + os.Remove(tempFile.Name()) + }() + + // get the remainder of the blob after skipping some bytes + blob := helperModels.NewBlobDescription(*blob.Name, blob.Offset, blob.Length) + var amountToSkip int64 = 10 + err = helpers.GetRemainingBlob(client, testBucket, &blob, amountToSkip, tempFile, client.Logger) + ds3Testing.AssertNilError(t, err) + + // verify that retrieved partial blob is correct + err = tempFile.Sync() + ds3Testing.AssertNilError(t, err) + + tempFile.Seek(0, 0) + length := blob.Length() - amountToSkip + testutils.VerifyPartialFile(t, bigFilePath, length, blob.Offset()+amountToSkip, tempFile) + }() + blobsChecked++ + } + } + if blobsChecked == 0 { + t.Fatalf("didn't verify any blobs") + } +} From c7d22a34c7e1e22689b1fed015a74c9e1642279a Mon Sep 17 00:00:00 2001 From: RachelTucker Date: Thu, 13 Aug 2020 13:35:51 -0600 Subject: [PATCH 11/11] Updating to BP API 5.2 commit 1755379 (#116) --- ds3/ds3Gets.go | 1 + ds3/ds3Posts.go | 1 + ds3/ds3Puts.go | 1 + ds3/models/requests.go | 18 ++++++++++++++++++ ds3/models/responseModels.go | 6 ++++++ 5 files changed, 27 insertions(+) 
diff --git a/ds3/ds3Gets.go b/ds3/ds3Gets.go index 967c4e7..af59cb7 100644 --- a/ds3/ds3Gets.go +++ b/ds3/ds3Gets.go @@ -23,6 +23,7 @@ func (client *Client) GetObject(request *models.GetObjectRequest) (*models.GetOb httpRequest, err := networking.NewHttpRequestBuilder(). WithHttpVerb(HTTP_VERB_GET). WithPath("/" + request.BucketName + "/" + request.ObjectName). + WithOptionalVoidQueryParam("cached_only", request.CachedOnly). WithOptionalQueryParam("job", request.Job). WithOptionalQueryParam("offset", networking.Int64PtrToStrPtr(request.Offset)). WithOptionalQueryParam("version_id", request.VersionId). diff --git a/ds3/ds3Posts.go b/ds3/ds3Posts.go index 8985793..41e49a8 100644 --- a/ds3/ds3Posts.go +++ b/ds3/ds3Posts.go @@ -1341,6 +1341,7 @@ func (client *Client) RegisterS3TargetSpectraS3(request *models.RegisterS3Target WithOptionalQueryParam("proxy_port", networking.IntPtrToStrPtr(request.ProxyPort)). WithOptionalQueryParam("proxy_username", request.ProxyUsername). WithOptionalQueryParam("region", networking.InterfaceToStrPtr(request.Region)). + WithOptionalQueryParam("restricted_access", networking.BoolPtrToStrPtr(request.RestrictedAccess)). WithOptionalQueryParam("staged_data_expiration_in_days", networking.IntPtrToStrPtr(request.StagedDataExpirationInDays)). Build(client.connectionInfo) diff --git a/ds3/ds3Puts.go b/ds3/ds3Puts.go index 0156476..1cc128c 100644 --- a/ds3/ds3Puts.go +++ b/ds3/ds3Puts.go @@ -2596,6 +2596,7 @@ func (client *Client) ModifyS3TargetSpectraS3(request *models.ModifyS3TargetSpec WithOptionalQueryParam("proxy_username", request.ProxyUsername). WithOptionalQueryParam("quiesced", networking.InterfaceToStrPtr(request.Quiesced)). WithOptionalQueryParam("region", networking.InterfaceToStrPtr(request.Region)). + WithOptionalQueryParam("restricted_access", networking.BoolPtrToStrPtr(request.RestrictedAccess)). WithOptionalQueryParam("secret_key", request.SecretKey). 
WithOptionalQueryParam("staged_data_expiration_in_days", networking.IntPtrToStrPtr(request.StagedDataExpirationInDays)). Build(client.connectionInfo) diff --git a/ds3/models/requests.go b/ds3/models/requests.go index 114e328..a2ffb7c 100644 --- a/ds3/models/requests.go +++ b/ds3/models/requests.go @@ -258,6 +258,7 @@ type Range struct { type GetObjectRequest struct { BucketName string ObjectName string + CachedOnly bool Checksum Checksum Job *string Metadata map[string]string @@ -274,6 +275,11 @@ func NewGetObjectRequest(bucketName string, objectName string) *GetObjectRequest } } +func (getObjectRequest *GetObjectRequest) WithCachedOnly() *GetObjectRequest { + getObjectRequest.CachedOnly = true + return getObjectRequest +} + func (getObjectRequest *GetObjectRequest) WithJob(job string) *GetObjectRequest { getObjectRequest.Job = &job return getObjectRequest @@ -9358,6 +9364,7 @@ type ModifyS3TargetSpectraS3Request struct { ProxyUsername *string Quiesced Quiesced Region S3Region + RestrictedAccess *bool S3Target string SecretKey *string StagedDataExpirationInDays *int @@ -9459,6 +9466,11 @@ func (modifyS3TargetSpectraS3Request *ModifyS3TargetSpectraS3Request) WithRegion return modifyS3TargetSpectraS3Request } +func (modifyS3TargetSpectraS3Request *ModifyS3TargetSpectraS3Request) WithRestrictedAccess(restrictedAccess bool) *ModifyS3TargetSpectraS3Request { + modifyS3TargetSpectraS3Request.RestrictedAccess = &restrictedAccess + return modifyS3TargetSpectraS3Request +} + func (modifyS3TargetSpectraS3Request *ModifyS3TargetSpectraS3Request) WithSecretKey(secretKey string) *ModifyS3TargetSpectraS3Request { modifyS3TargetSpectraS3Request.SecretKey = &secretKey return modifyS3TargetSpectraS3Request @@ -9487,6 +9499,7 @@ type RegisterS3TargetSpectraS3Request struct { ProxyPort *int ProxyUsername *string Region S3Region + RestrictedAccess *bool SecretKey string StagedDataExpirationInDays *int } @@ -9574,6 +9587,11 @@ func (registerS3TargetSpectraS3Request 
*RegisterS3TargetSpectraS3Request) WithRe return registerS3TargetSpectraS3Request } +func (registerS3TargetSpectraS3Request *RegisterS3TargetSpectraS3Request) WithRestrictedAccess(restrictedAccess bool) *RegisterS3TargetSpectraS3Request { + registerS3TargetSpectraS3Request.RestrictedAccess = &restrictedAccess + return registerS3TargetSpectraS3Request +} + func (registerS3TargetSpectraS3Request *RegisterS3TargetSpectraS3Request) WithStagedDataExpirationInDays(stagedDataExpirationInDays int) *RegisterS3TargetSpectraS3Request { registerS3TargetSpectraS3Request.StagedDataExpirationInDays = &stagedDataExpirationInDays return registerS3TargetSpectraS3Request diff --git a/ds3/models/responseModels.go b/ds3/models/responseModels.go index 3bfb950..2de6855 100644 --- a/ds3/models/responseModels.go +++ b/ds3/models/responseModels.go @@ -4028,6 +4028,7 @@ const ( TAPE_DRIVE_TYPE_LTO6 TapeDriveType = 1 + iota TAPE_DRIVE_TYPE_LTO7 TapeDriveType = 1 + iota TAPE_DRIVE_TYPE_LTO8 TapeDriveType = 1 + iota + TAPE_DRIVE_TYPE_LTO9 TapeDriveType = 1 + iota TAPE_DRIVE_TYPE_TS1140 TapeDriveType = 1 + iota TAPE_DRIVE_TYPE_TS1150 TapeDriveType = 1 + iota TAPE_DRIVE_TYPE_TS1155 TapeDriveType = 1 + iota @@ -4043,6 +4044,7 @@ func (tapeDriveType *TapeDriveType) UnmarshalText(text []byte) error { case "LTO6": *tapeDriveType = TAPE_DRIVE_TYPE_LTO6 case "LTO7": *tapeDriveType = TAPE_DRIVE_TYPE_LTO7 case "LTO8": *tapeDriveType = TAPE_DRIVE_TYPE_LTO8 + case "LTO9": *tapeDriveType = TAPE_DRIVE_TYPE_LTO9 case "TS1140": *tapeDriveType = TAPE_DRIVE_TYPE_TS1140 case "TS1150": *tapeDriveType = TAPE_DRIVE_TYPE_TS1150 case "TS1155": *tapeDriveType = TAPE_DRIVE_TYPE_TS1155 @@ -4061,6 +4063,7 @@ func (tapeDriveType TapeDriveType) String() string { case TAPE_DRIVE_TYPE_LTO6: return "LTO6" case TAPE_DRIVE_TYPE_LTO7: return "LTO7" case TAPE_DRIVE_TYPE_LTO8: return "LTO8" + case TAPE_DRIVE_TYPE_LTO9: return "LTO9" case TAPE_DRIVE_TYPE_TS1140: return "TS1140" case TAPE_DRIVE_TYPE_TS1150: return "TS1150" case 
TAPE_DRIVE_TYPE_TS1155: return "TS1155" @@ -4939,6 +4942,7 @@ type S3Target struct { ProxyUsername *string Quiesced Quiesced Region *S3Region + RestrictedAccess bool SecretKey *string StagedDataExpirationInDays int State TargetState @@ -4989,6 +4993,8 @@ func (s3Target *S3Target) parse(xmlNode *XmlNode, aggErr *AggregateError) { parseEnum(child.Content, &s3Target.Quiesced, aggErr) case "Region": s3Target.Region = newS3RegionFromContent(child.Content, aggErr) + case "RestrictedAccess": + s3Target.RestrictedAccess = parseBool(child.Content, aggErr) case "SecretKey": s3Target.SecretKey = parseNullableString(child.Content) case "StagedDataExpirationInDays":