Skip to content

Commit

Permalink
RSDK-8978 - Move video & audio sources into the stream server (#4490)
Browse files Browse the repository at this point in the history
Co-authored-by: randhid <[email protected]>
  • Loading branch information
seanavery and randhid authored Oct 30, 2024
1 parent 0ba8458 commit cc2c8fb
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 186 deletions.
150 changes: 150 additions & 0 deletions robot/web/stream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package webstream
import (
"context"
"fmt"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -45,12 +46,17 @@ type Server struct {
activePeerStreams map[*webrtc.PeerConnection]map[string]*peerState
activeBackgroundWorkers sync.WaitGroup
isAlive bool

streamConfig gostream.StreamConfig
videoSources map[string]gostream.HotSwappableVideoSource
audioSources map[string]gostream.HotSwappableAudioSource
}

// NewServer returns a server that will run on the given port and initially starts with the given
// stream.
func NewServer(
robot robot.Robot,
streamConfig gostream.StreamConfig,
logger logging.Logger,
) *Server {
closedCtx, closedFn := context.WithCancel(context.Background())
Expand All @@ -62,6 +68,9 @@ func NewServer(
nameToStreamState: map[string]*state.StreamState{},
activePeerStreams: map[*webrtc.PeerConnection]map[string]*peerState{},
isAlive: true,
streamConfig: streamConfig,
videoSources: map[string]gostream.HotSwappableVideoSource{},
audioSources: map[string]gostream.HotSwappableAudioSource{},
}
server.startMonitorCameraAvailable()
return server
Expand Down Expand Up @@ -310,6 +319,70 @@ func (server *Server) RemoveStream(ctx context.Context, req *streampb.RemoveStre
return &streampb.RemoveStreamResponse{}, nil
}

// AddNewStreams adds new video and audio streams to the server using the updated set of video and
// audio sources. It refreshes the sources, checks for a valid stream configuration, and starts
// the streams if applicable.
func (server *Server) AddNewStreams(ctx context.Context) error {
// Refreshing sources will walk the robot resources for anything implementing the camera and
// audioinput APIs and mutate the `svc.videoSources` and `svc.audioSources` maps.
server.refreshVideoSources()
server.refreshAudioSources()

if server.streamConfig == (gostream.StreamConfig{}) {
// The `streamConfig` dictates the video and audio encoder libraries to use. We can't do
// much if none are present.
if len(server.videoSources) != 0 || len(server.audioSources) != 0 {
server.logger.Warn("not starting streams due to no stream config being set")
}
return nil
}

server.logger.Info("starting video and audio streams")
for name := range server.videoSources {
if runtime.GOOS == "windows" {
// TODO(RSDK-1771): support video on windows
server.logger.Warn("video streaming not supported on Windows yet")
break
}
// We walk the updated set of `videoSources` and ensure all of the sources are "created" and
// "started".
config := gostream.StreamConfig{
Name: name,
VideoEncoderFactory: server.streamConfig.VideoEncoderFactory,
}
// Call `createStream`. `createStream` is responsible for first checking if the stream
// already exists. If it does, it skips creating a new stream and we continue to the next source.
//
// TODO(RSDK-9079) Add reliable framerate fetcher for stream videosources
stream, alreadyRegistered, err := server.createStream(config, name)
if err != nil {
return err
} else if alreadyRegistered {
continue
}
server.startVideoStream(ctx, server.videoSources[name], stream)
}

for name := range server.audioSources {
// Similarly, we walk the updated set of `audioSources` and ensure all of the audio sources
// are "created" and "started". `createStream` and `startAudioStream` have the same
// behaviors as described above for video streams.
config := gostream.StreamConfig{
Name: name,
AudioEncoderFactory: server.streamConfig.AudioEncoderFactory,
}
stream, alreadyRegistered, err := server.createStream(config, name)
if err != nil {
return err
} else if alreadyRegistered {
continue
}
server.startAudioStream(ctx, server.audioSources[name], stream)
}

return nil
}

// Close closes the Server and waits for spun off goroutines to complete.
func (server *Server) Close() error {
server.closedFn()
Expand Down Expand Up @@ -412,3 +485,80 @@ func (server *Server) removeMissingStreams() {
utils.UncheckedError(streamState.Close())
}
}

// refreshVideoSources checks and initializes every possible video source that could be viewed from the robot.
func (server *Server) refreshVideoSources() {
for _, name := range camera.NamesFromRobot(server.robot) {
cam, err := camera.FromRobot(server.robot, name)
if err != nil {
continue
}
existing, ok := server.videoSources[cam.Name().SDPTrackName()]
if ok {
existing.Swap(cam)
continue
}
newSwapper := gostream.NewHotSwappableVideoSource(cam)
server.videoSources[cam.Name().SDPTrackName()] = newSwapper
}
}

// refreshAudioSources checks and initializes every possible audio source that could be viewed from the robot.
func (server *Server) refreshAudioSources() {
for _, name := range audioinput.NamesFromRobot(server.robot) {
input, err := audioinput.FromRobot(server.robot, name)
if err != nil {
continue
}
existing, ok := server.audioSources[input.Name().SDPTrackName()]
if ok {
existing.Swap(input)
continue
}
newSwapper := gostream.NewHotSwappableAudioSource(input)
server.audioSources[input.Name().SDPTrackName()] = newSwapper
}
}

func (server *Server) createStream(config gostream.StreamConfig, name string) (gostream.Stream, bool, error) {
stream, err := server.NewStream(config)
// Skip if stream is already registered, otherwise raise any other errors
registeredError := &StreamAlreadyRegisteredError{}
if errors.As(err, &registeredError) {
server.logger.Debugf("%s stream already registered", name)
return nil, true, nil
} else if err != nil {
return nil, false, err
}
return stream, false, err
}

func (server *Server) startStream(streamFunc func(opts *BackoffTuningOptions) error) {
waitCh := make(chan struct{})
server.activeBackgroundWorkers.Add(1)
utils.PanicCapturingGo(func() {
defer server.activeBackgroundWorkers.Done()
close(waitCh)
if err := streamFunc(&BackoffTuningOptions{}); err != nil {
if utils.FilterOutError(err, context.Canceled) != nil {
server.logger.Errorw("error streaming", "error", err)
}
}
})
<-waitCh
}

func (server *Server) startVideoStream(ctx context.Context, source gostream.VideoSource, stream gostream.Stream) {
server.startStream(func(opts *BackoffTuningOptions) error {
streamVideoCtx, _ := utils.MergeContext(server.closedCtx, ctx)
return streamVideoSource(streamVideoCtx, source, stream, opts, server.logger)
})
}

func (server *Server) startAudioStream(ctx context.Context, source gostream.AudioSource, stream gostream.Stream) {
server.startStream(func(opts *BackoffTuningOptions) error {
// Merge ctx that may be coming from a Reconfigure.
streamAudioCtx, _ := utils.MergeContext(server.closedCtx, ctx)
return streamAudioSource(streamAudioCtx, source, stream, opts, server.logger)
})
}
8 changes: 4 additions & 4 deletions robot/web/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"go.viam.com/rdk/logging"
)

// StreamVideoSource starts a stream from a video source with a throttled error handler.
func StreamVideoSource(
// streamVideoSource starts a stream from a video source with a throttled error handler.
func streamVideoSource(
ctx context.Context,
source gostream.VideoSource,
stream gostream.Stream,
Expand All @@ -25,8 +25,8 @@ func StreamVideoSource(
return gostream.StreamVideoSourceWithErrorHandler(ctx, source, stream, backoffOpts.getErrorThrottledHandler(logger, stream.Name()), logger)
}

// StreamAudioSource starts a stream from an audio source with a throttled error handler.
func StreamAudioSource(
// streamAudioSource starts a stream from an audio source with a throttled error handler.
func streamAudioSource(
ctx context.Context,
source gostream.AudioSource,
stream gostream.Stream,
Expand Down
Loading

0 comments on commit cc2c8fb

Please sign in to comment.