From 411ab84bb771e367409d6bcda693fb6ab716891b Mon Sep 17 00:00:00 2001 From: "Pascal S. de Kloe" Date: Thu, 15 Aug 2024 20:03:10 +0200 Subject: [PATCH] FIX: sequence number ordering inconsistency from issue #7. --- client.go | 69 +++++++++++++++++++++++++------------------- request.go | 77 +++++++++++++++++++++++++++---------------------- request_test.go | 59 +++++++++++++++++++++++++++++++------ 3 files changed, 133 insertions(+), 72 deletions(-) diff --git a/client.go b/client.go index a04b4bd..f2beb8a 100644 --- a/client.go +++ b/client.go @@ -275,22 +275,29 @@ type Client struct { // Outbound submission may face multiple goroutines. type outbound struct { - seqSem chan seq // sequence semaphore singleton - - // Acknowledgement is traced with a callback channel. - // Insertion requires seqSem. Close requires connSem. - q chan chan<- error + // The sequence semaphore is a singleton instance. + seqSem chan seq + + // Acknowledgement is traced with a callback channel per request. + // Insertion requires a seqSem lock as the queue order must match its + // respective sequence number. Close of the queue requires connSem to + // prevent panic on double close [race]. + queue chan chan<- error } // Sequence tracks outbound submission. type seq struct { - // Sequence number n applies to the next submission, starting with zero. - // Its value is used to calculate the respective MQTT packet identifier. - n uint + // AcceptN has the sequence number for the next submission. Counting + // starts at zero. The value is used to calculate the respective MQTT + // packet identifiers. + + // Packets are accepted once they are persisted. The count is used as a + // sequence number (starting with zero) in packet identifiers. + acceptN uint - // When backlog is less than n, then packets since backlog until n - 1 - // await submission. This may happen during connection absence. - backlog uint + // Any packets between submitN and acceptN are still pending network + // submission. 
Such backlog may happen due to connectivity failure. + submitN uint } func newClient(p Persistence, config *Config) *Client { @@ -301,7 +308,7 @@ func newClient(p Persistence, config *Config) *Client { config.ExactlyOnceMax = publishIDMask + 1 } - c := &Client{ + c := Client{ Config: *config, // copy persistence: p, onlineSig: make(chan chan struct{}, 1), @@ -310,12 +317,12 @@ func newClient(p Persistence, config *Config) *Client { writeSem: make(chan net.Conn, 1), pingAck: make(chan chan<- error, 1), atLeastOnce: outbound{ - seqSem: make(chan seq, 1), - q: make(chan chan<- error, config.AtLeastOnceMax), + seqSem: make(chan seq, 1), // must be a singleton + queue: make(chan chan<- error, config.AtLeastOnceMax), }, exactlyOnce: outbound{ - seqSem: make(chan seq, 1), - q: make(chan chan<- error, config.ExactlyOnceMax), + seqSem: make(chan seq, 1), // must be a singleton + queue: make(chan chan<- error, config.ExactlyOnceMax), }, unorderedTxs: unorderedTxs{ perPacketID: make(map[uint16]unorderedCallback), @@ -332,9 +339,9 @@ func newClient(p Persistence, config *Config) *Client { c.writeSem <- connPending c.ctx, c.cancel = context.WithCancel(context.Background()) - c.atLeastOnce.seqSem <- seq{backlog: ^uint(0)} - c.exactlyOnce.seqSem <- seq{backlog: ^uint(0)} - return c + c.atLeastOnce.seqSem <- seq{} + c.exactlyOnce.seqSem <- seq{} + return &c } // Close terminates the connection establishment. 
@@ -446,8 +453,9 @@ func (c *Client) termCallbacks() { // flush queue err := fmt.Errorf("%w; PUBLISH not confirmed", ErrClosed) - close(c.atLeastOnce.q) - for ch := range c.atLeastOnce.q { + // seqSem lock required for close: + close(c.atLeastOnce.queue) + for ch := range c.atLeastOnce.queue { ch <- err // won't block } }() @@ -464,8 +472,9 @@ func (c *Client) termCallbacks() { // flush queue err := fmt.Errorf("%w; PUBLISH not confirmed", ErrClosed) - close(c.exactlyOnce.q) - for ch := range c.exactlyOnce.q { + // seqSem lock required for close: + close(c.exactlyOnce.queue) + for ch := range c.exactlyOnce.queue { ch <- err // won't block } }() @@ -939,9 +948,11 @@ func (c *Client) dialAndConnect(config *Config) (net.Conn, *bufio.Reader, error) return conn, bufr, nil } -// Resend submits any and all pending since seqNoOffset. +// Resend submits any and all pending since seqNoOffset. Sequence numbers count +// from zero. Each sequence number is one less than the respective accept count +// was at the time. 
func (c *Client) resend(conn net.Conn, seqNoOffset uint, seq *seq, space uint) error { - for seqNo := seqNoOffset; seqNo < seq.n; seqNo++ { + for seqNo := seqNoOffset; seqNo < seq.acceptN; seqNo++ { key := seqNo&publishIDMask | space packet, err := c.persistence.Load(uint(key)) if err != nil { @@ -951,7 +962,7 @@ func (c *Client) resend(conn net.Conn, seqNoOffset uint, seq *seq, space uint) e return fmt.Errorf("mqtt: persistence key %#04x gone missing 👻", key) } - if seqNo < seq.backlog && packet[0]>>4 == typePUBLISH { + if seqNo < seq.submitN && packet[0]>>4 == typePUBLISH { packet[0] |= dupeFlag } @@ -959,11 +970,11 @@ func (c *Client) resend(conn net.Conn, seqNoOffset uint, seq *seq, space uint) e if err != nil { return err } - if seqNo >= seq.backlog { - seq.backlog = seqNo + 1 + + if seqNo >= seq.submitN { + seq.submitN = seqNo + 1 } } - return nil } diff --git a/request.go b/request.go index 922f20b..204124e 100644 --- a/request.go +++ b/request.go @@ -455,7 +455,7 @@ func (c *Client) PublishAtLeastOnce(message []byte, topic string) (exchange <-ch if err != nil { return nil, err } - return c.submitPersisted(packet, &c.atLeastOnce) + return c.submitPersisted(packet, c.atLeastOnce) } // PublishAtLeastOnceRetained is like PublishAtLeastOnce, but the broker must @@ -470,7 +470,7 @@ func (c *Client) PublishAtLeastOnceRetained(message []byte, topic string) (excha if err != nil { return nil, err } - return c.submitPersisted(packet, &c.atLeastOnce) + return c.submitPersisted(packet, c.atLeastOnce) } // PublishExactlyOnce delivers the message with an “exactly once” guarantee. 
@@ -483,7 +483,7 @@ func (c *Client) PublishExactlyOnce(message []byte, topic string) (exchange <-ch if err != nil { return nil, err } - return c.submitPersisted(packet, &c.exactlyOnce) + return c.submitPersisted(packet, c.exactlyOnce) } // PublishExactlyOnceRetained is like PublishExactlyOnce, but the broker must @@ -498,50 +498,47 @@ func (c *Client) PublishExactlyOnceRetained(message []byte, topic string) (excha if err != nil { return nil, err } - return c.submitPersisted(packet, &c.exactlyOnce) + return c.submitPersisted(packet, c.exactlyOnce) } -func (c *Client) submitPersisted(packet net.Buffers, out *outbound) (exchange <-chan error, err error) { +func (c *Client) submitPersisted(packet net.Buffers, out outbound) (exchange <-chan error, err error) { // lock sequence seq, ok := <-out.seqSem if !ok { return nil, ErrClosed } defer func() { - out.seqSem <- seq // unlock + out.seqSem <- seq // unlock with the updated sequence state }() + hasBacklog := seq.submitN < seq.acceptN + // persist - done, err := c.applySeqNoAndEnqueue(packet, seq.n, out) + done, err := c.applySeqNoAndEnqueue(packet, seq.acceptN, out) if err != nil { return nil, err } - seq.n++ + seq.acceptN++ // submit - if seq.backlog < seq.n { + if hasBacklog { // buffered channel won't block done <- fmt.Errorf("%w; PUBLISH enqueued", ErrDown) } else { - err = c.writeBuffersNoWait(packet) if err != nil { - // start backlog - seq.backlog = seq.n - if err == ErrDown { - seq.backlog-- - } - // buffered channel won't block done <- fmt.Errorf("%w; PUBLISH enqueued", err) + } else { + seq.submitN = seq.acceptN } } return done, nil } -func (c *Client) applySeqNoAndEnqueue(packet net.Buffers, seqNo uint, out *outbound) (done chan error, err error) { - if cap(out.q) == len(out.q) { +func (c *Client) applySeqNoAndEnqueue(packet net.Buffers, seqNo uint, out outbound) (done chan error, err error) { + if cap(out.queue) == len(out.queue) { return nil, fmt.Errorf("%w; PUBLISH unavailable", ErrMax) } @@ -558,7 +555,7 @@ func (c *Client) 
applySeqNoAndEnqueue(packet net.Buffers, seqNo uint, out *outbo } done = make(chan error, 2) // receives at most 1 write error + ErrClosed - out.q <- done // won't block due ErrMax check + out.queue <- done // won't block due ErrMax check return done, nil } @@ -605,7 +602,7 @@ func (c *Client) onPUBACK() error { return errPacketIDSpace case expect != packetID: return fmt.Errorf("%w: PUBACK %#04x while %#04x next in line", errProtoReset, packetID, expect) - case len(c.atLeastOnce.q) == 0: + case len(c.atLeastOnce.queue) == 0: return fmt.Errorf("%w: PUBACK precedes PUBLISH", errProtoReset) } @@ -615,7 +612,7 @@ func (c *Client) onPUBACK() error { return err // causes resubmission of PUBLISH } c.orderedTxs.Acked++ - close(<-c.atLeastOnce.q) + close(<-c.atLeastOnce.queue) return nil } @@ -636,7 +633,7 @@ func (c *Client) onPUBREC() error { return errPacketIDSpace case packetID != expect: return fmt.Errorf("%w: PUBREC %#04x while %#04x next in line", errProtoReset, packetID, expect) - case int(c.Received-c.Completed) >= len(c.exactlyOnce.q): + case int(c.Received-c.Completed) >= len(c.exactlyOnce.queue): return fmt.Errorf("%w: PUBREC precedes PUBLISH", errProtoReset) } @@ -674,7 +671,7 @@ func (c *Client) onPUBCOMP() error { return errPacketIDSpace case packetID != expect: return fmt.Errorf("%w: PUBCOMP %#04x while %#04x next in line", errProtoReset, packetID, expect) - case c.orderedTxs.Completed >= c.orderedTxs.Received || len(c.exactlyOnce.q) == 0: + case c.orderedTxs.Completed >= c.orderedTxs.Received || len(c.exactlyOnce.queue) == 0: return fmt.Errorf("%w: PUBCOMP precedes PUBREL", errProtoReset) } @@ -684,7 +681,7 @@ func (c *Client) onPUBCOMP() error { return err // causes resubmission of PUBREL (from Persistence) } c.orderedTxs.Completed++ - close(<-c.exactlyOnce.q) + close(<-c.exactlyOnce.queue) return nil } @@ -836,13 +833,19 @@ func AdoptSession(p Persistence, c *Config) (client *Client, warn []error, fatal last += publishIDMask + 1 } seq := 
<-client.atLeastOnce.seqSem - seq.n = last + 1 + seq.acceptN = last + 1 + // BUG(pascaldekloe): + // AdoptSession assumes that all publish-at-least-once packets + // were already submitted before. Persisting the actual state + // after each network submission seems like too much just for + // the DUP flag to be slightly more precise. + seq.submitN = seq.acceptN client.atLeastOnce.seqSem <- seq } // install exactly-once sequence counters if publishKeys, releaseKeys := publishExactlyOnceKeys, publishReleaseKeys; len(publishKeys) != 0 || len(releaseKeys) != 0 { - // txs.Completed ≤ txs.Received ≤ seq.N + // txs.Completed ≤ txs.Received ≤ seq.acceptN txs := &client.orderedTxs if len(releaseKeys) == 0 { txs.Completed = publishKeys[0] & publishIDMask @@ -858,27 +861,33 @@ func AdoptSession(p Persistence, c *Config) (client *Client, warn []error, fatal var last uint if len(publishKeys) != 0 { - last = publishKeys[len(publishKeys)-1] + last = publishKeys[len(publishKeys)-1] & publishIDMask } else { - last = releaseKeys[len(releaseKeys)-1] + last = releaseKeys[len(releaseKeys)-1] & publishIDMask } seq := <-client.exactlyOnce.seqSem - seq.n = last&publishIDMask + 1 - if seq.n < txs.Received { - seq.n += publishIDMask + 1 // address space overflow + seq.acceptN = last + 1 + if seq.acceptN < txs.Received { + seq.acceptN += publishIDMask + 1 // address space overflow } + // BUG(pascaldekloe): + // AdoptSession assumes that all publish-exactly-once packets + // were already submitted before. Persisting the actual state + // after each network submission seems like too much just for + // the DUP flag to be slightly more precise. 
+ seq.submitN = seq.acceptN client.exactlyOnce.seqSem <- seq } // install callback placeholders; won't block due Max check above for range publishAtLeastOnceKeys { - client.atLeastOnce.q <- make(chan<- error, 1) + client.atLeastOnce.queue <- make(chan<- error, 1) } for range publishExactlyOnceKeys { - client.exactlyOnce.q <- make(chan<- error, 1) + client.exactlyOnce.queue <- make(chan<- error, 1) } for range publishReleaseKeys { - client.exactlyOnce.q <- make(chan<- error, 1) + client.exactlyOnce.queue <- make(chan<- error, 1) } return client, warn, nil diff --git a/request_test.go b/request_test.go index af0ba4d..3f5110e 100644 --- a/request_test.go +++ b/request_test.go @@ -249,15 +249,24 @@ func TestPublishAtLeastOnceResend(t *testing.T) { <-brokerMockDone } -// A Client must send pending PUBLISH once reconnected. +// A Client must send pending PUBLISH once reconnected and continue. func TestPublishAtLeastOnceWhileDown(t *testing.T) { t.Parallel() - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - testTimeout := ctx.Done() clientConn0, brokerConn0 := net.Pipe() clientConn1, brokerConn1 := net.Pipe() + + // test expiry + testTimeout := make(chan struct{}) + expire := time.AfterFunc(2*time.Second, func() { + t.Error("test timed out") + close(testTimeout) + + clientConn0.Close() + clientConn1.Close() + }) + defer expire.Stop() + client, err := mqtt.VolatileSession("test-client", &mqtt.Config{ PauseTimeout: time.Second / 4, AtLeastOnceMax: 3, @@ -283,15 +292,36 @@ func TestPublishAtLeastOnceWhileDown(t *testing.T) { t.Fatal("PublishAtLeastOnce error:", err) } select { - case err := <-exchange: + case err, ok := <-exchange: + if !ok { + t.Fatal("exchange completed without connection") + } if !errors.Is(err, mqtt.ErrDown) { - t.Fatalf("got exchange error %q, want a mqtt.ErrDown", err) + t.Fatalf("exchange got error %q, want a mqtt.ErrDown", err) } case <-testTimeout: t.Fatal("test timeout while awaiting publish exchange 
without connection") } - // reconnect and receive + // schedule second publish for after reconnect + secondPublishDone := testRoutine(t, func() { + time.Sleep(time.Second / 2) + + exchange, err := client.PublishAtLeastOnce([]byte("a"), "b") + if err != nil { + t.Fatal("PublishAtLeastOnce error:", err) + } + select { + case err, ok := <-exchange: + if ok { + t.Fatalf("second exchange got error %q", err) + } + case <-testTimeout: + t.Fatal("test timeout while awaiting second publish exchange") + } + }) + + // mock broker reconnect, delayed receive and receive continue testRoutine(t, func() { // CONNECT wantPacketHex(t, brokerConn1, "101700044d51545404000000000b746573742d636c69656e74") @@ -299,6 +329,9 @@ func TestPublishAtLeastOnceWhileDown(t *testing.T) { wantPacketHex(t, brokerConn1, "3206000179800078") // PUBLISH (enqueued) sendPacketHex(t, brokerConn1, "40028000") // PUBACK + wantPacketHex(t, brokerConn1, "3206000162800161") // PUBLISH (second) + sendPacketHex(t, brokerConn1, "40028001") // PUBACK + brokerConn1.Close() // causes EOF next }) message, topic, err = client.ReadSlices() @@ -307,13 +340,21 @@ func TestPublishAtLeastOnceWhileDown(t *testing.T) { message, topic, err) } + // first publish should recover and complete select { case err, ok := <-exchange: if ok { - t.Errorf("got exchange error %q, want channel close", err) + t.Errorf("first exchange got error %q, want channel close", err) } case <-testTimeout: - t.Fatal("test timeout while awaiting publish exchange completion") + t.Fatal("test timeout while awaiting first publish exchange completion") + } + + select { + case <-secondPublishDone: + break // OK + case <-testTimeout: + t.Fatal("test timeout while awaiting second publish routine") } }