From 3096d26a6bb0ff183be507ef60a43929acc8c26b Mon Sep 17 00:00:00 2001 From: Achille Roussel Date: Sun, 23 Jul 2017 02:16:04 -0700 Subject: [PATCH 1/4] add influxdb client --- docker-compose.yml | 5 + influxdb/client.go | 339 ++++++++++++++++++++++++++++++++++++++++ influxdb/client_test.go | 97 ++++++++++++ influxdb/metric.go | 147 +++++++++++++++++ influxdb/metric_test.go | 125 +++++++++++++++ tag.go | 30 ++++ tag_test.go | 80 ++++++++++ 7 files changed, 823 insertions(+) create mode 100644 docker-compose.yml create mode 100644 influxdb/client.go create mode 100644 influxdb/client_test.go create mode 100644 influxdb/metric.go create mode 100644 influxdb/metric_test.go diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..7db821a --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,5 @@ +influxdb: + image: influxdb:alpine + ports: + - 8086:8086 + diff --git a/influxdb/client.go b/influxdb/client.go new file mode 100644 index 0000000..2217454 --- /dev/null +++ b/influxdb/client.go @@ -0,0 +1,339 @@ +package influxdb + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "net/url" + "strings" + "sync" + "sync/atomic" + "time" + "unsafe" + + "github.com/segmentio/stats" +) + +const ( + // DefaultAddress is the default address to which the InfluxDB client tries + // to connect to. + DefaultAddress = "localhost:8086" + + // DefaultDatabase is the default database used by the InfluxDB client. + DefaultDatabase = "stats" + + // DefaultBatchSize is the default size for batches of metrics sent to + // InfluxDB. + DefaultBatchSize = 1000 + + // DefaultFlushInterval is the default value used to configure the interval + // at which batches of metrics are flushed to InfluxDB. + DefaultFlushInterval = 10 * time.Second + + // DefaultTimeout is the default timeout value used when sending requests to + // InfluxDB. + DefaultTimeout = 5 * time.Second +) + +// The ClientConfig type is used to configure InfluxDB clients. 
+type ClientConfig struct { + // Address of the InfluxDB database to send metrics to. + Address string + + // Name of the InfluxDB database to send metrics to. + Database string + + // Maximum size of batch of events sent to InfluxDB. + BatchSize int + + FlushInterval time.Duration + + // Maximum amount of time that requests to InfluxDB may take. + Timeout time.Duration + + // Transport configures the HTTP transport used by the client to send + // requests to InfluxDB. By default http.DefaultTransport is used. + Transport http.RoundTripper +} + +// Client represents an InfluxDB client that implements the stats.Handler +// interface. +type Client struct { + url *url.URL + httpClient http.Client + metrics unsafe.Pointer + pool sync.Pool + join sync.WaitGroup + once sync.Once + done chan struct{} + flushedAt int64 +} + +// NewClient creates and returns a new InfluxDB client publishing metrics to the +// server running at addr. +func NewClient(addr string) *Client { + return NewClientWith(ClientConfig{ + Address: addr, + FlushInterval: DefaultFlushInterval, + }) +} + +// NewClientWith creates and returns a new InfluxDB client configured with the +// given config. +func NewClientWith(config ClientConfig) *Client { + if len(config.Address) == 0 { + config.Address = DefaultAddress + } + + if len(config.Database) == 0 { + config.Database = DefaultDatabase + } + + if config.BatchSize == 0 { + config.BatchSize = DefaultBatchSize + } + + if config.Timeout == 0 { + config.Timeout = DefaultTimeout + } + + c := &Client{ + url: makeURL(config.Address, config.Database), + httpClient: http.Client{ + Timeout: config.Timeout, + Transport: config.Transport, + }, + pool: sync.Pool{New: func() interface{} { return newMetrics(config.BatchSize) }}, + done: make(chan struct{}), + } + + if config.FlushInterval != 0 { + go c.run(config.FlushInterval) + } + + return c +} + +// CreateDB creates a database named db in the InfluxDB server that the client +// was configured to send metrics to. 
+func (c *Client) CreateDB(db string) error {
+	u := *c.url
+	q := u.Query()
+	q.Del("db")
+	u.Path = "/query"
+	u.RawQuery = q.Encode()
+
+	r, err := c.httpClient.Post(u.String(), "application/x-www-form-urlencoded", strings.NewReader(
+		fmt.Sprintf("q=CREATE DATABASE %q", db),
+	))
+	if err != nil {
+		return err
+	}
+	return readResponse(r)
+}
+
+// HandleMetric satisfies the stats.Handler interface.
+func (c *Client) HandleMetric(m *stats.Metric) {
+	if !stats.TagsAreSorted(m.Tags) {
+		stats.SortTags(m.Tags)
+	}
+
+	var mptr *metrics
+	var flush bool
+	var added bool
+handleMetric:
+	mptr = c.loadMetrics()
+
+	for mptr == nil {
+		mptr = c.acquireMetrics()
+		if c.compareAndSwapMetrics(nil, mptr) {
+			break
+		}
+		c.releaseMetrics(mptr)
+		mptr = nil
+	}
+
+	flush, added = mptr.append(m)
+
+	if !added {
+		c.compareAndSwapMetrics(mptr, nil)
+		goto handleMetric
+	}
+
+	if flush {
+		c.compareAndSwapMetrics(mptr, nil)
+		c.sendAsync(mptr)
+	}
+}
+
+// Flush satisfies the stats.Flusher interface.
+func (c *Client) Flush() {
+	c.flush()
+	c.join.Wait()
+}
+
+// Close closes the client, flushing all buffered metrics and releasing internal
+// resources.
+func (c *Client) Close() error {
+	c.flush()
+	c.once.Do(func() { close(c.done) })
+	c.join.Wait()
+	return nil
+}
+
+func (c *Client) flush() {
+	for {
+		mptr := c.loadMetrics()
+		if mptr == nil {
+			break
+		}
+		if c.compareAndSwapMetrics(mptr, nil) {
+			c.sendAsync(mptr)
+			break
+		}
+	}
+}
+
+func (c *Client) sendAsync(m *metrics) {
+	c.setLastFlush(time.Now())
+	c.join.Add(1)
+	go c.send(m)
+}
+
+func (c *Client) send(m *metrics) {
+	defer c.join.Done()
+	defer c.releaseMetrics(m)
+
+	for attempt := 0; attempt != 10; attempt++ {
+		if attempt != 0 {
+			select {
+			case <-time.After(c.httpClient.Timeout):
+			case <-c.done:
+				return
+			}
+		}
+
+		r, err := c.httpClient.Do(&http.Request{
+			Method:        "POST",
+			Proto:         "HTTP/1.1",
+			ProtoMajor:    1,
+			ProtoMinor:    1,
+			URL:           c.url,
+			Body:          newMetricsReader(m),
+			ContentLength: int64(m.size),
+		})
+
+		if err != nil {
+			log.Print("stats/influxdb:", err)
+			continue
+		}
+
+		if err := readResponse(r); err != nil {
+			log.Printf("stats/influxdb: POST %s: %d %s: %s", c.url, r.StatusCode, r.Status, err)
+			continue
+		}
+
+		break
+	}
+}
+
+func (c *Client) acquireMetrics() *metrics {
+	return c.pool.Get().(*metrics)
+}
+
+func (c *Client) releaseMetrics(m *metrics) {
+	m.reset()
+	c.pool.Put(m)
+}
+
+func (c *Client) loadMetrics() *metrics {
+	return (*metrics)(atomic.LoadPointer(&c.metrics))
+}
+
+func (c *Client) compareAndSwapMetrics(old *metrics, new *metrics) bool {
+	return atomic.CompareAndSwapPointer(&c.metrics,
+		unsafe.Pointer(old),
+		unsafe.Pointer(new),
+	)
+}
+
+func (c *Client) setLastFlush(t time.Time) {
+	atomic.StoreInt64(&c.flushedAt, t.UnixNano())
+}
+
+func (c *Client) lastFlush() time.Time {
+	return time.Unix(0, atomic.LoadInt64(&c.flushedAt))
+}
+
+func (c *Client) run(flushInterval time.Duration) {
+	ticker := time.NewTicker(flushInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-c.done:
+			return
+		case now := <-ticker.C:
+			if now.Sub(c.lastFlush()) >= flushInterval {
+				c.flush()
+			}
+		}
+	}
+}
+
+func makeURL(address string, database string) *url.URL { + if !strings.Contains(address, "://") { + address = "http://" + address + } + + u, err := url.Parse(address) + if err != nil { + panic(err) + } + + if len(u.Scheme) == 0 { + u.Scheme = "http" + } + + if len(u.Path) == 0 { + u.Path = "/write" + } + + q := u.Query() + + if _, ok := q["db"]; !ok { + q.Set("db", database) + u.RawQuery = q.Encode() + } + + return u +} + +func readResponse(r *http.Response) error { + if r.StatusCode < 300 { + io.Copy(ioutil.Discard, r.Body) + r.Body.Close() + return nil + } + + info := &influxError{} + err := json.NewDecoder(r.Body).Decode(info) + r.Body.Close() + + if err != nil { + return err + } + + return info +} + +type influxError struct { + Err string `json:"error"` +} + +func (e *influxError) Error() string { + return e.Err +} diff --git a/influxdb/client_test.go b/influxdb/client_test.go new file mode 100644 index 0000000..62cf906 --- /dev/null +++ b/influxdb/client_test.go @@ -0,0 +1,97 @@ +package influxdb + +import ( + "fmt" + "io/ioutil" + "net/http" + "strings" + "testing" + "time" + + "github.com/segmentio/stats" +) + +func TestClient(t *testing.T) { + transport := &errorCaptureTransport{ + RoundTripper: http.DefaultTransport, + } + + client := NewClientWith(ClientConfig{ + Address: DefaultAddress, + Database: "test-db", + BatchSize: 100, + FlushInterval: 100 * time.Millisecond, + Transport: transport, + }) + + if err := client.CreateDB("test-db"); err != nil { + t.Error(err) + } + + for i := 0; i != 1000; i++ { + client.HandleMetric(&stats.Metric{ + Name: "test.metric", + Value: float64(i), + Time: time.Now(), + }) + } + + client.Close() + + if transport.err != nil { + t.Error(transport.err) + } +} + +func BenchmarkClient(b *testing.B) { + c := NewClientWith(ClientConfig{ + Address: DefaultAddress, + BatchSize: 1000, + Transport: &discardTransport{}, + }) + + m := &stats.Metric{ + Namespace: "benchmark", + Name: "test.metric", + Value: 42, + Time: time.Now(), + 
Tags: []stats.Tag{ + {"hello", "world"}, + {"answer", "42"}, + }, + } + + for i := 0; i != b.N; i++ { + c.HandleMetric(m) + } +} + +type errorCaptureTransport struct { + http.RoundTripper + err error +} + +func (t *errorCaptureTransport) RoundTrip(req *http.Request) (*http.Response, error) { + res, err := t.RoundTripper.RoundTrip(req) + + if t.err == nil { + if err != nil { + t.err = err + } else if res.StatusCode >= 300 { + t.err = fmt.Errorf("%s %s: %d", req.Method, req.URL, res.StatusCode) + } + } + + return res, err +} + +type discardTransport struct { +} + +func (t *discardTransport) RoundTrip(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader("")), + Request: req, + }, nil +} diff --git a/influxdb/metric.go b/influxdb/metric.go new file mode 100644 index 0000000..cfb6deb --- /dev/null +++ b/influxdb/metric.go @@ -0,0 +1,147 @@ +package influxdb + +import ( + "io" + "strconv" + "sync/atomic" + + "github.com/segmentio/stats" +) + +func appendMetric(b []byte, m *stats.Metric) []byte { + if len(m.Namespace) != 0 { + b = append(b, m.Namespace...) + b = append(b, '.') + } + + b = append(b, m.Name...) + + for _, tag := range m.Tags { + b = append(b, ',') + b = append(b, tag.Name...) + b = append(b, '=') + b = append(b, tag.Value...) + } + + b = append(b, " value="...) 
+ b = strconv.AppendFloat(b, m.Value, 'g', -1, 64) + + b = append(b, ' ') + b = strconv.AppendInt(b, m.Time.UnixNano(), 10) + + return append(b, '\n') +} + +type metrics struct { + lines [][]byte + length int32 + size int32 + remain int32 +} + +func newMetrics(capacity int) *metrics { + return &metrics{ + lines: make([][]byte, capacity), + remain: int32(capacity), + } +} + +func (m *metrics) reset() { + m.length = 0 + m.size = 0 + m.remain = int32(len(m.lines)) +} + +func (m *metrics) len() int { + i := int(atomic.LoadInt32(&m.length)) + n := len(m.lines) + + if i > n { + i = n + } + + return i +} + +func (m *metrics) append(metric *stats.Metric) (flush bool, added bool) { + i := int(atomic.AddInt32(&m.length, 1)) - 1 + n := len(m.lines) + + if i >= n { + return + } + + line := appendMetric(m.lines[i][:0], metric) + m.lines[i] = line + atomic.AddInt32(&m.size, int32(len(line))) + + flush = atomic.AddInt32(&m.remain, -1) == 0 + added = true + return +} + +type metricsReader struct { + lines [][]byte + index int + offset int +} + +func newMetricsReader(m *metrics) *metricsReader { + return &metricsReader{ + lines: m.lines[:m.len()], + } +} + +func (m *metricsReader) Close() error { + m.index = len(m.lines) + m.offset = 0 + return nil +} + +func (m *metricsReader) Read(b []byte) (n int, err error) { + for c := -1; c != 0 && n < len(b); n += c { + c = m.fill(b[n:]) + } + if n == 0 && m.index == len(m.lines) { + err = io.EOF + } + return +} + +func (m *metricsReader) fill(b []byte) int { + if m.index == len(m.lines) { + return 0 + } + + l := m.lines[m.index][m.offset:] + n := copy(b, l) + + if n == len(l) { + m.index++ + m.offset = 0 + } else { + m.offset += n + } + + return n +} + +func (m *metricsReader) WriteTo(w io.Writer) (n int64, err error) { + for err == nil && m.index != len(m.lines) { + var c int + var l = m.lines[m.index][m.offset:] + + c, err = w.Write(l) + if c > 0 { + n += int64(c) + + if c == len(l) { + m.index++ + m.offset = 0 + } else { + m.offset += c 
+ } + } + } + return +} diff --git a/influxdb/metric_test.go b/influxdb/metric_test.go new file mode 100644 index 0000000..38b0fcc --- /dev/null +++ b/influxdb/metric_test.go @@ -0,0 +1,125 @@ +package influxdb + +import ( + "bytes" + "io" + "testing" + "time" + + "github.com/segmentio/stats" +) + +var ( + timestamp = time.Date(2017, 7, 23, 3, 36, 0, 123456789, time.UTC) + testMetrics = []struct { + m stats.Metric + s string + }{ + { + m: stats.Metric{ + Name: "request.count", + Time: timestamp, + Value: 0.5, + }, + s: `request.count value=0.5 1500780960123456789`, + }, + { + m: stats.Metric{ + Namespace: "testing", + Name: "request.count", + Time: timestamp, + Value: 0.5, + }, + s: `testing.request.count value=0.5 1500780960123456789`, + }, + { + m: stats.Metric{ + Name: "request.count", + Time: timestamp, + Value: 0.5, + Tags: []stats.Tag{ + {"hello", "world"}, + {"answer", "42"}, + }, + }, + s: `request.count,hello=world,answer=42 value=0.5 1500780960123456789`, + }, + } +) + +func TestAppendMetric(t *testing.T) { + for _, test := range testMetrics { + t.Run(test.s, func(t *testing.T) { + if s := string(appendMetric(nil, &test.m)); s != (test.s + "\n") { + t.Error("bad metric representation:") + t.Log("expected:", test.s) + t.Log("found: ", s) + } + }) + } +} + +func TestMetricsReaderRead(t *testing.T) { + m := newMetrics(100) + s := "" + + for _, test := range testMetrics { + m.append(&test.m) + s += test.s + s += "\n" + } + + b := &bytes.Buffer{} + w := newMetricsReader(m) + + for { + var a [128]byte + n, err := w.Read(a[:]) + if err == io.EOF { + break + } + if err != nil { + t.Error(err) + } + b.Write(a[:n]) + } + + if b.String() != s { + t.Error("bad metrics read:") + t.Log("expected:", s) + t.Log("found: ", b.String()) + } +} + +func TestMetricsReaderWriteTo(t *testing.T) { + m := newMetrics(100) + s := "" + + for _, test := range testMetrics { + m.append(&test.m) + s += test.s + s += "\n" + } + + b := &bytes.Buffer{} + w := newMetricsReader(m) + 
w.WriteTo(b) + + if b.String() != s { + t.Error("bad metrics read:") + t.Log("expected:", s) + t.Log("found: ", b.String()) + } +} + +func BenchmarkAppendMetrics(b *testing.B) { + a := make([]byte, 1024) + + for _, test := range testMetrics { + b.Run(test.s, func(b *testing.B) { + for i := 0; i != b.N; i++ { + appendMetric(a[:0], &test.m) + } + }) + } +} diff --git a/tag.go b/tag.go index a8c850a..a89529b 100644 --- a/tag.go +++ b/tag.go @@ -1,11 +1,41 @@ package stats +import "sort" + // Tag represents a single tag that can be set on a metric. type Tag struct { Name string Value string } +// TagsAreSorted returns true if the given list of tags is sorted by tag name, +// false otherwise. +func TagsAreSorted(tags []Tag) bool { + if len(tags) > 1 { + min := tags[0].Name + for _, tag := range tags[1:] { + if tag.Name < min { + return false + } + min = tag.Name + } + } + return true +} + +// SortTags sorts the slice of tags. +func SortTags(tags []Tag) { + // TODO: optimize to get rid of the dynamic memory allocation that occurs + // to construct the interface value. 
+ sort.Sort(tagsByName(tags)) +} + +type tagsByName []Tag + +func (t tagsByName) Len() int { return len(t) } +func (t tagsByName) Less(i int, j int) bool { return t[i].Name < t[j].Name } +func (t tagsByName) Swap(i int, j int) { t[i], t[j] = t[j], t[i] } + func concatTags(t1 []Tag, t2 []Tag) []Tag { n := len(t1) + len(t2) if n == 0 { diff --git a/tag_test.go b/tag_test.go index 08786a8..c6e9061 100644 --- a/tag_test.go +++ b/tag_test.go @@ -1,8 +1,12 @@ package stats import ( + "fmt" "reflect" + "sort" "testing" + + "github.com/segmentio/stats" ) func TestCopyTags(t *testing.T) { @@ -74,3 +78,79 @@ func TestConcatTags(t *testing.T) { }) } } + +func TestTagsAreSorted(t *testing.T) { + tests := []struct { + tags []Tag + sorted bool + }{ + { + tags: nil, + sorted: true, + }, + { + tags: []Tag{{"A", ""}}, + sorted: true, + }, + { + tags: []Tag{{"A", ""}, {"B", ""}, {"C", ""}}, + sorted: true, + }, + { + tags: []Tag{{"C", ""}, {"A", ""}, {"B", ""}}, + sorted: false, + }, + } + + for _, test := range tests { + t.Run(fmt.Sprintf("%v", test.tags), func(t *testing.T) { + if sorted := TagsAreSorted(test.tags); sorted != test.sorted { + t.Error(sorted) + } + }) + } +} + +func BenchmarkTagsOrder(b *testing.B) { + b.Run("TagsAreSorted", func(b *testing.B) { + benchmarkTagsOrder(b, TagsAreSorted) + }) + b.Run("sort.IsSorted(tags)", func(b *testing.B) { + benchmarkTagsOrder(b, func(tags []Tag) bool { return sort.IsSorted(tagsByName(tags)) }) + }) +} + +func benchmarkTagsOrder(b *testing.B, isSorted func([]Tag) bool) { + tags := []Tag{ + {"A", ""}, + {"B", ""}, + {"C", ""}, + {"answer", "42"}, + {"hello", "world"}, + {"some long tag name", "!"}, + {"some longer tag name", "1234"}, + } + + for i := 0; i != b.N; i++ { + isSorted(tags) + } +} + +func BenchmarkSortTags(b *testing.B) { + t0 := []Tag{ + {"hello", "world"}, + {"answer", "42"}, + {"some long tag name", "!"}, + {"some longer tag name", "1234"}, + {"A", ""}, + {"B", ""}, + {"C", ""}, + } + + t1 := make([]stats.Tag, 
len(t0.tags)) + + for i := 0; i != b.N; i++ { + copy(t1, t0) + SortTags(t1) + } +} From 241c4b4b39ff9537f8170229aa8bee87033b8d48 Mon Sep 17 00:00:00 2001 From: Achille Roussel Date: Sun, 23 Jul 2017 02:18:44 -0700 Subject: [PATCH 2/4] use docker 1.10 --- circle.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/circle.yml b/circle.yml index 06ffebf..ee29bdc 100644 --- a/circle.yml +++ b/circle.yml @@ -1,4 +1,6 @@ machine: + pre: + - curl -sSL https://s3.amazonaws.com/circle-downloads/install-circleci-docker.sh | bash -s -- 1.10.0 services: - docker From a5415367271c71f6a097fe087205c3993ef8567d Mon Sep 17 00:00:00 2001 From: Achille Roussel Date: Sun, 23 Jul 2017 02:22:15 -0700 Subject: [PATCH 3/4] fix tests --- tag_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tag_test.go b/tag_test.go index c6e9061..abbec3f 100644 --- a/tag_test.go +++ b/tag_test.go @@ -5,8 +5,6 @@ import ( "reflect" "sort" "testing" - - "github.com/segmentio/stats" ) func TestCopyTags(t *testing.T) { @@ -147,7 +145,7 @@ func BenchmarkSortTags(b *testing.B) { {"C", ""}, } - t1 := make([]stats.Tag, len(t0.tags)) + t1 := make([]Tag, len(t0.tags)) for i := 0; i != b.N; i++ { copy(t1, t0) From ef2a73bba91acd5de7cd2e2148ac68041ad4c8b4 Mon Sep 17 00:00:00 2001 From: Achille Roussel Date: Sun, 23 Jul 2017 02:35:48 -0700 Subject: [PATCH 4/4] fix tests --- tag_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tag_test.go b/tag_test.go index abbec3f..0ed1247 100644 --- a/tag_test.go +++ b/tag_test.go @@ -145,7 +145,7 @@ func BenchmarkSortTags(b *testing.B) { {"C", ""}, } - t1 := make([]Tag, len(t0.tags)) + t1 := make([]Tag, len(t0)) for i := 0; i != b.N; i++ { copy(t1, t0)