Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add full AZ awareness to plugin scraping interfaces #358

Merged
merged 2 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions internal/collector/capacity_scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc
continue
}

for resourceName, resourceData := range serviceData {
for resourceName, topologicalResourceData := range serviceData {
if !c.Cluster.HasResource(serviceType, resourceName) {
logg.Info("discarding capacity reported by %s for unknown resource name: %s/%s", capacitor.CapacitorID, serviceType, resourceName)
continue
Expand All @@ -230,25 +230,26 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc
return fmt.Errorf("no cluster_services entry for service type %s (check if CheckConsistencyJob runs correctly)", serviceType)
}

summedResourceData := topologicalResourceData.Sum()
res := db.ClusterResource{
ServiceID: serviceID,
Name: resourceName,
RawCapacity: resourceData.Capacity,
RawCapacity: summedResourceData.Capacity,
CapacityPerAZJSON: "", //see below
SubcapacitiesJSON: "", //see below
CapacitorID: capacitor.CapacitorID,
}

if len(resourceData.CapacityPerAZ) > 0 {
buf, err := json.Marshal(convertAZReport(resourceData.CapacityPerAZ))
if topologicalResourceData.PerAZ != nil {
buf, err := json.Marshal(convertAZReport(topologicalResourceData.PerAZ))
if err != nil {
return fmt.Errorf("could not convert capacities per AZ to JSON: %w", err)
}
res.CapacityPerAZJSON = string(buf)
}

if len(resourceData.Subcapacities) > 0 {
buf, err := json.Marshal(resourceData.Subcapacities)
if len(summedResourceData.Subcapacities) > 0 {
buf, err := json.Marshal(summedResourceData.Subcapacities)
if err != nil {
return fmt.Errorf("could not convert subcapacities to JSON: %w", err)
}
Expand Down Expand Up @@ -297,10 +298,7 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc
return tx.Commit()
}

func convertAZReport(capacityPerAZ map[string]*core.CapacityDataForAZ) limesresources.ClusterAvailabilityZoneReports {
//The initial implementation wrote limesresources.ClusterAvailabilityZoneReports into the CapacityPerAZJSON database field,
//even though map[string]*core.CapacityDataForAZ would have been more appropriate.
//Now we stick with it for compatibility's sake.
func convertAZReport(capacityPerAZ map[string]*core.CapacityData) limesresources.ClusterAvailabilityZoneReports {
report := make(limesresources.ClusterAvailabilityZoneReports, len(capacityPerAZ))
for azName, azData := range capacityPerAZ {
report[azName] = &limesresources.ClusterAvailabilityZoneReport{
Expand Down
4 changes: 2 additions & 2 deletions internal/collector/capacity_scrape_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func Test_ScanCapacity(t *testing.T) {
UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest';
UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest2';
INSERT INTO cluster_capacitors (capacitor_id, scraped_at, scrape_duration_secs, serialized_metrics, next_scrape_at) VALUES ('unittest4', %d, 5, '{"smaller_half":14,"larger_half":28}', %d);
INSERT INTO cluster_resources (service_id, name, capacity, subcapacities, capacitor_id) VALUES (2, 'things', 42, '[{"smaller_half":14},{"larger_half":28}]', 'unittest4');
INSERT INTO cluster_resources (service_id, name, capacity, subcapacities, capacitor_id) VALUES (2, 'things', 42, '[{"az":"az-one","smaller_half":7},{"az":"az-one","larger_half":14},{"az":"az-two","smaller_half":7},{"az":"az-two","larger_half":14}]', 'unittest4');
`,
scrapedAt1.Unix(), scrapedAt1.Add(15*time.Minute).Unix(),
scrapedAt2.Unix(), scrapedAt2.Add(15*time.Minute).Unix(),
Expand All @@ -180,7 +180,7 @@ func Test_ScanCapacity(t *testing.T) {
UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest';
UPDATE cluster_capacitors SET scraped_at = %d, next_scrape_at = %d WHERE capacitor_id = 'unittest2';
UPDATE cluster_capacitors SET scraped_at = %d, serialized_metrics = '{"smaller_half":3,"larger_half":7}', next_scrape_at = %d WHERE capacitor_id = 'unittest4';
UPDATE cluster_resources SET capacity = 10, subcapacities = '[{"smaller_half":3},{"larger_half":7}]' WHERE service_id = 2 AND name = 'things';
UPDATE cluster_resources SET capacity = 10, subcapacities = '[{"az":"az-one","smaller_half":1},{"az":"az-one","larger_half":4},{"az":"az-two","smaller_half":1},{"az":"az-two","larger_half":4}]' WHERE service_id = 2 AND name = 'things';
`,
scrapedAt1.Unix(), scrapedAt1.Add(15*time.Minute).Unix(),
scrapedAt2.Unix(), scrapedAt2.Add(15*time.Minute).Unix(),
Expand Down
13 changes: 7 additions & 6 deletions internal/collector/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,9 @@ func (c *Collector) writeResourceScrapeResult(dbDomain db.Domain, dbProject db.P
//this is the callback that ProjectResourceUpdate will use to write the scraped data into the project_resources
updateResource := func(res *db.ProjectResource) error {
data := resourceData[res.Name]
res.Usage = data.Usage
res.PhysicalUsage = data.PhysicalUsage
usageData := data.UsageData.Sum()
res.Usage = usageData.Usage
res.PhysicalUsage = usageData.PhysicalUsage

resInfo := c.Cluster.InfoForResource(srv.Type, res.Name)
if !resInfo.NoQuota {
Expand All @@ -238,17 +239,17 @@ func (c *Collector) writeResourceScrapeResult(dbDomain db.Domain, dbProject db.P
res.BackendQuota = &data.Quota
}

if len(data.Subresources) == 0 {
if len(usageData.Subresources) == 0 {
res.SubresourcesJSON = ""
} else {
//warn when the backend is inconsistent with itself
if uint64(len(data.Subresources)) != res.Usage {
if uint64(len(usageData.Subresources)) != res.Usage {
logg.Info("resource quantity mismatch in project %s, resource %s/%s: usage = %d, but found %d subresources",
dbProject.UUID, srv.Type, res.Name,
res.Usage, len(data.Subresources),
res.Usage, len(usageData.Subresources),
)
}
bytes, err := json.Marshal(data.Subresources)
bytes, err := json.Marshal(usageData.Subresources)
if err != nil {
return fmt.Errorf("failed to convert subresources to JSON: %s", err.Error())
}
Expand Down
4 changes: 2 additions & 2 deletions internal/collector/scrape_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func Test_ScrapeSuccess(t *testing.T) {
//change the data that is reported by the plugin
s.Clock.StepBy(scrapeInterval)
plugin.StaticResourceData["capacity"].Quota = 110
plugin.StaticResourceData["things"].Usage = 5
plugin.StaticResourceData["things"].UsageData.Regional.Usage = 5
//Scrape should pick up the changed resource data
mustT(t, job.ProcessOne(s.Ctx, withLabel))
mustT(t, job.ProcessOne(s.Ctx, withLabel))
Expand Down Expand Up @@ -269,7 +269,7 @@ func Test_ScrapeSuccess(t *testing.T) {
//"capacity_portion" (otherwise this resource has been all zeroes this entire
//time)
s.Clock.StepBy(scrapeInterval)
plugin.StaticResourceData["capacity"].Usage = 20
plugin.StaticResourceData["capacity"].UsageData.Regional.Usage = 20
mustT(t, job.ProcessOne(s.Ctx, withLabel))
mustT(t, job.ProcessOne(s.Ctx, withLabel))

Expand Down
126 changes: 126 additions & 0 deletions internal/core/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*******************************************************************************
*
* Copyright 2023 SAP SE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You should have received a copy of the License along with this
* program. If not, you may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/

package core

import (
"slices"
"sort"
)

// Topological is a container for data that can either be reported for
// the entire region at once (via Regional), or separately by AZ (via PerAZ).
// Exactly one field shall be non-nil.
type Topological[D TopologicalData[D]] struct {
	//Regional holds the data for the entire region at once; nil when PerAZ is filled.
	Regional *D
	//PerAZ holds the data broken down by availability zone name; nil when Regional is filled.
	PerAZ map[string]*D
}

// Regional is a shorthand to construct a Topological instance with the Regional member filled.
func Regional[D TopologicalData[D]](data D) Topological[D] {
	var t Topological[D]
	t.Regional = &data //points at the parameter's copy, so callers cannot observe the aliasing
	return t
}

// PerAZ is a shorthand to construct a Topological instance with the PerAZ member filled.
func PerAZ[D TopologicalData[D]](data map[string]*D) Topological[D] {
	var t Topological[D]
	t.PerAZ = data
	return t
}

// Sum returns a sum of all data in this container.
// If the Regional field is filled, that data is returned directly.
// Otherwise, all entries in the PerAZ field are summed together.
func (t Topological[D]) Sum() D {
	if t.PerAZ == nil {
		//by contract, Regional must be filled when PerAZ is not
		return *t.Regional
	}

	//fold the AZ entries in sorted name order: this keeps the fold order
	//(and thus e.g. the order of concatenated subresource lists)
	//deterministic for tests
	names := make([]string, 0, len(t.PerAZ))
	for name := range t.PerAZ {
		names = append(names, name)
	}
	sort.Strings(names)

	var total D
	for _, name := range names {
		total = total.add(*t.PerAZ[name])
	}
	return total
}

// TopologicalData is an interface for types that can be put into the Topological container.
type TopologicalData[Self any] interface {
	// List of permitted types. This is required for type inference, as explained here:
	// <https://stackoverflow.com/a/73851453>
	UsageData | CapacityData

	// Computes the sum of this structure and `other`.
	// This is used to implement Topological.Sum().
	add(other Self) Self
}

// ResourceData contains quota and usage data for a single project resource.
type ResourceData struct {
	Quota int64 //negative values indicate infinite quota
	//UsageData holds the usage portion, either for the whole region or broken down by AZ.
	UsageData Topological[UsageData]
}

// UsageData contains usage data for a single project resource.
// It appears in type ResourceData.
type UsageData struct {
	Usage         uint64
	PhysicalUsage *uint64 //only supported by some plugins
	Subresources  []any   //only if supported by plugin and enabled in config
}

// add implements the TopologicalData interface by summing both operands field
// by field. Neither operand is mutated; the result's subresource list uses a
// fresh backing array.
//
//nolint:unused // looks like a linter bug
func (d UsageData) add(other UsageData) UsageData {
	//concatenate into a cloned slice so that d's backing array is never written to
	subresources := slices.Clone(d.Subresources)
	subresources = append(subresources, other.Subresources...)

	//the sum can only have a PhysicalUsage value if both sides have it
	var physicalUsage *uint64
	if d.PhysicalUsage != nil && other.PhysicalUsage != nil {
		total := *d.PhysicalUsage + *other.PhysicalUsage
		physicalUsage = &total
	}

	return UsageData{
		Usage:         d.Usage + other.Usage,
		PhysicalUsage: physicalUsage,
		Subresources:  subresources,
	}
}

// CapacityData contains capacity data for a single project resource.
type CapacityData struct {
	Capacity      uint64
	Usage         uint64 //NOTE: currently only relevant on AZ level, regional level uses the aggregation of project usages
	Subcapacities []any  //only if supported by plugin and enabled in config
}

// add implements the TopologicalData interface by summing both operands field
// by field. Neither operand is mutated; the result's subcapacity list uses a
// fresh backing array.
//
//nolint:unused // looks like a linter bug
func (d CapacityData) add(other CapacityData) CapacityData {
	//concatenate into a cloned slice so that d's backing array is never written to
	subcapacities := slices.Clone(d.Subcapacities)
	subcapacities = append(subcapacities, other.Subcapacities...)

	return CapacityData{
		Capacity:      d.Capacity + other.Capacity,
		Usage:         d.Usage + other.Usage,
		Subcapacities: subcapacities,
	}
}
32 changes: 1 addition & 31 deletions internal/core/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,6 @@ type DiscoveryPlugin interface {
ListProjects(domain KeystoneDomain) ([]KeystoneProject, error)
}

// ResourceData contains quota and usage data for a single resource.
//
// The Subresources field may optionally be populated with subresources, if the
// quota plugin providing this ResourceData instance has been instructed to (and
// is able to) scrape subresources for this resource.
type ResourceData struct {
Quota int64 //negative values indicate infinite quota
Usage uint64
PhysicalUsage *uint64 //only supported by some plugins
Subresources []any
}

// QuotaPlugin is the interface that the quota/usage collector plugins for all
// backend services must implement. There can only be one QuotaPlugin for each
// backend service.
Expand Down Expand Up @@ -182,24 +170,6 @@ type QuotaPlugin interface {
CollectMetrics(ch chan<- prometheus.Metric, project KeystoneProject, serializedMetrics string) error
}

// CapacityData contains the total and per-availability-zone capacity data for a
// single resource.
//
// The Subcapacities field may optionally be populated with subcapacities, if the
// capacity plugin providing this CapacityData instance has been instructed to (and
// is able to) scrape subcapacities for this resource.
type CapacityData struct {
Capacity uint64
CapacityPerAZ map[string]*CapacityDataForAZ
Subcapacities []any
}

// CapacityDataForAZ is the capacity data for a single resource in a single AZ.
type CapacityDataForAZ struct {
Capacity uint64
Usage uint64
}

// CapacityPlugin is the interface that all capacity collector plugins must
// implement.
//
Expand Down Expand Up @@ -231,7 +201,7 @@ type CapacityPlugin interface {
//
//The serializedMetrics return value is persisted in the Limes DB and
//supplied to all subsequent RenderMetrics calls.
Scrape() (result map[string]map[string]CapacityData, serializedMetrics string, err error)
Scrape() (result map[string]map[string]Topological[CapacityData], serializedMetrics string, err error)

//DescribeMetrics is called when Prometheus is scraping metrics from
//limes-collect, to provide an opportunity to the plugin to emit its own
Expand Down
8 changes: 6 additions & 2 deletions internal/plugins/archer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,15 @@ func (p *archerPlugin) Scrape(project core.KeystoneProject) (result map[string]c
result = map[string]core.ResourceData{
"endpoints": {
Quota: archerQuota.Endpoint,
Usage: archerQuota.InUseEndpoint,
UsageData: core.Regional(core.UsageData{
Usage: archerQuota.InUseEndpoint,
}),
},
"services": {
Quota: archerQuota.Service,
Usage: archerQuota.InUseService,
UsageData: core.Regional(core.UsageData{
Usage: archerQuota.InUseService,
}),
},
}
return result, "", nil
Expand Down
34 changes: 13 additions & 21 deletions internal/plugins/capacity_cinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (p *capacityCinderPlugin) makeResourceName(volumeType string) string {
}

// Scrape implements the core.CapacityPlugin interface.
func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.CapacityData, serializedMetrics string, err error) {
func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Topological[core.CapacityData], serializedMetrics string, err error) {
//list storage pools
var poolData struct {
StoragePools []struct {
Expand Down Expand Up @@ -126,14 +126,11 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Capac
}
}

capaData := make(map[string]*core.CapacityData)
capaData := make(map[string]core.Topological[core.CapacityData])
volumeTypesByBackendName := make(map[string]string)
for volumeType, cfg := range p.VolumeTypes {
volumeTypesByBackendName[cfg.VolumeBackendName] = volumeType
capaData[p.makeResourceName(volumeType)] = &core.CapacityData{
Capacity: 0,
CapacityPerAZ: make(map[string]*core.CapacityDataForAZ),
}
capaData[p.makeResourceName(volumeType)] = core.PerAZ(make(map[string]*core.CapacityData))
}

//add results from scheduler-stats
Expand All @@ -153,12 +150,8 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Capac
logg.Info("Cinder capacity plugin: skipping pool %q with unknown volume_backend_name %q", pool.Name, pool.Capabilities.VolumeBackendName)
continue
}

logg.Debug("Cinder capacity plugin: considering pool %q with volume_backend_name %q for volume type %q", pool.Name, pool.Capabilities.VolumeBackendName, volumeType)

resourceName := p.makeResourceName(volumeType)
capaData[resourceName].Capacity += uint64(pool.Capabilities.TotalCapacityGB)

var poolAZ string
for az, hosts := range serviceHostsPerAZ {
for _, v := range hosts {
Expand All @@ -173,16 +166,19 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Capac
logg.Info("Cinder storage pool %q does not match any service host", pool.Name)
poolAZ = "unknown"
}
if _, ok := capaData[resourceName].CapacityPerAZ[poolAZ]; !ok {
capaData[resourceName].CapacityPerAZ[poolAZ] = &core.CapacityDataForAZ{}

resourceName := p.makeResourceName(volumeType)
capa := capaData[resourceName].PerAZ[poolAZ]
if capa == nil {
capa = &core.CapacityData{}
capaData[resourceName].PerAZ[poolAZ] = capa
}

azCapaData := capaData[resourceName].CapacityPerAZ[poolAZ]
azCapaData.Capacity += uint64(pool.Capabilities.TotalCapacityGB)
azCapaData.Usage += uint64(pool.Capabilities.AllocatedCapacityGB)
capa.Capacity += uint64(pool.Capabilities.TotalCapacityGB)
capa.Usage += uint64(pool.Capabilities.AllocatedCapacityGB)

if p.reportSubcapacities["capacity"] {
capaData[resourceName].Subcapacities = append(capaData[resourceName].Subcapacities, storagePoolSubcapacity{
capa.Subcapacities = append(capa.Subcapacities, storagePoolSubcapacity{
PoolName: pool.Name,
AvailabilityZone: poolAZ,
CapacityGiB: uint64(pool.Capabilities.TotalCapacityGB),
Expand All @@ -191,11 +187,7 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.Capac
}
}

capaDataFinal := make(map[string]core.CapacityData)
for k, v := range capaData {
capaDataFinal[k] = *v
}
return map[string]map[string]core.CapacityData{"volumev2": capaDataFinal}, "", nil
return map[string]map[string]core.Topological[core.CapacityData]{"volumev2": capaData}, "", nil
}

// DescribeMetrics implements the core.CapacityPlugin interface.
Expand Down
Loading
Loading