Skip to content

Commit

Permalink
temporary workaround for s3 delete for all versions
Browse files Browse the repository at this point in the history
Signed-off-by: Slach <[email protected]>
  • Loading branch information
Slach committed Oct 25, 2024
1 parent 67abc5a commit d95149b
Showing 1 changed file with 27 additions and 10 deletions.
37 changes: 27 additions & 10 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,19 @@ func (s *S3) deleteKey(ctx context.Context, key string) error {
params.RequestPayer = s3types.RequestPayer(s.Config.RequestPayer)
}
if s.versioning {
objVersion, err := s.getObjectVersion(ctx, key)
objVersions, err := s.getObjectAllVersions(ctx, key)
if err != nil {
return errors.Wrapf(err, "deleteKey, obtaining object version bucket: %s key: %s", s.Config.Bucket, key)
}
params.VersionId = objVersion
for _, objVersion := range objVersions {
params.VersionId = &objVersion
if _, err := s.client.DeleteObject(ctx, params); err != nil {
return errors.Wrapf(err, "deleteKey, deleting object bucket: %s key: %s version: %v", s.Config.Bucket, key, params.VersionId)
}
}
if len(objVersions) > 0 {
return nil
}
}
if _, err := s.client.DeleteObject(ctx, params); err != nil {
return errors.Wrapf(err, "deleteKey, deleting object bucket: %s key: %s version: %v", s.Config.Bucket, key, params.VersionId)
Expand Down Expand Up @@ -367,19 +375,28 @@ func (s *S3) isVersioningEnabled(ctx context.Context) bool {
return output.Status == s3types.BucketVersioningStatusEnabled
}

func (s *S3) getObjectVersion(ctx context.Context, key string) (*string, error) {
params := &s3.HeadObjectInput{
func (s *S3) getObjectAllVersions(ctx context.Context, key string) ([]string, error) {
listParams := &s3.ListObjectVersionsInput{
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(key),
Prefix: aws.String(key),
}
if s.Config.RequestPayer != "" {
params.RequestPayer = s3types.RequestPayer(s.Config.RequestPayer)
listParams.RequestPayer = s3types.RequestPayer(s.Config.RequestPayer)
}
object, err := s.client.HeadObject(ctx, params)
if err != nil {
return nil, err
versions := []string{}
pager := s3.NewListObjectVersionsPaginator(s.client, listParams)
for pager.HasMorePages() {
page, err := pager.NextPage(ctx)
if err != nil {
return nil, errors.Wrapf(err, "listing object versions bucket: %s key: %s", s.Config.Bucket, key)
}
for _, version := range page.Versions {
if *version.Key == key {
versions = append(versions, *version.VersionId)
}
}
}
return object.VersionId, nil
return versions, nil
}

func (s *S3) StatFile(ctx context.Context, key string) (RemoteFile, error) {
Expand Down

0 comments on commit d95149b

Please sign in to comment.