Skip to content

Commit

Permalink
Merge pull request #74 from nedimtokic/stored-lazy
Browse files Browse the repository at this point in the history
Resend failed messages in batches
  • Loading branch information
harlem88 authored Jun 11, 2024
2 parents 055f834 + 2db731a commit af37605
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 63 deletions.
71 changes: 25 additions & 46 deletions AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,9 @@ public bool IsExpired(long expire)
return expire != 0 ? (DateTimeOffset.UtcNow.ToUnixTimeSeconds() > expire) : false;
}

public async Task SaveQueuedMessagesAsync(IList<ManagedMqttApplicationMessage> messages)
public async Task SaveQueuedMessageAsync(ManagedMqttApplicationMessage message)
{

if (messages.Count == 0)
{
return;
}

ManagedMqttApplicationMessage message = messages.Last();

MqttUserProperty? retention = null;

if (message.ApplicationMessage.UserProperties is null)
Expand Down Expand Up @@ -337,31 +330,28 @@ public async Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync(

}

public async Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMessage)
public async Task DeleteByGuidAsync(Guid applicationMessageId)
{

try
{
lock (_storeLock)
if (sqliteReadConnection.State != ConnectionState.Open)
{
if (sqliteConnection.State != ConnectionState.Open)
{
sqliteConnection = new SqliteConnection(_dbConnectionString);
sqliteConnection.Open();
}

using (SqliteCommand cmd = new SqliteCommand(
"DELETE FROM AstarteFailedMessages WHERE guid = @guid;", sqliteConnection))
{
cmd.Parameters.AddWithValue("@guid", applicationMessage.Id);
cmd.ExecuteNonQueryAsync().Wait();
}
sqliteReadConnection = new SqliteConnection(_dbConnectionString);
sqliteReadConnection.Open();
}

SqliteCommand cmd = new SqliteCommand(
"DELETE FROM AstarteFailedMessages WHERE guid = @guid ;", sqliteReadConnection);

cmd.Parameters.AddWithValue("@guid", applicationMessageId);
await cmd.ExecuteNonQueryAsync();
cmd.Dispose();

if (_astarteFailedMessageVolatile is not null)
{
var messageVolatile = _astarteFailedMessageVolatile
.Where(x => x.Guid == applicationMessage.Id)
.Where(x => x.Guid == applicationMessageId)
.FirstOrDefault();
if (messageVolatile is not null)
{
Expand All @@ -373,7 +363,6 @@ public async Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMes
{
Trace.WriteLine($"Failed to delete fallback message from database. Error message: {ex.Message}");
}
await Task.CompletedTask;
}

private async Task<IList<AstarteFailedMessageEntry>> GetAstarteFailedMessageStorage()
Expand All @@ -387,29 +376,19 @@ private async Task<IList<AstarteFailedMessageEntry>> GetAstarteFailedMessageStor
sqliteReadConnection.Open();
}

using (SqliteCommand cmd = new SqliteCommand("SELECT * FROM AstarteFailedMessages;", sqliteReadConnection))
{

var dt = new DataTable();
string query = @"SELECT Qos, Payload, Topic, guid, absolute_expiry
FROM AstarteFailedMessages ORDER BY ID LIMIT 10000;";
using SqliteCommand cmd = new SqliteCommand(query, sqliteReadConnection);
using SqliteDataReader dr = await cmd.ExecuteReaderAsync();

using (SqliteDataReader dr = await cmd.ExecuteReaderAsync())
{
dt.BeginLoadData();
dt.Load(dr);
dt.EndLoadData();

foreach (DataRow row in dt.Rows)
{
AstarteFailedMessageEntry entry = new AstarteFailedMessageEntry(
Convert.ToInt32(row["Qos"]),
(byte[])row["Payload"],
row["Topic"].ToString()!,
Guid.Parse(row["guid"].ToString()!),
Convert.ToInt32(row["absolute_expiry"]));

response.Add(entry);
}
}
while (await dr.ReadAsync())
{
response.Add(new AstarteFailedMessageEntry(
dr.GetInt32(0),
(byte[])dr["Payload"],
dr.GetString(2),
dr.GetGuid(3),
dr.GetInt32(4)));
}
return response;
}
Expand Down
8 changes: 6 additions & 2 deletions AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

namespace AstarteDeviceSDKCSharp.Data
{
public interface IAstarteFailedMessageStorage : IManagedMqttClientStorage
public interface IAstarteFailedMessageStorage
{
void InsertVolatile(String topic, byte[] payload, int qos, Guid guid);

Expand All @@ -38,6 +38,10 @@ public interface IAstarteFailedMessageStorage : IManagedMqttClientStorage

bool IsExpired(long expire);

Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMessage);
Task DeleteByGuidAsync(Guid applicationMessage);

Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync();
Task SaveQueuedMessageAsync(ManagedMqttApplicationMessage message);

}
}
1 change: 1 addition & 0 deletions AstarteDeviceSDKCSharp/Device/AstarteDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ public void OnTransportConnected()
lock (this)
{
_astarteMessagelistener?.OnConnected();
_astarteTransport?.StartResenderTask();
}
}

Expand Down
1 change: 1 addition & 0 deletions AstarteDeviceSDKCSharp/Transport/AstarteTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void SetAstarteTransportEventListener(
public abstract Task Connect();
public abstract Task Disconnect();
public abstract bool IsConnected();
public abstract void StartResenderTask();

public void SetPropertyStorage(IAstartePropertyStorage propertyStorage)
{
Expand Down
29 changes: 15 additions & 14 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public abstract class AstarteMqttTransport : AstarteTransport
{
protected IManagedMqttClient? _client;
private readonly IMqttConnectionInfo _connectionInfo;
public bool _resendingInProgress = false;

protected AstarteMqttTransport(AstarteProtocolType type,
IMqttConnectionInfo connectionInfo) : base(type)
Expand Down Expand Up @@ -65,14 +66,6 @@ private async Task<IManagedMqttClient> InitClientAsync()

private async Task CompleteAstarteConnection(bool IsSessionPresent)
{
if (!IsSessionPresent || !_introspectionSent)
{
await SetupSubscriptions();
await SendIntrospection();
await SendEmptyCacheAsync();
await ResendAllProperties();
_introspectionSent = true;
}

if (_client is not null)
{
Expand All @@ -82,10 +75,9 @@ private async Task CompleteAstarteConnection(bool IsSessionPresent)
return Task.CompletedTask;
};

_client.ConnectedAsync += e =>
_client.ConnectedAsync += async e =>
{
OnConnectedAsync(e);
return Task.CompletedTask;
await OnConnectedAsync(e);
};

_client.ConnectingFailedAsync += e =>
Expand All @@ -99,6 +91,15 @@ private async Task CompleteAstarteConnection(bool IsSessionPresent)

}

if (!IsSessionPresent || !_introspectionSent)
{
await SetupSubscriptions();
await SendIntrospection();
await SendEmptyCacheAsync();
await ResendAllProperties();
_introspectionSent = true;
}

}

void OnConnectingFailAsync(ConnectingFailedEventArgs args)
Expand All @@ -115,7 +116,7 @@ void OnConnectingFailAsync(ConnectingFailedEventArgs args)

}

async void OnConnectedAsync(MqttClientConnectedEventArgs args)
async Task OnConnectedAsync(MqttClientConnectedEventArgs args)
{
if (!args.ConnectResult.IsSessionPresent)
{
Expand All @@ -139,7 +140,7 @@ async Task OnMessageProcessedAsync(ApplicationMessageProcessedEventArgs eventArg
{
if (_failedMessageStorage is not null)
{
await _failedMessageStorage.DeleteByGuidAsync(eventArgs.ApplicationMessage);
await _failedMessageStorage.DeleteByGuidAsync(eventArgs.ApplicationMessage.Id);
}
}
else
Expand Down Expand Up @@ -168,7 +169,7 @@ public override async Task Connect()
var managedMqttClientOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(_connectionInfo.GetMqttConnectOptions())
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithStorage(_failedMessageStorage)
.WithMaxPendingMessages(10000)
.WithPendingMessagesOverflowStrategy(
MQTTnet.Server.MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
.Build();
Expand Down
82 changes: 81 additions & 1 deletion AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
using MQTTnet;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Protocol;
using System.Diagnostics;
using System.Text;

namespace AstarteDeviceSDKCSharp.Transport.MQTT
Expand Down Expand Up @@ -92,7 +93,17 @@ private async Task DoSendMqttMessage(string topic, byte[] payload, MqttQualityOf

if (_client is not null)
{
await _client.EnqueueAsync(managedApplicationMessage);
if (_failedMessageStorage is not null)
{
await _failedMessageStorage.SaveQueuedMessageAsync(managedApplicationMessage);
}

if (!_resendingInProgress ||
mapping is null ||
mapping.GetRetention() == AstarteInterfaceDatastreamMapping.MappingRetention.DISCARD)
{
await _client.EnqueueAsync(managedApplicationMessage);
}
}

}
Expand Down Expand Up @@ -207,5 +218,74 @@ public override async Task ResendAllProperties()
}
}

public override void StartResenderTask()
{

CancellationTokenSource _resenderCancellationTokenSource = new CancellationTokenSource();
var cancellationToken = _resenderCancellationTokenSource.Token;

Task.Run(async () =>
{
if (!_resendingInProgress)
{
await ResendFailedMessages(cancellationToken);
}
});
}

private async Task ResendFailedMessages(CancellationToken cancellationToken = default)
{
Trace.WriteLine("Resending stored messages in progress.");
_resendingInProgress = true;

try
{
if (_client == null || _failedMessageStorage == null)
{
Trace.WriteLine("Client or failed message storage is null.");
_resendingInProgress = false;
return;
}

await WaitForPendingMessagesToClear(_client);

var storedMessages = await _failedMessageStorage.LoadQueuedMessagesAsync();
if (storedMessages.Count == 0)
{
Trace.WriteLine("No more stored messages to resend.");
_resendingInProgress = false;
return;
}

foreach (var message in storedMessages)
{
await _client.EnqueueAsync(message);
}

await WaitForPendingMessagesToClear(_client);
await ResendFailedMessages(cancellationToken);
}
catch (OperationCanceledException)
{
Trace.WriteLine("Resending stored messages was canceled.");
}
finally
{
_resendingInProgress = false;
Trace.WriteLine("Resending stored messages finished.");
}
}

private static async Task WaitForPendingMessagesToClear(IManagedMqttClient _client)
{
var timeout = _client.Options.ClientOptions.Timeout * 10000;
await Task.Run(() => SpinWait.SpinUntil(() => _client.PendingApplicationMessagesCount == 0, timeout));

if (_client.PendingApplicationMessagesCount > 0)
{
throw new AstarteTransportException("Timeout while resending stored messages.");
}
}

}
}

0 comments on commit af37605

Please sign in to comment.