forked from quic-go/quic-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
datagram_queue.go
137 lines (121 loc) · 2.9 KB
/
datagram_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package quic
import (
"context"
"sync"
"github.com/xtls/quic-go/internal/utils"
"github.com/xtls/quic-go/internal/utils/ringbuffer"
"github.com/xtls/quic-go/internal/wire"
)
const (
	// maxDatagramSendQueueLen is the number of DATAGRAM frames that can be
	// queued for sending; Add blocks while the send queue holds this many.
	maxDatagramSendQueueLen = 32
	// maxDatagramRcvQueueLen is the number of received datagrams that are
	// buffered; HandleDatagramFrame discards frames while the receive queue
	// holds this many.
	maxDatagramRcvQueueLen = 128
)
// datagramQueue buffers DATAGRAM frames in both directions: frames waiting to
// be sent (Add / Peek / Pop) and payloads that were received but not yet
// consumed (HandleDatagramFrame / Receive).
type datagramQueue struct {
	// sendMx is held by Add, Peek and Pop while they access sendQueue.
	sendMx    sync.Mutex
	sendQueue ringbuffer.RingBuffer[*wire.DatagramFrame]
	sent      chan struct{} // used to notify Add that a datagram was dequeued

	// rcvMx is held by HandleDatagramFrame and Receive while they access rcvQueue.
	rcvMx    sync.Mutex
	rcvQueue [][]byte
	rcvd     chan struct{} // used to notify Receive that a new datagram was received

	// closeErr is stored by CloseWithError before closed is closed, and is
	// returned by Add and Receive once they observe closed.
	closeErr error
	closed   chan struct{}

	// hasData is called by Add after a frame has been queued for sending.
	hasData func()

	// logger is used to report received datagrams that had to be discarded.
	logger utils.Logger
}
// newDatagramQueue creates an empty, open datagramQueue.
// hasData is invoked by Add each time a frame has been queued for sending.
// logger is used to report received datagrams that are discarded.
func newDatagramQueue(hasData func(), logger utils.Logger) *datagramQueue {
	q := &datagramQueue{
		hasData: hasData,
		logger:  logger,
		closed:  make(chan struct{}),
	}
	// The notification channels hold at most one pending signal, so a
	// notification sent while nobody is waiting is not lost.
	q.sent = make(chan struct{}, 1)
	q.rcvd = make(chan struct{}, 1)
	return q
}
// Add queues a new DATAGRAM frame for sending.
// At most maxDatagramSendQueueLen frames are queued at any time.
// While the queue is full, Add blocks until Pop has removed a frame,
// or until the queue is closed, in which case the close error is returned.
func (h *datagramQueue) Add(f *wire.DatagramFrame) error {
	h.sendMx.Lock()
	// Invariant: sendMx is held whenever the loop condition is evaluated.
	for h.sendQueue.Len() >= maxDatagramSendQueueLen {
		// Throw away a notification left over from an earlier Pop,
		// so that the wait below doesn't return immediately.
		select {
		case <-h.sent:
		default:
		}
		// Don't hold the mutex while waiting: Pop needs it to make room.
		h.sendMx.Unlock()
		select {
		case <-h.closed:
			return h.closeErr
		case <-h.sent:
		}
		h.sendMx.Lock()
	}
	h.sendQueue.PushBack(f)
	h.sendMx.Unlock()
	// Signal that there's data to send, after releasing the mutex.
	h.hasData()
	return nil
}
// Peek returns the DATAGRAM frame at the front of the send queue without
// removing it, or nil if the send queue is empty.
// If the frame is actually sent out, Pop needs to be called before the next
// call to Peek.
func (h *datagramQueue) Peek() *wire.DatagramFrame {
	h.sendMx.Lock()
	defer h.sendMx.Unlock()

	if !h.sendQueue.Empty() {
		return h.sendQueue.PeekFront()
	}
	return nil
}
// Pop takes the front element off the send queue and discards it.
// It is meant to be called after the frame returned by Peek was sent out.
// A caller blocked in Add is notified that the queue has shrunk.
func (h *datagramQueue) Pop() {
	h.sendMx.Lock()
	defer h.sendMx.Unlock()

	h.sendQueue.PopFront()
	// Non-blocking send: if a notification is already pending, one is enough.
	select {
	case h.sent <- struct{}{}:
	default:
	}
}
// HandleDatagramFrame handles a received DATAGRAM frame.
// The payload is copied, so the queue doesn't keep a reference to f.Data.
// If the receive queue already holds maxDatagramRcvQueueLen datagrams,
// the frame is discarded (and this is logged at debug level).
func (h *datagramQueue) HandleDatagramFrame(f *wire.DatagramFrame) {
	payload := make([]byte, len(f.Data))
	copy(payload, f.Data)

	h.rcvMx.Lock()
	full := len(h.rcvQueue) >= maxDatagramRcvQueueLen
	if !full {
		h.rcvQueue = append(h.rcvQueue, payload)
		// Wake up Receive; a single pending notification is sufficient.
		select {
		case h.rcvd <- struct{}{}:
		default:
		}
	}
	h.rcvMx.Unlock()

	// Log outside of the critical section.
	if full && h.logger.Debug() {
		h.logger.Debugf("Discarding received DATAGRAM frame (%d bytes payload)", len(f.Data))
	}
}
// Receive gets a received DATAGRAM frame.
// If a datagram is queued, the oldest one is returned immediately; this check
// happens first, so queued datagrams are still delivered after the queue was closed.
// Otherwise Receive blocks until a datagram arrives, the queue is closed
// (the error passed to CloseWithError is returned), or ctx is done
// (ctx.Err() is returned).
func (h *datagramQueue) Receive(ctx context.Context) ([]byte, error) {
	for {
		h.rcvMx.Lock()
		if len(h.rcvQueue) > 0 {
			data := h.rcvQueue[0]
			// Clear the slot before re-slicing. Otherwise the backing array
			// keeps referencing the payload, and it can't be garbage collected
			// until the array itself is reallocated.
			h.rcvQueue[0] = nil
			h.rcvQueue = h.rcvQueue[1:]
			h.rcvMx.Unlock()
			return data, nil
		}
		h.rcvMx.Unlock()
		select {
		case <-h.rcvd:
			// A datagram was queued, re-check the queue.
			continue
		case <-h.closed:
			return nil, h.closeErr
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}
// CloseWithError closes the queue.
// Calls to Add and Receive that are blocked, or that would block, return e.
// NOTE(review): calling this more than once would close an already closed
// channel, which panics; callers are presumably expected to call it exactly
// once. Confirm against the call sites.
func (h *datagramQueue) CloseWithError(e error) {
	// The error has to be stored before the channel is closed:
	// Add and Receive read closeErr only after observing the closed channel.
	h.closeErr = e
	close(h.closed)
}