Skip to content

Commit

Permalink
Resend failed messages in batches
Browse files Browse the repository at this point in the history
Resend stored messages in batches due to
high memory consumption

Signed-off-by: nedimtokic <nedim.tokic@secomind.com>
  • Loading branch information
nedimtokic committed Jun 11, 2024
1 parent 055f834 commit 2db731a
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 63 deletions.
71 changes: 25 additions & 46 deletions AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,9 @@ public bool IsExpired(long expire)
return expire != 0 ? (DateTimeOffset.UtcNow.ToUnixTimeSeconds() > expire) : false;
}

public async Task SaveQueuedMessagesAsync(IList<ManagedMqttApplicationMessage> messages)
public async Task SaveQueuedMessageAsync(ManagedMqttApplicationMessage message)
{

if (messages.Count == 0)
{
return;
}

ManagedMqttApplicationMessage message = messages.Last();

MqttUserProperty? retention = null;

if (message.ApplicationMessage.UserProperties is null)
Expand Down Expand Up @@ -337,31 +330,28 @@ public async Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync(

}

public async Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMessage)
public async Task DeleteByGuidAsync(Guid applicationMessageId)
{

try
{
lock (_storeLock)
if (sqliteReadConnection.State != ConnectionState.Open)
{
if (sqliteConnection.State != ConnectionState.Open)
{
sqliteConnection = new SqliteConnection(_dbConnectionString);
sqliteConnection.Open();
}

using (SqliteCommand cmd = new SqliteCommand(
"DELETE FROM AstarteFailedMessages WHERE guid = @guid;", sqliteConnection))
{
cmd.Parameters.AddWithValue("@guid", applicationMessage.Id);
cmd.ExecuteNonQueryAsync().Wait();
}
sqliteReadConnection = new SqliteConnection(_dbConnectionString);
sqliteReadConnection.Open();
}

SqliteCommand cmd = new SqliteCommand(
"DELETE FROM AstarteFailedMessages WHERE guid = @guid ;", sqliteReadConnection);

cmd.Parameters.AddWithValue("@guid", applicationMessageId);
await cmd.ExecuteNonQueryAsync();
cmd.Dispose();

if (_astarteFailedMessageVolatile is not null)
{
var messageVolatile = _astarteFailedMessageVolatile
.Where(x => x.Guid == applicationMessage.Id)
.Where(x => x.Guid == applicationMessageId)
.FirstOrDefault();
if (messageVolatile is not null)
{
Expand All @@ -373,7 +363,6 @@ public async Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMes
{
Trace.WriteLine($"Failed to delete fallback message from database. Error message: {ex.Message}");
}
await Task.CompletedTask;
}

private async Task<IList<AstarteFailedMessageEntry>> GetAstarteFailedMessageStorage()
Expand All @@ -387,29 +376,19 @@ private async Task<IList<AstarteFailedMessageEntry>> GetAstarteFailedMessageStor
sqliteReadConnection.Open();
}

using (SqliteCommand cmd = new SqliteCommand("SELECT * FROM AstarteFailedMessages;", sqliteReadConnection))
{

var dt = new DataTable();
string query = @"SELECT Qos, Payload, Topic, guid, absolute_expiry
FROM AstarteFailedMessages ORDER BY ID LIMIT 10000;";
using SqliteCommand cmd = new SqliteCommand(query, sqliteReadConnection);
using SqliteDataReader dr = await cmd.ExecuteReaderAsync();

using (SqliteDataReader dr = await cmd.ExecuteReaderAsync())
{
dt.BeginLoadData();
dt.Load(dr);
dt.EndLoadData();

foreach (DataRow row in dt.Rows)
{
AstarteFailedMessageEntry entry = new AstarteFailedMessageEntry(
Convert.ToInt32(row["Qos"]),
(byte[])row["Payload"],
row["Topic"].ToString()!,
Guid.Parse(row["guid"].ToString()!),
Convert.ToInt32(row["absolute_expiry"]));

response.Add(entry);
}
}
while (await dr.ReadAsync())
{
response.Add(new AstarteFailedMessageEntry(
dr.GetInt32(0),
(byte[])dr["Payload"],
dr.GetString(2),
dr.GetGuid(3),
dr.GetInt32(4)));
}
return response;
}
Expand Down
8 changes: 6 additions & 2 deletions AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

namespace AstarteDeviceSDKCSharp.Data
{
public interface IAstarteFailedMessageStorage : IManagedMqttClientStorage
public interface IAstarteFailedMessageStorage
{
void InsertVolatile(String topic, byte[] payload, int qos, Guid guid);

Expand All @@ -38,6 +38,10 @@ public interface IAstarteFailedMessageStorage : IManagedMqttClientStorage

bool IsExpired(long expire);

Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMessage);
Task DeleteByGuidAsync(Guid applicationMessage);

Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync();
Task SaveQueuedMessageAsync(ManagedMqttApplicationMessage message);

}
}
1 change: 1 addition & 0 deletions AstarteDeviceSDKCSharp/Device/AstarteDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ public void OnTransportConnected()
lock (this)
{
_astarteMessagelistener?.OnConnected();
_astarteTransport?.StartResenderTask();
}
}

Expand Down
1 change: 1 addition & 0 deletions AstarteDeviceSDKCSharp/Transport/AstarteTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void SetAstarteTransportEventListener(
public abstract Task Connect();
public abstract Task Disconnect();
public abstract bool IsConnected();
public abstract void StartResenderTask();

public void SetPropertyStorage(IAstartePropertyStorage propertyStorage)
{
Expand Down
29 changes: 15 additions & 14 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public abstract class AstarteMqttTransport : AstarteTransport
{
protected IManagedMqttClient? _client;
private readonly IMqttConnectionInfo _connectionInfo;
public bool _resendingInProgress = false;

protected AstarteMqttTransport(AstarteProtocolType type,
IMqttConnectionInfo connectionInfo) : base(type)
Expand Down Expand Up @@ -65,14 +66,6 @@ private async Task<IManagedMqttClient> InitClientAsync()

private async Task CompleteAstarteConnection(bool IsSessionPresent)
{
if (!IsSessionPresent || !_introspectionSent)
{
await SetupSubscriptions();
await SendIntrospection();
await SendEmptyCacheAsync();
await ResendAllProperties();
_introspectionSent = true;
}

if (_client is not null)
{
Expand All @@ -82,10 +75,9 @@ private async Task CompleteAstarteConnection(bool IsSessionPresent)
return Task.CompletedTask;
};

_client.ConnectedAsync += e =>
_client.ConnectedAsync += async e =>
{
OnConnectedAsync(e);
return Task.CompletedTask;
await OnConnectedAsync(e);
};

_client.ConnectingFailedAsync += e =>
Expand All @@ -99,6 +91,15 @@ private async Task CompleteAstarteConnection(bool IsSessionPresent)

}

if (!IsSessionPresent || !_introspectionSent)
{
await SetupSubscriptions();
await SendIntrospection();
await SendEmptyCacheAsync();
await ResendAllProperties();
_introspectionSent = true;
}

}

void OnConnectingFailAsync(ConnectingFailedEventArgs args)
Expand All @@ -115,7 +116,7 @@ void OnConnectingFailAsync(ConnectingFailedEventArgs args)

}

async void OnConnectedAsync(MqttClientConnectedEventArgs args)
async Task OnConnectedAsync(MqttClientConnectedEventArgs args)
{
if (!args.ConnectResult.IsSessionPresent)
{
Expand All @@ -139,7 +140,7 @@ async Task OnMessageProcessedAsync(ApplicationMessageProcessedEventArgs eventArg
{
if (_failedMessageStorage is not null)
{
await _failedMessageStorage.DeleteByGuidAsync(eventArgs.ApplicationMessage);
await _failedMessageStorage.DeleteByGuidAsync(eventArgs.ApplicationMessage.Id);
}
}
else
Expand Down Expand Up @@ -168,7 +169,7 @@ public override async Task Connect()
var managedMqttClientOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(_connectionInfo.GetMqttConnectOptions())
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithStorage(_failedMessageStorage)
.WithMaxPendingMessages(10000)
.WithPendingMessagesOverflowStrategy(
MQTTnet.Server.MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
.Build();
Expand Down
82 changes: 81 additions & 1 deletion AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
using MQTTnet;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Protocol;
using System.Diagnostics;
using System.Text;

namespace AstarteDeviceSDKCSharp.Transport.MQTT
Expand Down Expand Up @@ -92,7 +93,17 @@ private async Task DoSendMqttMessage(string topic, byte[] payload, MqttQualityOf

if (_client is not null)
{
await _client.EnqueueAsync(managedApplicationMessage);
if (_failedMessageStorage is not null)
{
await _failedMessageStorage.SaveQueuedMessageAsync(managedApplicationMessage);
}

if (!_resendingInProgress ||
mapping is null ||
mapping.GetRetention() == AstarteInterfaceDatastreamMapping.MappingRetention.DISCARD)
{
await _client.EnqueueAsync(managedApplicationMessage);
}
}

}
Expand Down Expand Up @@ -207,5 +218,74 @@ public override async Task ResendAllProperties()
}
}

public override void StartResenderTask()
{

CancellationTokenSource _resenderCancellationTokenSource = new CancellationTokenSource();
var cancellationToken = _resenderCancellationTokenSource.Token;

Task.Run(async () =>
{
if (!_resendingInProgress)
{
await ResendFailedMessages(cancellationToken);
}
});
}

private async Task ResendFailedMessages(CancellationToken cancellationToken = default)
{
Trace.WriteLine("Resending stored messages in progress.");
_resendingInProgress = true;

try
{
if (_client == null || _failedMessageStorage == null)
{
Trace.WriteLine("Client or failed message storage is null.");
_resendingInProgress = false;
return;
}

await WaitForPendingMessagesToClear(_client);

var storedMessages = await _failedMessageStorage.LoadQueuedMessagesAsync();
if (storedMessages.Count == 0)
{
Trace.WriteLine("No more stored messages to resend.");
_resendingInProgress = false;
return;
}

foreach (var message in storedMessages)
{
await _client.EnqueueAsync(message);
}

await WaitForPendingMessagesToClear(_client);
await ResendFailedMessages(cancellationToken);
}
catch (OperationCanceledException)
{
Trace.WriteLine("Resending stored messages was canceled.");
}
finally
{
_resendingInProgress = false;
Trace.WriteLine("Resending stored messages finished.");
}
}

private static async Task WaitForPendingMessagesToClear(IManagedMqttClient _client)
{
var timeout = _client.Options.ClientOptions.Timeout * 10000;
await Task.Run(() => SpinWait.SpinUntil(() => _client.PendingApplicationMessagesCount == 0, timeout));

if (_client.PendingApplicationMessagesCount > 0)
{
throw new AstarteTransportException("Timeout while resending stored messages.");
}
}

}
}

0 comments on commit 2db731a

Please sign in to comment.