Skip to content

Commit

Permalink
Update the RetryFailedMessages() method
Browse files Browse the repository at this point in the history
Update the RetryFailedMessages() method in the AstarteMqttV1Transport
class to accommodate the interface mapping type'retention': 'volatile'.
After reconnecting on the Astarte device, resend the message and remove
it from the cache memory.

Signed-off-by: Osman Hadzic <osman.hadzic@secomind.com>
  • Loading branch information
osmanhadzic committed Feb 7, 2024
1 parent 396d7a0 commit 0a492e1
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 0 deletions.
36 changes: 36 additions & 0 deletions AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,41 @@ public void RejectFirst()

_astarteDbContext.SaveChanges();
}

public bool IsCashEmpty()
{
return !_astarteFailedMessageVolatile.Any();
}

public AstarteFailedMessageEntry? PeekFirstCash()
{
return _astarteFailedMessageVolatile
.OrderBy(x => x.Id)
.FirstOrDefault();
}

public void RejectFirstCash()
{
var failedMessages = _astarteFailedMessageVolatile
.OrderBy(x => x.Id)
.ToList();

if (failedMessages.Count() > 0)
{
_astarteDbContext.AstarteFailedMessages.Remove(failedMessages.First());
}
}

public void AckFirstCash()
{
var failedMessages = _astarteFailedMessageVolatile
.OrderBy(x => x.Id)
.ToList();

if (failedMessages.Count() > 0)
{
_astarteFailedMessageVolatile.Remove(failedMessages.First());
}
}
}
}
8 changes: 8 additions & 0 deletions AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,18 @@ public interface IAstarteFailedMessageStorage

bool IsEmpty();

bool IsCashEmpty();

AstarteFailedMessageEntry? PeekFirst();

AstarteFailedMessageEntry? PeekFirstCash();

void AckFirst();

void AckFirstCash();

void RejectFirst();

void RejectFirstCash();
}
}
26 changes: 26 additions & 0 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,32 @@ public override void RetryFailedMessages()
}
_failedMessageStorage.AckFirst();
}

while (!_failedMessageStorage.IsCashEmpty())
{
IAstarteFailedMessage? failedMessage = _failedMessageStorage.PeekFirstCash();
if (failedMessage is null)
{
return;
}

if (failedMessage.IsExpired())
{
// No need to send this anymore, drop it
_failedMessageStorage.RejectFirstCash();
continue;
}

try
{
Task.Run(async () => await DoSendMessage(failedMessage));
}
catch (MqttCommunicationException e)
{
throw new AstarteTransportException(e.Message);
}
_failedMessageStorage.AckFirstCash();
}
}

private async Task DoSendMessage(IAstarteFailedMessage failedMessage)
Expand Down

0 comments on commit 0a492e1

Please sign in to comment.