From 7634a56949891bcbe238f92213bfe100042c66d2 Mon Sep 17 00:00:00 2001 From: seanavery Date: Tue, 5 Nov 2024 16:11:10 -0500 Subject: [PATCH 01/24] Add basic set stream options --- robot/web/stream/server.go | 48 ++++++++++++++++++++++++ robot/web/stream/stream_test.go | 66 +++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index 086fa7cef51..42127f9ab51 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -372,6 +372,54 @@ func (server *Server) GetStreamOptions( }, nil } +// SetStreamOptions implements part of the StreamServiceServer. It sets the resolution of the stream +// to the given width and height. +func (server *Server) SetStreamOptions( + ctx context.Context, + req *streampb.SetStreamOptionsRequest, +) (*streampb.SetStreamOptionsResponse, error) { + if req.Name == "" { + return nil, errors.New("stream name is required") + } + if _, ok := server.videoSources[req.Name]; !ok { + return nil, errors.New("stream name not found") + } + if req.Resolution == nil { + return nil, errors.New("resolution is required") + } + if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 { + return nil, errors.New("invalid resolution, width and height must be greater than 0") + } + _, err := camera.FromRobot(server.robot, req.Name) + if err != nil { + return nil, fmt.Errorf("failed to get camera from robot: %w", err) + } + err = server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height)) + if err != nil { + return nil, fmt.Errorf("failed to resize video source: %w", err) + } + return &streampb.SetStreamOptionsResponse{}, nil +} + +// ResizeVideoSource resizes the video source with the given name. +func (server *Server) resizeVideoSource(name string, width, height int) error { + server.mu.Lock() + defer server.mu.Unlock() + existing, ok := server.videoSources[name] + if !ok { + return fmt.Errorf("video source %q not found", name) + } + cam, err := camera.FromRobot(server.robot, name) + if err != nil { + server.logger.Errorw("error getting camera from robot", "error", err) + return err + } + resizer := gostream.NewResizeVideoSource(cam, width, height) + server.logger.Debug("Resizing video source with swap") + existing.Swap(resizer) + return nil +} + // AddNewStreams adds new video and audio streams to the server using the updated set of video and // audio sources. It refreshes the sources, checks for a valid stream configuration, and starts // the streams if applicable. diff --git a/robot/web/stream/stream_test.go b/robot/web/stream/stream_test.go index f2cc9472dd4..7cbd2e5514a 100644 --- a/robot/web/stream/stream_test.go +++ b/robot/web/stream/stream_test.go @@ -356,3 +356,69 @@ func TestGetStreamOptions(t *testing.T) { testGetStreamOptions(name, expectedResolutions) } } + +func TestSetStreamOptions(t *testing.T) { + logger := logging.NewTestLogger(t).Sublogger("TestWebReconfigure") + origCfg := &config.Config{Components: []resource.Config{ + // 480p + { + Name: "fake-cam-0-0", + API: resource.NewAPI("rdk", "component", "camera"), + Model: resource.DefaultModelFamily.WithModel("fake"), + ConvertedAttributes: &fake.Config{ + Width: 640, + Height: 480, + }, + }, + }} + + ctx, robot, addr, webSvc := setupRealRobot(t, origCfg, logger) + defer robot.Close(ctx) + defer webSvc.Close(ctx) + conn, err := rgrpc.Dial(context.Background(), addr, logger.Sublogger("TestDial"), rpc.WithDisableDirectGRPC()) + test.That(t, err, test.ShouldBeNil) + defer conn.Close() + + livestreamClient := streampb.NewStreamServiceClient(conn) + listResp, err := livestreamClient.ListStreams(ctx, &streampb.ListStreamsRequest{}) + test.That(t, err, test.ShouldBeNil) + test.That(t, len(listResp.Names), test.ShouldEqual, 1) + + getStreamOptionsResp, err := livestreamClient.GetStreamOptions(ctx, &streampb.GetStreamOptionsRequest{ + Name: "fake-cam-0-0", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, getStreamOptionsResp, test.ShouldNotBeNil) + test.That(t, len(getStreamOptionsResp.Resolutions), test.ShouldEqual, 5) + + // Test setting stream options with invalid name + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "invalid-name", + }) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, setStreamOptionsResp, test.ShouldBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "not found") + + // Test setting stream options without Name + setStreamOptionsResp, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{}) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, setStreamOptionsResp, test.ShouldBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "name") + + // Test setting stream options with invalid resolution (0x0) + setStreamOptionsResp, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "fake-cam-0-0", + Resolution: &streampb.Resolution{Width: 0, Height: 0}, + }) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, setStreamOptionsResp, test.ShouldBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "invalid resolution") + + // Test setting the stream optoins with a valid resolution + setStreamOptionsResp, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "fake-cam-0-0", + Resolution: &streampb.Resolution{Width: 320, Height: 240}, + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) +} From 08ee02ee4039dec1b26fd8e1272a00ecdb885b10 Mon Sep 17 00:00:00 2001 From: seanavery Date: Wed, 6 Nov 2024 15:48:11 -0500 Subject: [PATCH 02/24] Make sure video track survives resize --- robot/web/stream/stream_test.go | 43 +++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/robot/web/stream/stream_test.go b/robot/web/stream/stream_test.go index 7cbd2e5514a..fc3ef67844a 100644 --- a/robot/web/stream/stream_test.go +++ b/robot/web/stream/stream_test.go @@ -414,6 +414,17 @@ func TestSetStreamOptions(t *testing.T) { test.That(t, setStreamOptionsResp, test.ShouldBeNil) test.That(t, err.Error(), test.ShouldContainSubstring, "invalid resolution") + // Try adding stream + res, err := livestreamClient.AddStream(ctx, &streampb.AddStreamRequest{ + Name: "fake-cam-0-0", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, res, test.ShouldNotBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(tb, videoCnt, test.ShouldEqual, 1) + }) + // Test setting the stream optoins with a valid resolution setStreamOptionsResp, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ Name: "fake-cam-0-0", @@ -421,4 +432,36 @@ func TestSetStreamOptions(t *testing.T) { }) test.That(t, err, test.ShouldBeNil) test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) + + // Make sure that video strack is still alive through the peer connection + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(t, videoCnt, test.ShouldEqual, 1) + + // try RemoveStream + removeRes, err := livestreamClient.RemoveStream(ctx, &streampb.RemoveStreamRequest{ + Name: "fake-cam-0-0", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, removeRes, test.ShouldNotBeNil) + // testutils.WaitForAssertion(t, func(tb testing.TB) { + // videoCnt = strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + // test.That(t, videoCnt, test.ShouldEqual, 0) + // }) + + // // Get image from camera and verify the resolution + // !!! this will not work need to go through webrtc path + // cam, err := camera.FromRobot(robot, "fake-cam-0-0") + // cam + // test.That(t, err, test.ShouldBeNil) + // defer cam.Close(ctx) + // var errHandlers []gostream.ErrorHandler + // stream, err := cam.Stream(ctx, errHandlers...) + // test.That(t, err, test.ShouldBeNil) + // defer stream.Close(ctx) + // img, release, err := stream.Next(ctx) + // release() + // test.That(t, err, test.ShouldBeNil) + // test.That(t, img.Bounds().Dx(), test.ShouldEqual, 320) + // test.That(t, img.Bounds().Dy(), test.ShouldEqual, 240) + } From 1d4d5b36809bbf57eb0ae553629f55574fad4df6 Mon Sep 17 00:00:00 2001 From: seanavery Date: Thu, 7 Nov 2024 13:58:41 -0500 Subject: [PATCH 03/24] Test that video tracks survive setting stream options --- robot/web/stream/stream_test.go | 73 +++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 26 deletions(-) diff --git a/robot/web/stream/stream_test.go b/robot/web/stream/stream_test.go index fc3ef67844a..6827bd42932 100644 --- a/robot/web/stream/stream_test.go +++ b/robot/web/stream/stream_test.go @@ -198,7 +198,7 @@ func TestAudioTrackIsNotCreatedForVideoStream(t *testing.T) { } func TestGetStreamOptions(t *testing.T) { - logger := logging.NewTestLogger(t).Sublogger("TestWebReconfigure") + logger := logging.NewTestLogger(t).Sublogger("GetStreamOptions") // Create a robot with several fake cameras of common resolutions. // Fake cameras with a Model attribute will use Properties to // determine source resolution. Fake cameras without a Model @@ -358,7 +358,7 @@ func TestGetStreamOptions(t *testing.T) { } func TestSetStreamOptions(t *testing.T) { - logger := logging.NewTestLogger(t).Sublogger("TestWebReconfigure") + logger := logging.NewTestLogger(t).Sublogger("TestSetStreamOptions") origCfg := &config.Config{Components: []resource.Config{ // 480p { @@ -370,6 +370,16 @@ func TestSetStreamOptions(t *testing.T) { Height: 480, }, }, + { + Name: "fake-cam-0-1", + API: resource.NewAPI("rdk", "component", "camera"), + Model: resource.DefaultModelFamily.WithModel("fake"), + ConvertedAttributes: &fake.Config{ + Width: 640, + Height: 480, + RTPPassthrough: true, + }, + }, }} ctx, robot, addr, webSvc := setupRealRobot(t, origCfg, logger) @@ -382,7 +392,7 @@ func TestSetStreamOptions(t *testing.T) { livestreamClient := streampb.NewStreamServiceClient(conn) listResp, err := livestreamClient.ListStreams(ctx, &streampb.ListStreamsRequest{}) test.That(t, err, test.ShouldBeNil) - test.That(t, len(listResp.Names), test.ShouldEqual, 1) + test.That(t, len(listResp.Names), test.ShouldEqual, 2) getStreamOptionsResp, err := livestreamClient.GetStreamOptions(ctx, &streampb.GetStreamOptionsRequest{ Name: "fake-cam-0-0", @@ -414,13 +424,16 @@ func TestSetStreamOptions(t *testing.T) { test.That(t, setStreamOptionsResp, test.ShouldBeNil) test.That(t, err.Error(), test.ShouldContainSubstring, "invalid resolution") - // Try adding stream + // Add stream. We want to make sure that we can hot swap with + // SetStreamOptions with a live stream. res, err := livestreamClient.AddStream(ctx, &streampb.AddStreamRequest{ Name: "fake-cam-0-0", }) test.That(t, err, test.ShouldBeNil) test.That(t, res, test.ShouldNotBeNil) + logger.Info("Checking video track is created") testutils.WaitForAssertion(t, func(tb testing.TB) { + logger.Info(conn.PeerConn().CurrentLocalDescription().SDP) videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") test.That(tb, videoCnt, test.ShouldEqual, 1) }) @@ -437,31 +450,39 @@ func TestSetStreamOptions(t *testing.T) { videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") test.That(t, videoCnt, test.ShouldEqual, 1) - // try RemoveStream + // Add another stream that has RTPPassthrough enabled + res, err = livestreamClient.AddStream(ctx, &streampb.AddStreamRequest{ + Name: "fake-cam-0-1", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, res, test.ShouldNotBeNil) + logger.Info(conn.PeerConn().CurrentLocalDescription().SDP) + testutils.WaitForAssertion(t, func(tb testing.TB) { + videoCnt = strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(tb, videoCnt, test.ShouldEqual, 2) + }) + + // Try setting stream options with RTPPassthrough enabled + setStreamOptionsResp, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "fake-cam-0-1", + Resolution: &streampb.Resolution{Width: 320, Height: 240}, + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) + + // Make sure that video strack is still alive through the peer connection + videoCnt = strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(t, videoCnt, test.ShouldEqual, 2) + + // Make sure there are no problems removing streams removeRes, err := livestreamClient.RemoveStream(ctx, &streampb.RemoveStreamRequest{ Name: "fake-cam-0-0", }) test.That(t, err, test.ShouldBeNil) test.That(t, removeRes, test.ShouldNotBeNil) - // testutils.WaitForAssertion(t, func(tb testing.TB) { - // videoCnt = strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") - // test.That(t, videoCnt, test.ShouldEqual, 0) - // }) - - // // Get image from camera and verify the resolution - // !!! this will not work need to go through webrtc path - // cam, err := camera.FromRobot(robot, "fake-cam-0-0") - // cam - // test.That(t, err, test.ShouldBeNil) - // defer cam.Close(ctx) - // var errHandlers []gostream.ErrorHandler - // stream, err := cam.Stream(ctx, errHandlers...) - // test.That(t, err, test.ShouldBeNil) - // defer stream.Close(ctx) - // img, release, err := stream.Next(ctx) - // release() - // test.That(t, err, test.ShouldBeNil) - // test.That(t, img.Bounds().Dx(), test.ShouldEqual, 320) - // test.That(t, img.Bounds().Dy(), test.ShouldEqual, 240) - + removeRes, err = livestreamClient.RemoveStream(ctx, &streampb.RemoveStreamRequest{ + Name: "fake-cam-0-1", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, removeRes, test.ShouldNotBeNil) } From 47193fe7761e4b9d6ede4e1b5144274875a27368 Mon Sep 17 00:00:00 2001 From: seanavery Date: Thu, 7 Nov 2024 14:09:29 -0500 Subject: [PATCH 04/24] Remove duplicate validation --- robot/web/stream/server.go | 9 +-------- robot/web/stream/stream_test.go | 3 ++- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index 42127f9ab51..76711089aa1 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -381,20 +381,13 @@ func (server *Server) SetStreamOptions( if req.Name == "" { return nil, errors.New("stream name is required") } - if _, ok := server.videoSources[req.Name]; !ok { - return nil, errors.New("stream name not found") - } if req.Resolution == nil { return nil, errors.New("resolution is required") } if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 { return nil, errors.New("invalid resolution, width and height must be greater than 0") } - _, err := camera.FromRobot(server.robot, req.Name) - if err != nil { - return nil, fmt.Errorf("failed to get camera from robot: %w", err) - } - err = server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height)) + err := server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height)) if err != nil { return nil, fmt.Errorf("failed to resize video source: %w", err) } diff --git a/robot/web/stream/stream_test.go b/robot/web/stream/stream_test.go index 6827bd42932..c882e46395e 100644 --- a/robot/web/stream/stream_test.go +++ b/robot/web/stream/stream_test.go @@ -403,7 +403,8 @@ func TestSetStreamOptions(t *testing.T) { // Test setting stream options with invalid name setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ - Name: "invalid-name", + Name: "invalid-name", + Resolution: &streampb.Resolution{Width: 320, Height: 240}, }) test.That(t, err, test.ShouldNotBeNil) test.That(t, setStreamOptionsResp, test.ShouldBeNil) From 686dbb60a7e2f2ac2c81ee8a4e2ea29ced8b3349 Mon Sep 17 00:00:00 2001 From: seanavery Date: Thu, 7 Nov 2024 14:19:58 -0500 Subject: [PATCH 05/24] Add read lock to GetStreamOptions --- robot/web/stream/server.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index 76711089aa1..7e6a80f9c61 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -336,6 +336,8 @@ func (server *Server) GetStreamOptions( ctx context.Context, req *streampb.GetStreamOptionsRequest, ) (*streampb.GetStreamOptionsResponse, error) { + server.mu.RLock() + defer server.mu.RUnlock() if req.Name == "" { return nil, errors.New("stream name is required") } From d0d0c5cf4750b83b0940d2b0b7a1db097c1abe2e Mon Sep 17 00:00:00 2001 From: seanavery Date: Fri, 8 Nov 2024 09:30:09 -0600 Subject: [PATCH 06/24] Use %q when logging name --- robot/web/stream/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index 7e6a80f9c61..678fcc55592 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -406,11 +406,11 @@ func (server *Server) resizeVideoSource(name string, width, height int) error { } cam, err := camera.FromRobot(server.robot, name) if err != nil { - server.logger.Errorw("error getting camera from robot", "error", err) + server.logger.Errorf("error getting camera %q from robot", name) return err } resizer := gostream.NewResizeVideoSource(cam, width, height) - server.logger.Debug("Resizing video source with swap") + server.logger.Debug("resizing video source with swap") existing.Swap(resizer) return nil } From 3bd8e980d634dfa37f7608aeb3f17b9ae7e69a16 Mon Sep 17 00:00:00 2001 From: seanavery Date: Fri, 8 Nov 2024 15:54:29 -0600 Subject: [PATCH 07/24] Add resize state transition --- robot/web/stream/server.go | 7 +++++++ robot/web/stream/state/state.go | 23 +++++++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index 678fcc55592..8bed177b9cb 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -393,6 +393,13 @@ func (server *Server) SetStreamOptions( if err != nil { return nil, fmt.Errorf("failed to resize video source: %w", err) } + server.mu.RLock() + streamState, ok := server.nameToStreamState[req.Name] + server.mu.RUnlock() + if !ok { + return nil, fmt.Errorf("stream %q not found", req.Name) + } + streamState.Resize() return &streampb.SetStreamOptionsResponse{}, nil } diff --git a/robot/web/stream/state/state.go b/robot/web/stream/state/state.go index 7a51600e72f..0fd577b7d7a 100644 --- a/robot/web/stream/state/state.go +++ b/robot/web/stream/state/state.go @@ -43,6 +43,7 @@ type StreamState struct { streamSource streamSource // streamSourceSub is only non nil if streamSource == streamSourcePassthrough streamSourceSub rtppassthrough.Subscription + resized bool } // New returns a new *StreamState. @@ -61,6 +62,7 @@ func New( robot: r, msgChan: make(chan msg), tickChan: make(chan struct{}), + resized: false, logger: logger, } @@ -90,6 +92,14 @@ func (state *StreamState) Decrement() error { return state.send(msgTypeDecrement) } +// Resize notifies the StreamState that the stream has been resized. +func (state *StreamState) Resize() error { + if err := state.closedCtx.Err(); err != nil { + return multierr.Combine(ErrClosed, err) + } + return state.send(msgTypeResize) +} + // Close closes the StreamState. func (state *StreamState) Close() error { state.logger.Info("Closing streamState") @@ -129,6 +139,7 @@ const ( msgTypeUnknown msgType = iota msgTypeIncrement msgTypeDecrement + msgTypeResize ) func (mt msgType) String() string { @@ -185,6 +196,10 @@ func (state *StreamState) sourceEventHandler() { if state.activeClients == 0 { state.tick() } + case msgTypeResize: + state.logger.Debug("resize event received") + state.resized = true + state.tick() case msgTypeUnknown: fallthrough default: @@ -268,6 +283,14 @@ func (state *StreamState) tick() { state.Stream.Start() state.streamSource = streamSourceGoStream } + // If we are currently using passthrough, and sthe stream state changes to resized + // we need to stop the passthrough stream and restart it through gostream. + case state.streamSource == streamSourcePassthrough && state.resized: + state.logger.Info("stream resized, stopping passthrough stream") + state.stopInputStream() + state.Stream.Start() + state.streamSource = streamSourceGoStream + // state.resized = false case state.streamSource == streamSourcePassthrough: // no op if we are using passthrough & are healthy state.logger.Debug("still healthy and using h264 passthrough") From c95ea9dd7121570485d0e4bc057b724746276080 Mon Sep 17 00:00:00 2001 From: seanavery Date: Fri, 8 Nov 2024 16:03:46 -0600 Subject: [PATCH 08/24] Lint --- robot/web/stream/state/state.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/robot/web/stream/state/state.go b/robot/web/stream/state/state.go index 0fd577b7d7a..e4e3257e837 100644 --- a/robot/web/stream/state/state.go +++ b/robot/web/stream/state/state.go @@ -148,6 +148,8 @@ func (mt msgType) String() string { return "Increment" case msgTypeDecrement: return "Decrement" + case msgTypeResize: + return "Resize" case msgTypeUnknown: fallthrough default: From eef2ab1fe1d2291e72142a38c776ff5fe2efb44f Mon Sep 17 00:00:00 2001 From: seanavery Date: Mon, 11 Nov 2024 13:00:31 -0500 Subject: [PATCH 09/24] Check resize err --- robot/web/stream/server.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index 8bed177b9cb..8cd141fb430 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -399,7 +399,10 @@ func (server *Server) SetStreamOptions( if !ok { return nil, fmt.Errorf("stream %q not found", req.Name) } - streamState.Resize() + err = streamState.Resize() + if err != nil { + return nil, fmt.Errorf("failed to resize stream: %w", err) + } return &streampb.SetStreamOptionsResponse{}, nil } From 9f77dd9f19dc1fb08d0d09f4709af87d7bf77158 Mon Sep 17 00:00:00 2001 From: seanavery Date: Mon, 11 Nov 2024 14:41:54 -0500 Subject: [PATCH 10/24] Add test for resize downgrade path --- robot/web/stream/state/state_test.go | 115 +++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/robot/web/stream/state/state_test.go b/robot/web/stream/state/state_test.go index 1c482482714..9e2ba53d74c 100644 --- a/robot/web/stream/state/state_test.go +++ b/robot/web/stream/state/state_test.go @@ -762,4 +762,119 @@ func TestStreamState(t *testing.T) { test.That(tb, stopCount.Load(), test.ShouldEqual, 3) }) }) + + t.Run("when in rtppassthrough mode and a resize occurs test downgrade path to gostream", func(t *testing.T) { + var startCount atomic.Int64 + var stopCount atomic.Int64 + writeRTPCalledCtx, writeRTPCalledFunc := context.WithCancel(ctx) + streamMock := &mockStream{ + name: camName, + t: t, + startFunc: func() { + startCount.Add(1) + }, + stopFunc: func() { + stopCount.Add(1) + }, + writeRTPFunc: func(pkt *rtp.Packet) error { + // Test that WriteRTP is eventually called when SubscribeRTP is called + writeRTPCalledFunc() + return nil + }, + } + + var subscribeRTPCount atomic.Int64 + var unsubscribeCount atomic.Int64 + type subAndCancel struct { + sub rtppassthrough.Subscription + cancelFn context.CancelFunc + wg *sync.WaitGroup + } + + var subsAndCancelByIDMu sync.Mutex + subsAndCancelByID := map[rtppassthrough.SubscriptionID]subAndCancel{} + + subscribeRTPFunc := func( + ctx context.Context, + bufferSize int, + packetsCB rtppassthrough.PacketCallback, + ) (rtppassthrough.Subscription, error) { + subsAndCancelByIDMu.Lock() + defer subsAndCancelByIDMu.Unlock() + defer subscribeRTPCount.Add(1) + terminatedCtx, terminatedFn := context.WithCancel(context.Background()) + id := uuid.New() + sub := rtppassthrough.Subscription{ID: id, Terminated: terminatedCtx} + subsAndCancelByID[id] = subAndCancel{sub: sub, cancelFn: terminatedFn, wg: &sync.WaitGroup{}} + subsAndCancelByID[id].wg.Add(1) + utils.ManagedGo(func() { + for terminatedCtx.Err() == nil { + packetsCB([]*rtp.Packet{{}}) + time.Sleep(time.Millisecond * 50) + } + }, subsAndCancelByID[id].wg.Done) + return sub, nil + } + + unsubscribeFunc := func(ctx context.Context, id rtppassthrough.SubscriptionID) error { + subsAndCancelByIDMu.Lock() + defer subsAndCancelByIDMu.Unlock() + defer unsubscribeCount.Add(1) + subAndCancel, ok := subsAndCancelByID[id] + if !ok { + test.That(t, fmt.Sprintf("Unsubscribe called with unknown id: %s", id.String()), test.ShouldBeFalse) + } + subAndCancel.cancelFn() + return nil + } + + mockRTPPassthroughSource := &mockRTPPassthroughSource{ + subscribeRTPFunc: subscribeRTPFunc, + unsubscribeFunc: unsubscribeFunc, + } + + robot := mockRobot(mockRTPPassthroughSource) + s := state.New(streamMock, robot, logger) + defer func() { + utils.UncheckedError(s.Close()) + }() + + test.That(t, subscribeRTPCount.Load(), test.ShouldEqual, 0) + test.That(t, unsubscribeCount.Load(), test.ShouldEqual, 0) + test.That(t, writeRTPCalledCtx.Err(), test.ShouldBeNil) + + logger.Info("the first Increment() eventually call calls SubscribeRTP()") + test.That(t, s.Increment(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 0) + }) + // WriteRTP is called + <-writeRTPCalledCtx.Done() + + logger.Info("subsequent Increment() calls don't call any other rtppassthrough.Source methods") + test.That(t, s.Increment(), test.ShouldBeNil) + test.That(t, s.Increment(), test.ShouldBeNil) + time.Sleep(sleepDuration) + test.That(t, subscribeRTPCount.Load(), test.ShouldEqual, 1) + test.That(t, unsubscribeCount.Load(), test.ShouldEqual, 0) + + // Call Resize to simulate a resize event. This should stop rtp_passthrough and start gostream. + logger.Info("calling Resize() should stop rtp_passthrough and start gostream") + test.That(t, s.Resize(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 1) + test.That(tb, stopCount.Load(), test.ShouldEqual, 0) + }) + + // Make sure that Decrement() calls Stop() as gostream is the data source + logger.Info("calling Decrement() should call Stop() as gostream is the data source") + test.That(t, s.Decrement(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 1) + test.That(tb, stopCount.Load(), test.ShouldEqual, 1) + }) + }) } From 95315d4625a6cae922bd4bff6cea85f982060362 Mon Sep 17 00:00:00 2001 From: seanavery Date: Mon, 11 Nov 2024 15:19:01 -0500 Subject: [PATCH 11/24] Improve state handler to avoid passthrough attempts when in resize mode --- robot/web/stream/state/state.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/robot/web/stream/state/state.go b/robot/web/stream/state/state.go index e4e3257e837..268e592e43e 100644 --- a/robot/web/stream/state/state.go +++ b/robot/web/stream/state/state.go @@ -43,7 +43,9 @@ type StreamState struct { streamSource streamSource // streamSourceSub is only non nil if streamSource == streamSourcePassthrough streamSourceSub rtppassthrough.Subscription - resized bool + // resized is set to true when the stream has been resized. + // It is a signal to avoid trying to restart the passthrough stream. + resized bool } // New returns a new *StreamState. @@ -264,6 +266,11 @@ func (state *StreamState) tick() { // stop stream if there are no active clients // noop if there is no stream source state.stopInputStream() + // If streamSource is unknown and resized is true, we do not want to attempt passthrough. + case state.streamSource == streamSourceUnknown && state.resized: + state.logger.Debug("in a resized state, using gostream") + state.Stream.Start() + state.streamSource = streamSourceGoStream case state.streamSource == streamSourceUnknown: // && state.activeClients > 0 // this is the first subscription, attempt passthrough state.logger.Info("attempting to subscribe to rtp_passthrough") @@ -274,6 +281,13 @@ func (state *StreamState) tick() { state.Stream.Start() state.streamSource = streamSourceGoStream } + // If we are currently using passthrough, and the stream state changes to resized + // we need to stop the passthrough stream and restart it through gostream. + case state.streamSource == streamSourcePassthrough && state.resized: + state.logger.Info("stream resized, stopping passthrough stream") + state.stopInputStream() + state.Stream.Start() + state.streamSource = streamSourceGoStream case state.streamSource == streamSourcePassthrough && state.streamSourceSub.Terminated.Err() != nil: // restart stream if there we were using passthrough but the sub is terminated state.logger.Info("previous subscription terminated attempting to subscribe to rtp_passthrough") @@ -285,18 +299,10 @@ func (state *StreamState) tick() { state.Stream.Start() state.streamSource = streamSourceGoStream } - // If we are currently using passthrough, and sthe stream state changes to resized - // we need to stop the passthrough stream and restart it through gostream. - case state.streamSource == streamSourcePassthrough && state.resized: - state.logger.Info("stream resized, stopping passthrough stream") - state.stopInputStream() - state.Stream.Start() - state.streamSource = streamSourceGoStream - // state.resized = false case state.streamSource == streamSourcePassthrough: // no op if we are using passthrough & are healthy state.logger.Debug("still healthy and using h264 passthrough") - case state.streamSource == streamSourceGoStream: + case state.streamSource == streamSourceGoStream && !state.resized: // Try to upgrade to passthrough if we are using gostream. We leave logs these as debugs as // we expect some components to not implement rtp passthrough. state.logger.Debugw("currently using gostream, trying upgrade to rtp_passthrough") From cd74a133677dd901fa0426717f6f5143e1edfd38 Mon Sep 17 00:00:00 2001 From: seanavery Date: Mon, 11 Nov 2024 15:21:11 -0500 Subject: [PATCH 12/24] Add test cases that ensure we avoid passthrough attempts when resized --- robot/web/stream/state/state_test.go | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/robot/web/stream/state/state_test.go b/robot/web/stream/state/state_test.go index 9e2ba53d74c..022abd1db87 100644 --- a/robot/web/stream/state/state_test.go +++ b/robot/web/stream/state/state_test.go @@ -852,13 +852,6 @@ func TestStreamState(t *testing.T) { // WriteRTP is called <-writeRTPCalledCtx.Done() - logger.Info("subsequent Increment() calls don't call any other rtppassthrough.Source methods") - test.That(t, s.Increment(), test.ShouldBeNil) - test.That(t, s.Increment(), test.ShouldBeNil) - time.Sleep(sleepDuration) - test.That(t, subscribeRTPCount.Load(), test.ShouldEqual, 1) - test.That(t, unsubscribeCount.Load(), test.ShouldEqual, 0) - // Call Resize to simulate a resize event. This should stop rtp_passthrough and start gostream. logger.Info("calling Resize() should stop rtp_passthrough and start gostream") test.That(t, s.Resize(), test.ShouldBeNil) @@ -876,5 +869,26 @@ func TestStreamState(t *testing.T) { test.That(tb, startCount.Load(), test.ShouldEqual, 1) test.That(tb, stopCount.Load(), test.ShouldEqual, 1) }) + + // Call increment again and make sure that gostream is started + // and rtppassthrough is not called + logger.Info("calling Increment() should call Start() as gostream is the data source") + test.That(t, s.Increment(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 2) + test.That(tb, stopCount.Load(), test.ShouldEqual, 1) + }) + + // Call decrement again and make sure that gostream is stopped. + logger.Info("calling Decrement() should call Stop() as gostream is the data source") + test.That(t, s.Decrement(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 2) + test.That(tb, stopCount.Load(), test.ShouldEqual, 2) + }) }) } From 2c222ebb9bc3c8609b83f78c77c596ae96f7ef26 Mon Sep 17 00:00:00 2001 From: seanavery Date: Tue, 12 Nov 2024 11:55:38 -0500 Subject: [PATCH 13/24] Simplify locking --- robot/web/stream/server.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index 8cd141fb430..2812112ecf2 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -389,13 +389,13 @@ func (server *Server) SetStreamOptions( if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 { return nil, errors.New("invalid resolution, width and height must be greater than 0") } + server.mu.Lock() + defer server.mu.Unlock() err := server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height)) if err != nil { return nil, fmt.Errorf("failed to resize video source: %w", err) } - server.mu.RLock() streamState, ok := server.nameToStreamState[req.Name] - server.mu.RUnlock() if !ok { return nil, fmt.Errorf("stream %q not found", req.Name) } @@ -408,8 +408,6 @@ func (server *Server) SetStreamOptions( // ResizeVideoSource resizes the video source with the given name. func (server *Server) resizeVideoSource(name string, width, height int) error { - server.mu.Lock() - defer server.mu.Unlock() existing, ok := server.videoSources[name] if !ok { return fmt.Errorf("video source %q not found", name) From 50a6fbf9ce7f7c0e02650e74d1aba077fa64bafe Mon Sep 17 00:00:00 2001 From: seanavery Date: Wed, 13 Nov 2024 10:49:51 -0500 Subject: [PATCH 14/24] Change to isResized --- robot/web/stream/state/state.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/robot/web/stream/state/state.go b/robot/web/stream/state/state.go index 268e592e43e..b154f65f35e 100644 --- a/robot/web/stream/state/state.go +++ b/robot/web/stream/state/state.go @@ -43,9 +43,9 @@ type StreamState struct { streamSource streamSource // streamSourceSub is only non nil if streamSource == streamSourcePassthrough streamSourceSub rtppassthrough.Subscription - // resized is set to true when the stream has been resized. - // It is a signal to avoid trying to restart the passthrough stream. - resized bool + // isResized indicates whether the stream has been resized by the stream server. + // When set to true, it signals that the passthrough stream should not be restarted. + isResized bool } // New returns a new *StreamState. @@ -64,7 +64,7 @@ func New( robot: r, msgChan: make(chan msg), tickChan: make(chan struct{}), - resized: false, + isResized: false, logger: logger, } @@ -202,7 +202,7 @@ func (state *StreamState) sourceEventHandler() { } case msgTypeResize: state.logger.Debug("resize event received") - state.resized = true + state.isResized = true state.tick() case msgTypeUnknown: fallthrough @@ -267,7 +267,7 @@ func (state *StreamState) tick() { // noop if there is no stream source state.stopInputStream() // If streamSource is unknown and resized is true, we do not want to attempt passthrough. - case state.streamSource == streamSourceUnknown && state.resized: + case state.streamSource == streamSourceUnknown && state.isResized: state.logger.Debug("in a resized state, using gostream") state.Stream.Start() state.streamSource = streamSourceGoStream @@ -283,7 +283,7 @@ func (state *StreamState) tick() { } // If we are currently using passthrough, and the stream state changes to resized // we need to stop the passthrough stream and restart it through gostream. - case state.streamSource == streamSourcePassthrough && state.resized: + case state.streamSource == streamSourcePassthrough && state.isResized: state.logger.Info("stream resized, stopping passthrough stream") state.stopInputStream() state.Stream.Start() @@ -302,7 +302,7 @@ func (state *StreamState) tick() { case state.streamSource == streamSourcePassthrough: // no op if we are using passthrough & are healthy state.logger.Debug("still healthy and using h264 passthrough") - case state.streamSource == streamSourceGoStream && !state.resized: + case state.streamSource == streamSourceGoStream && !state.isResized: // Try to upgrade to passthrough if we are using gostream. We leave logs these as debugs as // we expect some components to not implement rtp passthrough. state.logger.Debugw("currently using gostream, trying upgrade to rtp_passthrough") From 56b8f2aa0bdefb7c421d120b46716ca8f19ec976 Mon Sep 17 00:00:00 2001 From: seanavery Date: Wed, 13 Nov 2024 11:07:22 -0500 Subject: [PATCH 15/24] Add more info in error returns --- robot/web/stream/server.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index 2812112ecf2..3373b328093 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -381,27 +381,30 @@ func (server *Server) SetStreamOptions( req *streampb.SetStreamOptionsRequest, ) (*streampb.SetStreamOptionsResponse, error) { if req.Name == "" { - return nil, errors.New("stream name is required") + return nil, errors.New("stream name is required in request") } if req.Resolution == nil { - return nil, errors.New("resolution is required") + return nil, fmt.Errorf("resolution is required to resize stream %q", req.Name) } if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 { - return nil, errors.New("invalid resolution, width and height must be greater than 0") + return nil, fmt.Errorf( + "invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0", + req.Name, req.Resolution.Width, req.Resolution.Height, + ) } server.mu.Lock() defer server.mu.Unlock() err := server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height)) if err != nil { - return nil, fmt.Errorf("failed to resize video source: %w", err) + return nil, fmt.Errorf("failed to resize video source for stream %q: %w", req.Name, err) } streamState, ok := server.nameToStreamState[req.Name] if !ok { - return nil, fmt.Errorf("stream %q not found", req.Name) + return nil, fmt.Errorf("stream state not found with name %q", req.Name) } err = streamState.Resize() if err != nil { - return nil, fmt.Errorf("failed to resize stream: %w", err) + return nil, fmt.Errorf("failed to resize stream %q: %w", req.Name, err) } return &streampb.SetStreamOptionsResponse{}, nil } From ef3dff0faf93d8813fb8a779ca60f95450def1b8 Mon Sep 17 00:00:00 2001 From: seanavery Date: Wed, 13 Nov 2024 11:30:02 -0500 Subject: [PATCH 16/24] Use t.Run for stream tests --- robot/web/stream/stream_test.go | 158 +++++++++++++++++--------------- 1 file changed, 84 insertions(+), 74 deletions(-) diff --git a/robot/web/stream/stream_test.go b/robot/web/stream/stream_test.go index c882e46395e..4b525260c0a 100644 --- a/robot/web/stream/stream_test.go +++ b/robot/web/stream/stream_test.go @@ -394,96 +394,106 @@ func TestSetStreamOptions(t *testing.T) { test.That(t, err, test.ShouldBeNil) test.That(t, len(listResp.Names), test.ShouldEqual, 2) - getStreamOptionsResp, err := livestreamClient.GetStreamOptions(ctx, &streampb.GetStreamOptionsRequest{ - Name: "fake-cam-0-0", + t.Run("GetStreamOptions", func(t *testing.T) { + getStreamOptionsResp, err := livestreamClient.GetStreamOptions(ctx, &streampb.GetStreamOptionsRequest{ + Name: "fake-cam-0-0", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, getStreamOptionsResp, test.ShouldNotBeNil) + test.That(t, len(getStreamOptionsResp.Resolutions), test.ShouldEqual, 5) }) - test.That(t, err, test.ShouldBeNil) - test.That(t, getStreamOptionsResp, test.ShouldNotBeNil) - test.That(t, len(getStreamOptionsResp.Resolutions), test.ShouldEqual, 5) - // Test setting stream options with invalid name - setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ - Name: "invalid-name", - Resolution: &streampb.Resolution{Width: 320, Height: 240}, + t.Run("SetStreamOptions with invalid stream name", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "invalid-name", + Resolution: &streampb.Resolution{Width: 320, Height: 240}, + }) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, setStreamOptionsResp, test.ShouldBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "not found") }) - test.That(t, err, test.ShouldNotBeNil) - test.That(t, setStreamOptionsResp, test.ShouldBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, "not found") - // Test setting stream options without Name - setStreamOptionsResp, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{}) - test.That(t, err, test.ShouldNotBeNil) - test.That(t, setStreamOptionsResp, test.ShouldBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, "name") - - // Test setting stream options with invalid resolution (0x0) - setStreamOptionsResp, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ - Name: "fake-cam-0-0", - Resolution: &streampb.Resolution{Width: 0, Height: 0}, + t.Run("SetStreamOptions without name", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{}) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, setStreamOptionsResp, test.ShouldBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "name") }) - test.That(t, err, test.ShouldNotBeNil) - test.That(t, setStreamOptionsResp, test.ShouldBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, "invalid resolution") - // Add stream. We want to make sure that we can hot swap with - // SetStreamOptions with a live stream. - res, err := livestreamClient.AddStream(ctx, &streampb.AddStreamRequest{ - Name: "fake-cam-0-0", - }) - test.That(t, err, test.ShouldBeNil) - test.That(t, res, test.ShouldNotBeNil) - logger.Info("Checking video track is created") - testutils.WaitForAssertion(t, func(tb testing.TB) { - logger.Info(conn.PeerConn().CurrentLocalDescription().SDP) - videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") - test.That(tb, videoCnt, test.ShouldEqual, 1) + // Test setting stream options with invalid resolution (0x0) + t.Run("SetStreamOptions with invalid resolution", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "fake-cam-0-0", + Resolution: &streampb.Resolution{Width: 0, Height: 0}, + }) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, setStreamOptionsResp, test.ShouldBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "invalid resolution") }) - // Test setting the stream optoins with a valid resolution - setStreamOptionsResp, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ - Name: "fake-cam-0-0", - Resolution: &streampb.Resolution{Width: 320, Height: 240}, + t.Run("AddStream creates video track", func(t *testing.T) { + res, err := livestreamClient.AddStream(ctx, &streampb.AddStreamRequest{ + Name: "fake-cam-0-0", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, res, test.ShouldNotBeNil) + logger.Info("Checking video track is created") + testutils.WaitForAssertion(t, func(tb testing.TB) { + logger.Info(conn.PeerConn().CurrentLocalDescription().SDP) + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(tb, videoCnt, test.ShouldEqual, 1) + }) }) - test.That(t, err, test.ShouldBeNil) - test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) - // Make sure that video strack is still alive through the peer connection - videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") - test.That(t, videoCnt, test.ShouldEqual, 1) + t.Run("SetStreamOptions with valid resolution", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "fake-cam-0-0", + Resolution: &streampb.Resolution{Width: 320, Height: 240}, + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) - // Add another stream that has RTPPassthrough enabled - res, err = livestreamClient.AddStream(ctx, &streampb.AddStreamRequest{ - Name: "fake-cam-0-1", - }) - test.That(t, err, test.ShouldBeNil) - test.That(t, res, test.ShouldNotBeNil) - logger.Info(conn.PeerConn().CurrentLocalDescription().SDP) - testutils.WaitForAssertion(t, func(tb testing.TB) { - videoCnt = strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") - test.That(tb, videoCnt, test.ShouldEqual, 2) + // Make sure that video strack is still alive through the peer connection + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(t, videoCnt, test.ShouldEqual, 1) }) - // Try setting stream options with RTPPassthrough enabled - setStreamOptionsResp, err = livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ - Name: "fake-cam-0-1", - Resolution: &streampb.Resolution{Width: 320, Height: 240}, + t.Run("AddStream with RTPPassthrough enabled", func(t *testing.T) { + res, err := livestreamClient.AddStream(ctx, &streampb.AddStreamRequest{ + Name: "fake-cam-0-1", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, res, test.ShouldNotBeNil) + logger.Info(conn.PeerConn().CurrentLocalDescription().SDP) + testutils.WaitForAssertion(t, func(tb testing.TB) { + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(tb, videoCnt, test.ShouldEqual, 2) + }) }) - test.That(t, err, test.ShouldBeNil) - test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) - // Make sure that video strack is still alive through the peer connection - videoCnt = strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") - test.That(t, videoCnt, test.ShouldEqual, 2) + t.Run("SetStreamOptions with RTPPassthrough enabled", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "fake-cam-0-1", + Resolution: &streampb.Resolution{Width: 320, Height: 240}, + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) - // Make sure there are no problems removing streams - removeRes, err := livestreamClient.RemoveStream(ctx, &streampb.RemoveStreamRequest{ - Name: "fake-cam-0-0", + // Make sure that video strack is still alive through the peer connection + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(t, videoCnt, test.ShouldEqual, 2) }) - test.That(t, err, test.ShouldBeNil) - test.That(t, removeRes, test.ShouldNotBeNil) - removeRes, err = livestreamClient.RemoveStream(ctx, &streampb.RemoveStreamRequest{ - Name: "fake-cam-0-1", + + t.Run("RemoveStream", func(t *testing.T) { + removeRes, err := livestreamClient.RemoveStream(ctx, &streampb.RemoveStreamRequest{ + Name: "fake-cam-0-0", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, removeRes, test.ShouldNotBeNil) + removeRes, err = livestreamClient.RemoveStream(ctx, &streampb.RemoveStreamRequest{ + Name: "fake-cam-0-1", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, removeRes, test.ShouldNotBeNil) }) - test.That(t, err, test.ShouldBeNil) - test.That(t, removeRes, test.ShouldNotBeNil) } From 5a52a3610c501a5d7566498cbfbbf3c3fc8fc9ae Mon Sep 17 00:00:00 2001 From: seanavery Date: Wed, 13 Nov 2024 11:48:18 -0500 Subject: [PATCH 17/24] Use t.Run for stream state tests --- robot/web/stream/state/state_test.go | 92 ++++++++++++++-------------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/robot/web/stream/state/state_test.go b/robot/web/stream/state/state_test.go index 022abd1db87..b6d40d6693e 100644 --- a/robot/web/stream/state/state_test.go +++ b/robot/web/stream/state/state_test.go @@ -843,52 +843,52 @@ func TestStreamState(t *testing.T) { test.That(t, unsubscribeCount.Load(), test.ShouldEqual, 0) test.That(t, writeRTPCalledCtx.Err(), test.ShouldBeNil) - logger.Info("the first Increment() eventually call calls SubscribeRTP()") - test.That(t, s.Increment(), test.ShouldBeNil) - testutils.WaitForAssertion(t, func(tb testing.TB) { - test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) - test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 0) - }) - // WriteRTP is called - <-writeRTPCalledCtx.Done() - - // Call Resize to simulate a resize event. This should stop rtp_passthrough and start gostream. - logger.Info("calling Resize() should stop rtp_passthrough and start gostream") - test.That(t, s.Resize(), test.ShouldBeNil) - testutils.WaitForAssertion(t, func(tb testing.TB) { - test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) - test.That(tb, startCount.Load(), test.ShouldEqual, 1) - test.That(tb, stopCount.Load(), test.ShouldEqual, 0) - }) - - // Make sure that Decrement() calls Stop() as gostream is the data source - logger.Info("calling Decrement() should call Stop() as gostream is the data source") - test.That(t, s.Decrement(), test.ShouldBeNil) - testutils.WaitForAssertion(t, func(tb testing.TB) { - test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) - test.That(tb, startCount.Load(), test.ShouldEqual, 1) - test.That(tb, stopCount.Load(), test.ShouldEqual, 1) - }) - - // Call increment again and make sure that gostream is started - // and rtppassthrough is not called - logger.Info("calling Increment() should call Start() as gostream is the data source") - test.That(t, s.Increment(), test.ShouldBeNil) - testutils.WaitForAssertion(t, func(tb testing.TB) { - test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) - test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) - test.That(tb, startCount.Load(), test.ShouldEqual, 2) - test.That(tb, stopCount.Load(), test.ShouldEqual, 1) - }) - - // Call decrement again and make sure that gostream is stopped. - logger.Info("calling Decrement() should call Stop() as gostream is the data source") - test.That(t, s.Decrement(), test.ShouldBeNil) - testutils.WaitForAssertion(t, func(tb testing.TB) { - test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) - test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) - test.That(tb, startCount.Load(), test.ShouldEqual, 2) - test.That(tb, stopCount.Load(), test.ShouldEqual, 2) + t.Run("Increment should call SubscribeRTP", func(t *testing.T) { + test.That(t, s.Increment(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 0) + }) + // WriteRTP is called + <-writeRTPCalledCtx.Done() + }) + + t.Run("Resize should stop rtp_passthrough and start gostream", func(t *testing.T) { + test.That(t, s.Resize(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 1) + test.That(tb, stopCount.Load(), test.ShouldEqual, 0) + }) + }) + + t.Run("Decrement should call Stop as gostream is the data source", func(t *testing.T) { + test.That(t, s.Decrement(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 1) + test.That(tb, stopCount.Load(), test.ShouldEqual, 1) + }) + }) + + t.Run("Increment should call Start as gostream is the data source", func(t *testing.T) { + test.That(t, s.Increment(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 2) + test.That(tb, stopCount.Load(), test.ShouldEqual, 1) + }) + }) + + t.Run("Decrement should call Stop as gostream is the data source", func(t *testing.T) { + test.That(t, s.Decrement(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 2) + test.That(tb, stopCount.Load(), test.ShouldEqual, 2) + }) }) }) } From f5ef994a197a69a19f1d3b3840bc9f89697fffde Mon Sep 17 00:00:00 2001 From: seanavery Date: Thu, 14 Nov 2024 13:41:39 -0500 Subject: [PATCH 18/24] Remove read lock --- robot/web/stream/server.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index 3373b328093..a70ce0b0122 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -336,8 +336,6 @@ func (server *Server) GetStreamOptions( ctx context.Context, req *streampb.GetStreamOptionsRequest, ) (*streampb.GetStreamOptionsResponse, error) { - server.mu.RLock() - defer server.mu.RUnlock() if req.Name == "" { return nil, errors.New("stream name is required") } From fb9eac5018f6646d903755ecaf32bfbc4a5924b6 Mon Sep 17 00:00:00 2001 From: seanavery Date: Thu, 14 Nov 2024 13:48:11 -0500 Subject: [PATCH 19/24] Create validation helper --- robot/web/stream/server.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index a70ce0b0122..fd002fa186c 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -378,17 +378,8 @@ func (server *Server) SetStreamOptions( ctx context.Context, req *streampb.SetStreamOptionsRequest, ) (*streampb.SetStreamOptionsResponse, error) { - if req.Name == "" { - return nil, errors.New("stream name is required in request") - } - if req.Resolution == nil { - return nil, fmt.Errorf("resolution is required to resize stream %q", req.Name) - } - if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 { - return nil, fmt.Errorf( - "invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0", - req.Name, req.Resolution.Width, req.Resolution.Height, - ) + if err := validateSetStreamOptionsRequest(req); err != nil { + return nil, err } server.mu.Lock() defer server.mu.Unlock() @@ -729,3 +720,19 @@ retryLoop: } return frame.Bounds().Dx(), frame.Bounds().Dy(), nil } + +func validateSetStreamOptionsRequest(req *streampb.SetStreamOptionsRequest) error { + if req.Name == "" { + return errors.New("stream name is required in request") + } + if req.Resolution == nil { + return fmt.Errorf("resolution is required to resize stream %q", req.Name) + } + if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 { + return fmt.Errorf( + "invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0", + req.Name, req.Resolution.Width, req.Resolution.Height, + ) + } + return nil +} From c41c2dff7df19ec0bf0c407870cf2c0f8e77a654 Mon Sep 17 00:00:00 2001 From: seanavery Date: Thu, 14 Nov 2024 14:01:18 -0500 Subject: [PATCH 20/24] Move all resize logic into resizeVideoSource --- robot/web/stream/server.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index fd002fa186c..78617056760 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -378,23 +378,16 @@ func (server *Server) SetStreamOptions( ctx context.Context, req *streampb.SetStreamOptionsRequest, ) (*streampb.SetStreamOptionsResponse, error) { - if err := validateSetStreamOptionsRequest(req); err != nil { + err := validateSetStreamOptionsRequest(req) + if err != nil { return nil, err } server.mu.Lock() defer server.mu.Unlock() - err := server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height)) + err = server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height)) if err != nil { return nil, fmt.Errorf("failed to resize video source for stream %q: %w", req.Name, err) } - streamState, ok := server.nameToStreamState[req.Name] - if !ok { - return nil, fmt.Errorf("stream state not found with name %q", req.Name) - } - err = streamState.Resize() - if err != nil { - return nil, fmt.Errorf("failed to resize stream %q: %w", req.Name, err) - } return &streampb.SetStreamOptionsResponse{}, nil } @@ -409,9 +402,17 @@ func (server *Server) resizeVideoSource(name string, width, height int) error { server.logger.Errorf("error getting camera %q from robot", name) return err } + streamState, ok := server.nameToStreamState[name] + if !ok { + return fmt.Errorf("stream state not found with name %q", name) + } resizer := gostream.NewResizeVideoSource(cam, width, height) - server.logger.Debug("resizing video source with swap") + server.logger.Debug("resizing video source with a hot swap") existing.Swap(resizer) + err = streamState.Resize() + if err != nil { + return fmt.Errorf("failed to resize stream %q: %w", name, err) + } return nil } From 6a5292c446b202ac6efa8fba28c8fd3f183d851f Mon Sep 17 00:00:00 2001 From: seanavery Date: Fri, 15 Nov 2024 13:41:54 -0500 Subject: [PATCH 21/24] Add reset path to exit resized mode --- robot/web/stream/server.go | 68 ++++++++++++++++++++++------ robot/web/stream/state/state.go | 19 +++++++- robot/web/stream/state/state_test.go | 31 +++++++++++++ robot/web/stream/stream_test.go | 21 ++++++++- 4 files changed, 123 insertions(+), 16 deletions(-) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index 78617056760..a09c7254c9d 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -32,6 +32,12 @@ const ( retryDelay = 50 * time.Millisecond ) +const ( + optionsCommandResize = iota + optionsCommandReset + optionsCommandUnknown +) + type peerState struct { streamState *state.StreamState senders []*webrtc.RTPSender @@ -378,20 +384,30 @@ func (server *Server) SetStreamOptions( ctx context.Context, req *streampb.SetStreamOptionsRequest, ) (*streampb.SetStreamOptionsResponse, error) { - err := validateSetStreamOptionsRequest(req) + cmd, err := validateSetStreamOptionsRequest(req) if err != nil { return nil, err } server.mu.Lock() defer server.mu.Unlock() - err = server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height)) - if err != nil { - return nil, fmt.Errorf("failed to resize video source for stream %q: %w", req.Name, err) + switch cmd { + case optionsCommandResize: + err = server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height)) + if err != nil { + return nil, fmt.Errorf("failed to resize video source for stream %q: %w", req.Name, err) + } + case optionsCommandReset: + err = server.resetVideoSource(req.Name) + if err != nil { + return nil, fmt.Errorf("failed to reset video source for stream %q: %w", req.Name, err) + } + default: + return nil, fmt.Errorf("unknown command type %v", cmd) } return &streampb.SetStreamOptionsResponse{}, nil } -// ResizeVideoSource resizes the video source with the given name. +// resizeVideoSource resizes the video source with the given name. func (server *Server) resizeVideoSource(name string, width, height int) error { existing, ok := server.videoSources[name] if !ok { @@ -416,6 +432,30 @@ func (server *Server) resizeVideoSource(name string, width, height int) error { return nil } +// resetVideoSource resets the video source with the given name to the source resolution. +func (server *Server) resetVideoSource(name string) error { + existing, ok := server.videoSources[name] + if !ok { + return fmt.Errorf("video source %q not found", name) + } + cam, err := camera.FromRobot(server.robot, name) + if err != nil { + server.logger.Errorf("error getting camera %q from robot", name) + } + streamState, ok := server.nameToStreamState[name] + if !ok { + return fmt.Errorf("stream state not found with name %q", name) + } + newSwapper := gostream.NewHotSwappableVideoSource(cam) + server.logger.Debug("resetting video source") + existing.Swap(newSwapper) + err = streamState.Reset() + if err != nil { + return fmt.Errorf("failed to reset stream %q: %w", name, err) + } + return nil +} + // AddNewStreams adds new video and audio streams to the server using the updated set of video and // audio sources. It refreshes the sources, checks for a valid stream configuration, and starts // the streams if applicable. @@ -722,18 +762,20 @@ retryLoop: return frame.Bounds().Dx(), frame.Bounds().Dy(), nil } -func validateSetStreamOptionsRequest(req *streampb.SetStreamOptionsRequest) error { +// validateSetStreamOptionsRequest validates the request to set the stream options. +func validateSetStreamOptionsRequest(req *streampb.SetStreamOptionsRequest) (int, error) { if req.Name == "" { - return errors.New("stream name is required in request") + return optionsCommandUnknown, errors.New("stream name is required in request") } if req.Resolution == nil { - return fmt.Errorf("resolution is required to resize stream %q", req.Name) + return optionsCommandReset, nil } if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 { - return fmt.Errorf( - "invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0", - req.Name, req.Resolution.Width, req.Resolution.Height, - ) + return optionsCommandUnknown, + fmt.Errorf( + "invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0", + req.Name, req.Resolution.Width, req.Resolution.Height, + ) } - return nil + return optionsCommandResize, nil } diff --git a/robot/web/stream/state/state.go b/robot/web/stream/state/state.go index b154f65f35e..634836e926c 100644 --- a/robot/web/stream/state/state.go +++ b/robot/web/stream/state/state.go @@ -94,7 +94,8 @@ func (state *StreamState) Decrement() error { return state.send(msgTypeDecrement) } -// Resize notifies the StreamState that the stream has been resized. +// Resize notifies that the gostream source has been resized. This will stop and prevent +// the use of the passthrough stream if it is supported. func (state *StreamState) Resize() error { if err := state.closedCtx.Err(); err != nil { return multierr.Combine(ErrClosed, err) @@ -102,6 +103,15 @@ func (state *StreamState) Resize() error { return state.send(msgTypeResize) } +// Reset notifies that the gostream source has been reset to the original resolution. +// This will restart the passthrough stream if it is supported. +func (state *StreamState) Reset() error { + if err := state.closedCtx.Err(); err != nil { + return multierr.Combine(ErrClosed, err) + } + return state.send(msgTypeReset) +} + // Close closes the StreamState. func (state *StreamState) Close() error { state.logger.Info("Closing streamState") @@ -142,6 +152,7 @@ const ( msgTypeIncrement msgTypeDecrement msgTypeResize + msgTypeReset ) func (mt msgType) String() string { @@ -152,6 +163,8 @@ func (mt msgType) String() string { return "Decrement" case msgTypeResize: return "Resize" + case msgTypeReset: + return "Reset" case msgTypeUnknown: fallthrough default: @@ -204,6 +217,10 @@ func (state *StreamState) sourceEventHandler() { state.logger.Debug("resize event received") state.isResized = true state.tick() + case msgTypeReset: + state.logger.Debug("reset event received") + state.isResized = false + state.tick() case msgTypeUnknown: fallthrough default: diff --git a/robot/web/stream/state/state_test.go b/robot/web/stream/state/state_test.go index b6d40d6693e..b8d30300ca7 100644 --- a/robot/web/stream/state/state_test.go +++ b/robot/web/stream/state/state_test.go @@ -890,5 +890,36 @@ func TestStreamState(t *testing.T) { test.That(tb, stopCount.Load(), test.ShouldEqual, 2) }) }) + + t.Run("Increment should call Start as gostream is the data source", func(t *testing.T) { + test.That(t, s.Increment(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 3) + test.That(tb, stopCount.Load(), test.ShouldEqual, 2) + }) + }) + + t.Run("Reset should call Stop as gostream is the current data source and then "+ + "Subscribe as rtp_passthrough is the new data source", func(t *testing.T) { + test.That(t, s.Reset(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 2) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 3) + test.That(tb, stopCount.Load(), test.ShouldEqual, 3) + }) + }) + + t.Run("Decrement should call unsubscribe as rtp_passthrough is the data source", func(t *testing.T) { + test.That(t, s.Decrement(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 2) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 2) + test.That(tb, startCount.Load(), test.ShouldEqual, 3) + test.That(tb, stopCount.Load(), test.ShouldEqual, 3) + }) + }) }) } diff --git a/robot/web/stream/stream_test.go b/robot/web/stream/stream_test.go index 4b525260c0a..4fe62e46f58 100644 --- a/robot/web/stream/stream_test.go +++ b/robot/web/stream/stream_test.go @@ -452,12 +452,21 @@ func TestSetStreamOptions(t *testing.T) { }) test.That(t, err, test.ShouldBeNil) test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) - // Make sure that video strack is still alive through the peer connection videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") test.That(t, videoCnt, test.ShouldEqual, 1) }) + t.Run("SetStreamOptions without resolution field to reset to the original resolution", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "fake-cam-0-0", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(t, videoCnt, test.ShouldEqual, 1) + }) + t.Run("AddStream with RTPPassthrough enabled", func(t *testing.T) { res, err := livestreamClient.AddStream(ctx, &streampb.AddStreamRequest{ Name: "fake-cam-0-1", @@ -478,8 +487,16 @@ func TestSetStreamOptions(t *testing.T) { }) test.That(t, err, test.ShouldBeNil) test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(t, videoCnt, test.ShouldEqual, 2) + }) - // Make sure that video strack is still alive through the peer connection + t.Run("SetStreamOptions reset to original resolution when RTPPassthrough is enabled", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "fake-cam-0-1", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") test.That(t, videoCnt, test.ShouldEqual, 2) }) From 37174cf87a15ffe0a83ab3b3ebb110a7d4f97e00 Mon Sep 17 00:00:00 2001 From: seanavery Date: Wed, 20 Nov 2024 16:47:39 -0500 Subject: [PATCH 22/24] Directly swap with cam --- robot/web/stream/server.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index a09c7254c9d..c720ca625c5 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -423,7 +423,10 @@ func (server *Server) resizeVideoSource(name string, width, height int) error { return fmt.Errorf("stream state not found with name %q", name) } resizer := gostream.NewResizeVideoSource(cam, width, height) - server.logger.Debug("resizing video source with a hot swap") + server.logger.Debugf( + "resizing video source to width %d and height %d", + width, height, + ) existing.Swap(resizer) err = streamState.Resize() if err != nil { @@ -446,9 +449,8 @@ func (server *Server) resetVideoSource(name string) error { if !ok { return fmt.Errorf("stream state not found with name %q", name) } - newSwapper := gostream.NewHotSwappableVideoSource(cam) server.logger.Debug("resetting video source") - existing.Swap(newSwapper) + existing.Swap(cam) err = streamState.Reset() if err != nil { return fmt.Errorf("failed to reset stream %q: %w", name, err) From 424c3b11f88772a3e5f20a27bd1e483b62ba6159 Mon Sep 17 00:00:00 2001 From: seanavery Date: Wed, 20 Nov 2024 17:07:13 -0500 Subject: [PATCH 23/24] Overwrite sequence number --- gostream/webrtc_track.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/gostream/webrtc_track.go b/gostream/webrtc_track.go index f27ebebb1c7..324def2c202 100644 --- a/gostream/webrtc_track.go +++ b/gostream/webrtc_track.go @@ -33,16 +33,18 @@ type trackLocalStaticRTP struct { mu sync.RWMutex bindings []trackBinding codec webrtc.RTPCodecCapability + sequencer rtp.Sequencer id, rid, streamID string } // newtrackLocalStaticRTP returns a trackLocalStaticRTP. func newtrackLocalStaticRTP(c webrtc.RTPCodecCapability, id, streamID string) *trackLocalStaticRTP { return &trackLocalStaticRTP{ - codec: c, - bindings: []trackBinding{}, - id: id, - streamID: streamID, + codec: c, + bindings: []trackBinding{}, + id: id, + sequencer: rtp.NewRandomSequencer(), + streamID: streamID, } } @@ -126,6 +128,10 @@ func (s *trackLocalStaticRTP) WriteRTP(p *rtp.Packet) error { for _, b := range s.bindings { outboundPacket.Header.SSRC = uint32(b.ssrc) outboundPacket.Header.PayloadType = uint8(b.payloadType) + // We overwrite the sequence number to ensure continuity between packets + // coming from Passthrough sources and those that are packetized by the + // Pion RTP Packetizer in the WriteData method. + outboundPacket.Header.SequenceNumber = s.sequencer.NextSequenceNumber() if _, err := b.writeStream.WriteRTP(&outboundPacket.Header, outboundPacket.Payload); err != nil { writeErrs = append(writeErrs, err) } From 2956cde734ca458ad3197fbbfa0f62d1462936d2 Mon Sep 17 00:00:00 2001 From: seanavery Date: Thu, 21 Nov 2024 13:21:14 -0500 Subject: [PATCH 24/24] Move validate fn --- robot/web/stream/server.go | 36 ++++++++++++++++----------------- robot/web/stream/state/state.go | 2 +- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index c720ca625c5..a54928dcfa3 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -407,6 +407,24 @@ func (server *Server) SetStreamOptions( return &streampb.SetStreamOptionsResponse{}, nil } +// validateSetStreamOptionsRequest validates the request to set the stream options. +func validateSetStreamOptionsRequest(req *streampb.SetStreamOptionsRequest) (int, error) { + if req.Name == "" { + return optionsCommandUnknown, errors.New("stream name is required in request") + } + if req.Resolution == nil { + return optionsCommandReset, nil + } + if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 { + return optionsCommandUnknown, + fmt.Errorf( + "invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0", + req.Name, req.Resolution.Width, req.Resolution.Height, + ) + } + return optionsCommandResize, nil +} + // resizeVideoSource resizes the video source with the given name. func (server *Server) resizeVideoSource(name string, width, height int) error { existing, ok := server.videoSources[name] @@ -763,21 +781,3 @@ retryLoop: } return frame.Bounds().Dx(), frame.Bounds().Dy(), nil } - -// validateSetStreamOptionsRequest validates the request to set the stream options. -func validateSetStreamOptionsRequest(req *streampb.SetStreamOptionsRequest) (int, error) { - if req.Name == "" { - return optionsCommandUnknown, errors.New("stream name is required in request") - } - if req.Resolution == nil { - return optionsCommandReset, nil - } - if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 { - return optionsCommandUnknown, - fmt.Errorf( - "invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0", - req.Name, req.Resolution.Width, req.Resolution.Height, - ) - } - return optionsCommandResize, nil -} diff --git a/robot/web/stream/state/state.go b/robot/web/stream/state/state.go index 634836e926c..de4c0c03cc8 100644 --- a/robot/web/stream/state/state.go +++ b/robot/web/stream/state/state.go @@ -285,7 +285,7 @@ func (state *StreamState) tick() { state.stopInputStream() // If streamSource is unknown and resized is true, we do not want to attempt passthrough. case state.streamSource == streamSourceUnknown && state.isResized: - state.logger.Debug("in a resized state, using gostream") + state.logger.Debug("in a resized state and stream source is unknown, defaulting to GoStream") state.Stream.Start() state.streamSource = streamSourceGoStream case state.streamSource == streamSourceUnknown: // && state.activeClients > 0