Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSDK-8819: Instrument some grpc/webrtc FTDC stats. #385

Merged
merged 5 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/edaniels/zeroconf"
Expand Down Expand Up @@ -102,6 +103,10 @@ type Server interface {
http.Handler

EnsureAuthed(ctx context.Context) (context.Context, error)

// Stats returns a structure containing numbers that can be interesting for graphing over time
// as a diagnostics tool.
Stats() any
}

type simpleServer struct {
Expand Down Expand Up @@ -146,6 +151,18 @@ type simpleServer struct {

// authIssuer is the JWT issuer (iss) that will be used for our service.
authIssuer string

// counters are for reporting FTDC metrics. A `simpleServer` sets up both a grpc server wrapping
// a standard http2 over TCP connection. And it also sets up grpc services for webrtc
// PeerConnections. These counters are specifically for requests coming in over TCP.
counters struct {
TCPGrpcRequestsStarted atomic.Int64
TCPGrpcWebRequestsStarted atomic.Int64
TCPOtherRequestsStarted atomic.Int64
TCPGrpcRequestsCompleted atomic.Int64
TCPGrpcWebRequestsCompleted atomic.Int64
TCPOtherRequestsCompleted atomic.Int64
}
}

var errMixedUnauthAndAuth = errors.New("cannot use unauthenticated and auth handlers at same time")
Expand Down Expand Up @@ -721,13 +738,19 @@ func (ss *simpleServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = requestWithHost(r)
switch ss.getRequestType(r) {
case requestTypeGRPC:
ss.counters.TCPGrpcRequestsStarted.Add(1)
ss.grpcServer.ServeHTTP(w, r)
ss.counters.TCPGrpcRequestsCompleted.Add(1)
case requestTypeGRPCWeb:
ss.counters.TCPGrpcWebRequestsStarted.Add(1)
ss.grpcWebServer.ServeHTTP(w, r)
ss.counters.TCPGrpcWebRequestsCompleted.Add(1)
case requestTypeNone:
fallthrough
default:
ss.counters.TCPOtherRequestsStarted.Add(1)
ss.grpcGatewayHandler.ServeHTTP(w, r)
ss.counters.TCPOtherRequestsCompleted.Add(1)
}
}

Expand Down Expand Up @@ -861,6 +884,37 @@ func (ss *simpleServer) Stop() error {
return err
}

// SimpleServerStats are stats of the simple variety.
type SimpleServerStats struct {
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
TCPGrpcStats TCPGrpcStats
WebRTCGrpcStats WebRTCGrpcStats
}

// TCPGrpcStats are stats for the classic tcp/http2 webserver.
type TCPGrpcStats struct {
RequestsStarted int64
WebRequestsStarted int64
OtherRequestsStarted int64
RequestsCompleted int64
WebRequestsCompleted int64
OtherRequestsCompleted int64
}

// Stats returns stats. The return value of `any` is to satisfy the FTDC interface.
func (ss *simpleServer) Stats() any {
return SimpleServerStats{
TCPGrpcStats: TCPGrpcStats{
RequestsStarted: ss.counters.TCPGrpcRequestsStarted.Load(),
WebRequestsStarted: ss.counters.TCPGrpcWebRequestsStarted.Load(),
OtherRequestsStarted: ss.counters.TCPOtherRequestsStarted.Load(),
RequestsCompleted: ss.counters.TCPGrpcRequestsCompleted.Load(),
WebRequestsCompleted: ss.counters.TCPGrpcWebRequestsCompleted.Load(),
OtherRequestsCompleted: ss.counters.TCPOtherRequestsCompleted.Load(),
},
WebRTCGrpcStats: ss.webrtcServer.Stats(),
}
}

// A RegisterServiceHandlerFromEndpointFunc is a means to have a service attach itself to a gRPC gateway mux.
type RegisterServiceHandlerFromEndpointFunc func(
ctx context.Context,
Expand Down
46 changes: 46 additions & 0 deletions rpc/wrtc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"

"github.com/viamrobotics/webrtc/v3"
"google.golang.org/grpc"
Expand Down Expand Up @@ -43,6 +44,49 @@ type webrtcServer struct {

onPeerAdded func(pc *webrtc.PeerConnection)
onPeerRemoved func(pc *webrtc.PeerConnection)

counters struct {
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
PeersConnected atomic.Int64
PeersDisconnected atomic.Int64
PeerConnectionErrors atomic.Int64
HeadersProcessed atomic.Int64

// TotalTimeConnectingMillis just counts successful connection attempts.
TotalTimeConnectingMillis atomic.Int64
}
}

// WebRTCGrpcStats are stats of the webrtc variety.
type WebRTCGrpcStats struct {
PeersConnected int64
PeersDisconnected int64
PeerConnectionErrors int64
PeersActive int64
HeadersProcessed int64
CallTicketsAvailable int32
TotalTimeConnectingMillis int64

// When the FTDC frontend is more feature rich, we can remove this and let the frontend compute
// the value.
AverageTimeConnectingMillis float64
}

// Stats returns stats.
func (srv *webrtcServer) Stats() WebRTCGrpcStats {
ret := WebRTCGrpcStats{
PeersConnected: srv.counters.PeersConnected.Load(),
PeersDisconnected: srv.counters.PeersDisconnected.Load(),
PeerConnectionErrors: srv.counters.PeerConnectionErrors.Load(),
HeadersProcessed: srv.counters.HeadersProcessed.Load(),
CallTicketsAvailable: int32(cap(srv.callTickets) - len(srv.callTickets)),
TotalTimeConnectingMillis: srv.counters.TotalTimeConnectingMillis.Load(),
}
ret.PeersActive = ret.PeersConnected - ret.PeersDisconnected
if ret.PeersConnected > 0 {
ret.AverageTimeConnectingMillis = float64(ret.TotalTimeConnectingMillis) / float64(ret.PeersConnected)
}

return ret
}

// from grpc.
Expand Down Expand Up @@ -112,6 +156,7 @@ func (srv *webrtcServer) Stop() {
srv.peerConnsMu.Unlock()

for _, pc := range peerConns {
srv.counters.PeersDisconnected.Add(1)
if err := pc.GracefulClose(); err != nil {
srv.logger.Errorw("error closing peer connection", "error", err)
}
Expand Down Expand Up @@ -206,6 +251,7 @@ func (srv *webrtcServer) removePeer(peerConn *webrtc.PeerConnection) {
srv.peerConnsMu.Lock()
delete(srv.peerConns, peerConn)
srv.peerConnsMu.Unlock()
srv.counters.PeersDisconnected.Add(1)
if srv.onPeerRemoved != nil {
srv.onPeerRemoved(peerConn)
}
Expand Down
1 change: 1 addition & 0 deletions rpc/wrtc_server_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func (s *webrtcServerStream) processHeaders(headers *webrtcpb.RequestHeaders) {
}
}

s.ch.server.counters.HeadersProcessed.Add(1)
s.ch.server.processHeadersMu.RLock()
s.ch.server.processHeadersWorkers.Add(1)
s.ch.server.processHeadersMu.RUnlock()
Expand Down
13 changes: 13 additions & 0 deletions rpc/wrtc_signaling_answerer.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ type answerAttempt struct {
// the designated WebRTC data channel is passed off to the underlying Server which
// is then used as the server end of a gRPC connection.
func (aa *answerAttempt) connect(ctx context.Context) (err error) {
connectionStartTime := time.Now()

// If SOCKS proxy is indicated by environment, extend WebRTC config with an
// `OptionalWebRTCConfig` call to the signaling server. The usage of a SOCKS
// proxy indicates that the server may need a local TURN ICE candidate to
Expand All @@ -361,8 +363,10 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {
// Any error below indicates the signaling server is not present.
if s, ok := status.FromError(err); ok && (s.Code() == codes.Unimplemented ||
(s.Code() == codes.InvalidArgument && s.Message() == hostNotAllowedMsg)) {
aa.server.counters.PeerConnectionErrors.Add(1)
return ErrNoWebRTCSignaler
}
aa.server.counters.PeerConnectionErrors.Add(1)
return err
}
webrtcConfig = extendWebRTCConfig(&webrtcConfig, configResp.Config, true)
Expand All @@ -378,6 +382,7 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {
)
if err != nil {
aa.sendError(err)
aa.server.counters.PeerConnectionErrors.Add(1)
return err
}

Expand Down Expand Up @@ -413,6 +418,7 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {
if aa.trickleEnabled {
answer, err := pc.CreateAnswer(nil)
if err != nil {
aa.server.counters.PeerConnectionErrors.Add(1)
return err
}

Expand Down Expand Up @@ -491,6 +497,7 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {

err = pc.SetLocalDescription(answer)
if err != nil {
aa.server.counters.PeerConnectionErrors.Add(1)
return err
}

Expand All @@ -500,12 +507,14 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {
// candidate information. This is a Nagle's algorithm-esque batching optimization. I
// think.
case <-ctx.Done():
aa.server.counters.PeerConnectionErrors.Add(1)
return ctx.Err()
}
}

encodedSDP, err := EncodeSDP(pc.LocalDescription())
if err != nil {
aa.server.counters.PeerConnectionErrors.Add(1)
aa.sendError(err)
return err
}
Expand All @@ -518,6 +527,7 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {
},
},
}); err != nil {
aa.server.counters.PeerConnectionErrors.Add(1)
return err
}
close(initSent)
Expand Down Expand Up @@ -574,9 +584,12 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {
case <-serverChannel.Ready():
// Happy path
successful = true
aa.server.counters.PeersConnected.Add(1)
aa.server.counters.TotalTimeConnectingMillis.Add(time.Since(connectionStartTime).Milliseconds())
case <-ctx.Done():
// Timed out or signaling server was closed.
aa.sendError(multierr.Combine(ctx.Err(), serverChannel.Close()))
aa.server.counters.PeerConnectionErrors.Add(1)
return ctx.Err()
}

Expand Down
Loading