Skip to content

Commit

Permalink
Update the RetryFailedMessages() method
Browse files Browse the repository at this point in the history
Update the RetryFailedMessages() method in the AstarteMqttV1Transport
class to accommodate the interface mapping type'retention': 'volatile'.
After reconnecting on the Astarte device, resend the message and remove
it from the cache memory.

Signed-off-by: Osman Hadzic <osman.hadzic@secomind.com>
  • Loading branch information
osmanhadzic committed Feb 7, 2024
1 parent 396d7a0 commit ef57dcf
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 0 deletions.
36 changes: 36 additions & 0 deletions AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,41 @@ public void RejectFirst()

_astarteDbContext.SaveChanges();
}

public bool IsCacheEmpty()
{
return !_astarteFailedMessageVolatile.Any();
}

public AstarteFailedMessageEntry? PeekFirstCache()
{
return _astarteFailedMessageVolatile
.OrderBy(x => x.Id)
.FirstOrDefault();
}

public void RejectFirstCache()
{
var failedMessages = _astarteFailedMessageVolatile
.OrderBy(x => x.Id)
.ToList();

if (failedMessages.Count() > 0)
{
_astarteDbContext.AstarteFailedMessages.Remove(failedMessages.First());
}
}

public void AckFirstCache()
{
var failedMessages = _astarteFailedMessageVolatile
.OrderBy(x => x.Id)
.ToList();

if (failedMessages.Count() > 0)
{
_astarteFailedMessageVolatile.Remove(failedMessages.First());
}
}
}
}
8 changes: 8 additions & 0 deletions AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,18 @@ public interface IAstarteFailedMessageStorage

bool IsEmpty();

bool IsCacheEmpty();

AstarteFailedMessageEntry? PeekFirst();

AstarteFailedMessageEntry? PeekFirstCache();

void AckFirst();

void AckFirstCache();

void RejectFirst();

void RejectFirstCache();
}
}
26 changes: 26 additions & 0 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,32 @@ public override void RetryFailedMessages()
}
_failedMessageStorage.AckFirst();
}

while (!_failedMessageStorage.IsCacheEmpty())
{
IAstarteFailedMessage? failedMessage = _failedMessageStorage.PeekFirstCache();
if (failedMessage is null)
{
return;
}

if (failedMessage.IsExpired())
{
// No need to send this anymore, drop it
_failedMessageStorage.RejectFirstCache();
continue;
}

try
{
Task.Run(async () => await DoSendMessage(failedMessage));
}
catch (MqttCommunicationException e)
{
throw new AstarteTransportException(e.Message);
}
_failedMessageStorage.AckFirstCache();
}
}

private async Task DoSendMessage(IAstarteFailedMessage failedMessage)
Expand Down

0 comments on commit ef57dcf

Please sign in to comment.