Skip to content

Commit

Permalink
Implemented S3 WriteObjectStream using s3manager Uploader. This allow…
Browse files Browse the repository at this point in the history
…s uploading large DB backup files (specifically memoisation) to be streamed up instead of reading into memory first and then uploading.
  • Loading branch information
Peter Nemere committed Dec 10, 2024
1 parent 5062f8c commit 23c34a0
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 13 deletions.
4 changes: 2 additions & 2 deletions api/ws/wsHelpers/sync-mongo-dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ func UploadArchive(svcs *services.APIServices) error {
svcs.Log.Infof(" Uploading: %v...", dbFile)

dbFilePath := path.Join(dataBackupLocalPath, dbFile)
dbFileBytes, err := os.ReadFile(dbFilePath)
dbFileObj, err := os.Open(dbFilePath)
if err != nil {
return fmt.Errorf("Failed to read local DB dump file: %v. Error: %v", dbFilePath, err)
}

// Upload to bucket
dbFilePathRemote := path.Join(dataBackupS3Path, dbFile)
err = svcs.FS.WriteObject(svcs.Config.DataBackupBucket, dbFilePathRemote, dbFileBytes)
err = svcs.FS.WriteObjectStream(svcs.Config.DataBackupBucket, dbFilePathRemote, dbFileObj)

if err != nil {
return fmt.Errorf("Failed to upload DB dump file: %v. Error: %v", dbFilePathRemote, err)
Expand Down
10 changes: 5 additions & 5 deletions api/ws/wsHelpers/sync-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func makeRelativePaths(fullPaths []string, root string) ([]string, error) {

// Requires source bucket, source root with srcRelativePaths which are relative to source root. This way the relative paths
// can be compared to dest paths, and only the ones not already in dest root are copied.
func syncFiles(srcBucket string, srcRoot string, srcRelativePaths []string, destBucket string, destRoot string, fs fileaccess.FileAccess, log logger.ILogger) error {
func syncFiles(srcBucket string, srcRoot string, srcRelativePaths []string, destBucket string, destRoot string, fs fileaccess.FileAccess, jobLog logger.ILogger) error {
// Get a listing from the destination
// NOTE: the returned paths contain destRoot at the start!
destFullFiles, err := fs.ListObjects(destBucket, destRoot)
Expand All @@ -208,21 +208,21 @@ func syncFiles(srcBucket string, srcRoot string, srcRelativePaths []string, dest
}
}

log.Infof(" Sync backup directory to %v: %v skipped (already at destination)...", destRoot, len(srcRelativePaths)-len(toCopyRelativePaths))
jobLog.Infof(" Sync backup directory to %v: %v skipped (already at destination)...", destRoot, len(srcRelativePaths)-len(toCopyRelativePaths))

// Copy all the files
for c, relSrcPath := range toCopyRelativePaths {
if c%100 == 0 {
log.Infof(" Sync backup directory to %v: %v of %v copied...", destRoot, c, len(toCopyRelativePaths))
jobLog.Infof(" Sync backup directory to %v: %v of %v copied...", destRoot, c, len(toCopyRelativePaths))
}

srcFullPath := path.Join(srcRoot, relSrcPath)
err = fs.CopyObject(srcBucket, srcFullPath, destBucket, path.Join(destRoot, relSrcPath))
if err != nil {
if fs.IsNotFoundError(err) {
log.Errorf(" Sync backup source file not found: s3://%v/%v", srcBucket, srcFullPath)
jobLog.Errorf(" Sync backup source file not found: s3://%v/%v", srcBucket, srcFullPath)
} else {
log.Errorf(" Sync error reading read s3://%v/%v: %v", srcBucket, srcFullPath, err)
jobLog.Errorf(" Sync error reading read s3://%v/%v: %v", srcBucket, srcFullPath, err)
}
//return err
}
Expand Down
15 changes: 9 additions & 6 deletions core/fileaccess/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"io"
"path"
"strings"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -136,15 +137,17 @@ func (s3Access S3Access) ReadObjectStream(bucket string, path string) (io.ReadCl
}

func (s3Access S3Access) WriteObjectStream(bucket string, path string, stream io.Reader) error {
/*input := &s3.PutObjectInput{
Body: stream,
uploader := s3manager.NewUploaderWithClient(s3Access.s3Api)

upParams := &s3manager.UploadInput{
Bucket: aws.String(bucket),
Key: aws.String(path),
Body: stream,
}

_, err := s3Access.s3Api.PutObject(input)
return err*/
return fmt.Errorf("Not implemented")
// Perform an upload
_, err := uploader.Upload(upParams)
return err
}

func (s3Access S3Access) ReadJSON(bucket string, s3Path string, itemsPtr interface{}, emptyIfNotFound bool) error {
Expand Down Expand Up @@ -193,7 +196,7 @@ func (s3Access S3Access) CopyObject(srcBucket string, srcPath string, dstBucket
input := &s3.CopyObjectInput{
Bucket: aws.String(dstBucket),
Key: aws.String(dstPath),
CopySource: aws.String(srcBucket + "/" + srcPath),
CopySource: aws.String(path.Join(srcBucket, srcPath)),
}
_, err := s3Access.s3Api.CopyObject(input)
return err
Expand Down

0 comments on commit 23c34a0

Please sign in to comment.