From b921486ef4a834deadebfe1325f1ad1e29542e57 Mon Sep 17 00:00:00 2001 From: Osman Hadzic Date: Mon, 18 Mar 2024 08:49:01 +0100 Subject: [PATCH] Refactor strategy for fallback messages Handling exceptions for deadlock threads. Implement `TryPingAsync` for check is broker available for sending messages. Add timeOut time to device constructor, timeOut duration for the connection. Implement new field in constructors E2E and Example. Signed-off-by: Osman Hadzic --- .../AstartePairingHandler.cs | 6 ++-- .../AstartePairingService.cs | 5 ++-- .../Device/AstarteDevice.cs | 20 +++++++++++-- .../Transport/AstarteTransportFactory.cs | 5 ++-- .../Transport/MQTT/AstarteMqttTransport.cs | 28 ++++++++++++----- .../Transport/MQTT/AstarteMqttV1Transport.cs | 30 +++++++++++++++---- .../Transport/MQTT/IMqttConnectionInfo.cs | 2 ++ ...tualSSLAuthenticationMqttConnectionInfo.cs | 6 +++- .../Utilities/AstarteDeviceSingleton.cs | 1 + AstarteDeviceSDKExample/Program.cs | 3 +- 10 files changed, 83 insertions(+), 23 deletions(-) diff --git a/AstarteDeviceSDKCSharp/AstartePairingHandler.cs b/AstarteDeviceSDKCSharp/AstartePairingHandler.cs index 3e973c1..a9845f7 100644 --- a/AstarteDeviceSDKCSharp/AstartePairingHandler.cs +++ b/AstarteDeviceSDKCSharp/AstartePairingHandler.cs @@ -33,14 +33,16 @@ public class AstartePairingHandler readonly AstarteCryptoStore _cryptoStore; private List? _transports; private X509Certificate2? _certificate; + private TimeSpan _timeOut; public AstartePairingHandler(string pairingUrl, string astarteRealm, string deviceId, - string credentialSecret, AstarteCryptoStore astarteCryptoStore) + string credentialSecret, AstarteCryptoStore astarteCryptoStore, TimeSpan timeout) { _astarteRealm = astarteRealm; _deviceId = deviceId; _credentialSecret = credentialSecret; _cryptoStore = astarteCryptoStore; + _timeOut = timeout; _AstartePairingService = new AstartePairingService(pairingUrl, astarteRealm); _certificate = _cryptoStore.GetCertificate(); @@ -60,7 +62,7 @@ public async Task Init() private async Task ReloadTransports() { _transports = await _AstartePairingService.ReloadTransports(_credentialSecret, - _cryptoStore, _deviceId); + _cryptoStore, _deviceId, _timeOut); } public List GetTransports() diff --git a/AstarteDeviceSDKCSharp/AstartePairingService.cs b/AstarteDeviceSDKCSharp/AstartePairingService.cs index ab07f10..dff4707 100644 --- a/AstarteDeviceSDKCSharp/AstartePairingService.cs +++ b/AstarteDeviceSDKCSharp/AstartePairingService.cs @@ -48,7 +48,7 @@ public AstartePairingService(string pairingUrl, string astarteRealm) } internal async Task> ReloadTransports(string credentialSecret, - AstarteCryptoStore astarteCryptoStore, string deviceId) + AstarteCryptoStore astarteCryptoStore, string deviceId, TimeSpan timeOut) { List transports = new(); // Prepare the Pairing API request @@ -101,7 +101,8 @@ internal async Task> ReloadTransports(string credentialSe _astarteRealm, deviceId, item, - astarteCryptoStore); + astarteCryptoStore, + timeOut); transports.Add(supportedTransport); } diff --git a/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs b/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs index 524fa94..d68a950 100644 --- a/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs +++ b/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs @@ -43,6 +43,7 @@ public class AstarteDevice : IAstarteTransportEventListener private bool _explicitDisconnectionRequest; private static int MIN_INCREMENT_INTERVAL = 5000; private static int MAX_INCREMENT_INTERVAL = 60000; + private TimeSpan _timeOut; /// /// Basic class defining an Astarte device. @@ -68,6 +69,7 @@ public class AstarteDevice : IAstarteTransportEventListener /// It can be a shared directory for multiple devices, a subdirectory for the given device /// ID will be created. /// + /// The timeout duration for the connection. public AstarteDevice( string deviceId, string astarteRealm, @@ -75,6 +77,7 @@ public AstarteDevice( IAstarteInterfaceProvider astarteInterfaceProvider, string pairingBaseUrl, string cryptoStoreDirectory, + TimeSpan timeOut, bool ignoreSSLErrors = false) { if (!Directory.Exists(cryptoStoreDirectory)) @@ -91,12 +94,14 @@ public AstarteDevice( AstarteCryptoStore astarteCryptoStore = new AstarteCryptoStore(fullCryptoDirPath); astarteCryptoStore.IgnoreSSLErrors = ignoreSSLErrors; + _pairingHandler = new AstartePairingHandler( pairingBaseUrl, astarteRealm, deviceId, credentialSecret, - astarteCryptoStore); + astarteCryptoStore, + timeOut); astartePropertyStorage = new AstartePropertyStorage(fullCryptoDirPath); @@ -180,7 +185,18 @@ private bool EventualyReconnect() { interval = MAX_INCREMENT_INTERVAL; } - Task.Run(async () => await Connect()).Wait(interval); + + try + { + Task.Run(async () => await Connect()).Wait(interval); + } + catch (AggregateException ex) + { + foreach (var innerException in ex.InnerExceptions) + { + Trace.WriteLine($"Inner Exception: {innerException.GetType().Name}: {innerException.Message}"); + } + } } } diff --git a/AstarteDeviceSDKCSharp/Transport/AstarteTransportFactory.cs b/AstarteDeviceSDKCSharp/Transport/AstarteTransportFactory.cs index b3fb457..b2511e7 100644 --- a/AstarteDeviceSDKCSharp/Transport/AstarteTransportFactory.cs +++ b/AstarteDeviceSDKCSharp/Transport/AstarteTransportFactory.cs @@ -29,7 +29,7 @@ internal class AstarteTransportFactory public static AstarteTransport? CreateAstarteTransportFromPairing (AstarteProtocolType protocolType, string astarteRealm, - string deviceId, dynamic protocolData, AstarteCryptoStore astarteCryptoStore) + string deviceId, dynamic protocolData, AstarteCryptoStore astarteCryptoStore, TimeSpan timeOut) { switch (protocolType) @@ -40,7 +40,8 @@ public static AstarteTransport? CreateAstarteTransportFromPairing new MutualSSLAuthenticationMqttConnectionInfo(brokerUrl, astarteRealm, deviceId, - astarteCryptoStore.GetMqttClientOptionsBuilderTlsParameters()) + astarteCryptoStore.GetMqttClientOptionsBuilderTlsParameters(), + timeOut) ); default: return null; diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs index 169fd39..3c78368 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs @@ -121,18 +121,30 @@ public override async Task Connect() _client = await InitClientAsync(); } - MqttClientConnectResult result = await _client.ConnectAsync(_connectionInfo.GetMqttConnectOptions(), - CancellationToken.None).ConfigureAwait(false); - - if (result.ResultCode == MqttClientConnectResultCode.Success) + try { - await CompleteAstarteConnection(result.IsSessionPresent); + using (var timeoutToken = new CancellationTokenSource(_connectionInfo.GetTimeOut())) + { + MqttClientConnectResult result = await _client.ConnectAsync( + _connectionInfo.GetMqttConnectOptions(), + timeoutToken.Token); + + if (result.ResultCode == MqttClientConnectResultCode.Success) + { + await CompleteAstarteConnection(result.IsSessionPresent); + } + else + { + throw new AstarteTransportException + ($"Error connecting to MQTT. Code: {result.ResultCode}"); + } + } } - else + catch (OperationCanceledException) { - throw new AstarteTransportException - ($"Error connecting to MQTT. Code: {result.ResultCode}"); + Trace.WriteLine("Timeout while connecting."); } + } public override void Disconnect() diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs index ecb0d37..83c1eae 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs @@ -69,7 +69,17 @@ public override async Task SendIndividualValue(AstarteInterface astarteInterface try { - await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); + if (_client.TryPingAsync().Result) + { + await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); + } + else + { + HandleDatastreamFailedPublish( + new MqttCommunicationException("Broker is not available."), + mapping, topic, payload, qos); + } + } catch (MqttCommunicationException ex) { @@ -143,7 +153,7 @@ public override async Task SendIntrospection() .Remove(introspectionStringBuilder.Length - 1, 1); string introspection = introspectionStringBuilder.ToString(); - await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), (MqttQualityOfServiceLevel)2); + await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), MqttQualityOfServiceLevel.ExactlyOnce); } public override async Task SendIndividualValue(AstarteInterface astarteInterface, @@ -170,11 +180,21 @@ public override async Task SendAggregate(AstarteAggregateDatastreamInterface ast try { - await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); + if (_client.TryPingAsync().Result) + { + await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); + } + else + { + HandleDatastreamFailedPublish( + new MqttCommunicationException("Broker is not avelabe"), + mapping, topic, payload, qos); + } + } - catch (MqttCommunicationException e) + catch (MqttCommunicationException ex) { - HandleDatastreamFailedPublish(e, mapping, topic, payload, qos); + HandleDatastreamFailedPublish(ex, mapping, topic, payload, qos); _astarteTransportEventListener?.OnTransportDisconnected(); } catch (AstarteTransportException ex) diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs index 57bc3a5..768d4bf 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs @@ -30,5 +30,7 @@ public interface IMqttConnectionInfo string GetClientId(); MqttClientOptions GetMqttConnectOptions(); + + TimeSpan GetTimeOut(); } } diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/MutualSSLAuthenticationMqttConnectionInfo.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/MutualSSLAuthenticationMqttConnectionInfo.cs index 61f6b08..c4df9b5 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/MutualSSLAuthenticationMqttConnectionInfo.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/MutualSSLAuthenticationMqttConnectionInfo.cs @@ -28,9 +28,10 @@ public class MutualSSLAuthenticationMqttConnectionInfo : IMqttConnectionInfo private readonly Uri _brokerUrl; private readonly MqttClientOptions _mqttConnectOptions; private readonly string _clientId = string.Empty; + private readonly TimeSpan _timeOut; public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRealm, - string deviceId, MqttClientOptionsBuilderTlsParameters tlsOptions) + string deviceId, MqttClientOptionsBuilderTlsParameters tlsOptions, TimeSpan timeOut) { _brokerUrl = brokerUrl; _mqttConnectOptions = new MqttClientOptionsBuilder() @@ -42,6 +43,7 @@ public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRe .WithSessionExpiryInterval(0) .Build(); + _timeOut = timeOut; _clientId = $"{astarteRealm}/{deviceId}"; } @@ -49,6 +51,8 @@ public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRe public string GetClientId() => _clientId; + public TimeSpan GetTimeOut() => _timeOut; + public MqttClientOptions GetMqttConnectOptions() => _mqttConnectOptions; } } diff --git a/AstarteDeviceSDKCSharpE2E.Tests/Utilities/AstarteDeviceSingleton.cs b/AstarteDeviceSDKCSharpE2E.Tests/Utilities/AstarteDeviceSingleton.cs index 9d2017d..9360fad 100644 --- a/AstarteDeviceSDKCSharpE2E.Tests/Utilities/AstarteDeviceSingleton.cs +++ b/AstarteDeviceSDKCSharpE2E.Tests/Utilities/AstarteDeviceSingleton.cs @@ -62,6 +62,7 @@ public static AstarteDevice Instance new MockInterfaceProvider(), astarteMockData.PairingUrl, cryptoStoreDir, + TimeSpan.FromMilliseconds(500), true ); astarteDevice.SetAlwaysReconnect(true); diff --git a/AstarteDeviceSDKExample/Program.cs b/AstarteDeviceSDKExample/Program.cs index 3502a83..4729c0b 100644 --- a/AstarteDeviceSDKExample/Program.cs +++ b/AstarteDeviceSDKExample/Program.cs @@ -93,7 +93,8 @@ static async Task Main(string[] args) credentialsSecret, interfaceProvider, pairingUrl, - cryptoStoreDir); + cryptoStoreDir, + TimeSpan.FromMilliseconds(500)); myDevice.SetAlwaysReconnect(true); myDevice.SetAstarteMessageListener(new ExampleMessageListener());