Skip to content

Commit

Permalink
FIX: sequence number ordering inconsistency from issue #7.
Browse files Browse the repository at this point in the history
  • Loading branch information
pascaldekloe committed Aug 15, 2024
1 parent 0140f1e commit 411ab84
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 72 deletions.
69 changes: 40 additions & 29 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,22 +275,29 @@ type Client struct {

// Outbound submission may face multiple goroutines.
type outbound struct {
seqSem chan seq // sequence semaphore singleton

// Acknowledgement is traced with a callback channel.
// Insertion requires seqSem. Close requires connSem.
q chan chan<- error
// The sequence semaphore is a singleton instance.
seqSem chan seq

// Acknowledgement is traced with a callback channel per request.
// Insertion requires a seqSem lock as the queue order must match its
// respective sequence number. Close of the queue requires connSem to
// prevent panic on double close [race].
queue chan chan<- error
}

// Sequence tracks outbound submission.
type seq struct {
// Sequence number n applies to the next submission, starting with zero.
// Its value is used to calculate the respective MQTT packet identifier.
n uint
// AcceptN has the sequence number for the next submission. Counting
// starts at zero. The value is used to calculate the respective MQTT
// packet identifiers.

// Packets are accepted once they are persisted. The count is used as a
// sequence number (starting with zero) in packet identifiers.
acceptN uint

// When backlog is less than n, then packets since backlog until n - 1
// await submission. This may happen during connection absence.
backlog uint
// Any packets between submitN and acceptN are still pending network
// submission. Such backlog may happen due to connectivity failure.
submitN uint
}

func newClient(p Persistence, config *Config) *Client {
Expand All @@ -301,7 +308,7 @@ func newClient(p Persistence, config *Config) *Client {
config.ExactlyOnceMax = publishIDMask + 1
}

c := &Client{
c := Client{
Config: *config, // copy
persistence: p,
onlineSig: make(chan chan struct{}, 1),
Expand All @@ -310,12 +317,12 @@ func newClient(p Persistence, config *Config) *Client {
writeSem: make(chan net.Conn, 1),
pingAck: make(chan chan<- error, 1),
atLeastOnce: outbound{
seqSem: make(chan seq, 1),
q: make(chan chan<- error, config.AtLeastOnceMax),
seqSem: make(chan seq, 1), // must singleton
queue: make(chan chan<- error, config.AtLeastOnceMax),
},
exactlyOnce: outbound{
seqSem: make(chan seq, 1),
q: make(chan chan<- error, config.ExactlyOnceMax),
seqSem: make(chan seq, 1), // must singleton
queue: make(chan chan<- error, config.ExactlyOnceMax),
},
unorderedTxs: unorderedTxs{
perPacketID: make(map[uint16]unorderedCallback),
Expand All @@ -332,9 +339,9 @@ func newClient(p Persistence, config *Config) *Client {
c.writeSem <- connPending

c.ctx, c.cancel = context.WithCancel(context.Background())
c.atLeastOnce.seqSem <- seq{backlog: ^uint(0)}
c.exactlyOnce.seqSem <- seq{backlog: ^uint(0)}
return c
c.atLeastOnce.seqSem <- seq{}
c.exactlyOnce.seqSem <- seq{}
return &c
}

// Close terminates the connection establishment.
Expand Down Expand Up @@ -446,8 +453,9 @@ func (c *Client) termCallbacks() {

// flush queue
err := fmt.Errorf("%w; PUBLISH not confirmed", ErrClosed)
close(c.atLeastOnce.q)
for ch := range c.atLeastOnce.q {
// seqSem lock required for close:
close(c.atLeastOnce.queue)
for ch := range c.atLeastOnce.queue {
ch <- err // won't block
}
}()
Expand All @@ -464,8 +472,9 @@ func (c *Client) termCallbacks() {

// flush queue
err := fmt.Errorf("%w; PUBLISH not confirmed", ErrClosed)
close(c.exactlyOnce.q)
for ch := range c.exactlyOnce.q {
// seqSem lock required for close:
close(c.exactlyOnce.queue)
for ch := range c.exactlyOnce.queue {
ch <- err // won't block
}
}()
Expand Down Expand Up @@ -939,9 +948,11 @@ func (c *Client) dialAndConnect(config *Config) (net.Conn, *bufio.Reader, error)
return conn, bufr, nil
}

// Resend submits any and all pending since seqNoOffset.
// Resend submits any and all pending since seqNoOffset. Sequence numbers count
// from zero. Each sequence number is one less than the respective accept count
// was at the time.
func (c *Client) resend(conn net.Conn, seqNoOffset uint, seq *seq, space uint) error {
for seqNo := seqNoOffset; seqNo < seq.n; seqNo++ {
for seqNo := seqNoOffset; seqNo < seq.acceptN; seqNo++ {
key := seqNo&publishIDMask | space
packet, err := c.persistence.Load(uint(key))
if err != nil {
Expand All @@ -951,19 +962,19 @@ func (c *Client) resend(conn net.Conn, seqNoOffset uint, seq *seq, space uint) e
return fmt.Errorf("mqtt: persistence key %#04x gone missing 👻", key)
}

if seqNo < seq.backlog && packet[0]>>4 == typePUBLISH {
if seqNo < seq.submitN && packet[0]>>4 == typePUBLISH {
packet[0] |= dupeFlag
}

err = write(conn, packet, c.PauseTimeout)
if err != nil {
return err
}
if seqNo >= seq.backlog {
seq.backlog = seqNo + 1

if seqNo >= seq.submitN {
seq.submitN = seqNo + 1
}
}

return nil
}

Expand Down
77 changes: 43 additions & 34 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func (c *Client) PublishAtLeastOnce(message []byte, topic string) (exchange <-ch
if err != nil {
return nil, err
}
return c.submitPersisted(packet, &c.atLeastOnce)
return c.submitPersisted(packet, c.atLeastOnce)
}

// PublishAtLeastOnceRetained is like PublishAtLeastOnce, but the broker must
Expand All @@ -470,7 +470,7 @@ func (c *Client) PublishAtLeastOnceRetained(message []byte, topic string) (excha
if err != nil {
return nil, err
}
return c.submitPersisted(packet, &c.atLeastOnce)
return c.submitPersisted(packet, c.atLeastOnce)
}

// PublishExactlyOnce delivers the message with an “exactly once” guarantee.
Expand All @@ -483,7 +483,7 @@ func (c *Client) PublishExactlyOnce(message []byte, topic string) (exchange <-ch
if err != nil {
return nil, err
}
return c.submitPersisted(packet, &c.exactlyOnce)
return c.submitPersisted(packet, c.exactlyOnce)
}

// PublishExactlyOnceRetained is like PublishExactlyOnce, but the broker must
Expand All @@ -498,50 +498,47 @@ func (c *Client) PublishExactlyOnceRetained(message []byte, topic string) (excha
if err != nil {
return nil, err
}
return c.submitPersisted(packet, &c.exactlyOnce)
return c.submitPersisted(packet, c.exactlyOnce)
}

func (c *Client) submitPersisted(packet net.Buffers, out *outbound) (exchange <-chan error, err error) {
func (c *Client) submitPersisted(packet net.Buffers, out outbound) (exchange <-chan error, err error) {
// lock sequence
seq, ok := <-out.seqSem
if !ok {
return nil, ErrClosed
}
defer func() {
out.seqSem <- seq // unlock
out.seqSem <- seq // unlock with updated
}()

hasBacklog := seq.submitN < seq.acceptN

// persist
done, err := c.applySeqNoAndEnqueue(packet, seq.n, out)
done, err := c.applySeqNoAndEnqueue(packet, seq.acceptN, out)
if err != nil {
return nil, err
}
seq.n++
seq.acceptN++

// submit
if seq.backlog < seq.n {
if hasBacklog {
// buffered channel won't block
done <- fmt.Errorf("%w; PUBLISH enqueued", ErrDown)
} else {

err = c.writeBuffersNoWait(packet)
if err != nil {
// start backlog
seq.backlog = seq.n
if err == ErrDown {
seq.backlog--
}

// buffered channel won't block
done <- fmt.Errorf("%w; PUBLISH enqueued", err)
} else {
seq.submitN = seq.acceptN
}
}

return done, nil
}

func (c *Client) applySeqNoAndEnqueue(packet net.Buffers, seqNo uint, out *outbound) (done chan error, err error) {
if cap(out.q) == len(out.q) {
func (c *Client) applySeqNoAndEnqueue(packet net.Buffers, seqNo uint, out outbound) (done chan error, err error) {
if cap(out.queue) == len(out.queue) {
return nil, fmt.Errorf("%w; PUBLISH unavailable", ErrMax)
}

Expand All @@ -558,7 +555,7 @@ func (c *Client) applySeqNoAndEnqueue(packet net.Buffers, seqNo uint, out *outbo
}

done = make(chan error, 2) // receives at most 1 write error + ErrClosed
out.q <- done // won't block due ErrMax check
out.queue <- done // won't block due ErrMax check
return done, nil
}

Expand Down Expand Up @@ -605,7 +602,7 @@ func (c *Client) onPUBACK() error {
return errPacketIDSpace
case expect != packetID:
return fmt.Errorf("%w: PUBACK %#04x while %#04x next in line", errProtoReset, packetID, expect)
case len(c.atLeastOnce.q) == 0:
case len(c.atLeastOnce.queue) == 0:
return fmt.Errorf("%w: PUBACK precedes PUBLISH", errProtoReset)
}

Expand All @@ -615,7 +612,7 @@ func (c *Client) onPUBACK() error {
return err // causes resubmission of PUBLISH
}
c.orderedTxs.Acked++
close(<-c.atLeastOnce.q)
close(<-c.atLeastOnce.queue)
return nil
}

Expand All @@ -636,7 +633,7 @@ func (c *Client) onPUBREC() error {
return errPacketIDSpace
case packetID != expect:
return fmt.Errorf("%w: PUBREC %#04x while %#04x next in line", errProtoReset, packetID, expect)
case int(c.Received-c.Completed) >= len(c.exactlyOnce.q):
case int(c.Received-c.Completed) >= len(c.exactlyOnce.queue):
return fmt.Errorf("%w: PUBREC precedes PUBLISH", errProtoReset)
}

Expand Down Expand Up @@ -674,7 +671,7 @@ func (c *Client) onPUBCOMP() error {
return errPacketIDSpace
case packetID != expect:
return fmt.Errorf("%w: PUBCOMP %#04x while %#04x next in line", errProtoReset, packetID, expect)
case c.orderedTxs.Completed >= c.orderedTxs.Received || len(c.exactlyOnce.q) == 0:
case c.orderedTxs.Completed >= c.orderedTxs.Received || len(c.exactlyOnce.queue) == 0:
return fmt.Errorf("%w: PUBCOMP precedes PUBREL", errProtoReset)
}

Expand All @@ -684,7 +681,7 @@ func (c *Client) onPUBCOMP() error {
return err // causes resubmission of PUBREL (from Persistence)
}
c.orderedTxs.Completed++
close(<-c.exactlyOnce.q)
close(<-c.exactlyOnce.queue)
return nil
}

Expand Down Expand Up @@ -836,13 +833,19 @@ func AdoptSession(p Persistence, c *Config) (client *Client, warn []error, fatal
last += publishIDMask + 1
}
seq := <-client.atLeastOnce.seqSem
seq.n = last + 1
seq.acceptN = last + 1
// BUG(pascaldekloe):
// AdoptSession assumes that all publish-at-least-once packets
// were submitted before already. Persisting the actual state
// after each network submission seems like too much just for
// the DUP flag to be slightly more precise.
seq.submitN = seq.acceptN
client.atLeastOnce.seqSem <- seq
}

// install exactly-once sequence counters
if publishKeys, releaseKeys := publishExactlyOnceKeys, publishReleaseKeys; len(publishKeys) != 0 || len(releaseKeys) != 0 {
// txs.Completed ≤ txs.Received ≤ seq.N
// txs.Completed ≤ txs.Received ≤ seq.acceptN
txs := &client.orderedTxs
if len(releaseKeys) == 0 {
txs.Completed = publishKeys[0] & publishIDMask
Expand All @@ -858,27 +861,33 @@ func AdoptSession(p Persistence, c *Config) (client *Client, warn []error, fatal

var last uint
if len(publishKeys) != 0 {
last = publishKeys[len(publishKeys)-1]
last = publishKeys[len(publishKeys)-1] & publishIDMask
} else {
last = releaseKeys[len(releaseKeys)-1]
last = releaseKeys[len(releaseKeys)-1] & publishIDMask
}
seq := <-client.exactlyOnce.seqSem
seq.n = last&publishIDMask + 1
if seq.n < txs.Received {
seq.n += publishIDMask + 1 // address space overflow
seq.acceptN = last + 1
if seq.acceptN < txs.Received {
seq.acceptN += publishIDMask + 1 // address space overflow
}
// BUG(pascaldekloe):
// AdoptSession assumes that all publish-exactly-once packets
// were submitted before already. Persisting the actual state
// after each network submission seems like too much just for
// the DUP flag to be slightly more precise.
seq.submitN = seq.acceptN
client.exactlyOnce.seqSem <- seq
}

// install callback placeholders; won't block due Max check above
for range publishAtLeastOnceKeys {
client.atLeastOnce.q <- make(chan<- error, 1)
client.atLeastOnce.queue <- make(chan<- error, 1)
}
for range publishExactlyOnceKeys {
client.exactlyOnce.q <- make(chan<- error, 1)
client.exactlyOnce.queue <- make(chan<- error, 1)
}
for range publishReleaseKeys {
client.exactlyOnce.q <- make(chan<- error, 1)
client.exactlyOnce.queue <- make(chan<- error, 1)
}

return client, warn, nil
Expand Down
Loading

0 comments on commit 411ab84

Please sign in to comment.