From 23c34a0f66da654f901f92dcec70ec7d4a185d2e Mon Sep 17 00:00:00 2001
From: Peter Nemere
Date: Tue, 10 Dec 2024 15:19:33 +1000
Subject: [PATCH] Implemented S3 WriteObjectStream using s3manager Uploader.

This allows large DB backup files (specifically memoisation) to be streamed
up instead of being read into memory first and then uploaded.
---
 api/ws/wsHelpers/sync-mongo-dump.go |  4 ++--
 api/ws/wsHelpers/sync-s3.go         | 10 +++++-----
 core/fileaccess/s3.go               | 15 +++++++++------
 3 files changed, 16 insertions(+), 13 deletions(-)

diff --git a/api/ws/wsHelpers/sync-mongo-dump.go b/api/ws/wsHelpers/sync-mongo-dump.go
index 00b15776..f39bc9e9 100644
--- a/api/ws/wsHelpers/sync-mongo-dump.go
+++ b/api/ws/wsHelpers/sync-mongo-dump.go
@@ -54,14 +54,14 @@ func UploadArchive(svcs *services.APIServices) error {
 		svcs.Log.Infof(" Uploading: %v...", dbFile)
 
 		dbFilePath := path.Join(dataBackupLocalPath, dbFile)
-		dbFileBytes, err := os.ReadFile(dbFilePath)
+		dbFileObj, err := os.Open(dbFilePath)
 		if err != nil {
 			return fmt.Errorf("Failed to read local DB dump file: %v. Error: %v", dbFilePath, err)
 		}
 
 		// Upload to bucket
 		dbFilePathRemote := path.Join(dataBackupS3Path, dbFile)
-		err = svcs.FS.WriteObject(svcs.Config.DataBackupBucket, dbFilePathRemote, dbFileBytes)
+		err = svcs.FS.WriteObjectStream(svcs.Config.DataBackupBucket, dbFilePathRemote, dbFileObj)
 		if err != nil {
 			return fmt.Errorf("Failed to upload DB dump file: %v. Error: %v", dbFilePathRemote, err)
 		}
diff --git a/api/ws/wsHelpers/sync-s3.go b/api/ws/wsHelpers/sync-s3.go
index 13e5f71c..16f176a2 100644
--- a/api/ws/wsHelpers/sync-s3.go
+++ b/api/ws/wsHelpers/sync-s3.go
@@ -181,7 +181,7 @@ func makeRelativePaths(fullPaths []string, root string) ([]string, error) {
 
 // Requires source bucket, source root with srcRelativePaths which are relative to source root. This way the relative paths
 // can be compared to dest paths, and only the ones not already in dest root are copied.
-func syncFiles(srcBucket string, srcRoot string, srcRelativePaths []string, destBucket string, destRoot string, fs fileaccess.FileAccess, log logger.ILogger) error {
+func syncFiles(srcBucket string, srcRoot string, srcRelativePaths []string, destBucket string, destRoot string, fs fileaccess.FileAccess, jobLog logger.ILogger) error {
 	// Get a listing from the destination
 	// NOTE: the returned paths contain destRoot at the start!
 	destFullFiles, err := fs.ListObjects(destBucket, destRoot)
@@ -208,21 +208,21 @@ func syncFiles(srcBucket string, srcRoot string, srcRelativePaths []string, dest
 		}
 	}
 
-	log.Infof(" Sync backup directory to %v: %v skipped (already at destination)...", destRoot, len(srcRelativePaths)-len(toCopyRelativePaths))
+	jobLog.Infof(" Sync backup directory to %v: %v skipped (already at destination)...", destRoot, len(srcRelativePaths)-len(toCopyRelativePaths))
 
 	// Copy all the files
 	for c, relSrcPath := range toCopyRelativePaths {
 		if c%100 == 0 {
-			log.Infof(" Sync backup directory to %v: %v of %v copied...", destRoot, c, len(toCopyRelativePaths))
+			jobLog.Infof(" Sync backup directory to %v: %v of %v copied...", destRoot, c, len(toCopyRelativePaths))
 		}
 
 		srcFullPath := path.Join(srcRoot, relSrcPath)
 		err = fs.CopyObject(srcBucket, srcFullPath, destBucket, path.Join(destRoot, relSrcPath))
 		if err != nil {
 			if fs.IsNotFoundError(err) {
-				log.Errorf(" Sync backup source file not found: s3://%v/%v", srcBucket, srcFullPath)
+				jobLog.Errorf(" Sync backup source file not found: s3://%v/%v", srcBucket, srcFullPath)
 			} else {
-				log.Errorf(" Sync error reading read s3://%v/%v: %v", srcBucket, srcFullPath, err)
+				jobLog.Errorf(" Sync error reading read s3://%v/%v: %v", srcBucket, srcFullPath, err)
 			}
 			//return err
 		}
diff --git a/core/fileaccess/s3.go b/core/fileaccess/s3.go
index 854ce4a2..fa96d855 100644
--- a/core/fileaccess/s3.go
+++ b/core/fileaccess/s3.go
@@ -22,6 +22,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"path"
 	"strings"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -136,15 +137,17 @@ func (s3Access S3Access) ReadObjectStream(bucket string, path string) (io.ReadCl
 }
 
 func (s3Access S3Access) WriteObjectStream(bucket string, path string, stream io.Reader) error {
-	/*input := &s3.PutObjectInput{
-		Body:   stream,
+	uploader := s3manager.NewUploaderWithClient(s3Access.s3Api)
+
+	upParams := &s3manager.UploadInput{
 		Bucket: aws.String(bucket),
 		Key:    aws.String(path),
+		Body:   stream,
 	}
 
-	_, err := s3Access.s3Api.PutObject(input)
-	return err*/
-	return fmt.Errorf("Not implemented")
+	// Perform an upload
+	_, err := uploader.Upload(upParams)
+	return err
 }
 
 func (s3Access S3Access) ReadJSON(bucket string, s3Path string, itemsPtr interface{}, emptyIfNotFound bool) error {
@@ -193,7 +196,7 @@ func (s3Access S3Access) CopyObject(srcBucket string, srcPath string, dstBucket
 	input := &s3.CopyObjectInput{
 		Bucket:     aws.String(dstBucket),
 		Key:        aws.String(dstPath),
-		CopySource: aws.String(srcBucket + "/" + srcPath),
+		CopySource: aws.String(path.Join(srcBucket, srcPath)),
 	}
 	_, err := s3Access.s3Api.CopyObject(input)
 	return err
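
Note on the streaming path (not part of the patch): the sketch below is a minimal, standalone illustration of the s3manager behaviour this change relies on, using made-up names throughout (the bucket, key and local file path are invented, and the session setup is the stock aws-sdk-go v1 pattern rather than anything from this codebase). An open *os.File is passed as an io.Reader, and the Uploader reads and uploads it in parts, so the backup file never has to be loaded whole into memory the way the old os.ReadFile + WriteObject path required.

package main

import (
	"log"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
	// Stock SDK client setup; the codebase's own S3Access wrapper is not used here.
	sess := session.Must(session.NewSession())
	svc := s3.New(sess)

	// Open the dump file; the handle satisfies io.Reader, which is all the
	// uploader needs. The path is illustrative only.
	f, err := os.Open("/tmp/dataBackup/dbDump.gz")
	if err != nil {
		log.Fatalf("Failed to open local DB dump file: %v", err)
	}
	// The uploader reads from Body but never closes it, so the caller owns the handle.
	defer f.Close()

	// Same call shape as the new WriteObjectStream; the option function is
	// optional tuning that the patch itself does not apply.
	uploader := s3manager.NewUploaderWithClient(svc, func(u *s3manager.Uploader) {
		u.PartSize = 10 * 1024 * 1024 // 10MB parts (SDK minimum is 5MB)
		u.Concurrency = 3             // parallel part uploads
	})

	_, err = uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String("example-backup-bucket"), // made-up bucket
		Key:    aws.String("DBBackup/dbDump.gz"),    // made-up key
		Body:   f,                                   // streamed, not buffered
	})
	if err != nil {
		log.Fatalf("Upload failed: %v", err)
	}
}

In the codebase itself, callers go through svcs.FS.WriteObjectStream as shown in the sync-mongo-dump.go hunk above; this sketch only spells out what the s3manager Uploader does with the stream it is handed.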