Skip to content

Commit

Permalink
test publish at least once while down (issue #7)
Browse files Browse the repository at this point in the history
  • Loading branch information
pascaldekloe committed Aug 15, 2024
1 parent 3f0a464 commit 0140f1e
Showing 1 changed file with 68 additions and 0 deletions.
68 changes: 68 additions & 0 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,74 @@ func TestPublishAtLeastOnceResend(t *testing.T) {
<-brokerMockDone
}

// A Client must send pending PUBLISH once reconnected.
func TestPublishAtLeastOnceWhileDown(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
testTimeout := ctx.Done()

clientConn0, brokerConn0 := net.Pipe()
clientConn1, brokerConn1 := net.Pipe()
client, err := mqtt.VolatileSession("test-client", &mqtt.Config{
PauseTimeout: time.Second / 4,
AtLeastOnceMax: 3,
Dialer: newDialerMock(t, 50*time.Millisecond, clientConn0, clientConn1),
})
if err != nil {
t.Fatal("VolatileSession error:", err)
}

// fail first connection with timeout
brokerConn0.SetDeadline(time.Now().Add(20 * time.Millisecond))
message, topic, err := client.ReadSlices()
var ne net.Error
if err == nil || !errors.As(err, &ne) || !ne.Timeout() {
t.Fatalf("got message %q, topic %q, error %q, want first connection to fail on a timeout error",
message, topic, err)
}
// client is down

// publish should enqueue
exchange, err := client.PublishAtLeastOnce([]byte("x"), "y")
if err != nil {
t.Fatal("PublishAtLeastOnce error:", err)
}
select {
case err := <-exchange:
if !errors.Is(err, mqtt.ErrDown) {
t.Fatalf("got exchange error %q, want a mqtt.ErrDown", err)
}
case <-testTimeout:
t.Fatal("test timeout while awaiting publish exchange without connection")
}

// reconnect and receive
testRoutine(t, func() {
// CONNECT
wantPacketHex(t, brokerConn1, "101700044d51545404000000000b746573742d636c69656e74")
sendPacketHex(t, brokerConn1, "20020000") // CONNACK
wantPacketHex(t, brokerConn1, "3206000179800078") // PUBLISH (enqueued)
sendPacketHex(t, brokerConn1, "40028000") // PUBACK

brokerConn1.Close() // causes EOF next
})
message, topic, err = client.ReadSlices()
if err == nil || !errors.Is(err, io.EOF) {
t.Fatalf("got message %q, topic %q, error %q, want second connection to EOF",
message, topic, err)
}

select {
case err, ok := <-exchange:
if ok {
t.Errorf("got exchange error %q, want channel close", err)
}
case <-testTimeout:
t.Fatal("test timeout while awaiting publish exchange completion")
}
}

// TestPublishAtLeastOnceRestart sends three messages as QOS 1 PUBLISH. The
// broker simulation will do all of the following:
//
Expand Down

0 comments on commit 0140f1e

Please sign in to comment.