diff --git a/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs b/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs index a167368..1c11fdc 100644 --- a/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs +++ b/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs @@ -106,5 +106,41 @@ public void RejectFirst() _astarteDbContext.SaveChanges(); } + + public bool IsCacheEmpty() + { + return !_astarteFailedMessageVolatile.Any(); + } + + public AstarteFailedMessageEntry? PeekFirstCache() + { + return _astarteFailedMessageVolatile + .OrderBy(x => x.Id) + .FirstOrDefault(); + } + + public void RejectFirstCache() + { + var failedMessages = _astarteFailedMessageVolatile + .OrderBy(x => x.Id) + .ToList(); + + if (failedMessages.Count() > 0) + { + _astarteDbContext.AstarteFailedMessages.Remove(failedMessages.First()); + } + } + + public void AckFirstCache() + { + var failedMessages = _astarteFailedMessageVolatile + .OrderBy(x => x.Id) + .ToList(); + + if (failedMessages.Count() > 0) + { + _astarteFailedMessageVolatile.Remove(failedMessages.First()); + } + } } } diff --git a/AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs b/AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs index 7be777f..c34f094 100644 --- a/AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs +++ b/AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs @@ -32,10 +32,18 @@ public interface IAstarteFailedMessageStorage bool IsEmpty(); + bool IsCacheEmpty(); + AstarteFailedMessageEntry? PeekFirst(); + AstarteFailedMessageEntry? PeekFirstCache(); + void AckFirst(); + void AckFirstCache(); + void RejectFirst(); + + void RejectFirstCache(); } } diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs index 6d0c41f..c8bd8ee 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs @@ -201,6 +201,32 @@ public override void RetryFailedMessages() } _failedMessageStorage.AckFirst(); } + + while (!_failedMessageStorage.IsCacheEmpty()) + { + IAstarteFailedMessage? failedMessage = _failedMessageStorage.PeekFirstCache(); + if (failedMessage is null) + { + return; + } + + if (failedMessage.IsExpired()) + { + // No need to send this anymore, drop it + _failedMessageStorage.RejectFirstCache(); + continue; + } + + try + { + Task.Run(async () => await DoSendMessage(failedMessage)); + } + catch (MqttCommunicationException e) + { + throw new AstarteTransportException(e.Message); + } + _failedMessageStorage.AckFirstCache(); + } } private async Task DoSendMessage(IAstarteFailedMessage failedMessage)