package mqtt
import (
"encoding/binary"
"errors"
"fmt"
"net"
"sort"
"sync"
)
// ErrMax denies a request on transit capacity, which prevents the Client from
// blocking. Ping has a limit of 1 slot. Subscribe and Unsubscribe share a large
// number of slots. PublishAtLeastOnce and PublishExactlyOnce each have a limit
// defined by Config. A plain Publish (at most once) has no limit.
var ErrMax = errors.New("mqtt: maximum number of pending requests reached")
// ErrCanceled means that a quit signal got applied before the request was sent.
// The transaction never happened, as opposed to ErrAbandoned.
var ErrCanceled = errors.New("mqtt: request canceled before submission")
// ErrAbandoned means that a quit signal got applied after the request was sent.
// The broker received the request, yet the result/response remains unknown.
var ErrAbandoned = errors.New("mqtt: request abandoned after submission")
// ErrBreak means that the connection broke after the request was sent.
// The broker received the request, yet the result/response remains unknown.
var ErrBreak = errors.New("mqtt: connection lost while awaiting response")
// BufSize should fit topic names with a bit of overhead.
const bufSize = 128
// BufPool is used to construct packets for submission.
// Append will allocate the appropriate amount on overflows.
// The PUBLISH messages are not copied into these buffers.
var bufPool = sync.Pool{New: func() interface{} { return new([bufSize]byte) }}
// Ping makes a roundtrip to validate the connection.
// Only one request is permitted [ErrMax] at a time.
//
// Quit is optional, as nil just blocks. Applying quit will strictly result
// in either ErrCanceled or ErrAbandoned.
func (c *Client) Ping(quit <-chan struct{}) error {
// install callback
done := make(chan error, 1)
select {
case c.pingAck <- done:
break // OK
default:
return fmt.Errorf("%w; PING unavailable", ErrMax)
}
// submit transaction
if err := c.write(quit, packetPINGREQ); err != nil {
select {
case <-c.pingAck: // unlock
default: // picked up by unrelated pong
}
return fmt.Errorf("%w; PING request interrupted", err)
}
select {
case err := <-done:
return err
case <-quit:
select {
case <-c.pingAck: // unlock
return fmt.Errorf("%w; PING not confirmed", ErrAbandoned)
default: // picked up in the meantime
return <-done
}
}
}
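// The sketch below is an editorial addition, not part of the original source:
// it shows how a caller might discriminate Ping outcomes by the sentinel
// errors declared above. The function name and its arguments are hypothetical.
func pingSketch(c *Client, quit <-chan struct{}) {
	switch err := c.Ping(quit); {
	case err == nil:
		fmt.Println("mqtt: connection confirmed")
	case errors.Is(err, ErrMax):
		fmt.Println("mqtt: a ping is already in flight")
	case errors.Is(err, ErrCanceled):
		fmt.Println("mqtt: quit applied before the PINGREQ went out")
	case errors.Is(err, ErrAbandoned):
		fmt.Println("mqtt: quit applied while awaiting the PINGRESP")
	default:
		fmt.Println("mqtt: connection trouble:", err)
	}
}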
func (c *Client) onPINGRESP() error {
if len(c.peek) != 0 {
return fmt.Errorf("%w: PINGRESP with %d byte remaining length", errProtoReset, len(c.peek))
}
select {
case ack := <-c.pingAck:
close(ack)
default:
break // tolerates wandering pong
}
return nil
}
// SubscribeError holds one or more topic filters which were rejected by the broker.
// The element order matches that of the originating request.
type SubscribeError []string
// Error implements the standard error interface.
func (e SubscribeError) Error() string {
return fmt.Sprintf("mqtt: broker failed %d topic filters", len(e))
}
// Reject no-ops to prevent programming mistakes.
var (
// “The payload of a SUBSCRIBE packet MUST contain at least one
// Topic Filter / QoS pair. A SUBSCRIBE packet with no payload
// is a protocol violation.”
// — MQTT Version 3.1.1, conformance statement MQTT-3.8.3-3
errSubscribeNone = errors.New("mqtt: SUBSCRIBE without topic filters denied")
// “The Payload of an UNSUBSCRIBE packet MUST contain at least
// one Topic Filter. An UNSUBSCRIBE packet with no payload is a
// protocol violation.”
// — MQTT Version 3.1.1, conformance statement MQTT-3.10.3-2
errUnsubscribeNone = errors.New("mqtt: UNSUBSCRIBE without topic filters denied")
)
// A total of four types of client requests require a 16-bit packet identifier,
// namely SUBSCRIBE, UNSUBSCRIBE and PUBLISH at-least-once or exactly-once.
// The outbound identifiers are assigned in segments per type. The non-zero
// prefixes/spaces also prevent use of the reserved packet identifier zero.
const (
// A 14-bit address space allows for up to 16,384 pending transactions.
publishIDMask = 0x3fff
// The most-significant bit flags an ordered transaction for publish.
// The second most-significant bit distinguishes the QoS level.
atLeastOnceIDSpace = 0x8000
exactlyOnceIDSpace = 0xc000
// A 13-bit address space allows for up to 8,192 pending transactions.
unorderedIDMask = 0x1fff
subscribeIDSpace = 0x6000
unsubscribeIDSpace = 0x4000
)
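// The helper below is an editorial addition, not part of the original source:
// it decodes the segmentation above by naming the request type that owns a
// given 16-bit packet identifier. The function name is hypothetical.
func packetIDSpaceName(packetID uint16) string {
	switch {
	case packetID&^publishIDMask == exactlyOnceIDSpace:
		return "PUBLISH exactly-once"
	case packetID&^publishIDMask == atLeastOnceIDSpace:
		return "PUBLISH at-least-once"
	case packetID&^unorderedIDMask == subscribeIDSpace:
		return "SUBSCRIBE"
	case packetID&^unorderedIDMask == unsubscribeIDSpace:
		return "UNSUBSCRIBE"
	default:
		return "unassigned"
	}
}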
// ErrPacketIDSpace signals a response packet with an identifier outside of the
// respective address spaces, defined by subscribeIDSpace, unsubscribeIDSpace,
// atLeastOnceIDSpace and exactlyOnceIDSpace. This extra check has the potential
// to detect corruption which would otherwise go unnoticed.
var errPacketIDSpace = fmt.Errorf("%w: packet ID space mismatch", errProtoReset)
// UnorderedTxs tracks outbound transactions without sequence constraints.
type unorderedTxs struct {
sync.Mutex
n uint // counter is permitted to overflow
perPacketID map[uint16]unorderedCallback // transit state
}
type unorderedCallback struct {
done chan<- error
topicFilters []string
}
// StartTx assigns a slot for either a subscribe or an unsubscribe.
// The filter slice is nil for unsubscribes only.
func (txs *unorderedTxs) startTx(topicFilters []string) (packetID uint16, done <-chan error, err error) {
var space uint
if topicFilters == nil {
space = unsubscribeIDSpace
} else {
space = subscribeIDSpace
}
// Only one response error can be applied on done.
ch := make(chan error, 1)
txs.Lock()
defer txs.Unlock()
// By using only a small window of the actual space we
// minimise any overlap risks with ErrAbandoned cases.
if len(txs.perPacketID) > unorderedIDMask>>4 {
return 0, nil, ErrMax
}
// Find a free identifier with the sequence counter.
for {
packetID = uint16(txs.n&unorderedIDMask | space)
txs.n++
if _, ok := txs.perPacketID[packetID]; ok {
// Such a collision indicates a very late response.
continue // just skips the identifier
}
txs.perPacketID[packetID] = unorderedCallback{
topicFilters: topicFilters,
done: ch,
}
return packetID, ch, nil
}
}
// EndTx releases a slot. The filter slice is nil for unsubscribe requests.
func (txs *unorderedTxs) endTx(packetID uint16) (done chan<- error, topicFilters []string) {
txs.Lock()
defer txs.Unlock()
callback := txs.perPacketID[packetID]
delete(txs.perPacketID, packetID)
return callback.done, callback.topicFilters
}
func (txs *unorderedTxs) breakAll() {
txs.Lock()
defer txs.Unlock()
for packetID, callback := range txs.perPacketID {
delete(txs.perPacketID, packetID)
callback.done <- fmt.Errorf("%w; subscription change not confirmed", ErrBreak)
}
}
// Subscribe requests subscription for all topics that match any of the filter
// arguments.
//
// Quit is optional, as nil just blocks. Applying quit will strictly result
// in either ErrCanceled or ErrAbandoned.
func (c *Client) Subscribe(quit <-chan struct{}, topicFilters ...string) error {
return c.subscribeLevel(quit, topicFilters, exactlyOnceLevel)
}
// SubscribeLimitAtMostOnce is like Subscribe, but it limits message reception
// to quality-of-service level 0—fire and forget.
func (c *Client) SubscribeLimitAtMostOnce(quit <-chan struct{}, topicFilters ...string) error {
return c.subscribeLevel(quit, topicFilters, atMostOnceLevel)
}
// SubscribeLimitAtLeastOnce is like Subscribe, but it limits message reception
// to quality-of-service level 1—acknowledged transfer.
func (c *Client) SubscribeLimitAtLeastOnce(quit <-chan struct{}, topicFilters ...string) error {
return c.subscribeLevel(quit, topicFilters, atLeastOnceLevel)
}
func (c *Client) subscribeLevel(quit <-chan struct{}, topicFilters []string, levelMax byte) error {
if len(topicFilters) == 0 {
return errSubscribeNone
}
size := 2 + len(topicFilters)*3
for _, s := range topicFilters {
if err := topicCheck(s); err != nil {
return fmt.Errorf("%w; SUBSCRIBE request denied on topic filter", err)
}
size += len(s)
}
if size > packetMax {
return fmt.Errorf("%w; SUBSCRIBE request denied", errPacketMax)
}
// slot assignment
packetID, done, err := c.unorderedTxs.startTx(topicFilters)
if err != nil {
return fmt.Errorf("%w; SUBSCRIBE unavailable", err)
}
// request packet composition
buf := bufPool.Get().(*[bufSize]byte)
defer bufPool.Put(buf)
packet := append(buf[:0], typeSUBSCRIBE<<4|atLeastOnceLevel<<1)
l := uint(size)
for ; l > 0x7f; l >>= 7 {
packet = append(packet, byte(l|0x80))
}
packet = append(packet, byte(l))
packet = append(packet, byte(packetID>>8), byte(packetID))
for _, s := range topicFilters {
packet = append(packet, byte(len(s)>>8), byte(len(s)))
packet = append(packet, s...)
packet = append(packet, levelMax)
}
// network submission
if err = c.write(quit, packet); err != nil {
c.unorderedTxs.endTx(packetID) // releases slot
return fmt.Errorf("%w; SUBSCRIBE request interrupted", err)
}
select {
case err := <-done:
return err
case <-quit:
c.unorderedTxs.endTx(packetID) // releases slot
return fmt.Errorf("%w; SUBSCRIBE not confirmed", ErrAbandoned)
}
}
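// The sketch below is an editorial addition, not part of the original source:
// it shows how a partial rejection surfaces as a SubscribeError. The function
// name and the topic filters are hypothetical.
func subscribeSketch(c *Client, quit <-chan struct{}) {
	err := c.Subscribe(quit, "sensor/+/temperature", "sensor/+/humidity")
	var rejected SubscribeError
	switch {
	case err == nil:
		fmt.Println("mqtt: both subscriptions accepted")
	case errors.As(err, &rejected):
		// the broker processed the request, minus the listed filters
		fmt.Println("mqtt: topic filters rejected:", []string(rejected))
	default:
		// e.g. ErrMax, ErrCanceled, ErrAbandoned or ErrBreak
		fmt.Println("mqtt: subscribe failed:", err)
	}
}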
func (c *Client) onSUBACK() error {
if len(c.peek) < 3 {
return fmt.Errorf("%w: SUBACK with %d byte remaining length", errProtoReset, len(c.peek))
}
packetID := binary.BigEndian.Uint16(c.peek)
switch {
case packetID == 0:
return errPacketIDZero
case packetID&^unorderedIDMask != subscribeIDSpace:
return errPacketIDSpace
}
returnCodes := c.peek[2:]
var failN int
for _, code := range returnCodes {
switch code {
case atMostOnceLevel, atLeastOnceLevel, exactlyOnceLevel:
break
case 0x80:
failN++
default:
return fmt.Errorf("%w: SUBACK with illegal return code %#02x", errProtoReset, code)
}
}
// commit
done, topicFilters := c.unorderedTxs.endTx(packetID)
if done == nil { // hopefully due to ErrAbandoned
return nil
}
// “The SUBACK Packet sent by the Server to the Client MUST contain a
// return code for each Topic Filter/QoS pair. …”
// — MQTT Version 3.1.1, conformance statement MQTT-3.8.4-5
if len(topicFilters) != len(returnCodes) {
done <- fmt.Errorf("mqtt: %d return codes for SUBSCRIBE with %d topic filters", len(returnCodes), len(topicFilters))
return errProtoReset
}
if failN != 0 {
var err SubscribeError
for i, code := range returnCodes {
if code == 0x80 {
err = append(err, topicFilters[i])
}
}
done <- err
}
close(done)
return nil
}
// Unsubscribe requests subscription cancelation for each of the filter
// arguments.
//
// Quit is optional, as nil just blocks. Applying quit will strictly result
// in either ErrCanceled or ErrAbandoned.
func (c *Client) Unsubscribe(quit <-chan struct{}, topicFilters ...string) error {
if len(topicFilters) == 0 {
return errUnsubscribeNone
}
size := 2 + len(topicFilters)*2
for _, s := range topicFilters {
size += len(s)
if err := topicCheck(s); err != nil {
return fmt.Errorf("%w; UNSUBSCRIBE request denied on topic filter", err)
}
}
if size > packetMax {
return fmt.Errorf("%w; UNSUBSCRIBE request denied", errPacketMax)
}
// slot assignment
packetID, done, err := c.unorderedTxs.startTx(nil)
if err != nil {
return fmt.Errorf("%w; UNSUBSCRIBE unavailable", err)
}
// request packet composition
buf := bufPool.Get().(*[bufSize]byte)
defer bufPool.Put(buf)
// header
packet := append(buf[:0], typeUNSUBSCRIBE<<4|atLeastOnceLevel<<1)
l := uint(size)
for ; l > 0x7f; l >>= 7 {
packet = append(packet, byte(l|0x80))
}
packet = append(packet, byte(l))
packet = append(packet, byte(packetID>>8), byte(packetID))
// payload
for _, s := range topicFilters {
packet = append(packet, byte(len(s)>>8), byte(len(s)))
packet = append(packet, s...)
}
// network submission
if err = c.write(quit, packet); err != nil {
c.unorderedTxs.endTx(packetID) // releases slot
return fmt.Errorf("%w; UNSUBSCRIBE request interrupted", err)
}
select {
case err := <-done:
return err
case <-quit:
c.unorderedTxs.endTx(packetID) // releases slot
return fmt.Errorf("%w; UNSUBSCRIBE not confirmed", ErrAbandoned)
}
}
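// The sketch below is an editorial addition, not part of the original source:
// a minimal Unsubscribe with the quit-channel outcomes spelled out. The
// function name and the topic filter are hypothetical.
func unsubscribeSketch(c *Client, quit <-chan struct{}) error {
	err := c.Unsubscribe(quit, "sensor/+/temperature")
	switch {
	case errors.Is(err, ErrCanceled):
		// quit won the race; the UNSUBSCRIBE never left the Client
	case errors.Is(err, ErrAbandoned):
		// the UNSUBSCRIBE was sent, yet no UNSUBACK was awaited
	}
	return err
}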
func (c *Client) onUNSUBACK() error {
if len(c.peek) != 2 {
return fmt.Errorf("%w: UNSUBACK with %d byte remaining length", errProtoReset, len(c.peek))
}
packetID := binary.BigEndian.Uint16(c.peek)
switch {
case packetID == 0:
return errPacketIDZero
case packetID&^unorderedIDMask != unsubscribeIDSpace:
return errPacketIDSpace
}
done, _ := c.unorderedTxs.endTx(packetID)
if done != nil {
close(done)
}
return nil
}
// OrderedTxs tracks outbound transactions with sequence constraints.
// The counters are allowed to overflow.
type orderedTxs struct {
Acked uint // confirm count for PublishAtLeastOnce
Received uint // confirm count 1/2 for PublishExactlyOnce
Completed uint // confirm count 2/2 for PublishExactlyOnce
}
// Publish delivers the message with an “at most once” guarantee.
// Subscribers may or may not receive the message when subject to error.
// This delivery method is the most efficient option.
//
// Quit is optional, as nil just blocks. Applying quit will strictly result
// in ErrCanceled.
func (c *Client) Publish(quit <-chan struct{}, message []byte, topic string) error {
buf := bufPool.Get().(*[bufSize]byte)
defer bufPool.Put(buf)
packet, err := publishPacket(buf, message, topic, 0, typePUBLISH<<4)
if err != nil {
return err
}
return c.writeBuffers(quit, packet)
}
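// The sketch below is an editorial addition, not part of the original source:
// a fire-and-forget publication. The topic and payload are hypothetical, and
// the nil quit simply blocks until the write completes or fails.
func publishSketch(c *Client) {
	err := c.Publish(nil, []byte("21.5"), "livingroom/temperature")
	if err != nil {
		fmt.Println("mqtt: publish at most once failed:", err)
	}
}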
// PublishRetained is like Publish, but the broker must store the message, so
// that it can be delivered to future subscribers whose subscriptions match the
// topic name. The broker may choose to discard the message at any time though.
// Upon reception, the broker must discard any message previously retained for
// the topic name.
func (c *Client) PublishRetained(quit <-chan struct{}, message []byte, topic string) error {
buf := bufPool.Get().(*[bufSize]byte)
defer bufPool.Put(buf)
packet, err := publishPacket(buf, message, topic, 0, typePUBLISH<<4|retainFlag)
if err != nil {
return err
}
return c.writeBuffers(quit, packet)
}
// PublishAtLeastOnce delivers the message with an “at least once” guarantee.
// Subscribers may receive the message more than once when subject to error.
// This delivery method requires a response transmission plus persistence on
// both client-side and broker-side.
//
// The exchange channel is closed upon receipt confirmation by the broker.
// ErrClosed leaves the channel blocked (with no further input).
func (c *Client) PublishAtLeastOnce(message []byte, topic string) (exchange <-chan error, err error) {
buf := bufPool.Get().(*[bufSize]byte)
defer bufPool.Put(buf)
packet, err := publishPacket(buf, message, topic, atLeastOnceIDSpace, typePUBLISH<<4|atLeastOnceLevel<<1)
if err != nil {
return nil, err
}
return c.submitPersisted(packet, c.atLeastOnce)
}
// PublishAtLeastOnceRetained is like PublishAtLeastOnce, but the broker must
// store the message, so that it can be delivered to future subscribers whose
// subscriptions match the topic name. When a new subscription is established,
// the last retained message, if any, on each matching topic name must be sent
// to the subscriber.
func (c *Client) PublishAtLeastOnceRetained(message []byte, topic string) (exchange <-chan error, err error) {
buf := bufPool.Get().(*[bufSize]byte)
defer bufPool.Put(buf)
packet, err := publishPacket(buf, message, topic, atLeastOnceIDSpace, typePUBLISH<<4|atLeastOnceLevel<<1|retainFlag)
if err != nil {
return nil, err
}
return c.submitPersisted(packet, c.atLeastOnce)
}
// PublishExactlyOnce delivers the message with an “exactly once” guarantee.
// This delivery method eliminates the duplicate-delivery risk from
// PublishAtLeastOnce at the expense of an additional network roundtrip.
func (c *Client) PublishExactlyOnce(message []byte, topic string) (exchange <-chan error, err error) {
buf := bufPool.Get().(*[bufSize]byte)
defer bufPool.Put(buf)
packet, err := publishPacket(buf, message, topic, exactlyOnceIDSpace, typePUBLISH<<4|exactlyOnceLevel<<1)
if err != nil {
return nil, err
}
return c.submitPersisted(packet, c.exactlyOnce)
}
// PublishExactlyOnceRetained is like PublishExactlyOnce, but the broker must
// store the message, so that it can be delivered to future subscribers whose
// subscriptions match the topic name. When a new subscription is established,
// the last retained message, if any, on each matching topic name must be sent
// to the subscriber.
func (c *Client) PublishExactlyOnceRetained(message []byte, topic string) (exchange <-chan error, err error) {
buf := bufPool.Get().(*[bufSize]byte)
defer bufPool.Put(buf)
packet, err := publishPacket(buf, message, topic, exactlyOnceIDSpace, typePUBLISH<<4|exactlyOnceLevel<<1|retainFlag)
if err != nil {
return nil, err
}
return c.submitPersisted(packet, c.exactlyOnce)
}
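// The sketch below is an editorial addition, not part of the original source:
// it shows one way to consume the exchange channel from PublishAtLeastOnce or
// PublishExactlyOnce. The function name, topic and payload are hypothetical.
func publishConfirmSketch(c *Client) {
	exchange, err := c.PublishAtLeastOnce([]byte("on"), "hall/light")
	if err != nil {
		fmt.Println("mqtt: submission denied:", err) // e.g. ErrMax or ErrClosed
		return
	}
	// The channel may carry transient errors, such as ErrDown with the message
	// queued for later submission. It is closed on confirmation by the broker,
	// except after ErrClosed, which leaves the channel blocked.
	for err := range exchange {
		if errors.Is(err, ErrClosed) {
			fmt.Println("mqtt: client closed before delivery confirmation")
			return
		}
		fmt.Println("mqtt: delivery delayed:", err)
	}
	fmt.Println("mqtt: delivery confirmed")
}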
func (c *Client) submitPersisted(packet net.Buffers, out outbound) (exchange <-chan error, err error) {
// lock sequence
seq, ok := <-out.seqSem
if !ok {
return nil, ErrClosed
}
defer func() {
out.seqSem <- seq // unlock with updated
}()
hasBacklog := seq.submitN < seq.acceptN
// persist
done, err := c.applySeqNoAndEnqueue(packet, seq.acceptN, out)
if err != nil {
return nil, err
}
seq.acceptN++
// submit
if hasBacklog {
// buffered channel won't block
done <- fmt.Errorf("%w; PUBLISH enqueued", ErrDown)
} else {
err = c.writeBuffersNoWait(packet)
if err != nil {
// buffered channel won't block
done <- fmt.Errorf("%w; PUBLISH enqueued", err)
} else {
seq.submitN = seq.acceptN
}
}
return done, nil
}
func (c *Client) applySeqNoAndEnqueue(packet net.Buffers, seqNo uint, out outbound) (done chan error, err error) {
if cap(out.queue) == len(out.queue) {
return nil, fmt.Errorf("%w; PUBLISH unavailable", ErrMax)
}
// apply sequence number to packet
buf := packet[0]
i := len(buf) - 2
packetID := uint(binary.BigEndian.Uint16(buf[i:]))
packetID |= seqNo & publishIDMask
binary.BigEndian.PutUint16(buf[i:], uint16(packetID))
err = c.persistence.Save(packetID, packet)
if err != nil {
return nil, fmt.Errorf("%w; PUBLISH dropped", err)
}
done = make(chan error, 2) // receives at most 1 write error + ErrClosed
out.queue <- done // won't block due to ErrMax check
return done, nil
}
func publishPacket(buf *[bufSize]byte, message []byte, topic string, packetID uint, head byte) (net.Buffers, error) {
if err := topicCheck(topic); err != nil {
return nil, fmt.Errorf("%w; PUBLISH request denied on topic", err)
}
size := 2 + len(topic) + len(message)
if packetID != 0 {
size += 2
}
if size < 0 || size > packetMax {
return nil, fmt.Errorf("%w; PUBLISH request denied", errPacketMax)
}
packet := append(buf[:0], head)
l := uint(size)
for ; l > 0x7f; l >>= 7 {
packet = append(packet, byte(l|0x80))
}
packet = append(packet, byte(l))
packet = append(packet, byte(len(topic)>>8), byte(len(topic)))
packet = append(packet, topic...)
if packetID != 0 {
packet = append(packet, byte(packetID>>8), byte(packetID))
}
return net.Buffers{packet, message}, nil
}
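// The decoder below is an editorial addition, not part of the original source:
// it is the inverse of the remaining-length loops above, which follow MQTT's
// variable-length integer scheme of 7 data bits per byte with the top bit as
// a continuation flag. The function name is hypothetical.
func decodeRemainingLength(data []byte) (length uint, n int, ok bool) {
	for shift := uint(0); n < len(data) && n < 4; shift += 7 {
		b := data[n]
		n++
		length |= uint(b&0x7f) << shift
		if b&0x80 == 0 {
			return length, n, true
		}
	}
	return 0, n, false // truncated input or more than 4 length bytes
}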
// OnPUBACK applies the confirm of a PublishAtLeastOnce.
func (c *Client) onPUBACK() error {
// parse packet
if len(c.peek) != 2 {
return fmt.Errorf("%w: PUBACK with %d byte remaining length", errProtoReset, len(c.peek))
}
packetID := uint(binary.BigEndian.Uint16(c.peek))
// match identifier
expect := c.orderedTxs.Acked&publishIDMask | atLeastOnceIDSpace
switch {
case packetID == 0:
return errPacketIDZero
case packetID&^publishIDMask != atLeastOnceIDSpace:
return errPacketIDSpace
case expect != packetID:
return fmt.Errorf("%w: PUBACK %#04x while %#04x next in line", errProtoReset, packetID, expect)
case len(c.atLeastOnce.queue) == 0:
return fmt.Errorf("%w: PUBACK precedes PUBLISH", errProtoReset)
}
// seal transaction
err := c.persistence.Delete(packetID)
if err != nil {
return err // causes resubmission of PUBLISH
}
c.orderedTxs.Acked++
close(<-c.atLeastOnce.queue)
return nil
}
// OnPUBREC applies the first confirm of a PublishExactlyOnce.
func (c *Client) onPUBREC() error {
// parse packet
if len(c.peek) != 2 {
return fmt.Errorf("%w: PUBREC with %d byte remaining length", errProtoReset, len(c.peek))
}
packetID := uint(binary.BigEndian.Uint16(c.peek))
// match identifier
expect := c.orderedTxs.Received&publishIDMask | exactlyOnceIDSpace
switch {
case packetID == 0:
return errPacketIDZero
case packetID&^publishIDMask != exactlyOnceIDSpace:
return errPacketIDSpace
case packetID != expect:
return fmt.Errorf("%w: PUBREC %#04x while %#04x next in line", errProtoReset, packetID, expect)
case int(c.Received-c.Completed) >= len(c.exactlyOnce.queue):
return fmt.Errorf("%w: PUBREC precedes PUBLISH", errProtoReset)
}
// Use pendingAck as a buffer here.
c.pendingAck = append(c.pendingAck[:0], typePUBREL<<4|atLeastOnceLevel<<1, 2, byte(packetID>>8), byte(packetID))
err := c.persistence.Save(packetID, net.Buffers{c.pendingAck})
if err != nil {
c.pendingAck = c.pendingAck[:0]
return err // causes resubmission of PUBLISH (from persistence)
}
c.orderedTxs.Received++
err = c.write(nil, c.pendingAck)
if err != nil {
return err // keeps pendingAck to retry
}
c.pendingAck = c.pendingAck[:0]
return nil
}
// OnPUBCOMP applies the second (and final) confirm of a PublishExactlyOnce.
func (c *Client) onPUBCOMP() error {
// parse packet
if len(c.peek) != 2 {
return fmt.Errorf("%w: PUBCOMP with %d byte remaining length", errProtoReset, len(c.peek))
}
packetID := uint(binary.BigEndian.Uint16(c.peek))
// match identifier
expect := c.orderedTxs.Completed&publishIDMask | exactlyOnceIDSpace
switch {
case packetID == 0:
return errPacketIDZero
case packetID&^publishIDMask != exactlyOnceIDSpace:
return errPacketIDSpace
case packetID != expect:
return fmt.Errorf("%w: PUBCOMP %#04x while %#04x next in line", errProtoReset, packetID, expect)
case c.orderedTxs.Completed >= c.orderedTxs.Received || len(c.exactlyOnce.queue) == 0:
return fmt.Errorf("%w: PUBCOMP precedes PUBREL", errProtoReset)
}
// seal transaction
err := c.persistence.Delete(packetID)
if err != nil {
return err // causes resubmission of PUBREL (from Persistence)
}
c.orderedTxs.Completed++
close(<-c.exactlyOnce.queue)
return nil
}
// InitSession configures the Persistence for first use. Brokers use clientID to
// uniquely identify the session. The session may be continued with AdoptSession
// on another Client.
func InitSession(clientID string, p Persistence, c *Config) (*Client, error) {
return initSession(clientID, &ruggedPersistence{Persistence: p}, c)
}
// VolatileSession operates solely in-memory. This setup is recommended for
// delivery with the “at most once” guarantee [Publish], and for reception
// without the “exactly once” guarantee [SubscribeLimitAtLeastOnce], and for
// testing.
//
// Brokers use clientID to uniquely identify the session. Volatile sessions may
// be continued by using the same clientID again. Use CleanSession to prevent
// reuse of an existing state.
//
// An error implies either a broken setup or persistence failure. Connection
// issues, if any, are reported by ReadSlices.
func VolatileSession(clientID string, c *Config) (*Client, error) {
return initSession(clientID, newVolatile(), c)
}
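// The sketch below is an editorial addition, not part of the original source:
// a volatile session setup. The client identifier is hypothetical, and the
// Config comes in as an argument to avoid assuming any of its fields here.
func volatileSetupSketch(config *Config) (*Client, error) {
	client, err := VolatileSession("demo-client", config)
	if err != nil {
		return nil, fmt.Errorf("mqtt demo setup: %w", err)
	}
	// Connection issues, if any, are reported by ReadSlices, not by the setup.
	return client, nil
}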
func initSession(clientID string, p Persistence, c *Config) (*Client, error) {
if err := stringCheck(clientID); err != nil {
return nil, fmt.Errorf("%w; illegal client identifier", err)
}
if err := c.valid(); err != nil {
return nil, err
}
// empty check
keys, err := p.List()
if err != nil {
return nil, err
}
if len(keys) != 0 {
return nil, errors.New("mqtt: init on non-empty persistence")
}
// install
err = p.Save(clientIDKey, net.Buffers{[]byte(clientID)})
if err != nil {
return nil, err
}
return newClient(p, c), nil
}
// AdoptSession continues with a Persistence which had an InitSession already.
//
// A fatal implies either a broken setup or persistence failure. Connection
// issues, if any, are reported by ReadSlices. The Client recovers from corrupt
// states (in Persistence) automatically with warn entries.
func AdoptSession(p Persistence, c *Config) (client *Client, warn []error, fatal error) {
if err := c.valid(); err != nil {
return nil, warn, err
}
keys, err := p.List()
if err != nil {
return nil, warn, err
}
// storage includes a sequence number
storeOrderPerKey := make(map[uint]uint64, len(keys))
// “When a Client reconnects with CleanSession set to 0, both the Client
// and Server MUST re-send any unacknowledged PUBLISH Packets (where QoS
// > 0) and PUBREL Packets using their original Packet Identifiers.”
// — MQTT Version 3.1.1, conformance statement MQTT-4.4.0-1
var publishAtLeastOnceKeys, publishExactlyOnceKeys, publishReleaseKeys []uint
for _, key := range keys {
if key == clientIDKey || key&remoteIDKeyFlag != 0 {
continue
}
value, err := p.Load(key)
if err != nil {
return nil, warn, err
}
packet, storageSeqNo, err := decodeValue(value)
if err != nil {
delErr := p.Delete(key)
if delErr != nil {
warn = append(warn, fmt.Errorf("%w; record %#x not deleted: %w", err, key, delErr))
} else {
warn = append(warn, fmt.Errorf("%w; record %#x deleted", err, key))
}
continue
}
storeOrderPerKey[key] = storageSeqNo
switch packet[0] >> 4 {
case typePUBLISH:
switch key &^ publishIDMask {
case atLeastOnceIDSpace:
publishAtLeastOnceKeys = append(publishAtLeastOnceKeys, key)
case exactlyOnceIDSpace:
publishExactlyOnceKeys = append(publishExactlyOnceKeys, key)
}
case typePUBREL:
publishReleaseKeys = append(publishReleaseKeys, key)
}
}
// sort by persistence sequence number
sort.Slice(publishAtLeastOnceKeys, func(i, j int) (less bool) {
return storeOrderPerKey[publishAtLeastOnceKeys[i]] < storeOrderPerKey[publishAtLeastOnceKeys[j]]
})
sort.Slice(publishExactlyOnceKeys, func(i, j int) (less bool) {
return storeOrderPerKey[publishExactlyOnceKeys[i]] < storeOrderPerKey[publishExactlyOnceKeys[j]]
})
sort.Slice(publishReleaseKeys, func(i, j int) (less bool) {
return storeOrderPerKey[publishReleaseKeys[i]] < storeOrderPerKey[publishReleaseKeys[j]]
})
// ensure continuous sequence
publishAtLeastOnceKeys = cleanSequence(publishAtLeastOnceKeys, "PUBLISH at-least-once", &warn)
publishExactlyOnceKeys = cleanSequence(publishExactlyOnceKeys, "PUBLISH exactly-once", &warn)
publishReleaseKeys = cleanSequence(publishReleaseKeys, "PUBREL", &warn)
if len(publishExactlyOnceKeys) != 0 && len(publishReleaseKeys) != 0 {
n := publishExactlyOnceKeys[0] & publishIDMask
p := publishReleaseKeys[len(publishReleaseKeys)-1] & publishIDMask
if n-p != 1 && !(n == 0 && p == publishIDMask) {
warn = append(warn, fmt.Errorf("mqtt: PUBREL %#x–%#x dropped ☠️ due to gap until PUBLISH %#x",
publishReleaseKeys[0], publishReleaseKeys[len(publishReleaseKeys)-1], publishExactlyOnceKeys[0]))
}
}
// instantiate client
if n := len(publishAtLeastOnceKeys); n > c.AtLeastOnceMax {
return nil, warn, fmt.Errorf("mqtt: %d AtLeastOnceMax is less than the %d pending in session", c.AtLeastOnceMax, n)
}
if n := len(publishExactlyOnceKeys) + len(publishReleaseKeys); n > c.ExactlyOnceMax {
return nil, warn, fmt.Errorf("mqtt: %d ExactlyOnceMax is less than the %d pending in session", c.ExactlyOnceMax, n)
}
client = newClient(&ruggedPersistence{Persistence: p}, c)
// check for outbound publish pending confirmation
if keys = publishAtLeastOnceKeys; len(keys) != 0 {
// install sequence counts; txs.Acked < seq.acceptN
// and: seq.acceptN − txs.Acked ≤ publishIDMask
client.orderedTxs.Acked = keys[0] & publishIDMask
last := keys[len(keys)-1] & publishIDMask
if last < client.orderedTxs.Acked {
// range overflows address space
last += publishIDMask + 1
}
seq := <-client.atLeastOnce.seqSem
seq.acceptN = last + 1
// BUG(pascaldekloe):
// AdoptSession assumes that all publish-at-least-once packets
// were submitted before already. Persisting the actual state
// after each network submission seems like too much just for
// the DUP flag to be slightly more precise.
seq.submitN = seq.acceptN
client.atLeastOnce.seqSem <- seq
}
// check for outbound publish pending confirmation
if publishKeys, releaseKeys := publishExactlyOnceKeys, publishReleaseKeys; len(publishKeys) != 0 || len(releaseKeys) != 0 {
// install sequence counts; txs.Completed < seq.acceptN
// and: txs.Completed ≤ txs.Received ≤ seq.acceptN
// and: seq.acceptN − txs.Completed ≤ publishIDMask
txs := &client.orderedTxs
if len(releaseKeys) == 0 { // implies len(publishKeys) != 0
txs.Completed = publishKeys[0] & publishIDMask
txs.Received = txs.Completed
} else {
txs.Completed = releaseKeys[0] & publishIDMask
txs.Received = releaseKeys[len(releaseKeys)-1]&publishIDMask + 1
if txs.Received < txs.Completed {
// range overflows address space
txs.Received += publishIDMask + 1
}
}
var last uint
if len(publishKeys) != 0 {
last = publishKeys[len(publishKeys)-1] & publishIDMask
} else {
last = releaseKeys[len(releaseKeys)-1] & publishIDMask
}
if last < txs.Received {
// range overflows address space
last += publishIDMask + 1
}
seq := <-client.exactlyOnce.seqSem
seq.acceptN = last + 1
// BUG(pascaldekloe):
// AdoptSession assumes that all publish-exactly-once packets
// were submitted before already. Persisting the actual state
// after each network submission seems like too much just for
// the DUP flag to be slightly more precise.
seq.submitN = seq.acceptN
client.exactlyOnce.seqSem <- seq
}
// install callback placeholders; won't block due to Max check above
for range publishAtLeastOnceKeys {
client.atLeastOnce.queue <- make(chan<- error, 1)
}
for range publishExactlyOnceKeys {
client.exactlyOnce.queue <- make(chan<- error, 1)
}
for range publishReleaseKeys {
client.exactlyOnce.queue <- make(chan<- error, 1)
}
return client, warn, nil
}
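// The sketch below is an editorial addition, not part of the original source:
// it adopts a previously initialised Persistence and logs the recovery
// warnings, if any. The function name is hypothetical.
func adoptSketch(p Persistence, config *Config) (*Client, error) {
	client, warn, fatal := AdoptSession(p, config)
	for _, err := range warn {
		fmt.Println("mqtt: session recovery:", err)
	}
	if fatal != nil {
		return nil, fatal
	}
	return client, nil
}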
func cleanSequence(keys []uint, name string, warn *[]error) []uint {
for i := 1; i < len(keys); i++ {
n := keys[i] & publishIDMask
p := keys[i-1] & publishIDMask
if n-p == 1 || n == 0 && p == publishIDMask {
continue
}
*warn = append(*warn, fmt.Errorf("mqtt: %s %#x–%#x dropped ☠️ due to gap until %#x", name, keys[0], keys[i-1], keys[i]))
keys = keys[i:]
i = 0
}
return keys
}