Skip to content

Commit

Permalink
Improved Disconnect Error Handling (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored Jun 17, 2024
1 parent 22d23eb commit 0c2008c
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 114 deletions.
54 changes: 16 additions & 38 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,8 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
}

/// <inheritdoc />
public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message, CancellationToken cancellationToken = default)
{
if (!this.IsConnected())
{
throw new HiveMQttClientException("PublishAsync: Client is not connected. Check client.IsConnected() before calling PublishAsync.");
}

message.Validate();

var packetIdentifier = this.GeneratePacketIdentifier();
Expand All @@ -227,17 +222,12 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
{
// Wait on the QoS 1 handshake
pubAckPacket = await publishPacket.OnPublishQoS1CompleteTCS.Task
.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
.WaitAsync(cancellationToken)
.ConfigureAwait(false);
}
catch (TimeoutException)
catch (OperationCanceledException)
{
Logger.Error("PublishAsync: QoS 1 timeout. No PUBACK response received in time.");
var disconnectOptions = new DisconnectOptions
{
ReasonCode = DisconnectReasonCode.UnspecifiedError,
};
await this.DisconnectAsync(disconnectOptions).ConfigureAwait(false);
Logger.Debug("PublishAsync: Operation cancelled by user.");
throw;
}

Expand All @@ -255,18 +245,12 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
{
// Wait on the QoS 2 handshake
packetList = await publishPacket.OnPublishQoS2CompleteTCS.Task
.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
.WaitAsync(cancellationToken)
.ConfigureAwait(false);
}
catch (TimeoutException)
catch (OperationCanceledException)
{
Logger.Error("PublishAsync: QoS 2 timeout. No response received in time.");

var disconnectOptions = new DisconnectOptions
{
ReasonCode = DisconnectReasonCode.UnspecifiedError,
};
await this.DisconnectAsync(disconnectOptions).ConfigureAwait(false);
Logger.Debug("PublishAsync: Operation cancelled by user.");
throw;
}

Expand All @@ -278,11 +262,6 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
}
}

if (publishResult is null)
{
throw new HiveMQttClientException("PublishAsync: QoS 2 complete but no PubRec packet received.");
}

return publishResult;

Check warning on line 265 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Possible null reference return.

Check warning on line 265 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Possible null reference return.

Check warning on line 265 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Possible null reference return.

Check warning on line 265 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Possible null reference return.

Check warning on line 265 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Possible null reference return.

Check warning on line 265 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Possible null reference return.
}

Expand Down Expand Up @@ -330,11 +309,6 @@ public async Task<SubscribeResult> SubscribeAsync(string topic, QualityOfService
/// <inheritdoc />
public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
{
if (!this.IsConnected())
{
throw new HiveMQttClientException("SubscribeAsync: Client is not connected. Check client.IsConnected() before calling SubscribeAsync.");
}

// Fire the corresponding event
this.BeforeSubscribeEventLauncher(options);

Expand Down Expand Up @@ -445,11 +419,6 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(List<Subscription> subscri

public async Task<UnsubscribeResult> UnsubscribeAsync(UnsubscribeOptions unsubOptions)
{
if (!this.IsConnected())
{
throw new HiveMQttClientException("UnsubscribeAsync: Client is not connected. Check client.IsConnected() before calling UnsubscribeAsync.");
}

// Fire the corresponding event
this.BeforeUnsubscribeEventLauncher(unsubOptions.Subscriptions);

Expand Down Expand Up @@ -513,6 +482,12 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(UnsubscribeOptions unsubOp
/// <param name="clean">Indicates whether the disconnect was intended or not.</param>
private async Task<bool> HandleDisconnectionAsync(bool clean = true)
{
if (this.ConnectState == ConnectState.Disconnected)
{
Logger.Trace("HandleDisconnection: Already disconnected.");
return false;
}

Logger.Debug($"HandleDisconnection: Handling disconnection. clean={clean}.");

// Cancel all background tasks and close the socket
Expand All @@ -537,6 +512,9 @@ private async Task<bool> HandleDisconnectionAsync(bool clean = true)
this.OutgoingPublishQueue.Clear();
}

// Delay for 1 seconds before launching the AfterDisconnect event
await Task.Delay(1000).ConfigureAwait(false);

// Fire the corresponding after event
this.AfterDisconnectEventLauncher(clean);
return true;
Expand Down
2 changes: 1 addition & 1 deletion Source/HiveMQtt/Client/HiveMQClientSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private async Task<bool> CreateTLSConnectionAsync(Stream stream)
}
else
{
tlsOptions.RemoteCertificateValidationCallback = HiveMQClient.ValidateServerCertificate;
tlsOptions.RemoteCertificateValidationCallback = ValidateServerCertificate;
}

try
Expand Down
Loading

0 comments on commit 0c2008c

Please sign in to comment.