Skip to content

Commit

Permalink
Merge pull request #841 from forta-network/caner/forta-1483-add-json-…
Browse files Browse the repository at this point in the history
…rpc-method-to-node-metrics

Add JSON-RPC method to metrics
  • Loading branch information
canercidam authored Dec 26, 2023
2 parents e6154bf + 3fa0a60 commit 45dcd3b
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 8 deletions.
20 changes: 18 additions & 2 deletions services/components/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"fmt"
"time"

"github.com/forta-network/forta-core-go/domain"
Expand Down Expand Up @@ -143,7 +144,7 @@ func GetCombinerMetrics(agt config.AgentConfig, resp *protocol.EvaluateAlertResp
return createMetrics(agt, resp.Timestamp, metrics)
}

func GetJSONRPCMetrics(agt config.AgentConfig, at time.Time, success, throttled int, latencyMs time.Duration) []*protocol.AgentMetric {
func GetJSONRPCMetrics(agt config.AgentConfig, at time.Time, success, throttled int, latencyMs time.Duration, method string) []*protocol.AgentMetric {
values := make(map[string]float64)
if latencyMs > 0 {
values[MetricJSONRPCLatency] = float64(latencyMs.Milliseconds())
Expand All @@ -156,7 +157,22 @@ func GetJSONRPCMetrics(agt config.AgentConfig, at time.Time, success, throttled
values[MetricJSONRPCThrottled] = float64(throttled)
values[MetricJSONRPCRequest] += float64(throttled)
}
return createMetrics(agt, at.Format(time.RFC3339), values)
return createJsonRpcMetrics(agt, at.Format(time.RFC3339), values, method)
}

func createJsonRpcMetrics(agt config.AgentConfig, timestamp string, metricMap map[string]float64, method string) []*protocol.AgentMetric {
var res []*protocol.AgentMetric

for name, value := range metricMap {
res = append(res, &protocol.AgentMetric{
AgentId: agt.ID,
Timestamp: timestamp,
Name: fmt.Sprintf("%s.%s", name, method),
Value: value,
ShardId: agt.ShardID(),
})
}
return res
}

func GetPublicAPIMetrics(botID string, at time.Time, success, throttled int, latencyMs time.Duration) []*protocol.AgentMetric {
Expand Down
24 changes: 24 additions & 0 deletions services/json-rpc/decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package json_rpc

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
)

type jsonRpcReq struct {
Method string `json:"method"`
}

func decodeAndReplaceBody(req *http.Request) (*jsonRpcReq, error) {
b, _ := io.ReadAll(req.Body)
req.Body.Close()
req.Body = io.NopCloser(bytes.NewBuffer(b))
var decodedBody jsonRpcReq
if err := json.Unmarshal(b, &decodedBody); err != nil {
return nil, fmt.Errorf("failed to decode json-rpc request body")
}
return &decodedBody, nil
}
27 changes: 27 additions & 0 deletions services/json-rpc/decode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package json_rpc

import (
"bytes"
"io"
"net/http"
"testing"

"github.com/stretchr/testify/require"
)

func TestDecodeAndReplaceBody(t *testing.T) {
r := require.New(t)

bodyStr := `{"method":"eth_call"}`
req, err := http.NewRequest("GET", "/", bytes.NewBufferString(bodyStr))
r.NoError(err)

// decodes successfully
decodedBody, err := decodeAndReplaceBody(req)
r.NoError(err)
r.Equal("eth_call", decodedBody.Method)

// still can read body because it was replaced
b, err := io.ReadAll(req.Body)
r.Equal(bodyStr, string(b))
}
18 changes: 12 additions & 6 deletions services/json-rpc/json_rpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ import (
"net/url"
"time"

"github.com/forta-network/forta-node/clients"
"github.com/forta-network/forta-node/clients/ratelimiter"
"github.com/rs/cors"

"github.com/forta-network/forta-core-go/clients/health"
"github.com/forta-network/forta-core-go/ethereum"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-core-go/protocol/settings"
"github.com/forta-network/forta-core-go/utils"
"github.com/forta-network/forta-node/clients"
"github.com/forta-network/forta-node/clients/messaging"
"github.com/forta-network/forta-node/clients/ratelimiter"
"github.com/forta-network/forta-node/config"
"github.com/forta-network/forta-node/services/components/metrics"
"github.com/rs/cors"
"github.com/sirupsen/logrus"
)

// JsonRpcProxy proxies requests from agents to json-rpc endpoint
Expand Down Expand Up @@ -71,12 +71,18 @@ func (p *JsonRpcProxy) Start() error {
func (p *JsonRpcProxy) metricHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
t := time.Now()

decodedBody, err := decodeAndReplaceBody(req)
if err != nil {
logrus.WithError(err).Error("failed to decode request")
}

agentConfig, err := p.botAuthenticator.FindAgentFromRemoteAddr(req.RemoteAddr)
if err == nil && p.rateLimiter.ExceedsLimit(agentConfig.ID) {
writeTooManyReqsErr(w, req)
p.msgClient.PublishProto(
messaging.SubjectMetricAgent, &protocol.AgentMetricList{
Metrics: metrics.GetJSONRPCMetrics(*agentConfig, t, 0, 1, 0),
Metrics: metrics.GetJSONRPCMetrics(*agentConfig, t, 0, 1, 0, decodedBody.Method),
},
)
return
Expand All @@ -88,7 +94,7 @@ func (p *JsonRpcProxy) metricHandler(h http.Handler) http.Handler {
duration := time.Since(t)
p.msgClient.PublishProto(
messaging.SubjectMetricAgent, &protocol.AgentMetricList{
Metrics: metrics.GetJSONRPCMetrics(*agentConfig, t, 1, 0, duration),
Metrics: metrics.GetJSONRPCMetrics(*agentConfig, t, 1, 0, duration, decodedBody.Method),
},
)
}
Expand Down

0 comments on commit 45dcd3b

Please sign in to comment.