Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RSDK-8982, RSDK-9167, RSDK-8979] - Add SetStreamOptions to stream server #4530

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions robot/web/stream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ const (
retryDelay = 50 * time.Millisecond
)

const (
optionsCommandResize = iota
optionsCommandReset
optionsCommandUnknown
)

type peerState struct {
streamState *state.StreamState
senders []*webrtc.RTPSender
Expand Down Expand Up @@ -372,6 +378,84 @@ func (server *Server) GetStreamOptions(
}, nil
}

// SetStreamOptions implements part of the StreamServiceServer. It sets the resolution of the stream
// to the given width and height.
func (server *Server) SetStreamOptions(
ctx context.Context,
req *streampb.SetStreamOptionsRequest,
) (*streampb.SetStreamOptionsResponse, error) {
cmd, err := validateSetStreamOptionsRequest(req)
if err != nil {
return nil, err
}
server.mu.Lock()
defer server.mu.Unlock()
switch cmd {
case optionsCommandResize:
err = server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height))
if err != nil {
return nil, fmt.Errorf("failed to resize video source for stream %q: %w", req.Name, err)
}
case optionsCommandReset:
err = server.resetVideoSource(req.Name)
if err != nil {
return nil, fmt.Errorf("failed to reset video source for stream %q: %w", req.Name, err)
}
default:
return nil, fmt.Errorf("unknown command type %v", cmd)
}
return &streampb.SetStreamOptionsResponse{}, nil
}

// resizeVideoSource resizes the video source with the given name.
func (server *Server) resizeVideoSource(name string, width, height int) error {
existing, ok := server.videoSources[name]
if !ok {
return fmt.Errorf("video source %q not found", name)
}
cam, err := camera.FromRobot(server.robot, name)
if err != nil {
server.logger.Errorf("error getting camera %q from robot", name)
return err
}
streamState, ok := server.nameToStreamState[name]
if !ok {
return fmt.Errorf("stream state not found with name %q", name)
}
resizer := gostream.NewResizeVideoSource(cam, width, height)
server.logger.Debug("resizing video source with a hot swap")
existing.Swap(resizer)
err = streamState.Resize()
if err != nil {
return fmt.Errorf("failed to resize stream %q: %w", name, err)
}
return nil
}

// resetVideoSource resets the video source with the given name to the source resolution.
func (server *Server) resetVideoSource(name string) error {
existing, ok := server.videoSources[name]
if !ok {
return fmt.Errorf("video source %q not found", name)
}
cam, err := camera.FromRobot(server.robot, name)
if err != nil {
server.logger.Errorf("error getting camera %q from robot", name)
}
streamState, ok := server.nameToStreamState[name]
if !ok {
return fmt.Errorf("stream state not found with name %q", name)
}
newSwapper := gostream.NewHotSwappableVideoSource(cam)
server.logger.Debug("resetting video source")
existing.Swap(newSwapper)
err = streamState.Reset()
if err != nil {
return fmt.Errorf("failed to reset stream %q: %w", name, err)
}
return nil
}

// AddNewStreams adds new video and audio streams to the server using the updated set of video and
// audio sources. It refreshes the sources, checks for a valid stream configuration, and starts
// the streams if applicable.
Expand Down Expand Up @@ -677,3 +761,21 @@ retryLoop:
}
return frame.Bounds().Dx(), frame.Bounds().Dy(), nil
}

// validateSetStreamOptionsRequest validates the request to set the stream options.
func validateSetStreamOptionsRequest(req *streampb.SetStreamOptionsRequest) (int, error) {
if req.Name == "" {
return optionsCommandUnknown, errors.New("stream name is required in request")
}
if req.Resolution == nil {
return optionsCommandReset, nil
}
if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 {
return optionsCommandUnknown,
fmt.Errorf(
"invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0",
req.Name, req.Resolution.Width, req.Resolution.Height,
)
}
return optionsCommandResize, nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put this above where it's used.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, moved.

50 changes: 49 additions & 1 deletion robot/web/stream/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type StreamState struct {
streamSource streamSource
// streamSourceSub is only non nil if streamSource == streamSourcePassthrough
streamSourceSub rtppassthrough.Subscription
// isResized indicates whether the stream has been resized by the stream server.
// When set to true, it signals that the passthrough stream should not be restarted.
isResized bool
}

// New returns a new *StreamState.
Expand All @@ -61,6 +64,7 @@ func New(
robot: r,
msgChan: make(chan msg),
tickChan: make(chan struct{}),
isResized: false,
logger: logger,
}

Expand Down Expand Up @@ -90,6 +94,24 @@ func (state *StreamState) Decrement() error {
return state.send(msgTypeDecrement)
}

// Resize notifies that the gostream source has been resized. This will stop and prevent
// the use of the passthrough stream if it is supported.
func (state *StreamState) Resize() error {
if err := state.closedCtx.Err(); err != nil {
return multierr.Combine(ErrClosed, err)
}
return state.send(msgTypeResize)
}

// Reset notifies that the gostream source has been reset to the original resolution.
// This will restart the passthrough stream if it is supported.
func (state *StreamState) Reset() error {
if err := state.closedCtx.Err(); err != nil {
return multierr.Combine(ErrClosed, err)
}
return state.send(msgTypeReset)
}

// Close closes the StreamState.
func (state *StreamState) Close() error {
state.logger.Info("Closing streamState")
Expand Down Expand Up @@ -129,6 +151,8 @@ const (
msgTypeUnknown msgType = iota
msgTypeIncrement
msgTypeDecrement
msgTypeResize
msgTypeReset
)

func (mt msgType) String() string {
Expand All @@ -137,6 +161,10 @@ func (mt msgType) String() string {
return "Increment"
case msgTypeDecrement:
return "Decrement"
case msgTypeResize:
return "Resize"
case msgTypeReset:
return "Reset"
case msgTypeUnknown:
fallthrough
default:
Expand Down Expand Up @@ -185,6 +213,14 @@ func (state *StreamState) sourceEventHandler() {
if state.activeClients == 0 {
state.tick()
}
case msgTypeResize:
state.logger.Debug("resize event received")
state.isResized = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we ever go back to not resized?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for bringing this up -- the Reset path has been implemented to handle getting out of resized mode.

state.tick()
case msgTypeReset:
state.logger.Debug("reset event received")
state.isResized = false
state.tick()
case msgTypeUnknown:
fallthrough
default:
Expand Down Expand Up @@ -247,6 +283,11 @@ func (state *StreamState) tick() {
// stop stream if there are no active clients
// noop if there is no stream source
state.stopInputStream()
// If streamSource is unknown and resized is true, we do not want to attempt passthrough.
case state.streamSource == streamSourceUnknown && state.isResized:
state.logger.Debug("in a resized state, using gostream")
state.Stream.Start()
state.streamSource = streamSourceGoStream
case state.streamSource == streamSourceUnknown: // && state.activeClients > 0
// this is the first subscription, attempt passthrough
state.logger.Info("attempting to subscribe to rtp_passthrough")
Expand All @@ -257,6 +298,13 @@ func (state *StreamState) tick() {
state.Stream.Start()
state.streamSource = streamSourceGoStream
}
// If we are currently using passthrough, and the stream state changes to resized
// we need to stop the passthrough stream and restart it through gostream.
case state.streamSource == streamSourcePassthrough && state.isResized:
state.logger.Info("stream resized, stopping passthrough stream")
state.stopInputStream()
state.Stream.Start()
state.streamSource = streamSourceGoStream
case state.streamSource == streamSourcePassthrough && state.streamSourceSub.Terminated.Err() != nil:
// restart stream if there we were using passthrough but the sub is terminated
state.logger.Info("previous subscription terminated attempting to subscribe to rtp_passthrough")
Expand All @@ -271,7 +319,7 @@ func (state *StreamState) tick() {
case state.streamSource == streamSourcePassthrough:
// no op if we are using passthrough & are healthy
state.logger.Debug("still healthy and using h264 passthrough")
case state.streamSource == streamSourceGoStream:
case state.streamSource == streamSourceGoStream && !state.isResized:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does a user do to hit this state after they've selected a resolution in the UI?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented the Reset path for this. Not exactly sure how to handle this from a UI perspective -- may need to update the wire-diagram in the scope doc to include a Reset button.

// Try to upgrade to passthrough if we are using gostream. We leave logs these as debugs as
// we expect some components to not implement rtp passthrough.
state.logger.Debugw("currently using gostream, trying upgrade to rtp_passthrough")
Expand Down
160 changes: 160 additions & 0 deletions robot/web/stream/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,4 +762,164 @@ func TestStreamState(t *testing.T) {
test.That(tb, stopCount.Load(), test.ShouldEqual, 3)
})
})

t.Run("when in rtppassthrough mode and a resize occurs test downgrade path to gostream", func(t *testing.T) {
var startCount atomic.Int64
Comment on lines +766 to +767
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of your tests are time based and waiting for assertions, can you run with -race and -failfast in canon to verify that they're not flaky.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go test -timeout 30s -race -failfast -run ^TestSetStreamOptions$ go.viam.com/rdk/robot/web/stream
ok go.viam.com/rdk/robot/web/stream 3.255s


go test -timeout 30s -race -failfast -run ^TestStreamState$/^when_in_rtppassthrough_mode_and_a_resize_occurs_test_downgrade_path_to_gostream$ go.viam.com/rdk/robot/web/stream/state
ok go.viam.com/rdk/robot/web/stream/state 3.056s

var stopCount atomic.Int64
writeRTPCalledCtx, writeRTPCalledFunc := context.WithCancel(ctx)
streamMock := &mockStream{
name: camName,
t: t,
startFunc: func() {
startCount.Add(1)
},
stopFunc: func() {
stopCount.Add(1)
},
writeRTPFunc: func(pkt *rtp.Packet) error {
// Test that WriteRTP is eventually called when SubscribeRTP is called
writeRTPCalledFunc()
return nil
},
}

var subscribeRTPCount atomic.Int64
var unsubscribeCount atomic.Int64
type subAndCancel struct {
sub rtppassthrough.Subscription
cancelFn context.CancelFunc
wg *sync.WaitGroup
}

var subsAndCancelByIDMu sync.Mutex
subsAndCancelByID := map[rtppassthrough.SubscriptionID]subAndCancel{}

subscribeRTPFunc := func(
ctx context.Context,
bufferSize int,
packetsCB rtppassthrough.PacketCallback,
) (rtppassthrough.Subscription, error) {
subsAndCancelByIDMu.Lock()
defer subsAndCancelByIDMu.Unlock()
defer subscribeRTPCount.Add(1)
terminatedCtx, terminatedFn := context.WithCancel(context.Background())
id := uuid.New()
sub := rtppassthrough.Subscription{ID: id, Terminated: terminatedCtx}
subsAndCancelByID[id] = subAndCancel{sub: sub, cancelFn: terminatedFn, wg: &sync.WaitGroup{}}
subsAndCancelByID[id].wg.Add(1)
utils.ManagedGo(func() {
for terminatedCtx.Err() == nil {
packetsCB([]*rtp.Packet{{}})
time.Sleep(time.Millisecond * 50)
}
}, subsAndCancelByID[id].wg.Done)
return sub, nil
}

unsubscribeFunc := func(ctx context.Context, id rtppassthrough.SubscriptionID) error {
subsAndCancelByIDMu.Lock()
defer subsAndCancelByIDMu.Unlock()
defer unsubscribeCount.Add(1)
subAndCancel, ok := subsAndCancelByID[id]
if !ok {
test.That(t, fmt.Sprintf("Unsubscribe called with unknown id: %s", id.String()), test.ShouldBeFalse)
}
subAndCancel.cancelFn()
return nil
}

mockRTPPassthroughSource := &mockRTPPassthroughSource{
subscribeRTPFunc: subscribeRTPFunc,
unsubscribeFunc: unsubscribeFunc,
}

robot := mockRobot(mockRTPPassthroughSource)
s := state.New(streamMock, robot, logger)
defer func() {
utils.UncheckedError(s.Close())
}()

test.That(t, subscribeRTPCount.Load(), test.ShouldEqual, 0)
test.That(t, unsubscribeCount.Load(), test.ShouldEqual, 0)
test.That(t, writeRTPCalledCtx.Err(), test.ShouldBeNil)

t.Run("Increment should call SubscribeRTP", func(t *testing.T) {
test.That(t, s.Increment(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1)
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 0)
})
// WriteRTP is called
<-writeRTPCalledCtx.Done()
})

t.Run("Resize should stop rtp_passthrough and start gostream", func(t *testing.T) {
test.That(t, s.Resize(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 1)
test.That(tb, stopCount.Load(), test.ShouldEqual, 0)
})
})

t.Run("Decrement should call Stop as gostream is the data source", func(t *testing.T) {
test.That(t, s.Decrement(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 1)
test.That(tb, stopCount.Load(), test.ShouldEqual, 1)
})
})

t.Run("Increment should call Start as gostream is the data source", func(t *testing.T) {
test.That(t, s.Increment(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1)
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 2)
test.That(tb, stopCount.Load(), test.ShouldEqual, 1)
})
})

t.Run("Decrement should call Stop as gostream is the data source", func(t *testing.T) {
test.That(t, s.Decrement(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1)
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 2)
test.That(tb, stopCount.Load(), test.ShouldEqual, 2)
})
})

t.Run("Increment should call Start as gostream is the data source", func(t *testing.T) {
test.That(t, s.Increment(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1)
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 3)
test.That(tb, stopCount.Load(), test.ShouldEqual, 2)
})
})

t.Run("Reset should call Stop as gostream is the current data source and then "+
"Subscribe as rtp_passthrough is the new data source", func(t *testing.T) {
test.That(t, s.Reset(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 2)
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 3)
test.That(tb, stopCount.Load(), test.ShouldEqual, 3)
})
})

t.Run("Decrement should call unsubscribe as rtp_passthrough is the data source", func(t *testing.T) {
test.That(t, s.Decrement(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 2)
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 2)
test.That(tb, startCount.Load(), test.ShouldEqual, 3)
test.That(tb, stopCount.Load(), test.ShouldEqual, 3)
})
})
})
}
Loading
Loading