Skip to content

Commit

Permalink
Tweaks to the interruptStream (viamrobotics#4259)
Browse files Browse the repository at this point in the history
My original plan was to move this to using a `StoppableWorkers`, so that we definitely shut down the background goroutine when we call `closeStream()` (right now, it cancels the context and returns without waiting for the goroutine to actually stop). However, this is trickier than I realized: the context that is canceled is _not_ descended from the background context: it's passed in as an argument and might be canceled by something else! So, this PR is some preliminary cleanup without using a `StoppableWorkers` yet.

Thanks to Olivia for pointing out that we don't actually need to wait for the background goroutine to start up, and can return immediately without those syncing up.

Tried on an rpi5 with a modular board component and a modular ultrasonic sensor (so, streaming interrupts from one module to the RDK, and then streaming those same interrupts from the RDK to another module): everything works great!
  • Loading branch information
penguinland authored Aug 20, 2024
1 parent ac02ee6 commit 0ac052c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 30 deletions.
8 changes: 4 additions & 4 deletions components/board/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,14 @@ func (c *client) StreamTicks(ctx context.Context, interrupts []DigitalInterrupt,
client: c,
}

c.mu.Lock()
c.interruptStreams = append(c.interruptStreams, stream)
c.mu.Unlock()

err = stream.startStream(ctx, interrupts, ch)
if err != nil {
return err
}

c.mu.Lock()
defer c.mu.Unlock()
c.interruptStreams = append(c.interruptStreams, stream)
return nil
}

Expand Down
32 changes: 6 additions & 26 deletions components/board/interrupt_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ import (

type interruptStream struct {
*client
streamCancel context.CancelFunc
streamRunning bool
streamReady chan bool
streamMu sync.Mutex
streamCancel context.CancelFunc
streamMu sync.Mutex

activeBackgroundWorkers sync.WaitGroup
extra *structpb.Struct
Expand All @@ -28,9 +26,6 @@ func (s *interruptStream) startStream(ctx context.Context, interrupts []DigitalI
return ctx.Err()
}

s.streamRunning = true
s.streamReady = make(chan bool)
s.activeBackgroundWorkers.Add(1)
ctx, cancel := context.WithCancel(ctx)
s.streamCancel = cancel

Expand Down Expand Up @@ -62,30 +57,15 @@ func (s *interruptStream) startStream(ctx context.Context, interrupts []DigitalI
// Create a background go routine to receive from the server stream.
// We rely on calling the Done function here rather than in close stream
// since managed go calls that function when the routine exits.
s.activeBackgroundWorkers.Add(1)
utils.ManagedGo(func() {
s.recieveFromStream(ctx, stream, ch)
s.receiveFromStream(ctx, stream, ch)
},
s.activeBackgroundWorkers.Done)

select {
case <-ctx.Done():
return ctx.Err()
case <-s.streamReady:
return nil
}
return nil
}

func (s *interruptStream) recieveFromStream(ctx context.Context, stream pb.BoardService_StreamTicksClient, ch chan Tick) {
defer func() {
s.streamMu.Lock()
defer s.streamMu.Unlock()
s.streamRunning = false
}()
// Close the stream ready channel so the above function returns.
if s.streamReady != nil {
close(s.streamReady)
}
s.streamReady = nil
func (s *interruptStream) receiveFromStream(ctx context.Context, stream pb.BoardService_StreamTicksClient, ch chan Tick) {
defer s.closeStream()

// repeatly receive from the stream
Expand Down

0 comments on commit 0ac052c

Please sign in to comment.