diff --git a/gostream/webrtc_track.go b/gostream/webrtc_track.go index f27ebebb1c7..324def2c202 100644 --- a/gostream/webrtc_track.go +++ b/gostream/webrtc_track.go @@ -33,16 +33,18 @@ type trackLocalStaticRTP struct { mu sync.RWMutex bindings []trackBinding codec webrtc.RTPCodecCapability + sequencer rtp.Sequencer id, rid, streamID string } // newtrackLocalStaticRTP returns a trackLocalStaticRTP. func newtrackLocalStaticRTP(c webrtc.RTPCodecCapability, id, streamID string) *trackLocalStaticRTP { return &trackLocalStaticRTP{ - codec: c, - bindings: []trackBinding{}, - id: id, - streamID: streamID, + codec: c, + bindings: []trackBinding{}, + id: id, + sequencer: rtp.NewRandomSequencer(), + streamID: streamID, } } @@ -126,6 +128,10 @@ func (s *trackLocalStaticRTP) WriteRTP(p *rtp.Packet) error { for _, b := range s.bindings { outboundPacket.Header.SSRC = uint32(b.ssrc) outboundPacket.Header.PayloadType = uint8(b.payloadType) + // We overwrite the sequence number to ensure continuity between packets + // coming from Passthrough sources and those that are packetized by the + // Pion RTP Packetizer in the WriteData method. + outboundPacket.Header.SequenceNumber = s.sequencer.NextSequenceNumber() if _, err := b.writeStream.WriteRTP(&outboundPacket.Header, outboundPacket.Payload); err != nil { writeErrs = append(writeErrs, err) } diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index 086fa7cef51..a54928dcfa3 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -32,6 +32,12 @@ const ( retryDelay = 50 * time.Millisecond ) +const ( + optionsCommandResize = iota + optionsCommandReset + optionsCommandUnknown +) + type peerState struct { streamState *state.StreamState senders []*webrtc.RTPSender @@ -372,6 +378,104 @@ func (server *Server) GetStreamOptions( }, nil } +// SetStreamOptions implements part of the StreamServiceServer. It sets the resolution of the stream +// to the given width and height. +func (server *Server) SetStreamOptions( + ctx context.Context, + req *streampb.SetStreamOptionsRequest, +) (*streampb.SetStreamOptionsResponse, error) { + cmd, err := validateSetStreamOptionsRequest(req) + if err != nil { + return nil, err + } + server.mu.Lock() + defer server.mu.Unlock() + switch cmd { + case optionsCommandResize: + err = server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height)) + if err != nil { + return nil, fmt.Errorf("failed to resize video source for stream %q: %w", req.Name, err) + } + case optionsCommandReset: + err = server.resetVideoSource(req.Name) + if err != nil { + return nil, fmt.Errorf("failed to reset video source for stream %q: %w", req.Name, err) + } + default: + return nil, fmt.Errorf("unknown command type %v", cmd) + } + return &streampb.SetStreamOptionsResponse{}, nil +} + +// validateSetStreamOptionsRequest validates the request to set the stream options. +func validateSetStreamOptionsRequest(req *streampb.SetStreamOptionsRequest) (int, error) { + if req.Name == "" { + return optionsCommandUnknown, errors.New("stream name is required in request") + } + if req.Resolution == nil { + return optionsCommandReset, nil + } + if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 { + return optionsCommandUnknown, + fmt.Errorf( + "invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0", + req.Name, req.Resolution.Width, req.Resolution.Height, + ) + } + return optionsCommandResize, nil +} + +// resizeVideoSource resizes the video source with the given name. +func (server *Server) resizeVideoSource(name string, width, height int) error { + existing, ok := server.videoSources[name] + if !ok { + return fmt.Errorf("video source %q not found", name) + } + cam, err := camera.FromRobot(server.robot, name) + if err != nil { + server.logger.Errorf("error getting camera %q from robot", name) + return err + } + streamState, ok := server.nameToStreamState[name] + if !ok { + return fmt.Errorf("stream state not found with name %q", name) + } + resizer := gostream.NewResizeVideoSource(cam, width, height) + server.logger.Debugf( + "resizing video source to width %d and height %d", + width, height, + ) + existing.Swap(resizer) + err = streamState.Resize() + if err != nil { + return fmt.Errorf("failed to resize stream %q: %w", name, err) + } + return nil +} + +// resetVideoSource resets the video source with the given name to the source resolution. +func (server *Server) resetVideoSource(name string) error { + existing, ok := server.videoSources[name] + if !ok { + return fmt.Errorf("video source %q not found", name) + } + cam, err := camera.FromRobot(server.robot, name) + if err != nil { + server.logger.Errorf("error getting camera %q from robot", name) + } + streamState, ok := server.nameToStreamState[name] + if !ok { + return fmt.Errorf("stream state not found with name %q", name) + } + server.logger.Debug("resetting video source") + existing.Swap(cam) + err = streamState.Reset() + if err != nil { + return fmt.Errorf("failed to reset stream %q: %w", name, err) + } + return nil +} + // AddNewStreams adds new video and audio streams to the server using the updated set of video and // audio sources. It refreshes the sources, checks for a valid stream configuration, and starts // the streams if applicable. diff --git a/robot/web/stream/state/state.go b/robot/web/stream/state/state.go index 7a51600e72f..de4c0c03cc8 100644 --- a/robot/web/stream/state/state.go +++ b/robot/web/stream/state/state.go @@ -43,6 +43,9 @@ type StreamState struct { streamSource streamSource // streamSourceSub is only non nil if streamSource == streamSourcePassthrough streamSourceSub rtppassthrough.Subscription + // isResized indicates whether the stream has been resized by the stream server. + // When set to true, it signals that the passthrough stream should not be restarted. + isResized bool } // New returns a new *StreamState. @@ -61,6 +64,7 @@ func New( robot: r, msgChan: make(chan msg), tickChan: make(chan struct{}), + isResized: false, logger: logger, } @@ -90,6 +94,24 @@ func (state *StreamState) Decrement() error { return state.send(msgTypeDecrement) } +// Resize notifies that the gostream source has been resized. This will stop and prevent +// the use of the passthrough stream if it is supported. +func (state *StreamState) Resize() error { + if err := state.closedCtx.Err(); err != nil { + return multierr.Combine(ErrClosed, err) + } + return state.send(msgTypeResize) +} + +// Reset notifies that the gostream source has been reset to the original resolution. +// This will restart the passthrough stream if it is supported. +func (state *StreamState) Reset() error { + if err := state.closedCtx.Err(); err != nil { + return multierr.Combine(ErrClosed, err) + } + return state.send(msgTypeReset) +} + // Close closes the StreamState. func (state *StreamState) Close() error { state.logger.Info("Closing streamState") @@ -129,6 +151,8 @@ const ( msgTypeUnknown msgType = iota msgTypeIncrement msgTypeDecrement + msgTypeResize + msgTypeReset ) func (mt msgType) String() string { @@ -137,6 +161,10 @@ func (mt msgType) String() string { return "Increment" case msgTypeDecrement: return "Decrement" + case msgTypeResize: + return "Resize" + case msgTypeReset: + return "Reset" case msgTypeUnknown: fallthrough default: @@ -185,6 +213,14 @@ func (state *StreamState) sourceEventHandler() { if state.activeClients == 0 { state.tick() } + case msgTypeResize: + state.logger.Debug("resize event received") + state.isResized = true + state.tick() + case msgTypeReset: + state.logger.Debug("reset event received") + state.isResized = false + state.tick() case msgTypeUnknown: fallthrough default: @@ -247,6 +283,11 @@ func (state *StreamState) tick() { // stop stream if there are no active clients // noop if there is no stream source state.stopInputStream() + // If streamSource is unknown and resized is true, we do not want to attempt passthrough. + case state.streamSource == streamSourceUnknown && state.isResized: + state.logger.Debug("in a resized state and stream source is unknown, defaulting to GoStream") + state.Stream.Start() + state.streamSource = streamSourceGoStream case state.streamSource == streamSourceUnknown: // && state.activeClients > 0 // this is the first subscription, attempt passthrough state.logger.Info("attempting to subscribe to rtp_passthrough") @@ -257,6 +298,13 @@ func (state *StreamState) tick() { state.Stream.Start() state.streamSource = streamSourceGoStream } + // If we are currently using passthrough, and the stream state changes to resized + // we need to stop the passthrough stream and restart it through gostream. + case state.streamSource == streamSourcePassthrough && state.isResized: + state.logger.Info("stream resized, stopping passthrough stream") + state.stopInputStream() + state.Stream.Start() + state.streamSource = streamSourceGoStream case state.streamSource == streamSourcePassthrough && state.streamSourceSub.Terminated.Err() != nil: // restart stream if there we were using passthrough but the sub is terminated state.logger.Info("previous subscription terminated attempting to subscribe to rtp_passthrough") @@ -271,7 +319,7 @@ func (state *StreamState) tick() { case state.streamSource == streamSourcePassthrough: // no op if we are using passthrough & are healthy state.logger.Debug("still healthy and using h264 passthrough") - case state.streamSource == streamSourceGoStream: + case state.streamSource == streamSourceGoStream && !state.isResized: // Try to upgrade to passthrough if we are using gostream. We leave logs these as debugs as // we expect some components to not implement rtp passthrough. state.logger.Debugw("currently using gostream, trying upgrade to rtp_passthrough") diff --git a/robot/web/stream/state/state_test.go b/robot/web/stream/state/state_test.go index 1c482482714..b8d30300ca7 100644 --- a/robot/web/stream/state/state_test.go +++ b/robot/web/stream/state/state_test.go @@ -762,4 +762,164 @@ func TestStreamState(t *testing.T) { test.That(tb, stopCount.Load(), test.ShouldEqual, 3) }) }) + + t.Run("when in rtppassthrough mode and a resize occurs test downgrade path to gostream", func(t *testing.T) { + var startCount atomic.Int64 + var stopCount atomic.Int64 + writeRTPCalledCtx, writeRTPCalledFunc := context.WithCancel(ctx) + streamMock := &mockStream{ + name: camName, + t: t, + startFunc: func() { + startCount.Add(1) + }, + stopFunc: func() { + stopCount.Add(1) + }, + writeRTPFunc: func(pkt *rtp.Packet) error { + // Test that WriteRTP is eventually called when SubscribeRTP is called + writeRTPCalledFunc() + return nil + }, + } + + var subscribeRTPCount atomic.Int64 + var unsubscribeCount atomic.Int64 + type subAndCancel struct { + sub rtppassthrough.Subscription + cancelFn context.CancelFunc + wg *sync.WaitGroup + } + + var subsAndCancelByIDMu sync.Mutex + subsAndCancelByID := map[rtppassthrough.SubscriptionID]subAndCancel{} + + subscribeRTPFunc := func( + ctx context.Context, + bufferSize int, + packetsCB rtppassthrough.PacketCallback, + ) (rtppassthrough.Subscription, error) { + subsAndCancelByIDMu.Lock() + defer subsAndCancelByIDMu.Unlock() + defer subscribeRTPCount.Add(1) + terminatedCtx, terminatedFn := context.WithCancel(context.Background()) + id := uuid.New() + sub := rtppassthrough.Subscription{ID: id, Terminated: terminatedCtx} + subsAndCancelByID[id] = subAndCancel{sub: sub, cancelFn: terminatedFn, wg: &sync.WaitGroup{}} + subsAndCancelByID[id].wg.Add(1) + utils.ManagedGo(func() { + for terminatedCtx.Err() == nil { + packetsCB([]*rtp.Packet{{}}) + time.Sleep(time.Millisecond * 50) + } + }, subsAndCancelByID[id].wg.Done) + return sub, nil + } + + unsubscribeFunc := func(ctx context.Context, id rtppassthrough.SubscriptionID) error { + subsAndCancelByIDMu.Lock() + defer subsAndCancelByIDMu.Unlock() + defer unsubscribeCount.Add(1) + subAndCancel, ok := subsAndCancelByID[id] + if !ok { + test.That(t, fmt.Sprintf("Unsubscribe called with unknown id: %s", id.String()), test.ShouldBeFalse) + } + subAndCancel.cancelFn() + return nil + } + + mockRTPPassthroughSource := &mockRTPPassthroughSource{ + subscribeRTPFunc: subscribeRTPFunc, + unsubscribeFunc: unsubscribeFunc, + } + + robot := mockRobot(mockRTPPassthroughSource) + s := state.New(streamMock, robot, logger) + defer func() { + utils.UncheckedError(s.Close()) + }() + + test.That(t, subscribeRTPCount.Load(), test.ShouldEqual, 0) + test.That(t, unsubscribeCount.Load(), test.ShouldEqual, 0) + test.That(t, writeRTPCalledCtx.Err(), test.ShouldBeNil) + + t.Run("Increment should call SubscribeRTP", func(t *testing.T) { + test.That(t, s.Increment(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 0) + }) + // WriteRTP is called + <-writeRTPCalledCtx.Done() + }) + + t.Run("Resize should stop rtp_passthrough and start gostream", func(t *testing.T) { + test.That(t, s.Resize(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 1) + test.That(tb, stopCount.Load(), test.ShouldEqual, 0) + }) + }) + + t.Run("Decrement should call Stop as gostream is the data source", func(t *testing.T) { + test.That(t, s.Decrement(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 1) + test.That(tb, stopCount.Load(), test.ShouldEqual, 1) + }) + }) + + t.Run("Increment should call Start as gostream is the data source", func(t *testing.T) { + test.That(t, s.Increment(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 2) + test.That(tb, stopCount.Load(), test.ShouldEqual, 1) + }) + }) + + t.Run("Decrement should call Stop as gostream is the data source", func(t *testing.T) { + test.That(t, s.Decrement(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 2) + test.That(tb, stopCount.Load(), test.ShouldEqual, 2) + }) + }) + + t.Run("Increment should call Start as gostream is the data source", func(t *testing.T) { + test.That(t, s.Increment(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 3) + test.That(tb, stopCount.Load(), test.ShouldEqual, 2) + }) + }) + + t.Run("Reset should call Stop as gostream is the current data source and then "+ + "Subscribe as rtp_passthrough is the new data source", func(t *testing.T) { + test.That(t, s.Reset(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 2) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1) + test.That(tb, startCount.Load(), test.ShouldEqual, 3) + test.That(tb, stopCount.Load(), test.ShouldEqual, 3) + }) + }) + + t.Run("Decrement should call unsubscribe as rtp_passthrough is the data source", func(t *testing.T) { + test.That(t, s.Decrement(), test.ShouldBeNil) + testutils.WaitForAssertion(t, func(tb testing.TB) { + test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 2) + test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 2) + test.That(tb, startCount.Load(), test.ShouldEqual, 3) + test.That(tb, stopCount.Load(), test.ShouldEqual, 3) + }) + }) + }) } diff --git a/robot/web/stream/stream_test.go b/robot/web/stream/stream_test.go index f2cc9472dd4..4fe62e46f58 100644 --- a/robot/web/stream/stream_test.go +++ b/robot/web/stream/stream_test.go @@ -198,7 +198,7 @@ func TestAudioTrackIsNotCreatedForVideoStream(t *testing.T) { } func TestGetStreamOptions(t *testing.T) { - logger := logging.NewTestLogger(t).Sublogger("TestWebReconfigure") + logger := logging.NewTestLogger(t).Sublogger("GetStreamOptions") // Create a robot with several fake cameras of common resolutions. // Fake cameras with a Model attribute will use Properties to // determine source resolution. Fake cameras without a Model @@ -356,3 +356,161 @@ func TestGetStreamOptions(t *testing.T) { testGetStreamOptions(name, expectedResolutions) } } + +func TestSetStreamOptions(t *testing.T) { + logger := logging.NewTestLogger(t).Sublogger("TestSetStreamOptions") + origCfg := &config.Config{Components: []resource.Config{ + // 480p + { + Name: "fake-cam-0-0", + API: resource.NewAPI("rdk", "component", "camera"), + Model: resource.DefaultModelFamily.WithModel("fake"), + ConvertedAttributes: &fake.Config{ + Width: 640, + Height: 480, + }, + }, + { + Name: "fake-cam-0-1", + API: resource.NewAPI("rdk", "component", "camera"), + Model: resource.DefaultModelFamily.WithModel("fake"), + ConvertedAttributes: &fake.Config{ + Width: 640, + Height: 480, + RTPPassthrough: true, + }, + }, + }} + + ctx, robot, addr, webSvc := setupRealRobot(t, origCfg, logger) + defer robot.Close(ctx) + defer webSvc.Close(ctx) + conn, err := rgrpc.Dial(context.Background(), addr, logger.Sublogger("TestDial"), rpc.WithDisableDirectGRPC()) + test.That(t, err, test.ShouldBeNil) + defer conn.Close() + + livestreamClient := streampb.NewStreamServiceClient(conn) + listResp, err := livestreamClient.ListStreams(ctx, &streampb.ListStreamsRequest{}) + test.That(t, err, test.ShouldBeNil) + test.That(t, len(listResp.Names), test.ShouldEqual, 2) + + t.Run("GetStreamOptions", func(t *testing.T) { + getStreamOptionsResp, err := livestreamClient.GetStreamOptions(ctx, &streampb.GetStreamOptionsRequest{ + Name: "fake-cam-0-0", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, getStreamOptionsResp, test.ShouldNotBeNil) + test.That(t, len(getStreamOptionsResp.Resolutions), test.ShouldEqual, 5) + }) + + t.Run("SetStreamOptions with invalid stream name", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "invalid-name", + Resolution: &streampb.Resolution{Width: 320, Height: 240}, + }) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, setStreamOptionsResp, test.ShouldBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "not found") + }) + + t.Run("SetStreamOptions without name", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{}) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, setStreamOptionsResp, test.ShouldBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "name") + }) + + // Test setting stream options with invalid resolution (0x0) + t.Run("SetStreamOptions with invalid resolution", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "fake-cam-0-0", + Resolution: &streampb.Resolution{Width: 0, Height: 0}, + }) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, setStreamOptionsResp, test.ShouldBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, "invalid resolution") + }) + + t.Run("AddStream creates video track", func(t *testing.T) { + res, err := livestreamClient.AddStream(ctx, &streampb.AddStreamRequest{ + Name: "fake-cam-0-0", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, res, test.ShouldNotBeNil) + logger.Info("Checking video track is created") + testutils.WaitForAssertion(t, func(tb testing.TB) { + logger.Info(conn.PeerConn().CurrentLocalDescription().SDP) + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(tb, videoCnt, test.ShouldEqual, 1) + }) + }) + + t.Run("SetStreamOptions with valid resolution", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "fake-cam-0-0", + Resolution: &streampb.Resolution{Width: 320, Height: 240}, + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) + // Make sure that video strack is still alive through the peer connection + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(t, videoCnt, test.ShouldEqual, 1) + }) + + t.Run("SetStreamOptions without resolution field to reset to the original resolution", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "fake-cam-0-0", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(t, videoCnt, test.ShouldEqual, 1) + }) + + t.Run("AddStream with RTPPassthrough enabled", func(t *testing.T) { + res, err := livestreamClient.AddStream(ctx, &streampb.AddStreamRequest{ + Name: "fake-cam-0-1", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, res, test.ShouldNotBeNil) + logger.Info(conn.PeerConn().CurrentLocalDescription().SDP) + testutils.WaitForAssertion(t, func(tb testing.TB) { + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(tb, videoCnt, test.ShouldEqual, 2) + }) + }) + + t.Run("SetStreamOptions with RTPPassthrough enabled", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "fake-cam-0-1", + Resolution: &streampb.Resolution{Width: 320, Height: 240}, + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(t, videoCnt, test.ShouldEqual, 2) + }) + + t.Run("SetStreamOptions reset to original resolution when RTPPassthrough is enabled", func(t *testing.T) { + setStreamOptionsResp, err := livestreamClient.SetStreamOptions(ctx, &streampb.SetStreamOptionsRequest{ + Name: "fake-cam-0-1", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, setStreamOptionsResp, test.ShouldNotBeNil) + videoCnt := strings.Count(conn.PeerConn().CurrentLocalDescription().SDP, "m=video") + test.That(t, videoCnt, test.ShouldEqual, 2) + }) + + t.Run("RemoveStream", func(t *testing.T) { + removeRes, err := livestreamClient.RemoveStream(ctx, &streampb.RemoveStreamRequest{ + Name: "fake-cam-0-0", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, removeRes, test.ShouldNotBeNil) + removeRes, err = livestreamClient.RemoveStream(ctx, &streampb.RemoveStreamRequest{ + Name: "fake-cam-0-1", + }) + test.That(t, err, test.ShouldBeNil) + test.That(t, removeRes, test.ShouldNotBeNil) + }) +}