Skip to content

Commit

Permalink
Merge branch 'prometheus:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
rajagopalanand committed Oct 18, 2023
2 parents e9854c5 + 16af867 commit 635001b
Show file tree
Hide file tree
Showing 41 changed files with 1,581 additions and 778 deletions.
5 changes: 3 additions & 2 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.")
case "native-histograms":
c.tsdb.EnableNativeHistograms = true
c.scrape.EnableProtobufNegotiation = true
level.Info(logger).Log("msg", "Experimental native histogram support enabled.")
// Change global variable. Hacky, but it's hard to pass new option or default to unmarshaller.
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultNativeHistogramScrapeProtocols
level.Info(logger).Log("msg", "Experimental native histogram support enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultConfig.GlobalConfig.ScrapeProtocols))
case "":
continue
case "promql-at-modifier", "promql-negative-offset":
Expand Down
117 changes: 98 additions & 19 deletions cmd/promtool/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"io"
"math"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -459,7 +458,16 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
postingInfos := []postingInfo{}

printInfo := func(postingInfos []postingInfo) {
slices.SortFunc(postingInfos, func(a, b postingInfo) int { return int(b.metric) - int(a.metric) })
slices.SortFunc(postingInfos, func(a, b postingInfo) int {
switch {
case b.metric < a.metric:
return -1
case b.metric > a.metric:
return 1
default:
return 0
}
})

for i, pc := range postingInfos {
if i >= limit {
Expand Down Expand Up @@ -620,10 +628,12 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
err = tsdb_errors.NewMulti(err, chunkr.Close()).Err()
}()

const maxSamplesPerChunk = 120
nBuckets := 10
histogram := make([]int, nBuckets)
totalChunks := 0
floatChunkSamplesCount := make([]int, 0)
floatChunkSize := make([]int, 0)
histogramChunkSamplesCount := make([]int, 0)
histogramChunkSize := make([]int, 0)
histogramChunkBucketsCount := make([]int, 0)
var builder labels.ScratchBuilder
for postingsr.Next() {
var chks []chunks.Meta
Expand All @@ -637,26 +647,56 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
if err != nil {
return err
}
chunkSize := math.Min(float64(chk.NumSamples()), maxSamplesPerChunk)
// Calculate the bucket for the chunk and increment it in the histogram.
bucket := int(math.Ceil(float64(nBuckets)*chunkSize/maxSamplesPerChunk)) - 1
histogram[bucket]++
switch chk.Encoding() {
case chunkenc.EncXOR:
floatChunkSamplesCount = append(floatChunkSamplesCount, chk.NumSamples())
floatChunkSize = append(floatChunkSize, len(chk.Bytes()))
case chunkenc.EncFloatHistogram:
histogramChunkSamplesCount = append(histogramChunkSamplesCount, chk.NumSamples())
histogramChunkSize = append(histogramChunkSize, len(chk.Bytes()))
fhchk, ok := chk.(*chunkenc.FloatHistogramChunk)
if !ok {
return fmt.Errorf("chunk is not FloatHistogramChunk")
}
it := fhchk.Iterator(nil)
bucketCount := 0
for it.Next() == chunkenc.ValFloatHistogram {
_, f := it.AtFloatHistogram()
bucketCount += len(f.PositiveBuckets)
bucketCount += len(f.NegativeBuckets)
}
histogramChunkBucketsCount = append(histogramChunkBucketsCount, bucketCount)
case chunkenc.EncHistogram:
histogramChunkSamplesCount = append(histogramChunkSamplesCount, chk.NumSamples())
histogramChunkSize = append(histogramChunkSize, len(chk.Bytes()))
hchk, ok := chk.(*chunkenc.HistogramChunk)
if !ok {
return fmt.Errorf("chunk is not HistogramChunk")
}
it := hchk.Iterator(nil)
bucketCount := 0
for it.Next() == chunkenc.ValHistogram {
_, f := it.AtHistogram()
bucketCount += len(f.PositiveBuckets)
bucketCount += len(f.NegativeBuckets)
}
histogramChunkBucketsCount = append(histogramChunkBucketsCount, bucketCount)
}
totalChunks++
}
}

fmt.Printf("\nCompaction analysis:\n")
fmt.Println("Fullness: Amount of samples in chunks (100% is 120 samples)")
// Normalize absolute counts to percentages and print them out.
for bucket, count := range histogram {
percentage := 100.0 * count / totalChunks
fmt.Printf("%7d%%: ", (bucket+1)*10)
for j := 0; j < percentage; j++ {
fmt.Printf("#")
}
fmt.Println()
}
fmt.Println()
displayHistogram("samples per float chunk", floatChunkSamplesCount, totalChunks)

displayHistogram("bytes per float chunk", floatChunkSize, totalChunks)

displayHistogram("samples per histogram chunk", histogramChunkSamplesCount, totalChunks)

displayHistogram("bytes per histogram chunk", histogramChunkSize, totalChunks)

displayHistogram("buckets per histogram chunk", histogramChunkBucketsCount, totalChunks)
return nil
}

Expand Down Expand Up @@ -732,3 +772,42 @@ func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxB

return checkErr(backfill(5000, inputFile.Bytes(), outputDir, humanReadable, quiet, maxBlockDuration))
}

func displayHistogram(dataType string, datas []int, total int) {
slices.Sort(datas)
start, end, step := generateBucket(datas[0], datas[len(datas)-1])
sum := 0
buckets := make([]int, (end-start)/step+1)
maxCount := 0
for _, c := range datas {
sum += c
buckets[(c-start)/step]++
if buckets[(c-start)/step] > maxCount {
maxCount = buckets[(c-start)/step]
}
}
avg := sum / len(datas)
fmt.Printf("%s (min/avg/max): %d/%d/%d\n", dataType, datas[0], avg, datas[len(datas)-1])
maxLeftLen := strconv.Itoa(len(fmt.Sprintf("%d", end)))
maxRightLen := strconv.Itoa(len(fmt.Sprintf("%d", end+step)))
maxCountLen := strconv.Itoa(len(fmt.Sprintf("%d", maxCount)))
for bucket, count := range buckets {
percentage := 100.0 * count / total
fmt.Printf("[%"+maxLeftLen+"d, %"+maxRightLen+"d]: %"+maxCountLen+"d %s\n", bucket*step+start+1, (bucket+1)*step+start, count, strings.Repeat("#", percentage))
}
fmt.Println()
}

func generateBucket(min, max int) (start, end, step int) {
s := (max - min) / 10

step = 10
for step < s && step <= 10000 {
step *= 10
}

start = min - min%step
end = max - max%step + step

return
}
43 changes: 43 additions & 0 deletions cmd/promtool/tsdb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestGenerateBucket(t *testing.T) {
tcs := []struct {
min, max int
start, end, step int
}{
{
min: 101,
max: 141,
start: 100,
end: 150,
step: 10,
},
}

for _, tc := range tcs {
start, end, step := generateBucket(tc.min, tc.max)

require.Equal(t, tc.start, start)
require.Equal(t, tc.end, end)
require.Equal(t, tc.step, step)
}
}
100 changes: 96 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -143,12 +144,14 @@ var (
ScrapeInterval: model.Duration(1 * time.Minute),
ScrapeTimeout: model.Duration(10 * time.Second),
EvaluationInterval: model.Duration(1 * time.Minute),
// When native histogram feature flag is enabled, ScrapeProtocols default
// changes to DefaultNativeHistogramScrapeProtocols.
ScrapeProtocols: DefaultScrapeProtocols,
}

// DefaultScrapeConfig is the default scrape configuration.
DefaultScrapeConfig = ScrapeConfig{
// ScrapeTimeout and ScrapeInterval default to the configured
// globals.
// ScrapeTimeout, ScrapeInterval and ScrapeProtocols default to the configured globals.
ScrapeClassicHistograms: false,
MetricsPath: "/metrics",
Scheme: "http",
Expand Down Expand Up @@ -260,7 +263,7 @@ func (c Config) String() string {
return string(b)
}

// ScrapeConfigs returns the scrape configurations.
// GetScrapeConfigs returns the scrape configurations.
func (c *Config) GetScrapeConfigs() ([]*ScrapeConfig, error) {
scfgs := make([]*ScrapeConfig, len(c.ScrapeConfigs))

Expand Down Expand Up @@ -385,6 +388,11 @@ type GlobalConfig struct {
ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
// The default timeout when scraping targets.
ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"`
// The protocols to negotiate during a scrape. It tells clients what
// protocol are accepted by Prometheus and with what weight (most wanted is first).
// Supported values (case sensitive): PrometheusProto, OpenMetricsText0.0.1,
// OpenMetricsText1.0.0, PrometheusText0.0.4.
ScrapeProtocols []ScrapeProtocol `yaml:"scrape_protocols,omitempty"`
// How frequently to evaluate rules by default.
EvaluationInterval model.Duration `yaml:"evaluation_interval,omitempty"`
// File to which PromQL queries are logged.
Expand Down Expand Up @@ -414,6 +422,68 @@ type GlobalConfig struct {
KeepDroppedTargets uint `yaml:"keep_dropped_targets,omitempty"`
}

// ScrapeProtocol represents supported protocol for scraping metrics.
type ScrapeProtocol string

// Validate returns error if given scrape protocol is not supported.
func (s ScrapeProtocol) Validate() error {
if _, ok := ScrapeProtocolsHeaders[s]; !ok {
return fmt.Errorf("unknown scrape protocol %v, supported: %v",
s, func() (ret []string) {
for k := range ScrapeProtocolsHeaders {
ret = append(ret, string(k))
}
sort.Strings(ret)
return ret
}())
}
return nil
}

var (
PrometheusProto ScrapeProtocol = "PrometheusProto"
PrometheusText0_0_4 ScrapeProtocol = "PrometheusText0.0.4"
OpenMetricsText0_0_1 ScrapeProtocol = "OpenMetricsText0.0.1"
OpenMetricsText1_0_0 ScrapeProtocol = "OpenMetricsText1.0.0"

ScrapeProtocolsHeaders = map[ScrapeProtocol]string{
PrometheusProto: "application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited",
PrometheusText0_0_4: "text/plain;version=0.0.4",
OpenMetricsText0_0_1: "application/openmetrics-text;version=0.0.1",
OpenMetricsText1_0_0: "application/openmetrics-text;version=1.0.0",
}

DefaultScrapeProtocols = []ScrapeProtocol{
OpenMetricsText1_0_0,
OpenMetricsText0_0_1,
PrometheusText0_0_4,
}
DefaultNativeHistogramScrapeProtocols = []ScrapeProtocol{
PrometheusProto,
OpenMetricsText1_0_0,
OpenMetricsText0_0_1,
PrometheusText0_0_4,
}
)

// validateAcceptScrapeProtocols return errors if we see problems with accept scrape protocols option.
func validateAcceptScrapeProtocols(sps []ScrapeProtocol) error {
if len(sps) == 0 {
return errors.New("scrape_protocols cannot be empty")
}
dups := map[string]struct{}{}
for _, sp := range sps {
if _, ok := dups[strings.ToLower(string(sp))]; ok {
return fmt.Errorf("duplicated protocol in scrape_protocols, got %v", sps)
}
if err := sp.Validate(); err != nil {
return fmt.Errorf("scrape_protocols: %w", err)
}
dups[strings.ToLower(string(sp))] = struct{}{}
}
return nil
}

// SetDirectory joins any relative file paths with dir.
func (c *GlobalConfig) SetDirectory(dir string) {
c.QueryLogFile = config.JoinDir(dir, c.QueryLogFile)
Expand Down Expand Up @@ -459,6 +529,14 @@ func (c *GlobalConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
if gc.EvaluationInterval == 0 {
gc.EvaluationInterval = DefaultGlobalConfig.EvaluationInterval
}

if gc.ScrapeProtocols == nil {
gc.ScrapeProtocols = DefaultGlobalConfig.ScrapeProtocols
}
if err := validateAcceptScrapeProtocols(gc.ScrapeProtocols); err != nil {
return fmt.Errorf("%w for global config", err)
}

*c = *gc
return nil
}
Expand All @@ -469,7 +547,8 @@ func (c *GlobalConfig) isZero() bool {
c.ScrapeInterval == 0 &&
c.ScrapeTimeout == 0 &&
c.EvaluationInterval == 0 &&
c.QueryLogFile == ""
c.QueryLogFile == "" &&
c.ScrapeProtocols == nil
}

type ScrapeConfigs struct {
Expand All @@ -490,6 +569,11 @@ type ScrapeConfig struct {
ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
// The timeout for scraping targets of this config.
ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"`
// The protocols to negotiate during a scrape. It tells clients what
// protocol are accepted by Prometheus and with what preference (most wanted is first).
// Supported values (case sensitive): PrometheusProto, OpenMetricsText0.0.1,
// OpenMetricsText1.0.0, PrometheusText0.0.4.
ScrapeProtocols []ScrapeProtocol `yaml:"scrape_protocols,omitempty"`
// Whether to scrape a classic histogram that is also exposed as a native histogram.
ScrapeClassicHistograms bool `yaml:"scrape_classic_histograms,omitempty"`
// The HTTP resource path on which to fetch metrics from targets.
Expand Down Expand Up @@ -577,6 +661,7 @@ func (c *ScrapeConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil
}

// Validate validates scrape config, but also fills relevant default values from global config if needed.
func (c *ScrapeConfig) Validate(globalConfig GlobalConfig) error {
if c == nil {
return errors.New("empty or null scrape config section")
Expand Down Expand Up @@ -618,6 +703,13 @@ func (c *ScrapeConfig) Validate(globalConfig GlobalConfig) error {
c.KeepDroppedTargets = globalConfig.KeepDroppedTargets
}

if c.ScrapeProtocols == nil {
c.ScrapeProtocols = globalConfig.ScrapeProtocols
}
if err := validateAcceptScrapeProtocols(c.ScrapeProtocols); err != nil {
return fmt.Errorf("%w for scrape config with job name %q", err, c.JobName)
}

return nil
}

Expand Down
Loading

0 comments on commit 635001b

Please sign in to comment.