Skip to content

Commit

Permalink
feat: use labels for long data availability (#365)
Browse files Browse the repository at this point in the history
* feat: use vec

* chore: use opts timeout

* feat: add failed_download_attemps_metric

* feat: added retreived metric

* feat: added download opts
  • Loading branch information
acha-bill authored Nov 15, 2023
1 parent af93de6 commit f70ba8d
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 35 deletions.
24 changes: 17 additions & 7 deletions pkg/bee/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/ethersphere/beekeeper"
)

const (
apiVersion = "v1"
contentType = "application/json; charset=utf-8"
postageStampBatchHeader = "Swarm-Postage-Batch-Id"
deferredUploadHeader = "Swarm-Deferred-Upload"
swarmPinHeader = "Swarm-Pin"
swarmTagHeader = "Swarm-Tag"
apiVersion = "v1"
contentType = "application/json; charset=utf-8"
postageStampBatchHeader = "Swarm-Postage-Batch-Id"
deferredUploadHeader = "Swarm-Deferred-Upload"
swarmPinHeader = "Swarm-Pin"
swarmTagHeader = "Swarm-Tag"
swarmCacheDownloadHeader = "Swarm-Cache"
)

var userAgent = "beekeeper/" + beekeeper.Version
Expand Down Expand Up @@ -171,7 +173,7 @@ func encodeJSON(w io.Writer, v interface{}) (err error) {
}

// requestData handles the HTTP request response cycle.
func (c *Client) requestData(ctx context.Context, method, path string, body io.Reader, v interface{}) (resp io.ReadCloser, err error) {
func (c *Client) requestData(ctx context.Context, method, path string, body io.Reader, opts *DownloadOptions) (resp io.ReadCloser, err error) {
req, err := http.NewRequest(method, path, body)
if err != nil {
return nil, err
Expand All @@ -191,6 +193,10 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R
req.Header.Set("Authorization", "Bearer "+key)
}

if opts != nil && opts.Cache != nil {
req.Header.Set(swarmCacheDownloadHeader, strconv.FormatBool(*opts.Cache))
}

r, err := c.httpClient.Do(req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -309,3 +315,7 @@ type UploadOptions struct {
BatchID string
Direct bool
}

type DownloadOptions struct {
Cache *bool
}
4 changes: 2 additions & 2 deletions pkg/bee/api/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
type FilesService service

// Download downloads data from the node
func (f *FilesService) Download(ctx context.Context, a swarm.Address) (resp io.ReadCloser, err error) {
return f.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/bzz/"+a.String(), nil, nil)
func (f *FilesService) Download(ctx context.Context, a swarm.Address, opts *DownloadOptions) (resp io.ReadCloser, err error) {
return f.client.requestData(ctx, http.MethodGet, "/"+apiVersion+"/bzz/"+a.String(), nil, opts)
}

// FilesUploadResponse represents Upload's response
Expand Down
6 changes: 3 additions & 3 deletions pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ func (c *Client) DownloadChunk(ctx context.Context, a swarm.Address, targets str
return io.ReadAll(r)
}

// DownloadFile downloads chunk from the node and returns it's size and hash
func (c *Client) DownloadFile(ctx context.Context, a swarm.Address) (size int64, hash []byte, err error) {
r, err := c.api.Files.Download(ctx, a)
// DownloadFile downloads chunk from the node and returns it's size and hash.
func (c *Client) DownloadFile(ctx context.Context, a swarm.Address, opts *api.DownloadOptions) (size int64, hash []byte, err error) {
r, err := c.api.Files.Download(ctx, a, opts)
if err != nil {
return 0, nil, fmt.Errorf("download file %s: %w", a, err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/check/fileretrieval/fileretrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (c *Check) defaultCheck(ctx context.Context, cluster orchestration.Cluster,

client = clients[lastNodeName]

size, hash, err := client.DownloadFile(ctx, file.Address())
size, hash, err := client.DownloadFile(ctx, file.Address(), nil)
if err != nil {
return fmt.Errorf("node %s: %w", lastNodeName, err)
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (c *Check) fullCheck(ctx context.Context, cluster orchestration.Cluster, o
}

t1 := time.Now()
size, hash, err := nc.DownloadFile(ctx, file.Address())
size, hash, err := nc.DownloadFile(ctx, file.Address(), nil)
if err != nil {
return fmt.Errorf("node %s: %w", n, err)
}
Expand Down
22 changes: 15 additions & 7 deletions pkg/check/longavailability/longavailability.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee/api"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/beekeeper/pkg/orchestration"
Expand Down Expand Up @@ -43,7 +44,7 @@ type Check struct {
func NewCheck(logger logging.Logger) beekeeper.Action {
return &Check{
logger: logger,
metrics: newMetrics("check_longavailability"),
metrics: newMetrics("check_longavailability", []string{"ref"}),
}
}

Expand Down Expand Up @@ -73,35 +74,42 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, o interf
}

for _, addr := range addresses {
labelValue := addr.String()
node, err := findRandomNode(ctx, addr, cluster, opts.RndSeed)
if err != nil {
c.logger.Errorf("find node %s. Skipping. %w", addr.String(), err)
continue
}

c.metrics.DownloadAttempts.WithLabelValues(labelValue).Inc()

for i := 0; i <= opts.RetryCount; i++ {
if i == opts.RetryCount {
c.logger.Errorf("node %s: download for %s failed after %d tries", node.Name(), addr, opts.RetryCount)
c.metrics.DownloadErrors.WithLabelValues(labelValue).Inc()
c.metrics.DownloadStatus.WithLabelValues(labelValue).Set(0)
break
}

c.metrics.DownloadAttempts.Inc()
c.logger.Infof("node %s: download attempt %d for %s", node.Name(), i+1, addr)

start := time.Now()
size, _, err := node.Client().DownloadFile(ctx, addr)
cache := false
size, _, err := node.Client().DownloadFile(ctx, addr, &api.DownloadOptions{Cache: &cache})
if err != nil {
c.metrics.DownloadErrors.Inc()
c.metrics.FailedDownloadAttempts.WithLabelValues(labelValue).Inc()
c.logger.Errorf("node %s: download %s error: %v", node.Name(), addr, err)
c.logger.Infof("retrying in: %v", opts.RetryWait)
time.Sleep(opts.RetryWait)
continue
}
c.logger.Infof("download size %d", size)
c.metrics.DownloadSize.Set(float64(size))
c.metrics.DownloadSize.WithLabelValues(labelValue).Set(float64(size))

dur := time.Since(start)
c.metrics.DownloadDuration.Observe(dur.Seconds())
c.metrics.DownloadDuration.WithLabelValues(labelValue).Observe(dur.Seconds())
c.logger.Infof("node %s: downloaded %s successfully in %v", node.Name(), addr, dur)
c.metrics.DownloadStatus.WithLabelValues(labelValue).Set(1)
c.metrics.Retrieved.WithLabelValues(labelValue).Inc()
break
}
}
Expand Down
61 changes: 49 additions & 12 deletions pkg/check/longavailability/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,79 @@ import (
)

type metrics struct {
DownloadErrors prometheus.Counter
DownloadAttempts prometheus.Counter
DownloadDuration prometheus.Histogram
DownloadSize prometheus.Gauge
DownloadErrors *prometheus.CounterVec
DownloadAttempts *prometheus.CounterVec
Retrieved *prometheus.CounterVec
FailedDownloadAttempts *prometheus.CounterVec
DownloadDuration *prometheus.HistogramVec
DownloadSize *prometheus.GaugeVec
DownloadStatus *prometheus.GaugeVec
}

func newMetrics(subsystem string) metrics {
func newMetrics(subsystem string, labels []string) metrics {
return metrics{
DownloadAttempts: prometheus.NewCounter(
DownloadAttempts: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "download_attempts",
Help: "Number of download attempts.",
}),
DownloadErrors: prometheus.NewCounter(
},
labels,
),
Retrieved: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "retrieved",
Help: "Number of successful downloads.",
},
labels,
),
FailedDownloadAttempts: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "failed_download_attempts",
Help: "Number of failed download attempts.",
},
labels,
),
DownloadErrors: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "download_errors_count",
Help: "The total number of errors encountered before successful download.",
}),
DownloadDuration: prometheus.NewHistogram(
},
labels,
),
DownloadDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "d_download_duration_seconds",
Help: "Data download duration through the /bytes endpoint.",
}),
DownloadSize: prometheus.NewGauge(
},
labels,
),
DownloadSize: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "d_download_size_bytes",
Help: "Amount of data downloaded per download.",
},
labels,
),
DownloadStatus: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "d_download_status",
Help: "Download status.",
},
labels,
),
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/check/settlements/settlements.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int
dNode := pickAtRandom(rnd, sortedNodes, uNode)
c.logger.Infof("download starting from %s", overlays[dNode].String())

size, hash, err := clients[dNode].DownloadFile(ctx, file.Address())
size, hash, err := clients[dNode].DownloadFile(ctx, file.Address(), nil)
if err != nil {
return fmt.Errorf("node %s: %w", dNode, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/test/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (b *BeeV2) UploadFile(ctx context.Context, file File) error {
}

func (b *BeeV2) ExpectToHaveFile(ctx context.Context, file File) error {
size, hash, err := b.client.DownloadFile(ctx, file.address)
size, hash, err := b.client.DownloadFile(ctx, file.address, nil)
if err != nil {
return fmt.Errorf("node %s: %w", b.name, err)
}
Expand Down

0 comments on commit f70ba8d

Please sign in to comment.