Skip to content

Commit

Permalink
Remove unused ingestion logic. (#984)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnavdugar-stripe authored Aug 29, 2022
1 parent 7cb32d9 commit b86033b
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 597 deletions.
18 changes: 0 additions & 18 deletions datadog_trace_span.go

This file was deleted.

121 changes: 0 additions & 121 deletions handlers_global.go

This file was deleted.

121 changes: 1 addition & 120 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,10 @@ package veneur
import (
"net/http"
"net/http/pprof"
"sort"
"time"

"github.com/stripe/veneur/v14/samplers"
"github.com/stripe/veneur/v14/ssf"
"github.com/stripe/veneur/v14/trace"
"github.com/stripe/veneur/v14/trace/metrics"
"github.com/stripe/veneur/v14/util/build"
"github.com/stripe/veneur/v14/util/config"

"context"

"github.com/segmentio/fasthash/fnv1a"
"goji.io"
"goji.io/pat"
)
Expand Down Expand Up @@ -59,121 +50,11 @@ func (s *Server) Handler() http.Handler {
mux.HandleFunc(pat.Get(endpoint), customHandler)
}

mux.Handle(pat.Post("/import"), http.HandlerFunc(s.handleImport))

mux.Handle(pat.Get("/debug/pprof/cmdline"), http.HandlerFunc(pprof.Cmdline))
mux.Handle(pat.Get("/debug/pprof/profile"), http.HandlerFunc(pprof.Profile))
mux.Handle(pat.Get("/debug/pprof/symbol"), http.HandlerFunc(pprof.Symbol))
mux.Handle(pat.Get("/debug/pprof/trace"), http.HandlerFunc(pprof.Trace))
// TODO match without trailing slash as well
mux.Handle(pat.Get("/debug/pprof/*"), http.HandlerFunc(pprof.Index))
mux.Handle(pat.Get("/debug/pprof/"), http.HandlerFunc(pprof.Index))

return mux
}

// handleImport responds to POST requests that submit metrics to the global
// veneur instance. Parsed metrics are handed off to ImportMetrics on a
// separate goroutine so the HTTP response is not blocked on ingestion.
func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) {
	ctx := context.Background()
	span, parsed, unmarshalErr := unmarshalMetricsFromHTTP(
		ctx, s.TraceClient, w, r, s.logger)
	if unmarshalErr != nil {
		s.logger.WithError(unmarshalErr).Error("Error unmarshalling metrics in global import")
		span.Add(ssf.Count("import.unmarshal.errors_total", 1, nil))
		return
	}
	// The server usually waits for this handler to return before finalizing
	// the response, so the actual import must happen asynchronously.
	go s.ImportMetrics(span.Attach(ctx), parsed)
}

// ImportMetrics feeds a slice of json metrics to the server's workers.
func (s *Server) ImportMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric) {
	span, _ := trace.StartSpanFromContext(ctx, "veneur.opentracing.import.import_metrics")
	defer span.Finish()

	// Rather than pushing metrics one at a time (too much channel contention
	// and goroutine switching) or building a temporary slice per worker (lots
	// of allocations), hash each metric with fnv1a, sort the slice by hash,
	// and hand each worker its contiguous chunk in one channel send.
	iter := newJSONMetricsByWorker(jsonMetrics, len(s.Workers))
	for iter.Next() {
		chunk, worker := iter.Chunk()
		s.Workers[worker].ImportChan <- chunk
	}
	metrics.ReportOne(s.TraceClient, ssf.Timing("import.response_duration_ns", time.Since(span.Start), time.Nanosecond, map[string]string{"part": "merge"}))
}

// sortableJSONMetrics pairs a set of JSON metrics with the worker index
// computed for each one, so both slices can be sorted together by worker.
type sortableJSONMetrics struct {
	// metrics holds the metrics being distributed across workers.
	metrics []samplers.JSONMetric
	// workerIndices[i] is the worker assigned to metrics[i].
	workerIndices []uint32
}

// newSortableJSONMetrics assigns each metric a worker index by hashing its
// name, type, and joined tags, and returns the metrics paired with those
// indices, ready to be sorted.
func newSortableJSONMetrics(metrics []samplers.JSONMetric, numWorkers int) *sortableJSONMetrics {
	indices := make([]uint32, 0, len(metrics))
	for _, m := range metrics {
		hash := fnv1a.Init32
		hash = fnv1a.AddString32(hash, m.Name)
		hash = fnv1a.AddString32(hash, m.Type)
		hash = fnv1a.AddString32(hash, m.JoinedTags)
		indices = append(indices, hash%uint32(numWorkers))
	}
	return &sortableJSONMetrics{
		metrics:       metrics,
		workerIndices: indices,
	}
}

// Compile-time check that sortableJSONMetrics satisfies sort.Interface.
var _ sort.Interface = &sortableJSONMetrics{}

// Len returns the number of metrics.
func (sjm *sortableJSONMetrics) Len() int {
	return len(sjm.metrics)
}

// Less orders metrics by their assigned worker index.
func (sjm *sortableJSONMetrics) Less(i, j int) bool {
	return sjm.workerIndices[i] < sjm.workerIndices[j]
}

// Swap exchanges elements i and j in both slices, keeping each metric aligned
// with its worker index.
func (sjm *sortableJSONMetrics) Swap(i, j int) {
	sjm.metrics[i], sjm.metrics[j] = sjm.metrics[j], sjm.metrics[i]
	sjm.workerIndices[i], sjm.workerIndices[j] = sjm.workerIndices[j], sjm.workerIndices[i]
}

// jsonMetricsByWorker iterates over a worker-sorted set of JSON metrics,
// yielding one contiguous chunk per worker.
type jsonMetricsByWorker struct {
	sjm *sortableJSONMetrics
	// currentStart and nextStart delimit the chunk most recently produced
	// by Next: [currentStart, nextStart).
	currentStart int
	nextStart    int
}

// newJSONMetricsByWorker builds an iterator over the given metrics that
// returns them in contiguous nonempty chunks, each chunk corresponding to a
// single worker.
func newJSONMetricsByWorker(metrics []samplers.JSONMetric, numWorkers int) *jsonMetricsByWorker {
	sorted := newSortableJSONMetrics(metrics, numWorkers)
	sort.Sort(sorted)
	return &jsonMetricsByWorker{sjm: sorted}
}

// Next advances the iterator to the next per-worker chunk, returning false
// when all metrics have been consumed.
func (jmbw *jsonMetricsByWorker) Next() bool {
	if jmbw.nextStart == jmbw.sjm.Len() {
		return false
	}

	// The metrics are sorted by worker index, so the next chunk runs from
	// nextStart up to (but not including) the first metric assigned to a
	// different worker, or to the end of the slice.
	start := jmbw.nextStart
	end := start + 1
	for end < jmbw.sjm.Len() && jmbw.sjm.workerIndices[end] == jmbw.sjm.workerIndices[start] {
		end++
	}
	jmbw.currentStart = start
	jmbw.nextStart = end
	return true
}
// Chunk returns the current run of metrics together with the index of the
// worker they all belong to. Only valid after Next has returned true.
func (jmbw *jsonMetricsByWorker) Chunk() ([]samplers.JSONMetric, int) {
	lo, hi := jmbw.currentStart, jmbw.nextStart
	worker := int(jmbw.sjm.workerIndices[lo])
	return jmbw.sjm.metrics[lo:hi], worker
}
Loading

0 comments on commit b86033b

Please sign in to comment.