Skip to content

Commit

Permalink
Improve performance for upload and download. Improve progress bar.
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexAkulov committed May 10, 2019
1 parent 8f4bb32 commit 1868a00
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 38 deletions.
66 changes: 66 additions & 0 deletions pbar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
progressbar "gopkg.in/cheggaaa/pb.v1"
"io"
)

type Bar struct {
pb *progressbar.ProgressBar
show bool
}

func StartNewByteBar(show bool, total int64) *Bar {
if show {
return &Bar{
show: true,
pb: progressbar.StartNew(int(total)).SetUnits(progressbar.U_BYTES),
}
}
return &Bar{
show: false,
}
}

func StartNewBar(show bool, total int) *Bar {
if show {
return &Bar{
show: true,
pb: progressbar.StartNew(total),
}
}
return &Bar{
show: false,
}
}

func (b *Bar) FinishPrint(str string) {
if b.show {
b.pb.FinishPrint(str)
}
}

func (b *Bar) Add64(add int64) {
if b.show {
b.pb.Add64(add)
}
}

func (b *Bar) Set(current int) {
if b.show {
b.pb.Set(current)
}
}

func (b *Bar) Increment() {
if b.show {
b.pb.Increment()
}
}

func (b *Bar) NewProxyReader(r io.Reader) io.Reader {
if b.show {
return b.pb.NewProxyReader(r)
}
return r
}
85 changes: 47 additions & 38 deletions s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"archive/tar"
"bufio"
"crypto/md5"
"encoding/json"
"fmt"
Expand All @@ -24,10 +25,12 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/mholt/archiver"
pb "gopkg.in/cheggaaa/pb.v1"
)

const MetaFileName = "meta.json"
const (
MetaFileName = "meta.json"
BufferSize = 1024 * 1024 * 100
)

// S3 - presents methods for manipulate data on s3
type S3 struct {
Expand Down Expand Up @@ -89,9 +92,15 @@ func (s *S3) CompressedStreamDownload(s3Path, localPath string) error {
return err
}
defer r.Close()
filesize, err := s.GetSize(archiveName)
if err != nil {
return err
}
bar := StartNewByteBar(!s.Config.DisableProgressBar, filesize)
buffer := bufio.NewReaderSize(r, BufferSize)
proxyReader := bar.NewProxyReader(buffer)
z, _ := getArchiveReader(s.Config.CompressionFormat)

if err := z.Open(r, 0); err != nil {
if err := z.Open(proxyReader, 0); err != nil {
return err
}
defer z.Close()
Expand Down Expand Up @@ -143,7 +152,6 @@ func (s *S3) CompressedStreamDownload(s3Path, localPath string) error {
if err != nil && !os.IsExist(err) {
return fmt.Errorf("can't download '%s' with %v", metafile.RequiredBackup, err)
}
log.Printf(" Done.")
}
for _, hardlink := range metafile.Hardlinks {
newname := filepath.Join(localPath, hardlink)
Expand All @@ -156,22 +164,19 @@ func (s *S3) CompressedStreamDownload(s3Path, localPath string) error {
return err
}
}
bar.FinishPrint("Done.")
return nil
}

func (s *S3) CompressedStreamUpload(localPath, s3Path, diffFromPath string) error {
var bar *pb.ProgressBar
if !s.Config.DisableProgressBar {
var filesCount int
filepath.Walk(localPath, func(filePath string, info os.FileInfo, err error) error {
if info.Mode().IsRegular() {
filesCount++
}
return nil
})
bar = pb.StartNew(filesCount)
defer bar.FinishPrint("Done.")
}
var totalBytes int64
filepath.Walk(localPath, func(filePath string, info os.FileInfo, err error) error {
if info.Mode().IsRegular() {
totalBytes = totalBytes + info.Size()
}
return nil
})
bar := StartNewByteBar(!s.Config.DisableProgressBar, totalBytes)
if diffFromPath != "" {
fi, err := os.Stat(diffFromPath)
if err != nil {
Expand All @@ -191,8 +196,10 @@ func (s *S3) CompressedStreamUpload(localPath, s3Path, diffFromPath string) erro
return err
}
defer w.Close()
b := bufio.NewWriterSize(w, BufferSize)
defer b.Flush()
z, _ := getArchiveWriter(s.Config.CompressionFormat, s.Config.CompressionLevel)
if err := z.Create(w); err != nil {
if err := z.Create(b); err != nil {
return err
}
defer z.Close()
Expand All @@ -201,9 +208,7 @@ func (s *S3) CompressedStreamUpload(localPath, s3Path, diffFromPath string) erro
if !info.Mode().IsRegular() {
return nil
}
if !s.Config.DisableProgressBar {
bar.Increment()
}
bar.Add64(info.Size())
file, err := os.Open(filePath)
if err != nil {
return err
Expand Down Expand Up @@ -267,22 +272,33 @@ func (s *S3) CompressedStreamUpload(localPath, s3Path, diffFromPath string) erro
return fmt.Errorf("can't add mata.json to archive with %v", err)
}
}
bar.FinishPrint("Done.")
return nil
}

func (s *S3) GetSize(filepath string) (int64, error) {
svc := s3.New(s.session)

result, err := svc.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(s.Config.Bucket),
Key: aws.String(filepath),
})
if err != nil {
return 0, err
}
return *result.ContentLength, nil
}

// UploadDirectory - synchronize localPath to dstPath on s3
func (s *S3) UploadDirectory(localPath string, dstPath string) error {
// TODO: it must be refactored like as Download() method
iter, filesForDelete, err := s.newSyncFolderIterator(localPath, dstPath)
if err != nil {
return err
}
var bar *pb.ProgressBar
if !s.Config.DisableProgressBar {
bar = pb.StartNew(len(iter.fileInfos) + iter.skipFilesCount)
bar.Set(iter.skipFilesCount)
defer bar.FinishPrint("Done.")
}
bar := StartNewBar(!s.Config.DisableProgressBar, len(iter.fileInfos)+iter.skipFilesCount)
bar.Set(iter.skipFilesCount)
defer bar.FinishPrint("Done.")

uploader := s3manager.NewUploader(s.session)
uploader.PartSize = s.Config.PartSize
Expand All @@ -299,9 +315,7 @@ func (s *S3) UploadDirectory(localPath string, dstPath string) error {
errs = append(errs, s3Err)
}
}
if !s.Config.DisableProgressBar {
bar.Increment()
}
bar.Increment()
if object.After == nil {
continue
}
Expand Down Expand Up @@ -378,16 +392,11 @@ func (s *S3) DownloadTree(s3Path string, localPath string) error {
if err != nil {
return err
}
var bar *pb.ProgressBar
if !s.Config.DisableProgressBar {
bar = pb.StartNew(len(s3Files))
defer bar.FinishPrint("Done.")
}
bar := StartNewBar(!s.Config.DisableProgressBar, len(s3Files))
defer bar.FinishPrint("Done.")
downloader := s3manager.NewDownloader(s.session)
for _, s3File := range s3Files {
if !s.Config.DisableProgressBar {
bar.Increment()
}
bar.Increment()
if existsFile, ok := localFiles[s3File.key]; ok {
if existsFile.size == s3File.size {
switch s.Config.OverwriteStrategy {
Expand Down

0 comments on commit 1868a00

Please sign in to comment.