From 000c987ab89bd454cedb12d42e4f7164ee189b68 Mon Sep 17 00:00:00 2001 From: Osman Hadzic Date: Thu, 18 Apr 2024 13:49:12 +0200 Subject: [PATCH] Implement MQTTNet extension ManagedClient Refactor `Connect` method in `AstarteMqttTransport`. Refactor fallout strategy to implement ManagedClient storage. Signed-off-by: Osman Hadzic --- .../AstarteDeviceSDKCSharp.csproj | 1 + .../Data/AstarteFailedMessageStorage.cs | 319 +++++++++++++----- .../Data/IAstarteFailedMessageStorage.cs | 30 +- .../Device/AstarteDevice.cs | 129 +------ .../IAstarteTransportEventListener.cs | 4 - .../Transport/AstarteTransport.cs | 3 +- .../Transport/MQTT/AstarteMqttTransport.cs | 96 +++--- .../Transport/MQTT/AstarteMqttV1Transport.cs | 216 +----------- 8 files changed, 328 insertions(+), 470 deletions(-) diff --git a/AstarteDeviceSDKCSharp/AstarteDeviceSDKCSharp.csproj b/AstarteDeviceSDKCSharp/AstarteDeviceSDKCSharp.csproj index 9074191..c3219d8 100644 --- a/AstarteDeviceSDKCSharp/AstarteDeviceSDKCSharp.csproj +++ b/AstarteDeviceSDKCSharp/AstarteDeviceSDKCSharp.csproj @@ -35,6 +35,7 @@ SPDX-License-Identifier: Apache-2.0 --> + diff --git a/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs b/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs index 5db06aa..f78a9fa 100644 --- a/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs +++ b/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs @@ -18,170 +18,311 @@ * SPDX-License-Identifier: Apache-2.0 */ -using Microsoft.EntityFrameworkCore.Storage; +using System.Diagnostics; +using Microsoft.EntityFrameworkCore; +using MQTTnet; +using MQTTnet.Extensions.ManagedClient; +using MQTTnet.Packets; namespace AstarteDeviceSDKCSharp.Data { public class AstarteFailedMessageStorage : IAstarteFailedMessageStorage { private readonly AstarteDbContext _astarteDbContext; + private readonly AstarteDbContext _astarteDbContextDelete; + private readonly AstarteDbContext _astarteDbContextRead; private static List _astarteFailedMessageVolatile = new(); + readonly HashSet stored = new HashSet(); + public AstarteFailedMessageStorage(string persistencyDir) { this._astarteDbContext = new AstarteDbContext(persistencyDir); + this._astarteDbContextDelete = new AstarteDbContext(persistencyDir); + this._astarteDbContextRead = new AstarteDbContext(persistencyDir); } - public void Ack(AstarteFailedMessageEntry failedMessages) - { - if (failedMessages is not null) - { - Console.WriteLine($"The message has been removed from local database."); - Console.WriteLine($"{failedMessages.GetTopic()}" - + $" : {failedMessages.GetPayload()}"); - - _astarteDbContext.AstarteFailedMessages.Remove(failedMessages); - } - - _astarteDbContext.SaveChangesAsync(); - } - - public void InsertStored(string topic, byte[] payload, int qos) + public async Task InsertStored(string topic, byte[] payload, int qos, Guid guid) { AstarteFailedMessageEntry failedMessageEntry - = new AstarteFailedMessageEntry(qos, payload, topic); + = new AstarteFailedMessageEntry(qos, payload, topic, guid); _astarteDbContext.AstarteFailedMessages.Add(failedMessageEntry); - Console.WriteLine($"Insert fallback message in database: " - + $"{topic} : {payload}"); + Trace.WriteLine($"Insert fallback message in database: " + + $"{topic} : {guid}"); - _astarteDbContext.SaveChangesAsync(); + await _astarteDbContext.SaveChangesAsync(); } - public void InsertStored(string topic, byte[] payload, int qos, int relativeExpiry) + public async Task InsertStored(string topic, byte[] payload, int qos, Guid guid, int relativeExpiry) { - using (IDbContextTransaction transaction = _astarteDbContext.Database.BeginTransaction()) + try { - try - { - AstarteFailedMessageEntry failedMessageEntry - = new AstarteFailedMessageEntry(qos, payload, topic, relativeExpiry); - - _astarteDbContext.AstarteFailedMessages.Add(failedMessageEntry); + AstarteFailedMessageEntry failedMessageEntry + = new AstarteFailedMessageEntry(qos, payload, topic, guid, relativeExpiry); - Console.WriteLine($"Insert fallback message in database:" - + $"{topic} : {payload}," - + $" expiry time: {relativeExpiry}"); + _astarteDbContext.AstarteFailedMessages.Add(failedMessageEntry); - _astarteDbContext.SaveChangesAsync(); + Trace.WriteLine($"Insert fallback message in database:" + + $"{topic} : {guid}," + + $" expiry time: {relativeExpiry}"); - transaction.CommitAsync(); - } - catch (Exception ex) - { - transaction.RollbackAsync(); - Console.WriteLine($"Failed to insert fallback message in database. Error message: {ex.Message}"); - } + await _astarteDbContext.SaveChangesAsync(); + } + catch (Exception ex) + { + Trace.WriteLine($"Failed to insert fallback message in database. Error message: {ex.Message}"); } } - public void InsertVolatile(string topic, byte[] payload, int qos) + public void InsertVolatile(string topic, byte[] payload, int qos, Guid guid) { AstarteFailedMessageEntry failedMessageEntry - = new AstarteFailedMessageEntry(qos, payload, topic); + = new AstarteFailedMessageEntry(qos, payload, topic, guid); - Console.WriteLine($"Insert fallback message in cache memory: " - + $"{topic} : {payload}"); + Trace.WriteLine($"Insert fallback message in cache memory: " + + $"{topic} : {guid}"); _astarteFailedMessageVolatile.Add(failedMessageEntry); } - public void InsertVolatile(string topic, byte[] payload, int qos, int relativeExpiry) + public void InsertVolatile(string topic, byte[] payload, int qos, Guid guid, int relativeExpiry) { AstarteFailedMessageEntry failedMessageEntry - = new AstarteFailedMessageEntry(qos, payload, topic, relativeExpiry); + = new AstarteFailedMessageEntry(qos, payload, topic, guid, relativeExpiry); - Console.WriteLine($"Insert fallback message in cache memory: " - + $"{topic} : {payload}," + Trace.WriteLine($"Insert fallback message in cache memory: " + + $"{topic} : {guid}," + $" expiry time: {relativeExpiry}"); _astarteFailedMessageVolatile.Add(failedMessageEntry); } - public bool IsEmpty() - { - return !_astarteDbContext.AstarteFailedMessages.Any(); - } - - public AstarteFailedMessageEntry? PeekFirst() - { - return _astarteDbContext.AstarteFailedMessages - .OrderBy(x => x.Id) - .FirstOrDefault(); - } - - public void Reject(AstarteFailedMessageEntry failedMessages) + public async Task Reject(AstarteFailedMessageEntry failedMessages) { if (failedMessages is not null) { - Console.WriteLine($"Remove from local database " + Trace.WriteLine($"Remove from local database " + $"{failedMessages.GetTopic()} : " + $"{failedMessages.GetPayload()}"); _astarteDbContext.AstarteFailedMessages.Remove(failedMessages); } - _astarteDbContext.SaveChangesAsync(); + await _astarteDbContext.SaveChangesAsync(); } - public bool IsCacheEmpty() + public void RejectCache(AstarteFailedMessageEntry failedMessages) { - return !_astarteFailedMessageVolatile.Any(); + if (failedMessages is not null) + { + Trace.WriteLine($"Remove from cache memory " + + $"{failedMessages.GetTopic()} : " + + $"{failedMessages.GetGuid()}"); + _astarteFailedMessageVolatile.Remove(failedMessages); + } } - public AstarteFailedMessageEntry? PeekFirstCache() + public bool IsExpired(long expire) { - return _astarteFailedMessageVolatile - .OrderBy(x => x.Id) - .FirstOrDefault(); + return expire != 0 ? (DateTimeOffset.UtcNow.ToUnixTimeSeconds() > expire) : false; } - public void RejectFirstCache() + public async Task SaveQueuedMessagesAsync(IList messages) { - var failedMessages = _astarteFailedMessageVolatile - .OrderBy(x => x.Id) - .ToList(); - - if (failedMessages.Count() > 0) + MqttUserProperty? retention = null; + foreach (var message in messages) { - Console.WriteLine($"Remove from cache memory " - + $"{failedMessages.First().GetTopic()} : " - + $"{failedMessages.First().GetPayload()}"); - _astarteDbContext.AstarteFailedMessages.Remove(failedMessages.First()); + + if (message.ApplicationMessage.UserProperties is null) + { + continue; + } + + if (!stored.Contains(message)) + { + retention = message.ApplicationMessage.UserProperties.Where(x => x.Name == "Retention").FirstOrDefault(); + + if (retention == null || retention.Value == "DISCARD") + { + continue; + } + + if (retention.Value == "STORED") + { + if (message.ApplicationMessage.MessageExpiryInterval > 0) + { + await InsertStored + ( + message.ApplicationMessage.Topic, + message.ApplicationMessage.Payload, + (int)message.ApplicationMessage.QualityOfServiceLevel, + message.Id, + (int)message.ApplicationMessage.MessageExpiryInterval + ); + } + else + { + await InsertStored + ( + message.ApplicationMessage.Topic, + message.ApplicationMessage.Payload, + (int)message.ApplicationMessage.QualityOfServiceLevel, + message.Id + ); + } + } + else if (retention.Value == "VOLATILE") + { + + if (message.ApplicationMessage.MessageExpiryInterval > 0) + { + InsertVolatile + ( + message.ApplicationMessage.Topic, + message.ApplicationMessage.Payload, + (int)message.ApplicationMessage.QualityOfServiceLevel, + message.Id, + (int)message.ApplicationMessage.MessageExpiryInterval + ); + } + else + { + InsertVolatile + ( + message.ApplicationMessage.Topic, + message.ApplicationMessage.Payload, + (int)message.ApplicationMessage.QualityOfServiceLevel, + message.Id + ); + } + } + stored.Add(message); + } } } - public void AckFirstCache() + public async Task> LoadQueuedMessagesAsync() { - var failedMessages = _astarteFailedMessageVolatile - .OrderBy(x => x.Id) - .ToList(); + var result = new List(); + foreach (var failedMessage in await _astarteDbContext.AstarteFailedMessages.ToListAsync()) + { + if (IsExpired(failedMessage.GetExpiry())) + { + await Reject(failedMessage); + continue; + } - if (failedMessages.Count() > 0) + var item = new ManagedMqttApplicationMessage + { + ApplicationMessage + = new MqttApplicationMessage + { + ContentType = null, + CorrelationData = null, + Dup = false, + MessageExpiryInterval = 0, + Payload = failedMessage.GetPayload(), + QualityOfServiceLevel = (MQTTnet.Protocol.MqttQualityOfServiceLevel)failedMessage.GetQos(), + ResponseTopic = null, + Retain = false, + SubscriptionIdentifiers = null, + Topic = failedMessage.GetTopic(), + TopicAlias = 0, + UserProperties = null + }, + Id = failedMessage.GetGuid(), + }; + + result.Add(item); + stored.Add(item); + } + + foreach (var messageVolatile in _astarteFailedMessageVolatile) { - Console.WriteLine($"The message has been removed from cache memory."); - Console.WriteLine($"{failedMessages.First().GetTopic()} :" - + "{failedMessages.First().GetPayload()}"); + if (IsExpired(messageVolatile.GetExpiry())) + { + RejectCache(messageVolatile); + continue; + } - _astarteFailedMessageVolatile.Remove(failedMessages.First()); + var item = new ManagedMqttApplicationMessage + { + ApplicationMessage + = new MqttApplicationMessage + { + ContentType = null, + CorrelationData = null, + Dup = false, + MessageExpiryInterval = 0, + Payload = messageVolatile.GetPayload(), + QualityOfServiceLevel = (MQTTnet.Protocol.MqttQualityOfServiceLevel)messageVolatile.GetQos(), + ResponseTopic = null, + Retain = false, + SubscriptionIdentifiers = null, + Topic = messageVolatile.GetTopic(), + TopicAlias = 0, + UserProperties = null + }, + Id = messageVolatile.GetGuid(), + }; + + result.Add(item); + stored.Add(item); } + + return result; + } - public bool IsExpired(long expire) + public async Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMessage) { - return expire != 0 ? (DateTimeOffset.UtcNow.ToUnixTimeSeconds() > expire) : false; + if (stored.Contains(applicationMessage)) + { + stored.Remove(applicationMessage); + + try + { + var message = await GetAstarteFailedMessageStorage(applicationMessage.Id); + + if (message is not null) + { + _astarteDbContextDelete.AstarteFailedMessages.Remove(message); + } + + if (_astarteFailedMessageVolatile is not null) + { + var messageVolatile = _astarteFailedMessageVolatile + .Where(x => x.Guid == applicationMessage.Id) + .FirstOrDefault(); + if (messageVolatile is not null) + { + _astarteFailedMessageVolatile.Remove(messageVolatile); + } + + } + + await _astarteDbContextDelete.SaveChangesAsync(); + } + catch (Exception ex) + { + Trace.WriteLine($"Failed to delete fallback message from database. Error message: {ex.Message}"); + } + + } + + } + + private async Task GetAstarteFailedMessageStorage(Guid guid) + { + var response = await _astarteDbContextRead.AstarteFailedMessages + .Where(x => x.Guid.ToString() == guid.ToString().ToUpper()) + .AsNoTracking() + .FirstOrDefaultAsync(); + + return response is not null ? response : null; } } } diff --git a/AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs b/AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs index eede189..1ff3db6 100644 --- a/AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs +++ b/AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs @@ -18,34 +18,26 @@ * SPDX-License-Identifier: Apache-2.0 */ +using MQTTnet.Extensions.ManagedClient; + namespace AstarteDeviceSDKCSharp.Data { - public interface IAstarteFailedMessageStorage + public interface IAstarteFailedMessageStorage : IManagedMqttClientStorage { - void InsertVolatile(String topic, byte[] payload, int qos); - - void InsertVolatile(String topic, byte[] payload, int qos, int relativeExpiry); - - void InsertStored(String topic, byte[] payload, int qos); - - void InsertStored(String topic, byte[] payload, int qos, int relativeExpiry); + void InsertVolatile(String topic, byte[] payload, int qos, Guid guid); - bool IsEmpty(); + void InsertVolatile(String topic, byte[] payload, int qos, Guid guid, int relativeExpiry); - bool IsCacheEmpty(); + Task InsertStored(String topic, byte[] payload, int qos, Guid guid); - AstarteFailedMessageEntry? PeekFirst(); + Task InsertStored(String topic, byte[] payload, int qos, Guid guid, int relativeExpiry); - AstarteFailedMessageEntry? PeekFirstCache(); + Task Reject(AstarteFailedMessageEntry astarteFailedMessages); - void Ack(AstarteFailedMessageEntry failedMessages); - - void AckFirstCache(); - - void Reject(AstarteFailedMessageEntry astarteFailedMessages); - - void RejectFirstCache(); + void RejectCache(AstarteFailedMessageEntry astarteFailedMessages); bool IsExpired(long expire); + + Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMessage); } } diff --git a/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs b/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs index 43bd2d3..a54eaa3 100644 --- a/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs +++ b/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs @@ -40,9 +40,6 @@ public class AstarteDevice : IAstarteTransportEventListener private bool _initialized = false; private const string _cryptoSubDir = "crypto"; private bool _alwaysReconnect = false; - private bool _explicitDisconnectionRequest; - private static int MIN_INCREMENT_INTERVAL = 5000; - private static int MAX_INCREMENT_INTERVAL = 60000; /// /// Basic class defining an Astarte device. @@ -163,47 +160,6 @@ private void ConfigureTransport(AstarteTransport astarteTransport) } - private bool EventualyReconnect() - { - if (_astarteTransport is null) - { - return false; - } - lock (this) - { - int x = 1; - int interval = 0; - while (_alwaysReconnect && !IsConnected()) - { - - if (interval < MAX_INCREMENT_INTERVAL) - { - interval = MIN_INCREMENT_INTERVAL * x; - x++; - } - else - { - interval = MAX_INCREMENT_INTERVAL; - } - - try - { - Task.Run(async () => await Connect()).Wait(interval); - } - catch (AggregateException ex) - { - foreach (var innerException in ex.InnerExceptions) - { - Trace.WriteLine($"Inner Exception: {innerException.GetType().Name}: {innerException.Message}"); - } - } - } - } - - _explicitDisconnectionRequest = false; - return false; - } - /// /// Method for getting a list of interfaces for the device /// @@ -222,6 +178,11 @@ public void SetAlwaysReconnect(bool alwaysReconnect) _alwaysReconnect = alwaysReconnect; } + public bool GetAlwaysReconnect() + { + return _alwaysReconnect; + } + /// /// Establishes a connection to the Astarte asynchronously. /// @@ -291,17 +252,16 @@ public bool IsConnected() /// /// Disconnect device from Astarte /// - public void Disconnect() + public async Task Disconnect() { - lock (this) + + if (!IsConnected() || _astarteTransport is null) { - if (!IsConnected()) - { - return; - } - _explicitDisconnectionRequest = true; - _astarteTransport?.Disconnect(); + return; } + + await _astarteTransport.Disconnect(); + } /// @@ -428,33 +388,6 @@ public void OnTransportConnected() } } - public void OnTransportConnectionInitializationError(Exception ex) - { - lock (this) - { - - _astarteMessagelistener?.OnFailure(new AstarteMessageException(ex.Message, ex)); - - new Thread(delegate () - { - try - { - Disconnect(); - - _astarteMessagelistener? - .OnDisconnected(new AstarteMessageException(ex.Message, ex)); - - } - catch (AstarteTransportException e) - { - Trace.WriteLine(e.Message); - } - EventualyReconnect(); - }).Start(); - - } - } - public void OnTransportConnectionError(Exception ex) { lock (this) @@ -468,38 +401,18 @@ public void OnTransportConnectionError(Exception ex) } catch (AstartePairingException e) { - - if (!EventualyReconnect()) - { - - _astarteMessagelistener? - .OnFailure(new AstarteMessageException(e.Message, e)); - Trace.WriteLine(e); - } - return; - } - - try - { - _astarteTransport?.Connect(); - } - catch (AstarteTransportException e) - { - _astarteMessagelistener? .OnFailure(new AstarteMessageException(e.Message, e)); + Trace.WriteLine(e); + return; } + } else { - if (!EventualyReconnect()) - { - - _astarteMessagelistener? - .OnFailure(new AstarteMessageException(ex.Message, ex)); - - } + _astarteMessagelistener? + .OnFailure(new AstarteMessageException(ex.Message, ex)); } } } @@ -508,16 +421,8 @@ public void OnTransportDisconnected() { lock (this) { - _astarteMessagelistener? .OnDisconnected(new AstarteMessageException("Connection lost")); - - if (_alwaysReconnect && !_explicitDisconnectionRequest) - { - EventualyReconnect(); - } - _explicitDisconnectionRequest = false; - } } diff --git a/AstarteDeviceSDKCSharp/IAstarteTransportEventListener.cs b/AstarteDeviceSDKCSharp/IAstarteTransportEventListener.cs index 3632e74..192fa98 100644 --- a/AstarteDeviceSDKCSharp/IAstarteTransportEventListener.cs +++ b/AstarteDeviceSDKCSharp/IAstarteTransportEventListener.cs @@ -23,11 +23,7 @@ namespace AstarteDeviceSDKCSharp.Transport public interface IAstarteTransportEventListener { public void OnTransportConnected(); - - public void OnTransportConnectionInitializationError(Exception ex); - public void OnTransportConnectionError(Exception ex); - public void OnTransportDisconnected(); } } diff --git a/AstarteDeviceSDKCSharp/Transport/AstarteTransport.cs b/AstarteDeviceSDKCSharp/Transport/AstarteTransport.cs index f7560a0..8160733 100644 --- a/AstarteDeviceSDKCSharp/Transport/AstarteTransport.cs +++ b/AstarteDeviceSDKCSharp/Transport/AstarteTransport.cs @@ -88,9 +88,8 @@ public void SetAstarteTransportEventListener( } public abstract Task Connect(); - public abstract void Disconnect(); + public abstract Task Disconnect(); public abstract bool IsConnected(); - public abstract void RetryFailedMessages(); public void SetPropertyStorage(IAstartePropertyStorage propertyStorage) { diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs index 3c78368..b52b29f 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs @@ -27,16 +27,16 @@ using MQTTnet; using MQTTnet.Client; using MQTTnet.Exceptions; +using MQTTnet.Extensions.ManagedClient; using System.Diagnostics; using System.IO.Compression; -using System.Runtime.CompilerServices; using System.Text; namespace AstarteDeviceSDKCSharp.Transport.MQTT { public abstract class AstarteMqttTransport : AstarteTransport { - protected IMqttClient? _client; + protected IManagedMqttClient? _client; private readonly IMqttConnectionInfo _connectionInfo; protected AstarteMqttTransport(AstarteProtocolType type, @@ -45,13 +45,13 @@ protected AstarteMqttTransport(AstarteProtocolType type, _connectionInfo = connectionInfo; } - private async Task InitClientAsync() + private async Task InitClientAsync() { if (_client != null) { try { - await _client.DisconnectAsync(); + await _client.StopAsync(); } catch (MqttCommunicationException ex) { @@ -60,12 +60,12 @@ private async Task InitClientAsync() } MqttFactory mqttFactory = new(); - return mqttFactory.CreateMqttClient(); + return mqttFactory.CreateManagedMqttClient(); } - private async Task CompleteAstarteConnection(bool IsSessionPresent) + private async Task CompleteAstarteConnection() { - if (!IsSessionPresent || !_introspectionSent) + if (!_introspectionSent) { await SetupSubscriptions(); await SendIntrospection(); @@ -74,15 +74,6 @@ private async Task CompleteAstarteConnection(bool IsSessionPresent) _introspectionSent = true; } - try - { - RetryFailedMessages(); - } - catch (AstarteTransportException e) - { - throw new AstarteTransportException("Message redelivery failed", e); - } - if (_astarteTransportEventListener != null) { _astarteTransportEventListener.OnTransportConnected(); @@ -101,11 +92,30 @@ private async Task CompleteAstarteConnection(bool IsSessionPresent) }; _client.DisconnectedAsync += OnDisconnectAsync; + _client.ApplicationMessageProcessedAsync += OnMessageProcessedAsync; } } + async Task OnMessageProcessedAsync(ApplicationMessageProcessedEventArgs eventArgs) + { + + if (eventArgs.Exception is null) + { + if (_failedMessageStorage is not null) + { + await _failedMessageStorage.DeleteByGuidAsync(eventArgs.ApplicationMessage); + } + } + else + { + Trace.WriteLine(eventArgs.ApplicationMessage.Id + " " + eventArgs.Exception); + } + + await Task.CompletedTask; + } + public override async Task Connect() { @@ -121,39 +131,29 @@ public override async Task Connect() _client = await InitClientAsync(); } - try - { - using (var timeoutToken = new CancellationTokenSource(_connectionInfo.GetTimeOut())) - { - MqttClientConnectResult result = await _client.ConnectAsync( - _connectionInfo.GetMqttConnectOptions(), - timeoutToken.Token); + var managedMqttClientOptions = new ManagedMqttClientOptionsBuilder() + .WithClientOptions(_connectionInfo.GetMqttConnectOptions()) + .WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) + .WithStorage(_failedMessageStorage) + .WithPendingMessagesOverflowStrategy( + MQTTnet.Server.MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage) + .Build(); - if (result.ResultCode == MqttClientConnectResultCode.Success) - { - await CompleteAstarteConnection(result.IsSessionPresent); - } - else - { - throw new AstarteTransportException - ($"Error connecting to MQTT. Code: {result.ResultCode}"); - } - } - } - catch (OperationCanceledException) + if (!_client.IsStarted) { - Trace.WriteLine("Timeout while connecting."); + await _client.StartAsync(managedMqttClientOptions); + await CompleteAstarteConnection(); } } - public override void Disconnect() + public override async Task Disconnect() { if (_client != null) { if (_client.IsConnected) { - _client.DisconnectAsync(); + await _client.StopAsync(); } } } @@ -216,18 +216,23 @@ private async Task SendEmptyCacheAsync() return; } - MqttClientPublishResult result = await _client.PublishAsync(applicationMessage); - if (result.ReasonCode != MqttClientPublishReasonCode.Success) - { - throw new AstarteTransportException($"Error publishing on MQTT. Code: " + - "{" + result.ReasonCode + "}"); - } + await _client.EnqueueAsync(applicationMessage); } - Task OnDisconnectAsync(MqttClientDisconnectedEventArgs e) + async Task OnDisconnectAsync(MqttClientDisconnectedEventArgs e) { + if (Device is not null && _client is not null) + { + if (!Device.GetAlwaysReconnect()) + { + await _client.StopAsync(); + } + + } + if (_astarteTransportEventListener != null) { + Trace.WriteLine("The Connection was lost."); _astarteTransportEventListener.OnTransportDisconnected(); } else @@ -235,7 +240,6 @@ Task OnDisconnectAsync(MqttClientDisconnectedEventArgs e) Trace.WriteLine("The Connection was lost."); } - return Task.CompletedTask; } private void OnMessageReceive(MqttApplicationMessageReceivedEventArgs e) diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs index 3bbffdf..f15ca4c 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs @@ -19,17 +19,14 @@ */ using AstarteDeviceSDK.Protocol; -using AstarteDeviceSDKCSharp.Data; using AstarteDeviceSDKCSharp.Device; using AstarteDeviceSDKCSharp.Protocol; using AstarteDeviceSDKCSharp.Protocol.AstarteException; using AstarteDeviceSDKCSharp.Utilities; using MQTTnet; -using MQTTnet.Client; -using MQTTnet.Exceptions; +using MQTTnet.Extensions.ManagedClient; using MQTTnet.Protocol; using System.Text; -using static AstarteDeviceSDKCSharp.Protocol.AstarteInterfaceDatastreamMapping; namespace AstarteDeviceSDKCSharp.Transport.MQTT { @@ -67,61 +64,35 @@ public override async Task SendIndividualValue(AstarteInterface astarteInterface string topic = _baseTopic + "/" + astarteInterface.InterfaceName + path; byte[] payload = AstartePayload.Serialize(value, timestamp); - try - { - if (_client.TryPingAsync().Result) - { - await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); - } - else - { - HandleDatastreamFailedPublish( - new MqttCommunicationException("Broker is not available."), - mapping, topic, payload, qos); - } - - } - catch (MqttCommunicationException ex) - { - HandleDatastreamFailedPublish(ex, mapping, topic, payload, qos); - _astarteTransportEventListener?.OnTransportDisconnected(); - } - catch (AstarteTransportException ex) - { - HandleDatastreamFailedPublish(new MqttCommunicationException(ex), - mapping, topic, payload, qos); - _astarteTransportEventListener?.OnTransportDisconnected(); - } + await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos, mapping); } - private async Task DoSendMqttMessage(string topic, byte[] payload, MqttQualityOfServiceLevel qos) + private async Task DoSendMqttMessage(string topic, byte[] payload, MqttQualityOfServiceLevel qos, + AstarteInterfaceDatastreamMapping? mapping = null) { var applicationMessage = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(payload) .WithQualityOfServiceLevel(qos) - .WithRetainFlag(false) - .Build(); + .WithRetainFlag(false); - try + if (mapping is not null) { - if (_client is not null) - { - MqttClientPublishResult result = await _client.PublishAsync(applicationMessage); - - if (result.ReasonCode != MqttClientPublishReasonCode.Success) - { - throw new AstarteTransportException - ($"Error publishing on MQTT. Code: {result.ReasonCode}"); - } + applicationMessage + .WithMessageExpiryInterval((uint)mapping.GetExpiry()) + .WithUserProperty("Retention", mapping.GetRetention().ToString()); + } - } + var managedApplicationMessage = new ManagedMqttApplicationMessage + { + ApplicationMessage = applicationMessage.Build(), + Id = Guid.NewGuid() + }; - } - catch (Exception) + if (_client is not null) { - throw new MqttCommunicationException(topic); + await _client.EnqueueAsync(managedApplicationMessage); } } @@ -178,103 +149,9 @@ public override async Task SendAggregate(AstarteAggregateDatastreamInterface ast string topic = _baseTopic + "/" + astarteInterface.InterfaceName + path; byte[] payload = AstartePayload.Serialize(value, timeStamp); - try - { - if (_client.TryPingAsync().Result) - { - await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); - } - else - { - HandleDatastreamFailedPublish( - new MqttCommunicationException("Broker is not available."), - mapping, topic, payload, qos); - } - - } - catch (MqttCommunicationException ex) - { - HandleDatastreamFailedPublish(ex, mapping, topic, payload, qos); - _astarteTransportEventListener?.OnTransportDisconnected(); - } - catch (AstarteTransportException ex) - { - HandleDatastreamFailedPublish(new MqttCommunicationException(ex), - mapping, topic, payload, qos); - _astarteTransportEventListener?.OnTransportDisconnected(); - } - } - - public override void RetryFailedMessages() - { - if (_failedMessageStorage is null) - { - return; - } - - while (!_failedMessageStorage.IsEmpty()) - { - AstarteFailedMessageEntry? failedMessage = _failedMessageStorage.PeekFirst(); - if (failedMessage is null) - { - return; - } - - if (_failedMessageStorage.IsExpired(failedMessage.GetExpiry())) - { - // No need to send this anymore, drop it - _failedMessageStorage.Reject(failedMessage); - continue; - } - - try - { - Console.WriteLine($"Resending fallback message from local database: " + - $"{failedMessage.GetTopic()} : {failedMessage.GetPayload()}"); - - Task.Run(async () => await DoSendMessage(failedMessage)); - } - catch (MqttCommunicationException e) - { - throw new AstarteTransportException(e.Message); - } - _failedMessageStorage.Ack(failedMessage); - } - - while (!_failedMessageStorage.IsCacheEmpty()) - { - IAstarteFailedMessage? failedMessage = _failedMessageStorage.PeekFirstCache(); - if (failedMessage is null) - { - return; - } - - if (_failedMessageStorage.IsExpired(failedMessage.GetExpiry())) - { - // No need to send this anymore, drop it - _failedMessageStorage.RejectFirstCache(); - continue; - } - - try - { - Console.WriteLine($"Resending fallback message from cache memory: " + - $"{failedMessage.GetTopic()} : {failedMessage.GetPayload()}"); - Task.Run(async () => await DoSendMessage(failedMessage)); - } - catch (MqttCommunicationException e) - { - throw new AstarteTransportException(e.Message); - } - _failedMessageStorage.AckFirstCache(); - } - } + await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos, mapping); - private async Task DoSendMessage(IAstarteFailedMessage failedMessage) - { - await DoSendMqttMessage(failedMessage.GetTopic(), failedMessage.GetPayload(), - (MqttQualityOfServiceLevel)failedMessage.GetQos()); } private int QosFromReliability(AstarteInterfaceDatastreamMapping mapping) @@ -329,63 +206,6 @@ public override async Task ResendAllProperties() } } } - private void HandlePropertiesFailedPublish(MqttCommunicationException e, string topic, - byte[] payload, int qos) - { - if (_failedMessageStorage is null) - { - return; - } - - // Properties are always stored and never expire - _failedMessageStorage.InsertStored(topic, payload, qos); - } - - private async void DoSendMqttMessage(IAstarteFailedMessage failedMessage) - { - await DoSendMqttMessage(failedMessage.GetTopic(), - failedMessage.GetPayload(), - (MqttQualityOfServiceLevel)failedMessage.GetQos()); - } - - private void HandleDatastreamFailedPublish(MqttCommunicationException e, - AstarteInterfaceDatastreamMapping mapping, string topic, byte[] payload, int qos) - { - int expiry = mapping.GetExpiry(); - switch (mapping.GetRetention()) - { - case MappingRetention.DISCARD: - throw new AstarteTransportException("Cannot send value", e); - - case MappingRetention.VOLATILE: - { - if (expiry > 0) - { - _failedMessageStorage?.InsertVolatile(topic, payload, qos, expiry); - } - else - { - _failedMessageStorage?.InsertVolatile(topic, payload, qos); - } - break; - } - - case MappingRetention.STORED: - { - if (expiry > 0) - { - _failedMessageStorage?.InsertStored(topic, payload, qos, expiry); - } - else - { - _failedMessageStorage?.InsertStored(topic, payload, qos); - } - break; - } - default: - throw new AstarteTransportException("Invalid retention value", e); - } - } } }