Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSDK-8819: Instrument some grpc/webrtc FTDC stats. #385

Merged
merged 5 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/edaniels/zeroconf"
Expand Down Expand Up @@ -102,6 +103,10 @@
http.Handler

EnsureAuthed(ctx context.Context) (context.Context, error)

// Stats returns a structure containing numbers that can be interesting for graphing over time
// as a diagnostics tool.
Stats() any
}

type simpleServer struct {
Expand Down Expand Up @@ -146,6 +151,15 @@

// authIssuer is the JWT issuer (iss) that will be used for our service.
authIssuer string

counters struct {
TcpGrpcRequestsStarted atomic.Int64

Check failure on line 156 in rpc/server.go

View workflow job for this annotation

GitHub Actions / Build and Test

var-naming: struct field TcpGrpcRequestsStarted should be TCPGrpcRequestsStarted (revive)
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
TcpGrpcWebRequestsStarted atomic.Int64

Check failure on line 157 in rpc/server.go

View workflow job for this annotation

GitHub Actions / Build and Test

var-naming: struct field TcpGrpcWebRequestsStarted should be TCPGrpcWebRequestsStarted (revive)
TcpOtherRequestsStarted atomic.Int64

Check failure on line 158 in rpc/server.go

View workflow job for this annotation

GitHub Actions / Build and Test

var-naming: struct field TcpOtherRequestsStarted should be TCPOtherRequestsStarted (revive)
TcpGrpcRequestsCompleted atomic.Int64

Check failure on line 159 in rpc/server.go

View workflow job for this annotation

GitHub Actions / Build and Test

var-naming: struct field TcpGrpcRequestsCompleted should be TCPGrpcRequestsCompleted (revive)
TcpGrpcWebRequestsCompleted atomic.Int64

Check failure on line 160 in rpc/server.go

View workflow job for this annotation

GitHub Actions / Build and Test

var-naming: struct field TcpGrpcWebRequestsCompleted should be TCPGrpcWebRequestsCompleted (revive)
TcpOtherRequestsCompleted atomic.Int64

Check failure on line 161 in rpc/server.go

View workflow job for this annotation

GitHub Actions / Build and Test

var-naming: struct field TcpOtherRequestsCompleted should be TCPOtherRequestsCompleted (revive)
}
}

var errMixedUnauthAndAuth = errors.New("cannot use unauthenticated and auth handlers at same time")
Expand Down Expand Up @@ -721,13 +735,19 @@
r = requestWithHost(r)
switch ss.getRequestType(r) {
case requestTypeGRPC:
ss.counters.TcpGrpcRequestsStarted.Add(1)
ss.grpcServer.ServeHTTP(w, r)
ss.counters.TcpGrpcRequestsCompleted.Add(1)
case requestTypeGRPCWeb:
ss.counters.TcpGrpcWebRequestsStarted.Add(1)
ss.grpcWebServer.ServeHTTP(w, r)
ss.counters.TcpGrpcWebRequestsCompleted.Add(1)
case requestTypeNone:
fallthrough
default:
ss.counters.TcpOtherRequestsStarted.Add(1)
ss.grpcGatewayHandler.ServeHTTP(w, r)
ss.counters.TcpOtherRequestsCompleted.Add(1)
}
}

Expand Down Expand Up @@ -861,6 +881,34 @@
return err
}

type SimpleServerStats struct {

Check failure on line 884 in rpc/server.go

View workflow job for this annotation

GitHub Actions / Build and Test

exported: exported type SimpleServerStats should have comment or be unexported (revive)
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
TcpGrpcStats TcpGrpcStats

Check failure on line 885 in rpc/server.go

View workflow job for this annotation

GitHub Actions / Build and Test

var-naming: struct field TcpGrpcStats should be TCPGrpcStats (revive)
WebRTCGrpcStats WebRTCGrpcStats
}

type TcpGrpcStats struct {

Check failure on line 889 in rpc/server.go

View workflow job for this annotation

GitHub Actions / Build and Test

var-naming: type TcpGrpcStats should be TCPGrpcStats (revive)
RequestsStarted int64
WebRequestsStarted int64
OtherRequestsStarted int64
RequestsCompleted int64
WebRequestsCompleted int64
OtherRequestsCompleted int64
}

// Stats returns a SimpleServerStats value (boxed as any) holding the current values of the
// server's TCP request counters alongside the WebRTC server's stats. Each counter is loaded
// atomically but independently, so the result is not a single consistent snapshot across
// counters.
//
// NOTE(review): ss.webrtcServer is dereferenced unconditionally here — confirm it can never be
// nil (e.g. when WebRTC is disabled) at the time Stats is called.
func (ss *simpleServer) Stats() any {
	counters := &ss.counters

	tcpStats := TcpGrpcStats{
		RequestsStarted:        counters.TcpGrpcRequestsStarted.Load(),
		WebRequestsStarted:     counters.TcpGrpcWebRequestsStarted.Load(),
		OtherRequestsStarted:   counters.TcpOtherRequestsStarted.Load(),
		RequestsCompleted:      counters.TcpGrpcRequestsCompleted.Load(),
		WebRequestsCompleted:   counters.TcpGrpcWebRequestsCompleted.Load(),
		OtherRequestsCompleted: counters.TcpOtherRequestsCompleted.Load(),
	}

	return SimpleServerStats{
		TcpGrpcStats:    tcpStats,
		WebRTCGrpcStats: ss.webrtcServer.Stats(),
	}
}

// A RegisterServiceHandlerFromEndpointFunc is a means to have a service attach itself to a gRPC gateway mux.
type RegisterServiceHandlerFromEndpointFunc func(
ctx context.Context,
Expand Down
49 changes: 49 additions & 0 deletions rpc/wrtc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"fmt"
"io"
"sync"
"sync/atomic"

"github.com/viamrobotics/webrtc/v3"
"google.golang.org/grpc"
Expand Down Expand Up @@ -43,6 +44,51 @@

onPeerAdded func(pc *webrtc.PeerConnection)
onPeerRemoved func(pc *webrtc.PeerConnection)

counters struct {
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
PeersConnected atomic.Int64
PeersDisconnected atomic.Int64
PeerConnectionErrors atomic.Int64
HeadersProcessed atomic.Int64
// CallTicketsAvailable is technically a gauge: it both increments and decrements, rather than
// only going up.
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
CallTicketsAvailable atomic.Int32

// TotalTimeConnectingMillis accumulates connection time for successful connection attempts only.
TotalTimeConnectingMillis atomic.Int64
}
}

type WebRTCGrpcStats struct {

Check failure on line 62 in rpc/wrtc_server.go

View workflow job for this annotation

GitHub Actions / Build and Test

exported: exported type WebRTCGrpcStats should have comment or be unexported (revive)
PeersConnected int64
PeersDisconnected int64
PeerConnectionErrors int64
PeersActive int64
HeadersProcessed int64
CallTicketsAvailable int32
TotalTimeConnectingMillis int64

// When the FTDC frontend is more feature rich, we can remove this and let the frontend compute
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
// the value.
AverageTimeConnectingMillis float64
}

// Stats returns the current values of the WebRTC server's counters plus two derived values:
// PeersActive (PeersConnected minus PeersDisconnected) and AverageTimeConnectingMillis
// (TotalTimeConnectingMillis divided by PeersConnected, left at zero when no peer has
// connected). Each counter is loaded atomically but independently of the others.
func (srv *webrtcServer) Stats() WebRTCGrpcStats {
	counters := &srv.counters

	connected := counters.PeersConnected.Load()
	disconnected := counters.PeersDisconnected.Load()
	connErrors := counters.PeerConnectionErrors.Load()
	headers := counters.HeadersProcessed.Load()
	tickets := counters.CallTicketsAvailable.Load()
	totalMillis := counters.TotalTimeConnectingMillis.Load()

	// Guard the division so that a server with no successful connections reports 0, not NaN.
	var avgMillis float64
	if connected > 0 {
		avgMillis = float64(totalMillis) / float64(connected)
	}

	return WebRTCGrpcStats{
		PeersConnected:              connected,
		PeersDisconnected:           disconnected,
		PeerConnectionErrors:        connErrors,
		PeersActive:                 connected - disconnected,
		HeadersProcessed:            headers,
		CallTicketsAvailable:        tickets,
		TotalTimeConnectingMillis:   totalMillis,
		AverageTimeConnectingMillis: avgMillis,
	}
}

// from grpc.
Expand Down Expand Up @@ -85,6 +131,7 @@
streamInt: streamInt,
unknownStreamDesc: unknownStreamDesc,
}
srv.counters.CallTicketsAvailable.Store(int32(DefaultWebRTCMaxGRPCCalls))
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
srv.ctx, srv.cancel = context.WithCancel(context.Background())
return srv
}
Expand Down Expand Up @@ -112,6 +159,7 @@
srv.peerConnsMu.Unlock()

for _, pc := range peerConns {
srv.counters.PeersDisconnected.Add(1)
if err := pc.GracefulClose(); err != nil {
srv.logger.Errorw("error closing peer connection", "error", err)
}
Expand Down Expand Up @@ -206,6 +254,7 @@
srv.peerConnsMu.Lock()
delete(srv.peerConns, peerConn)
srv.peerConnsMu.Unlock()
srv.counters.PeersDisconnected.Add(1)
if srv.onPeerRemoved != nil {
srv.onPeerRemoved(peerConn)
}
Expand Down
3 changes: 3 additions & 0 deletions rpc/wrtc_server_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func (s *webrtcServerStream) processHeaders(headers *webrtcpb.RequestHeaders) {
}
}

s.ch.server.counters.HeadersProcessed.Add(1)
s.ch.server.processHeadersMu.RLock()
s.ch.server.processHeadersWorkers.Add(1)
s.ch.server.processHeadersMu.RUnlock()
Expand All @@ -291,6 +292,7 @@ func (s *webrtcServerStream) processHeaders(headers *webrtcpb.RequestHeaders) {
// take a ticket
select {
case s.ch.server.callTickets <- struct{}{}:
s.ch.server.counters.CallTicketsAvailable.Add(-1)
default:
if err := s.closeWithSendError(status.Error(codes.ResourceExhausted, "too many in-flight requests")); err != nil {
s.logger.Errorw("error closing", "error", err)
Expand All @@ -303,6 +305,7 @@ func (s *webrtcServerStream) processHeaders(headers *webrtcpb.RequestHeaders) {
defer func() {
s.ch.server.processHeadersWorkers.Done()
<-s.ch.server.callTickets // return a ticket
s.ch.server.counters.CallTicketsAvailable.Add(1)
}()
if err := handlerFunc(s); err != nil {
if errors.Is(err, io.ErrClosedPipe) || isContextCanceled(err) {
Expand Down
13 changes: 13 additions & 0 deletions rpc/wrtc_signaling_answerer.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ type answerAttempt struct {
// the designated WebRTC data channel is passed off to the underlying Server which
// is then used as the server end of a gRPC connection.
func (aa *answerAttempt) connect(ctx context.Context) (err error) {
connectionStartTime := time.Now()

// If SOCKS proxy is indicated by environment, extend WebRTC config with an
// `OptionalWebRTCConfig` call to the signaling server. The usage of a SOCKS
// proxy indicates that the server may need a local TURN ICE candidate to
Expand All @@ -361,8 +363,10 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {
// Any error below indicates the signaling server is not present.
if s, ok := status.FromError(err); ok && (s.Code() == codes.Unimplemented ||
(s.Code() == codes.InvalidArgument && s.Message() == hostNotAllowedMsg)) {
aa.server.counters.PeerConnectionErrors.Add(1)
return ErrNoWebRTCSignaler
}
aa.server.counters.PeerConnectionErrors.Add(1)
return err
}
webrtcConfig = extendWebRTCConfig(&webrtcConfig, configResp.Config, true)
Expand All @@ -378,6 +382,7 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {
)
if err != nil {
aa.sendError(err)
aa.server.counters.PeerConnectionErrors.Add(1)
return err
}

Expand Down Expand Up @@ -413,6 +418,7 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {
if aa.trickleEnabled {
answer, err := pc.CreateAnswer(nil)
if err != nil {
aa.server.counters.PeerConnectionErrors.Add(1)
return err
}

Expand Down Expand Up @@ -491,6 +497,7 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {

err = pc.SetLocalDescription(answer)
if err != nil {
aa.server.counters.PeerConnectionErrors.Add(1)
return err
}

Expand All @@ -500,12 +507,14 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {
// candidate information. This is a Nagle's algorithm-esque batching optimization. I
// think.
case <-ctx.Done():
aa.server.counters.PeerConnectionErrors.Add(1)
return ctx.Err()
}
}

encodedSDP, err := EncodeSDP(pc.LocalDescription())
if err != nil {
aa.server.counters.PeerConnectionErrors.Add(1)
aa.sendError(err)
return err
}
Expand All @@ -518,6 +527,7 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {
},
},
}); err != nil {
aa.server.counters.PeerConnectionErrors.Add(1)
return err
}
close(initSent)
Expand Down Expand Up @@ -574,9 +584,12 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) {
case <-serverChannel.Ready():
// Happy path
successful = true
aa.server.counters.PeersConnected.Add(1)
aa.server.counters.TotalTimeConnectingMillis.Add(time.Since(connectionStartTime).Milliseconds())
case <-ctx.Done():
// Timed out or signaling server was closed.
aa.sendError(multierr.Combine(ctx.Err(), serverChannel.Close()))
aa.server.counters.PeerConnectionErrors.Add(1)
return ctx.Err()
}

Expand Down
Loading