Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start batching combiner requests #297

Merged
merged 14 commits into from
Jan 9, 2024
185 changes: 185 additions & 0 deletions clients/graphql/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package graphql

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/Khan/genqlient/graphql"
"github.com/forta-network/forta-core-go/protocol"
log "github.com/sirupsen/logrus"
)

var (
ErrResponseSizeTooBig = fmt.Errorf("response size too big")
)

// paginateBatch processes the response received from the server and extracts the relevant data.
// It takes an array of AlertsInput objects and a graphql.Response object as input.
// It then iterates over the inputs and extracts the response item based on the alias.
// If there is an error for the input, it logs a warning and continues to the next input.
// If there are more alerts for the input, it adds the input to the list of pagination inputs.
// The function returns the pagination inputs, the extracted alert events, and any encountered error.
func paginateBatch(inputs []*AlertsInput, response *graphql.Response) ([]*AlertsInput,
[]*protocol.AlertEvent, error) {
// type-checking response
if response == nil {
return nil, nil, fmt.Errorf("nil graphql response")
}

batchAlertsResponseUnsafe, ok := response.Data.(*BatchGetAlertsResponse)
if !ok {
return nil, nil, fmt.Errorf("invalid pagination response")
}
if batchAlertsResponseUnsafe == nil {
return nil, nil, fmt.Errorf("nil pagination response")
}

batchAlertsResponse := *batchAlertsResponseUnsafe

var pagination []*AlertsInput
var alerts []*protocol.AlertEvent
for inputIdx := range inputs {
alias := idxToResponseAlias(inputIdx)

logger := log.WithFields(log.Fields{
"input": idxToInputAlias(inputIdx),
})

responseItem, ok := batchAlertsResponse[alias]
if !ok {
logger.Warn("no response for input")
continue
}

// check if there is an error for the input
err := HasError(response.Errors, inputIdx)
if err != nil {
logger.WithError(err).Warn("error response for input")
continue
}

alerts = append(alerts, responseItem.ToAlertEvents()...)

// check if there are more alerts for the input
if !responseItem.PageInfo.HasNextPage {
continue
}

// check if there are more alerts for the input
if responseItem.PageInfo.EndCursor == nil {
continue
}

// add input to the list of pagination inputs
inputs[inputIdx].After = &AlertEndCursorInput{
AlertId: responseItem.PageInfo.EndCursor.AlertId,
BlockNumber: responseItem.PageInfo.EndCursor.BlockNumber,
}

pagination = append(pagination, inputs[inputIdx])
}

return pagination, alerts, nil
}

// fetchAlertsBatch retrieves the alerts in batches from the server using the provided client, inputs, and headers.
func fetchAlertsBatch(ctx context.Context, client string, inputs []*AlertsInput, headers map[string]string) (*graphql.Response, error) {
query, variables := createGetAlertsQuery(inputs)
req := &graphql.Request{
OpName: "getAlerts",
Query: query,
Variables: variables,
}

respBody, err := makeRequest(ctx, client, req, headers)
if err != nil {
return nil, err
}

resp := parseBatchResponse(respBody)
if err != nil {
return nil, err
}

return resp, nil
}

// makeRequest sends a GraphQL request to the specified client and returns the response body.
// It takes the context, client URL, request, and headers as input parameters.
// It marshals the request into JSON and creates an HTTP request.
// It sets the custom headers and executes the query with the default HTTP client.
// If the response status code is not OK, it returns an error with the status and response body.
// If the response is gzip compressed, it decompresses the body before parsing.
// It reads the response body and returns it along with any encountered error.
func makeRequest(ctx context.Context, client string, req *graphql.Request, headers map[string]string) ([]byte, error) {
body, err := json.Marshal(req)
if err != nil {
return nil, err
}

httpReq, err := http.NewRequest(
http.MethodPost,
client,
bytes.NewReader(body),
)
if err != nil {
return nil, err
}

httpReq = httpReq.WithContext(ctx)

// set custom headers
for key, val := range headers {
httpReq.Header.Set(key, val)
}

httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("Accept-Encoding", "gzip")

queryTime := time.Now().Truncate(time.Minute).UnixMilli()
httpReq.Header.Set("Forta-Query-Timestamp", fmt.Sprintf("%d", queryTime))

// execute query
httpResp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return nil, err
}
defer httpResp.Body.Close()

if httpResp.StatusCode == http.StatusInternalServerError {
return nil, ErrResponseSizeTooBig
}

if httpResp.StatusCode != http.StatusOK {
var respBody []byte
respBody, err = io.ReadAll(httpResp.Body)
if err != nil {
respBody = []byte(fmt.Sprintf("<unreadable: %v>", err))
}
return nil, fmt.Errorf("returned error %v: %s", httpResp.Status, respBody)
}

// Check if the response is compressed with gzip
var respBodyReader = httpResp.Body
if strings.Contains(httpResp.Header.Get("Content-Encoding"), "gzip") {
respBodyReader, err = gzip.NewReader(httpResp.Body)
if err != nil {
return nil, err
}
defer respBodyReader.Close()
}

// Parse response
respBody, err := io.ReadAll(respBodyReader)
if err != nil {
return nil, err
}
return respBody, err
}
77 changes: 77 additions & 0 deletions clients/graphql/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package graphql

import (
"fmt"
"testing"

"github.com/Khan/genqlient/graphql"
"github.com/forta-network/forta-core-go/protocol"
"github.com/stretchr/testify/assert"
"github.com/vektah/gqlparser/v2/ast"
"github.com/vektah/gqlparser/v2/gqlerror"
)

func TestPaginateBatch(t *testing.T) {
var alerts0 ast.PathName = "alerts0"
tests := []struct {
name string
inputs []*AlertsInput
response *graphql.Response
expectedPagination []*AlertsInput
expectedAlerts []*protocol.AlertEvent
expectedErr error
}{
{
name: "Invalid Pagination Response",
response: &graphql.Response{},
expectedErr: fmt.Errorf("invalid pagination response"),
},
{
name: "Test with two inputs and one error in response",
inputs: []*AlertsInput{{}, {}},
response: &graphql.Response{
Data: &BatchGetAlertsResponse{
"alerts0": {
PageInfo: nil,
},
"alerts1": {
PageInfo: &PageInfo{
HasNextPage: true,
EndCursor: &EndCursor{AlertId: "0xaaa"},
},
},
},
Errors: gqlerror.List{{
Path: ast.Path{
alerts0,
},
Message: "test error",
}},
},
expectedPagination: []*AlertsInput{
{
After: &AlertEndCursorInput{
AlertId: "0xaaa",
},
},
},
expectedAlerts: nil,
expectedErr: nil,
},
// Add more test cases here for other scenarios.
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pagination, alerts, err := paginateBatch(tt.inputs, tt.response)
assert.Equal(t, tt.expectedPagination, pagination)
assert.Equal(t, tt.expectedAlerts, alerts)
if tt.expectedErr != nil {
assert.Error(t, err)
assert.Equal(t, tt.expectedErr.Error(), err.Error())
} else {
assert.NoError(t, err)
}
})
}
}
67 changes: 66 additions & 1 deletion clients/graphql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type client struct {

type Client interface {
GetAlerts(ctx context.Context, input *AlertsInput, headers map[string]string) ([]*protocol.AlertEvent, error)
GetAlertsBatch(ctx context.Context, input []*AlertsInput, headers map[string]string) ([]*protocol.AlertEvent, error)
}

func NewClient(url string) Client {
Expand Down Expand Up @@ -60,7 +61,7 @@ func (ac *client) GetAlerts(
return nil, fmt.Errorf("failed to fetch alerts: %v", err)
}

alerts = append(alerts, response.ToAlertEvents()...)
alerts = append(alerts, response.Alerts.ToAlertEvents()...)

// check if there are more alerts
if !response.Alerts.PageInfo.HasNextPage {
Expand Down Expand Up @@ -175,3 +176,67 @@ func parseResponse(responseBody []byte) (*graphql.Response, *GetAlertsResponse,

return resp, &data, err
}

// GetAlertsBatch is a method that retrieves alerts in batches using pagination.
// It takes a context, a slice of AlertsInput, and a map of headers as parameters.
// It returns a slice of AlertEvent and an error.
//
// The method pre-processes the inputs by assigning default values to some fields if they are not provided.
//
// It then iterates until there are no more pagination inputs to query. In each iteration, it calls the fetchAlertsBatch function to fetch alerts based on the inputs and headers.
// If an error occurs during fetching, the method returns nil and the error.
//
// After fetching alerts, the method calls the paginateBatch function to paginate the inputs and the response. It assigns the new inputs and the alert page to variables.
// If an error occurs during pagination, the method returns nil and the error.
//
// Finally, the method appends the alert page to the alerts slice and repeats the iteration until there are no more pagination inputs to query.
// It then returns the alerts slice and nil.
func (ac *client) GetAlertsBatch(ctx context.Context, inputs []*AlertsInput, headers map[string]string) ([]*protocol.AlertEvent, error) {
// pre-process inputs
for _, input := range inputs {
if input.BlockSortDirection == "" {
input.BlockSortDirection = SortDesc
}

// have a default of 10m
if input.CreatedSince == 0 {
input.CreatedSince = uint(DefaultLastNMinutes.Milliseconds())
}

if input.First == 0 {
input.First = DefaultPageSize
}
}

var alerts []*protocol.AlertEvent

// iterate until there are no more pagination inputs to query
for len(inputs) > 0 {
response, err := fetchAlertsBatch(ctx, ac.url, inputs, headers)
if err != nil {
return nil, err
}

var alertPage []*protocol.AlertEvent
inputs, alertPage, err = paginateBatch(inputs, response)
if err != nil {
return nil, err
}

alerts = append(alerts, alertPage...)
}

return alerts, nil
}

func parseBatchResponse(responseBody []byte) *graphql.Response {
var data BatchGetAlertsResponse
resp := &graphql.Response{Data: &data}

err := json.Unmarshal(responseBody, resp)
if err != nil {
return nil
}

return resp
}
Loading
Loading