Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RSDK-8982, RSDK-9167, RSDK-8979] - Add SetStreamOptions to stream server #4530

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions robot/web/stream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ func (server *Server) GetStreamOptions(
ctx context.Context,
randhid marked this conversation as resolved.
Show resolved Hide resolved
req *streampb.GetStreamOptionsRequest,
) (*streampb.GetStreamOptionsResponse, error) {
server.mu.RLock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to add this read lock?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mutex is protecting these struct members:

	nameToStreamState       map[string]*state.StreamState
	activePeerStreams       map[*webrtc.PeerConnection]map[string]*peerState
	activeBackgroundWorkers sync.WaitGroup
	isAlive                 bool

I don't see them being accessed so I don't understand why this mutex is being taken

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call - I am just pulling the camera from robot not accessing any of these. Removed.

defer server.mu.RUnlock()
if req.Name == "" {
return nil, errors.New("stream name is required")
}
Expand Down Expand Up @@ -372,6 +374,55 @@ func (server *Server) GetStreamOptions(
}, nil
}

// SetStreamOptions implements part of the StreamServiceServer. It sets the resolution of the stream
// to the given width and height.
func (server *Server) SetStreamOptions(
ctx context.Context,
req *streampb.SetStreamOptionsRequest,
) (*streampb.SetStreamOptionsResponse, error) {
if req.Name == "" {
return nil, errors.New("stream name is required")
}
if req.Resolution == nil {
return nil, errors.New("resolution is required")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we prefix these errors with what stream it's about i.e. include the stream name in the error message

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Added name to error returns and more details where appropriate.

}
if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 {
return nil, errors.New("invalid resolution, width and height must be greater than 0")
}
server.mu.Lock()
defer server.mu.Unlock()
err := server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height))
if err != nil {
return nil, fmt.Errorf("failed to resize video source: %w", err)
}
streamState, ok := server.nameToStreamState[req.Name]
if !ok {
return nil, fmt.Errorf("stream %q not found", req.Name)
}
err = streamState.Resize()
if err != nil {
return nil, fmt.Errorf("failed to resize stream: %w", err)
}
return &streampb.SetStreamOptionsResponse{}, nil
}

// ResizeVideoSource resizes the video source with the given name.
func (server *Server) resizeVideoSource(name string, width, height int) error {
existing, ok := server.videoSources[name]
if !ok {
return fmt.Errorf("video source %q not found", name)
}
cam, err := camera.FromRobot(server.robot, name)
if err != nil {
server.logger.Errorf("error getting camera %q from robot", name)
return err
}
resizer := gostream.NewResizeVideoSource(cam, width, height)
server.logger.Debug("resizing video source with swap")
existing.Swap(resizer)
return nil
}

// AddNewStreams adds new video and audio streams to the server using the updated set of video and
// audio sources. It refreshes the sources, checks for a valid stream configuration, and starts
// the streams if applicable.
Expand Down
33 changes: 32 additions & 1 deletion robot/web/stream/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type StreamState struct {
streamSource streamSource
// streamSourceSub is only non nil if streamSource == streamSourcePassthrough
streamSourceSub rtppassthrough.Subscription
// resized is set to true when the stream has been resized.
// It is a signal to avoid trying to restart the passthrough stream.
resized bool
}

// New returns a new *StreamState.
Expand All @@ -61,6 +64,7 @@ func New(
robot: r,
msgChan: make(chan msg),
tickChan: make(chan struct{}),
resized: false,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit suggestion] could we call this isResized or wasResized

the past-tense-as-a-bool is kinda overlapping with certain interface naming conventions e.g. resource.Named

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Changed to isResized.

logger: logger,
}

Expand Down Expand Up @@ -90,6 +94,14 @@ func (state *StreamState) Decrement() error {
return state.send(msgTypeDecrement)
}

// Resize notifies the StreamState that the stream has been resized.
func (state *StreamState) Resize() error {
if err := state.closedCtx.Err(); err != nil {
return multierr.Combine(ErrClosed, err)
}
return state.send(msgTypeResize)
}

// Close closes the StreamState.
func (state *StreamState) Close() error {
state.logger.Info("Closing streamState")
Expand Down Expand Up @@ -129,6 +141,7 @@ const (
msgTypeUnknown msgType = iota
msgTypeIncrement
msgTypeDecrement
msgTypeResize
)

func (mt msgType) String() string {
Expand All @@ -137,6 +150,8 @@ func (mt msgType) String() string {
return "Increment"
case msgTypeDecrement:
return "Decrement"
case msgTypeResize:
return "Resize"
case msgTypeUnknown:
fallthrough
default:
Expand Down Expand Up @@ -185,6 +200,10 @@ func (state *StreamState) sourceEventHandler() {
if state.activeClients == 0 {
state.tick()
}
case msgTypeResize:
state.logger.Debug("resize event received")
state.resized = true
state.tick()
case msgTypeUnknown:
fallthrough
default:
Expand Down Expand Up @@ -247,6 +266,11 @@ func (state *StreamState) tick() {
// stop stream if there are no active clients
// noop if there is no stream source
state.stopInputStream()
// If streamSource is unknown and resized is true, we do not want to attempt passthrough.
case state.streamSource == streamSourceUnknown && state.resized:
state.logger.Debug("in a resized state, using gostream")
state.Stream.Start()
state.streamSource = streamSourceGoStream
case state.streamSource == streamSourceUnknown: // && state.activeClients > 0
// this is the first subscription, attempt passthrough
state.logger.Info("attempting to subscribe to rtp_passthrough")
Expand All @@ -257,6 +281,13 @@ func (state *StreamState) tick() {
state.Stream.Start()
state.streamSource = streamSourceGoStream
}
// If we are currently using passthrough, and the stream state changes to resized
// we need to stop the passthrough stream and restart it through gostream.
case state.streamSource == streamSourcePassthrough && state.resized:
state.logger.Info("stream resized, stopping passthrough stream")
state.stopInputStream()
state.Stream.Start()
state.streamSource = streamSourceGoStream
case state.streamSource == streamSourcePassthrough && state.streamSourceSub.Terminated.Err() != nil:
// restart stream if there we were using passthrough but the sub is terminated
state.logger.Info("previous subscription terminated attempting to subscribe to rtp_passthrough")
Expand All @@ -271,7 +302,7 @@ func (state *StreamState) tick() {
case state.streamSource == streamSourcePassthrough:
// no op if we are using passthrough & are healthy
state.logger.Debug("still healthy and using h264 passthrough")
case state.streamSource == streamSourceGoStream:
case state.streamSource == streamSourceGoStream && !state.resized:
// Try to upgrade to passthrough if we are using gostream. We leave logs these as debugs as
// we expect some components to not implement rtp passthrough.
state.logger.Debugw("currently using gostream, trying upgrade to rtp_passthrough")
Expand Down
129 changes: 129 additions & 0 deletions robot/web/stream/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,4 +762,133 @@ func TestStreamState(t *testing.T) {
test.That(tb, stopCount.Load(), test.ShouldEqual, 3)
})
})

t.Run("when in rtppassthrough mode and a resize occurs test downgrade path to gostream", func(t *testing.T) {
var startCount atomic.Int64
Comment on lines +766 to +767
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of your tests are time based and waiting for assertions, can you run with -race and -failfast in canon to verify that they're not flaky.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go test -timeout 30s -race -failfast -run ^TestSetStreamOptions$ go.viam.com/rdk/robot/web/stream
ok go.viam.com/rdk/robot/web/stream 3.255s


go test -timeout 30s -race -failfast -run ^TestStreamState$/^when_in_rtppassthrough_mode_and_a_resize_occurs_test_downgrade_path_to_gostream$ go.viam.com/rdk/robot/web/stream/state
ok go.viam.com/rdk/robot/web/stream/state 3.056s

var stopCount atomic.Int64
writeRTPCalledCtx, writeRTPCalledFunc := context.WithCancel(ctx)
streamMock := &mockStream{
name: camName,
t: t,
startFunc: func() {
startCount.Add(1)
},
stopFunc: func() {
stopCount.Add(1)
},
writeRTPFunc: func(pkt *rtp.Packet) error {
// Test that WriteRTP is eventually called when SubscribeRTP is called
writeRTPCalledFunc()
return nil
},
}

var subscribeRTPCount atomic.Int64
var unsubscribeCount atomic.Int64
type subAndCancel struct {
sub rtppassthrough.Subscription
cancelFn context.CancelFunc
wg *sync.WaitGroup
}

var subsAndCancelByIDMu sync.Mutex
subsAndCancelByID := map[rtppassthrough.SubscriptionID]subAndCancel{}

subscribeRTPFunc := func(
ctx context.Context,
bufferSize int,
packetsCB rtppassthrough.PacketCallback,
) (rtppassthrough.Subscription, error) {
subsAndCancelByIDMu.Lock()
defer subsAndCancelByIDMu.Unlock()
defer subscribeRTPCount.Add(1)
terminatedCtx, terminatedFn := context.WithCancel(context.Background())
id := uuid.New()
sub := rtppassthrough.Subscription{ID: id, Terminated: terminatedCtx}
subsAndCancelByID[id] = subAndCancel{sub: sub, cancelFn: terminatedFn, wg: &sync.WaitGroup{}}
subsAndCancelByID[id].wg.Add(1)
utils.ManagedGo(func() {
for terminatedCtx.Err() == nil {
packetsCB([]*rtp.Packet{{}})
time.Sleep(time.Millisecond * 50)
}
}, subsAndCancelByID[id].wg.Done)
return sub, nil
}

unsubscribeFunc := func(ctx context.Context, id rtppassthrough.SubscriptionID) error {
subsAndCancelByIDMu.Lock()
defer subsAndCancelByIDMu.Unlock()
defer unsubscribeCount.Add(1)
subAndCancel, ok := subsAndCancelByID[id]
if !ok {
test.That(t, fmt.Sprintf("Unsubscribe called with unknown id: %s", id.String()), test.ShouldBeFalse)
}
subAndCancel.cancelFn()
return nil
}

mockRTPPassthroughSource := &mockRTPPassthroughSource{
subscribeRTPFunc: subscribeRTPFunc,
unsubscribeFunc: unsubscribeFunc,
}

robot := mockRobot(mockRTPPassthroughSource)
s := state.New(streamMock, robot, logger)
defer func() {
utils.UncheckedError(s.Close())
}()

test.That(t, subscribeRTPCount.Load(), test.ShouldEqual, 0)
test.That(t, unsubscribeCount.Load(), test.ShouldEqual, 0)
test.That(t, writeRTPCalledCtx.Err(), test.ShouldBeNil)

logger.Info("the first Increment() eventually call calls SubscribeRTP()")
test.That(t, s.Increment(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1)
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 0)
})
// WriteRTP is called
<-writeRTPCalledCtx.Done()

// Call Resize to simulate a resize event. This should stop rtp_passthrough and start gostream.
logger.Info("calling Resize() should stop rtp_passthrough and start gostream")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here might be nice to use t.Run

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep should be ok to nest t.Run tests here.

test.That(t, s.Resize(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 1)
test.That(tb, stopCount.Load(), test.ShouldEqual, 0)
})

// Make sure that Decrement() calls Stop() as gostream is the data source
logger.Info("calling Decrement() should call Stop() as gostream is the data source")
test.That(t, s.Decrement(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 1)
test.That(tb, stopCount.Load(), test.ShouldEqual, 1)
})

// Call increment again and make sure that gostream is started
// and rtppassthrough is not called
logger.Info("calling Increment() should call Start() as gostream is the data source")
test.That(t, s.Increment(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1)
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 2)
test.That(tb, stopCount.Load(), test.ShouldEqual, 1)
})

// Call decrement again and make sure that gostream is stopped.
logger.Info("calling Decrement() should call Stop() as gostream is the data source")
test.That(t, s.Decrement(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1)
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 2)
test.That(tb, stopCount.Load(), test.ShouldEqual, 2)
})
})
}
Loading
Loading