Commit 5d77aa8

Merge pull request #354 from sapcc/scrape-code-reuse

SuperSandro2000 authored Sep 12, 2023
2 parents aeac026 + ed2218a commit 5d77aa8
Showing 3 changed files with 35 additions and 30 deletions.
11 changes: 11 additions & 0 deletions internal/collector/collector.go
@@ -65,3 +65,14 @@ func addJitter(duration time.Duration) time.Duration {
 	r := rand.Float64() //NOTE: 0 <= r < 1
 	return time.Duration(float64(duration) * (0.9 + 0.2*r))
 }
+
+// TaskTiming appears in the task types of our ProducerConsumerJobs.
+type TaskTiming struct {
+	StartedAt  time.Time //filled during DiscoverTask
+	FinishedAt time.Time //filled during ProcessTask
+}
+
+// Duration measures the duration of the main portion of a task.
+func (t TaskTiming) Duration() time.Duration {
+	return t.FinishedAt.Sub(t.StartedAt)
+}
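
For orientation, here is a minimal sketch of how this new helper is meant to be used by a jobloop.ProducerConsumerJob: StartedAt is filled in the discover step and FinishedAt in the process step, so Duration() measures only the task's main work. The task type and callbacks below are illustrative only (metadata and error handling omitted), not code from this commit:

type exampleTask struct {
	Timing TaskTiming
}

var exampleJob = jobloop.ProducerConsumerJob[exampleTask]{
	//Metadata etc. omitted for brevity
	DiscoverTask: func(_ context.Context, _ prometheus.Labels) (task exampleTask, err error) {
		task.Timing.StartedAt = time.Now() //start of the measured portion
		return task, nil
	},
	ProcessTask: func(_ context.Context, task exampleTask, _ prometheus.Labels) error {
		//...do the actual work here...
		task.Timing.FinishedAt = time.Now() //end of the measured portion
		return nil
	},
}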
14 changes: 7 additions & 7 deletions internal/collector/ratescrape.go
@@ -77,7 +77,7 @@ var (
 // a target service type is specified using the
 // `jobloop.WithLabel("service_type", serviceType)` option.
 func (c *Collector) RateScrapeJob(registerer prometheus.Registerer) jobloop.Job {
-	return (&jobloop.ProducerConsumerJob[scrapeTask]{
+	return (&jobloop.ProducerConsumerJob[projectScrapeTask]{
 		Metadata: jobloop.JobMetadata{
 			ReadableName: "scrape project rate usage",
 			CounterOpts: prometheus.CounterOpts{
@@ -86,14 +86,14 @@ func (c *Collector) RateScrapeJob(registerer prometheus.Registerer) jobloop.Job
 			},
 			CounterLabels: []string{"service_type", "service_name"},
 		},
-		DiscoverTask: func(_ context.Context, labels prometheus.Labels) (scrapeTask, error) {
+		DiscoverTask: func(_ context.Context, labels prometheus.Labels) (projectScrapeTask, error) {
 			return c.discoverScrapeTask(labels, findProjectForRateScrapeQuery)
 		},
 		ProcessTask: c.processRateScrapeTask,
 	}).Setup(registerer)
 }
 
-func (c *Collector) processRateScrapeTask(_ context.Context, task scrapeTask, labels prometheus.Labels) error {
+func (c *Collector) processRateScrapeTask(_ context.Context, task projectScrapeTask, labels prometheus.Labels) error {
 	srv := task.Service
 	plugin := c.Cluster.QuotaPlugins[srv.Type] //NOTE: discoverScrapeTask already verified that this exists
 
@@ -109,7 +109,7 @@ func (c *Collector) processRateScrapeTask(_ context.Context, task scrapeTask, la
 	if err != nil {
 		task.Err = util.UnpackError(err)
 	}
-	task.FinishedAt = c.TimeNow()
+	task.Timing.FinishedAt = c.TimeNow()
 
 	//write result on success; if anything fails, try to record the error in the DB
 	if task.Err == nil {
@@ -121,7 +121,7 @@ func (c *Collector) processRateScrapeTask(_ context.Context, task scrapeTask, la
 	if task.Err != nil {
 		_, err := c.DB.Exec(
 			writeRateScrapeErrorQuery,
-			task.FinishedAt, task.FinishedAt.Add(c.AddJitter(recheckInterval)),
+			task.Timing.FinishedAt, task.Timing.FinishedAt.Add(c.AddJitter(recheckInterval)),
 			task.Err.Error(), srv.ID,
 		)
 		if err != nil {
@@ -137,7 +137,7 @@ func (c *Collector) processRateScrapeTask(_ context.Context, task scrapeTask, la
 	return fmt.Errorf("during rate scrape of project %s/%s: %w", project.Domain.Name, project.Name, task.Err)
 }
 
-func (c *Collector) writeRateScrapeResult(task scrapeTask, rateData map[string]*big.Int, ratesScrapeState string) error {
+func (c *Collector) writeRateScrapeResult(task projectScrapeTask, rateData map[string]*big.Int, ratesScrapeState string) error {
 	srv := task.Service
 	plugin := c.Cluster.QuotaPlugins[srv.Type] //NOTE: discoverScrapeTask already verified that this exists
 
@@ -209,7 +209,7 @@ func (c *Collector) writeRateScrapeResult(task scrapeTask, rateData map[string]*
 	//update rate scraping metadata and also reset the rates_stale flag on this
 	//service so that we don't scrape it again immediately afterwards
 	_, err = tx.Exec(writeRateScrapeSuccessQuery,
-		task.FinishedAt, task.FinishedAt.Add(c.AddJitter(scrapeInterval)), task.Duration().Seconds(),
+		task.Timing.FinishedAt, task.Timing.FinishedAt.Add(c.AddJitter(scrapeInterval)), task.Timing.Duration().Seconds(),
 		ratesScrapeState, srv.ID,
 	)
 	if err != nil {
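
A note on the scheduling math used with these timestamps: AddJitter multiplies an interval by 0.9 + 0.2*r with 0 <= r < 1 (see collector.go above), so the next check lands within ±10% of the nominal interval, counted from FinishedAt. A quick sketch of the bounds, with a 30-minute interval chosen purely for illustration:

d := 30 * time.Minute
//the jittered value lies in [0.9*d, 1.1*d), i.e. the next scrape is due
//between 27m (inclusive) and 33m (exclusive) after task.Timing.FinishedAt
fmt.Println(time.Duration(float64(d) * 0.9)) //27m0s
fmt.Println(time.Duration(float64(d) * 1.1)) //33m0s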
40 changes: 17 additions & 23 deletions internal/collector/scrape.go
@@ -88,7 +88,7 @@ var (
 // a target service type is specified using the
 // `jobloop.WithLabel("service_type", serviceType)` option.
 func (c *Collector) ResourceScrapeJob(registerer prometheus.Registerer) jobloop.Job {
-	return (&jobloop.ProducerConsumerJob[scrapeTask]{
+	return (&jobloop.ProducerConsumerJob[projectScrapeTask]{
 		Metadata: jobloop.JobMetadata{
 			ReadableName: "scrape project quota and usage",
 			CounterOpts: prometheus.CounterOpts{
@@ -97,7 +97,7 @@ func (c *Collector) ResourceScrapeJob(registerer prometheus.Registerer) jobloop.
 			},
 			CounterLabels: []string{"service_type", "service_name"},
 		},
-		DiscoverTask: func(_ context.Context, labels prometheus.Labels) (scrapeTask, error) {
+		DiscoverTask: func(_ context.Context, labels prometheus.Labels) (projectScrapeTask, error) {
 			return c.discoverScrapeTask(labels, findProjectForResourceScrapeQuery)
 		},
 		ProcessTask: c.processResourceScrapeTask,
@@ -106,31 +106,25 @@ func (c *Collector) ResourceScrapeJob(registerer prometheus.Registerer) jobloop.
 
 // This is the task type for ResourceScrapeJob and RateScrapeJob. The natural
 // task type for these jobs is just db.ProjectService, but this more elaborate
-// task type allows us to reuse timing information from the discover step, and
-// have some helper methods for error formatting and time calculations.
-type scrapeTask struct {
+// task type allows us to reuse timing information from the discover step.
+type projectScrapeTask struct {
 	//data loaded during discoverScrapeTask
 	Service db.ProjectService
-	//timing information (StartedAt is during discover, FinishedAt is during process)
-	StartedAt  time.Time
-	FinishedAt time.Time
+	//timing information
+	Timing TaskTiming
 	//error reporting
 	Err error
 }
 
-func (t scrapeTask) Duration() time.Duration {
-	return t.FinishedAt.Sub(t.StartedAt)
-}
-
-func (c *Collector) discoverScrapeTask(labels prometheus.Labels, query string) (task scrapeTask, err error) {
+func (c *Collector) discoverScrapeTask(labels prometheus.Labels, query string) (task projectScrapeTask, err error) {
 	serviceType := labels["service_type"]
 	if !c.Cluster.HasService(serviceType) {
-		return scrapeTask{}, fmt.Errorf("no such service type: %q", serviceType)
+		return projectScrapeTask{}, fmt.Errorf("no such service type: %q", serviceType)
 	}
 	labels["service_name"] = c.Cluster.InfoForService(serviceType).ProductName
 
-	task.StartedAt = c.TimeNow()
-	err = c.DB.SelectOne(&task.Service, query, serviceType, task.StartedAt)
+	task.Timing.StartedAt = c.TimeNow()
+	err = c.DB.SelectOne(&task.Service, query, serviceType, task.Timing.StartedAt)
 	return task, err
 }
 
@@ -150,7 +144,7 @@ func (c *Collector) identifyProjectBeingScraped(srv db.ProjectService) (dbProjec
 	return
 }
 
-func (c *Collector) processResourceScrapeTask(_ context.Context, task scrapeTask, labels prometheus.Labels) error {
+func (c *Collector) processResourceScrapeTask(_ context.Context, task projectScrapeTask, labels prometheus.Labels) error {
 	srv := task.Service
 	plugin := c.Cluster.QuotaPlugins[srv.Type] //NOTE: discoverScrapeTask already verified that this exists
 
@@ -166,7 +160,7 @@ func (c *Collector) processResourceScrapeTask(_ context.Context, task scrapeTask
 	if err != nil {
 		task.Err = util.UnpackError(err)
 	}
-	task.FinishedAt = c.TimeNow()
+	task.Timing.FinishedAt = c.TimeNow()
 
 	//write result on success; if anything fails, try to record the error in the DB
 	if task.Err == nil {
@@ -178,7 +172,7 @@ func (c *Collector) processResourceScrapeTask(_ context.Context, task scrapeTask
 	if task.Err != nil {
 		_, err := c.DB.Exec(
 			writeResourceScrapeErrorQuery,
-			task.FinishedAt, task.FinishedAt.Add(c.AddJitter(recheckInterval)),
+			task.Timing.FinishedAt, task.Timing.FinishedAt.Add(c.AddJitter(recheckInterval)),
 			task.Err.Error(), srv.ID,
 		)
 		if err != nil {
@@ -204,7 +198,7 @@ func (c *Collector) processResourceScrapeTask(_ context.Context, task scrapeTask
 	return fmt.Errorf("during resource scrape of project %s/%s: %w", dbDomain.Name, dbProject.Name, task.Err)
 }
 
-func (c *Collector) writeResourceScrapeResult(dbDomain db.Domain, dbProject db.Project, task scrapeTask, resourceData map[string]core.ResourceData, serializedMetrics string) error {
+func (c *Collector) writeResourceScrapeResult(dbDomain db.Domain, dbProject db.Project, task projectScrapeTask, resourceData map[string]core.ResourceData, serializedMetrics string) error {
 	srv := task.Service
 
 	tx, err := c.DB.Begin()
@@ -277,7 +271,7 @@ func (c *Collector) writeResourceScrapeResult(dbDomain db.Domain, dbProject db.P
 	//that we don't scrape it again immediately afterwards; also persist all other
 	//attributes that we have not written yet
 	_, err = tx.Exec(writeResourceScrapeSuccessQuery,
-		task.FinishedAt, task.FinishedAt.Add(c.AddJitter(scrapeInterval)), task.Duration().Seconds(),
+		task.Timing.FinishedAt, task.Timing.FinishedAt.Add(c.AddJitter(scrapeInterval)), task.Timing.Duration().Seconds(),
 		serializedMetrics, srv.ID,
 	)
 	if err != nil {
@@ -289,8 +283,8 @@ func (c *Collector) writeResourceScrapeResult(dbDomain db.Domain, dbProject db.P
 		return fmt.Errorf("while committing transaction: %w", err)
 	}
 
-	if task.Duration() > 5*time.Minute {
-		logg.Info("scrape of %s in project %s has taken excessively long (%s)", srv.Type, dbProject.UUID, task.Duration().String())
+	if task.Timing.Duration() > 5*time.Minute {
+		logg.Info("scrape of %s in project %s has taken excessively long (%s)", srv.Type, dbProject.UUID, task.Timing.Duration().String())
 	}
 
 	//if a mismatch between frontend and backend quota was detected, try to
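
For completeness, a sketch of how one of these jobs is wired up for a single service type, per the `jobloop.WithLabel("service_type", serviceType)` option quoted in the doc comments above. The "compute" label, the default registerer, and the Run entry point are assumptions about the surrounding setup, not part of this commit:

job := c.ResourceScrapeJob(prometheus.DefaultRegisterer)
//pin the job to one service type and let it process tasks in a loop
go job.Run(ctx, jobloop.WithLabel("service_type", "compute"))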
