Skip to content

Commit

Permalink
Feat/act (#408)
Browse files Browse the repository at this point in the history
  • Loading branch information
ferencsarai authored Aug 6, 2024
1 parent 3dbde09 commit 7fe190f
Show file tree
Hide file tree
Showing 8 changed files with 486 additions and 16 deletions.
10 changes: 10 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,16 @@ checks:
options:
timeout: 5m
type: pingpong
act:
options:
file-name: act
file-size: 1024
postage-depth: 20
postage-amount: 420000000
postage-label: act-label
seed: 0
timeout: 5m
type: act
withdraw:
options:
target-address: 0xec44cb15b1b033e74d55ac5d0e24d861bde54532
Expand Down
9 changes: 9 additions & 0 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,15 @@ bee-configs:

# CI checks
checks:
ci-act:
options:
file-size: 1024
postage-depth: 20
postage-amount: 420000000
postage-label: act-label
seed: 0
timeout: 5m
type: act
ci-full-connectivity:
timeout: 5m
type: full-connectivity
Expand Down
63 changes: 63 additions & 0 deletions pkg/bee/api/act.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package api

import (
"context"
"io"
"net/http"
"net/url"

"github.com/ethersphere/bee/pkg/swarm"
)

type ActService service

type ActUploadResponse struct {
Reference swarm.Address `json:"reference"`
HistoryAddress swarm.Address
}

type ActGranteesResponse struct {
Reference swarm.Address `json:"ref"`
HistoryAddress swarm.Address `json:"historyref"`
}

func (a *ActService) Download(ctx context.Context, addr swarm.Address, opts *DownloadOptions) (resp io.ReadCloser, err error) {
return a.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/bzz/"+addr.String()+"/", nil, opts)
}

func (a *ActService) Upload(ctx context.Context, name string, data io.Reader, o UploadOptions) (ActUploadResponse, error) {
var resp ActUploadResponse
h := http.Header{}
h.Add(postageStampBatchHeader, o.BatchID)
h.Add("swarm-deferred-upload", "true")
h.Add("content-type", "application/octet-stream")
h.Add("Swarm-Act", "true")
h.Add(swarmPinHeader, "true")
historyParser := func(h http.Header) {
resp.HistoryAddress, _ = swarm.ParseHexAddress(h.Get("Swarm-Act-History-Address"))
}
err := a.client.requestWithHeader(ctx, http.MethodPost, "/"+apiVersion+"/bzz?"+url.QueryEscape("name="+name), h, data, &resp, historyParser)
return resp, err
}

func (a *ActService) AddGrantees(ctx context.Context, data io.Reader, o UploadOptions) (ActGranteesResponse, error) {
var resp ActGranteesResponse
h := http.Header{}
h.Add(postageStampBatchHeader, o.BatchID)
h.Add(swarmActHistoryAddress, o.ActHistoryAddress.String())
err := a.client.requestWithHeader(ctx, http.MethodPost, "/"+apiVersion+"/grantee", h, data, &resp)
return resp, err
}

func (a *ActService) GetGrantees(ctx context.Context, addr swarm.Address) (resp io.ReadCloser, err error) {
return a.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/grantee/"+addr.String(), nil, nil)
}

func (a *ActService) PatchGrantees(ctx context.Context, data io.Reader, addr swarm.Address, haddr swarm.Address, batchID string) (ActGranteesResponse, error) {
var resp ActGranteesResponse
h := http.Header{}
h.Add("swarm-postage-batch-id", batchID)
h.Add("swarm-act-history-address", haddr.String())
err := a.client.requestWithHeader(ctx, http.MethodPatch, "/"+apiVersion+"/grantee/"+addr.String(), h, data, &resp)
return resp, err
}
48 changes: 37 additions & 11 deletions pkg/bee/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@ import (
"strconv"
"strings"

"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/beekeeper"
)

const (
apiVersion = "v1"
contentType = "application/json; charset=utf-8"
contentType = "application/json, text/plain, */*; charset=utf-8"
postageStampBatchHeader = "Swarm-Postage-Batch-Id"
deferredUploadHeader = "Swarm-Deferred-Upload"
swarmAct = "Swarm-Act"
swarmActHistoryAddress = "Swarm-Act-History-Address"
swarmActPublisher = "Swarm-Act-Publisher"
swarmActTimestamp = "Swarm-Act-Timestamp"
swarmPinHeader = "Swarm-Pin"
swarmTagHeader = "Swarm-Tag"
swarmCacheDownloadHeader = "Swarm-Cache"
Expand All @@ -33,6 +38,7 @@ type Client struct {
service service // Reuse a single struct instead of allocating one for each service on the heap.

// Services that API provides.
Act *ActService
Bytes *BytesService
Chunks *ChunksService
Files *FilesService
Expand Down Expand Up @@ -71,6 +77,7 @@ func NewClient(baseURL *url.URL, o *ClientOptions) (c *Client) {
func newClient(httpClient *http.Client) (c *Client) {
c = &Client{httpClient: httpClient}
c.service.client = c
c.Act = (*ActService)(&c.service)
c.Bytes = (*BytesService)(&c.service)
c.Chunks = (*ChunksService)(&c.service)
c.Files = (*FilesService)(&c.service)
Expand Down Expand Up @@ -179,14 +186,28 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R
req.Header.Set("Content-Type", contentType)
}
req.Header.Set("Accept", contentType)
// ACT
if opts != nil {
if opts.Act != nil {
req.Header.Set(swarmAct, strconv.FormatBool(*opts.Act))
}
if opts.ActHistoryAddress != nil {
req.Header.Set(swarmActHistoryAddress, (*opts.ActHistoryAddress).String())
}
if opts.ActPublicKey != nil {
req.Header.Set(swarmActPublisher, (*opts.ActPublicKey).String())
}
if opts.ActTimestamp != nil {
req.Header.Set(swarmActTimestamp, strconv.FormatUint(*opts.ActTimestamp, 10))
}
}

if opts != nil && opts.Cache != nil {
req.Header.Set(swarmCacheDownloadHeader, strconv.FormatBool(*opts.Cache))
}
if opts != nil && opts.RedundancyFallbackMode != nil {
req.Header.Set(swarmRedundancyFallbackMode, strconv.FormatBool(*opts.RedundancyFallbackMode))
}

r, err := c.httpClient.Do(req)
if err != nil {
return nil, err
Expand All @@ -200,7 +221,7 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R
}

// requestWithHeader handles the HTTP request response cycle.
func (c *Client) requestWithHeader(ctx context.Context, method, path string, header http.Header, body io.Reader, v interface{}) (err error) {
func (c *Client) requestWithHeader(ctx context.Context, method, path string, header http.Header, body io.Reader, v interface{}, headerParser ...func(http.Header)) (err error) {
req, err := http.NewRequest(method, path, body)
if err != nil {
return err
Expand All @@ -215,12 +236,11 @@ func (c *Client) requestWithHeader(ctx context.Context, method, path string, hea
return err
}

if err = responseErrorHandler(r); err != nil {
return err
}

if v != nil && strings.Contains(r.Header.Get("Content-Type"), "application/json") {
_ = json.NewDecoder(r.Body).Decode(&v)
for _, parser := range headerParser {
parser(r.Header)
}
return err
}

Expand Down Expand Up @@ -292,13 +312,19 @@ func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
}

type UploadOptions struct {
Pin bool
Tag uint64
BatchID string
Direct bool
Act bool
Pin bool
Tag uint64
BatchID string
Direct bool
ActHistoryAddress swarm.Address
}

type DownloadOptions struct {
Act *bool
ActHistoryAddress *swarm.Address
ActPublicKey *swarm.Address
ActTimestamp *uint64
Cache *bool
RedundancyFallbackMode *bool
}
65 changes: 65 additions & 0 deletions pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -223,6 +224,21 @@ func (c *Client) DownloadFile(ctx context.Context, a swarm.Address, opts *api.Do
return size, h.Sum(nil), nil
}

func (c *Client) DownloadActFile(ctx context.Context, a swarm.Address, opts *api.DownloadOptions) (size int64, hash []byte, err error) {
r, err := c.api.Act.Download(ctx, a, opts)
if err != nil {
return 0, nil, fmt.Errorf("download file %s: %w", a, err)
}
defer r.Close()
h := fileHasher()
size, err = io.Copy(h, r)
if err != nil {
return 0, nil, fmt.Errorf("download file %s, hashing copy: %w", a, err)
}

return size, h.Sum(nil), nil
}

// HasChunk returns true/false if node has a chunk
func (c *Client) HasChunk(ctx context.Context, a swarm.Address) (bool, error) {
return c.api.Node.HasChunk(ctx, a)
Expand Down Expand Up @@ -750,6 +766,55 @@ func (c *Client) UploadFile(ctx context.Context, f *File, o api.UploadOptions) (
return
}

func (c *Client) UploadActFile(ctx context.Context, f *File, o api.UploadOptions) (err error) {
h := fileHasher()
r, err := c.api.Act.Upload(ctx, f.Name(), io.TeeReader(f.DataReader(), h), o)
if err != nil {
return fmt.Errorf("upload ACT file: %w", err)
}

f.SetAddress(r.Reference)
f.SetHistroryAddress(r.HistoryAddress)
f.SetHash(h.Sum(nil))

return nil
}

func (c *Client) AddActGrantees(ctx context.Context, f *File, o api.UploadOptions) (err error) {
h := fileHasher()
r, err := c.api.Act.AddGrantees(ctx, io.TeeReader(f.DataReader(), h), o)
if err != nil {
return fmt.Errorf("add ACT grantees: %w", err)
}

f.SetAddress(r.Reference)
f.SetHistroryAddress(r.HistoryAddress)
f.SetHash(h.Sum(nil))

return nil
}

func (c *Client) GetActGrantees(ctx context.Context, a swarm.Address) (addresses []string, err error) {
r, e := c.api.Act.GetGrantees(ctx, a)
if e != nil {
return nil, fmt.Errorf("get grantees: %s: %w", a, e)
}
defer r.Close()
err = json.NewDecoder(r).Decode(&addresses)
return addresses, err
}

func (c *Client) PatchActGrantees(ctx context.Context, pf *File, addr swarm.Address, haddr swarm.Address, batchID string) (err error) {
r, err := c.api.Act.PatchGrantees(ctx, pf.DataReader(), addr, haddr, batchID)
if err != nil {
return fmt.Errorf("add ACT grantees: %w", err)
}

pf.SetAddress(r.Reference)
pf.SetHistroryAddress(r.HistoryAddress)
return nil
}

// UploadCollection uploads TAR collection bytes to the node
func (c *Client) UploadCollection(ctx context.Context, f *File, o api.UploadOptions) (err error) {
h := fileHasher()
Expand Down
19 changes: 14 additions & 5 deletions pkg/bee/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import (

// File represents Bee file
type File struct {
address swarm.Address
name string
hash []byte
dataReader io.Reader
size int64
address swarm.Address
name string
hash []byte
dataReader io.Reader
size int64
historyAddress swarm.Address
}

// NewRandomFile returns new pseudorandom file
Expand Down Expand Up @@ -62,6 +63,10 @@ func (f *File) Address() swarm.Address {
return f.address
}

func (f *File) HistroryAddress() swarm.Address {
return f.historyAddress
}

// Name returns file's name
func (f *File) Name() string {
return f.name
Expand Down Expand Up @@ -109,6 +114,10 @@ func (f *File) SetAddress(a swarm.Address) {
f.address = a
}

func (f *File) SetHistroryAddress(a swarm.Address) {
f.historyAddress = a
}

func (f *File) SetHash(h []byte) {
f.hash = h
}
Expand Down
Loading

0 comments on commit 7fe190f

Please sign in to comment.