Skip to content

Commit

Permalink
feat: data durability check
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill committed Nov 30, 2023
1 parent 70cfa94 commit b5acecc
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 9 deletions.
4 changes: 4 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ checks:
retry-wait: 1m
retry-count: 3
type: longavailability
datadurability:
options:
ref:
type: datadurability

# simulations defines simulations Beekeeper can execute against the cluster
# type field allows defining same simulation with different names and options
Expand Down
4 changes: 4 additions & 0 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,7 @@ checks:
retry-wait: 1m
retry-count: 3
type: longavailability
ci-datadurability:
options:
ref:
type: datadurability
6 changes: 3 additions & 3 deletions pkg/bee/api/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
type ChunksService service

// Download downloads data from the node
func (c *ChunksService) Download(ctx context.Context, a swarm.Address, targets string) (resp io.ReadCloser, err error) {
func (c *ChunksService) Download(ctx context.Context, a swarm.Address, targets string, opts *DownloadOptions) (resp io.ReadCloser, err error) {
if targets == "" {
return c.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/chunks/"+a.String(), nil, nil)
return c.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/chunks/"+a.String(), nil, opts)
}

return c.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/chunks/"+a.String()+"?targets="+targets, nil, nil)
return c.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/chunks/"+a.String()+"?targets="+targets, nil, opts)
}

// ChunksUploadResponse represents Upload's response
Expand Down
4 changes: 2 additions & 2 deletions pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ func (c *Client) DownloadBytes(ctx context.Context, a swarm.Address) (data []byt
}

// DownloadChunk downloads chunk from the node
func (c *Client) DownloadChunk(ctx context.Context, a swarm.Address, targets string) (data []byte, err error) {
r, err := c.api.Chunks.Download(ctx, a, targets)
func (c *Client) DownloadChunk(ctx context.Context, a swarm.Address, targets string, opts *api.DownloadOptions) (data []byte, err error) {
r, err := c.api.Chunks.Download(ctx, a, targets, opts)
if err != nil {
return nil, fmt.Errorf("download chunk %s: %w", a, err)
}
Expand Down
170 changes: 170 additions & 0 deletions pkg/check/datadurability/datadurability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package datadurability

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"math/rand"
"slices"
"sync"
"time"

"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee/api"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/beekeeper/pkg/orchestration"
"github.com/ethersphere/beekeeper/pkg/random"
)

// Options holds the configurable parameters for the data durability check.
type Options struct {
	Ref     string // hex-encoded swarm reference of the file listing chunk references
	RndSeed int64  // seed for the pseudo-random generator used to pick nodes
}

// NewDefaultOptions returns Options populated with defaults: an empty Ref
// and a random seed derived from the current time.
func NewDefaultOptions() Options {
	var o Options
	o.RndSeed = time.Now().UnixNano()
	return o
}

// Compile-time assertion that *Check satisfies the beekeeper.Action interface.
var _ beekeeper.Action = (*Check)(nil)

// Check instance
type Check struct {
	metrics metrics        // Prometheus collectors exposed via Report
	logger  logging.Logger // logger for check progress and failures
}

// NewCheck returns new check
func NewCheck(logger logging.Logger) beekeeper.Action {
	return &Check{
		logger: logger,
		// Metrics are namespaced under "check_data_durability" and the
		// chunk-level metrics carry a "ref" label.
		metrics: newMetrics("check_data_durability", []string{"ref"}),
	}
}

// Run runs the check.
// It downloads a file that contains a list of chunk references, then attempts
// to download each referenced chunk from a node that does not have the root
// reference pinned, recording per-chunk and per-file metrics.
func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, o interface{}) error {
	opts, ok := o.(Options)
	if !ok {
		return fmt.Errorf("invalid options type")
	}
	rnd := random.PseudoGenerator(opts.RndSeed)
	ref, err := swarm.ParseHexAddress(opts.Ref)
	if err != nil {
		return fmt.Errorf("parse hex ref %s: %w", opts.Ref, err)
	}

	d, err := fetchFileFromRandomNode(ctx, ref, cluster, rnd)
	if err != nil {
		return fmt.Errorf("fetch file from random node: %w", err)
	}

	refs, err := parseFile(bytes.NewReader(d))
	if err != nil {
		return fmt.Errorf("parse file: %w", err)
	}
	// First line is the root reference; the rest are individual chunks.
	rootRef, chunkRefs := refs[0], refs[1:]

	// Pick a node where the root ref is NOT pinned so chunk downloads
	// exercise network retrieval rather than local storage.
	node, err := findRandomNode(ctx, rootRef, cluster, rnd)
	if err != nil {
		return fmt.Errorf("get random node: %w", err)
	}

	c.logger.Infof("downloading chunks. node=%s chunks=%d", node.Name(), len(chunkRefs))
	var totalDownloadSize int64
	once := sync.Once{}
	fileStart := time.Now()
	// The whole run is a single file-download attempt; count it once here
	// rather than once per chunk, so it stays comparable with the
	// once-per-file FileDownloadErrors counter below.
	c.metrics.FileDownloadAttempts.Inc()
	for i, ref := range chunkRefs {
		c.logger.Infof("%d of %d. chunk=%s", i+1, len(chunkRefs), ref)

		// NOTE(review): using the chunk reference as a label value creates one
		// Prometheus series per chunk — confirm expected chunk counts keep
		// cardinality manageable.
		labelValue := ref.String()
		c.metrics.ChunkDownloadAttempts.WithLabelValues(labelValue).Inc()

		// Bypass the node's cache so the chunk must be fetched from the network.
		cache := false
		chunkStart := time.Now()
		d, err = node.Client().DownloadChunk(ctx, ref, "", &api.DownloadOptions{Cache: &cache})
		if err != nil {
			c.logger.Errorf("download failed. chunk=%s err=%v", ref, err)
			c.metrics.ChunkDownloadErrors.WithLabelValues(labelValue).Inc()
			// Record at most one file-level error no matter how many chunks fail.
			once.Do(func() {
				c.metrics.FileDownloadErrors.Inc()
			})
			continue
		}
		dur := time.Since(chunkStart)
		c.logger.Infof("download successful. chunk=%s dur=%v", ref, dur)
		c.metrics.ChunkDownloadDuration.WithLabelValues(labelValue).Observe(dur.Seconds())
		totalDownloadSize += int64(len(d))
	}

	c.metrics.FileSize.Set(float64(totalDownloadSize))
	c.metrics.ChunksCount.Set(float64(len(chunkRefs)))
	dur := time.Since(fileStart)
	c.logger.Infof("done. dur=%v", dur)
	c.metrics.FileDownloadDuration.Observe(dur.Seconds())
	return nil
}

// fetchFileFromRandomNode downloads the bytes behind ref from a randomly
// chosen node in the cluster.
func fetchFileFromRandomNode(ctx context.Context, ref swarm.Address, cluster orchestration.Cluster, rnd *rand.Rand) ([]byte, error) {
	picked, err := cluster.RandomNode(ctx, rnd)
	if err != nil {
		return nil, fmt.Errorf("random node: %w", err)
	}

	data, err := picked.Client().DownloadBytes(ctx, ref)
	if err != nil {
		return nil, fmt.Errorf("download bytes: %w", err)
	}

	return data, nil
}

// parseFile returns the list of references in the reader.
// It expects one swarm hash per line, where the first line is the root
// reference and the following lines are the individual chunk references.
func parseFile(r io.Reader) ([]swarm.Address, error) {
	var refs []swarm.Address
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		line := scanner.Text()
		ref, err := swarm.ParseHexAddress(line)
		if err != nil {
			return nil, fmt.Errorf("parse hex ref %s: %w", line, err)
		}
		refs = append(refs, ref)
	}
	// bufio.Scanner swallows read errors unless Err is checked after the loop.
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scan file: %w", err)
	}

	// A valid file has a root reference plus at least one chunk reference.
	if len(refs) < 2 {
		return nil, fmt.Errorf("invalid file format: expected a root reference and at least one chunk reference")
	}
	return refs, nil
}

// findRandomNode finds a random node where the root ref is not pinned, so
// that downloads exercise network retrieval rather than local storage.
func findRandomNode(ctx context.Context, rootRef swarm.Address, cluster orchestration.Cluster, rnd *rand.Rand) (orchestration.Node, error) {
	nodes := cluster.Nodes()
	var eligible []string
	for name, node := range nodes {
		pins, err := node.Client().GetPins(ctx)
		if err != nil {
			return nil, fmt.Errorf("get pins. node=%s, err=%w", name, err)
		}
		found := slices.ContainsFunc(pins, func(ref swarm.Address) bool {
			return ref.Equal(rootRef)
		})
		if !found {
			eligible = append(eligible, name)
		}
	}

	if len(eligible) == 0 {
		return nil, fmt.Errorf("no eligible node found")
	}

	// Map iteration order is randomized by the runtime, so sort the candidate
	// names to make the selection reproducible for a given RndSeed.
	slices.Sort(eligible)
	node := nodes[eligible[rnd.Intn(len(eligible))]]
	return node, nil
}
93 changes: 93 additions & 0 deletions pkg/check/datadurability/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package datadurability

import (
m "github.com/ethersphere/beekeeper/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)

// metrics holds the Prometheus collectors published by the data durability check.
type metrics struct {
	ChunkDownloadAttempts *prometheus.CounterVec   // download attempts per chunk (labeled by ref)
	ChunkDownloadErrors   *prometheus.CounterVec   // download errors per chunk (labeled by ref)
	ChunkDownloadDuration *prometheus.HistogramVec // per-chunk download latency in seconds
	ChunksCount           prometheus.Gauge         // number of chunks in the current check
	FileDownloadAttempts  prometheus.Counter       // file-level download attempts
	FileDownloadErrors    prometheus.Counter       // file-level download errors
	FileSize              prometheus.Gauge         // total bytes downloaded (sum of chunk sizes)
	FileDownloadDuration  prometheus.Histogram     // whole-file download latency in seconds
}

// newMetrics constructs all collectors for the check under the given
// Prometheus subsystem; labels is applied to every chunk-level vector.
func newMetrics(subsystem string, labels []string) metrics {
	return metrics{
		ChunkDownloadAttempts: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: m.Namespace,
				Subsystem: subsystem,
				Name:      "chunk_download_attempts",
				Help:      "Number of download attempts for the chunk.",
			},
			labels,
		),
		ChunkDownloadErrors: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: m.Namespace,
				Subsystem: subsystem,
				Name:      "chunk_download_errors",
				Help:      "Number of download errors for the chunk.",
			},
			labels,
		),
		ChunkDownloadDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Namespace: m.Namespace,
				Subsystem: subsystem,
				Name:      "chunk_download_duration_seconds",
				Help:      "Chunk download duration through the /chunks endpoint.",
			},
			labels,
		),
		ChunksCount: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Namespace: m.Namespace,
				Subsystem: subsystem,
				Name:      "chunks_count",
				Help:      "The number of chunks in the check",
			},
		),
		FileDownloadAttempts: prometheus.NewCounter(
			prometheus.CounterOpts{
				Namespace: m.Namespace,
				Subsystem: subsystem,
				Name:      "file_download_attempts",
				Help:      "Number of download attempts for the file.",
			},
		),
		FileDownloadErrors: prometheus.NewCounter(
			prometheus.CounterOpts{
				Namespace: m.Namespace,
				Subsystem: subsystem,
				Name:      "file_download_errors",
				Help:      "Number of download errors for the file.",
			},
		),
		FileSize: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Namespace: m.Namespace,
				Subsystem: subsystem,
				Name:      "file_size_bytes",
				Help:      "The size of the file downloaded (sum of chunk sizes)",
			},
		),
		FileDownloadDuration: prometheus.NewHistogram(
			prometheus.HistogramOpts{
				Namespace: m.Namespace,
				Subsystem: subsystem,
				Name:      "file_download_duration_seconds",
				Help:      "File download duration",
			},
		),
	}
}

// Report returns the Prometheus collectors backing the check's metrics so
// they can be registered with a registry.
func (c *Check) Report() []prometheus.Collector {
	collectors := m.PrometheusCollectorsFromFields(c.metrics)
	return collectors
}
2 changes: 1 addition & 1 deletion pkg/check/soc/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int

c.logger.Infof("soc: chunk uploaded to node %s", nodeName)

retrieved, err := node.DownloadChunk(ctx, ref, "")
retrieved, err := node.DownloadChunk(ctx, ref, "", nil)
if err != nil {
return err
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/config/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ethersphere/beekeeper/pkg/check/authenticated"
"github.com/ethersphere/beekeeper/pkg/check/balances"
"github.com/ethersphere/beekeeper/pkg/check/cashout"
"github.com/ethersphere/beekeeper/pkg/check/datadurability"
"github.com/ethersphere/beekeeper/pkg/check/fileretrieval"
"github.com/ethersphere/beekeeper/pkg/check/fullconnectivity"
"github.com/ethersphere/beekeeper/pkg/check/gc"
Expand Down Expand Up @@ -509,6 +510,25 @@ var Checks = map[string]CheckType{
return nil, fmt.Errorf("applying options: %w", err)
}

return opts, nil
},
},
	// datadurability downloads a file listing chunk references and then
	// fetches each chunk to verify the data is still retrievable.
	"datadurability": {
		NewAction: datadurability.NewCheck,
		NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) {
			// Options accepted from the check's YAML configuration.
			checkOpts := new(struct {
				RndSeed *int64  `yaml:"rnd-seed"`
				Ref     *string `yaml:"ref"`
			})
			if err := check.Options.Decode(checkOpts); err != nil {
				return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err)
			}
			opts := datadurability.NewDefaultOptions()

			// Overlay YAML-provided values onto the defaults.
			if err := applyCheckConfig(checkGlobalConfig, checkOpts, &opts); err != nil {
				return nil, fmt.Errorf("applying options: %w", err)
			}

			return opts, nil
		},
	},
Expand Down
2 changes: 1 addition & 1 deletion pkg/simulate/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func downloadChunks(ctx context.Context, o Options, uploadCount int, client *bee
for i := 0; i < int(o.DownloadRetry); i++ {
count := 0
for _, chunk := range chunks {
_, err := client.DownloadChunk(ctx, chunk.Address(), "")
_, err := client.DownloadChunk(ctx, chunk.Address(), "", nil)
if err == nil {
count++
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/simulate/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *Simulation) Run(ctx context.Context, cluster orchestration.Cluster, opt

// download chunk
t2 := time.Now()
data, err := clients[downloadNode].DownloadChunk(ctx, ref, "")
data, err := clients[downloadNode].DownloadChunk(ctx, ref, "", nil)
d2 := time.Since(t2)
if err != nil {
s.metrics.NotDownloadedCounter.WithLabelValues(overlays[downloadNode].String()).Inc()
Expand Down
2 changes: 1 addition & 1 deletion pkg/test/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (b *BeeV2) Restricted() bool {
}

func (b *BeeV2) DownloadChunk(ctx context.Context, ref swarm.Address) ([]byte, error) {
return b.client.DownloadChunk(ctx, ref, "")
return b.client.DownloadChunk(ctx, ref, "", nil)
}

// NewRandomFile returns new pseudorandom file
Expand Down

0 comments on commit b5acecc

Please sign in to comment.