Skip to content

Commit

Permalink
fix: long running
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill committed Oct 26, 2023
1 parent f8ed27e commit 6b193a0
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 37 deletions.
78 changes: 47 additions & 31 deletions pkg/check/longavailability/longavailability.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@ import (
)

// Options holds the configurable settings of the long-availability check.
// NOTE(review): this listing is a rendered diff without +/- markers, so the
// first four fields appear to be the removed lines and the five fields after
// them the added lines; NextIterWait is the only field that is new.
type Options struct {
Refs []string
RndSeed int64
RetryCount int
RetryWait time.Duration
Refs []string // presumably the references to download; the code consuming it is not visible here — confirm
RndSeed int64 // seed passed to findRandomNode when selecting a node for an address
RetryCount int // upper bound on download attempts per address
RetryWait time.Duration // sleep applied after a failed download attempt before the next one
NextIterWait time.Duration // sleep applied after each completed pass over all addresses
}

// NewDefaultOptions returns an Options value populated with defaults: RndSeed
// is taken from the current time in nanoseconds, RetryCount is 3, RetryWait is
// 10 seconds and NextIterWait is 30 minutes. Refs is left at its zero value.
// NOTE(review): this listing is a rendered diff without +/- markers, so the
// first three entries appear to be the removed lines and the four entries
// after them the added lines.
func NewDefaultOptions() Options {
return Options{
RndSeed: time.Now().UnixNano(),
RetryCount: 3,
RetryWait: 10 * time.Second,
RndSeed: time.Now().UnixNano(),
RetryCount: 3,
RetryWait: 10 * time.Second,
NextIterWait: 30 * time.Minute,
}
}

Expand Down Expand Up @@ -60,37 +62,51 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, o interf
addresses = append(addresses, addr)
}

for _, addr := range addresses {
node, err := findRandomNode(ctx, addr, cluster, opts.RndSeed)
if err != nil {
c.logger.Errorf("find node %s. Skipping. %w", addr.String(), err)
continue
for it := 0; true; it++ {
select {
case <-ctx.Done():
return nil
default:
c.logger.Infof("iteration %d", it)
}

var i int
for i = 0; i < opts.RetryCount; i++ {
c.metrics.DownloadAttempts.Inc()
c.logger.Infof("node %s: download attempt %d for %s", node.Name(), i+1, addr)

start := time.Now()
size, _, err := node.Client().DownloadFile(ctx, addr)
for _, addr := range addresses {
node, err := findRandomNode(ctx, addr, cluster, opts.RndSeed)
if err != nil {
c.metrics.DownloadErrors.Inc()
c.logger.Errorf("node %s: download %s error: %v", node.Name(), addr, err)
c.logger.Infof("retrying in: %v", opts.RetryWait)
time.Sleep(opts.RetryWait)
c.logger.Errorf("find node %s. Skipping. %w", addr.String(), err)
continue
}
c.metrics.DownloadSize.Set(float64(size))
dur := time.Since(start)
c.metrics.DownloadDuration.Observe(dur.Seconds())
c.logger.Infof("node %s: downloaded %s successfully in %v", node.Name(), addr, dur)
break
}

if i >= opts.RetryCount {
c.logger.Errorf("node %s: download for %s failed: %v", node.Name(), addr, err)
var i int
for i = 0; i < opts.RetryCount; i++ {
c.metrics.DownloadAttempts.Inc()
c.logger.Infof("node %s: download attempt %d for %s", node.Name(), i+1, addr)

start := time.Now()
size, _, err := node.Client().DownloadFile(ctx, addr)
if err != nil {
c.metrics.DownloadErrors.Inc()
c.logger.Errorf("node %s: download %s error: %v", node.Name(), addr, err)
c.logger.Infof("retrying in: %v", opts.RetryWait)
time.Sleep(opts.RetryWait)
continue
}
c.logger.Infof("download size %d", size)
c.metrics.DownloadSize.Set(float64(size))
dur := time.Since(start)
c.metrics.DownloadDuration.Observe(dur.Seconds())
c.logger.Infof("node %s: downloaded %s successfully in %v", node.Name(), addr, dur)
break
}

if i >= opts.RetryCount {
c.logger.Errorf("node %s: download for %s failed after %d tries", node.Name(), addr, opts.RetryCount)
}
}

c.logger.Infof("iteration %d completed", it)
c.logger.Infof("sleeping for %v", opts.NextIterWait)
time.Sleep(opts.NextIterWait)
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/check/longavailability/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ func newMetrics(subsystem string) metrics {
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "data_download_duration",
Name: "d_download_duration_seconds",
Help: "Data download duration through the /bytes endpoint.",
}),
DownloadSize: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "download_size",
Name: "d_download_size_bytes",
Help: "Amount of data downloaded per download.",
},
),
Expand Down
9 changes: 5 additions & 4 deletions pkg/config/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,11 @@ var Checks = map[string]CheckType{
NewAction: longavailability.NewCheck,
NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (interface{}, error) {
checkOpts := new(struct {
RndSeed *int64 `yaml:"rnd-seed"`
RetryCount *int64 `yaml:"retry-count"`
RetryWait *time.Duration `yaml:"retry-wait"`
Refs *[]string `yaml:"refs"`
RndSeed *int64 `yaml:"rnd-seed"`
RetryCount *int64 `yaml:"retry-count"`
RetryWait *time.Duration `yaml:"retry-wait"`
Refs *[]string `yaml:"refs"`
NextIterWait *time.Duration `yaml:"next-iter-wait"`
})
if err := check.Options.Decode(checkOpts); err != nil {
return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err)
Expand Down

0 comments on commit 6b193a0

Please sign in to comment.