diff --git a/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageEntry.cs b/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageEntry.cs index 95aa469..241086a 100644 --- a/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageEntry.cs +++ b/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageEntry.cs @@ -24,7 +24,7 @@ namespace AstarteDeviceSDKCSharp.Data { - [Index(nameof(Guid), Name = "Index_Guid")] + [Index(nameof(Guid), Name = "Index_Guid", IsUnique = true)] public class AstarteFailedMessageEntry : IAstarteFailedMessage { [Key] diff --git a/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs b/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs index 68495a2..6d7fa20 100644 --- a/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs +++ b/AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs @@ -35,7 +35,6 @@ public class AstarteFailedMessageStorage : IAstarteFailedMessageStorage SqliteConnection sqliteConnection = new SqliteConnection(); SqliteConnection sqliteReadConnection = new SqliteConnection(); private static List _astarteFailedMessageVolatile = new(); - readonly HashSet stored = new HashSet(); public AstarteFailedMessageStorage(string persistencyDir) { @@ -54,25 +53,27 @@ public async Task InsertStored(string topic, byte[] payload, int qos, Guid guid) { try { - - if (sqliteConnection.State != ConnectionState.Open) + lock (_storeLock) { - sqliteConnection = new SqliteConnection(_dbConnectionString); - sqliteConnection.Open(); - } + if (sqliteConnection.State != ConnectionState.Open) + { + sqliteConnection = new SqliteConnection(_dbConnectionString); + sqliteConnection.Open(); + } - using (SqliteCommand insertCommand = new SqliteCommand()) - { - insertCommand.Connection = sqliteConnection; - insertCommand.CommandText = "INSERT INTO AstarteFailedMessages (Qos, Payload, Topic, absolute_expiry, guid) " + - "VALUES (@qos,@payload,@topic,@absolute_expiry,@guid);"; - insertCommand.Parameters.AddWithValue("@qos", qos); - insertCommand.Parameters.AddWithValue("@payload", payload); - insertCommand.Parameters.AddWithValue("@topic", topic); - insertCommand.Parameters.AddWithValue("@absolute_expiry", 0); - insertCommand.Parameters.AddWithValue("@guid", guid); - - await insertCommand.ExecuteNonQueryAsync(); + using (SqliteCommand insertCommand = new SqliteCommand()) + { + insertCommand.Connection = sqliteConnection; + insertCommand.CommandText = "INSERT INTO AstarteFailedMessages (Qos, Payload, Topic, absolute_expiry, guid) " + + "VALUES (@qos,@payload,@topic,@absolute_expiry,@guid) ON CONFLICT DO NOTHING;"; + insertCommand.Parameters.AddWithValue("@qos", qos); + insertCommand.Parameters.AddWithValue("@payload", payload); + insertCommand.Parameters.AddWithValue("@topic", topic); + insertCommand.Parameters.AddWithValue("@absolute_expiry", 0); + insertCommand.Parameters.AddWithValue("@guid", guid); + + insertCommand.ExecuteNonQueryAsync().Wait(); + } } Trace.WriteLine($"Insert fallback message in database:" @@ -83,31 +84,34 @@ public async Task InsertStored(string topic, byte[] payload, int qos, Guid guid) { Trace.WriteLine($"Failed to insert fallback message in database. Error message: {ex.Message}"); } + await Task.CompletedTask; } public async Task InsertStored(string topic, byte[] payload, int qos, Guid guid, int relativeExpiry) { try { - - if (sqliteConnection.State != ConnectionState.Open) + lock (_storeLock) { - sqliteConnection = new SqliteConnection(_dbConnectionString); - sqliteConnection.Open(); - } + if (sqliteConnection.State != ConnectionState.Open) + { + sqliteConnection = new SqliteConnection(_dbConnectionString); + sqliteConnection.Open(); + } - using (SqliteCommand insertCommand = new SqliteCommand()) - { - insertCommand.Connection = sqliteConnection; - insertCommand.CommandText = "INSERT INTO AstarteFailedMessages (Qos, Payload, Topic, absolute_expiry, guid) " + - "VALUES (@qos,@payload,@topic,@absolute_expiry,@guid);"; - insertCommand.Parameters.AddWithValue("@qos", qos); - insertCommand.Parameters.AddWithValue("@payload", payload); - insertCommand.Parameters.AddWithValue("@topic", topic); - insertCommand.Parameters.AddWithValue("@absolute_expiry", relativeExpiry); - insertCommand.Parameters.AddWithValue("@guid", guid); - - await insertCommand.ExecuteNonQueryAsync(); + using (SqliteCommand insertCommand = new SqliteCommand()) + { + insertCommand.Connection = sqliteConnection; + insertCommand.CommandText = "INSERT INTO AstarteFailedMessages (Qos, Payload, Topic, absolute_expiry, guid) " + + "VALUES (@qos,@payload,@topic,@absolute_expiry,@guid) ON CONFLICT DO NOTHING;"; + insertCommand.Parameters.AddWithValue("@qos", qos); + insertCommand.Parameters.AddWithValue("@payload", payload); + insertCommand.Parameters.AddWithValue("@topic", topic); + insertCommand.Parameters.AddWithValue("@absolute_expiry", relativeExpiry); + insertCommand.Parameters.AddWithValue("@guid", guid); + + insertCommand.ExecuteNonQueryAsync().Wait(); + } } Trace.WriteLine($"Insert fallback message in database:" @@ -119,6 +123,7 @@ public async Task InsertStored(string topic, byte[] payload, int qos, Guid guid, { Trace.WriteLine($"Failed to insert fallback message in database. Error message: {ex.Message}"); } + await Task.CompletedTask; } public void InsertVolatile(string topic, byte[] payload, int qos, Guid guid) @@ -194,81 +199,71 @@ public async Task SaveQueuedMessagesAsync(IList m return; } - lock (_storeLock) - { - var newMessages = messages - .ExceptBy(stored.Select(x => x.Id), y => y.Id).ToList(); + ManagedMqttApplicationMessage message = messages.Last(); - MqttUserProperty? retention = null; - foreach (var message in newMessages) - { + MqttUserProperty? retention = null; - if (message.ApplicationMessage.UserProperties is null) - { - continue; - } + if (message.ApplicationMessage.UserProperties is null) + { + return; + } - retention = message.ApplicationMessage.UserProperties.Where(x => x.Name == "Retention").FirstOrDefault(); + retention = message.ApplicationMessage.UserProperties.Where(x => x.Name == "Retention").FirstOrDefault(); - if (retention == null || retention.Value == "DISCARD") - { - continue; - } - - if (retention.Value == "STORED") - { - if (message.ApplicationMessage.MessageExpiryInterval > 0) - { - InsertStored - ( - message.ApplicationMessage.Topic, - message.ApplicationMessage.Payload, - (int)message.ApplicationMessage.QualityOfServiceLevel, - message.Id, - (int)message.ApplicationMessage.MessageExpiryInterval - ).Wait(); - } - else - { - InsertStored - ( - message.ApplicationMessage.Topic, - message.ApplicationMessage.Payload, - (int)message.ApplicationMessage.QualityOfServiceLevel, - message.Id - ).Wait(); - } - } - else if (retention.Value == "VOLATILE") - { + if (retention == null || retention.Value == "DISCARD") + { + return; + } - if (message.ApplicationMessage.MessageExpiryInterval > 0) - { - InsertVolatile - ( - message.ApplicationMessage.Topic, - message.ApplicationMessage.Payload, - (int)message.ApplicationMessage.QualityOfServiceLevel, - message.Id, - (int)message.ApplicationMessage.MessageExpiryInterval - ); - } - else - { - InsertVolatile - ( - message.ApplicationMessage.Topic, - message.ApplicationMessage.Payload, - (int)message.ApplicationMessage.QualityOfServiceLevel, - message.Id - ); - } - } - stored.Add(message); + if (retention.Value == "STORED") + { + if (message.ApplicationMessage.MessageExpiryInterval > 0) + { + await InsertStored + ( + message.ApplicationMessage.Topic, + message.ApplicationMessage.Payload, + (int)message.ApplicationMessage.QualityOfServiceLevel, + message.Id, + (int)message.ApplicationMessage.MessageExpiryInterval + ); + } + else + { + await InsertStored + ( + message.ApplicationMessage.Topic, + message.ApplicationMessage.Payload, + (int)message.ApplicationMessage.QualityOfServiceLevel, + message.Id + ); + } + } + else if (retention.Value == "VOLATILE") + { + if (message.ApplicationMessage.MessageExpiryInterval > 0) + { + InsertVolatile + ( + message.ApplicationMessage.Topic, + message.ApplicationMessage.Payload, + (int)message.ApplicationMessage.QualityOfServiceLevel, + message.Id, + (int)message.ApplicationMessage.MessageExpiryInterval + ); + } + else + { + InsertVolatile + ( + message.ApplicationMessage.Topic, + message.ApplicationMessage.Payload, + (int)message.ApplicationMessage.QualityOfServiceLevel, + message.Id + ); } } - await Task.CompletedTask; } public async Task> LoadQueuedMessagesAsync() @@ -304,7 +299,6 @@ public async Task> LoadQueuedMessagesAsync( }; result.Add(item); - stored.Add(item); } foreach (var messageVolatile in _astarteFailedMessageVolatile) @@ -337,7 +331,6 @@ public async Task> LoadQueuedMessagesAsync( }; result.Add(item); - stored.Add(item); } return result; @@ -347,43 +340,39 @@ public async Task> LoadQueuedMessagesAsync( public async Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMessage) { - lock (_storeLock) + try { - if (stored.Contains(applicationMessage)) + lock (_storeLock) { - stored.Remove(applicationMessage); + if (sqliteConnection.State != ConnectionState.Open) + { + sqliteConnection = new SqliteConnection(_dbConnectionString); + sqliteConnection.Open(); + } - try + using (SqliteCommand cmd = new SqliteCommand( + "DELETE FROM AstarteFailedMessages WHERE guid = @guid;", sqliteConnection)) { - if (sqliteConnection.State != ConnectionState.Open) - { - sqliteConnection = new SqliteConnection(_dbConnectionString); - sqliteConnection.Open(); - } - using (SqliteCommand cmd = new SqliteCommand( - "DELETE FROM AstarteFailedMessages WHERE guid = @guid;", sqliteConnection)) - { - cmd.Parameters.AddWithValue("@guid", applicationMessage.Id); - cmd.ExecuteNonQueryAsync(); - } - - if (_astarteFailedMessageVolatile is not null) - { - var messageVolatile = _astarteFailedMessageVolatile - .Where(x => x.Guid == applicationMessage.Id) - .FirstOrDefault(); - if (messageVolatile is not null) - { - _astarteFailedMessageVolatile.Remove(messageVolatile); - } - } + cmd.Parameters.AddWithValue("@guid", applicationMessage.Id); + cmd.ExecuteNonQueryAsync().Wait(); } - catch (Exception ex) + } + + if (_astarteFailedMessageVolatile is not null) + { + var messageVolatile = _astarteFailedMessageVolatile + .Where(x => x.Guid == applicationMessage.Id) + .FirstOrDefault(); + if (messageVolatile is not null) { - Trace.WriteLine($"Failed to delete fallback message from database. Error message: {ex.Message}"); + _astarteFailedMessageVolatile.Remove(messageVolatile); } } } + catch (Exception ex) + { + Trace.WriteLine($"Failed to delete fallback message from database. Error message: {ex.Message}"); + } await Task.CompletedTask; } diff --git a/AstarteDeviceSDKCSharp/Migrations/20240517064719_AddUniqueGuid.Designer.cs b/AstarteDeviceSDKCSharp/Migrations/20240517064719_AddUniqueGuid.Designer.cs new file mode 100644 index 0000000..702dcaa --- /dev/null +++ b/AstarteDeviceSDKCSharp/Migrations/20240517064719_AddUniqueGuid.Designer.cs @@ -0,0 +1,83 @@ +// +using System; +using AstarteDeviceSDKCSharp.Data; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; + +#nullable disable + +namespace AstarteDeviceSDKCSharp.Migrations +{ + [DbContext(typeof(AstarteDbContext))] + [Migration("20240517064719_AddUniqueGuid")] + partial class AddUniqueGuid + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder.HasAnnotation("ProductVersion", "6.0.13"); + + modelBuilder.Entity("AstarteDeviceSDKCSharp.Data.AstarteFailedMessageEntry", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("INTEGER"); + + b.Property("AbsoluteExpiry") + .HasColumnType("INTEGER") + .HasColumnName("absolute_expiry"); + + b.Property("Guid") + .HasColumnType("TEXT") + .HasColumnName("guid"); + + b.Property("Payload") + .IsRequired() + .HasColumnType("BLOB"); + + b.Property("Qos") + .HasColumnType("INTEGER"); + + b.Property("Topic") + .IsRequired() + .HasColumnType("TEXT"); + + b.HasKey("Id"); + + b.HasIndex(new[] { "Guid" }, "Index_Guid") + .IsUnique(); + + b.ToTable("AstarteFailedMessages"); + }); + + modelBuilder.Entity("AstarteDeviceSDKCSharp.Data.AstarteGenericPropertyEntry", b => + { + b.Property("Id") + .HasColumnType("TEXT"); + + b.Property("BsonValue") + .IsRequired() + .HasColumnType("BLOB"); + + b.Property("InterfaceMajor") + .HasColumnType("INTEGER"); + + b.Property("InterfaceName") + .IsRequired() + .HasColumnType("TEXT") + .HasColumnName("INTERFACE_FIELD_NAME"); + + b.Property("Path") + .IsRequired() + .HasColumnType("TEXT"); + + b.HasKey("Id"); + + b.ToTable("AstarteGenericProperties"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/AstarteDeviceSDKCSharp/Migrations/20240517064719_AddUniqueGuid.cs b/AstarteDeviceSDKCSharp/Migrations/20240517064719_AddUniqueGuid.cs new file mode 100644 index 0000000..310c420 --- /dev/null +++ b/AstarteDeviceSDKCSharp/Migrations/20240517064719_AddUniqueGuid.cs @@ -0,0 +1,34 @@ +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace AstarteDeviceSDKCSharp.Migrations +{ + public partial class AddUniqueGuid : Migration + { + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropIndex( + name: "Index_Guid", + table: "AstarteFailedMessages"); + + migrationBuilder.CreateIndex( + name: "Index_Guid", + table: "AstarteFailedMessages", + column: "guid", + unique: true); + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropIndex( + name: "Index_Guid", + table: "AstarteFailedMessages"); + + migrationBuilder.CreateIndex( + name: "Index_Guid", + table: "AstarteFailedMessages", + column: "guid"); + } + } +} diff --git a/AstarteDeviceSDKCSharp/Migrations/AstarteDbContextModelSnapshot.cs b/AstarteDeviceSDKCSharp/Migrations/AstarteDbContextModelSnapshot.cs index 2d83c2a..ae7ef5f 100644 --- a/AstarteDeviceSDKCSharp/Migrations/AstarteDbContextModelSnapshot.cs +++ b/AstarteDeviceSDKCSharp/Migrations/AstarteDbContextModelSnapshot.cs @@ -44,7 +44,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("Id"); - b.HasIndex(new[] { "Guid" }, "Index_Guid"); + b.HasIndex(new[] { "Guid" }, "Index_Guid") + .IsUnique(); b.ToTable("AstarteFailedMessages"); });