Skip to content

Commit

Permalink
Refactor strategy for fallback messages
Browse files Browse the repository at this point in the history
Handling exceptions for deadlock threads.
Implement `TryPingAsync` for check is broker
available for sending messages.

Signed-off-by: Osman Hadzic <osman.hadzic@secomind.com>
  • Loading branch information
osmanhadzic committed Mar 18, 2024
1 parent 5bdfac6 commit 99fa69d
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 16 deletions.
14 changes: 13 additions & 1 deletion AstarteDeviceSDKCSharp/Device/AstarteDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,19 @@ private bool EventualyReconnect()
{
interval = MAX_INCREMENT_INTERVAL;
}
Task.Run(async () => await Connect()).Wait(interval);

try
{
Task.Run(async () => await Connect()).Wait(interval);
}
catch (AggregateException ex)
{
foreach (var innerException in ex.InnerExceptions)
{
// Handle each individual exception
Console.WriteLine($"Inner Exception: {innerException.GetType().Name}: {innerException.Message}");
}
}
}
}

Expand Down
28 changes: 20 additions & 8 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,30 @@ public override async Task Connect()
_client = await InitClientAsync();
}

MqttClientConnectResult result = await _client.ConnectAsync(_connectionInfo.GetMqttConnectOptions(),
CancellationToken.None).ConfigureAwait(false);

if (result.ResultCode == MqttClientConnectResultCode.Success)
try
{
await CompleteAstarteConnection(result.IsSessionPresent);
using (var timeoutToken = new CancellationTokenSource(TimeSpan.FromMilliseconds(500)))
{
MqttClientConnectResult result = await _client.ConnectAsync(
_connectionInfo.GetMqttConnectOptions(),
timeoutToken.Token);

if (result.ResultCode == MqttClientConnectResultCode.Success)
{
await CompleteAstarteConnection(result.IsSessionPresent);
}
else
{
throw new AstarteTransportException
($"Error connecting to MQTT. Code: {result.ResultCode}");
}
}
}
else
catch (OperationCanceledException)
{
throw new AstarteTransportException
($"Error connecting to MQTT. Code: {result.ResultCode}");
Console.WriteLine("Timeout while connecting.");
}

}

public override void Disconnect()
Expand Down
41 changes: 34 additions & 7 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,20 @@ public override async Task SendIndividualValue(AstarteInterface astarteInterface

try
{
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
if (_client.TryPingAsync().Result)
{
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
}
else
{
HandleDatastreamFailedPublish(
new MqttCommunicationException("Broker is not avelabe"),
mapping, topic, payload, qos);
}

}
catch (MqttCommunicationException ex)
catch (MqttCommunicationException)
{
HandleDatastreamFailedPublish(ex, mapping, topic, payload, qos);
_astarteTransportEventListener?.OnTransportDisconnected();
}
catch (AstarteTransportException ex)
Expand Down Expand Up @@ -143,7 +152,16 @@ public override async Task SendIntrospection()
.Remove(introspectionStringBuilder.Length - 1, 1);
string introspection = introspectionStringBuilder.ToString();

await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), (MqttQualityOfServiceLevel)2);

try
{
await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), MqttQualityOfServiceLevel.ExactlyOnce);
}
catch (Exception ex)
{

Console.WriteLine(ex.Message);
}
}

public override async Task SendIndividualValue(AstarteInterface astarteInterface,
Expand All @@ -170,11 +188,20 @@ public override async Task SendAggregate(AstarteAggregateDatastreamInterface ast

try
{
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
if (_client.TryPingAsync().Result)
{
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
}
else
{
HandleDatastreamFailedPublish(
new MqttCommunicationException("Broker is not avelabe"),
mapping, topic, payload, qos);
}

}
catch (MqttCommunicationException e)
catch (MqttCommunicationException)
{
HandleDatastreamFailedPublish(e, mapping, topic, payload, qos);
_astarteTransportEventListener?.OnTransportDisconnected();
}
catch (AstarteTransportException ex)
Expand Down

0 comments on commit 99fa69d

Please sign in to comment.