Skip to content

Commit

Permalink
papercut: carry serialized metrics as []byte through the plugin inter…
Browse files Browse the repository at this point in the history
…face boundary

This avoids casting in the plugin implementation at the expense of
casting once in the central business logic.
  • Loading branch information
majewsky committed Nov 2, 2023
1 parent ac1a8d4 commit f24aa37
Show file tree
Hide file tree
Showing 25 changed files with 141 additions and 144 deletions.
2 changes: 1 addition & 1 deletion internal/collector/capacity_scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (c *Collector) processCapacityScrapeTask(_ context.Context, task capacitySc
if err == nil {
capacitor.ScrapedAt = &task.Timing.FinishedAt
capacitor.ScrapeDurationSecs = task.Timing.Duration().Seconds()
capacitor.SerializedMetrics = serializedMetrics
capacitor.SerializedMetrics = string(serializedMetrics)
capacitor.NextScrapeAt = task.Timing.FinishedAt.Add(c.AddJitter(capacityScrapeInterval))
capacitor.ScrapeErrorMessage = ""
//NOTE: in this case, we continue below, with the cluster_resources update
Expand Down
4 changes: 2 additions & 2 deletions internal/collector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (c *CapacityPluginMetricsCollector) collectOneCapacitor(ch chan<- prometheu
if plugin == nil {
return
}
err := plugin.CollectMetrics(ch, instance.SerializedMetrics)
err := plugin.CollectMetrics(ch, []byte(instance.SerializedMetrics))
successAsFloat := 1.0
if err != nil {
successAsFloat = 0.0
Expand Down Expand Up @@ -331,7 +331,7 @@ func (c *QuotaPluginMetricsCollector) collectOneProjectService(ch chan<- prometh
if plugin == nil {
return
}
err := plugin.CollectMetrics(ch, instance.Project, instance.SerializedMetrics)
err := plugin.CollectMetrics(ch, instance.Project, []byte(instance.SerializedMetrics))
successAsFloat := 1.0
if err != nil {
successAsFloat = 0.0
Expand Down
4 changes: 2 additions & 2 deletions internal/collector/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (c *Collector) processResourceScrapeTask(_ context.Context, task projectScr
return fmt.Errorf("during resource scrape of project %s/%s: %w", dbDomain.Name, dbProject.Name, task.Err)
}

func (c *Collector) writeResourceScrapeResult(dbDomain db.Domain, dbProject db.Project, task projectScrapeTask, resourceData map[string]core.ResourceData, serializedMetrics string) error {
func (c *Collector) writeResourceScrapeResult(dbDomain db.Domain, dbProject db.Project, task projectScrapeTask, resourceData map[string]core.ResourceData, serializedMetrics []byte) error {
srv := task.Service

tx, err := c.DB.Begin()
Expand Down Expand Up @@ -318,7 +318,7 @@ func (c *Collector) writeResourceScrapeResult(dbDomain db.Domain, dbProject db.P
//attributes that we have not written yet
_, err = tx.Exec(writeResourceScrapeSuccessQuery,
task.Timing.FinishedAt, task.Timing.FinishedAt.Add(c.AddJitter(scrapeInterval)), task.Timing.Duration().Seconds(),
serializedMetrics, srv.ID,
string(serializedMetrics), srv.ID,
)
if err != nil {
return fmt.Errorf("while updating metadata on project service: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions internal/core/constraints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ func (p quotaConstraintTestPlugin) ServiceInfo(serviceType string) limes.Service
func (p quotaConstraintTestPlugin) Rates() []limesrates.RateInfo {
return nil
}
func (p quotaConstraintTestPlugin) Scrape(project KeystoneProject) (result map[string]ResourceData, serializedMetrics string, err error) {
return nil, "", nil
func (p quotaConstraintTestPlugin) Scrape(project KeystoneProject) (result map[string]ResourceData, serializedMetrics []byte, err error) {
return nil, nil, nil
}
func (p quotaConstraintTestPlugin) IsQuotaAcceptableForProject(project KeystoneProject, fullQuotas map[string]map[string]uint64, allServiceInfos []limes.ServiceInfo) error {
return nil
Expand All @@ -188,7 +188,7 @@ func (p quotaConstraintTestPlugin) ScrapeRates(project KeystoneProject, prevSeri
}
func (p quotaConstraintTestPlugin) DescribeMetrics(ch chan<- *prometheus.Desc) {
}
func (p quotaConstraintTestPlugin) CollectMetrics(ch chan<- prometheus.Metric, project KeystoneProject, serializedMetrics string) error {
func (p quotaConstraintTestPlugin) CollectMetrics(ch chan<- prometheus.Metric, project KeystoneProject, serializedMetrics []byte) error {
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions internal/core/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type QuotaPlugin interface {
//
//The serializedMetrics return value is persisted in the Limes DB and
//supplied to all subsequent RenderMetrics calls.
Scrape(project KeystoneProject) (result map[string]ResourceData, serializedMetrics string, error error)
Scrape(project KeystoneProject) (result map[string]ResourceData, serializedMetrics []byte, err error)
//IsQuotaAcceptableForProject checks if the given quota set is acceptable
//for the given project, and returns nil if the quota is acceptable, or a
//human-readable error otherwise. This should only be used when the
Expand Down Expand Up @@ -167,7 +167,7 @@ type QuotaPlugin interface {
//Some plugins also emit metrics directly within Scrape. This newer interface
//should be preferred since metrics emitted here won't be lost between
//restarts of limes-collect.
CollectMetrics(ch chan<- prometheus.Metric, project KeystoneProject, serializedMetrics string) error
CollectMetrics(ch chan<- prometheus.Metric, project KeystoneProject, serializedMetrics []byte) error
}

// CapacityPlugin is the interface that all capacity collector plugins must
Expand Down Expand Up @@ -201,7 +201,7 @@ type CapacityPlugin interface {
//
//The serializedMetrics return value is persisted in the Limes DB and
//supplied to all subsequent RenderMetrics calls.
Scrape() (result map[string]map[string]PerAZ[CapacityData], serializedMetrics string, err error)
Scrape() (result map[string]map[string]PerAZ[CapacityData], serializedMetrics []byte, err error)

//DescribeMetrics is called when Prometheus is scraping metrics from
//limes-collect, to provide an opportunity to the plugin to emit its own
Expand All @@ -218,7 +218,7 @@ type CapacityPlugin interface {
//Some plugins also emit metrics directly within Scrape. This newer interface
//should be preferred since metrics emitted here won't be lost between
//restarts of limes-collect.
CollectMetrics(ch chan<- prometheus.Metric, serializedMetrics string) error
CollectMetrics(ch chan<- prometheus.Metric, serializedMetrics []byte) error
}

var (
Expand Down
8 changes: 4 additions & 4 deletions internal/plugins/archer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (p *archerPlugin) Rates() []limesrates.RateInfo {
}

// Scrape implements the core.QuotaPlugin interface.
func (p *archerPlugin) Scrape(project core.KeystoneProject) (result map[string]core.ResourceData, serializedMetrics string, err error) {
func (p *archerPlugin) Scrape(project core.KeystoneProject) (result map[string]core.ResourceData, serializedMetrics []byte, err error) {
url := p.Archer.ServiceURL("quotas", project.UUID)
var res gophercloud.Result
//nolint:bodyclose // already closed by gophercloud
Expand All @@ -107,7 +107,7 @@ func (p *archerPlugin) Scrape(project core.KeystoneProject) (result map[string]c
InUseService uint64 `json:"in_use_service"`
}
if err = res.ExtractInto(&archerQuota); err != nil {
return nil, "", err
return nil, nil, err
}

result = map[string]core.ResourceData{
Expand All @@ -124,7 +124,7 @@ func (p *archerPlugin) Scrape(project core.KeystoneProject) (result map[string]c
}),
},
}
return result, "", nil
return result, nil, nil
}

// IsQuotaAcceptableForProject implements the core.QuotaPlugin interface.
Expand Down Expand Up @@ -158,7 +158,7 @@ func (p *archerPlugin) DescribeMetrics(ch chan<- *prometheus.Desc) {
}

// CollectMetrics implements the core.QuotaPlugin interface.
func (p *archerPlugin) CollectMetrics(ch chan<- prometheus.Metric, project core.KeystoneProject, serializedMetrics string) error {
func (p *archerPlugin) CollectMetrics(ch chan<- prometheus.Metric, project core.KeystoneProject, serializedMetrics []byte) error {
//not used by this plugin
return nil
}
14 changes: 7 additions & 7 deletions internal/plugins/capacity_cinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (p *capacityCinderPlugin) makeResourceName(volumeType string) string {
}

// Scrape implements the core.CapacityPlugin interface.
func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.PerAZ[core.CapacityData], serializedMetrics string, err error) {
func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.PerAZ[core.CapacityData], serializedMetrics []byte, err error) {
//list storage pools
var poolData struct {
StoragePools []struct {
Expand All @@ -102,21 +102,21 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.PerAZ

allPages, err := schedulerstats.List(p.CinderV3, schedulerstats.ListOpts{Detail: true}).AllPages()
if err != nil {
return nil, "", err
return nil, nil, err
}
err = allPages.(schedulerstats.StoragePoolPage).ExtractInto(&poolData)
if err != nil {
return nil, "", err
return nil, nil, err
}

//list service hosts
allPages, err = services.List(p.CinderV3, nil).AllPages()
if err != nil {
return nil, "", err
return nil, nil, err
}
allServices, err := services.ExtractServices(allPages)
if err != nil {
return nil, "", err
return nil, nil, err
}

serviceHostsPerAZ := make(map[string][]string)
Expand Down Expand Up @@ -191,7 +191,7 @@ func (p *capacityCinderPlugin) Scrape() (result map[string]map[string]core.PerAZ
}
}

return map[string]map[string]core.PerAZ[core.CapacityData]{"volumev2": capaData}, "", nil
return map[string]map[string]core.PerAZ[core.CapacityData]{"volumev2": capaData}, nil, nil
}

// DescribeMetrics implements the core.CapacityPlugin interface.
Expand All @@ -200,7 +200,7 @@ func (p *capacityCinderPlugin) DescribeMetrics(ch chan<- *prometheus.Desc) {
}

// CollectMetrics implements the core.CapacityPlugin interface.
func (p *capacityCinderPlugin) CollectMetrics(ch chan<- prometheus.Metric, serializedMetrics string) error {
func (p *capacityCinderPlugin) CollectMetrics(ch chan<- prometheus.Metric, serializedMetrics []byte) error {
//not used by this plugin
return nil
}
Expand Down
12 changes: 6 additions & 6 deletions internal/plugins/capacity_manila.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ func (p *capacityManilaPlugin) makeResourceName(kind string, shareType ManilaSha
}

// Scrape implements the core.CapacityPlugin interface.
func (p *capacityManilaPlugin) Scrape() (result map[string]map[string]core.PerAZ[core.CapacityData], _ string, err error) {
func (p *capacityManilaPlugin) Scrape() (result map[string]map[string]core.PerAZ[core.CapacityData], _ []byte, err error) {
allPages, err := services.List(p.ManilaV2, nil).AllPages()
if err != nil {
return nil, "", err
return nil, nil, err
}
allServices, err := services.ExtractServices(allPages)
if err != nil {
return nil, "", err
return nil, nil, err
}

azForServiceHost := make(map[string]limes.AvailabilityZone)
Expand All @@ -131,14 +131,14 @@ func (p *capacityManilaPlugin) Scrape() (result map[string]map[string]core.PerAZ
for _, shareType := range p.ShareTypes {
capForType, err := p.scrapeForShareType(shareType, azForServiceHost)
if err != nil {
return nil, "", err
return nil, nil, err
}
caps[p.makeResourceName("shares", shareType)] = capForType.Shares
caps[p.makeResourceName("share_snapshots", shareType)] = capForType.Snapshots
caps[p.makeResourceName("share_capacity", shareType)] = capForType.ShareGigabytes
caps[p.makeResourceName("snapshot_capacity", shareType)] = capForType.SnapshotGigabytes
}
return map[string]map[string]core.PerAZ[core.CapacityData]{"sharev2": caps}, "", nil
return map[string]map[string]core.PerAZ[core.CapacityData]{"sharev2": caps}, nil, nil
}

// DescribeMetrics implements the core.CapacityPlugin interface.
Expand All @@ -147,7 +147,7 @@ func (p *capacityManilaPlugin) DescribeMetrics(ch chan<- *prometheus.Desc) {
}

// CollectMetrics implements the core.CapacityPlugin interface.
func (p *capacityManilaPlugin) CollectMetrics(ch chan<- prometheus.Metric, serializedMetrics string) error {
func (p *capacityManilaPlugin) CollectMetrics(ch chan<- prometheus.Metric, serializedMetrics []byte) error {
//not used by this plugin
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions internal/plugins/capacity_manual.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ func (p *capacityManualPlugin) PluginTypeID() string {
var errNoManualData = errors.New(`missing values for capacitor plugin "manual"`)

// Scrape implements the core.CapacityPlugin interface.
func (p *capacityManualPlugin) Scrape() (result map[string]map[string]core.PerAZ[core.CapacityData], _ string, err error) {
func (p *capacityManualPlugin) Scrape() (result map[string]map[string]core.PerAZ[core.CapacityData], _ []byte, err error) {
if p.Values == nil {
return nil, "", errNoManualData
return nil, nil, errNoManualData
}

result = make(map[string]map[string]core.PerAZ[core.CapacityData])
Expand All @@ -63,7 +63,7 @@ func (p *capacityManualPlugin) Scrape() (result map[string]map[string]core.PerAZ
result[serviceType] = serviceResult
}

return result, "", nil
return result, nil, nil
}

// DescribeMetrics implements the core.CapacityPlugin interface.
Expand All @@ -72,7 +72,7 @@ func (p *capacityManualPlugin) DescribeMetrics(ch chan<- *prometheus.Desc) {
}

// CollectMetrics implements the core.CapacityPlugin interface.
func (p *capacityManualPlugin) CollectMetrics(ch chan<- prometheus.Metric, serializedMetrics string) error {
func (p *capacityManualPlugin) CollectMetrics(ch chan<- prometheus.Metric, serializedMetrics []byte) error {
//not used by this plugin
return nil
}
30 changes: 15 additions & 15 deletions internal/plugins/capacity_nova.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,46 +99,46 @@ func (p *capacityNovaPlugin) PluginTypeID() string {
}

// Scrape implements the core.CapacityPlugin interface.
func (p *capacityNovaPlugin) Scrape() (result map[string]map[string]core.PerAZ[core.CapacityData], serializedMetrics string, err error) {
func (p *capacityNovaPlugin) Scrape() (result map[string]map[string]core.PerAZ[core.CapacityData], serializedMetrics []byte, err error) {
//enumerate aggregates which establish the hypervisor <-> AZ mapping
page, err := aggregates.List(p.NovaV2).AllPages()
if err != nil {
return nil, "", err
return nil, nil, err
}
allAggregates, err := aggregates.ExtractAggregates(page)
if err != nil {
return nil, "", err
return nil, nil, err
}

//enumerate hypervisors (cannot use type Hypervisor provided by Gophercloud;
//in our clusters, it breaks because some hypervisor report unexpected NULL
//values on fields that we are not even interested in)
page, err = hypervisors.List(p.NovaV2, nil).AllPages()
if err != nil {
return nil, "", err
return nil, nil, err
}
var hypervisorData struct {
Hypervisors []novaHypervisor `json:"hypervisors"`
}
err = page.(hypervisors.HypervisorPage).ExtractInto(&hypervisorData)
if err != nil {
return nil, "", err
return nil, nil, err
}

//enumerate resource providers (we need to match these to the hypervisors later)
page, err = resourceproviders.List(p.PlacementV1, nil).AllPages()
if err != nil {
return nil, "", err
return nil, nil, err
}
allResourceProviders, err := resourceproviders.ExtractResourceProviders(page)
if err != nil {
return nil, "", err
return nil, nil, err
}

//for the instances capacity, we need to know the max root disk size on public flavors
maxRootDiskSize, err := getMaxRootDiskSize(p.NovaV2, p.ExtraSpecs)
if err != nil {
return nil, "", err
return nil, nil, err
}

//we need to prepare several aggregations in the big loop below
Expand All @@ -159,7 +159,7 @@ func (p *capacityNovaPlugin) Scrape() (result map[string]map[string]core.PerAZ[c
//query Placement API for hypervisor capacity
hvCapacity, traits, err := hypervisor.getCapacity(p.PlacementV1, allResourceProviders)
if err != nil {
return nil, "", fmt.Errorf(
return nil, nil, fmt.Errorf(
"cannot get capacity for hypervisor %s with .service.host %q from Placement API (falling back to Nova Hypervisor API): %s",
hypervisor.HypervisorHostname, hypervisor.Service.Host, err.Error())
}
Expand Down Expand Up @@ -220,7 +220,7 @@ func (p *capacityNovaPlugin) Scrape() (result map[string]map[string]core.PerAZ[c
continue
}
if len(matchingAggregates) > 1 {
return nil, "", fmt.Errorf(
return nil, nil, fmt.Errorf(
"hypervisor %s with .service.host %q could not be uniquely matched to an aggregate (matching aggregates = %v)",
hypervisor.HypervisorHostname, hypervisor.Service.Host, matchingAggregates)
}
Expand All @@ -234,7 +234,7 @@ func (p *capacityNovaPlugin) Scrape() (result map[string]map[string]core.PerAZ[c
continue
}
if len(matchingAZs) > 1 {
return nil, "", fmt.Errorf(
return nil, nil, fmt.Errorf(
"hypervisor %s with .service.host %q could not be uniquely matched to an AZ (matching AZs = %v)",
hypervisor.HypervisorHostname, hypervisor.Service.Host, matchingAZs)
}
Expand Down Expand Up @@ -289,8 +289,8 @@ func (p *capacityNovaPlugin) Scrape() (result map[string]map[string]core.PerAZ[c
delete(capacities, "instances")
}

serializedMetricsBytes, err := json.Marshal(metrics)
return map[string]map[string]core.PerAZ[core.CapacityData]{"compute": capacities}, string(serializedMetricsBytes), err
serializedMetrics, err = json.Marshal(metrics)
return map[string]map[string]core.PerAZ[core.CapacityData]{"compute": capacities}, serializedMetrics, err
}

var novaHypervisorWellformedGauge = prometheus.NewGaugeVec(
Expand All @@ -307,9 +307,9 @@ func (p *capacityNovaPlugin) DescribeMetrics(ch chan<- *prometheus.Desc) {
}

// CollectMetrics implements the core.CapacityPlugin interface.
func (p *capacityNovaPlugin) CollectMetrics(ch chan<- prometheus.Metric, serializedMetrics string) error {
func (p *capacityNovaPlugin) CollectMetrics(ch chan<- prometheus.Metric, serializedMetrics []byte) error {
var metrics capacityNovaSerializedMetrics
err := json.Unmarshal([]byte(serializedMetrics), &metrics)
err := json.Unmarshal(serializedMetrics, &metrics)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit f24aa37

Please sign in to comment.