Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use streaming compression for files. #47

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/llama/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (c *StoreCommand) Execute(ctx context.Context, flag *flag.FlagSet, _ ...int
return subcommands.ExitFailure
}

id, err := global.MustStore().Store(ctx, bytes)
id, err := global.MustStore().StoreBytes(ctx, bytes)
if err != nil {
log.Printf("storing %q: %v\n", arg, err)
return subcommands.ExitFailure
Expand Down
56 changes: 29 additions & 27 deletions files/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package files
import (
"context"
"fmt"
"io/ioutil"
"os"
"path"
"strings"
Expand Down Expand Up @@ -76,35 +75,38 @@ func (f List) Append(mapped ...Mapped) List {

func uploadWorker(ctx context.Context, store store.Store, jobs <-chan Mapped, out chan<- *protocol.FileAndPath) {
for file := range jobs {
data, mode, err := func() ([]byte, os.FileMode, error) {
if file.Local.Bytes != nil {
if file.Local.Path != "" {
panic("MappedFile: got both Path and Bytes")
}
return file.Local.Bytes, file.Local.Mode, nil
} else {
data, err := ioutil.ReadFile(file.Local.Path)
if err != nil {
return nil, 0, fmt.Errorf("reading file %q: %w", file.Local.Path, err)
}
st, err := os.Stat(file.Local.Path)
if err != nil {
return nil, 0, fmt.Errorf("stat %q: %w", file.Local.Path, err)
}
return data, st.Mode(), nil
}
}()
var blob *protocol.Blob
if err == nil {
blob, err = files.NewBlob(ctx, store, data)
}
if err != nil {
blob = &protocol.Blob{Err: err.Error()}
if file.Local.Bytes != nil && file.Local.Path != "" {
panic("MappedFile: got both Path and Bytes")
}
out <- &protocol.FileAndPath{
File: protocol.File{Blob: *blob, Mode: mode},

result := protocol.FileAndPath{
File: protocol.File{
Mode: file.Local.Mode,
},
Path: file.Remote,
}

if file.Local.Path != "" {
pfile, err := files.ReadFile(ctx, store, file.Local.Path)
switch err {
case nil:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just use an if test for a nilness check.

result.File = *pfile
default:
result.File.Blob = protocol.Blob{Err: err.Error()}
}
}

if file.Local.Bytes != nil {
blob, err := files.NewBlob(ctx, store, file.Local.Bytes)
switch err {
case nil:
result.File.Blob = *blob
default:
result.File.Blob = protocol.Blob{Err: err.Error()}
}
}

out <- &result
}
}

Expand Down
12 changes: 5 additions & 7 deletions protocol/files/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewBlob(ctx context.Context, store store.Store, bytes []byte) (*protocol.Bl
if base64.StdEncoding.EncodedLen(len(bytes)) < protocol.MaxInlineBlob {
return &protocol.Blob{Bytes: bytes}, nil
}
id, err := store.Store(ctx, bytes)
id, err := store.StoreBytes(ctx, bytes)
if err != nil {
return nil, err
}
Expand All @@ -100,16 +100,14 @@ func ReadFile(ctx context.Context, store store.Store, path string) (*protocol.Fi
if fi.Mode().IsDir() {
return nil, errors.New("ReadFile: got directory")
}
bytes, err := ioutil.ReadAll(fh)
if err != nil {
return nil, err
}
blob, err := NewBlob(ctx, store, bytes)

id, err := store.Store(ctx, fh)
if err != nil {
return nil, err
}

return &protocol.File{
Blob: *blob,
Blob: protocol.Blob{Ref: id},
Mode: fi.Mode(),
}, nil
}
13 changes: 12 additions & 1 deletion store/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package store
import (
"context"
"encoding/hex"
"io"
"io/ioutil"

"github.com/nelhage/llama/protocol"
"golang.org/x/crypto/blake2b"
Expand All @@ -26,7 +28,16 @@ type inMemory struct {
objects map[string][]byte
}

func (s *inMemory) Store(ctx context.Context, obj []byte) (string, error) {
func (s *inMemory) Store(ctx context.Context, obj io.Reader) (string, error) {
buf, err := ioutil.ReadAll(obj)
if err != nil {
return "", err
}

return s.StoreBytes(ctx, buf)
}

func (s *inMemory) StoreBytes(ctx context.Context, obj []byte) (string, error) {
sha := blake2b.Sum256(obj)
id := hex.EncodeToString(sha[:])
s.objects[id] = append([]byte(nil), obj...)
Expand Down
47 changes: 41 additions & 6 deletions store/s3store/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"log"
"net/url"
Expand Down Expand Up @@ -133,9 +134,13 @@ func FromSessionAndOptions(s *session.Session, address string, opts Options) (*S
}, nil
}

func (s *Store) Store(ctx context.Context, obj []byte) (string, error) {
// StoreCompressedObj takes a compressed data object and stores it in S3,
// returning an object ID.
func (s *Store) StoreCompressedObj(ctx context.Context, obj []byte) (string, error) {
ctx, span := tracing.StartSpan(ctx, "s3.store")
defer span.End()

// The object ID is the hash of the compressed object.
id := storeutil.HashObject(obj) + ":zstd"

span.AddField("object_id", id)
Expand Down Expand Up @@ -170,23 +175,48 @@ func (s *Store) Store(ctx context.Context, obj []byte) (string, error) {
}
}

compressed := encode.EncodeAll(obj, nil)
span.AddField("s3.write_bytes", len(compressed))
span.AddField("s3.write_bytes", len(obj))

usage.WriteRequests += 1
_, err = s.s3.PutObjectWithContext(ctx, &s3.PutObjectInput{
Body: bytes.NewReader(compressed),
Body: bytes.NewReader(obj),
Bucket: &s.url.Host,
Key: key,
})
if err != nil {
return "", err
}

s.metrics.XferIn += uint64(len(obj))
upload.Complete()
return id, nil
}

func (s *Store) Store(ctx context.Context, obj io.Reader) (string, error) {
compressed := bytes.Buffer{}

encoder, err := zstd.NewWriter(&compressed)
if err != nil {
return "", err
}

if _, err := io.Copy(encoder, obj); err != nil {
encoder.Close()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use defer encoder.Close() above to ensure this is closed on every exit path

return "", err
}

if err := encoder.Close(); err != nil {
return "", err
}

return s.StoreCompressedObj(ctx, compressed.Bytes())
}

func (s *Store) StoreBytes(ctx context.Context, obj []byte) (string, error) {
compressed := encode.EncodeAll(obj, nil)
return s.StoreCompressedObj(ctx, compressed)
}

const getConcurrency = 32

func (s *Store) getFromS3(ctx context.Context, id string, usage *usageMetrics) ([]byte, error) {
Expand Down Expand Up @@ -226,9 +256,11 @@ func (s *Store) decompress(id string, body []byte) (string, []byte, error) {
return expectHash, nil, fmt.Errorf("%q: unknown compression %s", id, coding)
}
var err error
inbytes := len(body)
body, err = decode.DecodeAll(body, nil)
if err != nil {
return expectHash, nil, fmt.Errorf("%q: decoding: %w", id, err)
return expectHash, nil,
fmt.Errorf("%q: decoding %d %s bytes: %w", id, inbytes, coding, err)
}
}
return expectHash, body, nil
Expand All @@ -247,15 +279,18 @@ func (s *Store) getOne(ctx context.Context, id string, usage *usageMetrics) ([]b
}
}

// The ID is generated from the compressed representation.
gotHash := storeutil.HashObject(body)

hash, body, err := s.decompress(id, body)
if err != nil {
return nil, err
}

gotHash := storeutil.HashObject(body)
if gotHash != hash {
return nil, fmt.Errorf("object store mismatch: got csum=%s expected %s", gotHash, id)
}

u := s.seen.StartUpload(id)
u.Complete()

Expand Down
5 changes: 4 additions & 1 deletion store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package store
import (
"context"
"errors"
"io"

"github.com/nelhage/llama/protocol"
)
Expand All @@ -30,7 +31,9 @@ type GetRequest struct {
var ErrNotExists = errors.New("Requested object does not exist")

type Store interface {
Store(ctx context.Context, obj []byte) (string, error)
StoreBytes(ctx context.Context, obj []byte) (string, error)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default I would only have Store, and would make a StoreBytes helper somewhere that wraps the bytes in a bytes.Buffer.

If an implementation wants to do something more efficient when it knows it has a byte buffer, it can do a type assertion to *bytes.Buffer to check for that.

Store(ctx context.Context, obj io.Reader) (string, error)

GetObjects(ctx context.Context, gets []GetRequest)
FetchAWSUsage(u *protocol.UsageMetrics)
}
Expand Down