Skip to content

Commit

Permalink
Refactor strategy for fallback messages
Browse files Browse the repository at this point in the history
Handling exceptions for deadlock threads.
Implement `TryPingAsync` for check is broker
available for sending messages.
Add timeOut time to device constructor, timeOut
duration for the connection. Implement new field
in constructors  E2E and Example.

Signed-off-by: Osman Hadzic <osman.hadzic@secomind.com>
  • Loading branch information
osmanhadzic committed Mar 20, 2024
1 parent 5bdfac6 commit b921486
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 23 deletions.
6 changes: 4 additions & 2 deletions AstarteDeviceSDKCSharp/AstartePairingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ public class AstartePairingHandler
readonly AstarteCryptoStore _cryptoStore;
private List<AstarteTransport>? _transports;
private X509Certificate2? _certificate;
private TimeSpan _timeOut;

public AstartePairingHandler(string pairingUrl, string astarteRealm, string deviceId,
string credentialSecret, AstarteCryptoStore astarteCryptoStore)
string credentialSecret, AstarteCryptoStore astarteCryptoStore, TimeSpan timeout)
{
_astarteRealm = astarteRealm;
_deviceId = deviceId;
_credentialSecret = credentialSecret;
_cryptoStore = astarteCryptoStore;
_timeOut = timeout;
_AstartePairingService = new AstartePairingService(pairingUrl, astarteRealm);

_certificate = _cryptoStore.GetCertificate();
Expand All @@ -60,7 +62,7 @@ public async Task Init()
private async Task ReloadTransports()
{
_transports = await _AstartePairingService.ReloadTransports(_credentialSecret,
_cryptoStore, _deviceId);
_cryptoStore, _deviceId, _timeOut);
}

public List<AstarteTransport> GetTransports()
Expand Down
5 changes: 3 additions & 2 deletions AstarteDeviceSDKCSharp/AstartePairingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public AstartePairingService(string pairingUrl, string astarteRealm)
}

internal async Task<List<AstarteTransport>> ReloadTransports(string credentialSecret,
AstarteCryptoStore astarteCryptoStore, string deviceId)
AstarteCryptoStore astarteCryptoStore, string deviceId, TimeSpan timeOut)
{
List<AstarteTransport> transports = new();
// Prepare the Pairing API request
Expand Down Expand Up @@ -101,7 +101,8 @@ internal async Task<List<AstarteTransport>> ReloadTransports(string credentialSe
_astarteRealm,
deviceId,
item,
astarteCryptoStore);
astarteCryptoStore,
timeOut);

transports.Add(supportedTransport);
}
Expand Down
20 changes: 18 additions & 2 deletions AstarteDeviceSDKCSharp/Device/AstarteDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class AstarteDevice : IAstarteTransportEventListener
private bool _explicitDisconnectionRequest;
private static int MIN_INCREMENT_INTERVAL = 5000;
private static int MAX_INCREMENT_INTERVAL = 60000;
private TimeSpan _timeOut;

Check warning on line 46 in AstarteDeviceSDKCSharp/Device/AstarteDevice.cs

View workflow job for this annotation

GitHub Actions / build

The field 'AstarteDevice._timeOut' is never used

Check warning on line 46 in AstarteDeviceSDKCSharp/Device/AstarteDevice.cs

View workflow job for this annotation

GitHub Actions / build

The field 'AstarteDevice._timeOut' is never used

Check warning on line 46 in AstarteDeviceSDKCSharp/Device/AstarteDevice.cs

View workflow job for this annotation

GitHub Actions / e2e-test

The field 'AstarteDevice._timeOut' is never used

Check warning on line 46 in AstarteDeviceSDKCSharp/Device/AstarteDevice.cs

View workflow job for this annotation

GitHub Actions / e2e-test

The field 'AstarteDevice._timeOut' is never used

/// <summary>
/// Basic class defining an Astarte device.
Expand All @@ -68,13 +69,15 @@ public class AstarteDevice : IAstarteTransportEventListener
/// It can be a shared directory for multiple devices, a subdirectory for the given device
/// ID will be created.
/// </param>
/// <param name="timeOut">The timeout duration for the connection.</param>
public AstarteDevice(
string deviceId,
string astarteRealm,
string credentialSecret,
IAstarteInterfaceProvider astarteInterfaceProvider,
string pairingBaseUrl,
string cryptoStoreDirectory,
TimeSpan timeOut,
bool ignoreSSLErrors = false)
{
if (!Directory.Exists(cryptoStoreDirectory))
Expand All @@ -91,12 +94,14 @@ public AstarteDevice(
AstarteCryptoStore astarteCryptoStore = new AstarteCryptoStore(fullCryptoDirPath);
astarteCryptoStore.IgnoreSSLErrors = ignoreSSLErrors;


_pairingHandler = new AstartePairingHandler(
pairingBaseUrl,
astarteRealm,
deviceId,
credentialSecret,
astarteCryptoStore);
astarteCryptoStore,
timeOut);

astartePropertyStorage = new AstartePropertyStorage(fullCryptoDirPath);

Expand Down Expand Up @@ -180,7 +185,18 @@ private bool EventualyReconnect()
{
interval = MAX_INCREMENT_INTERVAL;
}
Task.Run(async () => await Connect()).Wait(interval);

try
{
Task.Run(async () => await Connect()).Wait(interval);
}
catch (AggregateException ex)
{
foreach (var innerException in ex.InnerExceptions)
{
Trace.WriteLine($"Inner Exception: {innerException.GetType().Name}: {innerException.Message}");
}
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions AstarteDeviceSDKCSharp/Transport/AstarteTransportFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal class AstarteTransportFactory

public static AstarteTransport? CreateAstarteTransportFromPairing
(AstarteProtocolType protocolType, string astarteRealm,
string deviceId, dynamic protocolData, AstarteCryptoStore astarteCryptoStore)
string deviceId, dynamic protocolData, AstarteCryptoStore astarteCryptoStore, TimeSpan timeOut)
{

switch (protocolType)
Expand All @@ -40,7 +40,8 @@ public static AstarteTransport? CreateAstarteTransportFromPairing
new MutualSSLAuthenticationMqttConnectionInfo(brokerUrl,
astarteRealm,
deviceId,
astarteCryptoStore.GetMqttClientOptionsBuilderTlsParameters())
astarteCryptoStore.GetMqttClientOptionsBuilderTlsParameters(),
timeOut)
);
default:
return null;
Expand Down
28 changes: 20 additions & 8 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,30 @@ public override async Task Connect()
_client = await InitClientAsync();
}

MqttClientConnectResult result = await _client.ConnectAsync(_connectionInfo.GetMqttConnectOptions(),
CancellationToken.None).ConfigureAwait(false);

if (result.ResultCode == MqttClientConnectResultCode.Success)
try
{
await CompleteAstarteConnection(result.IsSessionPresent);
using (var timeoutToken = new CancellationTokenSource(_connectionInfo.GetTimeOut()))
{
MqttClientConnectResult result = await _client.ConnectAsync(
_connectionInfo.GetMqttConnectOptions(),
timeoutToken.Token);

if (result.ResultCode == MqttClientConnectResultCode.Success)
{
await CompleteAstarteConnection(result.IsSessionPresent);
}
else
{
throw new AstarteTransportException
($"Error connecting to MQTT. Code: {result.ResultCode}");
}
}
}
else
catch (OperationCanceledException)
{
throw new AstarteTransportException
($"Error connecting to MQTT. Code: {result.ResultCode}");
Trace.WriteLine("Timeout while connecting.");
}

}

public override void Disconnect()
Expand Down
30 changes: 25 additions & 5 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,17 @@ public override async Task SendIndividualValue(AstarteInterface astarteInterface

try
{
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
if (_client.TryPingAsync().Result)
{
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
}
else
{
HandleDatastreamFailedPublish(
new MqttCommunicationException("Broker is not available."),
mapping, topic, payload, qos);
}

}
catch (MqttCommunicationException ex)
{
Expand Down Expand Up @@ -143,7 +153,7 @@ public override async Task SendIntrospection()
.Remove(introspectionStringBuilder.Length - 1, 1);
string introspection = introspectionStringBuilder.ToString();

await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), (MqttQualityOfServiceLevel)2);
await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), MqttQualityOfServiceLevel.ExactlyOnce);
}

public override async Task SendIndividualValue(AstarteInterface astarteInterface,
Expand All @@ -170,11 +180,21 @@ public override async Task SendAggregate(AstarteAggregateDatastreamInterface ast

try
{
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
if (_client.TryPingAsync().Result)
{
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
}
else
{
HandleDatastreamFailedPublish(
new MqttCommunicationException("Broker is not avelabe"),
mapping, topic, payload, qos);
}

}
catch (MqttCommunicationException e)
catch (MqttCommunicationException ex)
{
HandleDatastreamFailedPublish(e, mapping, topic, payload, qos);
HandleDatastreamFailedPublish(ex, mapping, topic, payload, qos);
_astarteTransportEventListener?.OnTransportDisconnected();
}
catch (AstarteTransportException ex)
Expand Down
2 changes: 2 additions & 0 deletions AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ public interface IMqttConnectionInfo
string GetClientId();

MqttClientOptions GetMqttConnectOptions();

TimeSpan GetTimeOut();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ public class MutualSSLAuthenticationMqttConnectionInfo : IMqttConnectionInfo
private readonly Uri _brokerUrl;
private readonly MqttClientOptions _mqttConnectOptions;
private readonly string _clientId = string.Empty;
private readonly TimeSpan _timeOut;

public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRealm,
string deviceId, MqttClientOptionsBuilderTlsParameters tlsOptions)
string deviceId, MqttClientOptionsBuilderTlsParameters tlsOptions, TimeSpan timeOut)
{
_brokerUrl = brokerUrl;
_mqttConnectOptions = new MqttClientOptionsBuilder()
Expand All @@ -42,13 +43,16 @@ public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRe
.WithSessionExpiryInterval(0)
.Build();

_timeOut = timeOut;
_clientId = $"{astarteRealm}/{deviceId}";
}

public Uri GetBrokerUrl() => _brokerUrl;

public string GetClientId() => _clientId;

public TimeSpan GetTimeOut() => _timeOut;

public MqttClientOptions GetMqttConnectOptions() => _mqttConnectOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public static AstarteDevice Instance
new MockInterfaceProvider(),
astarteMockData.PairingUrl,
cryptoStoreDir,
TimeSpan.FromMilliseconds(500),
true
);
astarteDevice.SetAlwaysReconnect(true);
Expand Down
3 changes: 2 additions & 1 deletion AstarteDeviceSDKExample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ static async Task<int> Main(string[] args)
credentialsSecret,
interfaceProvider,
pairingUrl,
cryptoStoreDir);
cryptoStoreDir,
TimeSpan.FromMilliseconds(500));

myDevice.SetAlwaysReconnect(true);
myDevice.SetAstarteMessageListener(new ExampleMessageListener());
Expand Down

0 comments on commit b921486

Please sign in to comment.