Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RSDK-8982, RSDK-9167, RSDK-8979] - Add SetStreamOptions to stream server #4530

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions robot/web/stream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ func (server *Server) GetStreamOptions(
ctx context.Context,
randhid marked this conversation as resolved.
Show resolved Hide resolved
req *streampb.GetStreamOptionsRequest,
) (*streampb.GetStreamOptionsResponse, error) {
server.mu.RLock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to add this read lock?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mutex is protecting these struct members:

	nameToStreamState       map[string]*state.StreamState
	activePeerStreams       map[*webrtc.PeerConnection]map[string]*peerState
	activeBackgroundWorkers sync.WaitGroup
	isAlive                 bool

I don't see them being accessed so I don't understand why this mutex is being taken

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call - I am just pulling the camera from robot not accessing any of these. Removed.

defer server.mu.RUnlock()
if req.Name == "" {
return nil, errors.New("stream name is required")
}
Expand Down Expand Up @@ -372,6 +374,58 @@ func (server *Server) GetStreamOptions(
}, nil
}

// SetStreamOptions implements part of the StreamServiceServer. It sets the resolution of the stream
// to the given width and height.
func (server *Server) SetStreamOptions(
ctx context.Context,
req *streampb.SetStreamOptionsRequest,
) (*streampb.SetStreamOptionsResponse, error) {
if req.Name == "" {
return nil, errors.New("stream name is required in request")
}
if req.Resolution == nil {
return nil, fmt.Errorf("resolution is required to resize stream %q", req.Name)
}
if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 {
return nil, fmt.Errorf(
"invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0",
req.Name, req.Resolution.Width, req.Resolution.Height,
)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets move this into a validation function

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good - moved to helper.

server.mu.Lock()
defer server.mu.Unlock()
err := server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height))
if err != nil {
return nil, fmt.Errorf("failed to resize video source for stream %q: %w", req.Name, err)
}
streamState, ok := server.nameToStreamState[req.Name]
if !ok {
return nil, fmt.Errorf("stream state not found with name %q", req.Name)
}
err = streamState.Resize()
if err != nil {
return nil, fmt.Errorf("failed to resize stream %q: %w", req.Name, err)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move this logic into resizeVideoSource?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep -- moved all resize logic into resizeVideoSource.

return &streampb.SetStreamOptionsResponse{}, nil
}

// ResizeVideoSource resizes the video source with the given name.
func (server *Server) resizeVideoSource(name string, width, height int) error {
existing, ok := server.videoSources[name]
if !ok {
return fmt.Errorf("video source %q not found", name)
}
cam, err := camera.FromRobot(server.robot, name)
if err != nil {
server.logger.Errorf("error getting camera %q from robot", name)
return err
}
resizer := gostream.NewResizeVideoSource(cam, width, height)
server.logger.Debug("resizing video source with swap")
existing.Swap(resizer)
return nil
}

// AddNewStreams adds new video and audio streams to the server using the updated set of video and
// audio sources. It refreshes the sources, checks for a valid stream configuration, and starts
// the streams if applicable.
Expand Down
33 changes: 32 additions & 1 deletion robot/web/stream/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type StreamState struct {
streamSource streamSource
// streamSourceSub is only non nil if streamSource == streamSourcePassthrough
streamSourceSub rtppassthrough.Subscription
// isResized indicates whether the stream has been resized by the stream server.
// When set to true, it signals that the passthrough stream should not be restarted.
isResized bool
}

// New returns a new *StreamState.
Expand All @@ -61,6 +64,7 @@ func New(
robot: r,
msgChan: make(chan msg),
tickChan: make(chan struct{}),
isResized: false,
logger: logger,
}

Expand Down Expand Up @@ -90,6 +94,14 @@ func (state *StreamState) Decrement() error {
return state.send(msgTypeDecrement)
}

// Resize notifies the StreamState that the stream has been resized.
func (state *StreamState) Resize() error {
if err := state.closedCtx.Err(); err != nil {
return multierr.Combine(ErrClosed, err)
}
return state.send(msgTypeResize)
}

// Close closes the StreamState.
func (state *StreamState) Close() error {
state.logger.Info("Closing streamState")
Expand Down Expand Up @@ -129,6 +141,7 @@ const (
msgTypeUnknown msgType = iota
msgTypeIncrement
msgTypeDecrement
msgTypeResize
)

func (mt msgType) String() string {
Expand All @@ -137,6 +150,8 @@ func (mt msgType) String() string {
return "Increment"
case msgTypeDecrement:
return "Decrement"
case msgTypeResize:
return "Resize"
case msgTypeUnknown:
fallthrough
default:
Expand Down Expand Up @@ -185,6 +200,10 @@ func (state *StreamState) sourceEventHandler() {
if state.activeClients == 0 {
state.tick()
}
case msgTypeResize:
state.logger.Debug("resize event received")
state.isResized = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we ever go back to not resized?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for bringing this up -- the Reset path has been implemented to handle getting out of resized mode.

state.tick()
case msgTypeUnknown:
fallthrough
default:
Expand Down Expand Up @@ -247,6 +266,11 @@ func (state *StreamState) tick() {
// stop stream if there are no active clients
// noop if there is no stream source
state.stopInputStream()
// If streamSource is unknown and resized is true, we do not want to attempt passthrough.
case state.streamSource == streamSourceUnknown && state.isResized:
state.logger.Debug("in a resized state, using gostream")
state.Stream.Start()
state.streamSource = streamSourceGoStream
case state.streamSource == streamSourceUnknown: // && state.activeClients > 0
// this is the first subscription, attempt passthrough
state.logger.Info("attempting to subscribe to rtp_passthrough")
Expand All @@ -257,6 +281,13 @@ func (state *StreamState) tick() {
state.Stream.Start()
state.streamSource = streamSourceGoStream
}
// If we are currently using passthrough, and the stream state changes to resized
// we need to stop the passthrough stream and restart it through gostream.
case state.streamSource == streamSourcePassthrough && state.isResized:
state.logger.Info("stream resized, stopping passthrough stream")
state.stopInputStream()
state.Stream.Start()
state.streamSource = streamSourceGoStream
case state.streamSource == streamSourcePassthrough && state.streamSourceSub.Terminated.Err() != nil:
// restart stream if there we were using passthrough but the sub is terminated
state.logger.Info("previous subscription terminated attempting to subscribe to rtp_passthrough")
Expand All @@ -271,7 +302,7 @@ func (state *StreamState) tick() {
case state.streamSource == streamSourcePassthrough:
// no op if we are using passthrough & are healthy
state.logger.Debug("still healthy and using h264 passthrough")
case state.streamSource == streamSourceGoStream:
case state.streamSource == streamSourceGoStream && !state.isResized:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does a user do to hit this state after they've selected a resolution in the UI?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented the Reset path for this. Not exactly sure how to handle this from a UI perspective -- may need to update the wire-diagram in the scope doc to include a Reset button.

// Try to upgrade to passthrough if we are using gostream. We leave logs these as debugs as
// we expect some components to not implement rtp passthrough.
state.logger.Debugw("currently using gostream, trying upgrade to rtp_passthrough")
Expand Down
129 changes: 129 additions & 0 deletions robot/web/stream/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,4 +762,133 @@ func TestStreamState(t *testing.T) {
test.That(tb, stopCount.Load(), test.ShouldEqual, 3)
})
})

t.Run("when in rtppassthrough mode and a resize occurs test downgrade path to gostream", func(t *testing.T) {
var startCount atomic.Int64
Comment on lines +766 to +767
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of your tests are time based and waiting for assertions, can you run with -race and -failfast in canon to verify that they're not flaky.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go test -timeout 30s -race -failfast -run ^TestSetStreamOptions$ go.viam.com/rdk/robot/web/stream
ok go.viam.com/rdk/robot/web/stream 3.255s


go test -timeout 30s -race -failfast -run ^TestStreamState$/^when_in_rtppassthrough_mode_and_a_resize_occurs_test_downgrade_path_to_gostream$ go.viam.com/rdk/robot/web/stream/state
ok go.viam.com/rdk/robot/web/stream/state 3.056s

var stopCount atomic.Int64
writeRTPCalledCtx, writeRTPCalledFunc := context.WithCancel(ctx)
streamMock := &mockStream{
name: camName,
t: t,
startFunc: func() {
startCount.Add(1)
},
stopFunc: func() {
stopCount.Add(1)
},
writeRTPFunc: func(pkt *rtp.Packet) error {
// Test that WriteRTP is eventually called when SubscribeRTP is called
writeRTPCalledFunc()
return nil
},
}

var subscribeRTPCount atomic.Int64
var unsubscribeCount atomic.Int64
type subAndCancel struct {
sub rtppassthrough.Subscription
cancelFn context.CancelFunc
wg *sync.WaitGroup
}

var subsAndCancelByIDMu sync.Mutex
subsAndCancelByID := map[rtppassthrough.SubscriptionID]subAndCancel{}

subscribeRTPFunc := func(
ctx context.Context,
bufferSize int,
packetsCB rtppassthrough.PacketCallback,
) (rtppassthrough.Subscription, error) {
subsAndCancelByIDMu.Lock()
defer subsAndCancelByIDMu.Unlock()
defer subscribeRTPCount.Add(1)
terminatedCtx, terminatedFn := context.WithCancel(context.Background())
id := uuid.New()
sub := rtppassthrough.Subscription{ID: id, Terminated: terminatedCtx}
subsAndCancelByID[id] = subAndCancel{sub: sub, cancelFn: terminatedFn, wg: &sync.WaitGroup{}}
subsAndCancelByID[id].wg.Add(1)
utils.ManagedGo(func() {
for terminatedCtx.Err() == nil {
packetsCB([]*rtp.Packet{{}})
time.Sleep(time.Millisecond * 50)
}
}, subsAndCancelByID[id].wg.Done)
return sub, nil
}

unsubscribeFunc := func(ctx context.Context, id rtppassthrough.SubscriptionID) error {
subsAndCancelByIDMu.Lock()
defer subsAndCancelByIDMu.Unlock()
defer unsubscribeCount.Add(1)
subAndCancel, ok := subsAndCancelByID[id]
if !ok {
test.That(t, fmt.Sprintf("Unsubscribe called with unknown id: %s", id.String()), test.ShouldBeFalse)
}
subAndCancel.cancelFn()
return nil
}

mockRTPPassthroughSource := &mockRTPPassthroughSource{
subscribeRTPFunc: subscribeRTPFunc,
unsubscribeFunc: unsubscribeFunc,
}

robot := mockRobot(mockRTPPassthroughSource)
s := state.New(streamMock, robot, logger)
defer func() {
utils.UncheckedError(s.Close())
}()

test.That(t, subscribeRTPCount.Load(), test.ShouldEqual, 0)
test.That(t, unsubscribeCount.Load(), test.ShouldEqual, 0)
test.That(t, writeRTPCalledCtx.Err(), test.ShouldBeNil)

t.Run("Increment should call SubscribeRTP", func(t *testing.T) {
test.That(t, s.Increment(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1)
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 0)
})
// WriteRTP is called
<-writeRTPCalledCtx.Done()
})

t.Run("Resize should stop rtp_passthrough and start gostream", func(t *testing.T) {
test.That(t, s.Resize(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 1)
test.That(tb, stopCount.Load(), test.ShouldEqual, 0)
})
})

t.Run("Decrement should call Stop as gostream is the data source", func(t *testing.T) {
test.That(t, s.Decrement(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 1)
test.That(tb, stopCount.Load(), test.ShouldEqual, 1)
})
})

t.Run("Increment should call Start as gostream is the data source", func(t *testing.T) {
test.That(t, s.Increment(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1)
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 2)
test.That(tb, stopCount.Load(), test.ShouldEqual, 1)
})
})

t.Run("Decrement should call Stop as gostream is the data source", func(t *testing.T) {
test.That(t, s.Decrement(), test.ShouldBeNil)
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, subscribeRTPCount.Load(), test.ShouldEqual, 1)
test.That(tb, unsubscribeCount.Load(), test.ShouldEqual, 1)
test.That(tb, startCount.Load(), test.ShouldEqual, 2)
test.That(tb, stopCount.Load(), test.ShouldEqual, 2)
})
})
})
}
Loading
Loading