Merge pull request #358 from sapcc/az-awareness
SuperSandro2000 authored Sep 19, 2023
2 parents 510ec5e + 2f07ba7 commit 3941244
Showing 23 changed files with 366 additions and 259 deletions.
18 changes: 8 additions & 10 deletions internal/collector/capacity_scrape.go
@@ -220,7 +220,7 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc
continue
}

for resourceName, resourceData := range serviceData {
for resourceName, topologicalResourceData := range serviceData {
if !c.Cluster.HasResource(serviceType, resourceName) {
logg.Info("discarding capacity reported by %s for unknown resource name: %s/%s", capacitor.CapacitorID, serviceType, resourceName)
continue
@@ -230,25 +230,26 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc
return fmt.Errorf("no cluster_services entry for service type %s (check if CheckConsistencyJob runs correctly)", serviceType)
}

summedResourceData := topologicalResourceData.Sum()
res := db.ClusterResource{
ServiceID: serviceID,
Name: resourceName,
RawCapacity: resourceData.Capacity,
RawCapacity: summedResourceData.Capacity,
CapacityPerAZJSON: "", //see below
SubcapacitiesJSON: "", //see below
CapacitorID: capacitor.CapacitorID,
}

if len(resourceData.CapacityPerAZ) > 0 {
buf, err := json.Marshal(convertAZReport(resourceData.CapacityPerAZ))
if topologicalResourceData.PerAZ != nil {
buf, err := json.Marshal(convertAZReport(topologicalResourceData.PerAZ))
if err != nil {
return fmt.Errorf("could not convert capacities per AZ to JSON: %w", err)
}
res.CapacityPerAZJSON = string(buf)
}

if len(resourceData.Subcapacities) > 0 {
buf, err := json.Marshal(resourceData.Subcapacities)
if len(summedResourceData.Subcapacities) > 0 {
buf, err := json.Marshal(summedResourceData.Subcapacities)
if err != nil {
return fmt.Errorf("could not convert subcapacities to JSON: %w", err)
}
@@ -297,10 +298,7 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc
return tx.Commit()
}

func convertAZReport(capacityPerAZ map[string]*core.CapacityDataForAZ) limesresources.ClusterAvailabilityZoneReports {
//The initial implementation wrote limesresources.ClusterAvailabilityZoneReports into the CapacityPerAZJSON database field,
//even though map[string]*core.CapacityDataForAZ would have been more appropriate.
//Now we stick with it for compatibility's sake.
func convertAZReport(capacityPerAZ map[string]*core.CapacityData) limesresources.ClusterAvailabilityZoneReports {
report := make(limesresources.ClusterAvailabilityZoneReports, len(capacityPerAZ))
for azName, azData := range capacityPerAZ {
report[azName] = &limesresources.ClusterAvailabilityZoneReport{
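
For orientation, a minimal sketch (hypothetical values, not from this diff) of the Sum() semantics that processCapacityScrapeTask now relies on: the per-AZ breakdown is serialized into CapacityPerAZJSON as-is, while the capacity column stores the fold over all AZs.

    perAZ := core.PerAZ(map[string]*core.CapacityData{
        "az-one": {Capacity: 21, Usage: 10}, // hypothetical numbers
        "az-two": {Capacity: 21, Usage: 11},
    })
    summed := perAZ.Sum() // core.CapacityData{Capacity: 42, Usage: 21}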
4 changes: 2 additions & 2 deletions internal/collector/capacity_scrape_test.go
@@ -161,7 +161,7 @@ func Test_ScanCapacity(t *testing.T) {
UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest';
UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest2';
INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, serialized_metrics, next_scrape_at) VALUES ('unittest4', %d, 5, '{"smaller_half":14,"larger_half":28}', %d);
INSERT INTO cluster_resources (service_id, name, capacity, subcapacities, capacitor_id) VALUES (2, 'things', 42, '[{"smaller_half":14},{"larger_half":28}]', 'unittest4');
INSERT INTO cluster_resources (service_id, name, capacity, subcapacities, capacitor_id) VALUES (2, 'things', 42, '[{"az":"az-one","smaller_half":7},{"az":"az-one","larger_half":14},{"az":"az-two","smaller_half":7},{"az":"az-two","larger_half":14}]', 'unittest4');
`,
scrapedAt1.Unix(), scrapedAt1.Add(15*time.Minute).Unix(),
scrapedAt2.Unix(), scrapedAt2.Add(15*time.Minute).Unix(),
@@ -180,7 +180,7 @@ func Test_ScanCapacity(t *testing.T) {
UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest';
UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest2';
UPDATE cluster_capacitors SET scraped_at = %d, serialized_metrics = '{"smaller_half":3,"larger_half":7}', next_scrape_at = %d WHERE capacitor_id = 'unittest4';
UPDATE cluster_resources SET capacity = 10, subcapacities = '[{"smaller_half":3},{"larger_half":7}]' WHERE service_id = 2 AND name = 'things';
UPDATE cluster_resources SET capacity = 10, subcapacities = '[{"az":"az-one","smaller_half":1},{"az":"az-one","larger_half":4},{"az":"az-two","smaller_half":1},{"az":"az-two","larger_half":4}]' WHERE service_id = 2 AND name = 'things';
`,
scrapedAt1.Unix(), scrapedAt1.Add(15*time.Minute).Unix(),
scrapedAt2.Unix(), scrapedAt2.Add(15*time.Minute).Unix(),
13 changes: 7 additions & 6 deletions internal/collector/scrape.go
@@ -220,8 +220,9 @@ func (c *Collector) writeResourceScrapeResult(dbDomain db.Domain, dbProject db.P
//this is the callback that ProjectResourceUpdate will use to write the scraped data into the project_resources
updateResource := func(res *db.ProjectResource) error {
data := resourceData[res.Name]
res.Usage = data.Usage
res.PhysicalUsage = data.PhysicalUsage
usageData := data.UsageData.Sum()
res.Usage = usageData.Usage
res.PhysicalUsage = usageData.PhysicalUsage

resInfo := c.Cluster.InfoForResource(srv.Type, res.Name)
if !resInfo.NoQuota {
@@ -238,17 +239,17 @@ func (c *Collector) writeResourceScrapeResult(dbDomain db.Domain, dbProject db.P
res.BackendQuota = &data.Quota
}

if len(data.Subresources) == 0 {
if len(usageData.Subresources) == 0 {
res.SubresourcesJSON = ""
} else {
//warn when the backend is inconsistent with itself
if uint64(len(data.Subresources)) != res.Usage {
if uint64(len(usageData.Subresources)) != res.Usage {
logg.Info("resource quantity mismatch in project %s, resource %s/%s: usage = %d, but found %d subresources",
dbProject.UUID, srv.Type, res.Name,
res.Usage, len(data.Subresources),
res.Usage, len(usageData.Subresources),
)
}
bytes, err := json.Marshal(data.Subresources)
bytes, err := json.Marshal(usageData.Subresources)
if err != nil {
return fmt.Errorf("failed to convert subresources to JSON: %s", err.Error())
}
4 changes: 2 additions & 2 deletions internal/collector/scrape_test.go
@@ -173,7 +173,7 @@ func Test_ScrapeSuccess(t *testing.T) {
//change the data that is reported by the plugin
s.Clock.StepBy(scrapeInterval)
plugin.StaticResourceData["capacity"].Quota = 110
plugin.StaticResourceData["things"].Usage = 5
plugin.StaticResourceData["things"].UsageData.Regional.Usage = 5
//Scrape should pick up the changed resource data
mustT(t, job.ProcessOne(s.Ctx, withLabel))
mustT(t, job.ProcessOne(s.Ctx, withLabel))
@@ -269,7 +269,7 @@ func Test_ScrapeSuccess(t *testing.T) {
//"capacity_portion" (otherwise this resource has been all zeroes this entire
//time)
s.Clock.StepBy(scrapeInterval)
plugin.StaticResourceData["capacity"].Usage = 20
plugin.StaticResourceData["capacity"].UsageData.Regional.Usage = 20
mustT(t, job.ProcessOne(s.Ctx, withLabel))
mustT(t, job.ProcessOne(s.Ctx, withLabel))

126 changes: 126 additions & 0 deletions internal/core/data.go
@@ -0,0 +1,126 @@
/*******************************************************************************
*
* Copyright 2023 SAP SE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You should have received a copy of the License along with this
* program. If not, you may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/

package core

import (
"slices"
"sort"
)

// Topological is a container for data that can either be reported for
// the entire region at once, or separately by AZ.
// Exactly one field shall be non-nil.
type Topological[D TopologicalData[D]] struct {
Regional *D
PerAZ map[string]*D
}

// Regional is a shorthand to construct a Topological instance with the Regional member filled.
func Regional[D TopologicalData[D]](data D) Topological[D] {
return Topological[D]{Regional: &data}
}

// PerAZ is a shorthand to construct a Topological instance with the PerAZ member filled.
func PerAZ[D TopologicalData[D]](data map[string]*D) Topological[D] {
return Topological[D]{PerAZ: data}
}

// Sum returns a sum of all data in this container.
// If the Regional field is filled, that data is returned directly.
// Otherwise, all entries in the PerAZ field are summed together.
func (t Topological[D]) Sum() D {
if t.PerAZ == nil {
return *t.Regional
}

//fold AZ data in a well-defined order for deterministic test results
azNames := make([]string, 0, len(t.PerAZ))
for az := range t.PerAZ {
azNames = append(azNames, az)
}
sort.Strings(azNames)

var result D
for _, az := range azNames {
result = result.add(*t.PerAZ[az])
}
return result
}

// TopologicalData is an interface for types that can be put into the Topological container.
type TopologicalData[Self any] interface {
// List of permitted types. This is required for type inference, as explained here:
// <https://stackoverflow.com/a/73851453>
UsageData | CapacityData

// Computes the sum of this structure and `other`.
// This is used to implement Topological.Sum().
add(other Self) Self
}

// ResourceData contains quota and usage data for a single project resource.
type ResourceData struct {
Quota int64 //negative values indicate infinite quota
UsageData Topological[UsageData]
}

// UsageData contains usage data for a single project resource.
// It appears in type ResourceData.
type UsageData struct {
Usage uint64
PhysicalUsage *uint64 //only supported by some plugins
Subresources []any //only if supported by plugin and enabled in config
}

// add implements the TopologicalData interface.
//
//nolint:unused // looks like a linter bug
func (d UsageData) add(other UsageData) UsageData {
result := UsageData{
Usage: d.Usage + other.Usage,
Subresources: append(slices.Clone(d.Subresources), other.Subresources...),
}

//the sum can only have a PhysicalUsage value if both sides have it
if d.PhysicalUsage != nil && other.PhysicalUsage != nil {
physUsage := *d.PhysicalUsage + *other.PhysicalUsage
result.PhysicalUsage = &physUsage
}

return result
}

// CapacityData contains capacity data for a single project resource.
type CapacityData struct {
Capacity uint64
Usage uint64 //NOTE: currently only relevant on AZ level, regional level uses the aggregation of project usages
Subcapacities []any //only if supported by plugin and enabled in config
}

// add implements the TopologicalData interface.
//
//nolint:unused // looks like a linter bug
func (d CapacityData) add(other CapacityData) CapacityData {
return CapacityData{
Capacity: d.Capacity + other.Capacity,
Usage: d.Usage + other.Usage,
Subcapacities: append(slices.Clone(d.Subcapacities), other.Subcapacities...),
}
}
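
A hedged usage sketch for the new container (identifiers and values below are hypothetical, not part of the diff): a plugin that cannot attribute data to AZs wraps its single value in Regional, an AZ-aware plugin fills PerAZ, and consumers call Sum() when they need a region-wide total.

    regional := core.Regional(core.UsageData{Usage: 10})
    total := regional.Sum() // Regional value is returned directly: Usage = 10

    perAZ := core.PerAZ(map[string]*core.UsageData{
        "az-one": {Usage: 3},
        "az-two": {Usage: 7},
    })
    total = perAZ.Sum() // AZs folded in sorted order: Usage = 10

Note that exactly one of the two fields is set, which is why Sum() only needs to check PerAZ for nil.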
32 changes: 1 addition & 31 deletions internal/core/plugin.go
@@ -82,18 +82,6 @@ type DiscoveryPlugin interface {
ListProjects(domain KeystoneDomain) ([]KeystoneProject, error)
}

// ResourceData contains quota and usage data for a single resource.
//
// The Subresources field may optionally be populated with subresources, if the
// quota plugin providing this ResourceData instance has been instructed to (and
// is able to) scrape subresources for this resource.
type ResourceData struct {
Quota int64 //negative values indicate infinite quota
Usage uint64
PhysicalUsage *uint64 //only supported by some plugins
Subresources []any
}

// QuotaPlugin is the interface that the quota/usage collector plugins for all
// backend services must implement. There can only be one QuotaPlugin for each
// backend service.
@@ -182,24 +170,6 @@ type QuotaPlugin interface {
CollectMetrics(ch chan<- prometheus.Metric, project KeystoneProject, serializedMetrics string) error
}

// CapacityData contains the total and per-availability-zone capacity data for a
// single resource.
//
// The Subcapacities field may optionally be populated with subcapacities, if the
// capacity plugin providing this CapacityData instance has been instructed to (and
// is able to) scrape subcapacities for this resource.
type CapacityData struct {
Capacity uint64
CapacityPerAZ map[string]*CapacityDataForAZ
Subcapacities []any
}

// CapacityDataForAZ is the capacity data for a single resource in a single AZ.
type CapacityDataForAZ struct {
Capacity uint64
Usage uint64
}

// CapacityPlugin is the interface that all capacity collector plugins must
// implement.
//
@@ -231,7 +201,7 @@ type CapacityPlugin interface {
//
//The serializedMetrics return value is persisted in the Limes DB and
//supplied to all subsequent RenderMetrics calls.
Scrape() (result map[string]map[string]CapacityData, serializedMetrics string, err error)
Scrape() (result map[string]map[string]Topological[CapacityData], serializedMetrics string, err error)

//DescribeMetrics is called when Prometheus is scraping metrics from
//limes-collect, to provide an opportunity to the plugin to emit its own
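
To illustrate the changed contract, a hedged sketch of a capacity plugin under the new Scrape signature (examplePlugin and all values are hypothetical): plugins without AZ awareness keep reporting one region-wide number via core.Regional, while AZ-aware plugins return a core.PerAZ map as capacity_cinder.go does below.

    func (p *examplePlugin) Scrape() (map[string]map[string]core.Topological[core.CapacityData], string, error) {
        result := map[string]map[string]core.Topological[core.CapacityData]{
            "shared": { // service type -> resource name -> topological capacity
                "things": core.Regional(core.CapacityData{Capacity: 42}),
            },
        }
        return result, "", nil
    }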
8 changes: 6 additions & 2 deletions internal/plugins/archer.go
@@ -113,11 +113,15 @@ func (p *archerPlugin) Scrape(project core.KeystoneProject) (result map[string]c
result = map[string]core.ResourceData{
"endpoints": {
Quota: archerQuota.Endpoint,
Usage: archerQuota.InUseEndpoint,
UsageData: core.Regional(core.UsageData{
Usage: archerQuota.InUseEndpoint,
}),
},
"services": {
Quota: archerQuota.Service,
Usage: archerQuota.InUseService,
UsageData: core.Regional(core.UsageData{
Usage: archerQuota.InUseService,
}),
},
}
return result, "", nil
34 changes: 13 additions & 21 deletions internal/plugins/capacity_cinder.go
@@ -82,7 +82,7 @@ func (p *capacityCinderPlugin) makeResourceName(volumeType string) string {
}

// Scrape implements the core.CapacityPlugin interface.
func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.CapacityData, serializedMetrics string, err error) {
func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Topological[core.CapacityData], serializedMetrics string, err error) {
//list storage pools
var poolData struct {
StoragePools []struct {
@@ -126,14 +126,11 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Capac
}
}

capaData := make(map[string]*core.CapacityData)
capaData := make(map[string]core.Topological[core.CapacityData])
volumeTypesByBackendName := make(map[string]string)
for volumeType, cfg := range p.VolumeTypes {
volumeTypesByBackendName[cfg.VolumeBackendName] = volumeType
capaData[p.makeResourceName(volumeType)] = &core.CapacityData{
Capacity: 0,
CapacityPerAZ: make(map[string]*core.CapacityDataForAZ),
}
capaData[p.makeResourceName(volumeType)] = core.PerAZ(make(map[string]*core.CapacityData))
}

//add results from scheduler-stats
@@ -153,12 +150,8 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Capac
logg.Info("Cinder capacity plugin: skipping pool %q with unknown volume_backend_name %q", pool.Name, pool.Capabilities.VolumeBackendName)
continue
}

logg.Debug("Cinder capacity plugin: considering pool %q with volume_backend_name %q for volume type %q", pool.Name, pool.Capabilities.VolumeBackendName, volumeType)

resourceName := p.makeResourceName(volumeType)
capaData[resourceName].Capacity += uint64(pool.Capabilities.TotalCapacityGB)

var poolAZ string
for az, hosts := range serviceHostsPerAZ {
for _, v := range hosts {
@@ -173,16 +166,19 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Capac
logg.Info("Cinder storage pool %q does not match any service host", pool.Name)
poolAZ = "unknown"
}
if _, ok := capaData[resourceName].CapacityPerAZ[poolAZ]; !ok {
capaData[resourceName].CapacityPerAZ[poolAZ] = &core.CapacityDataForAZ{}

resourceName := p.makeResourceName(volumeType)
capa := capaData[resourceName].PerAZ[poolAZ]
if capa == nil {
capa = &core.CapacityData{}
capaData[resourceName].PerAZ[poolAZ] = capa
}

azCapaData := capaData[resourceName].CapacityPerAZ[poolAZ]
azCapaData.Capacity += uint64(pool.Capabilities.TotalCapacityGB)
azCapaData.Usage += uint64(pool.Capabilities.AllocatedCapacityGB)
capa.Capacity += uint64(pool.Capabilities.TotalCapacityGB)
capa.Usage += uint64(pool.Capabilities.AllocatedCapacityGB)

if p.reportSubcapacities["capacity"] {
capaData[resourceName].Subcapacities = append(capaData[resourceName].Subcapacities, storagePoolSubcapacity{
capa.Subcapacities = append(capa.Subcapacities, storagePoolSubcapacity{
PoolName: pool.Name,
AvailabilityZone: poolAZ,
CapacityGiB: uint64(pool.Capabilities.TotalCapacityGB),
Expand All @@ -191,11 +187,7 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Capac
}
}

capaDataFinal := make(map[string]core.CapacityData)
for k, v := range capaData {
capaDataFinal[k] = *v
}
return map[string]map[string]core.CapacityData{"volumev2": capaDataFinal}, "", nil
return map[string]map[string]core.Topological[core.CapacityData]{"volumev2": capaData}, "", nil
}

// DescribeMetrics implements the core.CapacityPlugin interface.