Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RSDK-8982, RSDK-9167, RSDK-8979] - Add SetStreamOptions to stream server #4530

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions gostream/webrtc_track.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@ type trackLocalStaticRTP struct {
mu sync.RWMutex
bindings []trackBinding
codec webrtc.RTPCodecCapability
sequencer rtp.Sequencer
id, rid, streamID string
}

// newtrackLocalStaticRTP returns a trackLocalStaticRTP.
func newtrackLocalStaticRTP(c webrtc.RTPCodecCapability, id, streamID string) *trackLocalStaticRTP {
return &trackLocalStaticRTP{
codec: c,
bindings: []trackBinding{},
id: id,
streamID: streamID,
codec: c,
bindings: []trackBinding{},
id: id,
sequencer: rtp.NewRandomSequencer(),
streamID: streamID,
}
}

Expand Down Expand Up @@ -126,6 +128,10 @@ func (s *trackLocalStaticRTP) WriteRTP(p *rtp.Packet) error {
for _, b := range s.bindings {
outboundPacket.Header.SSRC = uint32(b.ssrc)
outboundPacket.Header.PayloadType = uint8(b.payloadType)
// We overwrite the sequence number to ensure continuity between packets
// coming from Passthrough sources and those that are packetized by the
// Pion RTP Packetizer in the WriteData method.
outboundPacket.Header.SequenceNumber = s.sequencer.NextSequenceNumber()
if _, err := b.writeStream.WriteRTP(&outboundPacket.Header, outboundPacket.Payload); err != nil {
writeErrs = append(writeErrs, err)
}
Expand Down
104 changes: 104 additions & 0 deletions robot/web/stream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ const (
retryDelay = 50 * time.Millisecond
)

const (
optionsCommandResize = iota
optionsCommandReset
optionsCommandUnknown
)

type peerState struct {
streamState *state.StreamState
senders []*webrtc.RTPSender
Expand Down Expand Up @@ -372,6 +378,104 @@ func (server *Server) GetStreamOptions(
}, nil
}

// SetStreamOptions implements part of the StreamServiceServer. It sets the resolution of the stream
// to the given width and height.
func (server *Server) SetStreamOptions(
ctx context.Context,
req *streampb.SetStreamOptionsRequest,
) (*streampb.SetStreamOptionsResponse, error) {
cmd, err := validateSetStreamOptionsRequest(req)
if err != nil {
return nil, err
}
server.mu.Lock()
defer server.mu.Unlock()
switch cmd {
case optionsCommandResize:
err = server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height))
if err != nil {
return nil, fmt.Errorf("failed to resize video source for stream %q: %w", req.Name, err)
}
case optionsCommandReset:
err = server.resetVideoSource(req.Name)
if err != nil {
return nil, fmt.Errorf("failed to reset video source for stream %q: %w", req.Name, err)
}
default:
return nil, fmt.Errorf("unknown command type %v", cmd)
}
return &streampb.SetStreamOptionsResponse{}, nil
}

// validateSetStreamOptionsRequest validates the request to set the stream options.
func validateSetStreamOptionsRequest(req *streampb.SetStreamOptionsRequest) (int, error) {
if req.Name == "" {
return optionsCommandUnknown, errors.New("stream name is required in request")
}
if req.Resolution == nil {
return optionsCommandReset, nil
}
if req.Resolution.Width <= 0 || req.Resolution.Height <= 0 {
return optionsCommandUnknown,
fmt.Errorf(
"invalid resolution to resize stream %q: width (%d) and height (%d) must be greater than 0",
req.Name, req.Resolution.Width, req.Resolution.Height,
)
}
return optionsCommandResize, nil
}

// resizeVideoSource resizes the video source with the given name.
func (server *Server) resizeVideoSource(name string, width, height int) error {
existing, ok := server.videoSources[name]
if !ok {
return fmt.Errorf("video source %q not found", name)
}
cam, err := camera.FromRobot(server.robot, name)
if err != nil {
server.logger.Errorf("error getting camera %q from robot", name)
return err
}
streamState, ok := server.nameToStreamState[name]
if !ok {
return fmt.Errorf("stream state not found with name %q", name)
}
resizer := gostream.NewResizeVideoSource(cam, width, height)
server.logger.Debugf(
"resizing video source to width %d and height %d",
width, height,
)
existing.Swap(resizer)
err = streamState.Resize()
if err != nil {
return fmt.Errorf("failed to resize stream %q: %w", name, err)
}
return nil
}

// resetVideoSource resets the video source with the given name to the source resolution.
func (server *Server) resetVideoSource(name string) error {
existing, ok := server.videoSources[name]
if !ok {
return fmt.Errorf("video source %q not found", name)
}
cam, err := camera.FromRobot(server.robot, name)
if err != nil {
server.logger.Errorf("error getting camera %q from robot", name)
}
streamState, ok := server.nameToStreamState[name]
if !ok {
return fmt.Errorf("stream state not found with name %q", name)
}
server.logger.Debug("resetting video source")
existing.Swap(cam)
err = streamState.Reset()
if err != nil {
return fmt.Errorf("failed to reset stream %q: %w", name, err)
}
return nil
}

// AddNewStreams adds new video and audio streams to the server using the updated set of video and
// audio sources. It refreshes the sources, checks for a valid stream configuration, and starts
// the streams if applicable.
Expand Down
50 changes: 49 additions & 1 deletion robot/web/stream/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type StreamState struct {
streamSource streamSource
// streamSourceSub is only non nil if streamSource == streamSourcePassthrough
streamSourceSub rtppassthrough.Subscription
// isResized indicates whether the stream has been resized by the stream server.
// When set to true, it signals that the passthrough stream should not be restarted.
isResized bool
}

// New returns a new *StreamState.
Expand All @@ -61,6 +64,7 @@ func New(
robot: r,
msgChan: make(chan msg),
tickChan: make(chan struct{}),
isResized: false,
logger: logger,
}

Expand Down Expand Up @@ -90,6 +94,24 @@ func (state *StreamState) Decrement() error {
return state.send(msgTypeDecrement)
}

// Resize notifies that the gostream source has been resized. This will stop and prevent
// the use of the passthrough stream if it is supported.
func (state *StreamState) Resize() error {
if err := state.closedCtx.Err(); err != nil {
return multierr.Combine(ErrClosed, err)
}
return state.send(msgTypeResize)
}

// Reset notifies that the gostream source has been reset to the original resolution.
// This will restart the passthrough stream if it is supported.
func (state *StreamState) Reset() error {
if err := state.closedCtx.Err(); err != nil {
return multierr.Combine(ErrClosed, err)
}
return state.send(msgTypeReset)
}

// Close closes the StreamState.
func (state *StreamState) Close() error {
state.logger.Info("Closing streamState")
Expand Down Expand Up @@ -129,6 +151,8 @@ const (
msgTypeUnknown msgType = iota
msgTypeIncrement
msgTypeDecrement
msgTypeResize
msgTypeReset
)

func (mt msgType) String() string {
Expand All @@ -137,6 +161,10 @@ func (mt msgType) String() string {
return "Increment"
case msgTypeDecrement:
return "Decrement"
case msgTypeResize:
return "Resize"
case msgTypeReset:
return "Reset"
case msgTypeUnknown:
fallthrough
default:
Expand Down Expand Up @@ -185,6 +213,14 @@ func (state *StreamState) sourceEventHandler() {
if state.activeClients == 0 {
state.tick()
}
case msgTypeResize:
state.logger.Debug("resize event received")
state.isResized = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we ever go back to not resized?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for bringing this up -- the Reset path has been implemented to handle getting out of resized mode.

state.tick()
case msgTypeReset:
state.logger.Debug("reset event received")
state.isResized = false
state.tick()
case msgTypeUnknown:
fallthrough
default:
Expand Down Expand Up @@ -247,6 +283,11 @@ func (state *StreamState) tick() {
// stop stream if there are no active clients
// noop if there is no stream source
state.stopInputStream()
// If streamSource is unknown and resized is true, we do not want to attempt passthrough.
case state.streamSource == streamSourceUnknown && state.isResized:
state.logger.Debug("in a resized state and stream source is unknown, defaulting to GoStream")
state.Stream.Start()
state.streamSource = streamSourceGoStream
case state.streamSource == streamSourceUnknown: // && state.activeClients > 0
// this is the first subscription, attempt passthrough
state.logger.Info("attempting to subscribe to rtp_passthrough")
Expand All @@ -257,6 +298,13 @@ func (state *StreamState) tick() {
state.Stream.Start()
state.streamSource = streamSourceGoStream
}
// If we are currently using passthrough, and the stream state changes to resized
// we need to stop the passthrough stream and restart it through gostream.
case state.streamSource == streamSourcePassthrough && state.isResized:
state.logger.Info("stream resized, stopping passthrough stream")
state.stopInputStream()
state.Stream.Start()
state.streamSource = streamSourceGoStream
case state.streamSource == streamSourcePassthrough && state.streamSourceSub.Terminated.Err() != nil:
// restart stream if there we were using passthrough but the sub is terminated
state.logger.Info("previous subscription terminated attempting to subscribe to rtp_passthrough")
Expand All @@ -271,7 +319,7 @@ func (state *StreamState) tick() {
case state.streamSource == streamSourcePassthrough:
// no op if we are using passthrough & are healthy
state.logger.Debug("still healthy and using h264 passthrough")
case state.streamSource == streamSourceGoStream:
case state.streamSource == streamSourceGoStream && !state.isResized:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does a user do to hit this state after they've selected a resolution in the UI?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented the Reset path for this. Not exactly sure how to handle this from a UI perspective -- may need to update the wire-diagram in the scope doc to include a Reset button.

// Try to upgrade to passthrough if we are using gostream. We leave logs these as debugs as
// we expect some components to not implement rtp passthrough.
state.logger.Debugw("currently using gostream, trying upgrade to rtp_passthrough")
Expand Down
Loading
Loading