diff --git a/pkg/quickwit/client/client.go b/pkg/quickwit/client/client.go index c11fb19..cac315d 100644 --- a/pkg/quickwit/client/client.go +++ b/pkg/quickwit/client/client.go @@ -12,7 +12,6 @@ import ( "strings" "time" - "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/log" ) @@ -25,6 +24,7 @@ type DatasourceInfo struct { MaxConcurrentShardRequests int64 } +// TODO: Move ConfiguredFields closer to handlers, the client layer doesn't need this stuff type ConfiguredFields struct { TimeField string TimeOutputFormat string @@ -34,85 +34,29 @@ type ConfiguredFields struct { // Client represents a client which can interact with elasticsearch api type Client interface { - GetConfiguredFields() ConfiguredFields - ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) - MultiSearch() *MultiSearchRequestBuilder + ExecuteMultisearch(r []*SearchRequest) ([]*json.RawMessage, error) } +var logger = log.New() + // NewClient creates a new Quickwit client -var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend.TimeRange) (Client, error) { - logger := log.New() - logger.Debug("Creating new client", "configuredFields", fmt.Sprintf("%#v", ds.ConfiguredFields), "index", ds.Database) +var NewClient = func(ctx context.Context, ds *DatasourceInfo) (Client, error) { + logger.Debug("Creating new client", "index", ds.Database) return &baseClientImpl{ - logger: logger, - ctx: ctx, - ds: ds, - configuredFields: ds.ConfiguredFields, - index: ds.Database, - timeRange: timeRange, + ctx: ctx, + ds: ds, + index: ds.Database, }, nil } type baseClientImpl struct { - ctx context.Context - ds *DatasourceInfo - configuredFields ConfiguredFields - index string - timeRange backend.TimeRange - logger log.Logger -} - -func (c *baseClientImpl) GetConfiguredFields() ConfiguredFields { - return c.configuredFields -} - -type multiRequest struct { - header map[string]interface{} - body interface{} - interval time.Duration -} - -func (c *baseClientImpl) executeBatchRequest(uriPath, uriQuery string, requests []*multiRequest) (*http.Response, error) { - bytes, err := c.encodeBatchRequests(requests) - if err != nil { - return nil, err - } - return c.executeRequest(http.MethodPost, uriPath, uriQuery, bytes) -} - -func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte, error) { - c.logger.Debug("Encoding batch requests to json", "batch requests", len(requests)) - start := time.Now() - - payload := bytes.Buffer{} - for _, r := range requests { - reqHeader, err := json.Marshal(r.header) - if err != nil { - return nil, err - } - payload.WriteString(string(reqHeader) + "\n") - - reqBody, err := json.Marshal(r.body) - - if err != nil { - return nil, err - } - - body := string(reqBody) - body = strings.ReplaceAll(body, "$__interval_ms", strconv.FormatInt(r.interval.Milliseconds(), 10)) - body = strings.ReplaceAll(body, "$__interval", r.interval.String()) - - payload.WriteString(body + "\n") - } - - elapsed := time.Since(start) - c.logger.Debug("Encoded batch requests to json", "took", elapsed) - - return payload.Bytes(), nil + ctx context.Context + ds *DatasourceInfo + index string } -func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body []byte) (*http.Response, error) { +func (c *baseClientImpl) makeRequest(method, uriPath, uriQuery string, body []byte) (*http.Request, error) { u, err := url.Parse(c.ds.URL) if err != nil { return nil, err @@ -129,59 +73,49 @@ func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body [ if err != nil { return nil, err } - - c.logger.Debug("Executing request", "url", req.URL.String(), "method", method) - req.Header.Set("Content-Type", "application/x-ndjson") + return req, nil +} - start := time.Now() - defer func() { - elapsed := time.Since(start) - c.logger.Debug("Executed request", "took", elapsed) - }() - //nolint:bodyclose - resp, err := c.ds.HTTPClient.Do(req) +// Multisearch uses a shallow unmarshalled struct to defer the decoding to downstream handlers +type MultiSearchResponse struct { + Responses []*json.RawMessage `json:"responses"` +} + +func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) ([]*json.RawMessage, error) { + req, err := c.createMultiSearchRequest(requests, c.index) if err != nil { return nil, err } - return resp, nil -} - -func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) { - c.logger.Debug("Executing multisearch", "search requests", r.Requests) - - multiRequests := c.createMultiSearchRequests(r.Requests) - queryParams := c.getMultiSearchQueryParameters() - clientRes, err := c.executeBatchRequest("_elastic/_msearch", queryParams, multiRequests) + res, err := c.ds.HTTPClient.Do(req) if err != nil { return nil, err } - res := clientRes defer func() { if err := res.Body.Close(); err != nil { - c.logger.Warn("Failed to close response body", "err", err) + logger.Warn("Failed to close response body", "err", err) } }() - c.logger.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength) + logger.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength) if res.StatusCode >= 400 { qe := QuickwitQueryError{ Status: res.StatusCode, Message: "Error on multisearch", ResponseBody: res.Body, - QueryParam: queryParams, - RequestBody: r.Requests, + QueryParam: req.URL.RawQuery, + RequestBody: requests, } errorPayload, _ := json.Marshal(qe) - c.logger.Error(string(errorPayload)) + logger.Error(string(errorPayload)) return nil, fmt.Errorf(string(errorPayload)) } start := time.Now() - c.logger.Debug("Decoding multisearch json response") + logger.Debug("Decoding multisearch json response") var msr MultiSearchResponse dec := json.NewDecoder(res.Body) @@ -191,43 +125,53 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch } elapsed := time.Since(start) - c.logger.Debug("Decoded multisearch json response", "took", elapsed) + logger.Debug("Decoded multisearch json response", "took", elapsed) - msr.Status = res.StatusCode - - return &msr, nil + return msr.Responses, nil } -func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest { - multiRequests := []*multiRequest{} - - for _, searchReq := range searchRequests { - mr := multiRequest{ - header: map[string]interface{}{ - "ignore_unavailable": true, - "index": strings.Split(c.index, ","), - }, - body: searchReq, - interval: searchReq.Interval, +func (c *baseClientImpl) makeMultiSearchPayload(searchRequests []*SearchRequest, index string) ([]byte, error) { + // Format, marshall and interpolate + payload := bytes.Buffer{} + for _, r := range searchRequests { + header := map[string]interface{}{ + "ignore_unavailable": true, + "index": strings.Split(index, ","), } + reqHeader, err := json.Marshal(header) + if err != nil { + return nil, err + } + payload.WriteString(string(reqHeader) + "\n") - multiRequests = append(multiRequests, &mr) - } + reqBody, err := json.Marshal(r) + + if err != nil { + return nil, err + } + + body := string(reqBody) + body = strings.ReplaceAll(body, "$__interval_ms", strconv.FormatInt(r.Interval.Milliseconds(), 10)) + body = strings.ReplaceAll(body, "$__interval", r.Interval.String()) - return multiRequests + payload.WriteString(body + "\n") + } + return payload.Bytes(), nil } -func (c *baseClientImpl) getMultiSearchQueryParameters() string { - var qs []string +func (c *baseClientImpl) createMultiSearchRequest(requests []*SearchRequest, index string) (*http.Request, error) { + body, err := c.makeMultiSearchPayload(requests, index) + if err != nil { + return nil, err + } + var qs []string maxConcurrentShardRequests := c.ds.MaxConcurrentShardRequests if maxConcurrentShardRequests == 0 { maxConcurrentShardRequests = 5 } qs = append(qs, fmt.Sprintf("max_concurrent_shard_requests=%d", maxConcurrentShardRequests)) - return strings.Join(qs, "&") -} + queryParams := strings.Join(qs, "&") -func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder { - return NewMultiSearchRequestBuilder() + return c.makeRequest(http.MethodPost, "_elastic/_msearch", queryParams, body) } diff --git a/pkg/quickwit/client/client_test.go b/pkg/quickwit/client/client_test.go index ede4e29..e8ff948 100644 --- a/pkg/quickwit/client/client_test.go +++ b/pkg/quickwit/client/client_test.go @@ -9,7 +9,6 @@ import ( "testing" "time" - "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -56,14 +55,7 @@ func TestClient_ExecuteMultisearch(t *testing.T) { MaxConcurrentShardRequests: 6, } - from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) - to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) - timeRange := backend.TimeRange{ - From: from, - To: to, - } - - c, err := NewClient(context.Background(), &ds, timeRange) + c, err := NewClient(context.Background(), &ds) require.NoError(t, err) require.NotNil(t, c) @@ -71,7 +63,7 @@ func TestClient_ExecuteMultisearch(t *testing.T) { ts.Close() }) - ms, err := createMultisearchForTest(t, c) + ms, err := createMultisearchForTest(t) require.NoError(t, err) res, err := c.ExecuteMultisearch(ms) require.NoError(t, err) @@ -102,15 +94,14 @@ func TestClient_ExecuteMultisearch(t *testing.T) { assert.Equal(t, "15s", jBody.GetPath("aggs", "2", "date_histogram", "fixed_interval").MustString()) - assert.Equal(t, 200, res.Status) - require.Len(t, res.Responses, 1) + require.Len(t, res, 1) }) } -func createMultisearchForTest(t *testing.T, c Client) (*MultiSearchRequest, error) { +func createMultisearchForTest(t *testing.T) ([]*SearchRequest, error) { t.Helper() - msb := c.MultiSearch() + msb := NewMultiSearchRequestBuilder() s := msb.Search(15 * time.Second) s.Agg().DateHistogram("2", "@timestamp", func(a *DateHistogramAgg, ab AggBuilder) { a.FixedInterval = "$__interval" diff --git a/pkg/quickwit/client/models.go b/pkg/quickwit/client/models.go index 6f0d105..7bcd86a 100644 --- a/pkg/quickwit/client/models.go +++ b/pkg/quickwit/client/models.go @@ -59,17 +59,6 @@ type SearchResponse struct { Hits *SearchResponseHits `json:"hits"` } -// MultiSearchRequest represents a multi search request -type MultiSearchRequest struct { - Requests []*SearchRequest -} - -// MultiSearchResponse represents a multi search response -type MultiSearchResponse struct { - Status int `json:"status,omitempty"` - Responses []*SearchResponse `json:"responses"` -} - // Query represents a query type Query struct { Bool *BoolQuery `json:"bool"` diff --git a/pkg/quickwit/client/search_request.go b/pkg/quickwit/client/search_request.go index c3a1d5a..b768b61 100644 --- a/pkg/quickwit/client/search_request.go +++ b/pkg/quickwit/client/search_request.go @@ -143,7 +143,7 @@ func (m *MultiSearchRequestBuilder) Search(interval time.Duration) *SearchReques } // Build builds and return a multi search request -func (m *MultiSearchRequestBuilder) Build() (*MultiSearchRequest, error) { +func (m *MultiSearchRequestBuilder) Build() ([]*SearchRequest, error) { requests := []*SearchRequest{} for _, sb := range m.requestBuilders { searchRequest, err := sb.Build() @@ -153,9 +153,7 @@ func (m *MultiSearchRequestBuilder) Build() (*MultiSearchRequest, error) { requests = append(requests, searchRequest) } - return &MultiSearchRequest{ - Requests: requests, - }, nil + return requests, nil } // QueryBuilder represents a query builder diff --git a/pkg/quickwit/client/search_request_test.go b/pkg/quickwit/client/search_request_test.go index 18abe48..9c42b00 100644 --- a/pkg/quickwit/client/search_request_test.go +++ b/pkg/quickwit/client/search_request_test.go @@ -404,7 +404,7 @@ func TestMultiSearchRequest(t *testing.T) { t.Run("When building search request should contain one search request", func(t *testing.T) { mr, err := b.Build() require.Nil(t, err) - require.Equal(t, 1, len(mr.Requests)) + require.Equal(t, 1, len(mr)) }) }) @@ -416,7 +416,7 @@ func TestMultiSearchRequest(t *testing.T) { t.Run("When building search request should contain two search requests", func(t *testing.T) { mr, err := b.Build() require.Nil(t, err) - require.Equal(t, 2, len(mr.Requests)) + require.Equal(t, 2, len(mr)) }) }) } diff --git a/pkg/quickwit/data_query.go b/pkg/quickwit/data_query.go index 752329f..7e1fa2e 100644 --- a/pkg/quickwit/data_query.go +++ b/pkg/quickwit/data_query.go @@ -1,13 +1,10 @@ package quickwit import ( - "encoding/json" "fmt" "regexp" "strconv" - "time" - "github.com/grafana/grafana-plugin-sdk-go/backend" es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client" "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/simplejson" ) @@ -16,93 +13,32 @@ const ( defaultSize = 100 ) -type elasticsearchDataQuery struct { - client es.Client - dataQueries []backend.DataQuery -} - -var newElasticsearchDataQuery = func(client es.Client, dataQuery []backend.DataQuery) *elasticsearchDataQuery { - return &elasticsearchDataQuery{ - client: client, - dataQueries: dataQuery, - } -} - -func handleQuickwitErrors(e *elasticsearchDataQuery, err error) (*backend.QueryDataResponse, error) { - if nil == err { - return nil, nil - } - - var payload = err.Error() - var qe es.QuickwitQueryError - unmarshall_err := json.Unmarshal([]byte(payload), &qe) - if unmarshall_err == nil { - return nil, err - } - - result := backend.QueryDataResponse{ - Responses: backend.Responses{}, - } - - result.Responses[e.dataQueries[0].RefID] = backend.ErrDataResponse(backend.Status(qe.Status), payload) - return &result, nil -} - -func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) { - queries, err := parseQuery(e.dataQueries) - if err != nil { - return &backend.QueryDataResponse{}, err - } +func buildMSR(queries []*Query, defaultTimeField string) ([]*es.SearchRequest, error) { + ms := es.NewMultiSearchRequestBuilder() - ms := e.client.MultiSearch() - - from := e.dataQueries[0].TimeRange.From.UnixNano() / int64(time.Millisecond) - to := e.dataQueries[0].TimeRange.To.UnixNano() / int64(time.Millisecond) for _, q := range queries { - if err := e.processQuery(q, ms, from, to); err != nil { - return &backend.QueryDataResponse{}, err + err := isQueryWithError(q) + if err != nil { + return nil, err } - } - - req, err := ms.Build() - if err != nil { - return &backend.QueryDataResponse{}, err - } - res, err := e.client.ExecuteMultisearch(req) - result, err := handleQuickwitErrors(e, err) - if result != nil { - return result, nil - } else if err != nil { - return &backend.QueryDataResponse{}, err - } + b := ms.Search(q.Interval) + b.Size(0) + filters := b.Query().Bool().Filter() + filters.AddDateRangeFilter(defaultTimeField, q.RangeTo, q.RangeFrom) + filters.AddQueryStringFilter(q.RawQuery, true, "AND") - return parseResponse(res.Responses, queries, e.client.GetConfiguredFields()) -} - -func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64) error { - err := isQueryWithError(q) - if err != nil { - return err - } - - defaultTimeField := e.client.GetConfiguredFields().TimeField - b := ms.Search(q.Interval) - b.Size(0) - filters := b.Query().Bool().Filter() - filters.AddDateRangeFilter(defaultTimeField, to, from) - filters.AddQueryStringFilter(q.RawQuery, true, "AND") - - if isLogsQuery(q) { - processLogsQuery(q, b, from, to, defaultTimeField) - } else if isDocumentQuery(q) { - processDocumentQuery(q, b, from, to, defaultTimeField) - } else { - // Otherwise, it is a time series query and we process it - processTimeSeriesQuery(q, b, from, to, defaultTimeField) + if isLogsQuery(q) { + processLogsQuery(q, b, q.RangeFrom, q.RangeTo, defaultTimeField) + } else if isDocumentQuery(q) { + processDocumentQuery(q, b, q.RangeFrom, q.RangeTo, defaultTimeField) + } else { + // Otherwise, it is a time series query and we process it + processTimeSeriesQuery(q, b, q.RangeFrom, q.RangeTo, defaultTimeField) + } } - return nil + return ms.Build() } func setFloatPath(settings *simplejson.Json, path ...string) { diff --git a/pkg/quickwit/data_query_test.go b/pkg/quickwit/data_query_test.go index 06e0a0a..a9b8f81 100644 --- a/pkg/quickwit/data_query_test.go +++ b/pkg/quickwit/data_query_test.go @@ -26,9 +26,9 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{"type": "count", "id": "0" }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] rangeFilter := sr.Query.Bool.Filters[0].(*es.DateRangeFilter) - require.Equal(t, rangeFilter.Key, c.configuredFields.TimeField) + // require.Equal(t, rangeFilter.Key, c.configuredFields.TimeField) require.Equal(t, rangeFilter.Lte, "2018-05-15T17:55:00Z") require.Equal(t, rangeFilter.Gte, "2018-05-15T17:50:00Z") require.Equal(t, sr.Aggs[0].Key, "2") @@ -44,7 +44,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{"type": "avg", "id": "0", "settings": {"missing": "null", "script": "1" } }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] secondLevel := firstLevel.Aggregation.Aggs[0] require.Equal(t, secondLevel.Aggregation.Aggregation.(*es.MetricAggregation).Settings["script"], "1") @@ -61,7 +61,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{"type": "count", "id": "1" }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "2") termsAgg := firstLevel.Aggregation.Aggregation.(*es.TermsAggregation) @@ -81,7 +81,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{"type": "avg", "field": "@value", "id": "1" }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "2") require.Equal(t, firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, "@timestamp") @@ -109,7 +109,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Aggregation.Aggregation.(*es.TermsAggregation).Order["_key"], "asc") }) @@ -132,7 +132,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] avgAggOrderBy := sr.Aggs[0].Aggregation.Aggs[0] require.Equal(t, avgAggOrderBy.Key, "5") @@ -162,7 +162,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] termsAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.TermsAggregation) require.Equal(t, termsAgg.Order["_count"], "asc") @@ -186,7 +186,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] termsAgg := firstLevel.Aggregation.Aggregation.(*es.TermsAggregation) @@ -211,7 +211,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] orderByAgg := sr.Aggs[0].Aggregation.Aggs[0] secondLevel := orderByAgg.Aggregation.Aggregation @@ -239,7 +239,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] orderByAgg := firstLevel.Aggregation.Aggs[0] @@ -269,7 +269,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "2") @@ -294,7 +294,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "2") termsAgg := firstLevel.Aggregation.Aggregation.(*es.TermsAggregation) @@ -320,7 +320,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] percentilesAgg := sr.Aggs[0].Aggregation.Aggs[0] require.Equal(t, percentilesAgg.Key, "1") @@ -352,7 +352,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{"type": "count", "id": "1" }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] filtersAgg := sr.Aggs[0] require.Equal(t, filtersAgg.Key, "2") @@ -382,7 +382,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{"type": "count", "id": "1" }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] filtersAgg := sr.Aggs[0] require.Equal(t, filtersAgg.Key, "2") @@ -403,7 +403,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{ "id": "1", "type": "raw_document", "settings": {} }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] require.Equal(t, sr.Size, defaultSize) }) @@ -416,7 +416,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] // rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter) // require.Equal(t, rangeFilter.Key, c.configuredFields.TimeField) // require.Equal(t, rangeFilter.Lte, toMs) @@ -437,7 +437,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] // rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter) // require.Equal(t, rangeFilter.Key, c.configuredFields.TimeField) // require.Equal(t, rangeFilter.Lte, toMs) @@ -457,7 +457,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{ "id": "1", "type": "raw_document", "settings": { "size": "1337" } }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] require.Equal(t, sr.Size, 1337) }) @@ -476,7 +476,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{"type": "count", "id": "1" }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "2") @@ -502,7 +502,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{"type": "count", "id": "1" }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] dateHistogram := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg) require.Empty(t, dateHistogram.TimeZone) @@ -524,7 +524,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{"type": "count", "id": "1" }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] dateHistogram := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg) require.Empty(t, dateHistogram.TimeZone) @@ -546,7 +546,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{"type": "count", "id": "1" }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] dateHistogram := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg) require.Equal(t, dateHistogram.TimeZone, "America/Los_Angeles") @@ -567,7 +567,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{"type": "count", "id": "1" }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "3") @@ -593,7 +593,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{"type": "count", "id": "1" }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "3") @@ -618,7 +618,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{"type": "count", "id": "1" }] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "3") @@ -644,7 +644,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") require.Equal(t, len(firstLevel.Aggregation.Aggs), 2) @@ -679,7 +679,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") @@ -715,7 +715,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") @@ -746,7 +746,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") @@ -780,7 +780,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "3") @@ -816,7 +816,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "5") @@ -841,7 +841,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "3") @@ -870,7 +870,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") @@ -907,7 +907,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") @@ -942,7 +942,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "5") @@ -972,7 +972,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") @@ -1001,7 +1001,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") @@ -1029,7 +1029,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") @@ -1059,7 +1059,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") @@ -1089,7 +1089,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "3") @@ -1117,7 +1117,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") @@ -1150,7 +1150,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "2") @@ -1186,7 +1186,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") @@ -1220,7 +1220,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") @@ -1253,7 +1253,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { ] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "2") @@ -1275,7 +1275,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{ "id": "1", "type": "raw_data", "settings": {} }] }`, from, to) require.NoError(t, err) - // sr := c.multisearchRequests[0].Requests[0] + // sr := c.multisearchRequests[0][0] // filter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter) // require.Equal(t, filter.Query, "foo") // require.Equal(t, filter.AnalyzeWildcard, true) @@ -1289,7 +1289,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{ "id": "1", "type": "raw_data", "settings": {} }] }`, from, to) require.NoError(t, err) - // sr := c.multisearchRequests[0].Requests[0] + // sr := c.multisearchRequests[0][0] // filter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter) // require.Equal(t, filter.Query, "foo") // require.Equal(t, filter.AnalyzeWildcard, true) @@ -1301,11 +1301,11 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{ "type": "logs", "id": "1"}] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] require.Equal(t, sr.Size, defaultSize) rangeFilter := sr.Query.Bool.Filters[0].(*es.DateRangeFilter) - require.Equal(t, rangeFilter.Key, c.configuredFields.TimeField) + // require.Equal(t, rangeFilter.Key, c.configuredFields.TimeField) require.Equal(t, rangeFilter.Lte, "2018-05-15T17:55:00Z") require.Equal(t, rangeFilter.Gte, "2018-05-15T17:50:00Z") require.Equal(t, sr.Sort[0]["@timestamp"]["order"], "desc") @@ -1317,7 +1317,7 @@ func TestExecuteElasticsearchDataQuery(t *testing.T) { "metrics": [{ "type": "logs", "id": "1", "settings": { "limit": "1000" }}] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] require.Equal(t, sr.Size, 1000) }) @@ -1360,7 +1360,7 @@ func TestSettingsCasting(t *testing.T) { "bucketAggs": [{"type": "date_histogram", "field": "@timestamp", "id": "1"}] }`, from, to) require.NoError(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] movingAvgSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings assert.Equal(t, movingAvgSettings["window"], 5.0) @@ -1403,7 +1403,7 @@ func TestSettingsCasting(t *testing.T) { ] }`, from, to) assert.Nil(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] movingAvgSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings @@ -1437,7 +1437,7 @@ func TestSettingsCasting(t *testing.T) { ] }`, from, to) assert.Nil(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] serialDiffSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings assert.Equal(t, serialDiffSettings["lag"], 1.) }) @@ -1463,7 +1463,7 @@ func TestSettingsCasting(t *testing.T) { ] }`, from, to) assert.Nil(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] serialDiffSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings @@ -1498,7 +1498,7 @@ func TestSettingsCasting(t *testing.T) { ] }`, from, to) assert.Nil(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg) @@ -1532,7 +1532,7 @@ func TestSettingsCasting(t *testing.T) { ] }`, from, to) assert.Nil(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg) @@ -1558,7 +1558,7 @@ func TestSettingsCasting(t *testing.T) { ] }`, from, to) assert.Nil(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg) @@ -1595,7 +1595,7 @@ func TestSettingsCasting(t *testing.T) { }`, from, to) assert.Nil(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] newFormatAggSettings := sr.Aggs[0].Aggregation.Aggs[0].Aggregation.Aggregation.(*es.MetricAggregation).Settings oldFormatAggSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.MetricAggregation).Settings @@ -1616,7 +1616,7 @@ func TestSettingsCasting(t *testing.T) { }`, from, to) assert.Nil(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg) assert.Equal(t, dateHistogramAgg.Field, "@timestamp") }) @@ -1631,7 +1631,7 @@ func TestSettingsCasting(t *testing.T) { }`, from, to) assert.Nil(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg) assert.Equal(t, dateHistogramAgg.Field, "@time") }) @@ -1646,7 +1646,7 @@ func TestSettingsCasting(t *testing.T) { }`, from, to) assert.Nil(t, err) - sr := c.multisearchRequests[0].Requests[0] + sr := c.multisearchRequests[0][0] dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg) assert.Equal(t, dateHistogramAgg.FixedInterval, "1d") }) @@ -1654,39 +1654,22 @@ func TestSettingsCasting(t *testing.T) { } type fakeClient struct { - configuredFields es.ConfiguredFields multiSearchResponse *es.MultiSearchResponse multiSearchError error builder *es.MultiSearchRequestBuilder - multisearchRequests []*es.MultiSearchRequest + multisearchRequests [][]*es.SearchRequest } func newFakeClient() *fakeClient { - configuredFields := es.ConfiguredFields{ - TimeField: "@timestamp", - LogMessageField: "line", - LogLevelField: "lvl", - } - return &fakeClient{ - configuredFields: configuredFields, - multisearchRequests: make([]*es.MultiSearchRequest, 0), + multisearchRequests: make([][]*es.SearchRequest, 0), multiSearchResponse: &es.MultiSearchResponse{}, } } -func (c *fakeClient) GetConfiguredFields() es.ConfiguredFields { - return c.configuredFields -} - -func (c *fakeClient) ExecuteMultisearch(r *es.MultiSearchRequest) (*es.MultiSearchResponse, error) { +func (c *fakeClient) ExecuteMultisearch(r []*es.SearchRequest) ([]*json.RawMessage, error) { c.multisearchRequests = append(c.multisearchRequests, r) - return c.multiSearchResponse, c.multiSearchError -} - -func (c *fakeClient) MultiSearch() *es.MultiSearchRequestBuilder { - c.builder = es.NewMultiSearchRequestBuilder() - return c.builder + return c.multiSearchResponse.Responses, c.multiSearchError } func newDataQuery(body string) (backend.QueryDataRequest, error) { @@ -1700,8 +1683,13 @@ func newDataQuery(body string) (backend.QueryDataRequest, error) { }, nil } -func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time) ( - *backend.QueryDataResponse, error) { +func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time) (*backend.QueryDataResponse, error) { + configuredFields := es.ConfiguredFields{ + TimeField: "@timestamp", + LogMessageField: "line", + LogLevelField: "lvl", + } + timeRange := backend.TimeRange{ From: from, To: to, @@ -1714,6 +1702,24 @@ func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time) }, }, } - query := newElasticsearchDataQuery(c, dataRequest.Queries) - return query.execute() + + // TODO : refactor parsing, processing and executing tests as separate concerns + queries, err := parseQuery(dataRequest.Queries) + if err != nil { + return nil, err + } + req, err := buildMSR(queries, configuredFields.TimeField) + if err != nil { + return &backend.QueryDataResponse{}, err + } + + res, err := c.ExecuteMultisearch(req) + result, err := handleQuickwitErrors(err) + if result != nil { + return result, nil + } else if err != nil { + return &backend.QueryDataResponse{}, err + } + + return parseResponse(res, queries, configuredFields) } diff --git a/pkg/quickwit/elasticsearch.go b/pkg/quickwit/elasticsearch.go index d7e0548..afb18f6 100644 --- a/pkg/quickwit/elasticsearch.go +++ b/pkg/quickwit/elasticsearch.go @@ -2,6 +2,7 @@ package quickwit import ( "context" + "encoding/json" "fmt" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -10,15 +11,59 @@ import ( ) // separate function to allow testing the whole transformation and query flow -func queryData(ctx context.Context, queries []backend.DataQuery, dsInfo *es.DatasourceInfo) (*backend.QueryDataResponse, error) { - if len(queries) == 0 { +func queryData(ctx context.Context, dataQueries []backend.DataQuery, dsInfo *es.DatasourceInfo) (*backend.QueryDataResponse, error) { + + // First validate and parse + if len(dataQueries) == 0 { return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries") } - client, err := es.NewClient(ctx, dsInfo, queries[0].TimeRange) + queries, err := parseQuery(dataQueries) + if err != nil { + return nil, err + } + + // Create a request + // NODE : Params should probably be assembled in a dedicated structure to be reused by parseResponse + req, err := buildMSR(queries, dsInfo.ConfiguredFields.TimeField) + if err != nil { + return &backend.QueryDataResponse{}, err + } + + // Create a client and execute request + client, err := es.NewClient(ctx, dsInfo) if err != nil { return &backend.QueryDataResponse{}, err } - query := newElasticsearchDataQuery(client, queries) - return query.execute() + res, err := client.ExecuteMultisearch(req) + + // TODO : refactor client error handling + result, err := handleQuickwitErrors(err) + if result != nil { + return result, nil + } else if err != nil { + return &backend.QueryDataResponse{}, err + } + + return parseResponse(res, queries, dsInfo.ConfiguredFields) +} + +func handleQuickwitErrors(err error) (*backend.QueryDataResponse, error) { + if nil == err { + return nil, nil + } + + var payload = err.Error() + var qe es.QuickwitQueryError + unmarshall_err := json.Unmarshal([]byte(payload), &qe) + if unmarshall_err == nil { + return nil, err + } + + result := backend.QueryDataResponse{ + Responses: backend.Responses{}, + } + + result.Responses["__queryDataError"] = backend.ErrDataResponse(backend.Status(qe.Status), payload) + return &result, nil } diff --git a/pkg/quickwit/error_handling_test.go b/pkg/quickwit/error_handling_test.go index 7ce7c21..f9f0e29 100644 --- a/pkg/quickwit/error_handling_test.go +++ b/pkg/quickwit/error_handling_test.go @@ -47,7 +47,7 @@ func TestErrorAvgMissingField(t *testing.T) { result, err := queryDataTestWithResponseCode(query, 400, response, configuredFields) require.Nil(t, err) - require.Contains(t, result.response.Responses["A"].Error.Error(), "\"status\":400") + require.Contains(t, result.response.Responses["__queryDataError"].Error.Error(), "\"status\":400") } func TestErrorAvgMissingFieldNoDetailedErrors(t *testing.T) { @@ -80,7 +80,7 @@ func TestErrorAvgMissingFieldNoDetailedErrors(t *testing.T) { result, err := queryDataTestWithResponseCode(query, 400, response, configuredFields) require.Nil(t, err) - require.Contains(t, result.response.Responses["A"].Error.Error(), "\"status\":400") + require.Contains(t, result.response.Responses["__queryDataError"].Error.Error(), "\"status\":400") } func TestErrorTooManyDateHistogramBuckets(t *testing.T) { @@ -165,5 +165,5 @@ func TestNonElasticError(t *testing.T) { result, err := queryDataTestWithResponseCode(query, 403, response, configuredFields) require.Nil(t, err) - require.Contains(t, result.response.Responses["A"].Error.Error(), "\"status\":403") + require.Contains(t, result.response.Responses["__queryDataError"].Error.Error(), "\"status\":403") } diff --git a/pkg/quickwit/models.go b/pkg/quickwit/models.go index c88e8db..818908e 100644 --- a/pkg/quickwit/models.go +++ b/pkg/quickwit/models.go @@ -16,6 +16,8 @@ type Query struct { IntervalMs int64 RefID string MaxDataPoints int64 + RangeFrom int64 + RangeTo int64 } // BucketAgg represents a bucket aggregation of the time series query model of the datasource diff --git a/pkg/quickwit/parse_query.go b/pkg/quickwit/parse_query.go index 6f02e3c..7f1632e 100644 --- a/pkg/quickwit/parse_query.go +++ b/pkg/quickwit/parse_query.go @@ -1,6 +1,8 @@ package quickwit import ( + "time" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/simplejson" @@ -30,6 +32,9 @@ func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) { intervalMs := model.Get("intervalMs").MustInt64(0) interval := q.Interval + from := q.TimeRange.From.UnixNano() / int64(time.Millisecond) + to := q.TimeRange.To.UnixNano() / int64(time.Millisecond) + queries = append(queries, &Query{ RawQuery: rawQuery, BucketAggs: bucketAggs, @@ -39,6 +44,8 @@ func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) { IntervalMs: intervalMs, RefID: q.RefID, MaxDataPoints: q.MaxDataPoints, + RangeFrom: from, + RangeTo: to, }) } diff --git a/pkg/quickwit/response_parser.go b/pkg/quickwit/response_parser.go index 4974e80..57daa65 100644 --- a/pkg/quickwit/response_parser.go +++ b/pkg/quickwit/response_parser.go @@ -1,6 +1,7 @@ package quickwit import ( + "bytes" "encoding/json" "errors" "fmt" @@ -40,17 +41,29 @@ const ( var searchWordsRegex = regexp.MustCompile(regexp.QuoteMeta(es.HighlightPreTagsString) + `(.*?)` + regexp.QuoteMeta(es.HighlightPostTagsString)) -func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) { +func parseResponse(rawResponses []*json.RawMessage, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) { result := backend.QueryDataResponse{ Responses: backend.Responses{}, } - if responses == nil { + if rawResponses == nil { return &result, nil } - for i, res := range responses { + for i, rawRes := range rawResponses { target := targets[i] + byteReader := bytes.NewReader(*rawRes) + dec := json.NewDecoder(byteReader) + if isLogsQuery(target) { + dec.UseNumber() + } + var res *es.SearchResponse + err := dec.Decode(&res) + if nil != err { + qwlog.Debug("Failed to decode response", "err", err.Error(), "byteRes", *rawRes) + continue + } + if res.Error != nil { errResult := getErrorFromElasticResponse(res) result.Responses[target.RefID] = backend.DataResponse{ @@ -263,14 +276,28 @@ func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []str switch propNameValue.(type) { // We are checking for default data types values (float64, int, bool, string) // and default to json.RawMessage if we cannot find any of them + case json.Number: + rawPropSlice := getDocPropSlice[json.Number](docs, propName, size) + propSlice := make([]*float64, size) + for i, val := range rawPropSlice { + val_f64, err := val.Float64() + if err == nil { + propSlice[i] = &val_f64 + } + } + allFields[propNameIdx] = createFieldOfType[float64](propSlice, propName, size, isFilterable) case float64: - allFields[propNameIdx] = createFieldOfType[float64](docs, propName, size, isFilterable) + propSlice := getDocPropSlice[float64](docs, propName, size) + allFields[propNameIdx] = createFieldOfType[float64](propSlice, propName, size, isFilterable) case int: - allFields[propNameIdx] = createFieldOfType[int](docs, propName, size, isFilterable) + propSlice := getDocPropSlice[int](docs, propName, size) + allFields[propNameIdx] = createFieldOfType[int](propSlice, propName, size, isFilterable) case string: - allFields[propNameIdx] = createFieldOfType[string](docs, propName, size, isFilterable) + propSlice := getDocPropSlice[string](docs, propName, size) + allFields[propNameIdx] = createFieldOfType[string](propSlice, propName, size, isFilterable) case bool: - allFields[propNameIdx] = createFieldOfType[bool](docs, propName, size, isFilterable) + propSlice := getDocPropSlice[bool](docs, propName, size) + allFields[propNameIdx] = createFieldOfType[bool](propSlice, propName, size, isFilterable) default: fieldVector := make([]*json.RawMessage, size) for i, doc := range docs { @@ -1070,15 +1097,21 @@ func findTheFirstNonNilDocValueForPropName(docs []map[string]interface{}, propNa return docs[0][propName] } -func createFieldOfType[T int | float64 | bool | string](docs []map[string]interface{}, propName string, size int, isFilterable bool) *data.Field { - fieldVector := make([]*T, size) +func getDocPropSlice[T json.Number | int | float64 | bool | string](docs []map[string]any, propName string, size int) []*T { + values := make([]*T, size) + for i, doc := range docs { value, ok := doc[propName].(T) if !ok { continue } - fieldVector[i] = &value + values[i] = &value } + + return values +} + +func createFieldOfType[T int | float64 | bool | string](fieldVector []*T, propName string, size int, isFilterable bool) *data.Field { field := data.NewField(propName, nil, fieldVector) field.Config = &data.FieldConfig{Filterable: &isFilterable} return field diff --git a/pkg/utils/parse_time.go b/pkg/utils/parse_time.go index b8279d8..79adaa6 100644 --- a/pkg/utils/parse_time.go +++ b/pkg/utils/parse_time.go @@ -1,6 +1,7 @@ package utils import ( + "encoding/json" "errors" "fmt" "reflect" @@ -55,6 +56,13 @@ func ParseTime(value any, timeOutputFormat string) (time.Time, error) { case TimestampSecs, TimestampMillis, TimestampMicros, TimestampNanos: var value_i64 int64 switch value.(type) { + case json.Number: + var err error + valueNumber := value.(json.Number) + value_i64, err = valueNumber.Int64() + if nil != err { + return time.Time{}, errors.New("couldn't convert timestamp from json.Number to Int64") + } case int, int8, int16, int32, int64: value_i64 = reflect.ValueOf(value).Int() case float32, float64: