add full AZ awareness to capacity plugins
This is the first half of a refactor of the scrape methods towards full
AZ awareness. Scraping can return data either for the entire region or
broken down by AZ. This distinction is represented by the new container
type Topological, of which exactly one field (Regional or PerAZ) is set.

For capacity scraping, this does not present a huge change, because
several capacitors already report capacity by AZ. What changes is that:

- subcapacities are now reported on the AZ (instead of regional) level
- the region-wide capacity is computed outside of the plugin by summing
  the individual AZ capacities

The frontend does not change in any way at this point. Those changes
will come later, once the user-visible behavior has been agreed upon.
This changeset prepares the parts that we will definitely need, and
makes data available to the collector that we will later surface in
the frontend.
majewsky committed Sep 18, 2023
1 parent 510ec5e commit 544d323
Showing 11 changed files with 218 additions and 181 deletions.
18 changes: 8 additions & 10 deletions internal/collector/capacity_scrape.go
@@ -220,7 +220,7 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc
continue
}

-for resourceName, resourceData := range serviceData {
+for resourceName, topologicalResourceData := range serviceData {
if !c.Cluster.HasResource(serviceType, resourceName) {
logg.Info("discarding capacity reported by %s for unknown resource name: %s/%s", capacitor.CapacitorID, serviceType, resourceName)
continue
@@ -230,25 +230,26 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc
return fmt.Errorf("no cluster_services entry for service type %s (check if CheckConsistencyJob runs correctly)", serviceType)
}

+summedResourceData := topologicalResourceData.Sum()
res := db.ClusterResource{
ServiceID: serviceID,
Name: resourceName,
-RawCapacity: resourceData.Capacity,
+RawCapacity: summedResourceData.Capacity,
CapacityPerAZJSON: "", //see below
SubcapacitiesJSON: "", //see below
CapacitorID: capacitor.CapacitorID,
}

-if len(resourceData.CapacityPerAZ) > 0 {
-buf, err := json.Marshal(convertAZReport(resourceData.CapacityPerAZ))
+if topologicalResourceData.PerAZ != nil {
+buf, err := json.Marshal(convertAZReport(topologicalResourceData.PerAZ))
if err != nil {
return fmt.Errorf("could not convert capacities per AZ to JSON: %w", err)
}
res.CapacityPerAZJSON = string(buf)
}

-if len(resourceData.Subcapacities) > 0 {
-buf, err := json.Marshal(resourceData.Subcapacities)
+if len(summedResourceData.Subcapacities) > 0 {
+buf, err := json.Marshal(summedResourceData.Subcapacities)
if err != nil {
return fmt.Errorf("could not convert subcapacities to JSON: %w", err)
}
@@ -297,10 +298,7 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc
return tx.Commit()
}

-func convertAZReport(capacityPerAZ map[string]*core.CapacityDataForAZ) limesresources.ClusterAvailabilityZoneReports {
-//The initial implementation wrote limesresources.ClusterAvailabilityZoneReports into the CapacityPerAZJSON database field,
-//even though map[string]*core.CapacityDataForAZ would have been more appropriate.
-//Now we stick with it for compatibility's sake.
+func convertAZReport(capacityPerAZ map[string]*core.CapacityData) limesresources.ClusterAvailabilityZoneReports {
report := make(limesresources.ClusterAvailabilityZoneReports, len(capacityPerAZ))
for azName, azData := range capacityPerAZ {
report[azName] = &limesresources.ClusterAvailabilityZoneReport{
4 changes: 2 additions & 2 deletions internal/collector/capacity_scrape_test.go
@@ -161,7 +161,7 @@ func Test_ScanCapacity(t *testing.T) {
UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest';
UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest2';
INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, serialized_metrics, next_scrape_at) VALUES ('unittest4', %d, 5, '{"smaller_half":14,"larger_half":28}', %d);
-INSERT INTO cluster_resources (service_id, name, capacity, subcapacities, capacitor_id) VALUES (2, 'things', 42, '[{"smaller_half":14},{"larger_half":28}]', 'unittest4');
+INSERT INTO cluster_resources (service_id, name, capacity, subcapacities, capacitor_id) VALUES (2, 'things', 42, '[{"az":"az-one","smaller_half":7},{"az":"az-one","larger_half":14},{"az":"az-two","smaller_half":7},{"az":"az-two","larger_half":14}]', 'unittest4');
`,
scrapedAt1.Unix(), scrapedAt1.Add(15*time.Minute).Unix(),
scrapedAt2.Unix(), scrapedAt2.Add(15*time.Minute).Unix(),
@@ -180,7 +180,7 @@ func Test_ScanCapacity(t *testing.T) {
UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest';
UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest2';
UPDATE cluster_capacitors SET scraped_at = %d, serialized_metrics = '{"smaller_half":3,"larger_half":7}', next_scrape_at = %d WHERE capacitor_id = 'unittest4';
-UPDATE cluster_resources SET capacity = 10, subcapacities = '[{"smaller_half":3},{"larger_half":7}]' WHERE service_id = 2 AND name = 'things';
+UPDATE cluster_resources SET capacity = 10, subcapacities = '[{"az":"az-one","smaller_half":1},{"az":"az-one","larger_half":4},{"az":"az-two","smaller_half":1},{"az":"az-two","larger_half":4}]' WHERE service_id = 2 AND name = 'things';
`,
scrapedAt1.Unix(), scrapedAt1.Add(15*time.Minute).Unix(),
scrapedAt2.Unix(), scrapedAt2.Add(15*time.Minute).Unix(),
94 changes: 94 additions & 0 deletions internal/core/data.go
@@ -0,0 +1,94 @@
/*******************************************************************************
*
* Copyright 2023 SAP SE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You should have received a copy of the License along with this
* program. If not, you may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/

package core

import (
	"slices"
	"sort"
)

// Topological is a container for data that can either be reported for
// the entire region at once, or separately by AZ.
// Exactly one field shall be non-nil.
type Topological[D TopologicalData[D]] struct {
	Regional *D
	PerAZ    map[string]*D
}

// Regional is a shorthand to construct a Topological instance with the Regional member filled.
func Regional[D TopologicalData[D]](data D) Topological[D] {
	return Topological[D]{Regional: &data}
}

// PerAZ is a shorthand to construct a Topological instance with the PerAZ member filled.
func PerAZ[D TopologicalData[D]](data map[string]*D) Topological[D] {
	return Topological[D]{PerAZ: data}
}

// Sum returns a sum of all data in this container.
// If the Regional field is filled, that data is returned directly.
// Otherwise, all entries in the PerAZ field are summed together.
func (t Topological[D]) Sum() D {
	if t.PerAZ == nil {
		return *t.Regional
	}

	//fold AZ data in a well-defined order for deterministic test results
	azNames := make([]string, 0, len(t.PerAZ))
	for az := range t.PerAZ {
		azNames = append(azNames, az)
	}
	sort.Strings(azNames)

	var result D
	for _, az := range azNames {
		result = result.add(*t.PerAZ[az])
	}
	return result
}

// TopologicalData is an interface for types that can be put into the Topological container.
type TopologicalData[Self any] interface {
	// List of permitted types. This is required for type inference, as explained here:
	// <https://stackoverflow.com/a/73851453>
	CapacityData

	// Computes the sum of this structure and `other`.
	// This is used to implement Topological.Sum().
	add(other Self) Self
}

// CapacityData contains capacity data for a single project resource.
type CapacityData struct {
	Capacity      uint64
	Usage         uint64 //NOTE: currently only relevant on AZ level, regional level uses the aggregation of project usages
	Subcapacities []any  //only if supported by plugin and enabled in config
}

// add implements the TopologicalData interface.
//
//nolint:unused // looks like a linter bug
func (d CapacityData) add(other CapacityData) CapacityData {
	return CapacityData{
		Capacity:      d.Capacity + other.Capacity,
		Usage:         d.Usage + other.Usage,
		Subcapacities: append(slices.Clone(d.Subcapacities), other.Subcapacities...),
	}
}
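A usage note on Sum() (a sketch; the AZ names and values are invented): because AZ names are folded in sorted order, concatenated subcapacity slices come out deterministically:

t := core.PerAZ(map[string]*core.CapacityData{
	"az-two": {Capacity: 5, Subcapacities: []any{"b"}},
	"az-one": {Capacity: 3, Subcapacities: []any{"a"}},
})
sum := t.Sum()
// sum.Capacity == 8 and sum.Subcapacities == []any{"a", "b"},
// since "az-one" folds before "az-two".
_ = sum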
20 changes: 1 addition & 19 deletions internal/core/plugin.go
@@ -182,24 +182,6 @@ type QuotaPlugin interface {
CollectMetrics(ch chan<- prometheus.Metric, project KeystoneProject, serializedMetrics string) error
}

-// CapacityData contains the total and per-availability-zone capacity data for a
-// single resource.
-//
-// The Subcapacities field may optionally be populated with subcapacities, if the
-// capacity plugin providing this CapacityData instance has been instructed to (and
-// is able to) scrape subcapacities for this resource.
-type CapacityData struct {
-Capacity uint64
-CapacityPerAZ map[string]*CapacityDataForAZ
-Subcapacities []any
-}
-
-// CapacityDataForAZ is the capacity data for a single resource in a single AZ.
-type CapacityDataForAZ struct {
-Capacity uint64
-Usage uint64
-}

// CapacityPlugin is the interface that all capacity collector plugins must
// implement.
//
@@ -231,7 +213,7 @@ type CapacityPlugin interface {
//
//The serializedMetrics return value is persisted in the Limes DB and
//supplied to all subsequent RenderMetrics calls.
-Scrape() (result map[string]map[string]CapacityData, serializedMetrics string, err error)
+Scrape() (result map[string]map[string]Topological[CapacityData], serializedMetrics string, err error)

//DescribeMetrics is called when Prometheus is scraping metrics from
//limes-collect, to provide an opportunity to the plugin to emit its own
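For comparison, a hedged sketch of what a minimal plugin returns under the new Scrape signature (the plugin type, service type, and resource name are invented for illustration):

type examplePlugin struct{}

func (p *examplePlugin) Scrape() (map[string]map[string]core.Topological[core.CapacityData], string, error) {
	// One service type mapping to one resource, reported region-wide.
	result := map[string]map[string]core.Topological[core.CapacityData]{
		"shared": {
			"things": core.Regional(core.CapacityData{Capacity: 42}),
		},
	}
	return result, "", nil
}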
34 changes: 13 additions & 21 deletions internal/plugins/capacity_cinder.go
@@ -82,7 +82,7 @@ func (p *capacityCinderPlugin) makeResourceName(volumeType string) string {
}

// Scrape implements the core.CapacityPlugin interface.
-func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.CapacityData, serializedMetrics string, err error) {
+func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Topological[core.CapacityData], serializedMetrics string, err error) {
//list storage pools
var poolData struct {
StoragePools []struct {
@@ -126,14 +126,11 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Capac
}
}

-capaData := make(map[string]*core.CapacityData)
+capaData := make(map[string]core.Topological[core.CapacityData])
volumeTypesByBackendName := make(map[string]string)
for volumeType, cfg := range p.VolumeTypes {
volumeTypesByBackendName[cfg.VolumeBackendName] = volumeType
-capaData[p.makeResourceName(volumeType)] = &core.CapacityData{
-Capacity: 0,
-CapacityPerAZ: make(map[string]*core.CapacityDataForAZ),
-}
+capaData[p.makeResourceName(volumeType)] = core.PerAZ(make(map[string]*core.CapacityData))
}

//add results from scheduler-stats
@@ -153,12 +150,8 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Capac
logg.Info("Cinder capacity plugin: skipping pool %q with unknown volume_backend_name %q", pool.Name, pool.Capabilities.VolumeBackendName)
continue
}

logg.Debug("Cinder capacity plugin: considering pool %q with volume_backend_name %q for volume type %q", pool.Name, pool.Capabilities.VolumeBackendName, volumeType)

-resourceName := p.makeResourceName(volumeType)
-capaData[resourceName].Capacity += uint64(pool.Capabilities.TotalCapacityGB)

var poolAZ string
for az, hosts := range serviceHostsPerAZ {
for _, v := range hosts {
Expand All @@ -173,16 +166,19 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Capac
logg.Info("Cinder storage pool %q does not match any service host", pool.Name)
poolAZ = "unknown"
}
-if _, ok := capaData[resourceName].CapacityPerAZ[poolAZ]; !ok {
-capaData[resourceName].CapacityPerAZ[poolAZ] = &core.CapacityDataForAZ{}

+resourceName := p.makeResourceName(volumeType)
+capa := capaData[resourceName].PerAZ[poolAZ]
+if capa == nil {
+capa = &core.CapacityData{}
+capaData[resourceName].PerAZ[poolAZ] = capa
}

-azCapaData := capaData[resourceName].CapacityPerAZ[poolAZ]
-azCapaData.Capacity += uint64(pool.Capabilities.TotalCapacityGB)
-azCapaData.Usage += uint64(pool.Capabilities.AllocatedCapacityGB)
+capa.Capacity += uint64(pool.Capabilities.TotalCapacityGB)
+capa.Usage += uint64(pool.Capabilities.AllocatedCapacityGB)

if p.reportSubcapacities["capacity"] {
-capaData[resourceName].Subcapacities = append(capaData[resourceName].Subcapacities, storagePoolSubcapacity{
+capa.Subcapacities = append(capa.Subcapacities, storagePoolSubcapacity{
PoolName: pool.Name,
AvailabilityZone: poolAZ,
CapacityGiB: uint64(pool.Capabilities.TotalCapacityGB),
@@ -191,11 +187,7 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Capac
}
}

-capaDataFinal := make(map[string]core.CapacityData)
-for k, v := range capaData {
-capaDataFinal[k] = *v
-}
-return map[string]map[string]core.CapacityData{"volumev2": capaDataFinal}, "", nil
+return map[string]map[string]core.Topological[core.CapacityData]{"volumev2": capaData}, "", nil
}

// DescribeMetrics implements the core.CapacityPlugin interface.
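Tying the two halves together (a sketch; the pool numbers are invented): the region-wide value that this plugin previously accumulated itself now falls out of the collector's Sum() call:

topo := core.PerAZ(map[string]*core.CapacityData{
	"az-one":  {Capacity: 100, Usage: 40},
	"unknown": {Capacity: 10, Usage: 1}, // pools whose host matched no service AZ
})
regionWide := topo.Sum().Capacity // == 110, formerly tracked via capaData[resourceName].Capacity
_ = regionWide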