From 6ac4b71eee7fdafc53482685ba76cc203d1d35f3 Mon Sep 17 00:00:00 2001 From: Eric Daniels Date: Mon, 26 Aug 2024 07:38:32 -0400 Subject: [PATCH] Fix concurrent pc.GracefulClose --- peerconnection.go | 106 ++++++++++++++++++++++++----------- peerconnection_close_test.go | 84 ++++++++++++++++++++++++++- 2 files changed, 155 insertions(+), 35 deletions(-) diff --git a/peerconnection.go b/peerconnection.go index 3b40dafecb6..5617ee63e81 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -56,8 +56,9 @@ type PeerConnection struct { idpLoginURL *string isClosed *atomicBool - isGracefulClosed *atomicBool - isGracefulClosedDone chan struct{} + isGracefullyClosingOrClosed bool + isCloseDone chan struct{} + isGracefulCloseDone chan struct{} isNegotiationNeeded *atomicBool updateNegotiationNeededFlagOnEmptyChain *atomicBool @@ -130,8 +131,8 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection, ICECandidatePoolSize: 0, }, isClosed: &atomicBool{}, - isGracefulClosed: &atomicBool{}, - isGracefulClosedDone: make(chan struct{}), + isCloseDone: make(chan struct{}), + isGracefulCloseDone: make(chan struct{}), isNegotiationNeeded: &atomicBool{}, updateNegotiationNeededFlagOnEmptyChain: &atomicBool{}, lastOffer: "", @@ -2101,22 +2102,44 @@ func (pc *PeerConnection) GracefulClose() error { func (pc *PeerConnection) close(shouldGracefullyClose bool) error { // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1) // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2) - alreadyGracefullyClosed := shouldGracefullyClose && pc.isGracefulClosed.swap(true) - if pc.isClosed.swap(true) { - if alreadyGracefullyClosed { - // similar but distinct condition where we may be waiting for some - // other graceful close to finish. Incorrectly using isClosed may - // leak a goroutine. - <-pc.isGracefulClosedDone - } - return nil + + pc.mu.Lock() + // A lock in this critical section is needed because pc.isClosed and + // pc.isGracefullyClosingOrClosed are related to each other in that we + // want to make graceful and normal closure one time operations in order + // to avoid any double closure errors from cropping up. However, there are + // some overlapping close cases when both normal and graceful close are used + // that should be idempotent, but be cautioned when writing new close behavior + // to preserve this property. + isAlreadyClosingOrClosed := pc.isClosed.swap(true) + isAlreadyGracefullyClosingOrClosed := pc.isGracefullyClosingOrClosed + if shouldGracefullyClose && !isAlreadyGracefullyClosingOrClosed { + pc.isGracefullyClosingOrClosed = true } - if shouldGracefullyClose && !alreadyGracefullyClosed { - defer close(pc.isGracefulClosedDone) + pc.mu.Unlock() + + if isAlreadyClosingOrClosed { + if !shouldGracefullyClose { + return nil + } + // Even if we're already closing, it may not be graceful: + // If we are not the ones doing the closing, we just wait for the graceful close + // to happen and then return. + if isAlreadyGracefullyClosingOrClosed { + <-pc.isGracefulCloseDone + return nil + } + // Otherwise we need to go through the graceful closure flow once the + // normal closure is done since there are extra steps to take with a + // graceful close. + <-pc.isCloseDone + } else { + defer close(pc.isCloseDone) } - // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3) - pc.signalingState.Set(SignalingStateClosed) + if shouldGracefullyClose { + defer close(pc.isGracefulCloseDone) + } // Try closing everything and collect the errors // Shutdown strategy: @@ -2126,6 +2149,34 @@ func (pc *PeerConnection) close(shouldGracefullyClose bool) error { // continue the chain the Mux has to be closed. closeErrs := make([]error, 4) + doGracefulCloseOps := func() []error { + if !shouldGracefullyClose { + return nil + } + + // these are all non-canon steps + var gracefulCloseErrors []error + if pc.iceTransport != nil { + gracefulCloseErrors = append(gracefulCloseErrors, pc.iceTransport.GracefulStop()) + } + + pc.ops.GracefulClose() + + pc.sctpTransport.lock.Lock() + for _, d := range pc.sctpTransport.dataChannels { + gracefulCloseErrors = append(gracefulCloseErrors, d.GracefulClose()) + } + pc.sctpTransport.lock.Unlock() + return gracefulCloseErrors + } + + if isAlreadyClosingOrClosed { + return util.FlattenErrs(doGracefulCloseOps()) + } + + // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3) + pc.signalingState.Set(SignalingStateClosed) + closeErrs = append(closeErrs, pc.api.interceptor.Close()) // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #4) @@ -2156,28 +2207,15 @@ func (pc *PeerConnection) close(shouldGracefullyClose bool) error { closeErrs = append(closeErrs, pc.dtlsTransport.Stop()) // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10) - if pc.iceTransport != nil { - if shouldGracefullyClose { - // note that it isn't canon to stop gracefully - closeErrs = append(closeErrs, pc.iceTransport.GracefulStop()) - } else { - closeErrs = append(closeErrs, pc.iceTransport.Stop()) - } + if pc.iceTransport != nil && !shouldGracefullyClose { + // we will stop gracefully in doGracefulCloseOps + closeErrs = append(closeErrs, pc.iceTransport.Stop()) } // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11) pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State()) - if shouldGracefullyClose { - pc.ops.GracefulClose() - - // note that it isn't canon to stop gracefully - pc.sctpTransport.lock.Lock() - for _, d := range pc.sctpTransport.dataChannels { - closeErrs = append(closeErrs, d.GracefulClose()) - } - pc.sctpTransport.lock.Unlock() - } + closeErrs = append(closeErrs, doGracefulCloseOps()...) return util.FlattenErrs(closeErrs) } diff --git a/peerconnection_close_test.go b/peerconnection_close_test.go index 9f90034a289..42b3ec860ba 100644 --- a/peerconnection_close_test.go +++ b/peerconnection_close_test.go @@ -7,6 +7,8 @@ package webrtc import ( + "fmt" + "sync" "testing" "time" @@ -180,7 +182,7 @@ func TestPeerConnection_Close_DuringICE(t *testing.T) { } } -func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) { +func TestPeerConnection_GracefulCloseWithIncomingMessages(t *testing.T) { // Limit runtime in case of deadlocks lim := test.TimeOut(time.Second * 20) defer lim.Stop() @@ -245,3 +247,83 @@ func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) { t.Fatal(err) } } + +func TestPeerConnection_GracefulCloseWhileOpening(t *testing.T) { + // Limit runtime in case of deadlocks + lim := test.TimeOut(time.Second * 5) + defer lim.Stop() + + report := test.CheckRoutinesStrict(t) + defer report() + + pcOffer, pcAnswer, err := newPair() + if err != nil { + t.Fatal(err) + } + + if _, err = pcOffer.CreateDataChannel("initial_data_channel", nil); err != nil { + t.Fatal(err) + } + + offer, err := pcOffer.CreateOffer(nil) + if err != nil { + t.Fatal(err) + } + offerGatheringComplete := GatheringCompletePromise(pcOffer) + if err = pcOffer.SetLocalDescription(offer); err != nil { + t.Fatal(err) + } + <-offerGatheringComplete + + err = pcOffer.GracefulClose() + if err != nil { + t.Fatal(err) + } + + if err = pcAnswer.SetRemoteDescription(offer); err != nil { + t.Fatal(err) + } + + err = pcAnswer.GracefulClose() + if err != nil { + t.Fatal(err) + } +} + +func TestPeerConnection_GracefulCloseConcurrent(t *testing.T) { + // Limit runtime in case of deadlocks + lim := test.TimeOut(time.Second * 10) + defer lim.Stop() + + for _, mixed := range []bool{false, true} { + t.Run(fmt.Sprintf("mixed_graceful=%t", mixed), func(t *testing.T) { + report := test.CheckRoutinesStrict(t) + defer report() + + pc, err := NewPeerConnection(Configuration{}) + if err != nil { + t.Fatal(err) + } + + const gracefulCloseConcurrency = 50 + var wg sync.WaitGroup + wg.Add(gracefulCloseConcurrency) + for i := 0; i < gracefulCloseConcurrency; i++ { + go func() { + defer wg.Done() + assert.NoError(t, pc.GracefulClose()) + }() + } + if !mixed { + if err := pc.Close(); err != nil { + t.Fatal(err) + } + } else { + if err := pc.GracefulClose(); err != nil { + t.Fatal(err) + } + } + wg.Wait() + }) + } +}