From 2db731a382f41c63ad5b0738cadba421e0ea64e7 Mon Sep 17 00:00:00 2001 From: nedimtokic Date: Thu, 6 Jun 2024 10:48:55 +0200 Subject: [PATCH] Resend failed messages in batches Resend stored messages in batches due to high memory consumption Signed-off-by: nedimtokic --- .../Data/AstarteFailedMessageStorage.cs | 71 ++++++---------- .../Data/IAstarteFailedMessageStorage.cs | 8 +- .../Device/AstarteDevice.cs | 1 + .../Transport/AstarteTransport.cs | 1 + .../Transport/MQTT/AstarteMqttTransport.cs | 29 +++---- .../Transport/MQTT/AstarteMqttV1Transport.cs | 82 ++++++++++++++++++- 6 files changed, 129 insertions(+), 63 deletions(-) diff --git a/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs b/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs index 6d7fa20..a52fc4e 100644 --- a/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs +++ b/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs @@ -191,16 +191,9 @@ public bool IsExpired(long expire) return expire != 0 ? (DateTimeOffset.UtcNow.ToUnixTimeSeconds() > expire) : false; } - public async Task SaveQueuedMessagesAsync(IList messages) + public async Task SaveQueuedMessageAsync(ManagedMqttApplicationMessage message) { - if (messages.Count == 0) - { - return; - } - - ManagedMqttApplicationMessage message = messages.Last(); - MqttUserProperty? retention = null; if (message.ApplicationMessage.UserProperties is null) @@ -337,31 +330,28 @@ public async Task> LoadQueuedMessagesAsync( } - public async Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMessage) + public async Task DeleteByGuidAsync(Guid applicationMessageId) { try { - lock (_storeLock) + if (sqliteReadConnection.State != ConnectionState.Open) { - if (sqliteConnection.State != ConnectionState.Open) - { - sqliteConnection = new SqliteConnection(_dbConnectionString); - sqliteConnection.Open(); - } - - using (SqliteCommand cmd = new SqliteCommand( - "DELETE FROM AstarteFailedMessages WHERE guid = @guid;", sqliteConnection)) - { - cmd.Parameters.AddWithValue("@guid", applicationMessage.Id); - cmd.ExecuteNonQueryAsync().Wait(); - } + sqliteReadConnection = new SqliteConnection(_dbConnectionString); + sqliteReadConnection.Open(); } + SqliteCommand cmd = new SqliteCommand( + "DELETE FROM AstarteFailedMessages WHERE guid = @guid ;", sqliteReadConnection); + + cmd.Parameters.AddWithValue("@guid", applicationMessageId); + await cmd.ExecuteNonQueryAsync(); + cmd.Dispose(); + if (_astarteFailedMessageVolatile is not null) { var messageVolatile = _astarteFailedMessageVolatile - .Where(x => x.Guid == applicationMessage.Id) + .Where(x => x.Guid == applicationMessageId) .FirstOrDefault(); if (messageVolatile is not null) { @@ -373,7 +363,6 @@ public async Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMes { Trace.WriteLine($"Failed to delete fallback message from database. Error message: {ex.Message}"); } - await Task.CompletedTask; } private async Task> GetAstarteFailedMessageStorage() @@ -387,29 +376,19 @@ private async Task> GetAstarteFailedMessageStor sqliteReadConnection.Open(); } - using (SqliteCommand cmd = new SqliteCommand("SELECT * FROM AstarteFailedMessages;", sqliteReadConnection)) - { - - var dt = new DataTable(); + string query = @"SELECT Qos, Payload, Topic, guid, absolute_expiry + FROM AstarteFailedMessages ORDER BY ID LIMIT 10000;"; + using SqliteCommand cmd = new SqliteCommand(query, sqliteReadConnection); + using SqliteDataReader dr = await cmd.ExecuteReaderAsync(); - using (SqliteDataReader dr = await cmd.ExecuteReaderAsync()) - { - dt.BeginLoadData(); - dt.Load(dr); - dt.EndLoadData(); - - foreach (DataRow row in dt.Rows) - { - AstarteFailedMessageEntry entry = new AstarteFailedMessageEntry( - Convert.ToInt32(row["Qos"]), - (byte[])row["Payload"], - row["Topic"].ToString()!, - Guid.Parse(row["guid"].ToString()!), - Convert.ToInt32(row["absolute_expiry"])); - - response.Add(entry); - } - } + while (await dr.ReadAsync()) + { + response.Add(new AstarteFailedMessageEntry( + dr.GetInt32(0), + (byte[])dr["Payload"], + dr.GetString(2), + dr.GetGuid(3), + dr.GetInt32(4))); } return response; } diff --git a/AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs b/AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs index 1ff3db6..725de3b 100644 --- a/AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs +++ b/AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs @@ -22,7 +22,7 @@ namespace AstarteDeviceSDKCSharp.Data { - public interface IAstarteFailedMessageStorage : IManagedMqttClientStorage + public interface IAstarteFailedMessageStorage { void InsertVolatile(String topic, byte[] payload, int qos, Guid guid); @@ -38,6 +38,10 @@ public interface IAstarteFailedMessageStorage : IManagedMqttClientStorage bool IsExpired(long expire); - Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMessage); + Task DeleteByGuidAsync(Guid applicationMessage); + + Task> LoadQueuedMessagesAsync(); + Task SaveQueuedMessageAsync(ManagedMqttApplicationMessage message); + } } diff --git a/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs b/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs index e1c1fa5..ba5dc00 100644 --- a/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs +++ b/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs @@ -385,6 +385,7 @@ public void OnTransportConnected() lock (this) { _astarteMessagelistener?.OnConnected(); + _astarteTransport?.StartResenderTask(); } } diff --git a/AstarteDeviceSDKCSharp/Transport/AstarteTransport.cs b/AstarteDeviceSDKCSharp/Transport/AstarteTransport.cs index 8160733..65b978c 100644 --- a/AstarteDeviceSDKCSharp/Transport/AstarteTransport.cs +++ b/AstarteDeviceSDKCSharp/Transport/AstarteTransport.cs @@ -90,6 +90,7 @@ public void SetAstarteTransportEventListener( public abstract Task Connect(); public abstract Task Disconnect(); public abstract bool IsConnected(); + public abstract void StartResenderTask(); public void SetPropertyStorage(IAstartePropertyStorage propertyStorage) { diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs index 900cd73..8596764 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs @@ -38,6 +38,7 @@ public abstract class AstarteMqttTransport : AstarteTransport { protected IManagedMqttClient? _client; private readonly IMqttConnectionInfo _connectionInfo; + public bool _resendingInProgress = false; protected AstarteMqttTransport(AstarteProtocolType type, IMqttConnectionInfo connectionInfo) : base(type) @@ -65,14 +66,6 @@ private async Task InitClientAsync() private async Task CompleteAstarteConnection(bool IsSessionPresent) { - if (!IsSessionPresent || !_introspectionSent) - { - await SetupSubscriptions(); - await SendIntrospection(); - await SendEmptyCacheAsync(); - await ResendAllProperties(); - _introspectionSent = true; - } if (_client is not null) { @@ -82,10 +75,9 @@ private async Task CompleteAstarteConnection(bool IsSessionPresent) return Task.CompletedTask; }; - _client.ConnectedAsync += e => + _client.ConnectedAsync += async e => { - OnConnectedAsync(e); - return Task.CompletedTask; + await OnConnectedAsync(e); }; _client.ConnectingFailedAsync += e => @@ -99,6 +91,15 @@ private async Task CompleteAstarteConnection(bool IsSessionPresent) } + if (!IsSessionPresent || !_introspectionSent) + { + await SetupSubscriptions(); + await SendIntrospection(); + await SendEmptyCacheAsync(); + await ResendAllProperties(); + _introspectionSent = true; + } + } void OnConnectingFailAsync(ConnectingFailedEventArgs args) @@ -115,7 +116,7 @@ void OnConnectingFailAsync(ConnectingFailedEventArgs args) } - async void OnConnectedAsync(MqttClientConnectedEventArgs args) + async Task OnConnectedAsync(MqttClientConnectedEventArgs args) { if (!args.ConnectResult.IsSessionPresent) { @@ -139,7 +140,7 @@ async Task OnMessageProcessedAsync(ApplicationMessageProcessedEventArgs eventArg { if (_failedMessageStorage is not null) { - await _failedMessageStorage.DeleteByGuidAsync(eventArgs.ApplicationMessage); + await _failedMessageStorage.DeleteByGuidAsync(eventArgs.ApplicationMessage.Id); } } else @@ -168,7 +169,7 @@ public override async Task Connect() var managedMqttClientOptions = new ManagedMqttClientOptionsBuilder() .WithClientOptions(_connectionInfo.GetMqttConnectOptions()) .WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) - .WithStorage(_failedMessageStorage) + .WithMaxPendingMessages(10000) .WithPendingMessagesOverflowStrategy( MQTTnet.Server.MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage) .Build(); diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs index f15ca4c..454b3d5 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs @@ -26,6 +26,7 @@ using MQTTnet; using MQTTnet.Extensions.ManagedClient; using MQTTnet.Protocol; +using System.Diagnostics; using System.Text; namespace AstarteDeviceSDKCSharp.Transport.MQTT @@ -92,7 +93,17 @@ private async Task DoSendMqttMessage(string topic, byte[] payload, MqttQualityOf if (_client is not null) { - await _client.EnqueueAsync(managedApplicationMessage); + if (_failedMessageStorage is not null) + { + await _failedMessageStorage.SaveQueuedMessageAsync(managedApplicationMessage); + } + + if (!_resendingInProgress || + mapping is null || + mapping.GetRetention() == AstarteInterfaceDatastreamMapping.MappingRetention.DISCARD) + { + await _client.EnqueueAsync(managedApplicationMessage); + } } } @@ -207,5 +218,74 @@ public override async Task ResendAllProperties() } } + public override void StartResenderTask() + { + + CancellationTokenSource _resenderCancellationTokenSource = new CancellationTokenSource(); + var cancellationToken = _resenderCancellationTokenSource.Token; + + Task.Run(async () => + { + if (!_resendingInProgress) + { + await ResendFailedMessages(cancellationToken); + } + }); + } + + private async Task ResendFailedMessages(CancellationToken cancellationToken = default) + { + Trace.WriteLine("Resending stored messages in progress."); + _resendingInProgress = true; + + try + { + if (_client == null || _failedMessageStorage == null) + { + Trace.WriteLine("Client or failed message storage is null."); + _resendingInProgress = false; + return; + } + + await WaitForPendingMessagesToClear(_client); + + var storedMessages = await _failedMessageStorage.LoadQueuedMessagesAsync(); + if (storedMessages.Count == 0) + { + Trace.WriteLine("No more stored messages to resend."); + _resendingInProgress = false; + return; + } + + foreach (var message in storedMessages) + { + await _client.EnqueueAsync(message); + } + + await WaitForPendingMessagesToClear(_client); + await ResendFailedMessages(cancellationToken); + } + catch (OperationCanceledException) + { + Trace.WriteLine("Resending stored messages was canceled."); + } + finally + { + _resendingInProgress = false; + Trace.WriteLine("Resending stored messages finished."); + } + } + + private static async Task WaitForPendingMessagesToClear(IManagedMqttClient _client) + { + var timeout = _client.Options.ClientOptions.Timeout * 10000; + await Task.Run(() => SpinWait.SpinUntil(() => _client.PendingApplicationMessagesCount == 0, timeout)); + + if (_client.PendingApplicationMessagesCount > 0) + { + throw new AstarteTransportException("Timeout while resending stored messages."); + } + } + } }