Skip to content

Commit

Permalink
Add backend sync logic
Browse files Browse the repository at this point in the history
separated AZ quota will now sync the resource quota and AZ quota to the backend
  • Loading branch information
VoigtS committed Nov 29, 2024
1 parent 4e1f282 commit 08938a6
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 39 deletions.
8 changes: 5 additions & 3 deletions internal/collector/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,12 @@ func (c *Collector) writeResourceScrapeResult(dbDomain db.Domain, dbProject db.P
azRes.Usage = data.Usage
azRes.PhysicalUsage = data.PhysicalUsage

// set AZ backend quota
// set AZ backend quota. Do not set backend quota to the automatically created any AZ.
resInfo := c.Cluster.InfoForResource(srv.Type, res.Name)
if resInfo.HasQuota && resInfo.Topology == liquid.AZSeparatedResourceTopology {
azRes.BackendQuota = &data.Quota
if resInfo.Topology == liquid.AZSeparatedResourceTopology && resInfo.HasQuota {
if azRes.AvailabilityZone != liquid.AvailabilityZoneAny {
azRes.BackendQuota = &data.Quota
}
}

// warn when the backend is inconsistent with itself
Expand Down
91 changes: 90 additions & 1 deletion internal/collector/scrape_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,25 +593,114 @@ func Test_TopologyScrapes(t *testing.T) {
c := getCollector(t, s)
job := c.ResourceScrapeJob(s.Registry)
withLabel := jobloop.WithLabel("service_type", "unittest")
syncJob := c.SyncQuotaToBackendJob(s.Registry)
plugin := s.Cluster.QuotaPlugins["unittest"].(*plugins.GenericQuotaPlugin)

_, tr0 := easypg.NewTracker(t, s.DB.Db)
tr, tr0 := easypg.NewTracker(t, s.DB.Db)
tr0.AssertEqualToFile("fixtures/scrape0.sql")

var status *any
// positive: Sync az-separated quota values with the backend
plugin.LiquidServiceInfo.Resources = map[liquid.ResourceName]liquid.ResourceInfo{"capacity": {Topology: liquid.AZSeparatedResourceTopology}, "things": {Topology: liquid.AZSeparatedResourceTopology}}
plugin.ReportedAZs = map[liquid.AvailabilityZone]*any{"az-one": status, "az-two": status}
mustT(t, job.ProcessOne(s.Ctx, withLabel))
mustT(t, job.ProcessOne(s.Ctx, withLabel))

scrapedAt1 := s.Clock.Now().Add(-5 * time.Second)
scrapedAt2 := s.Clock.Now()
tr.DBChanges().AssertEqualf(`
INSERT INTO project_az_resources (id, resource_id, az, usage, historical_usage) VALUES (1, 1, 'any', 0, '{"t":[%[1]d],"v":[0]}');
INSERT INTO project_az_resources (id, resource_id, az, usage, historical_usage) VALUES (10, 4, 'any', 0, '{"t":[%[3]d],"v":[0]}');
INSERT INTO project_az_resources (id, resource_id, az, usage, physical_usage, historical_usage, backend_quota) VALUES (11, 4, 'az-one', 0, 0, '{"t":[%[3]d],"v":[0]}', 50);
INSERT INTO project_az_resources (id, resource_id, az, usage, physical_usage, historical_usage, backend_quota) VALUES (12, 4, 'az-two', 0, 0, '{"t":[%[3]d],"v":[0]}', 50);
INSERT INTO project_az_resources (id, resource_id, az, usage, historical_usage) VALUES (13, 5, 'any', 0, '{"t":[%[3]d],"v":[0]}');
INSERT INTO project_az_resources (id, resource_id, az, usage, historical_usage) VALUES (14, 5, 'az-one', 0, '{"t":[%[3]d],"v":[0]}');
INSERT INTO project_az_resources (id, resource_id, az, usage, historical_usage) VALUES (15, 5, 'az-two', 0, '{"t":[%[3]d],"v":[0]}');
INSERT INTO project_az_resources (id, resource_id, az, usage, historical_usage) VALUES (16, 6, 'any', 0, '{"t":[%[3]d],"v":[0]}');
INSERT INTO project_az_resources (id, resource_id, az, usage, subresources, historical_usage, backend_quota) VALUES (17, 6, 'az-one', 2, '[{"index":0},{"index":1}]', '{"t":[%[3]d],"v":[2]}', 21);
INSERT INTO project_az_resources (id, resource_id, az, usage, subresources, historical_usage, backend_quota) VALUES (18, 6, 'az-two', 2, '[{"index":2},{"index":3}]', '{"t":[%[3]d],"v":[2]}', 21);
INSERT INTO project_az_resources (id, resource_id, az, usage, physical_usage, historical_usage, backend_quota) VALUES (2, 1, 'az-one', 0, 0, '{"t":[%[1]d],"v":[0]}', 50);
INSERT INTO project_az_resources (id, resource_id, az, usage, physical_usage, historical_usage, backend_quota) VALUES (3, 1, 'az-two', 0, 0, '{"t":[%[1]d],"v":[0]}', 50);
INSERT INTO project_az_resources (id, resource_id, az, usage, historical_usage) VALUES (4, 2, 'any', 0, '{"t":[%[1]d],"v":[0]}');
INSERT INTO project_az_resources (id, resource_id, az, usage, historical_usage) VALUES (5, 2, 'az-one', 0, '{"t":[%[1]d],"v":[0]}');
INSERT INTO project_az_resources (id, resource_id, az, usage, historical_usage) VALUES (6, 2, 'az-two', 0, '{"t":[%[1]d],"v":[0]}');
INSERT INTO project_az_resources (id, resource_id, az, usage, historical_usage) VALUES (7, 3, 'any', 0, '{"t":[%[1]d],"v":[0]}');
INSERT INTO project_az_resources (id, resource_id, az, usage, subresources, historical_usage, backend_quota) VALUES (8, 3, 'az-one', 2, '[{"index":0},{"index":1}]', '{"t":[%[1]d],"v":[2]}', 21);
INSERT INTO project_az_resources (id, resource_id, az, usage, subresources, historical_usage, backend_quota) VALUES (9, 3, 'az-two', 2, '[{"index":2},{"index":3}]', '{"t":[%[1]d],"v":[2]}', 21);
INSERT INTO project_resources (id, service_id, name, quota, backend_quota) VALUES (1, 1, 'capacity', 0, 100);
INSERT INTO project_resources (id, service_id, name) VALUES (2, 1, 'capacity_portion');
INSERT INTO project_resources (id, service_id, name, quota, backend_quota) VALUES (3, 1, 'things', 0, 42);
INSERT INTO project_resources (id, service_id, name, quota, backend_quota) VALUES (4, 2, 'capacity', 0, 100);
INSERT INTO project_resources (id, service_id, name) VALUES (5, 2, 'capacity_portion');
INSERT INTO project_resources (id, service_id, name, quota, backend_quota) VALUES (6, 2, 'things', 0, 42);
UPDATE project_services SET scraped_at = %[1]d, scrape_duration_secs = 5, serialized_metrics = '{"capacity_usage":0,"things_usage":4}', checked_at = %[1]d, next_scrape_at = %[2]d, quota_desynced_at = %[1]d WHERE id = 1 AND project_id = 1 AND type = 'unittest';
UPDATE project_services SET scraped_at = %[3]d, scrape_duration_secs = 5, serialized_metrics = '{"capacity_usage":0,"things_usage":4}', checked_at = %[3]d, next_scrape_at = %[4]d, quota_desynced_at = %[3]d WHERE id = 2 AND project_id = 2 AND type = 'unittest';
`,
scrapedAt1.Unix(), scrapedAt1.Add(scrapeInterval).Unix(),
scrapedAt2.Unix(), scrapedAt2.Add(scrapeInterval).Unix(),
)

// set some quota acpq values.
// resource level
_, err := s.DB.Exec(`UPDATE project_resources SET quota = $1 WHERE name = $2`, 20, "capacity")
if err != nil {
t.Fatal(err)
}
_, err = s.DB.Exec(`UPDATE project_resources SET quota = $1 WHERE name = $2`, 13, "things")
if err != nil {
t.Fatal(err)
}
// az level
_, err = s.DB.Exec(`UPDATE project_az_resources SET quota = $1 WHERE resource_id IN (1,4) and az != 'any'`, 20)
if err != nil {
t.Fatal(err)
}
_, err = s.DB.Exec(`UPDATE project_az_resources SET quota = $1 WHERE resource_id IN (3,6) and az != 'any'`, 13)
if err != nil {
t.Fatal(err)
}
tr.DBChanges().Ignore()

mustT(t, syncJob.ProcessOne(s.Ctx, withLabel))
mustT(t, syncJob.ProcessOne(s.Ctx, withLabel))

tr.DBChanges().AssertEqualf(`
UPDATE project_az_resources SET backend_quota = 20 WHERE id = 11 AND resource_id = 4 AND az = 'az-one';
UPDATE project_az_resources SET backend_quota = 20 WHERE id = 12 AND resource_id = 4 AND az = 'az-two';
UPDATE project_az_resources SET backend_quota = 13 WHERE id = 17 AND resource_id = 6 AND az = 'az-one';
UPDATE project_az_resources SET backend_quota = 13 WHERE id = 18 AND resource_id = 6 AND az = 'az-two';
UPDATE project_az_resources SET backend_quota = 20 WHERE id = 2 AND resource_id = 1 AND az = 'az-one';
UPDATE project_az_resources SET backend_quota = 20 WHERE id = 3 AND resource_id = 1 AND az = 'az-two';
UPDATE project_az_resources SET backend_quota = 13 WHERE id = 8 AND resource_id = 3 AND az = 'az-one';
UPDATE project_az_resources SET backend_quota = 13 WHERE id = 9 AND resource_id = 3 AND az = 'az-two';
UPDATE project_resources SET backend_quota = 20 WHERE id = 1 AND service_id = 1 AND name = 'capacity';
UPDATE project_resources SET backend_quota = 13 WHERE id = 3 AND service_id = 1 AND name = 'things';
UPDATE project_resources SET backend_quota = 20 WHERE id = 4 AND service_id = 2 AND name = 'capacity';
UPDATE project_resources SET backend_quota = 13 WHERE id = 6 AND service_id = 2 AND name = 'things';
UPDATE project_services SET quota_desynced_at = NULL, quota_sync_duration_secs = 5 WHERE id = 1 AND project_id = 1 AND type = 'unittest';
UPDATE project_services SET quota_desynced_at = NULL, quota_sync_duration_secs = 5 WHERE id = 2 AND project_id = 2 AND type = 'unittest';
`)

s.Clock.StepBy(scrapeInterval)

// negative: scrape with flat topology returns invalid AZs
plugin.LiquidServiceInfo.Resources = map[liquid.ResourceName]liquid.ResourceInfo{"capacity": {Topology: liquid.FlatResourceTopology}}
plugin.ReportedAZs = map[liquid.AvailabilityZone]*any{"az-one": status, "az-two": status}
mustFailT(t, job.ProcessOne(s.Ctx, withLabel), errors.New("during resource scrape of project germany/berlin: service: unittest, resource: capacity: scrape with toplogy type: flat returned AZs: [az-one az-two]"))

// negative: scrape with az-aware toplogy returns invalid any AZ
plugin.LiquidServiceInfo.Resources["capacity"] = liquid.ResourceInfo{Topology: liquid.AZAwareResourceTopology}
plugin.ReportedAZs = map[liquid.AvailabilityZone]*any{"any": status}
mustFailT(t, job.ProcessOne(s.Ctx, withLabel), errors.New("during resource scrape of project germany/dresden: service: unittest, resource: capacity: scrape with toplogy type: az-aware returned AZs: [any]"))

s.Clock.StepBy(scrapeInterval)
// negative: scrape with az-separated toplogy returns invalid AZs any and unknown
plugin.LiquidServiceInfo.Resources["capacity"] = liquid.ResourceInfo{Topology: liquid.AZSeparatedResourceTopology}
plugin.ReportedAZs = map[liquid.AvailabilityZone]*any{"az-one": status, "unknown": status}
mustFailT(t, job.ProcessOne(s.Ctx, withLabel), errors.New("during resource scrape of project germany/berlin: service: unittest, resource: capacity: scrape with toplogy type: az-separated returned AZs: [az-one unknown]"))

// negative: reject liquid initialization with invalid topologies
plugin.LiquidServiceInfo.Resources = map[liquid.ResourceName]liquid.ResourceInfo{"capacity": {Topology: "invalidAZ1"}, "things": {Topology: "invalidAZ2"}}
mustFailT(t, job.ProcessOne(s.Ctx, withLabel), errors.New("during resource scrape of project germany/dresden: invalid toplogy: invalidAZ1 on resource: capacity\ninvalid toplogy: invalidAZ2 on resource: things"))

}

Check failure on line 706 in internal/collector/scrape_test.go

View workflow job for this annotation

GitHub Actions / Checks

unnecessary trailing newline (whitespace)
63 changes: 58 additions & 5 deletions internal/collector/sync_quota_to_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,30 @@ func (c *Collector) processQuotaSyncTask(ctx context.Context, srv db.ProjectServ

var (
quotaSyncSelectQuery = sqlext.SimplifyWhitespace(`
SELECT name, backend_quota, quota
SELECT id, name, backend_quota, quota
FROM project_resources
WHERE service_id = $1 AND quota IS NOT NULL
`)
azQuotaSyncSelectQuery = sqlext.SimplifyWhitespace(`
SELECT az, backend_quota, quota
FROM project_az_resources
WHERE resource_id = $1 AND quota IS NOT NULL
`)
quotaSyncMarkResourcesAsAppliedQuery = sqlext.SimplifyWhitespace(`
UPDATE project_resources
SET backend_quota = quota
WHERE service_id = $1
`)
azQuotaSyncMarkResourcesAsAppliedQuery = sqlext.SimplifyWhitespace(`
WITH resourceIDs as (
SELECT id
FROM project_resources
WHERE service_id = $1
)
UPDATE project_az_resources
SET backend_quota = quota
WHERE resource_id in (SELECT id from resourceIDs)
`)
quotaSyncMarkServiceAsAppliedQuery = sqlext.SimplifyWhitespace(`
UPDATE project_services
SET quota_desynced_at = NULL, quota_sync_duration_secs = $2
Expand All @@ -123,36 +138,68 @@ func (c *Collector) performQuotaSync(ctx context.Context, srv db.ProjectService,

// collect backend quota values that we want to apply
targetQuotasInDB := make(map[liquid.ResourceName]uint64)
targetAZQuotasInDB := make(map[liquid.ResourceName]map[liquid.AvailabilityZone]liquid.AZResourceQuotaRequest)
needsApply := false
azSeparatedNeedsApply := false
err := sqlext.ForeachRow(c.DB, quotaSyncSelectQuery, []any{srv.ID}, func(rows *sql.Rows) error {
var (
resourceID db.ProjectResourceID
resourceName liquid.ResourceName
currentQuota *int64
targetQuota uint64
)
err := rows.Scan(&resourceName, &currentQuota, &targetQuota)
err := rows.Scan(&resourceID, &resourceName, &currentQuota, &targetQuota)
if err != nil {
return err
}
targetQuotasInDB[resourceName] = targetQuota
if currentQuota == nil || *currentQuota < 0 || uint64(*currentQuota) != targetQuota {
needsApply = true
}

resInfo := c.Cluster.InfoForResource(srv.Type, resourceName)
if resInfo.Topology != liquid.AZSeparatedResourceTopology {
return nil
}
err = sqlext.ForeachRow(c.DB, azQuotaSyncSelectQuery, []any{resourceID}, func(rows *sql.Rows) error {
var (
availabilityZone liquid.AvailabilityZone
currentAZQuota *int64
targetAZQuota uint64
)
err := rows.Scan(&availabilityZone, &currentAZQuota, &targetAZQuota)
if err != nil {
return err
}
// defense in depth: configured backend_quota for AZ any or unknown are not valid for the azSeparatedQuota topology.
if (availabilityZone == liquid.AvailabilityZoneAny || availabilityZone == liquid.AvailabilityZoneUnknown) && currentAZQuota != nil {
return fmt.Errorf("detected invalid AZ: %s for resource: %s with topology: %s has backend_quota: %v", availabilityZone, resourceName, resInfo.Topology, currentAZQuota)
}
targetAZQuotasInDB[resourceName] = make(map[liquid.AvailabilityZone]liquid.AZResourceQuotaRequest)
targetAZQuotasInDB[resourceName][availabilityZone] = liquid.AZResourceQuotaRequest{Quota: targetAZQuota}
if currentAZQuota == nil || *currentAZQuota < 0 || uint64(*currentAZQuota) != targetAZQuota {
azSeparatedNeedsApply = true
}
return nil
})
if err != nil {
return err
}
return nil
})
if err != nil {
return fmt.Errorf("while collecting target quota values for %s backend: %w", srv.Type, err)
}

if needsApply {
if needsApply || azSeparatedNeedsApply {
// double-check that we only include quota values for resources that the backend currently knows about
targetQuotasForBackend := make(map[liquid.ResourceName]uint64)
targetQuotasForBackend := make(map[liquid.ResourceName]core.Quotas)
for resName, resInfo := range plugin.Resources() {
if !resInfo.HasQuota {
continue
}
//NOTE: If `targetQuotasInDB` does not have an entry for this resource, we will write 0 into the backend.
targetQuotasForBackend[resName] = targetQuotasInDB[resName]
targetQuotasForBackend[resName] = core.Quotas{QuotaForResource: targetQuotasInDB[resName], QuotasForAZs: targetAZQuotasInDB[resName]}
}

// apply quotas in backend
Expand All @@ -172,6 +219,12 @@ func (c *Collector) performQuotaSync(ctx context.Context, srv db.ProjectService,
if err != nil {
return err
}
if azSeparatedNeedsApply {
_, err = c.DB.Exec(azQuotaSyncMarkResourcesAsAppliedQuery, srv.ID)
if err != nil {
return err
}
}
}

finishedAt := c.MeasureTimeAtEnd()
Expand Down
7 changes: 6 additions & 1 deletion internal/core/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type QuotaPlugin interface {
// SetQuota updates the backend service's quotas for the given project in the
// given domain to the values specified here. The map is guaranteed to contain
// values for all resources defined by Resources().
SetQuota(ctx context.Context, project KeystoneProject, quotas map[liquid.ResourceName]uint64) error
SetQuota(ctx context.Context, project KeystoneProject, quotas map[liquid.ResourceName]Quotas) error

// Rates returns metadata for all the rates that this plugin scrapes
// from the backend service.
Expand Down Expand Up @@ -181,6 +181,11 @@ type QuotaPlugin interface {
CollectMetrics(ch chan<- prometheus.Metric, project KeystoneProject, serializedMetrics []byte) error
}

type Quotas struct {
QuotaForResource uint64
QuotasForAZs map[liquid.AvailabilityZone]liquid.AZResourceQuotaRequest
}

// ServiceInfo is a reduced version of type limes.ServiceInfo, suitable for
// being returned from func QuotaPlugin.ServiceInfo().
type ServiceInfo struct {
Expand Down
8 changes: 4 additions & 4 deletions internal/db/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,12 @@ var sqlMigrations = map[string]string{
ALTER TABLE project_resources
RENAME COLUMN max_quota_from_admin TO max_quota_from_outside_admin;
`,
"046_service_specific_quota_constraints.down.sql": `
"046_az_backend_quota.down.sql": `
ALTER TABLE project_az_resources
ADD backend_quota BIGINT default NULL;
DROP COLUMN backend_quota;
`,
"046_service_specific_quota_constraints.up.sql": `
"046_az_backend_quota.up.sql": `
ALTER TABLE project_az_resources
DROP COLUMN backend_quota;
ADD COLUMN backend_quota BIGINT default NULL;
`,
}
6 changes: 3 additions & 3 deletions internal/plugins/liquid.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,12 @@ func castSliceToAny[T any](input []T) (output []any) {
}

// SetQuota implements the core.QuotaPlugin interface.
func (p *liquidQuotaPlugin) SetQuota(ctx context.Context, project core.KeystoneProject, quotas map[liquid.ResourceName]uint64) error {
func (p *liquidQuotaPlugin) SetQuota(ctx context.Context, project core.KeystoneProject, quotas map[liquid.ResourceName]core.Quotas) error {
req := liquid.ServiceQuotaRequest{
Resources: make(map[liquid.ResourceName]liquid.ResourceQuotaRequest, len(quotas)),
}
for resName, quota := range quotas {
req.Resources[resName] = liquid.ResourceQuotaRequest{Quota: quota}
for resName, quotas := range quotas {
req.Resources[resName] = liquid.ResourceQuotaRequest{Quota: quotas.QuotaForResource, PerAZ: quotas.QuotasForAZs}
}
if p.LiquidServiceInfo.QuotaUpdateNeedsProjectMetadata {
req.ProjectMetadata = project.ForLiquid()
Expand Down
6 changes: 3 additions & 3 deletions internal/plugins/nova.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,11 @@ func (p *novaPlugin) pooledResourceName(hwVersion string, base liquid.ResourceNa
}

// SetQuota implements the core.QuotaPlugin interface.
func (p *novaPlugin) SetQuota(ctx context.Context, project core.KeystoneProject, quotas map[liquid.ResourceName]uint64) error {
func (p *novaPlugin) SetQuota(ctx context.Context, project core.KeystoneProject, quotas map[liquid.ResourceName]core.Quotas) error {
// translate Limes resource names for separate instance quotas into Nova quota names
novaQuotas := make(novaQuotaUpdateOpts, len(quotas))
for resourceName, quota := range quotas {
novaQuotas[string(resourceName)] = quota
for resourceName, quotas := range quotas {
novaQuotas[string(resourceName)] = quotas.QuotaForResource
}

return quotasets.Update(ctx, p.NovaV2, project.UUID, novaQuotas).Err
Expand Down
Loading

0 comments on commit 08938a6

Please sign in to comment.