feat: data durability check (#370)
* feat: data durability

* chore: wip

* feat: add concurrency opts

* fix: retry on failed initial fetch
acha-bill authored Dec 14, 2023
1 parent 70cfa94 commit 7c87f18
Showing 11 changed files with 334 additions and 9 deletions.
6 changes: 6 additions & 0 deletions config/config.yaml
@@ -350,6 +350,12 @@ checks:
retry-wait: 1m
retry-count: 3
type: longavailability
datadurability:
options:
ref:
concurrency:
max-attempts:
type: datadurability

# simulations defines simulations Beekeeper can execute against the cluster
# type field allows defining the same simulation with different names and options
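For reference, a filled-in version of the new check block might look like the following. The ref value is a placeholder for the swarm hash of the reference file, while concurrency and max-attempts appear to fall back to the defaults (10 each) from NewDefaultOptions below when left empty:

  datadurability:
    options:
      ref: "0a1b2c..." # placeholder: root hash of the uploaded chunk-list file
      concurrency: 10
      max-attempts: 10
    type: datadurability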
6 changes: 6 additions & 0 deletions config/local.yaml
@@ -389,3 +389,9 @@ checks:
retry-wait: 1m
retry-count: 3
type: longavailability
ci-datadurability:
options:
ref:
concurrency:
max-attempts:
type: datadurability
6 changes: 3 additions & 3 deletions pkg/bee/api/chunks.go
@@ -13,12 +13,12 @@ import (
type ChunksService service

// Download downloads data from the node
func (c *ChunksService) Download(ctx context.Context, a swarm.Address, targets string) (resp io.ReadCloser, err error) {
func (c *ChunksService) Download(ctx context.Context, a swarm.Address, targets string, opts *DownloadOptions) (resp io.ReadCloser, err error) {
if targets == "" {
return c.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/chunks/"+a.String(), nil, nil)
return c.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/chunks/"+a.String(), nil, opts)
}

return c.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/chunks/"+a.String()+"?targets="+targets, nil, nil)
return c.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/chunks/"+a.String()+"?targets="+targets, nil, opts)
}

// ChunksUploadResponse represents Upload's response
15 changes: 13 additions & 2 deletions pkg/bee/client.go
@@ -194,8 +194,8 @@ func (c *Client) DownloadBytes(ctx context.Context, a swarm.Address) (data []byt
}

// DownloadChunk downloads chunk from the node
func (c *Client) DownloadChunk(ctx context.Context, a swarm.Address, targets string) (data []byte, err error) {
r, err := c.api.Chunks.Download(ctx, a, targets)
func (c *Client) DownloadChunk(ctx context.Context, a swarm.Address, targets string, opts *api.DownloadOptions) (data []byte, err error) {
r, err := c.api.Chunks.Download(ctx, a, targets, opts)
if err != nil {
return nil, fmt.Errorf("download chunk %s: %w", a, err)
}
@@ -204,6 +204,17 @@ func (c *Client) DownloadChunk(ctx context.Context, a swarm.Address, targets str
return io.ReadAll(r)
}

// DownloadFileBytes downloads a file from the node and returns the data.
func (c *Client) DownloadFileBytes(ctx context.Context, a swarm.Address, opts *api.DownloadOptions) (data []byte, err error) {
r, err := c.api.Files.Download(ctx, a, opts)
if err != nil {
return nil, fmt.Errorf("download file %s: %w", a, err)
}
defer r.Close()

return io.ReadAll(r)
}

// DownloadFile downloads a file from the node and returns its size and hash.
func (c *Client) DownloadFile(ctx context.Context, a swarm.Address, opts *api.DownloadOptions) (size int64, hash []byte, err error) {
r, err := c.api.Files.Download(ctx, a, opts)
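A minimal usage sketch for the two download helpers, assuming an existing client *bee.Client, a context ctx, and an address addr; the cache-disabling DownloadOptions mirror what the datadurability check below does:

  // Download a whole file's bytes with caching disabled.
  cache := false
  opts := &api.DownloadOptions{Cache: &cache}
  data, err := client.DownloadFileBytes(ctx, addr, opts)
  if err != nil {
      return fmt.Errorf("download file %s: %w", addr, err)
  }

  // Download a single chunk with the same options; an empty
  // targets string fetches the chunk by its address alone.
  chunk, err := client.DownloadChunk(ctx, addr, "", opts)
  if err != nil {
      return fmt.Errorf("download chunk %s: %w", addr, err)
  }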
191 changes: 191 additions & 0 deletions pkg/check/datadurability/datadurability.go
@@ -0,0 +1,191 @@
package datadurability

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"slices"
"sync"
"time"

"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee/api"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/beekeeper/pkg/orchestration"
)

type Options struct {
Ref string
Concurrency int
MaxAttempts int
}

func NewDefaultOptions() Options {
return Options{
Concurrency: 10,
MaxAttempts: 10,
}
}

var _ beekeeper.Action = (*Check)(nil)

// Check instance
type Check struct {
metrics metrics
logger logging.Logger
}

// NewCheck returns new check
func NewCheck(logger logging.Logger) beekeeper.Action {
return &Check{
logger: logger,
metrics: newMetrics("check_data_durability"),
}
}

// Run runs the check
// It downloads a file that contains a list of chunk references and then attempts to download each chunk.
func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, o interface{}) error {
opts, ok := o.(Options)
if !ok {
return fmt.Errorf("invalid options type")
}
ref, err := swarm.ParseHexAddress(opts.Ref)
if err != nil {
return fmt.Errorf("parse hex ref %s: %w", opts.Ref, err)
}

d, err := fetchFile(ctx, c.logger, ref, cluster, opts.MaxAttempts)
if err != nil {
return fmt.Errorf("fetch file: %w", err)
}

refs, err := parseFile(bytes.NewReader(d))
if err != nil {
return fmt.Errorf("parse file: %w", err)
}
rootRef, chunkRefs := refs[0], refs[1:]

nodes, err := findRandomEligibleNodes(ctx, rootRef, cluster)
if err != nil {
return fmt.Errorf("get random node: %w", err)
}

once := sync.Once{}
fileStart := time.Now()
c.metrics.ChunksCount.Set(float64(len(chunkRefs)))

var wg sync.WaitGroup
wg.Add(len(chunkRefs))
limitCh := make(chan struct{}, opts.Concurrency)
var fileAttemptCounted bool

for i, ref := range chunkRefs {
node := nodes[i%len(nodes)] // distribute evenly
limitCh <- struct{}{}

go func(i int, ref swarm.Address, node orchestration.Node) {
defer func() {
<-limitCh
wg.Done()
}()
c.metrics.ChunkDownloadAttempts.Inc()
cache := false
chunkStart := time.Now()
d, err := node.Client().DownloadChunk(ctx, ref, "", &api.DownloadOptions{Cache: &cache})
if err != nil {
c.logger.Errorf("download failed. %s (%d of %d). chunk=%s node=%s err=%v", percentage(i, len(chunkRefs)), i, len(chunkRefs), ref, node.Name(), err)
c.metrics.ChunkDownloadErrors.Inc()
once.Do(func() {
c.metrics.FileDownloadAttempts.Inc()
fileAttemptCounted = true
c.metrics.FileDownloadErrors.Inc()
})
return
}
dur := time.Since(chunkStart)
c.logger.Infof("download successful. %s (%d of %d) chunk=%s node=%s dur=%v", percentage(i, len(chunkRefs)), i, len(chunkRefs), ref, node.Name(), dur)
c.metrics.ChunkDownloadDuration.Observe(dur.Seconds())
c.metrics.FileSize.Add(float64(len(d)))
}(i, ref, node)
}

wg.Wait()
if !fileAttemptCounted {
c.metrics.FileDownloadAttempts.Inc()
}
dur := time.Since(fileStart)
c.logger.Infof("done. dur=%v", dur)
c.metrics.FileDownloadDuration.Observe(dur.Seconds())
return nil
}

func percentage(a, b int) string {
return fmt.Sprintf("%.2f%%", float64(a)/float64(b)*100)
}

func fetchFile(ctx context.Context, logger logging.Logger, ref swarm.Address, cluster orchestration.Cluster, maxAttempts int) ([]byte, error) {
var nodes []orchestration.Node
for _, node := range cluster.Nodes() {
nodes = append(nodes, node)
}

for i := 0; i < maxAttempts; i++ {
node := nodes[i%len(nodes)]
d, err := node.Client().DownloadFileBytes(ctx, ref, nil)
if err != nil {
logger.Infof("node: %s failed to fetch file: %v", node.Name(), err)
continue
}
return d, nil
}
return nil, fmt.Errorf("failed to fetch file after %d attempts", maxAttempts)
}

// parseFile returns the list of references in the reader.
// It expects a list of swarm hashes where the 1st line is the root reference
// and the following lines are the individual chunk references.
func parseFile(r io.Reader) ([]swarm.Address, error) {
var refs []swarm.Address
scanner := bufio.NewScanner(r)
for scanner.Scan() {
line := scanner.Text()
ref, err := swarm.ParseHexAddress(line)
if err != nil {
return nil, fmt.Errorf("parse hex ref %s: %w", line, err)
}
refs = append(refs, ref)
}

if len(refs) < 2 {
return nil, fmt.Errorf("invalid file format. Expected at least 1 line")
}
return refs, nil
}

// findRandomEligibleNodes finds nodes where the root ref is not pinned.
func findRandomEligibleNodes(ctx context.Context, rootRef swarm.Address, cluster orchestration.Cluster) ([]orchestration.Node, error) {
nodes := cluster.Nodes()
var eligible []orchestration.Node
for _, node := range nodes {
pins, err := node.Client().GetPins(ctx)
if err != nil {
return nil, fmt.Errorf("get pins. node=%s, err=%w", node.Name(), err)
}
found := slices.ContainsFunc(pins, func(ref swarm.Address) bool {
return ref.Equal(rootRef)
})
if !found {
eligible = append(eligible, node)
}
}

if len(eligible) == 0 {
return nil, fmt.Errorf("no eligible node found")
}
return eligible, nil
}
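Two illustrative notes on the check above. First, the reference file that parseFile consumes is plain text with one hex swarm hash per line, the root reference first and every chunk reference after it (the hashes here are placeholders):

  <root reference hash>
  <chunk reference 1>
  <chunk reference 2>
  ...

Second, a minimal sketch of driving the check directly, assuming a configured cluster and logger; in practice Beekeeper constructs the Options from the YAML shown above via pkg/config:

  check := datadurability.NewCheck(logger)
  err := check.Run(ctx, cluster, datadurability.Options{
      Ref:         "0a1b2c...", // placeholder: root hash of the reference file
      Concurrency: 10,
      MaxAttempts: 10,
  })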
90 changes: 90 additions & 0 deletions pkg/check/datadurability/metrics.go
@@ -0,0 +1,90 @@
package datadurability

import (
m "github.com/ethersphere/beekeeper/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)

type metrics struct {
ChunkDownloadAttempts prometheus.Counter
ChunkDownloadErrors prometheus.Counter
ChunkDownloadDuration prometheus.Histogram
ChunksCount prometheus.Gauge
FileDownloadAttempts prometheus.Counter
FileDownloadErrors prometheus.Counter
FileSize prometheus.Counter
FileDownloadDuration prometheus.Histogram
}

func newMetrics(subsystem string) metrics {
return metrics{
ChunkDownloadAttempts: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "chunk_download_attempts",
Help: "Number of download attempts for the chunks.",
},
),
ChunkDownloadErrors: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "chunk_download_errors",
Help: "Number of download errors for the chunks.",
},
),
ChunkDownloadDuration: prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "chunk_download_duration_seconds",
Help: "Chunk download duration through the /chunks endpoint.",
},
),
ChunksCount: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "chunks_count",
Help: "The number of chunks in the check",
},
),
FileDownloadAttempts: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "file_download_attempts",
Help: "Number of download attempts for the file.",
},
),
FileDownloadErrors: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "file_download_errors",
Help: "Number of download errors for the file.",
},
),
FileSize: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "file_size_bytes",
Help: "The size of the file downloaded (sum of chunk sizes)",
},
),
FileDownloadDuration: prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "file_download_duration_seconds",
Help: "File download duration",
},
),
}
}

func (c *Check) Report() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(c.metrics)
}
2 changes: 1 addition & 1 deletion pkg/check/soc/soc.go
@@ -128,7 +128,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int

c.logger.Infof("soc: chunk uploaded to node %s", nodeName)

retrieved, err := node.DownloadChunk(ctx, ref, "")
retrieved, err := node.DownloadChunk(ctx, ref, "", nil)
if err != nil {
return err
}
21 changes: 21 additions & 0 deletions pkg/config/check.go
@@ -12,6 +12,7 @@ import (
"github.com/ethersphere/beekeeper/pkg/check/authenticated"
"github.com/ethersphere/beekeeper/pkg/check/balances"
"github.com/ethersphere/beekeeper/pkg/check/cashout"
"github.com/ethersphere/beekeeper/pkg/check/datadurability"
"github.com/ethersphere/beekeeper/pkg/check/fileretrieval"
"github.com/ethersphere/beekeeper/pkg/check/fullconnectivity"
"github.com/ethersphere/beekeeper/pkg/check/gc"
@@ -509,6 +510,26 @@ var Checks = map[string]CheckType{
return nil, fmt.Errorf("applying options: %w", err)
}

return opts, nil
},
},
"datadurability": {
NewAction: datadurability.NewCheck,
NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) {
checkOpts := new(struct {
Ref *string `yaml:"ref"`
Concurrency *int `yaml:"concurrency"`
MaxAttempts *int `yaml:"max-attempts"`
})
if err := check.Options.Decode(checkOpts); err != nil {
return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err)
}
opts := datadurability.NewDefaultOptions()

if err := applyCheckConfig(checkGlobalConfig, checkOpts, &opts); err != nil {
return nil, fmt.Errorf("applying options: %w", err)
}

return opts, nil
},
},