diff --git a/request_test.go b/request_test.go index 39ede83..af0ba4d 100644 --- a/request_test.go +++ b/request_test.go @@ -249,6 +249,74 @@ func TestPublishAtLeastOnceResend(t *testing.T) { <-brokerMockDone } +// A Client must send pending PUBLISH once reconnected. +func TestPublishAtLeastOnceWhileDown(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + testTimeout := ctx.Done() + + clientConn0, brokerConn0 := net.Pipe() + clientConn1, brokerConn1 := net.Pipe() + client, err := mqtt.VolatileSession("test-client", &mqtt.Config{ + PauseTimeout: time.Second / 4, + AtLeastOnceMax: 3, + Dialer: newDialerMock(t, 50*time.Millisecond, clientConn0, clientConn1), + }) + if err != nil { + t.Fatal("VolatileSession error:", err) + } + + // fail first connection with timeout + brokerConn0.SetDeadline(time.Now().Add(20 * time.Millisecond)) + message, topic, err := client.ReadSlices() + var ne net.Error + if err == nil || !errors.As(err, &ne) || !ne.Timeout() { + t.Fatalf("got message %q, topic %q, error %q, want first connection to fail on a timeout error", + message, topic, err) + } + // client is down + + // publish should enqueue + exchange, err := client.PublishAtLeastOnce([]byte("x"), "y") + if err != nil { + t.Fatal("PublishAtLeastOnce error:", err) + } + select { + case err := <-exchange: + if !errors.Is(err, mqtt.ErrDown) { + t.Fatalf("got exchange error %q, want a mqtt.ErrDown", err) + } + case <-testTimeout: + t.Fatal("test timeout while awaiting publish exchange without connection") + } + + // reconnect and receive + testRoutine(t, func() { + // CONNECT + wantPacketHex(t, brokerConn1, "101700044d51545404000000000b746573742d636c69656e74") + sendPacketHex(t, brokerConn1, "20020000") // CONNACK + wantPacketHex(t, brokerConn1, "3206000179800078") // PUBLISH (enqueued) + sendPacketHex(t, brokerConn1, "40028000") // PUBACK + + brokerConn1.Close() // causes EOF next + }) + message, topic, err = client.ReadSlices() + if err == nil || !errors.Is(err, io.EOF) { + t.Fatalf("got message %q, topic %q, error %q, want second connection to EOF", + message, topic, err) + } + + select { + case err, ok := <-exchange: + if ok { + t.Errorf("got exchange error %q, want channel close", err) + } + case <-testTimeout: + t.Fatal("test timeout while awaiting publish exchange completion") + } +} + // TestPublishAtLeastOnceRestart sends three messages as QOS 1 PUBLISH. The // broker simulation will do all of the following: //