Skip to content

Commit

Permalink
feat: check long availability (#359)
Browse files Browse the repository at this point in the history
* feat: check long availability

* fix: use simple metrics

* fix: remove download timeout

* fix: use ErrorF

* chore: do not use pinned node

* fix: metrics

* fix: metrics

* chore: add download size metric

* fix: long running

* fix: review comments
  • Loading branch information
acha-bill authored Oct 30, 2023
1 parent f6d8292 commit 1f6e3b0
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 0 deletions.
6 changes: 6 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ checks:
timeout: 5m
options:
amount: 1000000000000000000
longavailability:
options:
refs:
retry-wait: 1m
retry-count: 3
type: longavailability

# simulations defines simulations Beekeeper can execute against the cluster
# type filed allows defining same simulation with different names and options
Expand Down
6 changes: 6 additions & 0 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,9 @@ checks:
private-key: "4663c222787e30c1994b59044aa5045377a6e79193a8ead88293926b535c722d"
geth-url: "http://geth-swap.localhost"
geth-chain-id: 12345
ci-longavailability:
options:
refs:
retry-wait: 1m
retry-count: 3
type: longavailability
138 changes: 138 additions & 0 deletions pkg/check/longavailability/longavailability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package longavailability

import (
"context"
"fmt"
"time"

"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/beekeeper/pkg/orchestration"
"github.com/ethersphere/beekeeper/pkg/random"
)

type Options struct {
Refs []string
RndSeed int64
RetryCount int
RetryWait time.Duration
NextIterWait time.Duration
}

// NewDefaultOptions returns new default options
func NewDefaultOptions() Options {
return Options{
RndSeed: time.Now().UnixNano(),
RetryCount: 3,
RetryWait: 10 * time.Second,
NextIterWait: 6 * time.Hour,
}
}

// compile check whether Check implements interface
var _ beekeeper.Action = (*Check)(nil)

// Check instance
type Check struct {
metrics metrics
logger logging.Logger
}

// NewCheck returns new check
func NewCheck(logger logging.Logger) beekeeper.Action {
return &Check{
logger: logger,
metrics: newMetrics("check_longavailability"),
}
}

func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, o interface{}) error {
opts, ok := o.(Options)
if !ok {
return fmt.Errorf("invalid options type")
}

var addresses []swarm.Address
for _, ref := range opts.Refs {
addr, err := swarm.ParseHexAddress(ref)
if err != nil {
return fmt.Errorf("parse hex address: %w", err)
}
addresses = append(addresses, addr)
}

var it int
for {
it++
select {
case <-ctx.Done():
return nil
default:
c.logger.Infof("iteration %d", it)
}

for _, addr := range addresses {
node, err := findRandomNode(ctx, addr, cluster, opts.RndSeed)
if err != nil {
c.logger.Errorf("find node %s. Skipping. %w", addr.String(), err)
continue
}

for i := 0; i <= opts.RetryCount; i++ {
if i == opts.RetryCount {
c.logger.Errorf("node %s: download for %s failed after %d tries", node.Name(), addr, opts.RetryCount)
break
}

c.metrics.DownloadAttempts.Inc()
c.logger.Infof("node %s: download attempt %d for %s", node.Name(), i+1, addr)

start := time.Now()
size, _, err := node.Client().DownloadFile(ctx, addr)
if err != nil {
c.metrics.DownloadErrors.Inc()
c.logger.Errorf("node %s: download %s error: %v", node.Name(), addr, err)
c.logger.Infof("retrying in: %v", opts.RetryWait)
time.Sleep(opts.RetryWait)
continue
}
c.logger.Infof("download size %d", size)
c.metrics.DownloadSize.Set(float64(size))
dur := time.Since(start)
c.metrics.DownloadDuration.Observe(dur.Seconds())
c.logger.Infof("node %s: downloaded %s successfully in %v", node.Name(), addr, dur)
break
}
}

c.logger.Infof("iteration %d completed", it)
c.logger.Infof("sleeping for %v", opts.NextIterWait)
time.Sleep(opts.NextIterWait)
}
}

func findRandomNode(ctx context.Context, addr swarm.Address, cluster orchestration.Cluster, randSeed int64) (orchestration.Node, error) {
nodes := cluster.Nodes()
var eligible []string
for name, node := range nodes {
pins, err := node.Client().GetPins(ctx)
if err != nil {
return nil, fmt.Errorf("node %s: get pins: %w", name, err)
}
var found bool
for _, pin := range pins {
if pin.Equal(addr) {
found = true
break
}
}
if !found {
eligible = append(eligible, name)
}
}

rnd := random.PseudoGenerator(randSeed)
node := nodes[eligible[rnd.Intn(len(eligible))]]
return node, nil
}
51 changes: 51 additions & 0 deletions pkg/check/longavailability/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package longavailability

import (
m "github.com/ethersphere/beekeeper/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)

type metrics struct {
DownloadErrors prometheus.Counter
DownloadAttempts prometheus.Counter
DownloadDuration prometheus.Histogram
DownloadSize prometheus.Gauge
}

func newMetrics(subsystem string) metrics {
return metrics{
DownloadAttempts: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "download_attempts",
Help: "Number of download attempts.",
}),
DownloadErrors: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "download_errors_count",
Help: "The total number of errors encountered before successful download.",
}),
DownloadDuration: prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "d_download_duration_seconds",
Help: "Data download duration through the /bytes endpoint.",
}),
DownloadSize: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "d_download_size_bytes",
Help: "Amount of data downloaded per download.",
},
),
}
}

func (c *Check) Report() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(c.metrics)
}
23 changes: 23 additions & 0 deletions pkg/config/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ethersphere/beekeeper/pkg/check/fullconnectivity"
"github.com/ethersphere/beekeeper/pkg/check/gc"
"github.com/ethersphere/beekeeper/pkg/check/kademlia"
"github.com/ethersphere/beekeeper/pkg/check/longavailability"
"github.com/ethersphere/beekeeper/pkg/check/manifest"
"github.com/ethersphere/beekeeper/pkg/check/peercount"
"github.com/ethersphere/beekeeper/pkg/check/pingpong"
Expand Down Expand Up @@ -489,6 +490,28 @@ var Checks = map[string]CheckType{
return opts, nil
},
},
"longavailability": {
NewAction: longavailability.NewCheck,
NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) {
checkOpts := new(struct {
RndSeed *int64 `yaml:"rnd-seed"`
RetryCount *int64 `yaml:"retry-count"`
RetryWait *time.Duration `yaml:"retry-wait"`
Refs *[]string `yaml:"refs"`
NextIterWait *time.Duration `yaml:"next-iter-wait"`
})
if err := check.Options.Decode(checkOpts); err != nil {
return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err)
}
opts := longavailability.NewDefaultOptions()

if err := applyCheckConfig(checkGlobalConfig, checkOpts, &opts); err != nil {
return nil, fmt.Errorf("applying options: %w", err)
}

return opts, nil
},
},
}

// applyCheckConfig merges global and local options into default options
Expand Down

0 comments on commit 1f6e3b0

Please sign in to comment.