Skip to content

Commit

Permalink
Sample rate for shifting queries
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <[email protected]>
  • Loading branch information
codesome committed Sep 17, 2024
1 parent 92a5a28 commit 232c310
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 27 deletions.
7 changes: 5 additions & 2 deletions tools/querytee/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ProxyConfig struct {
BackendSkipTLSVerify bool
AddMissingTimeParamToInstantQueries bool
SecondaryBackendsRequestProportion float64
ShiftComparisonSamplingRate float64
}

func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -62,7 +63,6 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client. If no preferred backend is configured then the query-tee will send back to the client the first successful response received without waiting for other backends.")
f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 150*time.Second, "The timeout when reading the response from a backend.")
f.BoolVar(&cfg.CompareResponses, "proxy.compare-responses", false, "Compare responses between preferred and secondary endpoints for supported routes.")
f.DurationVar(&cfg.ShiftComparisonQueriesBy, "proxy.shift-comparison-queries-by", 0, "Shift the timestamps of the queries by the given duration before querying and comparing them. This will still do the query for the preferred backend with the original timestamps but do another query with shifted timestamps for comparison.")
f.DurationVar(&cfg.LogSlowQueryResponseThreshold, "proxy.log-slow-query-response-threshold", 10*time.Second, "The minimum difference in response time between slowest and fastest back-end over which to log the query. 0 to disable.")
f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).")
f.BoolVar(&cfg.UseRelativeError, "proxy.compare-use-relative-error", false, "Use relative error tolerance when comparing floating point values.")
Expand All @@ -71,6 +71,9 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.PassThroughNonRegisteredRoutes, "proxy.passthrough-non-registered-routes", false, "Passthrough requests for non-registered routes to preferred backend.")
f.BoolVar(&cfg.AddMissingTimeParamToInstantQueries, "proxy.add-missing-time-parameter-to-instant-queries", true, "Add a 'time' parameter to proxied instant query requests if they do not have one.")
f.Float64Var(&cfg.SecondaryBackendsRequestProportion, "proxy.secondary-backends-request-proportion", 1.0, "Proportion of requests to send to secondary backends. Must be between 0 and 1 (inclusive), and if not 1, then -backend.preferred must be set.")
f.DurationVar(&cfg.ShiftComparisonQueriesBy, "proxy.shift-comparison-queries-by", 0, "Shift the timestamps of the queries by the given duration before querying and comparing them. This will still do the query for the preferred backend with the original timestamps but do another query with shifted timestamps for comparison.")
// Defaulted to 0 to avoid mistakes of not setting this correctly and overloading the store-gateways with shifted queries.
f.Float64Var(&cfg.ShiftComparisonSamplingRate, "proxy.shift-comparison-sampling-rate", 0, "Ratio of queries for which query times are shifted based on proxy.shift-comparison-queries-by config, sampled randomly. Must be between 0 and 1 (inclusive).")
}

type Route struct {
Expand Down Expand Up @@ -234,7 +237,7 @@ func (p *Proxy) Start() error {
if p.cfg.CompareResponses {
comparator = route.ResponseComparator
}
router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route, p.metrics, p.logger, comparator, p.cfg.LogSlowQueryResponseThreshold, p.cfg.SecondaryBackendsRequestProportion))
router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route, p.metrics, p.logger, comparator, p.cfg))
}

if p.cfg.PassThroughNonRegisteredRoutes {
Expand Down
45 changes: 20 additions & 25 deletions tools/querytee/proxy_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,19 @@ type ResponsesComparator interface {
}

type ProxyEndpoint struct {
backends []ProxyBackendInterface
metrics *ProxyMetrics
logger log.Logger
comparator ResponsesComparator
slowResponseThreshold time.Duration
secondaryBackendRequestProportion float64
backends []ProxyBackendInterface
metrics *ProxyMetrics
logger log.Logger
comparator ResponsesComparator
cfg ProxyConfig

// The preferred backend, if any.
preferredBackend ProxyBackendInterface

shiftComparisonQueriesBy time.Duration

route Route
}

func NewProxyEndpoint(backends []ProxyBackendInterface, route Route, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator, slowResponseThreshold time.Duration, secondaryBackendRequestProportion float64, shiftComparisonQueriesBy time.Duration) *ProxyEndpoint {
func NewProxyEndpoint(backends []ProxyBackendInterface, route Route, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator, cfg ProxyConfig) *ProxyEndpoint {
var preferredBackend ProxyBackendInterface
for _, backend := range backends {
if backend.Preferred() {
Expand All @@ -54,15 +51,13 @@ func NewProxyEndpoint(backends []ProxyBackendInterface, route Route, metrics *Pr
}

return &ProxyEndpoint{
backends: backends,
route: route,
metrics: metrics,
logger: logger,
comparator: comparator,
slowResponseThreshold: slowResponseThreshold,
secondaryBackendRequestProportion: secondaryBackendRequestProportion,
preferredBackend: preferredBackend,
shiftComparisonQueriesBy: shiftComparisonQueriesBy,
backends: backends,
route: route,
metrics: metrics,
logger: logger,
comparator: comparator,
cfg: cfg,
preferredBackend: preferredBackend,
}
}

Expand All @@ -89,15 +84,15 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

func (p *ProxyEndpoint) selectBackends() []ProxyBackendInterface {
if len(p.backends) == 1 || p.secondaryBackendRequestProportion == 1.0 {
if len(p.backends) == 1 || p.cfg.SecondaryBackendsRequestProportion == 1.0 {
return p.backends
}

if p.secondaryBackendRequestProportion == 0.0 {
if p.cfg.SecondaryBackendsRequestProportion == 0.0 {
return []ProxyBackendInterface{p.preferredBackend}
}

if rand.Float64() > p.secondaryBackendRequestProportion {
if rand.Float64() > p.cfg.SecondaryBackendsRequestProportion {
return []ProxyBackendInterface{p.preferredBackend}
}

Expand Down Expand Up @@ -194,7 +189,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
elapsed, status, body, resp, err := b.ForwardRequest(ctx, req, bodyReader)
contentType := ""

if p.slowResponseThreshold > 0 {
if p.cfg.LogSlowQueryResponseThreshold > 0 {
timingMtx.Lock()
if elapsed > slowestDuration {
slowestDuration = elapsed
Expand Down Expand Up @@ -270,8 +265,8 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
}

var shiftedReq *http.Request
if p.shiftComparisonQueriesBy > 0 {
shiftedReq, err = shiftQueryRequest(req, p.shiftComparisonQueriesBy)
if p.cfg.ShiftComparisonQueriesBy > 0 && rand.Float64() < p.cfg.ShiftComparisonSamplingRate {
shiftedReq, err = shiftQueryRequest(req, p.cfg.ShiftComparisonQueriesBy)
if err != nil {
// TODO: logs & metrics
}
Expand Down Expand Up @@ -338,7 +333,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, backends []Pro
}

// Log queries that are slower in some backends than others
if p.slowResponseThreshold > 0 && slowestDuration-fastestDuration >= p.slowResponseThreshold {
if p.cfg.LogSlowQueryResponseThreshold > 0 && slowestDuration-fastestDuration >= p.cfg.LogSlowQueryResponseThreshold {
level.Warn(logger).Log(
"msg", "response time difference between backends exceeded threshold",
"slowest_duration", slowestDuration,
Expand Down

0 comments on commit 232c310

Please sign in to comment.