From 644c36d2ea48c4f25dcd1d20eb3edda15fc36be2 Mon Sep 17 00:00:00 2001 From: Stefan Majewsky Date: Tue, 12 Sep 2023 13:43:35 +0200 Subject: [PATCH 1/3] rewrite capacity scrape into a jobloop.ProducerConsumerJob This allows each capacitor to run individually, and fail individually, without blocking the other capacitors from running. --- internal/collector/capacity.go | 338 ------------------ internal/collector/capacity_scrape.go | 307 ++++++++++++++++ ...pacity_test.go => capacity_scrape_test.go} | 98 +++-- internal/db/migrations.go | 8 + internal/db/models.go | 1 + main.go | 2 +- 6 files changed, 384 insertions(+), 370 deletions(-) delete mode 100644 internal/collector/capacity.go create mode 100644 internal/collector/capacity_scrape.go rename internal/collector/{capacity_test.go => capacity_scrape_test.go} (63%) diff --git a/internal/collector/capacity.go b/internal/collector/capacity.go deleted file mode 100644 index 83a652aed..000000000 --- a/internal/collector/capacity.go +++ /dev/null @@ -1,338 +0,0 @@ -/******************************************************************************* -* -* Copyright 2017 SAP SE -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You should have received a copy of the License along with this -* program. If not, you may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -*******************************************************************************/ - -package collector - -import ( - "encoding/json" - "fmt" - "sort" - "time" - - "github.com/go-gorp/gorp/v3" - "github.com/prometheus/client_golang/prometheus" - limesresources "github.com/sapcc/go-api-declarations/limes/resources" - "github.com/sapcc/go-bits/logg" - "github.com/sapcc/go-bits/sqlext" - - "github.com/sapcc/limes/internal/core" - "github.com/sapcc/limes/internal/db" - "github.com/sapcc/limes/internal/util" -) - -var ( - scanInterval = 15 * time.Minute - scanInitialDelay = 1 * time.Minute -) - -// ScanCapacity queries the cluster's capacity (across all enabled backend -// services) periodically. -// -// Errors are logged instead of returned. The function will not return unless -// startup fails. 
-func (c *Collector) ScanCapacity() { - //don't start scanning capacity immediately to avoid too much load on the - //backend services when the collector comes up - time.Sleep(scanInitialDelay) - - for { - logg.Debug("scanning capacity") - c.scanCapacity() - - time.Sleep(scanInterval) - } -} - -type capacityDataWithCapacitorID struct { - CapacityData core.CapacityData - CapacitorID string -} - -func (c *Collector) scanCapacity() { - values := make(map[string]map[string]capacityDataWithCapacitorID) - scrapedAt := c.TimeNow() - - capacitorInfo := make(map[string]db.ClusterCapacitor) - - for capacitorID, plugin := range c.Cluster.CapacityPlugins { - labels := prometheus.Labels{ - "capacitor": capacitorID, - } - //always report the counter - clusterCapacitorSuccessCounter.With(labels).Add(0) - clusterCapacitorFailedCounter.With(labels).Add(0) - - scrapeStart := c.TimeNow() - capacities, serializedMetrics, err := plugin.Scrape() - scrapeDuration := c.TimeNow().Sub(scrapeStart) - if err != nil { - c.LogError("scan capacity with capacitor %s failed: %s", capacitorID, util.UnpackError(err).Error()) - clusterCapacitorFailedCounter.With(labels).Inc() - continue - } - - //merge capacities from this plugin into the overall capacity values map - for serviceType, resources := range capacities { - if _, ok := values[serviceType]; !ok { - values[serviceType] = make(map[string]capacityDataWithCapacitorID) - } - for resourceName, value := range resources { - values[serviceType][resourceName] = capacityDataWithCapacitorID{value, capacitorID} - } - } - - clusterCapacitorSuccessCounter.With(labels).Inc() - capacitorInfo[capacitorID] = db.ClusterCapacitor{ - CapacitorID: capacitorID, - ScrapedAt: &scrapedAt, - ScrapeDurationSecs: scrapeDuration.Seconds(), - SerializedMetrics: serializedMetrics, - NextScrapeAt: scrapedAt.Add(c.AddJitter(scanInterval)), - } - } - - //skip values for services not enabled for this cluster - for serviceType := range values { - if !c.Cluster.HasService(serviceType) { - logg.Info("discarding capacity values for unknown service type: %s", serviceType) - delete(values, serviceType) - } - } - - //skip values for resources not announced by the respective QuotaPlugin - for serviceType, plugin := range c.Cluster.QuotaPlugins { - subvalues, exists := values[serviceType] - if !exists { - continue - } - names := make(map[string]bool) - for name := range subvalues { - names[name] = true - } - for _, res := range plugin.Resources() { - delete(names, res.Name) - } - for name := range names { - logg.Info("discarding capacity value for unknown resource: %s/%s", serviceType, name) - delete(subvalues, name) - } - } - - //do the following in a transaction to avoid inconsistent DB state - tx, err := c.DB.Begin() - if err != nil { - c.LogError("write capacity failed: %s", err.Error()) - } - defer sqlext.RollbackUnlessCommitted(tx) - - err = c.writeCapacitorInfo(tx, capacitorInfo) - if err != nil { - c.LogError("write capacity failed: %s", err.Error()) - } - err = c.writeCapacity(tx, values) - if err != nil { - c.LogError("write capacity failed: %s", err.Error()) - } - err = tx.Commit() - if err != nil { - c.LogError("write capacity failed: %s", err.Error()) - } -} - -func (c *Collector) writeCapacitorInfo(tx *gorp.Transaction, capacitorInfo map[string]db.ClusterCapacitor) error { - //remove superfluous cluster_capacitors - var dbCapacitors []db.ClusterCapacitor - _, err := tx.Select(&dbCapacitors, `SELECT * FROM cluster_capacitors`) - if err != nil { - return err - } - isExistingCapacitor := 
make(map[string]bool) - for _, dbCapacitor := range dbCapacitors { - isExistingCapacitor[dbCapacitor.CapacitorID] = true - _, exists := c.Cluster.CapacityPlugins[dbCapacitor.CapacitorID] - if !exists { - _, err := tx.Delete(&dbCapacitor) //nolint:gosec // Delete is not holding onto the pointer after it returns - if err != nil { - return err - } - } - } - - //insert or update cluster_capacitors where a scrape was successful - for _, capacitor := range capacitorInfo { - if isExistingCapacitor[capacitor.CapacitorID] { - _, err := tx.Update(&capacitor) //nolint:gosec // Update is not holding onto the pointer after it returns - if err != nil { - return err - } - } else { - err := tx.Insert(&capacitor) //nolint:gosec // Insert is not holding onto the pointer after it returns - if err != nil { - return err - } - } - } - - return nil -} - -func (c *Collector) writeCapacity(tx *gorp.Transaction, values map[string]map[string]capacityDataWithCapacitorID) error { - //create missing cluster_services entries (superfluous ones will be cleaned - //up by the CheckConsistency()) - serviceIDForType := make(map[string]int64) - var dbServices []*db.ClusterService - _, err := tx.Select(&dbServices, `SELECT * FROM cluster_services`) - if err != nil { - return err - } - for _, dbService := range dbServices { - serviceIDForType[dbService.Type] = dbService.ID - } - - var allServiceTypes []string - for serviceType := range values { - allServiceTypes = append(allServiceTypes, serviceType) - } - sort.Strings(allServiceTypes) //for reproducibility in unit test - - for _, serviceType := range allServiceTypes { - _, exists := serviceIDForType[serviceType] - if exists { - continue - } - - dbService := &db.ClusterService{Type: serviceType} - err := tx.Insert(dbService) - if err != nil { - return err - } - serviceIDForType[dbService.Type] = dbService.ID - } - - //enumerate cluster_resources: create missing ones, update existing ones, delete superfluous ones - for _, serviceType := range allServiceTypes { - serviceValues := values[serviceType] - serviceID := serviceIDForType[serviceType] - var dbResources []*db.ClusterResource - _, err := tx.Select(&dbResources, `SELECT * FROM cluster_resources WHERE service_id = $1`, serviceID) - if err != nil { - return err - } - - seen := make(map[string]bool) - for _, dbResource := range dbResources { - seen[dbResource.Name] = true - - data, exists := serviceValues[dbResource.Name] - if exists { - dbResource.RawCapacity = data.CapacityData.Capacity - dbResource.CapacitorID = data.CapacitorID - - if len(data.CapacityData.Subcapacities) == 0 { - dbResource.SubcapacitiesJSON = "" - } else { - bytes, err := json.Marshal(data.CapacityData.Subcapacities) - if err != nil { - return fmt.Errorf("failed to convert subcapacities to JSON: %s", err.Error()) - } - dbResource.SubcapacitiesJSON = string(bytes) - } - - if len(data.CapacityData.CapacityPerAZ) == 0 { - dbResource.CapacityPerAZJSON = "" - } else { - bytes, err := json.Marshal(convertAZReport(data.CapacityData.CapacityPerAZ)) - if err != nil { - return fmt.Errorf("failed to convert capacities per availability zone to JSON: %s", err.Error()) - } - dbResource.CapacityPerAZJSON = string(bytes) - } - - _, err := tx.Update(dbResource) - if err != nil { - return err - } - } else { - _, err := tx.Delete(dbResource) - if err != nil { - return err - } - } - } - - //insert missing cluster_resources entries - var missingResourceNames []string - for name := range serviceValues { - if !seen[name] { - missingResourceNames = append(missingResourceNames, 
name) - } - } - sort.Strings(missingResourceNames) //for reproducibility in unit test - for _, name := range missingResourceNames { - data := serviceValues[name] - res := &db.ClusterResource{ - ServiceID: serviceID, - Name: name, - RawCapacity: data.CapacityData.Capacity, - CapacityPerAZJSON: "", //but see below - SubcapacitiesJSON: "", - CapacitorID: data.CapacitorID, - } - - if len(data.CapacityData.Subcapacities) != 0 { - bytes, err := json.Marshal(data.CapacityData.Subcapacities) - if err != nil { - return fmt.Errorf("failed to convert subcapacities to JSON: %s", err.Error()) - } - res.SubcapacitiesJSON = string(bytes) - } - - if len(data.CapacityData.CapacityPerAZ) != 0 { - bytes, err := json.Marshal(convertAZReport(data.CapacityData.CapacityPerAZ)) - if err != nil { - return fmt.Errorf("failed to convert capacities per availability zone to JSON: %s", err.Error()) - } - res.CapacityPerAZJSON = string(bytes) - } - - err := tx.Insert(res) - if err != nil { - return err - } - } - } - - return nil -} - -func convertAZReport(capacityPerAZ map[string]*core.CapacityDataForAZ) limesresources.ClusterAvailabilityZoneReports { - //The initial implementation wrote limesresources.ClusterAvailabilityZoneReports into - //the CapacityPerAZJSON database field, even though - //map[string]*core.CapacityDataForAZ would have been more appropriate. Now we - //stick with it for compatibility's sake. - report := make(limesresources.ClusterAvailabilityZoneReports, len(capacityPerAZ)) - for azName, azData := range capacityPerAZ { - report[azName] = &limesresources.ClusterAvailabilityZoneReport{ - Name: azName, - Capacity: azData.Capacity, - Usage: azData.Usage, - } - } - return report -} diff --git a/internal/collector/capacity_scrape.go b/internal/collector/capacity_scrape.go new file mode 100644 index 000000000..6dd97870c --- /dev/null +++ b/internal/collector/capacity_scrape.go @@ -0,0 +1,307 @@ +/******************************************************************************* +* +* Copyright 2023 SAP SE +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You should have received a copy of the License along with this +* program. If not, you may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*******************************************************************************/ + +package collector + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" + limesresources "github.com/sapcc/go-api-declarations/limes/resources" + "github.com/sapcc/go-bits/jobloop" + "github.com/sapcc/go-bits/logg" + "github.com/sapcc/go-bits/sqlext" + + "github.com/sapcc/limes/internal/core" + "github.com/sapcc/limes/internal/db" + "github.com/sapcc/limes/internal/util" +) + +const ( + //how long to wait before scraping the same capacitor again + capacityScrapeInterval = 15 * time.Minute + //how long to wait after error before retrying the same capacitor + capacityScrapeErrorInterval = 3 * time.Minute +) + +// CapacityScrapeJob is a jobloop.Job. Each task scrapes one capacitor. 
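+// Capacitors are processed as independent tasks, so a failing capacitor only delays its own
+// next scrape (see capacityScrapeErrorInterval) and does not block the other capacitors.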
+// Cluster resources managed by this capacitor are added, updated and deleted as necessary. +func (c *Collector) CapacityScrapeJob(registerer prometheus.Registerer) jobloop.Job { + //used by discoverCapacityScrapeTask() to trigger a consistency check every + //once in a while; starts out very far in the past to force a consistency + //check on first run + lastConsistencyCheckAt := time.Unix(-1000000, 0).UTC() + + return (&jobloop.ProducerConsumerJob[capacityScrapeTask]{ + Metadata: jobloop.JobMetadata{ + ReadableName: "scrape capacity", + CounterOpts: prometheus.CounterOpts{ + Name: "limes_capacity_scrapes", + Help: "Counter for capacity scrape operations per capacitor.", + }, + CounterLabels: []string{"capacitor_id"}, + }, + DiscoverTask: func(ctx context.Context, labels prometheus.Labels) (capacityScrapeTask, error) { + return c.discoverCapacityScrapeTask(ctx, labels, &lastConsistencyCheckAt) + }, + ProcessTask: c.processCapacityScrapeTask, + }).Setup(registerer) +} + +type capacityScrapeTask struct { + Capacitor db.ClusterCapacitor + Timing TaskTiming +} + +var ( + // upsert a cluster_capacitors entry + initCapacitorQuery = sqlext.SimplifyWhitespace(` + INSERT INTO cluster_capacitors (capacitor_id, scraped_at, next_scrape_at) + VALUES ($1, $2, $2) + ON CONFLICT DO NOTHING + `) + + // find the next capacitor that needs to have capacity scraped + findCapacitorForScrapeQuery = sqlext.SimplifyWhitespace(` + SELECT * FROM cluster_capacitors + -- filter by need to be updated + WHERE next_scrape_at <= $1 + -- order by update priority (first schedule, then ID for deterministic test behavior) + ORDER BY next_scrape_at ASC, capacitor_id ASC + -- find only one capacitor to scrape per iteration + LIMIT 1 + `) +) + +func (c *Collector) discoverCapacityScrapeTask(_ context.Context, _ prometheus.Labels, lastConsistencyCheckAt *time.Time) (task capacityScrapeTask, err error) { + task.Timing.StartedAt = c.TimeNow() + + //consistency check: every once in a while (and also immediately on startup), + //check that all required `cluster_capacitors` entries exist + if lastConsistencyCheckAt.Before(task.Timing.StartedAt.Add(-5 * time.Minute)) { + err = sqlext.WithPreparedStatement(c.DB, initCapacitorQuery, func(stmt *sql.Stmt) error { + for capacitorID := range c.Cluster.CapacityPlugins { + _, err := stmt.Exec(capacitorID, task.Timing.StartedAt) + if err != nil { + return err + } + } + return nil + }) + if err != nil { + return task, fmt.Errorf("while creating cluster_capacitors entries: %w", err) + } + *lastConsistencyCheckAt = task.Timing.StartedAt + } + + err = c.DB.SelectOne(&task.Capacitor, findCapacitorForScrapeQuery, task.Timing.StartedAt) + return task, err +} + +func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacityScrapeTask, labels prometheus.Labels) (returnedErr error) { + capacitor := task.Capacitor + labels["capacitor_id"] = capacitor.CapacitorID + + defer func() { + if returnedErr != nil { + returnedErr = fmt.Errorf("while scraping capacitor %s: %w", capacitor.CapacitorID, returnedErr) + } + }() + + //if capacitor was removed from the configuration, clean up its DB entry + plugin := c.Cluster.CapacityPlugins[capacitor.CapacitorID] + if plugin == nil { + _, err := c.DB.Delete(&capacitor) + return err + } + + //collect mapping of cluster_services type names to IDs + //(these DB entries are maintained for us by checkConsistencyCluster) + query := `SELECT id, type FROM cluster_services` + serviceIDForType := make(map[string]int64) + err := sqlext.ForeachRow(c.DB, query, nil, 
func(rows *sql.Rows) error { + var ( + serviceID int64 + serviceType string + ) + err := rows.Scan(&serviceID, &serviceType) + if err == nil { + serviceIDForType[serviceType] = serviceID + } + return err + }) + if err != nil { + return fmt.Errorf("cannot collect cluster service mapping: %w", err) + } + + //collect ownership information for existing cluster_resources + query = `SELECT cs.type, cr.name, cr.capacitor_id FROM cluster_resources cr JOIN cluster_services cs ON cr.service_id = cs.id` + capacitorIDForResource := make(map[string]map[string]string) + err = sqlext.ForeachRow(c.DB, query, nil, func(rows *sql.Rows) error { + var ( + serviceType string + resourceName string + capacitorID string + ) + err := rows.Scan(&serviceType, &resourceName, &capacitorID) + if err == nil { + if capacitorIDForResource[serviceType] == nil { + capacitorIDForResource[serviceType] = make(map[string]string) + } + capacitorIDForResource[serviceType][resourceName] = capacitorID + } + return err + }) + if err != nil { + return fmt.Errorf("cannot collect resource ownership mapping: %w", err) + } + + //scrape capacity data + capacityData, serializedMetrics, err := plugin.Scrape() + task.Timing.FinishedAt = c.TimeNow() + if err == nil { + capacitor.ScrapedAt = &task.Timing.FinishedAt + capacitor.ScrapeDurationSecs = task.Timing.Duration().Seconds() + capacitor.SerializedMetrics = serializedMetrics + capacitor.NextScrapeAt = task.Timing.FinishedAt.Add(c.AddJitter(capacityScrapeInterval)) + capacitor.ScrapeErrorMessage = "" + //NOTE: in this case, we continue below, with the cluster_resources update + //the cluster_capacitors row will be updated at the end of the tx + } else { + err = util.UnpackError(err) + capacitor.NextScrapeAt = task.Timing.FinishedAt.Add(c.AddJitter(capacityScrapeErrorInterval)) + capacitor.ScrapeErrorMessage = err.Error() + + _, updateErr := c.DB.Update(&capacitor) + if updateErr != nil { + err = fmt.Errorf("%w (additional error while updating DB: %s", err, updateErr.Error()) + } + return err + } + + //do the following in a transaction to avoid inconsistent DB state + tx, err := c.DB.Begin() + if err != nil { + return err + } + defer sqlext.RollbackUnlessCommitted(tx) + + //create or update cluster_resources for this capacitor + for serviceType, serviceData := range capacityData { + if !c.Cluster.HasService(serviceType) { + logg.Info("discarding capacities reported by %s for unknown service type: %s", capacitor.CapacitorID, serviceType) + continue + } + + for resourceName, resourceData := range serviceData { + if !c.Cluster.HasResource(serviceType, resourceName) { + logg.Info("discarding capacity reported by %s for unknown resource name: %s/%s", capacitor.CapacitorID, serviceType, resourceName) + continue + } + serviceID, ok := serviceIDForType[serviceType] + if !ok { + return fmt.Errorf("no cluster_services entry for service type %s (check if CheckConsistencyJob runs correctly)", serviceType) + } + + res := db.ClusterResource{ + ServiceID: serviceID, + Name: resourceName, + RawCapacity: resourceData.Capacity, + CapacityPerAZJSON: "", //see below + SubcapacitiesJSON: "", //see below + CapacitorID: capacitor.CapacitorID, + } + + if len(resourceData.CapacityPerAZ) > 0 { + buf, err := json.Marshal(convertAZReport(resourceData.CapacityPerAZ)) + if err != nil { + return fmt.Errorf("could not convert capacities per AZ to JSON: %w", err) + } + res.CapacityPerAZJSON = string(buf) + } + + if len(resourceData.Subcapacities) > 0 { + buf, err := json.Marshal(resourceData.Subcapacities) + if err != nil { 
+ return fmt.Errorf("could not convert subcapacities to JSON: %w", err) + } + res.SubcapacitiesJSON = string(buf) + } + + if capacitorIDForResource[serviceType][resourceName] == "" { + err = tx.Insert(&res) + } else { + _, err = tx.Update(&res) + } + if err != nil { + return fmt.Errorf("could not write cluster resource %s/%s: %w", serviceType, resourceName, err) + } + } + } + + //delete cluster_resources owned by this capacitor for which we do not have capacityData anymore + for serviceType, serviceMapping := range capacitorIDForResource { + for resourceName, owningCapacitorID := range serviceMapping { + //do not delete if owned by a different capacitor + if owningCapacitorID != capacitor.CapacitorID { + continue + } + + //do not delete if we still have capacity data + _, ok := capacityData[serviceType][resourceName] + if ok { + continue + } + + _, err = tx.Delete(&db.ClusterResource{ + ServiceID: serviceIDForType[serviceType], + Name: resourceName, + }) + if err != nil { + return fmt.Errorf("could not cleanup cluster resource %s/%s: %w", serviceType, resourceName, err) + } + } + } + + _, err = tx.Update(&capacitor) + if err != nil { + return err + } + return tx.Commit() +} + +func convertAZReport(capacityPerAZ map[string]*core.CapacityDataForAZ) limesresources.ClusterAvailabilityZoneReports { + //The initial implementation wrote limesresources.ClusterAvailabilityZoneReports into + //the CapacityPerAZJSON database field, even though + //map[string]*core.CapacityDataForAZ would have been more appropriate. Now we + //stick with it for compatibility's sake. + report := make(limesresources.ClusterAvailabilityZoneReports, len(capacityPerAZ)) + for azName, azData := range capacityPerAZ { + report[azName] = &limesresources.ClusterAvailabilityZoneReport{ + Name: azName, + Capacity: azData.Capacity, + Usage: azData.Usage, + } + } + return report +} diff --git a/internal/collector/capacity_test.go b/internal/collector/capacity_scrape_test.go similarity index 63% rename from internal/collector/capacity_test.go rename to internal/collector/capacity_scrape_test.go index d5bc9e3f0..1a57af4a5 100644 --- a/internal/collector/capacity_test.go +++ b/internal/collector/capacity_scrape_test.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sapcc/go-bits/assert" "github.com/sapcc/go-bits/easypg" + "github.com/sapcc/go-bits/jobloop" "github.com/sapcc/limes/internal/db" "github.com/sapcc/limes/internal/test" @@ -82,21 +83,31 @@ func Test_ScanCapacity(t *testing.T) { TimeNow: test.TimeNow, AddJitter: test.NoJitter, } + job := c.CapacityScrapeJob(s.Registry) + + //cluster_services must be created as a baseline (this is usually done by the CheckConsistencyJob) + for _, serviceType := range s.Cluster.ServiceTypesInAlphabeticalOrder() { + err := s.DB.Insert(&db.ClusterService{Type: serviceType}) + mustT(t, err) + } //check baseline tr, tr0 := easypg.NewTracker(t, s.DB.Db) - tr0.AssertEmpty() + tr0.AssertEqualf(` + INSERT INTO cluster_services (id, type) VALUES (1, 'shared'); + INSERT INTO cluster_services (id, type) VALUES (2, 'unshared'); + INSERT INTO cluster_services (id, type) VALUES (3, 'unshared2'); + `) //check that capacity records are created correctly (and that nonexistent //resources are ignored by the scraper) - c.scanCapacity() + setClusterCapacitorsStale(t, s) + mustT(t, jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins))) tr.DBChanges().AssertEqualf(` - INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, 
next_scrape_at) VALUES ('unittest', 0, 1, 900); - INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, next_scrape_at) VALUES ('unittest2', 0, 1, 900); + INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, next_scrape_at) VALUES ('unittest', 1, 1, 901); + INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, next_scrape_at) VALUES ('unittest2', 3, 1, 903); INSERT INTO cluster_resources (service_id, name, capacity, capacitor_id) VALUES (1, 'things', 42, 'unittest'); INSERT INTO cluster_resources (service_id, name, capacity, capacitor_id) VALUES (2, 'capacity', 42, 'unittest2'); - INSERT INTO cluster_services (id, type) VALUES (1, 'shared'); - INSERT INTO cluster_services (id, type) VALUES (2, 'unshared'); `) //insert some crap records @@ -120,13 +131,20 @@ func Test_ScanCapacity(t *testing.T) { //next scan should throw out the crap records and recreate the deleted ones; //also change the reported Capacity to see if updates are getting through s.Cluster.CapacityPlugins["unittest"].(*plugins.StaticCapacityPlugin).Capacity = 23 - c.scanCapacity() + setClusterCapacitorsStale(t, s) + mustT(t, jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins))) tr.DBChanges().AssertEqualf(` UPDATE cluster_capacitors SET scraped_at = 5, next_scrape_at = 905 WHERE capacitor_id = 'unittest'; - UPDATE cluster_capacitors SET scraped_at = 5, next_scrape_at = 905 WHERE capacitor_id = 'unittest2'; + UPDATE cluster_capacitors SET scraped_at = 7, next_scrape_at = 907 WHERE capacitor_id = 'unittest2'; UPDATE cluster_resources SET capacity = 23 WHERE service_id = 1 AND name = 'things'; `) + //move the clock forward by 300 seconds (the capacitor add step only triggers every five minutes) + //TODO: I hate this clock + for step := 1; step <= 300; step++ { + _ = test.TimeNow() + } + //add a capacity plugin that reports subcapacities; check that subcapacities //are correctly written when creating a cluster_resources record pluginConfig := ` @@ -138,24 +156,32 @@ func Test_ScanCapacity(t *testing.T) { with_subcapacities: true ` subcapacityPlugin := s.AddCapacityPlugin(t, pluginConfig).(*plugins.StaticCapacityPlugin) //nolint:errcheck - c.scanCapacity() + setClusterCapacitorsStale(t, s) + mustT(t, jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins))) tr.DBChanges().AssertEqualf(` - UPDATE cluster_capacitors SET scraped_at = 10, next_scrape_at = 910 WHERE capacitor_id = 'unittest'; - UPDATE cluster_capacitors SET scraped_at = 10, next_scrape_at = 910 WHERE capacitor_id = 'unittest2'; - INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, serialized_metrics, next_scrape_at) VALUES ('unittest4', 10, 1, '{"smaller_half":14,"larger_half":28}', 910); + UPDATE cluster_capacitors SET scraped_at = 309, next_scrape_at = 1209 WHERE capacitor_id = 'unittest'; + UPDATE cluster_capacitors SET scraped_at = 311, next_scrape_at = 1211 WHERE capacitor_id = 'unittest2'; + INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, serialized_metrics, next_scrape_at) VALUES ('unittest4', 313, 1, '{"smaller_half":14,"larger_half":28}', 1213); INSERT INTO cluster_resources (service_id, name, capacity, subcapacities, capacitor_id) VALUES (2, 'things', 42, '[{"smaller_half":14},{"larger_half":28}]', 'unittest4'); `) //check that scraping correctly updates subcapacities on an existing record subcapacityPlugin.Capacity = 10 - c.scanCapacity() + setClusterCapacitorsStale(t, s) + mustT(t, 
jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins))) tr.DBChanges().AssertEqualf(` - UPDATE cluster_capacitors SET scraped_at = 17, next_scrape_at = 917 WHERE capacitor_id = 'unittest'; - UPDATE cluster_capacitors SET scraped_at = 17, next_scrape_at = 917 WHERE capacitor_id = 'unittest2'; - UPDATE cluster_capacitors SET scraped_at = 17, serialized_metrics = '{"smaller_half":3,"larger_half":7}', next_scrape_at = 917 WHERE capacitor_id = 'unittest4'; + UPDATE cluster_capacitors SET scraped_at = 315, next_scrape_at = 1215 WHERE capacitor_id = 'unittest'; + UPDATE cluster_capacitors SET scraped_at = 317, next_scrape_at = 1217 WHERE capacitor_id = 'unittest2'; + UPDATE cluster_capacitors SET scraped_at = 319, serialized_metrics = '{"smaller_half":3,"larger_half":7}', next_scrape_at = 1219 WHERE capacitor_id = 'unittest4'; UPDATE cluster_resources SET capacity = 10, subcapacities = '[{"smaller_half":3},{"larger_half":7}]' WHERE service_id = 2 AND name = 'things'; `) + //move the clock forward by 300 seconds (the capacitor add step only triggers every five minutes) + //TODO: I hate this clock + for step := 1; step <= 300; step++ { + _ = test.TimeNow() + } + //add a capacity plugin that also reports capacity per availability zone; check that //these capacities are correctly written when creating a cluster_resources record pluginConfig = ` @@ -167,24 +193,25 @@ func Test_ScanCapacity(t *testing.T) { with_capacity_per_az: true ` azCapacityPlugin := s.AddCapacityPlugin(t, pluginConfig).(*plugins.StaticCapacityPlugin) //nolint:errcheck - c.scanCapacity() + setClusterCapacitorsStale(t, s) + mustT(t, jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins))) tr.DBChanges().AssertEqualf(` - UPDATE cluster_capacitors SET scraped_at = 24, next_scrape_at = 924 WHERE capacitor_id = 'unittest'; - UPDATE cluster_capacitors SET scraped_at = 24, next_scrape_at = 924 WHERE capacitor_id = 'unittest2'; - UPDATE cluster_capacitors SET scraped_at = 24, next_scrape_at = 924 WHERE capacitor_id = 'unittest4'; - INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, next_scrape_at) VALUES ('unittest5', 24, 1, 924); + UPDATE cluster_capacitors SET scraped_at = 621, next_scrape_at = 1521 WHERE capacitor_id = 'unittest'; + UPDATE cluster_capacitors SET scraped_at = 623, next_scrape_at = 1523 WHERE capacitor_id = 'unittest2'; + UPDATE cluster_capacitors SET scraped_at = 625, next_scrape_at = 1525 WHERE capacitor_id = 'unittest4'; + INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, next_scrape_at) VALUES ('unittest5', 627, 1, 1527); INSERT INTO cluster_resources (service_id, name, capacity, capacity_per_az, capacitor_id) VALUES (3, 'things', 42, '[{"name":"az-one","capacity":21,"usage":4},{"name":"az-two","capacity":21,"usage":4}]', 'unittest5'); - INSERT INTO cluster_services (id, type) VALUES (3, 'unshared2'); `) //check that scraping correctly updates the capacities on an existing record azCapacityPlugin.Capacity = 30 - c.scanCapacity() + setClusterCapacitorsStale(t, s) + mustT(t, jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins))) tr.DBChanges().AssertEqualf(` - UPDATE cluster_capacitors SET scraped_at = 33, next_scrape_at = 933 WHERE capacitor_id = 'unittest'; - UPDATE cluster_capacitors SET scraped_at = 33, next_scrape_at = 933 WHERE capacitor_id = 'unittest2'; - UPDATE cluster_capacitors SET scraped_at = 33, next_scrape_at = 933 WHERE capacitor_id = 'unittest4'; - UPDATE cluster_capacitors SET scraped_at = 33, next_scrape_at = 933 
WHERE capacitor_id = 'unittest5'; + UPDATE cluster_capacitors SET scraped_at = 629, next_scrape_at = 1529 WHERE capacitor_id = 'unittest'; + UPDATE cluster_capacitors SET scraped_at = 631, next_scrape_at = 1531 WHERE capacitor_id = 'unittest2'; + UPDATE cluster_capacitors SET scraped_at = 633, next_scrape_at = 1533 WHERE capacitor_id = 'unittest4'; + UPDATE cluster_capacitors SET scraped_at = 635, next_scrape_at = 1535 WHERE capacitor_id = 'unittest5'; UPDATE cluster_resources SET capacity = 30, capacity_per_az = '[{"name":"az-one","capacity":15,"usage":3},{"name":"az-two","capacity":15,"usage":3}]' WHERE service_id = 3 AND name = 'things'; `) @@ -203,12 +230,21 @@ func Test_ScanCapacity(t *testing.T) { //check that removing a capacitor removes its associated resources delete(s.Cluster.CapacityPlugins, "unittest5") - c.scanCapacity() + setClusterCapacitorsStale(t, s) + mustT(t, jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins)+1)) //+1 to account for the deleted capacitor tr.DBChanges().AssertEqualf(` - UPDATE cluster_capacitors SET scraped_at = 42, next_scrape_at = 942 WHERE capacitor_id = 'unittest'; - UPDATE cluster_capacitors SET scraped_at = 42, next_scrape_at = 942 WHERE capacitor_id = 'unittest2'; - UPDATE cluster_capacitors SET scraped_at = 42, next_scrape_at = 942 WHERE capacitor_id = 'unittest4'; + UPDATE cluster_capacitors SET scraped_at = 637, next_scrape_at = 1537 WHERE capacitor_id = 'unittest'; + UPDATE cluster_capacitors SET scraped_at = 639, next_scrape_at = 1539 WHERE capacitor_id = 'unittest2'; + UPDATE cluster_capacitors SET scraped_at = 641, next_scrape_at = 1541 WHERE capacitor_id = 'unittest4'; DELETE FROM cluster_capacitors WHERE capacitor_id = 'unittest5'; DELETE FROM cluster_resources WHERE service_id = 3 AND name = 'things'; `) } + +func setClusterCapacitorsStale(t *testing.T, s test.Setup) { + //NOTE: This is built to not use `test.TimeNow()`, because using this function shifts the time around. + //TODO: I hate this clock + t.Helper() + _, err := s.DB.Exec(`UPDATE cluster_capacitors SET next_scrape_at = (SELECT MAX(scraped_at) FROM cluster_capacitors)`) + mustT(t, err) +} diff --git a/internal/db/migrations.go b/internal/db/migrations.go index 3f27b3e0e..3774bd34a 100644 --- a/internal/db/migrations.go +++ b/internal/db/migrations.go @@ -177,4 +177,12 @@ var sqlMigrations = map[string]string{ ALTER TABLE cluster_resources ALTER COLUMN capacitor_id SET DEFAULT NULL; `, + "025_capacity_scan_rework.up.sql": ` + ALTER TABLE cluster_capacitors + ADD COLUMN scrape_error_message TEXT NOT NULL DEFAULT ''; + `, + "025_capacity_scan_rework.down.sql": ` + ALTER TABLE cluster_capacitors + DROP COLUMN scrape_error_message; + `, } diff --git a/internal/db/models.go b/internal/db/models.go index 188e97b89..8e88ff051 100644 --- a/internal/db/models.go +++ b/internal/db/models.go @@ -33,6 +33,7 @@ type ClusterCapacitor struct { ScrapeDurationSecs float64 `db:"scrape_duration_secs"` SerializedMetrics string `db:"serialized_metrics"` NextScrapeAt time.Time `db:"next_scrape_at"` + ScrapeErrorMessage string `db:"scrape_error_message"` } // ClusterService contains a record from the `cluster_services` table. 
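For illustration (not part of this patch): together with next_scrape_at, the new scrape_error_message column makes failing capacitors visible directly in the database. Assuming the cluster_capacitors schema after migration 025 above, a query along these lines would list the capacitors whose most recent scrape attempt failed:

    -- hypothetical operator query: capacitors whose last scrape attempt failed,
    -- ordered by when they will be retried
    SELECT capacitor_id, scrape_error_message, next_scrape_at
      FROM cluster_capacitors
     WHERE scrape_error_message != ''
     ORDER BY next_scrape_at;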
diff --git a/main.go b/main.go index b2f0c00d9..d763d7af5 100644 --- a/main.go +++ b/main.go @@ -162,8 +162,8 @@ func taskCollect(cluster *core.Cluster, args []string) { } //start those collector threads which operate over all services simultaneously + go c.CapacityScrapeJob(nil).Run(ctx) go c.CheckConsistencyJob(nil).Run(ctx) - go c.ScanCapacity() go func() { for { _, err := c.ScanDomains(collector.ScanDomainsOpts{ScanAllProjects: true}) From 01d3bd58111185dd8521d8173d9b37fb30f96f19 Mon Sep 17 00:00:00 2001 From: Stefan Majewsky Date: Tue, 12 Sep 2023 13:50:23 +0200 Subject: [PATCH 2/3] fix removal of NOT NULL constraint I'm retroactively editing an existing migration because this is only rolled out on QA so far and thus easy to fix. --- internal/collector/capacity_scrape.go | 5 +++-- internal/db/migrations.go | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/internal/collector/capacity_scrape.go b/internal/collector/capacity_scrape.go index 6dd97870c..57862e9f6 100644 --- a/internal/collector/capacity_scrape.go +++ b/internal/collector/capacity_scrape.go @@ -76,8 +76,8 @@ type capacityScrapeTask struct { var ( // upsert a cluster_capacitors entry initCapacitorQuery = sqlext.SimplifyWhitespace(` - INSERT INTO cluster_capacitors (capacitor_id, scraped_at, next_scrape_at) - VALUES ($1, $2, $2) + INSERT INTO cluster_capacitors (capacitor_id, next_scrape_at) + VALUES ($1, $2) ON CONFLICT DO NOTHING `) @@ -98,6 +98,7 @@ func (c *Collector) discoverCapacityScrapeTask(_ context.Context, _ prometheus.L //consistency check: every once in a while (and also immediately on startup), //check that all required `cluster_capacitors` entries exist + //(this is important because the query below will only find capacitors that have such an entry) if lastConsistencyCheckAt.Before(task.Timing.StartedAt.Add(-5 * time.Minute)) { err = sqlext.WithPreparedStatement(c.DB, initCapacitorQuery, func(stmt *sql.Stmt) error { for capacitorID := range c.Cluster.CapacityPlugins { diff --git a/internal/db/migrations.go b/internal/db/migrations.go index 3774bd34a..f675ee2b5 100644 --- a/internal/db/migrations.go +++ b/internal/db/migrations.go @@ -163,7 +163,7 @@ var sqlMigrations = map[string]string{ `, "024_move_capacity_scrape_timestamps.up.sql": ` ALTER TABLE cluster_capacitors - ALTER COLUMN scraped_at SET DEFAULT NULL; -- null if scraping did not happen yet + ALTER COLUMN scraped_at DROP NOT NULL; -- null if scraping did not happen yet ALTER TABLE cluster_services DROP COLUMN scraped_at; ALTER TABLE cluster_resources @@ -175,7 +175,7 @@ var sqlMigrations = map[string]string{ ALTER TABLE cluster_services ADD COLUMN scraped_at TIMESTAMP NOT NULL DEFAULT NOW(); ALTER TABLE cluster_resources - ALTER COLUMN capacitor_id SET DEFAULT NULL; + ALTER COLUMN capacitor_id DROP NOT NULL; `, "025_capacity_scan_rework.up.sql": ` ALTER TABLE cluster_capacitors From a73e2dbbeb1b796e475867b63090d3d10fd59586 Mon Sep 17 00:00:00 2001 From: Stefan Majewsky Date: Tue, 12 Sep 2023 15:25:53 +0200 Subject: [PATCH 3/3] apply style comments from code review --- internal/collector/capacity_scrape.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/internal/collector/capacity_scrape.go b/internal/collector/capacity_scrape.go index 57862e9f6..caf9454ab 100644 --- a/internal/collector/capacity_scrape.go +++ b/internal/collector/capacity_scrape.go @@ -91,6 +91,14 @@ var ( -- find only one capacitor to scrape per iteration LIMIT 1 `) + + // queries to collect context data within 
processCapacityScrapeTask() + getClusterServicesQuery = sqlext.SimplifyWhitespace(` + SELECT id, type FROM cluster_services + `) + getClusterResourceOwnershipQuery = sqlext.SimplifyWhitespace(` + SELECT cs.type, cr.name, cr.capacitor_id FROM cluster_resources cr JOIN cluster_services cs ON cr.service_id = cs.id + `) ) func (c *Collector) discoverCapacityScrapeTask(_ context.Context, _ prometheus.Labels, lastConsistencyCheckAt *time.Time) (task capacityScrapeTask, err error) { @@ -138,9 +146,8 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc //collect mapping of cluster_services type names to IDs //(these DB entries are maintained for us by checkConsistencyCluster) - query := `SELECT id, type FROM cluster_services` serviceIDForType := make(map[string]int64) - err := sqlext.ForeachRow(c.DB, query, nil, func(rows *sql.Rows) error { + err := sqlext.ForeachRow(c.DB, getClusterServicesQuery, nil, func(rows *sql.Rows) error { var ( serviceID int64 serviceType string @@ -156,9 +163,8 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc } //collect ownership information for existing cluster_resources - query = `SELECT cs.type, cr.name, cr.capacitor_id FROM cluster_resources cr JOIN cluster_services cs ON cr.service_id = cs.id` capacitorIDForResource := make(map[string]map[string]string) - err = sqlext.ForeachRow(c.DB, query, nil, func(rows *sql.Rows) error { + err = sqlext.ForeachRow(c.DB, getClusterResourceOwnershipQuery, nil, func(rows *sql.Rows) error { var ( serviceType string resourceName string @@ -292,10 +298,9 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc } func convertAZReport(capacityPerAZ map[string]*core.CapacityDataForAZ) limesresources.ClusterAvailabilityZoneReports { - //The initial implementation wrote limesresources.ClusterAvailabilityZoneReports into - //the CapacityPerAZJSON database field, even though - //map[string]*core.CapacityDataForAZ would have been more appropriate. Now we - //stick with it for compatibility's sake. + //The initial implementation wrote limesresources.ClusterAvailabilityZoneReports into the CapacityPerAZJSON database field, + //even though map[string]*core.CapacityDataForAZ would have been more appropriate. + //Now we stick with it for compatibility's sake. report := make(limesresources.ClusterAvailabilityZoneReports, len(capacityPerAZ)) for azName, azData := range capacityPerAZ { report[azName] = &limesresources.ClusterAvailabilityZoneReport{