feat: data durability check #370

Merged · 4 commits · Dec 14, 2023
Changes from 3 commits
5 changes: 5 additions & 0 deletions config/config.yaml
@@ -350,6 +350,11 @@ checks:
retry-wait: 1m
retry-count: 3
type: longavailability
datadurability:
options:
ref:
concurrency:
type: datadurability

# simulations defines simulations Beekeeper can execute against the cluster
# type field allows defining the same simulation with different names and options
5 changes: 5 additions & 0 deletions config/local.yaml
@@ -389,3 +389,8 @@ checks:
retry-wait: 1m
retry-count: 3
type: longavailability
ci-datadurability:
options:
ref:
concurrency:
type: datadurability
6 changes: 3 additions & 3 deletions pkg/bee/api/chunks.go
@@ -13,12 +13,12 @@ import (
type ChunksService service

// Download downloads data from the node
-func (c *ChunksService) Download(ctx context.Context, a swarm.Address, targets string) (resp io.ReadCloser, err error) {
+func (c *ChunksService) Download(ctx context.Context, a swarm.Address, targets string, opts *DownloadOptions) (resp io.ReadCloser, err error) {
if targets == "" {
-return c.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/chunks/"+a.String(), nil, nil)
+return c.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/chunks/"+a.String(), nil, opts)
}

-return c.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/chunks/"+a.String()+"?targets="+targets, nil, nil)
+return c.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/chunks/"+a.String()+"?targets="+targets, nil, opts)
}

// ChunksUploadResponse represents Upload's response
15 changes: 13 additions & 2 deletions pkg/bee/client.go
@@ -194,8 +194,8 @@ func (c *Client) DownloadBytes(ctx context.Context, a swarm.Address) (data []byt
}

// DownloadChunk downloads chunk from the node
-func (c *Client) DownloadChunk(ctx context.Context, a swarm.Address, targets string) (data []byte, err error) {
-r, err := c.api.Chunks.Download(ctx, a, targets)
+func (c *Client) DownloadChunk(ctx context.Context, a swarm.Address, targets string, opts *api.DownloadOptions) (data []byte, err error) {
+r, err := c.api.Chunks.Download(ctx, a, targets, opts)
if err != nil {
return nil, fmt.Errorf("download chunk %s: %w", a, err)
}
@@ -204,6 +204,17 @@ func (c *Client) DownloadChunk(ctx context.Context, a swarm.Address, targets str
return io.ReadAll(r)
}

// DownloadFileBytes downloads a file from the node and returns the data.
func (c *Client) DownloadFileBytes(ctx context.Context, a swarm.Address, opts *api.DownloadOptions) (data []byte, err error) {
r, err := c.api.Files.Download(ctx, a, opts)
if err != nil {
return nil, fmt.Errorf("download file %s: %w", a, err)
}
defer r.Close()

return io.ReadAll(r)
}

// DownloadFile downloads a file from the node and returns its size and hash.
func (c *Client) DownloadFile(ctx context.Context, a swarm.Address, opts *api.DownloadOptions) (size int64, hash []byte, err error) {
r, err := c.api.Files.Download(ctx, a, opts)
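The extra opts parameter lets callers control per-download behavior; passing nil keeps the previous defaults. A minimal caller-side sketch, assuming an already constructed *bee.Client (fetchUncached is an illustrative helper, not part of this PR):

package example

import (
	"context"

	"github.com/ethersphere/bee/pkg/swarm"
	"github.com/ethersphere/beekeeper/pkg/bee"
	"github.com/ethersphere/beekeeper/pkg/bee/api"
)

// fetchUncached downloads a chunk with caching disabled, mirroring how the
// data durability check calls the client; nil options preserve old behavior.
func fetchUncached(ctx context.Context, client *bee.Client, ref swarm.Address) ([]byte, error) {
	cache := false
	return client.DownloadChunk(ctx, ref, "", &api.DownloadOptions{Cache: &cache})
}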
187 changes: 187 additions & 0 deletions pkg/check/datadurability/datadurability.go
@@ -0,0 +1,187 @@
package datadurability

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"math/rand"
"slices"
"sync"
"time"

"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee/api"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/beekeeper/pkg/orchestration"
"github.com/ethersphere/beekeeper/pkg/random"
)

type Options struct {
Ref string
RndSeed int64
Concurrency int
}

func NewDefaultOptions() Options {
return Options{
RndSeed: time.Now().UnixNano(),
Concurrency: 10,
}
}

var _ beekeeper.Action = (*Check)(nil)

// Check instance
type Check struct {
metrics metrics
logger logging.Logger
}

// NewCheck returns a new check.
func NewCheck(logger logging.Logger) beekeeper.Action {
return &Check{
logger: logger,
metrics: newMetrics("check_data_durability"),
}
}

// Run runs the check
// It downloads a file that contains a list of chunks and then attempts to download each chunk in the file.
func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, o interface{}) error {
opts, ok := o.(Options)
if !ok {
return fmt.Errorf("invalid options type")
}
rnd := random.PseudoGenerator(opts.RndSeed)
ref, err := swarm.ParseHexAddress(opts.Ref)
if err != nil {
return fmt.Errorf("parse hex ref %s: %w", opts.Ref, err)
}

d, err := fetchFile(ctx, ref, cluster, rnd)
if err != nil {
return fmt.Errorf("fetch file from random node: %w", err)
}

refs, err := parseFile(bytes.NewReader(d))
if err != nil {
return fmt.Errorf("parse file: %w", err)
}
rootRef, chunkRefs := refs[0], refs[1:]

nodes, err := findRandomEligibleNodes(ctx, rootRef, cluster)
if err != nil {
return fmt.Errorf("get random node: %w", err)
}

once := sync.Once{}
fileStart := time.Now()
c.metrics.ChunksCount.Set(float64(len(chunkRefs)))

var wg sync.WaitGroup
wg.Add(len(chunkRefs))
limitCh := make(chan struct{}, opts.Concurrency)
var fileAttemptCounted bool

for i, ref := range chunkRefs {
node := nodes[i%len(nodes)] // distribute evenly
limitCh <- struct{}{}

go func(i int, ref swarm.Address, node orchestration.Node) {
defer func() {
<-limitCh
wg.Done()
}()
c.metrics.ChunkDownloadAttempts.Inc()
cache := false // disable caching so the chunk must be fetched via retrieval
chunkStart := time.Now()
data, err := node.Client().DownloadChunk(ctx, ref, "", &api.DownloadOptions{Cache: &cache})
if err != nil {
c.logger.Errorf("download failed. %s (%d of %d). chunk=%s node=%s err=%v", percentage(i+1, len(chunkRefs)), i+1, len(chunkRefs), ref, node.Name(), err)
c.metrics.ChunkDownloadErrors.Inc()
once.Do(func() {
c.metrics.FileDownloadAttempts.Inc()
fileAttemptCounted = true
c.metrics.FileDownloadErrors.Inc()
})
return
}
dur := time.Since(chunkStart)
c.logger.Infof("download successful. %s (%d of %d) chunk=%s node=%s dur=%v", percentage(i+1, len(chunkRefs)), i+1, len(chunkRefs), ref, node.Name(), dur)
c.metrics.ChunkDownloadDuration.Observe(dur.Seconds())
c.metrics.FileSize.Add(float64(len(data)))
}(i, ref, node)
}

wg.Wait()
if !fileAttemptCounted {
c.metrics.FileDownloadAttempts.Inc()
}
dur := time.Since(fileStart)
c.logger.Infof("done. dur=%v", dur)
c.metrics.FileDownloadDuration.Observe(dur.Seconds())
return nil
}

func percentage(a, b int) string {
return fmt.Sprintf("%.2f%%", float64(a)/float64(b)*100)
}

func fetchFile(ctx context.Context, ref swarm.Address, cluster orchestration.Cluster, rnd *rand.Rand) ([]byte, error) {
node, err := cluster.RandomNode(ctx, rnd)
if err != nil {
return nil, fmt.Errorf("random node: %w", err)
}
d, err := node.Client().DownloadFileBytes(ctx, ref, nil)
if err != nil {
return nil, fmt.Errorf("download bytes: %w", err)
}
return d, nil
}

// parseFile returns the list of references in the reader.
// It expects a list of swarm hashes where the 1st line is the root reference
// and the following lines are the individual chunk references.
func parseFile(r io.Reader) ([]swarm.Address, error) {
var refs []swarm.Address
scanner := bufio.NewScanner(r)
for scanner.Scan() {
line := scanner.Text()
ref, err := swarm.ParseHexAddress(line)
if err != nil {
return nil, fmt.Errorf("parse hex ref %s: %w", line, err)
}
refs = append(refs, ref)
}

if len(refs) < 2 {
return nil, fmt.Errorf("invalid file format. Expected at least 1 line")
}
return refs, nil
}
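The expected input is plain text: the root reference on the first line, then one chunk reference per line. A short sketch of a generator producing that format, assuming bytes and fmt are imported alongside this file's existing imports (buildRefsFile is illustrative, not part of this PR):

// buildRefsFile renders references in the format parseFile expects:
// the root reference first, then one chunk reference per line.
func buildRefsFile(rootRef swarm.Address, chunkRefs []swarm.Address) []byte {
	var buf bytes.Buffer
	fmt.Fprintln(&buf, rootRef.String())
	for _, ref := range chunkRefs {
		fmt.Fprintln(&buf, ref.String())
	}
	return buf.Bytes()
}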

// findRandomEligibleNodes finds nodes where the root ref is not pinned.
func findRandomEligibleNodes(ctx context.Context, rootRef swarm.Address, cluster orchestration.Cluster) ([]orchestration.Node, error) {
nodes := cluster.Nodes()
var eligible []orchestration.Node
for _, node := range nodes {
pins, err := node.Client().GetPins(ctx)
if err != nil {
return nil, fmt.Errorf("get pins. node=%s, err=%w", node.Name(), err)
}
found := slices.ContainsFunc(pins, func(ref swarm.Address) bool {
return ref.Equal(rootRef)
})
if !found {
eligible = append(eligible, node)
}
}

if len(eligible) == 0 {
return nil, fmt.Errorf("no eligible node found")
}
return eligible, nil
}
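The download loop in Run caps parallelism with a buffered channel acting as a counting semaphore: a send acquires a slot before each goroutine starts, and the deferred receive releases it. A stripped-down, self-contained sketch of the same pattern (names are illustrative):

package example

import "sync"

// forEachLimited runs fn over items with at most limit concurrent calls,
// using the same buffered-channel semaphore as the check's download loop.
func forEachLimited(items []string, limit int, fn func(item string)) {
	var wg sync.WaitGroup
	wg.Add(len(items))
	sem := make(chan struct{}, limit) // each element is one in-flight slot
	for _, it := range items {
		sem <- struct{}{} // acquire: blocks while limit goroutines are running
		go func(it string) {
			defer func() {
				<-sem // release the slot
				wg.Done()
			}()
			fn(it)
		}(it)
	}
	wg.Wait()
}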
90 changes: 90 additions & 0 deletions pkg/check/datadurability/metrics.go
@@ -0,0 +1,90 @@
package datadurability

import (
m "github.com/ethersphere/beekeeper/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)

type metrics struct {
ChunkDownloadAttempts prometheus.Counter
ChunkDownloadErrors prometheus.Counter
ChunkDownloadDuration prometheus.Histogram
ChunksCount prometheus.Gauge
FileDownloadAttempts prometheus.Counter
FileDownloadErrors prometheus.Counter
FileSize prometheus.Counter
FileDownloadDuration prometheus.Histogram
}

func newMetrics(subsystem string) metrics {
return metrics{
ChunkDownloadAttempts: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "chunk_download_attempts",
Help: "Number of download attempts for the chunks.",
},
),
ChunkDownloadErrors: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "chunk_download_errors",
Help: "Number of download errors for the chunks.",
},
),
ChunkDownloadDuration: prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "chunk_download_duration_seconds",
Help: "Chunk download duration through the /chunks endpoint.",
},
),
ChunksCount: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "chunks_count",
Help: "The number of chunks in the check",
},
),
FileDownloadAttempts: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "file_download_attempts",
Help: "Number of download attempts for the file.",
},
),
FileDownloadErrors: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "file_download_errors",
Help: "Number of download errors for the file.",
},
),
FileSize: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "file_size_bytes",
Help: "The size of the file downloaded (sum of chunk sizes)",
},
),
FileDownloadDuration: prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "file_download_duration_seconds",
Help: "File download duration",
},
),
}
}

func (c *Check) Report() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(c.metrics)
}
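Report returns the collectors built above so a caller can expose them. A hedged sketch of wiring them into a standalone Prometheus registry (registerCheckMetrics and the registry setup are assumptions, not shown in this PR):

package example

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/ethersphere/beekeeper/pkg/beekeeper"
	"github.com/ethersphere/beekeeper/pkg/check/datadurability"
	"github.com/ethersphere/beekeeper/pkg/logging"
)

// registerCheckMetrics registers a check's collectors so the counters and
// histograms defined above can be scraped.
func registerCheckMetrics(reg *prometheus.Registry, logger logging.Logger) beekeeper.Action {
	check := datadurability.NewCheck(logger)
	if reporter, ok := check.(interface{ Report() []prometheus.Collector }); ok {
		reg.MustRegister(reporter.Report()...)
	}
	return check
}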
2 changes: 1 addition & 1 deletion pkg/check/soc/soc.go
@@ -128,7 +128,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int

c.logger.Infof("soc: chunk uploaded to node %s", nodeName)

-retrieved, err := node.DownloadChunk(ctx, ref, "")
+retrieved, err := node.DownloadChunk(ctx, ref, "", nil)
if err != nil {
return err
}
21 changes: 21 additions & 0 deletions pkg/config/check.go
@@ -12,6 +12,7 @@ import (
"github.com/ethersphere/beekeeper/pkg/check/authenticated"
"github.com/ethersphere/beekeeper/pkg/check/balances"
"github.com/ethersphere/beekeeper/pkg/check/cashout"
"github.com/ethersphere/beekeeper/pkg/check/datadurability"
"github.com/ethersphere/beekeeper/pkg/check/fileretrieval"
"github.com/ethersphere/beekeeper/pkg/check/fullconnectivity"
"github.com/ethersphere/beekeeper/pkg/check/gc"
@@ -509,6 +510,26 @@ var Checks = map[string]CheckType{
return nil, fmt.Errorf("applying options: %w", err)
}

return opts, nil
},
},
"datadurability": {
NewAction: datadurability.NewCheck,
NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) {
checkOpts := new(struct {
RndSeed *int64 `yaml:"rnd-seed"`
Ref *string `yaml:"ref"`
Concurrency *int `yaml:"concurrency"`
})
if err := check.Options.Decode(checkOpts); err != nil {
return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err)
}
opts := datadurability.NewDefaultOptions()

if err := applyCheckConfig(checkGlobalConfig, checkOpts, &opts); err != nil {
return nil, fmt.Errorf("applying options: %w", err)
}

return opts, nil
},
},
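The pointer-typed fields in the decoded options struct let applyCheckConfig distinguish "unset in YAML" (nil) from an explicit zero value, so the defaults from NewDefaultOptions survive unless overridden. applyCheckConfig itself is outside this diff; a minimal illustrative stand-in for that overlay pattern, assuming fields are matched by name:

import "reflect"

// overlayOptions copies each non-nil pointer field of src onto the
// same-named, same-typed field of dst. Illustrative only — the real
// applyCheckConfig also receives checkGlobalConfig and may apply
// global settings as well.
func overlayOptions(src, dst interface{}) {
	sv := reflect.ValueOf(src).Elem()
	dv := reflect.ValueOf(dst).Elem()
	for i := 0; i < sv.NumField(); i++ {
		f := sv.Field(i)
		if f.Kind() != reflect.Pointer || f.IsNil() {
			continue
		}
		target := dv.FieldByName(sv.Type().Field(i).Name)
		if target.IsValid() && target.CanSet() && target.Type() == f.Type().Elem() {
			target.Set(f.Elem())
		}
	}
}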
2 changes: 1 addition & 1 deletion pkg/simulate/pushsync/pushsync.go
@@ -216,7 +216,7 @@ func downloadChunks(ctx context.Context, o Options, uploadCount int, client *bee
for i := 0; i < int(o.DownloadRetry); i++ {
count := 0
for _, chunk := range chunks {
-_, err := client.DownloadChunk(ctx, chunk.Address(), "")
+_, err := client.DownloadChunk(ctx, chunk.Address(), "", nil)
if err == nil {
count++
}