Enable Thanos Query Stats Propagation & cache response headers
Thanos Query gives us some really nice, detailed internal stats - but thanos-query-frontend annoyingly clobbers them by trying to parse the response as a Prometheus response, which has different fields and a different structure.

In addition, thanos-query-frontend doesn't even pass the &stats=all parameter through to the downstream if you try to request it.

This change fixes both issues by propagating the stats request parameter to the downstream and decoding/encoding the Thanos Query stats structure properly.

As an extra, we also set a response header on cache hits so that upstreams can make use of it.

Signed-off-by: milesbryant <[email protected]>
milesbxf committed Nov 15, 2024
1 parent 0577661 commit 6e3b564
Showing 11 changed files with 652 additions and 302 deletions.
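To see the end-to-end behaviour this commit enables, here is an illustrative client-side sketch (not part of the commit: the frontend address is assumed, and the payload shape follows Prometheus's documented query-stats format, which Thanos Query extends). Before this change the frontend neither forwarded stats=all nor preserved data.stats in the response; afterwards both survive the round trip.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	q := url.Values{}
	q.Set("query", "up")
	q.Set("stats", "all") // now propagated through thanos-query-frontend

	// Assumed thanos-query-frontend address.
	resp, err := http.Get("http://localhost:10902/api/v1/query?" + q.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Pull out just data.stats; with this change it arrives intact instead
	// of being dropped or mangled by the Prometheus-shaped decoder.
	var body struct {
		Data struct {
			Stats json.RawMessage `json:"stats"`
		} `json:"data"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		panic(err)
	}
	fmt.Printf("query stats: %s\n", body.Data.Stats)
}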
4 changes: 1 addition & 3 deletions cmd/thanos/query_frontend.go
@@ -145,8 +145,6 @@ func registerQueryFrontend(app *extkingpin.App) {

 	cmd.Flag("query-frontend.log-queries-longer-than", "Log queries that are slower than the specified duration. "+
 		"Set to 0 to disable. Set to < 0 to enable on all queries.").Default("0").DurationVar(&cfg.CortexHandlerConfig.LogQueriesLongerThan)
-	cmd.Flag("query-frontend.query-stats-enabled", "True to enable query statistics tracking. "+
-		"When enabled, a message with some statistics is logged for every query.").Default("false").BoolVar(&cfg.CortexHandlerConfig.QueryStatsEnabled)

 	cmd.Flag("query-frontend.org-id-header", "Deprecation Warning - This flag will be soon deprecated in favor of query-frontend.tenant-header"+
 		" and both flags cannot be used at the same time. "+
@@ -313,7 +311,7 @@ func runQueryFrontend(
 		return err
 	}

-	roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper, cfg.CortexHandlerConfig.QueryStatsEnabled)
+	roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper)
 	if err != nil {
 		return errors.Wrap(err, "setup downstream roundtripper")
 	}
16 changes: 4 additions & 12 deletions internal/cortex/frontend/downstream_roundtripper.go
@@ -13,18 +13,17 @@ import (

 // RoundTripper that forwards requests to downstream URL.
 type downstreamRoundTripper struct {
-	downstreamURL     *url.URL
-	transport         http.RoundTripper
-	queryStatsEnabled bool
+	downstreamURL *url.URL
+	transport     http.RoundTripper
 }

-func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper, queryStatsEnabled bool) (http.RoundTripper, error) {
+func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper) (http.RoundTripper, error) {
 	u, err := url.Parse(downstreamURL)
 	if err != nil {
 		return nil, err
 	}

-	return &downstreamRoundTripper{downstreamURL: u, transport: transport, queryStatsEnabled: queryStatsEnabled}, nil
+	return &downstreamRoundTripper{downstreamURL: u, transport: transport}, nil
 }

 func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
@@ -37,13 +36,6 @@ func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
 		}
 	}

-	if d.queryStatsEnabled {
-		// add &stats query param to get thanos-query to add query statistics to log
-		q := r.URL.Query()
-		q.Set("stats", "true")
-		r.URL.RawQuery = q.Encode()
-	}
-
 	r.URL.Scheme = d.downstreamURL.Scheme
 	r.URL.Host = d.downstreamURL.Host
 	r.URL.Path = path.Join(d.downstreamURL.Path, r.URL.Path)
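With the injected parameter gone, the round tripper forwards the caller's query string untouched, so a stats value set by the client (or by the codec upstream) is what the downstream Thanos Query receives. A hypothetical test sketch of that passthrough against the new constructor signature - it assumes the standard net/http, net/http/httptest, and testing imports and would sit alongside downstreamRoundTripper in package frontend:

func TestStatsParamIsForwardedUntouched(t *testing.T) {
	downstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// The round tripper no longer rewrites the query string, so the
		// caller-supplied stats parameter should arrive verbatim.
		if got := r.URL.Query().Get("stats"); got != "all" {
			t.Errorf("stats param: got %q, want %q", got, "all")
		}
	}))
	defer downstream.Close()

	rt, err := NewDownstreamRoundTripper(downstream.URL, http.DefaultTransport)
	if err != nil {
		t.Fatal(err)
	}
	req, err := http.NewRequest(http.MethodGet, "http://frontend.invalid/api/v1/query?query=up&stats=all", nil)
	if err != nil {
		t.Fatal(err)
	}
	resp, err := rt.RoundTrip(req) // rewrites scheme/host/path to the downstream URL
	if err != nil {
		t.Fatal(err)
	}
	resp.Body.Close()
}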
90 changes: 1 addition & 89 deletions internal/cortex/frontend/transport/handler.go
@@ -6,10 +6,8 @@ package transport
 import (
 	"bytes"
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/util/stats"
 	"io"
 	"net/http"
@@ -44,7 +42,6 @@
 type HandlerConfig struct {
 	LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
 	MaxBodySize          int64         `yaml:"max_body_size"`
-	QueryStatsEnabled    bool          `yaml:"query_stats_enabled"`
 }

 // Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
@@ -68,27 +65,6 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) http.Handler {
 		roundTripper: roundTripper,
 	}

-	if cfg.QueryStatsEnabled {
-		h.querySeconds = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
-			Name:    "thanos_query_frontend_query_seconds",
-			Help:    "Total amount of wall clock time spend processing queries.",
-			Buckets: []float64{0.01, 0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 360},
-		}, []string{"user"})
-
-		h.querySamplesTotal = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
-			Name:    "thanos_query_frontend_query_total_fetched_samples",
-			Help:    "Number of samples touched to execute a query.",
-			Buckets: []float64{1, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000},
-		}, []string{"user"})
-
-		h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
-			h.querySeconds.DeleteLabelValues(user)
-			h.querySamplesTotal.DeleteLabelValues(user)
-		})
-		// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
-		_ = h.activeUsers.StartAsync(context.Background())
-	}
-
 	return h
 }

@@ -129,38 +105,15 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

 	w.WriteHeader(resp.StatusCode)

-	var respBuf bytes.Buffer
-	if f.cfg.QueryStatsEnabled {
-		// Buffer the response body for query stat tracking later
-		resp.Body = io.NopCloser(io.TeeReader(resp.Body, &respBuf))
-	}
-
 	// log copy response body error so that we will know even though success response code returned
 	bytesCopied, err := io.Copy(w, resp.Body)
 	if err != nil && !errors.Is(err, syscall.EPIPE) {
 		level.Error(util_log.WithContext(r.Context(), f.log)).Log("msg", "write response body error", "bytesCopied", bytesCopied, "err", err)
 	}

-	if f.cfg.QueryStatsEnabled {
-		// Parse the stats field out of the response body
-		var statsResponse ResponseWithStats
-		if err := json.Unmarshal(respBuf.Bytes(), &statsResponse); err == nil {
-			if statsResponse.Data.Stats != nil {
-				queryString = f.parseRequestQueryString(r, buf)
-				f.reportQueryStats(r, queryString, queryResponseTime, statsResponse.Data.Stats)
-			} else {
-				// Don't fail the request if the stats are nil, just log a warning
-				level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", errors.New("stats are nil"))
-			}
-		} else {
-			// Don't fail the request if the stats are nil, just log a warning
-			level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", err)
-		}
-	}
-
 	// Check whether we should parse the query string.
 	shouldReportSlowQuery := f.cfg.LogQueriesLongerThan != 0 && queryResponseTime > f.cfg.LogQueriesLongerThan
-	if shouldReportSlowQuery || f.cfg.QueryStatsEnabled {
+	if shouldReportSlowQuery {
 		queryString = f.parseRequestQueryString(r, buf)
 	}

@@ -203,47 +156,6 @@ func (f *Handler) reportSlowQuery(r *http.Request, responseHeaders http.Header, queryString url.Values, queryResponseTime time.Duration) {
 	level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
 }

-func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *stats.BuiltinStats) {
-	remoteUser, _, _ := r.BasicAuth()
-
-	// Log stats.
-	logMessage := []interface{}{
-		"msg", "query stats",
-		"component", "query-frontend",
-		"method", r.Method,
-		"path", r.URL.Path,
-		"remote_user", remoteUser,
-		"remote_addr", r.RemoteAddr,
-		"response_time", queryResponseTime,
-		"query_timings_preparation_time", stats.Timings.QueryPreparationTime,
-		"query_timings_eval_total_time", stats.Timings.EvalTotalTime,
-		"query_timings_exec_total_time", stats.Timings.ExecTotalTime,
-		"query_timings_exec_queue_time", stats.Timings.ExecQueueTime,
-		"query_timings_inner_eval_time", stats.Timings.InnerEvalTime,
-		"query_timings_result_sort_time", stats.Timings.ResultSortTime,
-	}
-	if stats.Samples != nil {
-		samples := stats.Samples
-
-		logMessage = append(logMessage, []interface{}{
-			"total_queryable_samples", samples.TotalQueryableSamples,
-			"peak_samples", samples.PeakSamples,
-		}...)
-	}
-
-	logMessage = append(logMessage, formatQueryString(queryString)...)
-
-	level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
-
-	// Record metrics.
-	if f.querySeconds != nil {
-		f.querySeconds.WithLabelValues(remoteUser).Observe(queryResponseTime.Seconds())
-	}
-	if f.querySamplesTotal != nil && stats.Samples != nil {
-		f.querySamplesTotal.WithLabelValues(remoteUser).Observe(float64(stats.Samples.TotalQueryableSamples))
-	}
-}
-
 func (f *Handler) parseRequestQueryString(r *http.Request, bodyBuf bytes.Buffer) url.Values {
 	// Use previously buffered body.
 	r.Body = io.NopCloser(&bodyBuf)
1 change: 0 additions & 1 deletion internal/cortex/querier/querier.go
@@ -21,7 +21,6 @@ type Config struct {
 	QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
 	QueryStoreForLabels  bool          `yaml:"query_store_for_labels_enabled"`
 	AtModifierEnabled    bool          `yaml:"at_modifier_enabled"`
-	EnablePerStepStats   bool          `yaml:"per_step_stats_enabled"`

 	// QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters.
 	QueryStoreAfter time.Duration `yaml:"query_store_after"`
52 changes: 48 additions & 4 deletions internal/cortex/querier/queryrange/query_range.go
@@ -107,6 +107,8 @@ type Response interface {
 	GetHeaders() []*PrometheusResponseHeader
 	// GetStats returns the Prometheus query stats in the response.
 	GetStats() *PrometheusResponseStats
+	// AddHeader adds a HTTP header to the response
+	AddHeader(key, value string)
 }

 type prometheusCodec struct{}
@@ -184,6 +186,14 @@ func (resp *PrometheusInstantQueryResponse) GetStats() *PrometheusResponseStats {
 	return resp.Data.Stats
 }

+func (resp *PrometheusResponse) AddHeader(key, value string) {
+	resp.Headers = append(resp.Headers, &PrometheusResponseHeader{Name: key, Values: []string{value}})
+}
+
+func (resp *PrometheusInstantQueryResponse) AddHeader(key, value string) {
+	resp.Headers = append(resp.Headers, &PrometheusResponseHeader{Name: key, Values: []string{value}})
+}
+
 // NewEmptyPrometheusResponse returns an empty successful Prometheus query range response.
 func NewEmptyPrometheusResponse() *PrometheusResponse {
 	return &PrometheusResponse{
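These two implementations give both response types a uniform way to carry extra HTTP headers through the middleware chain. As a sketch of the cache-hit header mentioned in the commit message - the header name below is invented for illustration, and the real call site lives in the results-cache code, which is not among the hunks shown here:

// Hypothetical caching-layer snippet: on a hit, tag the decoded response.
// EncodeResponse (changed further down) copies the header onto the outgoing
// *http.Response, so upstreams can tell a cache hit from a miss.
func markCacheHit(resp Response, hit bool) {
	if hit {
		resp.AddHeader("X-Results-Cache", "hit") // invented header name
	}
}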
@@ -244,10 +254,10 @@ func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response, error) {
 	}

 	if len(resultsCacheGenNumberHeaderValues) != 0 {
-		response.Headers = []*PrometheusResponseHeader{{
+		response.Headers = append(response.Headers, &PrometheusResponseHeader{
 			Name:   ResultsCacheGenNumberHeaderName,
 			Values: resultsCacheGenNumberHeaderValues,
-		}}
+		})
 	}

 	return &response, nil
@@ -411,6 +421,7 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.Response, error) {

 	sp.LogFields(otlog.Int("bytes", len(b)))

+	fmt.Printf("premarshalled response headers: %v\n", a.Headers)
 	resp := http.Response{
 		Header: http.Header{
 			"Content-Type": []string{"application/json"},
@@ -419,6 +430,12 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.Response, error) {
 		StatusCode:    http.StatusOK,
 		ContentLength: int64(len(b)),
 	}
+	for _, h := range a.Headers {
+		for _, v := range h.Values {
+			resp.Header.Add(h.Name, v)
+		}
+	}
+	fmt.Printf("response headers: %v\n", resp.Header)
 	return &resp, nil
 }

@@ -665,6 +682,23 @@ func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) {
 func StatsMerge(resps []Response) *PrometheusResponseStats {
 	output := map[int64]*PrometheusResponseQueryableSamplesStatsPerStep{}
 	hasStats := false
+
+	result := &PrometheusResponseStats{
+		Timings: &PrometheusResponseStats_Timings{
+			EvalTotalTime:        0,
+			ResultSortTime:       0,
+			QueryPreparationTime: 0,
+			InnerEvalTime:        0,
+			ExecQueueTime:        0,
+			ExecTotalTime:        0,
+		},
+		Samples: &PrometheusResponseStats_Samples{
+			TotalQueryableSamples:        0,
+			PeakSamples:                  0,
+			TotalQueryableSamplesPerStep: []*PrometheusResponseQueryableSamplesStatsPerStep{},
+		},
+	}
+
 	for _, resp := range resps {
 		stats := resp.GetStats()
 		if stats == nil {
@@ -679,6 +713,18 @@ func StatsMerge(resps []Response) *PrometheusResponseStats {
 		for _, s := range stats.Samples.TotalQueryableSamplesPerStep {
 			output[s.GetTimestampMs()] = s
 		}
+
+		result.Timings.EvalTotalTime += stats.Timings.EvalTotalTime
+		result.Timings.ResultSortTime += stats.Timings.ResultSortTime
+		result.Timings.QueryPreparationTime += stats.Timings.QueryPreparationTime
+		result.Timings.InnerEvalTime += stats.Timings.InnerEvalTime
+		result.Timings.ExecQueueTime += stats.Timings.ExecQueueTime
+		result.Timings.ExecTotalTime += stats.Timings.ExecTotalTime
+
+		result.Samples.TotalQueryableSamples += stats.Samples.TotalQueryableSamples
+		if stats.Samples.PeakSamples > result.Samples.PeakSamples {
+			result.Samples.PeakSamples = stats.Samples.PeakSamples
+		}
 	}

 	if !hasStats {
@@ -692,10 +738,8 @@ func StatsMerge(resps []Response) *PrometheusResponseStats {

 	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

-	result := &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}}
 	for _, key := range keys {
 		result.Samples.TotalQueryableSamplesPerStep = append(result.Samples.TotalQueryableSamplesPerStep, output[key])
-		result.Samples.TotalQueryableSamples += output[key].Value
 	}

 	return result
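The merge semantics introduced above: timings are summed across partial responses, sample totals are summed, the peak takes the maximum, and per-step entries are deduplicated by timestamp (later responses win) before being emitted in timestamp order. A standalone sketch of the sample handling with simplified stand-in types - the real code operates on the generated PrometheusResponseStats protobuf types:

package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for the generated stats types.
type stepStat struct {
	TimestampMs int64
	Value       int64
}

type samplesStats struct {
	Total   int64
	Peak    int64
	PerStep []stepStat
}

// merge mirrors StatsMerge's sample handling: totals summed, peak maxed,
// per-step entries deduplicated by timestamp and returned in order.
func merge(parts ...samplesStats) samplesStats {
	byTS := map[int64]stepStat{}
	var out samplesStats
	for _, p := range parts {
		out.Total += p.Total
		if p.Peak > out.Peak {
			out.Peak = p.Peak
		}
		for _, s := range p.PerStep {
			byTS[s.TimestampMs] = s
		}
	}
	keys := make([]int64, 0, len(byTS))
	for ts := range byTS {
		keys = append(keys, ts)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	for _, ts := range keys {
		out.PerStep = append(out.PerStep, byTS[ts])
	}
	return out
}

func main() {
	a := samplesStats{Total: 100, Peak: 40, PerStep: []stepStat{{1000, 60}, {2000, 40}}}
	b := samplesStats{Total: 50, Peak: 50, PerStep: []stepStat{{3000, 50}}}
	fmt.Printf("%+v\n", merge(a, b)) // Total:150 Peak:50, three steps in order
}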