Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor multisearch, fix numeric timestamp unmarshalling #122

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 63 additions & 119 deletions pkg/quickwit/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"strings"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
)

Expand All @@ -25,6 +24,7 @@ type DatasourceInfo struct {
MaxConcurrentShardRequests int64
}

// TODO: Move ConfiguredFields closer to handlers, the client layer doesn't need this stuff
type ConfiguredFields struct {
TimeField string
TimeOutputFormat string
Expand All @@ -34,85 +34,29 @@ type ConfiguredFields struct {

// Client represents a client which can interact with elasticsearch api
type Client interface {
GetConfiguredFields() ConfiguredFields
ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error)
MultiSearch() *MultiSearchRequestBuilder
ExecuteMultisearch(r []*SearchRequest) ([]*json.RawMessage, error)
}

var logger = log.New()

// NewClient creates a new Quickwit client
var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend.TimeRange) (Client, error) {
logger := log.New()
logger.Debug("Creating new client", "configuredFields", fmt.Sprintf("%#v", ds.ConfiguredFields), "index", ds.Database)
var NewClient = func(ctx context.Context, ds *DatasourceInfo) (Client, error) {
logger.Debug("Creating new client", "index", ds.Database)

return &baseClientImpl{
logger: logger,
ctx: ctx,
ds: ds,
configuredFields: ds.ConfiguredFields,
index: ds.Database,
timeRange: timeRange,
ctx: ctx,
ds: ds,
index: ds.Database,
}, nil
}

type baseClientImpl struct {
ctx context.Context
ds *DatasourceInfo
configuredFields ConfiguredFields
index string
timeRange backend.TimeRange
logger log.Logger
}

func (c *baseClientImpl) GetConfiguredFields() ConfiguredFields {
return c.configuredFields
}

type multiRequest struct {
header map[string]interface{}
body interface{}
interval time.Duration
}

func (c *baseClientImpl) executeBatchRequest(uriPath, uriQuery string, requests []*multiRequest) (*http.Response, error) {
bytes, err := c.encodeBatchRequests(requests)
if err != nil {
return nil, err
}
return c.executeRequest(http.MethodPost, uriPath, uriQuery, bytes)
}

func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte, error) {
c.logger.Debug("Encoding batch requests to json", "batch requests", len(requests))
start := time.Now()

payload := bytes.Buffer{}
for _, r := range requests {
reqHeader, err := json.Marshal(r.header)
if err != nil {
return nil, err
}
payload.WriteString(string(reqHeader) + "\n")

reqBody, err := json.Marshal(r.body)

if err != nil {
return nil, err
}

body := string(reqBody)
body = strings.ReplaceAll(body, "$__interval_ms", strconv.FormatInt(r.interval.Milliseconds(), 10))
body = strings.ReplaceAll(body, "$__interval", r.interval.String())

payload.WriteString(body + "\n")
}

elapsed := time.Since(start)
c.logger.Debug("Encoded batch requests to json", "took", elapsed)

return payload.Bytes(), nil
ctx context.Context
ds *DatasourceInfo
index string
}

func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body []byte) (*http.Response, error) {
func (c *baseClientImpl) makeRequest(method, uriPath, uriQuery string, body []byte) (*http.Request, error) {
u, err := url.Parse(c.ds.URL)
if err != nil {
return nil, err
Expand All @@ -129,59 +73,49 @@ func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body [
if err != nil {
return nil, err
}

c.logger.Debug("Executing request", "url", req.URL.String(), "method", method)

req.Header.Set("Content-Type", "application/x-ndjson")
return req, nil
}

start := time.Now()
defer func() {
elapsed := time.Since(start)
c.logger.Debug("Executed request", "took", elapsed)
}()
//nolint:bodyclose
resp, err := c.ds.HTTPClient.Do(req)
// Multisearch uses a shallow unmarshalled struct to defer the decoding to downstream handlers
type MultiSearchResponse struct {
Responses []*json.RawMessage `json:"responses"`
}

func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) ([]*json.RawMessage, error) {
req, err := c.createMultiSearchRequest(requests, c.index)
if err != nil {
return nil, err
}

return resp, nil
}

func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) {
c.logger.Debug("Executing multisearch", "search requests", r.Requests)

multiRequests := c.createMultiSearchRequests(r.Requests)
queryParams := c.getMultiSearchQueryParameters()
clientRes, err := c.executeBatchRequest("_elastic/_msearch", queryParams, multiRequests)
res, err := c.ds.HTTPClient.Do(req)
if err != nil {
return nil, err
}
res := clientRes
defer func() {
if err := res.Body.Close(); err != nil {
c.logger.Warn("Failed to close response body", "err", err)
logger.Warn("Failed to close response body", "err", err)
}
}()

c.logger.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength)
logger.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength)

if res.StatusCode >= 400 {
qe := QuickwitQueryError{
Status: res.StatusCode,
Message: "Error on multisearch",
ResponseBody: res.Body,
QueryParam: queryParams,
RequestBody: r.Requests,
QueryParam: req.URL.RawQuery,
RequestBody: requests,
}

errorPayload, _ := json.Marshal(qe)
c.logger.Error(string(errorPayload))
logger.Error(string(errorPayload))
return nil, fmt.Errorf(string(errorPayload))
}

start := time.Now()
c.logger.Debug("Decoding multisearch json response")
logger.Debug("Decoding multisearch json response")

var msr MultiSearchResponse
dec := json.NewDecoder(res.Body)
Expand All @@ -191,43 +125,53 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch
}

elapsed := time.Since(start)
c.logger.Debug("Decoded multisearch json response", "took", elapsed)
logger.Debug("Decoded multisearch json response", "took", elapsed)

msr.Status = res.StatusCode

return &msr, nil
return msr.Responses, nil
}

func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
multiRequests := []*multiRequest{}

for _, searchReq := range searchRequests {
mr := multiRequest{
header: map[string]interface{}{
"ignore_unavailable": true,
"index": strings.Split(c.index, ","),
},
body: searchReq,
interval: searchReq.Interval,
func (c *baseClientImpl) makeMultiSearchPayload(searchRequests []*SearchRequest, index string) ([]byte, error) {
// Format, marshall and interpolate
payload := bytes.Buffer{}
for _, r := range searchRequests {
header := map[string]interface{}{
"ignore_unavailable": true,
"index": strings.Split(index, ","),
}
reqHeader, err := json.Marshal(header)
if err != nil {
return nil, err
}
payload.WriteString(string(reqHeader) + "\n")

multiRequests = append(multiRequests, &mr)
}
reqBody, err := json.Marshal(r)

if err != nil {
return nil, err
}

body := string(reqBody)
body = strings.ReplaceAll(body, "$__interval_ms", strconv.FormatInt(r.Interval.Milliseconds(), 10))
body = strings.ReplaceAll(body, "$__interval", r.Interval.String())

return multiRequests
payload.WriteString(body + "\n")
}
return payload.Bytes(), nil
}

func (c *baseClientImpl) getMultiSearchQueryParameters() string {
var qs []string
func (c *baseClientImpl) createMultiSearchRequest(requests []*SearchRequest, index string) (*http.Request, error) {
body, err := c.makeMultiSearchPayload(requests, index)
if err != nil {
return nil, err
}

var qs []string
maxConcurrentShardRequests := c.ds.MaxConcurrentShardRequests
if maxConcurrentShardRequests == 0 {
maxConcurrentShardRequests = 5
}
qs = append(qs, fmt.Sprintf("max_concurrent_shard_requests=%d", maxConcurrentShardRequests))
return strings.Join(qs, "&")
}
queryParams := strings.Join(qs, "&")

func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder {
return NewMultiSearchRequestBuilder()
return c.makeRequest(http.MethodPost, "_elastic/_msearch", queryParams, body)
}
19 changes: 5 additions & 14 deletions pkg/quickwit/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"testing"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -56,22 +55,15 @@ func TestClient_ExecuteMultisearch(t *testing.T) {
MaxConcurrentShardRequests: 6,
}

from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
timeRange := backend.TimeRange{
From: from,
To: to,
}

c, err := NewClient(context.Background(), &ds, timeRange)
c, err := NewClient(context.Background(), &ds)
require.NoError(t, err)
require.NotNil(t, c)

t.Cleanup(func() {
ts.Close()
})

ms, err := createMultisearchForTest(t, c)
ms, err := createMultisearchForTest(t)
require.NoError(t, err)
res, err := c.ExecuteMultisearch(ms)
require.NoError(t, err)
Expand Down Expand Up @@ -102,15 +94,14 @@ func TestClient_ExecuteMultisearch(t *testing.T) {

assert.Equal(t, "15s", jBody.GetPath("aggs", "2", "date_histogram", "fixed_interval").MustString())

assert.Equal(t, 200, res.Status)
require.Len(t, res.Responses, 1)
require.Len(t, res, 1)
})
}

func createMultisearchForTest(t *testing.T, c Client) (*MultiSearchRequest, error) {
func createMultisearchForTest(t *testing.T) ([]*SearchRequest, error) {
t.Helper()

msb := c.MultiSearch()
msb := NewMultiSearchRequestBuilder()
s := msb.Search(15 * time.Second)
s.Agg().DateHistogram("2", "@timestamp", func(a *DateHistogramAgg, ab AggBuilder) {
a.FixedInterval = "$__interval"
Expand Down
11 changes: 0 additions & 11 deletions pkg/quickwit/client/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,6 @@ type SearchResponse struct {
Hits *SearchResponseHits `json:"hits"`
}

// MultiSearchRequest represents a multi search request
type MultiSearchRequest struct {
Requests []*SearchRequest
}

// MultiSearchResponse represents a multi search response
type MultiSearchResponse struct {
Status int `json:"status,omitempty"`
Responses []*SearchResponse `json:"responses"`
}

// Query represents a query
type Query struct {
Bool *BoolQuery `json:"bool"`
Expand Down
6 changes: 2 additions & 4 deletions pkg/quickwit/client/search_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (m *MultiSearchRequestBuilder) Search(interval time.Duration) *SearchReques
}

// Build builds and return a multi search request
func (m *MultiSearchRequestBuilder) Build() (*MultiSearchRequest, error) {
func (m *MultiSearchRequestBuilder) Build() ([]*SearchRequest, error) {
requests := []*SearchRequest{}
for _, sb := range m.requestBuilders {
searchRequest, err := sb.Build()
Expand All @@ -153,9 +153,7 @@ func (m *MultiSearchRequestBuilder) Build() (*MultiSearchRequest, error) {
requests = append(requests, searchRequest)
}

return &MultiSearchRequest{
Requests: requests,
}, nil
return requests, nil
}

// QueryBuilder represents a query builder
Expand Down
4 changes: 2 additions & 2 deletions pkg/quickwit/client/search_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func TestMultiSearchRequest(t *testing.T) {
t.Run("When building search request should contain one search request", func(t *testing.T) {
mr, err := b.Build()
require.Nil(t, err)
require.Equal(t, 1, len(mr.Requests))
require.Equal(t, 1, len(mr))
})
})

Expand All @@ -416,7 +416,7 @@ func TestMultiSearchRequest(t *testing.T) {
t.Run("When building search request should contain two search requests", func(t *testing.T) {
mr, err := b.Build()
require.Nil(t, err)
require.Equal(t, 2, len(mr.Requests))
require.Equal(t, 2, len(mr))
})
})
}
Loading
Loading