Skip to content

Commit

Permalink
fixing
Browse files Browse the repository at this point in the history
  • Loading branch information
dgottlieb committed Nov 18, 2024
1 parent ffedeb0 commit 65b1f69
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 25 deletions.
45 changes: 24 additions & 21 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ type simpleServer struct {
// a standard http2 over TCP connection. And it also sets up grpc services for webrtc
// PeerConnections. These counters are specifically for requests coming in over TCP.
counters struct {
TCPpGrpcRequestsStarted atomic.Int64
TCPpGrpcWebRequestsStarted atomic.Int64
TCPpOtherRequestsStarted atomic.Int64
TCPpGrpcRequestsCompleted atomic.Int64
TCPpGrpcWebRequestsCompleted atomic.Int64
TCPpOtherRequestsCompleted atomic.Int64
TCPGrpcRequestsStarted atomic.Int64
TCPGrpcWebRequestsStarted atomic.Int64
TCPOtherRequestsStarted atomic.Int64
TCPGrpcRequestsCompleted atomic.Int64
TCPGrpcWebRequestsCompleted atomic.Int64
TCPOtherRequestsCompleted atomic.Int64
}
}

Expand Down Expand Up @@ -738,19 +738,19 @@ func (ss *simpleServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = requestWithHost(r)
switch ss.getRequestType(r) {
case requestTypeGRPC:
ss.counters.TCPpGrpcRequestsStarted.Add(1)
ss.counters.TCPGrpcRequestsStarted.Add(1)
ss.grpcServer.ServeHTTP(w, r)
ss.counters.TCPpGrpcRequestsCompleted.Add(1)
ss.counters.TCPGrpcRequestsCompleted.Add(1)
case requestTypeGRPCWeb:
ss.counters.TCPpGrpcWebRequestsStarted.Add(1)
ss.counters.TCPGrpcWebRequestsStarted.Add(1)
ss.grpcWebServer.ServeHTTP(w, r)
ss.counters.TCPpGrpcWebRequestsCompleted.Add(1)
ss.counters.TCPGrpcWebRequestsCompleted.Add(1)
case requestTypeNone:
fallthrough
default:
ss.counters.TCPpOtherRequestsStarted.Add(1)
ss.counters.TCPOtherRequestsStarted.Add(1)
ss.grpcGatewayHandler.ServeHTTP(w, r)
ss.counters.TCPpOtherRequestsCompleted.Add(1)
ss.counters.TCPOtherRequestsCompleted.Add(1)
}
}

Expand Down Expand Up @@ -884,12 +884,14 @@ func (ss *simpleServer) Stop() error {
return err
}

// SimpleServerStats are stats of the simple variety.
type SimpleServerStats struct {
TCPpGrpcStats TCPpGrpcStats
TCPGrpcStats TCPGrpcStats
WebRTCGrpcStats WebRTCGrpcStats
}

type TCPpGrpcStats struct {
// TCPGrpcStats are stats for the classic tcp/http2 webserver.
type TCPGrpcStats struct {
RequestsStarted int64
WebRequestsStarted int64
OtherRequestsStarted int64
Expand All @@ -898,15 +900,16 @@ type TCPpGrpcStats struct {
OtherRequestsCompleted int64
}

// Stats returns stats. The return value of `any` is to satisfy the FTDC interface.
func (ss *simpleServer) Stats() any {
return SimpleServerStats{
TCPpGrpcStats: TCPpGrpcStats{
RequestsStarted: ss.counters.TCPpGrpcRequestsStarted.Load(),
WebRequestsStarted: ss.counters.TCPpGrpcWebRequestsStarted.Load(),
OtherRequestsStarted: ss.counters.TCPpOtherRequestsStarted.Load(),
RequestsCompleted: ss.counters.TCPpGrpcRequestsCompleted.Load(),
WebRequestsCompleted: ss.counters.TCPpGrpcWebRequestsCompleted.Load(),
OtherRequestsCompleted: ss.counters.TCPpOtherRequestsCompleted.Load(),
TCPGrpcStats: TCPGrpcStats{
RequestsStarted: ss.counters.TCPGrpcRequestsStarted.Load(),
WebRequestsStarted: ss.counters.TCPGrpcWebRequestsStarted.Load(),
OtherRequestsStarted: ss.counters.TCPOtherRequestsStarted.Load(),
RequestsCompleted: ss.counters.TCPGrpcRequestsCompleted.Load(),
WebRequestsCompleted: ss.counters.TCPGrpcWebRequestsCompleted.Load(),
OtherRequestsCompleted: ss.counters.TCPOtherRequestsCompleted.Load(),
},
WebRTCGrpcStats: ss.webrtcServer.Stats(),
}
Expand Down
3 changes: 1 addition & 2 deletions rpc/wrtc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (srv *webrtcServer) Stats() WebRTCGrpcStats {
PeersDisconnected: srv.counters.PeersDisconnected.Load(),
PeerConnectionErrors: srv.counters.PeerConnectionErrors.Load(),
HeadersProcessed: srv.counters.HeadersProcessed.Load(),
CallTicketsAvailable: itn32(cap(srv.callTickets) - len(srv.callTickets)),
CallTicketsAvailable: int32(cap(srv.callTickets) - len(srv.callTickets)),
TotalTimeConnectingMillis: srv.counters.TotalTimeConnectingMillis.Load(),
}
ret.PeersActive = ret.PeersConnected - ret.PeersDisconnected
Expand Down Expand Up @@ -129,7 +129,6 @@ func newWebRTCServerWithInterceptorsAndUnknownStreamHandler(
streamInt: streamInt,
unknownStreamDesc: unknownStreamDesc,
}
srv.counters.CallTicketsAvailable.Store(int32(DefaultWebRTCMaxGRPCCalls))
srv.ctx, srv.cancel = context.WithCancel(context.Background())
return srv
}
Expand Down
2 changes: 0 additions & 2 deletions rpc/wrtc_server_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ func (s *webrtcServerStream) processHeaders(headers *webrtcpb.RequestHeaders) {
// take a ticket
select {
case s.ch.server.callTickets <- struct{}{}:
s.ch.server.counters.CallTicketsAvailable.Add(-1)
default:
if err := s.closeWithSendError(status.Error(codes.ResourceExhausted, "too many in-flight requests")); err != nil {
s.logger.Errorw("error closing", "error", err)
Expand All @@ -305,7 +304,6 @@ func (s *webrtcServerStream) processHeaders(headers *webrtcpb.RequestHeaders) {
defer func() {
s.ch.server.processHeadersWorkers.Done()
<-s.ch.server.callTickets // return a ticket
s.ch.server.counters.CallTicketsAvailable.Add(1)
}()
if err := handlerFunc(s); err != nil {
if errors.Is(err, io.ErrClosedPipe) || isContextCanceled(err) {
Expand Down

0 comments on commit 65b1f69

Please sign in to comment.