Skip to content

Commit

Permalink
Attempt to implement streaming of large S3 files to disk for DB impor…
Browse files Browse the repository at this point in the history
…ting (when restoring)
  • Loading branch information
Peter Nemere committed Nov 7, 2024
1 parent 311c404 commit e2f37ea
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 4 deletions.
6 changes: 3 additions & 3 deletions api/ws/wsHelpers/sync-mongo-restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@ func DownloadArchive(svcs *services.APIServices) (string, error) {
svcs.Log.Errorf(" Failed to get free disk bytes: %v", err)
}

svcs.Log.Infof(" Downloading: %v... free space: %v bytes", dbFile, freeBytes)
svcs.Log.Infof(" Downloading: %v... (%v bytes free)", dbFile, freeBytes)

dbFileBytes, err := svcs.FS.ReadObject(svcs.Config.DataBackupBucket, dbFile)
dbStream, err := svcs.FS.ReadObjectStream(svcs.Config.DataBackupBucket, dbFile)
if err != nil {
return "", fmt.Errorf("Failed to download remote DB dump file: %v. Error: %v", dbFile, err)
}

// Save locally
// Remove remote root dir
dbFilePathLocal := strings.TrimPrefix(dbFile, dataBackupS3Path+"/")
err = localFS.WriteObject(dataBackupLocalPath, dbFilePathLocal, dbFileBytes)
err = localFS.WriteObjectStream(dataBackupLocalPath, dbFilePathLocal, dbStream)

if err != nil {
return "", fmt.Errorf("Failed to write local DB dump file: %v. Error: %v", dbFilePathLocal, err)
Expand Down
10 changes: 9 additions & 1 deletion core/fileaccess/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
// available from
package fileaccess

import "strings"
import (
"io"
"strings"
)

// Generic interface for reading/writing files asynchronously
// We could have used OS level things but we want to be able to
Expand All @@ -45,6 +48,11 @@ type FileAccess interface {
// Writes a file as bytes
WriteObject(bucket string, path string, data []byte) error

// Reads a file as bytes (returning a stream)
ReadObjectStream(bucket string, path string) (io.ReadCloser, error)
// Writes a file from stream
WriteObjectStream(bucket string, path string, stream io.Reader) error

// Reads a file as JSON and decodes it into itemsPtr
ReadJSON(bucket string, s3Path string, itemsPtr interface{}, emptyIfNotFound bool) error
// Writes itemsPtr as a JSON file
Expand Down
30 changes: 30 additions & 0 deletions core/fileaccess/localFileSystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,36 @@ func (fs *FSAccess) WriteObject(rootPath string, path string, data []byte) error
return os.WriteFile(fullPath, data, 0777)
}

func (fs *FSAccess) ReadObjectStream(rootPath string, path string) (io.ReadCloser, error) {
fullPath := fs.filePath(rootPath, path)
f, err := os.Open(fullPath)
if err != nil {
return nil, err
}

return f, nil
}

func (fs *FSAccess) WriteObjectStream(rootPath string, path string, stream io.Reader) error {
fullPath := fs.filePath(rootPath, path)

// Ensure any subdirs in between are created
createPath := filepath.Dir(fullPath)
err := os.MkdirAll(createPath, 0777)
if err != nil {
return err
}

outFile, err := os.Create(fullPath)
if err != nil {
return err
}
defer outFile.Close()

_, err = io.Copy(outFile, stream)
return err
}

func (fs *FSAccess) ReadJSON(rootPath string, s3Path string, itemsPtr interface{}, emptyIfNotFound bool) error {
fileData, err := fs.ReadObject(rootPath, s3Path)

Expand Down
26 changes: 26 additions & 0 deletions core/fileaccess/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,32 @@ func (s3Access S3Access) WriteObject(bucket string, path string, data []byte) er
return err
}

func (s3Access S3Access) ReadObjectStream(bucket string, path string) (io.ReadCloser, error) {
input := &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(path),
}

result, err := s3Access.s3Api.GetObject(input)
if err != nil {
return nil, err
}

return result.Body, nil
}

func (s3Access S3Access) WriteObjectStream(bucket string, path string, stream io.Reader) error {
/*input := &s3.PutObjectInput{
Body: stream,
Bucket: aws.String(bucket),
Key: aws.String(path),
}
_, err := s3Access.s3Api.PutObject(input)
return err*/
return fmt.Errorf("Not implemented")
}

func (s3Access S3Access) ReadJSON(bucket string, s3Path string, itemsPtr interface{}, emptyIfNotFound bool) error {
fileData, err := s3Access.ReadObject(bucket, s3Path)

Expand Down

0 comments on commit e2f37ea

Please sign in to comment.