Skip to content

Commit

Permalink
Implement fallout strategy failed individual message
Browse files Browse the repository at this point in the history
Develop a fallout strategy for individual datastream messages with
datastream interface mapping. Retention types include 'volatile,' stored
in cache memory, and 'stored,' in the local database. The default
retention value is 'discarded,' and the 'discard' message retention
type does not save.

Signed-off-by: Osman Hadzic <osman.hadzic@secomind.com>
  • Loading branch information
osmanhadzic committed Jan 31, 2024
1 parent e1e03f2 commit c54c910
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public AstarteMqttV1Transport(MutualSSLAuthenticationMqttConnectionInfo connecti
public override async Task SendIndividualValue(AstarteInterface astarteInterface,
string path, object? value, DateTime? timestamp)
{
AstarteInterfaceDatastreamMapping mapping;
AstarteInterfaceDatastreamMapping mapping = new();
int qos = 2;

if (astarteInterface.GetType() == (typeof(AstarteDeviceDatastreamInterface)))
Expand All @@ -67,7 +67,15 @@ public override async Task SendIndividualValue(AstarteInterface astarteInterface
string topic = _baseTopic + "/" + astarteInterface.InterfaceName + path;
byte[] payload = AstartePayload.Serialize(value, timestamp);

await DoSendMqttMessage(topic, payload, qos);
try
{
await DoSendMqttMessage(topic, payload, qos);
}
catch (MqttCommunicationException ex)
{
HandleDatastreamFailedPublish(ex, mapping, topic, payload, qos);
}

}

private async Task DoSendMqttMessage(string topic, byte[] payload, int qos)
Expand All @@ -94,6 +102,7 @@ private async Task DoSendMqttMessage(string topic, byte[] payload, int qos)
catch (Exception)
{
_astarteTransportEventListener?.OnTransportDisconnected();
throw new MqttCommunicationException(topic);
}

}
Expand Down Expand Up @@ -156,7 +165,6 @@ public override async Task SendAggregate(AstarteAggregateDatastreamInterface ast
}
catch (MqttCommunicationException e)
{

HandleDatastreamFailedPublish(e, mapping, topic, payload, qos);
}
}
Expand Down

0 comments on commit c54c910

Please sign in to comment.