diff --git a/internal/collector/capacity_scrape.go b/internal/collector/capacity_scrape.go
index caf9454ab..5e8f1f23b 100644
--- a/internal/collector/capacity_scrape.go
+++ b/internal/collector/capacity_scrape.go
@@ -102,7 +102,7 @@ var (
 )
 
 func (c *Collector) discoverCapacityScrapeTask(_ context.Context, _ prometheus.Labels, lastConsistencyCheckAt *time.Time) (task capacityScrapeTask, err error) {
-	task.Timing.StartedAt = c.TimeNow()
+	task.Timing.StartedAt = c.MeasureTime()
 
 	//consistency check: every once in a while (and also immediately on startup),
 	//check that all required `cluster_capacitors` entries exist
@@ -185,7 +185,7 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc
 
 	//scrape capacity data
 	capacityData, serializedMetrics, err := plugin.Scrape()
-	task.Timing.FinishedAt = c.TimeNow()
+	task.Timing.FinishedAt = c.MeasureTimeAtEnd()
 	if err == nil {
 		capacitor.ScrapedAt = &task.Timing.FinishedAt
 		capacitor.ScrapeDurationSecs = task.Timing.Duration().Seconds()
diff --git a/internal/collector/capacity_scrape_test.go b/internal/collector/capacity_scrape_test.go
index 1a57af4a5..91728e528 100644
--- a/internal/collector/capacity_scrape_test.go
+++ b/internal/collector/capacity_scrape_test.go
@@ -21,6 +21,7 @@ package collector
 
 import (
 	"testing"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -74,15 +75,8 @@ func Test_ScanCapacity(t *testing.T) {
 	s := test.NewSetup(t,
 		test.WithConfig(testScanCapacityConfigYAML),
 	)
-	test.ResetTime()
-
-	c := Collector{
-		Cluster:   s.Cluster,
-		DB:        s.DB,
-		LogError:  t.Errorf,
-		TimeNow:   test.TimeNow,
-		AddJitter: test.NoJitter,
-	}
+
+	c := getCollector(t, s)
 	job := c.CapacityScrapeJob(s.Registry)
 
 	//cluster_services must be created as a baseline (this is usually done by the CheckConsistencyJob)
@@ -104,8 +98,8 @@ func Test_ScanCapacity(t *testing.T) {
 	setClusterCapacitorsStale(t, s)
 	mustT(t, jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins)))
 	tr.DBChanges().AssertEqualf(`
-		INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, next_scrape_at) VALUES ('unittest', 1, 1, 901);
-		INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, next_scrape_at) VALUES ('unittest2', 3, 1, 903);
+		INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, next_scrape_at) VALUES ('unittest', 5, 5, 905);
+		INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, next_scrape_at) VALUES ('unittest2', 10, 5, 910);
 		INSERT INTO cluster_resources (service_id, name, capacity, capacitor_id) VALUES (1, 'things', 42, 'unittest');
 		INSERT INTO cluster_resources (service_id, name, capacity, capacitor_id) VALUES (2, 'capacity', 42, 'unittest2');
 	`)
@@ -133,17 +127,17 @@ func Test_ScanCapacity(t *testing.T) {
 	s.Cluster.CapacityPlugins["unittest"].(*plugins.StaticCapacityPlugin).Capacity = 23
 	setClusterCapacitorsStale(t, s)
 	mustT(t, jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins)))
+
+	scrapedAt1 := s.Clock.Now().Add(-5 * time.Second)
+	scrapedAt2 := s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
-		UPDATE cluster_capacitors SET scraped_at = 5, next_scrape_at = 905 WHERE capacitor_id = 'unittest';
-		UPDATE cluster_capacitors SET scraped_at = 7, next_scrape_at = 907 WHERE capacitor_id = 'unittest2';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest2';
 		UPDATE cluster_resources SET capacity = 23 WHERE service_id = 1 AND name = 'things';
-	`)
-
-	//move the clock forward by 300 seconds (the capacitor add step only triggers every five minutes)
-	//TODO: I hate this clock
-	for step := 1; step <= 300; step++ {
-		_ = test.TimeNow()
-	}
+	`,
+		scrapedAt1.Unix(), scrapedAt1.Add(15*time.Minute).Unix(),
+		scrapedAt2.Unix(), scrapedAt2.Add(15*time.Minute).Unix(),
+	)
 
 	//add a capacity plugin that reports subcapacities; check that subcapacities
 	//are correctly written when creating a cluster_resources record
@@ -157,30 +151,41 @@
 	`
 	subcapacityPlugin := s.AddCapacityPlugin(t, pluginConfig).(*plugins.StaticCapacityPlugin) //nolint:errcheck
 	setClusterCapacitorsStale(t, s)
+	s.Clock.StepBy(5 * time.Minute) //to force a capacitor consistency check to run
 	mustT(t, jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins)))
+
+	scrapedAt1 = s.Clock.Now().Add(-10 * time.Second)
+	scrapedAt2 = s.Clock.Now().Add(-5 * time.Second)
+	scrapedAt4 := s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
-		UPDATE cluster_capacitors SET scraped_at = 309, next_scrape_at = 1209 WHERE capacitor_id = 'unittest';
-		UPDATE cluster_capacitors SET scraped_at = 311, next_scrape_at = 1211 WHERE capacitor_id = 'unittest2';
-		INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, serialized_metrics, next_scrape_at) VALUES ('unittest4', 313, 1, '{"smaller_half":14,"larger_half":28}', 1213);
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest2';
+		INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, serialized_metrics, next_scrape_at) VALUES ('unittest4', %d, 5, '{"smaller_half":14,"larger_half":28}', %d);
 		INSERT INTO cluster_resources (service_id, name, capacity, subcapacities, capacitor_id) VALUES (2, 'things', 42, '[{"smaller_half":14},{"larger_half":28}]', 'unittest4');
-	`)
+	`,
+		scrapedAt1.Unix(), scrapedAt1.Add(15*time.Minute).Unix(),
+		scrapedAt2.Unix(), scrapedAt2.Add(15*time.Minute).Unix(),
+		scrapedAt4.Unix(), scrapedAt4.Add(15*time.Minute).Unix(),
+	)
 
 	//check that scraping correctly updates subcapacities on an existing record
 	subcapacityPlugin.Capacity = 10
 	setClusterCapacitorsStale(t, s)
 	mustT(t, jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins)))
+
+	scrapedAt1 = s.Clock.Now().Add(-10 * time.Second)
+	scrapedAt2 = s.Clock.Now().Add(-5 * time.Second)
+	scrapedAt4 = s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
-		UPDATE cluster_capacitors SET scraped_at = 315, next_scrape_at = 1215 WHERE capacitor_id = 'unittest';
-		UPDATE cluster_capacitors SET scraped_at = 317, next_scrape_at = 1217 WHERE capacitor_id = 'unittest2';
-		UPDATE cluster_capacitors SET scraped_at = 319, serialized_metrics = '{"smaller_half":3,"larger_half":7}', next_scrape_at = 1219 WHERE capacitor_id = 'unittest4';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest2';
+		UPDATE cluster_capacitors SET scraped_at = %d, serialized_metrics = '{"smaller_half":3,"larger_half":7}', next_scrape_at = %d WHERE capacitor_id = 'unittest4';
 		UPDATE cluster_resources SET capacity = 10, subcapacities = '[{"smaller_half":3},{"larger_half":7}]' WHERE service_id = 2 AND name = 'things';
-	`)
-
-	//move the clock forward by 300 seconds (the capacitor add step only triggers every five minutes)
-	//TODO: I hate this clock
-	for step := 1; step <= 300; step++ {
-		_ = test.TimeNow()
-	}
+	`,
+		scrapedAt1.Unix(), scrapedAt1.Add(15*time.Minute).Unix(),
+		scrapedAt2.Unix(), scrapedAt2.Add(15*time.Minute).Unix(),
+		scrapedAt4.Unix(), scrapedAt4.Add(15*time.Minute).Unix(),
+	)
 
 	//add a capacity plugin that also reports capacity per availability zone; check that
 	//these capacities are correctly written when creating a cluster_resources record
@@ -194,26 +199,47 @@
 	`
 	azCapacityPlugin := s.AddCapacityPlugin(t, pluginConfig).(*plugins.StaticCapacityPlugin) //nolint:errcheck
 	setClusterCapacitorsStale(t, s)
+	s.Clock.StepBy(5 * time.Minute) //to force a capacitor consistency check to run
 	mustT(t, jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins)))
+
+	scrapedAt1 = s.Clock.Now().Add(-15 * time.Second)
+	scrapedAt2 = s.Clock.Now().Add(-10 * time.Second)
+	scrapedAt4 = s.Clock.Now().Add(-5 * time.Second)
+	scrapedAt5 := s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
-		UPDATE cluster_capacitors SET scraped_at = 621, next_scrape_at = 1521 WHERE capacitor_id = 'unittest';
-		UPDATE cluster_capacitors SET scraped_at = 623, next_scrape_at = 1523 WHERE capacitor_id = 'unittest2';
-		UPDATE cluster_capacitors SET scraped_at = 625, next_scrape_at = 1525 WHERE capacitor_id = 'unittest4';
-		INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, next_scrape_at) VALUES ('unittest5', 627, 1, 1527);
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest2';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest4';
+		INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, next_scrape_at) VALUES ('unittest5', %d, 5, %d);
 		INSERT INTO cluster_resources (service_id, name, capacity, capacity_per_az, capacitor_id) VALUES (3, 'things', 42, '[{"name":"az-one","capacity":21,"usage":4},{"name":"az-two","capacity":21,"usage":4}]', 'unittest5');
-	`)
+	`,
+		scrapedAt1.Unix(), scrapedAt1.Add(15*time.Minute).Unix(),
+		scrapedAt2.Unix(), scrapedAt2.Add(15*time.Minute).Unix(),
+		scrapedAt4.Unix(), scrapedAt4.Add(15*time.Minute).Unix(),
+		scrapedAt5.Unix(), scrapedAt5.Add(15*time.Minute).Unix(),
+	)
 
 	//check that scraping correctly updates the capacities on an existing record
 	azCapacityPlugin.Capacity = 30
 	setClusterCapacitorsStale(t, s)
 	mustT(t, jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins)))
+
+	scrapedAt1 = s.Clock.Now().Add(-15 * time.Second)
+	scrapedAt2 = s.Clock.Now().Add(-10 * time.Second)
+	scrapedAt4 = s.Clock.Now().Add(-5 * time.Second)
+	scrapedAt5 = s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
-		UPDATE cluster_capacitors SET scraped_at = 629, next_scrape_at = 1529 WHERE capacitor_id = 'unittest';
-		UPDATE cluster_capacitors SET scraped_at = 631, next_scrape_at = 1531 WHERE capacitor_id = 'unittest2';
-		UPDATE cluster_capacitors SET scraped_at = 633, next_scrape_at = 1533 WHERE capacitor_id = 'unittest4';
-		UPDATE cluster_capacitors SET scraped_at = 635, next_scrape_at = 1535 WHERE capacitor_id = 'unittest5';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest2';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest4';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest5';
 		UPDATE cluster_resources SET capacity = 30, capacity_per_az = '[{"name":"az-one","capacity":15,"usage":3},{"name":"az-two","capacity":15,"usage":3}]' WHERE service_id = 3 AND name = 'things';
-	`)
+	`,
+		scrapedAt1.Unix(), scrapedAt1.Add(15*time.Minute).Unix(),
+		scrapedAt2.Unix(), scrapedAt2.Add(15*time.Minute).Unix(),
+		scrapedAt4.Unix(), scrapedAt4.Add(15*time.Minute).Unix(),
+		scrapedAt5.Unix(), scrapedAt5.Add(15*time.Minute).Unix(),
+	)
 
 	//check data metrics generated for these capacity data
 	registry := prometheus.NewPedanticRegistry()
@@ -232,19 +258,26 @@ func Test_ScanCapacity(t *testing.T) {
 	delete(s.Cluster.CapacityPlugins, "unittest5")
 	setClusterCapacitorsStale(t, s)
 	mustT(t, jobloop.ProcessMany(job, s.Ctx, len(s.Cluster.CapacityPlugins)+1)) //+1 to account for the deleted capacitor
+
+	scrapedAt1 = s.Clock.Now().Add(-10 * time.Second)
+	scrapedAt2 = s.Clock.Now().Add(-5 * time.Second)
+	scrapedAt4 = s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
-		UPDATE cluster_capacitors SET scraped_at = 637, next_scrape_at = 1537 WHERE capacitor_id = 'unittest';
-		UPDATE cluster_capacitors SET scraped_at = 639, next_scrape_at = 1539 WHERE capacitor_id = 'unittest2';
-		UPDATE cluster_capacitors SET scraped_at = 641, next_scrape_at = 1541 WHERE capacitor_id = 'unittest4';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest2';
+		UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest4';
 		DELETE FROM cluster_capacitors WHERE capacitor_id = 'unittest5';
 		DELETE FROM cluster_resources WHERE service_id = 3 AND name = 'things';
-	`)
+	`,
+		scrapedAt1.Unix(), scrapedAt1.Add(15*time.Minute).Unix(),
+		scrapedAt2.Unix(), scrapedAt2.Add(15*time.Minute).Unix(),
+		scrapedAt4.Unix(), scrapedAt4.Add(15*time.Minute).Unix(),
+	)
 }
 
 func setClusterCapacitorsStale(t *testing.T, s test.Setup) {
 	//NOTE: This is built to not use `test.TimeNow()`, because using this function shifts the time around.
-	//TODO: I hate this clock
 	t.Helper()
-	_, err := s.DB.Exec(`UPDATE cluster_capacitors SET next_scrape_at = (SELECT MAX(scraped_at) FROM cluster_capacitors)`)
+	_, err := s.DB.Exec(`UPDATE cluster_capacitors SET next_scrape_at = $1`, s.Clock.Now())
 	mustT(t, err)
 }
diff --git a/internal/collector/collector.go b/internal/collector/collector.go
index 377e4c46f..61fef955e 100644
--- a/internal/collector/collector.go
+++ b/internal/collector/collector.go
@@ -40,7 +40,10 @@ type Collector struct {
 	//Usually logg.Error, but can be changed inside unit tests.
 	LogError func(msg string, args ...any)
 	//Usually time.Now, but can be changed inside unit tests.
-	TimeNow func() time.Time
+	//MeasureTimeAtEnd behaves slightly differently in unit tests: It will advance
+	//the mock.Clock before reading it to simulate time passing during the previous task.
+	MeasureTime      func() time.Time
+	MeasureTimeAtEnd func() time.Time
 	//Usually addJitter, but can be changed inside unit tests.
 	AddJitter func(time.Duration) time.Duration
 }
@@ -48,11 +51,12 @@
 // NewCollector creates a Collector instance.
 func NewCollector(cluster *core.Cluster, dbm *gorp.DbMap) *Collector {
 	return &Collector{
-		Cluster:   cluster,
-		DB:        dbm,
-		LogError:  logg.Error,
-		TimeNow:   time.Now,
-		AddJitter: addJitter,
+		Cluster:          cluster,
+		DB:               dbm,
+		LogError:         logg.Error,
+		MeasureTime:      time.Now,
+		MeasureTimeAtEnd: time.Now,
+		AddJitter:        addJitter,
 	}
 }
 
diff --git a/internal/collector/consistency.go b/internal/collector/consistency.go
index 889329b9b..b1161c047 100644
--- a/internal/collector/consistency.go
+++ b/internal/collector/consistency.go
@@ -126,7 +126,7 @@ func (c *Collector) checkConsistencyDomain(domain db.Domain) error {
 	}
 
 	logg.Info("checking consistency for %d projects in domain %q...", len(projects), domain.Name)
-	now := c.TimeNow()
+	now := c.MeasureTime()
 	for _, project := range projects {
 		//ValidateProjectServices usually does nothing or does maybe one DELETE or
 		//INSERT, so it does not need to be in a transaction
diff --git a/internal/collector/consistency_test.go b/internal/collector/consistency_test.go
index 31808e8ef..8a11098e5 100644
--- a/internal/collector/consistency_test.go
+++ b/internal/collector/consistency_test.go
@@ -27,20 +27,11 @@ import (
 	"github.com/sapcc/limes/internal/core"
 	"github.com/sapcc/limes/internal/db"
-	"github.com/sapcc/limes/internal/test"
 )
 
 func Test_Consistency(t *testing.T) {
-	test.ResetTime()
 	s, cluster := keystoneTestCluster(t)
 
-	c := Collector{
-		Cluster:   cluster,
-		DB:        s.DB,
-		LogError:  t.Errorf,
-		TimeNow:   test.TimeNow,
-		AddJitter: test.NoJitter,
-	}
-
+	c := getCollector(t, s)
 	consistencyJob := c.CheckConsistencyJob(s.Registry)
 
 	//run ScanDomains once to establish a baseline
@@ -53,6 +44,7 @@ func Test_Consistency(t *testing.T) {
 	//check that CheckConsistency() is satisfied with the
 	//{domain,project}_services created by ScanDomains(), but adds
 	//cluster_services entries
+	s.Clock.StepBy(time.Hour)
 	err = consistencyJob.ProcessOne(s.Ctx)
 	if err != nil {
 		t.Error(err)
@@ -140,6 +132,7 @@ func Test_Consistency(t *testing.T) {
 	//are added; for all project services that are created here, project
 	//resources are added where the quota constraint contains a Minimum value or
 	//the quota distribution configuration contains a DefaultQuota value..
+ s.Clock.StepBy(time.Hour) err = consistencyJob.ProcessOne(s.Ctx) if err != nil { t.Error(err) diff --git a/internal/collector/fixtures/checkconsistency-pre.sql b/internal/collector/fixtures/checkconsistency-pre.sql index 69c00b6e0..278090f45 100644 --- a/internal/collector/fixtures/checkconsistency-pre.sql +++ b/internal/collector/fixtures/checkconsistency-pre.sql @@ -30,12 +30,12 @@ INSERT INTO domains (id, name, uuid) VALUES (2, 'france', 'uuid-for-france'); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (1, 1, 'centralized', 0, 0); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (2, 1, 'shared', 0, 0); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (3, 1, 'unshared', 0, 0); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (4, 2, 'centralized', 1, 1); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (5, 2, 'shared', 1, 1); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (6, 2, 'unshared', 1, 1); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (7, 3, 'centralized', 2, 2); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (8, 3, 'shared', 2, 2); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (9, 3, 'unshared', 2, 2); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (4, 2, 'centralized', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (5, 2, 'shared', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (6, 2, 'unshared', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (7, 3, 'centralized', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (8, 3, 'shared', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (9, 3, 'unshared', 0, 0); INSERT INTO projects (id, domain_id, name, uuid, parent_uuid, has_bursting) VALUES (1, 1, 'berlin', 'uuid-for-berlin', 'uuid-for-germany', FALSE); INSERT INTO projects (id, domain_id, name, uuid, parent_uuid, has_bursting) VALUES (2, 1, 'dresden', 'uuid-for-dresden', 'uuid-for-berlin', FALSE); diff --git a/internal/collector/fixtures/checkconsistency0.sql b/internal/collector/fixtures/checkconsistency0.sql index 5a03548a2..f8cd0e3b8 100644 --- a/internal/collector/fixtures/checkconsistency0.sql +++ b/internal/collector/fixtures/checkconsistency0.sql @@ -34,12 +34,12 @@ INSERT INTO domains (id, name, uuid) VALUES (2, 'france', 'uuid-for-france'); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (1, 1, 'centralized', 0, 0); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (2, 1, 'shared', 0, 0); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (3, 1, 'unshared', 0, 0); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (4, 2, 'centralized', 1, 1); -INSERT INTO project_services (id, project_id, type, next_scrape_at, 
rates_next_scrape_at) VALUES (5, 2, 'shared', 1, 1); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (6, 2, 'unshared', 1, 1); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (7, 3, 'centralized', 2, 2); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (8, 3, 'shared', 2, 2); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (9, 3, 'unshared', 2, 2); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (4, 2, 'centralized', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (5, 2, 'shared', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (6, 2, 'unshared', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (7, 3, 'centralized', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (8, 3, 'shared', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (9, 3, 'unshared', 0, 0); INSERT INTO projects (id, domain_id, name, uuid, parent_uuid, has_bursting) VALUES (1, 1, 'berlin', 'uuid-for-berlin', 'uuid-for-germany', FALSE); INSERT INTO projects (id, domain_id, name, uuid, parent_uuid, has_bursting) VALUES (2, 1, 'dresden', 'uuid-for-dresden', 'uuid-for-berlin', FALSE); diff --git a/internal/collector/fixtures/checkconsistency1.sql b/internal/collector/fixtures/checkconsistency1.sql index 368f9c1cb..d31582e4c 100644 --- a/internal/collector/fixtures/checkconsistency1.sql +++ b/internal/collector/fixtures/checkconsistency1.sql @@ -27,10 +27,10 @@ INSERT INTO domains (id, name, uuid) VALUES (2, 'france', 'uuid-for-france'); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (1, 1, 'centralized', 0, 0); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (10, 1, 'whatever', 0, 0); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (3, 1, 'unshared', 0, 0); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (4, 2, 'centralized', 1, 1); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (6, 2, 'unshared', 1, 1); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (7, 3, 'centralized', 2, 2); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (9, 3, 'unshared', 2, 2); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (4, 2, 'centralized', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (6, 2, 'unshared', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (7, 3, 'centralized', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (9, 3, 'unshared', 0, 0); INSERT INTO projects (id, domain_id, name, uuid, parent_uuid, has_bursting) VALUES (1, 1, 'berlin', 'uuid-for-berlin', 'uuid-for-germany', FALSE); INSERT INTO projects (id, domain_id, name, uuid, parent_uuid, has_bursting) 
VALUES (2, 1, 'dresden', 'uuid-for-dresden', 'uuid-for-berlin', FALSE); diff --git a/internal/collector/fixtures/checkconsistency2.sql b/internal/collector/fixtures/checkconsistency2.sql index 9e91d8ad3..0d5c2d056 100644 --- a/internal/collector/fixtures/checkconsistency2.sql +++ b/internal/collector/fixtures/checkconsistency2.sql @@ -32,14 +32,14 @@ INSERT INTO domains (id, name, uuid) VALUES (1, 'germany', 'uuid-for-germany'); INSERT INTO domains (id, name, uuid) VALUES (2, 'france', 'uuid-for-france'); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (1, 1, 'centralized', 0, 0); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (11, 1, 'shared', 5, 5); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (12, 2, 'shared', 5, 5); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (13, 3, 'shared', 6, 6); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (11, 1, 'shared', 7200, 7200); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (12, 2, 'shared', 7200, 7200); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (13, 3, 'shared', 7200, 7200); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (3, 1, 'unshared', 0, 0); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (4, 2, 'centralized', 1, 1); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (6, 2, 'unshared', 1, 1); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (7, 3, 'centralized', 2, 2); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (9, 3, 'unshared', 2, 2); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (4, 2, 'centralized', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (6, 2, 'unshared', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (7, 3, 'centralized', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (9, 3, 'unshared', 0, 0); INSERT INTO projects (id, domain_id, name, uuid, parent_uuid, has_bursting) VALUES (1, 1, 'berlin', 'uuid-for-berlin', 'uuid-for-germany', FALSE); INSERT INTO projects (id, domain_id, name, uuid, parent_uuid, has_bursting) VALUES (2, 1, 'dresden', 'uuid-for-dresden', 'uuid-for-berlin', FALSE); diff --git a/internal/collector/fixtures/scandomains1.sql b/internal/collector/fixtures/scandomains1.sql index dfd39b4ca..f13e56c5f 100644 --- a/internal/collector/fixtures/scandomains1.sql +++ b/internal/collector/fixtures/scandomains1.sql @@ -30,12 +30,12 @@ INSERT INTO domains (id, name, uuid) VALUES (2, 'france', 'uuid-for-france'); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (1, 1, 'centralized', 0, 0); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (2, 1, 'shared', 0, 0); INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (3, 1, 'unshared', 0, 0); -INSERT INTO project_services 
(id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (4, 2, 'centralized', 1, 1); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (5, 2, 'shared', 1, 1); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (6, 2, 'unshared', 1, 1); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (7, 3, 'centralized', 2, 2); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (8, 3, 'shared', 2, 2); -INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (9, 3, 'unshared', 2, 2); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (4, 2, 'centralized', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (5, 2, 'shared', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (6, 2, 'unshared', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (7, 3, 'centralized', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (8, 3, 'shared', 0, 0); +INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (9, 3, 'unshared', 0, 0); INSERT INTO projects (id, domain_id, name, uuid, parent_uuid, has_bursting) VALUES (1, 1, 'berlin', 'uuid-for-berlin', 'uuid-for-germany', FALSE); INSERT INTO projects (id, domain_id, name, uuid, parent_uuid, has_bursting) VALUES (2, 1, 'dresden', 'uuid-for-dresden', 'uuid-for-berlin', FALSE); diff --git a/internal/collector/fixtures/scrape_metrics.prom b/internal/collector/fixtures/scrape_metrics.prom index 7120d990a..f8421cd69 100644 --- a/internal/collector/fixtures/scrape_metrics.prom +++ b/internal/collector/fixtures/scrape_metrics.prom @@ -10,10 +10,10 @@ limes_domain_quota{domain="germany",domain_id="uuid-for-germany",resource="capac limes_domain_quota{domain="germany",domain_id="uuid-for-germany",resource="things",service="unittest"} 0 # HELP limes_newest_scraped_at Newest (i.e. largest) scraped_at timestamp for any project given a certain service in a certain OpenStack cluster. # TYPE limes_newest_scraped_at gauge -limes_newest_scraped_at{service="unittest",service_name=""} 24 +limes_newest_scraped_at{service="unittest",service_name=""} 10860 # HELP limes_oldest_scraped_at Oldest (i.e. smallest) scraped_at timestamp for any project given a certain service in a certain OpenStack cluster. # TYPE limes_oldest_scraped_at gauge -limes_oldest_scraped_at{service="unittest",service_name=""} 22 +limes_oldest_scraped_at{service="unittest",service_name=""} 10855 # HELP limes_plugin_metrics_ok Whether quota plugin metrics were rendered successfully for a particular project service. Only present when the project service emits metrics. 
# TYPE limes_plugin_metrics_ok gauge limes_plugin_metrics_ok{domain="germany",domain_id="uuid-for-germany",project="berlin",project_id="uuid-for-berlin",service="unittest"} 1 diff --git a/internal/collector/fixtures/scrape_metrics_skipzero.prom b/internal/collector/fixtures/scrape_metrics_skipzero.prom index 7120d990a..f8421cd69 100644 --- a/internal/collector/fixtures/scrape_metrics_skipzero.prom +++ b/internal/collector/fixtures/scrape_metrics_skipzero.prom @@ -10,10 +10,10 @@ limes_domain_quota{domain="germany",domain_id="uuid-for-germany",resource="capac limes_domain_quota{domain="germany",domain_id="uuid-for-germany",resource="things",service="unittest"} 0 # HELP limes_newest_scraped_at Newest (i.e. largest) scraped_at timestamp for any project given a certain service in a certain OpenStack cluster. # TYPE limes_newest_scraped_at gauge -limes_newest_scraped_at{service="unittest",service_name=""} 24 +limes_newest_scraped_at{service="unittest",service_name=""} 10860 # HELP limes_oldest_scraped_at Oldest (i.e. smallest) scraped_at timestamp for any project given a certain service in a certain OpenStack cluster. # TYPE limes_oldest_scraped_at gauge -limes_oldest_scraped_at{service="unittest",service_name=""} 22 +limes_oldest_scraped_at{service="unittest",service_name=""} 10855 # HELP limes_plugin_metrics_ok Whether quota plugin metrics were rendered successfully for a particular project service. Only present when the project service emits metrics. # TYPE limes_plugin_metrics_ok gauge limes_plugin_metrics_ok{domain="germany",domain_id="uuid-for-germany",project="berlin",project_id="uuid-for-berlin",service="unittest"} 1 diff --git a/internal/collector/keystone.go b/internal/collector/keystone.go index 8549e4446..c3a7e7d5b 100644 --- a/internal/collector/keystone.go +++ b/internal/collector/keystone.go @@ -276,7 +276,7 @@ func (c *Collector) initProject(domain *db.Domain, project core.KeystoneProject) } //add records to `project_services` table - err = datamodel.ValidateProjectServices(tx, c.Cluster, *domain, *dbProject, c.TimeNow()) + err = datamodel.ValidateProjectServices(tx, c.Cluster, *domain, *dbProject, c.MeasureTime()) if err != nil { return err } diff --git a/internal/collector/keystone_test.go b/internal/collector/keystone_test.go index e5052f02d..cbf32783c 100644 --- a/internal/collector/keystone_test.go +++ b/internal/collector/keystone_test.go @@ -22,6 +22,7 @@ package collector import ( "sort" "testing" + "time" "github.com/sapcc/go-bits/assert" "github.com/sapcc/go-bits/easypg" @@ -49,7 +50,6 @@ const ( ) func keystoneTestCluster(t *testing.T) (test.Setup, *core.Cluster) { - test.ResetTime() s := test.NewSetup(t, test.WithConfig(testKeystoneConfigYAML), ) @@ -58,6 +58,7 @@ func keystoneTestCluster(t *testing.T) (test.Setup, *core.Cluster) { func Test_ScanDomains(t *testing.T) { s, cluster := keystoneTestCluster(t) + c := getCollector(t, s) discovery := cluster.DiscoveryPlugin.(*plugins.StaticDiscoveryPlugin) //nolint:errcheck //construct expectation for return value @@ -80,14 +81,6 @@ func Test_ScanDomains(t *testing.T) { Projects: nil, //not relevant since ScanDomains will never create project_resources } - c := Collector{ - Cluster: cluster, - DB: s.DB, - LogError: t.Errorf, - TimeNow: test.TimeNow, - AddJitter: test.NoJitter, - } - //first ScanDomains should discover the StaticDomains in the cluster, //and initialize domains, projects and project_services (project_resources //are then constructed by the scraper, and domain_services/domain_resources @@ -104,7 +97,8 @@ func 
Test_ScanDomains(t *testing.T) { tr, tr0 := easypg.NewTracker(t, s.DB.Db) tr0.AssertEqualToFile("fixtures/scandomains1.sql") - //first ScanDomains should not discover anything new + //second ScanDomains should not discover anything new + s.Clock.StepBy(10 * time.Minute) actualNewDomains, err = c.ScanDomains(ScanDomainsOpts{}) if err != nil { t.Errorf("ScanDomains #2 failed: %v", err) @@ -119,6 +113,7 @@ func Test_ScanDomains(t *testing.T) { ) //ScanDomains without ScanAllProjects should not see this new project + s.Clock.StepBy(10 * time.Minute) actualNewDomains, err = c.ScanDomains(ScanDomainsOpts{}) if err != nil { t.Errorf("ScanDomains #3 failed: %v", err) @@ -127,22 +122,26 @@ func Test_ScanDomains(t *testing.T) { tr.DBChanges().AssertEmpty() //ScanDomains with ScanAllProjects should discover the new project + s.Clock.StepBy(10 * time.Minute) actualNewDomains, err = c.ScanDomains(ScanDomainsOpts{ScanAllProjects: true}) if err != nil { t.Errorf("ScanDomains #4 failed: %v", err) } assert.DeepEqual(t, "new domains after ScanDomains #4", actualNewDomains, []string(nil)) tr.DBChanges().AssertEqualf(` - INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (10, 4, 'centralized', 3, 3); - INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (11, 4, 'shared', 3, 3); - INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (12, 4, 'unshared', 3, 3); + INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (10, 4, 'centralized', %[1]d, %[1]d); + INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (11, 4, 'shared', %[1]d, %[1]d); + INSERT INTO project_services (id, project_id, type, next_scrape_at, rates_next_scrape_at) VALUES (12, 4, 'unshared', %[1]d, %[1]d); INSERT INTO projects (id, domain_id, name, uuid, parent_uuid, has_bursting) VALUES (4, 2, 'bordeaux', 'uuid-for-bordeaux', 'uuid-for-france', FALSE); - `) + `, + s.Clock.Now().Unix(), + ) //remove the project again discovery.Projects[domainUUID] = discovery.Projects[domainUUID][0:1] //ScanDomains without ScanAllProjects should not notice anything + s.Clock.StepBy(10 * time.Minute) actualNewDomains, err = c.ScanDomains(ScanDomainsOpts{}) if err != nil { t.Errorf("ScanDomains #5 failed: %v", err) @@ -151,6 +150,7 @@ func Test_ScanDomains(t *testing.T) { tr.DBChanges().AssertEmpty() //ScanDomains with ScanAllProjects should notice the deleted project and cleanup its records + s.Clock.StepBy(10 * time.Minute) actualNewDomains, err = c.ScanDomains(ScanDomainsOpts{ScanAllProjects: true}) if err != nil { t.Errorf("ScanDomains #6 failed: %v", err) @@ -167,6 +167,7 @@ func Test_ScanDomains(t *testing.T) { discovery.Domains = discovery.Domains[0:1] //ScanDomains should notice the deleted domain and cleanup its records and also its projects + s.Clock.StepBy(10 * time.Minute) actualNewDomains, err = c.ScanDomains(ScanDomainsOpts{}) if err != nil { t.Errorf("ScanDomains #7 failed: %v", err) @@ -197,6 +198,7 @@ func Test_ScanDomains(t *testing.T) { discovery.Projects["uuid-for-germany"][0].Name = "berlin-changed" //ScanDomains should notice the changed names and update the domain/project records accordingly + s.Clock.StepBy(10 * time.Minute) actualNewDomains, err = c.ScanDomains(ScanDomainsOpts{ScanAllProjects: true}) if err != nil { t.Errorf("ScanDomains #8 failed: %v", err) diff --git a/internal/collector/ratescrape.go 
b/internal/collector/ratescrape.go index b78690700..3ec4e7bf6 100644 --- a/internal/collector/ratescrape.go +++ b/internal/collector/ratescrape.go @@ -109,7 +109,7 @@ func (c *Collector) processRateScrapeTask(_ context.Context, task projectScrapeT if err != nil { task.Err = util.UnpackError(err) } - task.Timing.FinishedAt = c.TimeNow() + task.Timing.FinishedAt = c.MeasureTimeAtEnd() //write result on success; if anything fails, try to record the error in the DB if task.Err == nil { diff --git a/internal/collector/ratescrape_test.go b/internal/collector/ratescrape_test.go index ac99f6664..2ca13a4e5 100644 --- a/internal/collector/ratescrape_test.go +++ b/internal/collector/ratescrape_test.go @@ -23,8 +23,8 @@ import ( "database/sql" "regexp" "testing" + "time" - "github.com/go-gorp/gorp/v3" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" limesrates "github.com/sapcc/go-api-declarations/limes/rates" @@ -63,19 +63,12 @@ const ( ) func Test_RateScrapeSuccess(t *testing.T) { - test.ResetTime() s := test.NewSetup(t, test.WithConfig(testRateScrapeBasicConfigYAML), ) prepareDomainsAndProjectsForScrape(t, s) - c := Collector{ - Cluster: s.Cluster, - DB: s.DB, - LogError: t.Errorf, - TimeNow: test.TimeNow, - AddJitter: test.NoJitter, - } + c := getCollector(t, s) job := c.RateScrapeJob(s.Registry) withLabel := jobloop.WithLabel("service_type", "unittest") @@ -118,16 +111,23 @@ func Test_RateScrapeSuccess(t *testing.T) { `) //first Scrape should create the entries + s.Clock.StepBy(scrapeInterval) mustT(t, job.ProcessOne(s.Ctx, withLabel)) mustT(t, job.ProcessOne(s.Ctx, withLabel)) //twice because there are two projects + + scrapedAt1 := s.Clock.Now().Add(-5 * time.Second) + scrapedAt2 := s.Clock.Now() tr.DBChanges().AssertEqualf(` INSERT INTO project_rates (service_id, name, usage_as_bigint) VALUES (1, 'firstrate', '9'); UPDATE project_rates SET usage_as_bigint = '10' WHERE service_id = 1 AND name = 'secondrate'; INSERT INTO project_rates (service_id, name, usage_as_bigint) VALUES (2, 'firstrate', '9'); INSERT INTO project_rates (service_id, name, usage_as_bigint) VALUES (2, 'secondrate', '10'); - UPDATE project_services SET rates_scraped_at = 1, rates_scrape_duration_secs = 1, rates_scrape_state = '{"firstrate":0,"secondrate":0}', rates_checked_at = 1, rates_next_scrape_at = 1801 WHERE id = 1 AND project_id = 1 AND type = 'unittest'; - UPDATE project_services SET rates_scraped_at = 3, rates_scrape_duration_secs = 1, rates_scrape_state = '{"firstrate":0,"secondrate":0}', rates_checked_at = 3, rates_next_scrape_at = 1803 WHERE id = 2 AND project_id = 2 AND type = 'unittest'; - `) + UPDATE project_services SET rates_scraped_at = %[1]d, rates_scrape_duration_secs = 5, rates_scrape_state = '{"firstrate":0,"secondrate":0}', rates_checked_at = %[1]d, rates_next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'unittest'; + UPDATE project_services SET rates_scraped_at = %[3]d, rates_scrape_duration_secs = 5, rates_scrape_state = '{"firstrate":0,"secondrate":0}', rates_checked_at = %[3]d, rates_next_scrape_at = %[4]d WHERE id = 2 AND project_id = 2 AND type = 'unittest'; + `, + scrapedAt1.Unix(), scrapedAt1.Add(scrapeInterval).Unix(), + scrapedAt2.Unix(), scrapedAt2.Add(scrapeInterval).Unix(), + ) //second Scrape should not change anything (not even the timestamps) since //less than 30 minutes have passed since the last Scrape() @@ -146,17 +146,23 @@ func Test_RateScrapeSuccess(t *testing.T) { `) //but the changed state will be taken into 
account when the next scrape is in order - setProjectServicesRatesStale(t, s.DB) + s.Clock.StepBy(scrapeInterval) mustT(t, job.ProcessOne(s.Ctx, withLabel)) mustT(t, job.ProcessOne(s.Ctx, withLabel)) + + scrapedAt1 = s.Clock.Now().Add(-5 * time.Second) + scrapedAt2 = s.Clock.Now() tr.DBChanges().AssertEqualf(` UPDATE project_rates SET usage_as_bigint = '5129' WHERE service_id = 1 AND name = 'firstrate'; UPDATE project_rates SET usage_as_bigint = '1034' WHERE service_id = 1 AND name = 'secondrate'; UPDATE project_rates SET usage_as_bigint = '1033' WHERE service_id = 2 AND name = 'firstrate'; UPDATE project_rates SET usage_as_bigint = '1034' WHERE service_id = 2 AND name = 'secondrate'; - UPDATE project_services SET rates_scraped_at = 7, rates_scrape_state = '{"firstrate":5120,"secondrate":1024}', rates_checked_at = 7, rates_next_scrape_at = 1807 WHERE id = 1 AND project_id = 1 AND type = 'unittest'; - UPDATE project_services SET rates_scraped_at = 9, rates_scrape_state = '{"firstrate":1024,"secondrate":1024}', rates_checked_at = 9, rates_next_scrape_at = 1809 WHERE id = 2 AND project_id = 2 AND type = 'unittest'; - `) + UPDATE project_services SET rates_scraped_at = %[1]d, rates_scrape_state = '{"firstrate":5120,"secondrate":1024}', rates_checked_at = %[1]d, rates_next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'unittest'; + UPDATE project_services SET rates_scraped_at = %[3]d, rates_scrape_state = '{"firstrate":1024,"secondrate":1024}', rates_checked_at = %[3]d, rates_next_scrape_at = %[4]d WHERE id = 2 AND project_id = 2 AND type = 'unittest'; + `, + scrapedAt1.Unix(), scrapedAt1.Add(scrapeInterval).Unix(), + scrapedAt2.Unix(), scrapedAt2.Add(scrapeInterval).Unix(), + ) //check data metrics generated by this scraping pass registry := prometheus.NewPedanticRegistry() @@ -186,19 +192,12 @@ func Test_RateScrapeSuccess(t *testing.T) { } func Test_RateScrapeFailure(t *testing.T) { - test.ResetTime() s := test.NewSetup(t, test.WithConfig(testRateScrapeBasicConfigYAML), ) prepareDomainsAndProjectsForScrape(t, s) - c := Collector{ - Cluster: s.Cluster, - DB: s.DB, - LogError: t.Errorf, - TimeNow: test.TimeNow, - AddJitter: test.NoJitter, - } + c := getCollector(t, s) job := c.RateScrapeJob(s.Registry) withLabel := jobloop.WithLabel("service_type", "unittest") @@ -213,18 +212,13 @@ func Test_RateScrapeFailure(t *testing.T) { plugin := s.Cluster.QuotaPlugins["unittest"].(*plugins.GenericQuotaPlugin) //nolint:errcheck plugin.ScrapeFails = true mustFailLikeT(t, job.ProcessOne(s.Ctx, withLabel), expectedErrorRx) - tr.DBChanges().AssertEqualf(` - UPDATE project_services SET rates_checked_at = 1, rates_scrape_error_message = 'ScrapeRates failed as requested', rates_next_scrape_at = 301 WHERE id = 1 AND project_id = 1 AND type = 'unittest'; - `) -} -func setProjectServicesRatesStale(t *testing.T, dbm *gorp.DbMap) { - t.Helper() - //make sure that the project is scraped again - _, err := dbm.Exec(`UPDATE project_services SET rates_stale = $1`, true) - if err != nil { - t.Fatal(err) - } + checkedAt := s.Clock.Now() + tr.DBChanges().AssertEqualf(` + UPDATE project_services SET rates_checked_at = %[1]d, rates_scrape_error_message = 'ScrapeRates failed as requested', rates_next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'unittest'; + `, + checkedAt.Unix(), checkedAt.Add(recheckInterval).Unix(), + ) } func p2window(val limesrates.Window) *limesrates.Window { @@ -233,19 +227,12 @@ func p2window(val limesrates.Window) *limesrates.Window { //nolint:dupl func 
Test_ScrapeRatesButNoRates(t *testing.T) { - test.ResetTime() s := test.NewSetup(t, test.WithConfig(testNoopConfigYAML), ) prepareDomainsAndProjectsForScrape(t, s) - c := Collector{ - Cluster: s.Cluster, - DB: s.DB, - LogError: t.Errorf, - TimeNow: test.TimeNow, - AddJitter: test.NoJitter, - } + c := getCollector(t, s) job := c.RateScrapeJob(s.Registry) withLabel := jobloop.WithLabel("service_type", "noop") @@ -253,11 +240,15 @@ func Test_ScrapeRatesButNoRates(t *testing.T) { //with no Rates() (in the wild, this can happen because some quota plugins //only have Resources()) mustT(t, job.ProcessOne(s.Ctx, withLabel)) + + scrapedAt := s.Clock.Now() _, tr0 := easypg.NewTracker(t, s.DB.Db) tr0.AssertEqualf(` INSERT INTO domain_services (id, domain_id, type) VALUES (1, 1, 'noop'); INSERT INTO domains (id, name, uuid) VALUES (1, 'germany', 'uuid-for-germany'); - INSERT INTO project_services (id, project_id, type, rates_scraped_at, rates_scrape_duration_secs, rates_checked_at, next_scrape_at, rates_next_scrape_at) VALUES (1, 1, 'noop', 1, 1, 1, 0, 1801); + INSERT INTO project_services (id, project_id, type, rates_scraped_at, rates_scrape_duration_secs, rates_checked_at, next_scrape_at, rates_next_scrape_at) VALUES (1, 1, 'noop', %[1]d, 5, %[1]d, 0, %[2]d); INSERT INTO projects (id, domain_id, name, uuid, parent_uuid, has_bursting) VALUES (1, 1, 'berlin', 'uuid-for-berlin', 'uuid-for-germany', FALSE); - `) + `, + scrapedAt.Unix(), scrapedAt.Add(scrapeInterval).Unix(), + ) } diff --git a/internal/collector/scrape.go b/internal/collector/scrape.go index 91fab2326..e158ac592 100644 --- a/internal/collector/scrape.go +++ b/internal/collector/scrape.go @@ -123,7 +123,7 @@ func (c *Collector) discoverScrapeTask(labels prometheus.Labels, query string) ( } labels["service_name"] = c.Cluster.InfoForService(serviceType).ProductName - task.Timing.StartedAt = c.TimeNow() + task.Timing.StartedAt = c.MeasureTime() err = c.DB.SelectOne(&task.Service, query, serviceType, task.Timing.StartedAt) return task, err } @@ -160,7 +160,7 @@ func (c *Collector) processResourceScrapeTask(_ context.Context, task projectScr if err != nil { task.Err = util.UnpackError(err) } - task.Timing.FinishedAt = c.TimeNow() + task.Timing.FinishedAt = c.MeasureTimeAtEnd() //write result on success; if anything fails, try to record the error in the DB if task.Err == nil { diff --git a/internal/collector/scrape_test.go b/internal/collector/scrape_test.go index 38e7a4c0b..31bb2bce7 100644 --- a/internal/collector/scrape_test.go +++ b/internal/collector/scrape_test.go @@ -25,7 +25,6 @@ import ( "testing" "time" - "github.com/go-gorp/gorp/v3" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sapcc/go-bits/assert" @@ -71,7 +70,7 @@ func prepareDomainsAndProjectsForScrape(t *testing.T, s test.Setup) { //ScanDomains is required to create the entries in `domains`, //`domain_services`, `projects` and `project_services` timeZero := func() time.Time { return time.Unix(0, 0).UTC() } - _, err := (&Collector{Cluster: s.Cluster, DB: s.DB, TimeNow: timeZero, AddJitter: test.NoJitter}).ScanDomains(ScanDomainsOpts{}) + _, err := (&Collector{Cluster: s.Cluster, DB: s.DB, MeasureTime: timeZero, AddJitter: test.NoJitter}).ScanDomains(ScanDomainsOpts{}) if err != nil { t.Fatal(err) } @@ -106,7 +105,6 @@ const ( ) func Test_ScrapeSuccess(t *testing.T) { - test.ResetTime() s := test.NewSetup(t, test.WithConfig(testScrapeBasicConfigYAML), ) @@ -134,13 +132,7 @@ func Test_ScrapeSuccess(t 
*testing.T) { } s.Cluster.Authoritative = true - c := Collector{ - Cluster: s.Cluster, - DB: s.DB, - LogError: t.Errorf, - TimeNow: test.TimeNow, - AddJitter: test.NoJitter, - } + c := getCollector(t, s) job := c.ResourceScrapeJob(s.Registry) withLabel := jobloop.WithLabel("service_type", "unittest") @@ -151,10 +143,14 @@ func Test_ScrapeSuccess(t *testing.T) { //first Scrape should create the entries in `project_resources` with the //correct usage and backend quota values (and quota = 0 because nothing was approved yet) //and set `project_services.scraped_at` to the current time + s.Clock.StepBy(scrapeInterval) plugin := s.Cluster.QuotaPlugins["unittest"].(*plugins.GenericQuotaPlugin) //nolint:errcheck plugin.SetQuotaFails = true mustT(t, job.ProcessOne(s.Ctx, withLabel)) mustT(t, job.ProcessOne(s.Ctx, withLabel)) //twice because there are two projects + + scrapedAt1 := s.Clock.Now().Add(-5 * time.Second) + scrapedAt2 := s.Clock.Now() tr.DBChanges().AssertEqualf(` INSERT INTO project_resources (service_id, name, quota, usage, backend_quota, desired_backend_quota, physical_usage) VALUES (1, 'capacity', 10, 0, 100, 10, 0); INSERT INTO project_resources (service_id, name, usage) VALUES (1, 'capacity_portion', 0); @@ -162,9 +158,12 @@ func Test_ScrapeSuccess(t *testing.T) { INSERT INTO project_resources (service_id, name, quota, usage, backend_quota, desired_backend_quota, physical_usage) VALUES (2, 'capacity', 10, 0, 100, 12, 0); INSERT INTO project_resources (service_id, name, usage) VALUES (2, 'capacity_portion', 0); INSERT INTO project_resources (service_id, name, quota, usage, backend_quota, subresources, desired_backend_quota) VALUES (2, 'things', 0, 2, 42, '[{"index":0},{"index":1}]', 0); - UPDATE project_services SET scraped_at = 1, scrape_duration_secs = 1, serialized_metrics = '{"capacity_usage":0,"things_usage":2}', checked_at = 1, next_scrape_at = 1801 WHERE id = 1 AND project_id = 1 AND type = 'unittest'; - UPDATE project_services SET scraped_at = 3, scrape_duration_secs = 1, serialized_metrics = '{"capacity_usage":0,"things_usage":2}', checked_at = 3, next_scrape_at = 1803 WHERE id = 2 AND project_id = 2 AND type = 'unittest'; - `) + UPDATE project_services SET scraped_at = %[1]d, scrape_duration_secs = 5, serialized_metrics = '{"capacity_usage":0,"things_usage":2}', checked_at = %[1]d, next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'unittest'; + UPDATE project_services SET scraped_at = %[3]d, scrape_duration_secs = 5, serialized_metrics = '{"capacity_usage":0,"things_usage":2}', checked_at = %[3]d, next_scrape_at = %[4]d WHERE id = 2 AND project_id = 2 AND type = 'unittest'; + `, + scrapedAt1.Unix(), scrapedAt1.Add(scrapeInterval).Unix(), + scrapedAt2.Unix(), scrapedAt2.Add(scrapeInterval).Unix(), + ) //second Scrape should not change anything (not even the timestamps) since //less than 30 minutes have passed since the last Scrape("unittest") @@ -172,20 +171,26 @@ func Test_ScrapeSuccess(t *testing.T) { tr.DBChanges().AssertEmpty() //change the data that is reported by the plugin + s.Clock.StepBy(scrapeInterval) plugin.StaticResourceData["capacity"].Quota = 110 plugin.StaticResourceData["things"].Usage = 5 - setProjectServicesStale(t, s.DB) //Scrape should pick up the changed resource data mustT(t, job.ProcessOne(s.Ctx, withLabel)) mustT(t, job.ProcessOne(s.Ctx, withLabel)) + + scrapedAt1 = s.Clock.Now().Add(-5 * time.Second) + scrapedAt2 = s.Clock.Now() tr.DBChanges().AssertEqualf(` UPDATE project_resources SET backend_quota = 110 WHERE service_id = 1 AND 
name = 'capacity';
 		UPDATE project_resources SET usage = 5, subresources = '[{"index":0},{"index":1},{"index":2},{"index":3},{"index":4}]' WHERE service_id = 1 AND name = 'things';
 		UPDATE project_resources SET backend_quota = 110 WHERE service_id = 2 AND name = 'capacity';
 		UPDATE project_resources SET usage = 5, subresources = '[{"index":0},{"index":1},{"index":2},{"index":3},{"index":4}]' WHERE service_id = 2 AND name = 'things';
-		UPDATE project_services SET scraped_at = 6, serialized_metrics = '{"capacity_usage":0,"things_usage":5}', checked_at = 6, next_scrape_at = 1806 WHERE id = 1 AND project_id = 1 AND type = 'unittest';
-		UPDATE project_services SET scraped_at = 8, serialized_metrics = '{"capacity_usage":0,"things_usage":5}', checked_at = 8, next_scrape_at = 1808 WHERE id = 2 AND project_id = 2 AND type = 'unittest';
-	`)
+		UPDATE project_services SET scraped_at = %[1]d, serialized_metrics = '{"capacity_usage":0,"things_usage":5}', checked_at = %[1]d, next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'unittest';
+		UPDATE project_services SET scraped_at = %[3]d, serialized_metrics = '{"capacity_usage":0,"things_usage":5}', checked_at = %[3]d, next_scrape_at = %[4]d WHERE id = 2 AND project_id = 2 AND type = 'unittest';
+	`,
+		scrapedAt1.Unix(), scrapedAt1.Add(scrapeInterval).Unix(),
+		scrapedAt2.Unix(), scrapedAt2.Add(scrapeInterval).Unix(),
+	)
 	//set some new quota values (note that "capacity" already had a non-zero
 	//quota because of the cluster.QuotaConstraints)
@@ -200,29 +205,41 @@ func Test_ScrapeSuccess(t *testing.T) {
 	//Scrape should try to enforce quota values in the backend (this did not work
 	//until now because the test.Plugin was instructed to have SetQuota fail)
+	s.Clock.StepBy(scrapeInterval)
 	plugin.SetQuotaFails = false
-	setProjectServicesStale(t, s.DB)
 	mustT(t, job.ProcessOne(s.Ctx, withLabel))
 	mustT(t, job.ProcessOne(s.Ctx, withLabel))
+
+	scrapedAt1 = s.Clock.Now().Add(-5 * time.Second)
+	scrapedAt2 = s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
 		UPDATE project_resources SET quota = 20, backend_quota = 20, desired_backend_quota = 20 WHERE service_id = 1 AND name = 'capacity';
 		UPDATE project_resources SET quota = 13, backend_quota = 13, desired_backend_quota = 13 WHERE service_id = 1 AND name = 'things';
 		UPDATE project_resources SET quota = 20, backend_quota = 24, desired_backend_quota = 24 WHERE service_id = 2 AND name = 'capacity';
 		UPDATE project_resources SET quota = 13, backend_quota = 15, desired_backend_quota = 15 WHERE service_id = 2 AND name = 'things';
-		UPDATE project_services SET scraped_at = 10, checked_at = 10, next_scrape_at = 1810 WHERE id = 1 AND project_id = 1 AND type = 'unittest';
-		UPDATE project_services SET scraped_at = 12, checked_at = 12, next_scrape_at = 1812 WHERE id = 2 AND project_id = 2 AND type = 'unittest';
-	`)
+		UPDATE project_services SET scraped_at = %[1]d, checked_at = %[1]d, next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'unittest';
+		UPDATE project_services SET scraped_at = %[3]d, checked_at = %[3]d, next_scrape_at = %[4]d WHERE id = 2 AND project_id = 2 AND type = 'unittest';
+	`,
+		scrapedAt1.Unix(), scrapedAt1.Add(scrapeInterval).Unix(),
+		scrapedAt2.Unix(), scrapedAt2.Add(scrapeInterval).Unix(),
+	)
 	//another Scrape (with SetQuota disabled again) should show that the quota
 	//update was durable
+	s.Clock.StepBy(scrapeInterval)
 	plugin.SetQuotaFails = true
-	setProjectServicesStale(t, s.DB)
 	mustT(t, job.ProcessOne(s.Ctx, withLabel))
 	mustT(t, job.ProcessOne(s.Ctx, withLabel))
+
+	scrapedAt1 = s.Clock.Now().Add(-5 * time.Second)
+	scrapedAt2 = s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
-		UPDATE project_services SET scraped_at = 14, checked_at = 14, next_scrape_at = 1814 WHERE id = 1 AND project_id = 1 AND type = 'unittest';
-		UPDATE project_services SET scraped_at = 16, checked_at = 16, next_scrape_at = 1816 WHERE id = 2 AND project_id = 2 AND type = 'unittest';
-	`)
+		UPDATE project_services SET scraped_at = %[1]d, checked_at = %[1]d, next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'unittest';
+		UPDATE project_services SET scraped_at = %[3]d, checked_at = %[3]d, next_scrape_at = %[4]d WHERE id = 2 AND project_id = 2 AND type = 'unittest';
+	`,
+		scrapedAt1.Unix(), scrapedAt1.Add(scrapeInterval).Unix(),
+		scrapedAt2.Unix(), scrapedAt2.Add(scrapeInterval).Unix(),
+	)
 	//set a quota that contradicts the cluster.QuotaConstraints
 	_, err = s.DB.Exec(`UPDATE project_resources SET quota = $1 WHERE name = $2`, 50, "capacity")
@@ -231,32 +248,44 @@
 	}
 	//Scrape should apply the constraint, then enforce quota values in the backend
+	s.Clock.StepBy(scrapeInterval)
 	plugin.SetQuotaFails = false
-	setProjectServicesStale(t, s.DB)
 	mustT(t, job.ProcessOne(s.Ctx, withLabel))
 	mustT(t, job.ProcessOne(s.Ctx, withLabel))
+
+	scrapedAt1 = s.Clock.Now().Add(-5 * time.Second)
+	scrapedAt2 = s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
 		UPDATE project_resources SET quota = 40, backend_quota = 40, desired_backend_quota = 40 WHERE service_id = 1 AND name = 'capacity';
 		UPDATE project_resources SET quota = 40, backend_quota = 48, desired_backend_quota = 48 WHERE service_id = 2 AND name = 'capacity';
-		UPDATE project_services SET scraped_at = 18, checked_at = 18, next_scrape_at = 1818 WHERE id = 1 AND project_id = 1 AND type = 'unittest';
-		UPDATE project_services SET scraped_at = 20, checked_at = 20, next_scrape_at = 1820 WHERE id = 2 AND project_id = 2 AND type = 'unittest';
-	`)
+		UPDATE project_services SET scraped_at = %[1]d, checked_at = %[1]d, next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'unittest';
+		UPDATE project_services SET scraped_at = %[3]d, checked_at = %[3]d, next_scrape_at = %[4]d WHERE id = 2 AND project_id = 2 AND type = 'unittest';
+	`,
+		scrapedAt1.Unix(), scrapedAt1.Add(scrapeInterval).Unix(),
+		scrapedAt2.Unix(), scrapedAt2.Add(scrapeInterval).Unix(),
+	)
 	//set "capacity" to a non-zero usage to observe a non-zero usage on
 	//"capacity_portion" (otherwise this resource has been all zeroes this entire
 	//time)
+	s.Clock.StepBy(scrapeInterval)
 	plugin.StaticResourceData["capacity"].Usage = 20
-	setProjectServicesStale(t, s.DB)
 	mustT(t, job.ProcessOne(s.Ctx, withLabel))
 	mustT(t, job.ProcessOne(s.Ctx, withLabel))
+
+	scrapedAt1 = s.Clock.Now().Add(-5 * time.Second)
+	scrapedAt2 = s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
 		UPDATE project_resources SET usage = 20, physical_usage = 10 WHERE service_id = 1 AND name = 'capacity';
 		UPDATE project_resources SET usage = 5 WHERE service_id = 1 AND name = 'capacity_portion';
 		UPDATE project_resources SET usage = 20, physical_usage = 10 WHERE service_id = 2 AND name = 'capacity';
 		UPDATE project_resources SET usage = 5 WHERE service_id = 2 AND name = 'capacity_portion';
-		UPDATE project_services SET scraped_at = 22, serialized_metrics = '{"capacity_usage":20,"things_usage":5}', checked_at = 22, next_scrape_at = 1822 WHERE id = 1 AND project_id = 1 AND type = 'unittest';
-		UPDATE project_services SET scraped_at = 24, serialized_metrics = '{"capacity_usage":20,"things_usage":5}', checked_at = 24, next_scrape_at = 1824 WHERE id = 2 AND project_id = 2 AND type = 'unittest';
-	`)
+		UPDATE project_services SET scraped_at = %[1]d, serialized_metrics = '{"capacity_usage":20,"things_usage":5}', checked_at = %[1]d, next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'unittest';
+		UPDATE project_services SET scraped_at = %[3]d, serialized_metrics = '{"capacity_usage":20,"things_usage":5}', checked_at = %[3]d, next_scrape_at = %[4]d WHERE id = 2 AND project_id = 2 AND type = 'unittest';
+	`,
+		scrapedAt1.Unix(), scrapedAt1.Add(scrapeInterval).Unix(),
+		scrapedAt2.Unix(), scrapedAt2.Add(scrapeInterval).Unix(),
+	)
 	//check data metrics generated by this scraping pass
 	registry := prometheus.NewPedanticRegistry()
@@ -289,17 +318,7 @@ func Test_ScrapeSuccess(t *testing.T) {
 	}.Check(t, promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
 }
-func setProjectServicesStale(t *testing.T, dbm *gorp.DbMap) {
-	t.Helper()
-	//make sure that the project is scraped again
-	_, err := dbm.Exec(`UPDATE project_services SET stale = $1`, true)
-	if err != nil {
-		t.Fatal(err)
-	}
-}
-
 func Test_ScrapeFailure(t *testing.T) {
-	test.ResetTime()
 	s := test.NewSetup(t,
 		test.WithConfig(testScrapeBasicConfigYAML),
 	)
@@ -307,7 +326,7 @@ func Test_ScrapeFailure(t *testing.T) {
 	//setup a quota constraint for the projects that we're scraping
 	//
-	//TODO: duplicated with Test_ScrapeFailure
+	//TODO: duplicated with Test_ScrapeSuccess
 	//NOTE: This is set only *after* ScanDomains has run, in order to exercise
 	//the code path in Scrape() that applies constraints when first creating
 	//project_resources entries. If we had set this before ScanDomains, then
@@ -326,13 +345,7 @@ func Test_ScrapeFailure(t *testing.T) {
 		},
 	}
-	c := Collector{
-		Cluster:   s.Cluster,
-		DB:        s.DB,
-		TimeNow:   test.TimeNow,
-		LogError:  t.Errorf,
-		AddJitter: test.NoJitter,
-	}
+	c := getCollector(t, s)
 	job := c.ResourceScrapeJob(s.Registry)
 	withLabel := jobloop.WithLabel("service_type", "unittest")
@@ -345,10 +358,14 @@ func Test_ScrapeFailure(t *testing.T) {
 	//failing Scrape should create dummy records to ensure that the API finds
 	//plausibly-structured data
+	s.Clock.StepBy(scrapeInterval)
 	plugin := s.Cluster.QuotaPlugins["unittest"].(*plugins.GenericQuotaPlugin) //nolint:errcheck
 	plugin.ScrapeFails = true
 	mustFailLikeT(t, job.ProcessOne(s.Ctx, withLabel), expectedErrorRx)
 	mustFailLikeT(t, job.ProcessOne(s.Ctx, withLabel), expectedErrorRx) //twice because there are two projects
+
+	checkedAt1 := s.Clock.Now().Add(-5 * time.Second)
+	checkedAt2 := s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
 		INSERT INTO project_resources (service_id, name, quota, usage, backend_quota, desired_backend_quota) VALUES (1, 'capacity', 10, 0, -1, 10);
 		INSERT INTO project_resources (service_id, name, usage) VALUES (1, 'capacity_portion', 0);
@@ -356,44 +373,65 @@
 		INSERT INTO project_resources (service_id, name, quota, usage, backend_quota, desired_backend_quota) VALUES (2, 'capacity', 10, 0, -1, 12);
 		INSERT INTO project_resources (service_id, name, usage) VALUES (2, 'capacity_portion', 0);
 		INSERT INTO project_resources (service_id, name, quota, usage, backend_quota, desired_backend_quota) VALUES (2, 'things', 0, 0, -1, 0);
-		UPDATE project_services SET scraped_at = 0, checked_at = 1, scrape_error_message = 'Scrape failed as requested', next_scrape_at = 301 WHERE id = 1 AND project_id = 1 AND type = 'unittest';
-		UPDATE project_services SET scraped_at = 0, checked_at = 3, scrape_error_message = 'Scrape failed as requested', next_scrape_at = 303 WHERE id = 2 AND project_id = 2 AND type = 'unittest';
-	`)
+		UPDATE project_services SET scraped_at = 0, checked_at = %[1]d, scrape_error_message = 'Scrape failed as requested', next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'unittest';
+		UPDATE project_services SET scraped_at = 0, checked_at = %[3]d, scrape_error_message = 'Scrape failed as requested', next_scrape_at = %[4]d WHERE id = 2 AND project_id = 2 AND type = 'unittest';
+	`,
+		checkedAt1.Unix(), checkedAt1.Add(recheckInterval).Unix(),
+		checkedAt2.Unix(), checkedAt2.Add(recheckInterval).Unix(),
+	)
 	//next Scrape should yield the same result
-	setProjectServicesStale(t, s.DB)
+	s.Clock.StepBy(scrapeInterval)
 	mustFailLikeT(t, job.ProcessOne(s.Ctx, withLabel), expectedErrorRx)
 	mustFailLikeT(t, job.ProcessOne(s.Ctx, withLabel), expectedErrorRx)
+
+	checkedAt1 = s.Clock.Now().Add(-5 * time.Second)
+	checkedAt2 = s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
-		UPDATE project_services SET checked_at = 5, next_scrape_at = 305 WHERE id = 1 AND project_id = 1 AND type = 'unittest';
-		UPDATE project_services SET checked_at = 7, next_scrape_at = 307 WHERE id = 2 AND project_id = 2 AND type = 'unittest';
-	`) //TODO advance clock
+		UPDATE project_services SET checked_at = %[1]d, next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'unittest';
+		UPDATE project_services SET checked_at = %[3]d, next_scrape_at = %[4]d WHERE id = 2 AND project_id = 2 AND type = 'unittest';
+	`,
+		checkedAt1.Unix(), checkedAt1.Add(recheckInterval).Unix(),
+		checkedAt2.Unix(), checkedAt2.Add(recheckInterval).Unix(),
+	)
 	//once the backend starts working, we start to see plausible data again
+	s.Clock.StepBy(scrapeInterval)
 	plugin.ScrapeFails = false
-	setProjectServicesStale(t, s.DB)
 	mustT(t, job.ProcessOne(s.Ctx, withLabel))
 	mustT(t, job.ProcessOne(s.Ctx, withLabel)) //twice because there are two projects
+
+	scrapedAt1 := s.Clock.Now().Add(-5 * time.Second)
+	scrapedAt2 := s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
 		UPDATE project_resources SET backend_quota = 100, physical_usage = 0 WHERE service_id = 1 AND name = 'capacity';
 		UPDATE project_resources SET usage = 2, backend_quota = 42, subresources = '[{"index":0},{"index":1}]' WHERE service_id = 1 AND name = 'things';
 		UPDATE project_resources SET backend_quota = 100, physical_usage = 0 WHERE service_id = 2 AND name = 'capacity';
 		UPDATE project_resources SET usage = 2, backend_quota = 42, subresources = '[{"index":0},{"index":1}]' WHERE service_id = 2 AND name = 'things';
-		UPDATE project_services SET scraped_at = 9, scrape_duration_secs = 1, serialized_metrics = '{"capacity_usage":0,"things_usage":2}', checked_at = 9, scrape_error_message = '', next_scrape_at = 1809 WHERE id = 1 AND project_id = 1 AND type = 'unittest';
-		UPDATE project_services SET scraped_at = 11, scrape_duration_secs = 1, serialized_metrics = '{"capacity_usage":0,"things_usage":2}', checked_at = 11, scrape_error_message = '', next_scrape_at = 1811 WHERE id = 2 AND project_id = 2 AND type = 'unittest';
-	`)
+		UPDATE project_services SET scraped_at = %[1]d, scrape_duration_secs = 5, serialized_metrics = '{"capacity_usage":0,"things_usage":2}', checked_at = %[1]d, scrape_error_message = '', next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'unittest';
+		UPDATE project_services SET scraped_at = %[3]d, scrape_duration_secs = 5, serialized_metrics = '{"capacity_usage":0,"things_usage":2}', checked_at = %[3]d, scrape_error_message = '', next_scrape_at = %[4]d WHERE id = 2 AND project_id = 2 AND type = 'unittest';
+	`,
+		scrapedAt1.Unix(), scrapedAt1.Add(scrapeInterval).Unix(),
+		scrapedAt2.Unix(), scrapedAt2.Add(scrapeInterval).Unix(),
+	)
 	//backend fails again and we need to scrape because of the stale flag ->
 	//touch neither scraped_at nor the existing resources (this also tests that a
 	//failed check causes Scrape("unittest") to continue with the next resource afterwards)
+	s.Clock.StepBy(scrapeInterval)
 	plugin.ScrapeFails = true
-	setProjectServicesStale(t, s.DB)
 	mustFailLikeT(t, job.ProcessOne(s.Ctx, withLabel), expectedErrorRx)
 	mustFailLikeT(t, job.ProcessOne(s.Ctx, withLabel), expectedErrorRx) //twice because there are two projects
+
+	checkedAt1 = s.Clock.Now().Add(-5 * time.Second)
+	checkedAt2 = s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
-		UPDATE project_services SET checked_at = 13, scrape_error_message = 'Scrape failed as requested', next_scrape_at = 313 WHERE id = 1 AND project_id = 1 AND type = 'unittest';
-		UPDATE project_services SET checked_at = 15, scrape_error_message = 'Scrape failed as requested', next_scrape_at = 315 WHERE id = 2 AND project_id = 2 AND type = 'unittest';
-	`)
+		UPDATE project_services SET checked_at = %[1]d, scrape_error_message = 'Scrape failed as requested', next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'unittest';
+		UPDATE project_services SET checked_at = %[3]d, scrape_error_message = 'Scrape failed as requested', next_scrape_at = %[4]d WHERE id = 2 AND project_id = 2 AND type = 'unittest';
+	`,
+		checkedAt1.Unix(), checkedAt1.Add(recheckInterval).Unix(),
+		checkedAt2.Unix(), checkedAt2.Add(recheckInterval).Unix(),
+	)
 }
 const (
@@ -424,7 +462,6 @@
 func Test_ScrapeCentralized(t *testing.T) {
 	for _, hasBursting := range []bool{false, true} {
 		logg.Info("===== hasBursting = %t =====", hasBursting)
-		test.ResetTime()
 		s := test.NewSetup(t,
 			test.WithConfig(testScrapeCentralizedConfigYAML),
 		)
@@ -449,13 +486,7 @@ func Test_ScrapeCentralized(t *testing.T) {
 		}
 		s.Cluster.Authoritative = true
-		c := Collector{
-			Cluster:   s.Cluster,
-			DB:        s.DB,
-			LogError:  t.Errorf,
-			TimeNow:   test.TimeNow,
-			AddJitter: test.NoJitter,
-		}
+		c := getCollector(t, s)
 		job := c.ResourceScrapeJob(s.Registry)
 		withLabel := jobloop.WithLabel("service_type", "centralized")
@@ -476,29 +507,38 @@ func Test_ScrapeCentralized(t *testing.T) {
 		//first Scrape creates the remaining project_resources, fills usage and
 		//enforces quota constraints (note that both projects behave identically
 		//since bursting is ineffective under centralized quota distribution)
+		s.Clock.StepBy(scrapeInterval)
 		mustT(t, job.ProcessOne(s.Ctx, withLabel))
+
+		scrapedAt := s.Clock.Now()
 		tr.DBChanges().AssertEqualf(`
 			UPDATE domain_resources SET quota = 10 WHERE service_id = 1 AND name = 'capacity';
 			UPDATE domain_resources SET quota = 20 WHERE service_id = 1 AND name = 'things';
 			INSERT INTO project_resources (service_id, name, quota, usage, backend_quota, desired_backend_quota, physical_usage) VALUES (1, 'capacity', 10, 0, 10, 10, 0);
 			INSERT INTO project_resources (service_id, name, usage) VALUES (1, 'capacity_portion', 0);
 			INSERT INTO project_resources (service_id, name, quota, usage, backend_quota, subresources, desired_backend_quota) VALUES (1, 'things', 20, 2, 20, '[{"index":0},{"index":1}]', 20);
-			UPDATE project_services SET scraped_at = 1, scrape_duration_secs = 1, serialized_metrics = '{"capacity_usage":0,"things_usage":2}', checked_at = 1, next_scrape_at = 1801 WHERE id = 1 AND project_id = 1 AND type = 'centralized';
-		`)
+			UPDATE project_services SET scraped_at = %[1]d, scrape_duration_secs = 5, serialized_metrics = '{"capacity_usage":0,"things_usage":2}', checked_at = %[1]d, next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'centralized';
+		`,
+			scrapedAt.Unix(), scrapedAt.Add(scrapeInterval).Unix(),
+		)
 		//check that DefaultProjectQuota gets reapplied when the quota is 0 (zero
 		//quota on CQD resources is defined to mean "DefaultProjectQuota not
 		//applied yet"; this check is also relevant for resources moving from HQD to CQD)
+		s.Clock.StepBy(scrapeInterval)
 		_, err := s.DB.Exec(`UPDATE project_resources SET quota = 0 WHERE service_id = 1`)
 		if err != nil {
 			t.Fatal(err)
 		}
-		setProjectServicesStale(t, s.DB)
 		mustT(t, job.ProcessOne(s.Ctx, withLabel))
+
+		//because Scrape converges back into the same state, the only change is in the timestamp fields
+		scrapedAt = s.Clock.Now()
 		tr.DBChanges().AssertEqualf(`
-			UPDATE project_services SET scraped_at = 3, checked_at = 3, next_scrape_at = 1803 WHERE id = 1 AND project_id = 1 AND type = 'centralized';
-		`)
+			UPDATE project_services SET scraped_at = %[1]d, checked_at = %[1]d, next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'centralized';
+		`,
+			scrapedAt.Unix(), scrapedAt.Add(scrapeInterval).Unix(),
+		)
 	}
 }
@@ -524,19 +564,12 @@ const (
 )
 func Test_AutoApproveInitialQuota(t *testing.T) {
-	test.ResetTime()
 	s := test.NewSetup(t,
 		test.WithConfig(testAutoApprovalConfigYAML),
 	)
 	prepareDomainsAndProjectsForScrape(t, s)
-	c := Collector{
-		Cluster:   s.Cluster,
-		DB:        s.DB,
-		LogError:  t.Errorf,
-		TimeNow:   test.TimeNow,
-		AddJitter: test.NoJitter,
-	}
+	c := getCollector(t, s)
 	job := c.ResourceScrapeJob(s.Registry)
 	withLabel := jobloop.WithLabel("service_type", "autoapprovaltest")
@@ -546,25 +579,34 @@ func Test_AutoApproveInitialQuota(t *testing.T) {
 	//when first scraping, the initial backend quota of the "approve" resource
 	//shall be approved automatically
+	s.Clock.StepBy(scrapeInterval)
 	mustT(t, job.ProcessOne(s.Ctx, withLabel))
+
+	scrapedAt := s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
 		INSERT INTO project_resources (service_id, name, quota, usage, backend_quota, desired_backend_quota) VALUES (1, 'approve', 10, 0, 10, 10);
 		INSERT INTO project_resources (service_id, name, quota, usage, backend_quota, desired_backend_quota) VALUES (1, 'noapprove', 0, 0, 20, 0);
-		UPDATE project_services SET scraped_at = 1, scrape_duration_secs = 1, checked_at = 1, next_scrape_at = 1801 WHERE id = 1 AND project_id = 1 AND type = 'autoapprovaltest';
-	`)
+		UPDATE project_services SET scraped_at = %[1]d, scrape_duration_secs = 5, checked_at = %[1]d, next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'autoapprovaltest';
+	`,
+		scrapedAt.Unix(), scrapedAt.Add(scrapeInterval).Unix(),
+	)
 	//modify the backend quota; verify that the second scrape does not
 	//auto-approve the changed value again (auto-approval is limited to the
 	//initial scrape)
+	s.Clock.StepBy(scrapeInterval)
 	plugin := s.Cluster.QuotaPlugins["autoapprovaltest"].(*plugins.AutoApprovalQuotaPlugin) //nolint:errcheck
 	plugin.StaticBackendQuota += 10
-	setProjectServicesStale(t, s.DB)
 	mustT(t, job.ProcessOne(s.Ctx, withLabel))
+
+	scrapedAt = s.Clock.Now()
 	tr.DBChanges().AssertEqualf(`
 		UPDATE project_resources SET backend_quota = 20 WHERE service_id = 1 AND name = 'approve';
 		UPDATE project_resources SET backend_quota = 30 WHERE service_id = 1 AND name = 'noapprove';
-		UPDATE project_services SET scraped_at = 3, checked_at = 3, next_scrape_at = 1803 WHERE id = 1 AND project_id = 1 AND type = 'autoapprovaltest';
-	`)
+		UPDATE project_services SET scraped_at = %[1]d, checked_at = %[1]d, next_scrape_at = %[2]d WHERE id = 1 AND project_id = 1 AND type = 'autoapprovaltest';
+	`,
+		scrapedAt.Unix(), scrapedAt.Add(scrapeInterval).Unix(),
+	)
 }
 const (
@@ -585,19 +627,12 @@ const (
 //nolint:dupl
 func Test_ScrapeButNoResources(t *testing.T) {
-	test.ResetTime()
 	s := test.NewSetup(t,
 		test.WithConfig(testNoopConfigYAML),
 	)
 	prepareDomainsAndProjectsForScrape(t, s)
-	c := Collector{
-		Cluster:   s.Cluster,
-		DB:        s.DB,
-		LogError:  t.Errorf,
-		TimeNow:   test.TimeNow,
-		AddJitter: test.NoJitter,
-	}
+	c := getCollector(t, s)
 	job := c.ResourceScrapeJob(s.Registry)
 	withLabel := jobloop.WithLabel("service_type", "noop")
@@ -605,11 +640,15 @@ func Test_ScrapeButNoResources(t *testing.T) {
 	//no Resources() (in the wild, this can happen because some quota plugins
 	//only have Rates())
 	mustT(t, job.ProcessOne(s.Ctx, withLabel))
+
+	scrapedAt := s.Clock.Now()
 	_, tr0 := easypg.NewTracker(t, s.DB.Db)
 	tr0.AssertEqualf(`
 		INSERT INTO domain_services (id, domain_id, type) VALUES (1, 1, 'noop');
 		INSERT INTO domains (id, name, uuid) VALUES (1, 'germany', 'uuid-for-germany');
-		INSERT INTO project_services (id, project_id, type, scraped_at, scrape_duration_secs, checked_at, next_scrape_at, rates_next_scrape_at) VALUES (1, 1, 'noop', 1, 1, 1, 1801, 0);
+		INSERT INTO project_services (id, project_id, type, scraped_at, scrape_duration_secs, checked_at, next_scrape_at, rates_next_scrape_at) VALUES (1, 1, 'noop', %[1]d, 5, %[1]d, %[2]d, 0);
 		INSERT INTO projects (id, domain_id, name, uuid, parent_uuid, has_bursting) VALUES (1, 1, 'berlin', 'uuid-for-berlin', 'uuid-for-germany', FALSE);
-	`)
+	`,
+		scrapedAt.Unix(), scrapedAt.Add(scrapeInterval).Unix(),
+	)
 }
diff --git a/internal/collector/shared_test.go b/internal/collector/shared_test.go
new file mode 100644
index 000000000..a51bd40a9
--- /dev/null
+++ b/internal/collector/shared_test.go
@@ -0,0 +1,41 @@
+/*******************************************************************************
+*
+* Copyright 2023 SAP SE
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You should have received a copy of the License along with this
+* program. If not, you may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*******************************************************************************/
+
+package collector
+
+import (
+	"testing"
+	"time"
+
+	"github.com/sapcc/limes/internal/test"
+)
+
+func getCollector(t *testing.T, s test.Setup) Collector {
+	return Collector{
+		Cluster:     s.Cluster,
+		DB:          s.DB,
+		LogError:    t.Errorf,
+		MeasureTime: s.Clock.Now,
+		MeasureTimeAtEnd: func() time.Time {
+			s.Clock.StepBy(5 * time.Second)
+			return s.Clock.Now()
+		},
+		AddJitter: test.NoJitter,
+	}
+}
diff --git a/internal/test/clock.go b/internal/test/clock.go
index 6838208c1..384ccc791 100644
--- a/internal/test/clock.go
+++ b/internal/test/clock.go
@@ -21,22 +21,6 @@ package test
 import "time"
-var clockSeconds int64 = -1
-
-// TimeNow replaces time.Now in unit tests. It provides a simulated clock that
-// behaves the same in every test run: It returns the UNIX epoch the first time,
-// and then advances by one second on every call.
-func TimeNow() time.Time {
-	clockSeconds++
-	return time.Unix(clockSeconds, 0).UTC()
-}
-
-// ResetTime should be called at the start of unit tests that use TimeNow, to
-// ensure a reproducible flow of time.
-func ResetTime() {
-	clockSeconds = -1
-}
-
 // NoJitter replaces test.AddJitter in unit tests, to provide deterministic
 // behavior.
 func NoJitter(d time.Duration) time.Duration {
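The tests above drive time exclusively through the simulated clock exposed by test.Setup: s.Clock.StepBy() advances it past next_scrape_at so that the job loop picks the services up again (replacing the removed stale-flag helper), s.Clock.Now() is what MeasureTime reads at the start of a scrape, and the MeasureTimeAtEnd closure in getCollector steps the clock by five seconds before reading it again. That is why every successful scrape is recorded with scrape_duration_secs = 5 and why the first of two back-to-back scrapes is recovered as s.Clock.Now().Add(-5 * time.Second). The clock type itself is not part of this diff; the following is only a minimal sketch of an implementation that would satisfy the two calls used here, with the type and constructor names (FakeClock, NewFakeClock) chosen for illustration rather than taken from internal/test.

package test

import "time"

// FakeClock is a hand-rolled simulated clock (illustrative sketch; the real
// type in internal/test may differ). It only advances when the test tells it
// to, so every timestamp that ends up in the database is predictable.
type FakeClock struct {
	currentTime time.Time
}

// NewFakeClock starts the simulated time at the UNIX epoch, mirroring the
// starting point of the removed test.TimeNow helper.
func NewFakeClock() *FakeClock {
	return &FakeClock{currentTime: time.Unix(0, 0).UTC()}
}

// Now returns the current simulated time without advancing it.
func (c *FakeClock) Now() time.Time {
	return c.currentTime
}

// StepBy advances the simulated time by the given duration.
func (c *FakeClock) StepBy(d time.Duration) {
	c.currentTime = c.currentTime.Add(d)
}

Unlike the removed TimeNow, such a clock never advances implicitly, so a test observes identical timestamps across consecutive reads unless MeasureTimeAtEnd or an explicit StepBy moves time forward.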