Skip to content

Commit

Permalink
Optimize database locking
Browse files Browse the repository at this point in the history
Avoid locking except storing operations

Signed-off-by: nedimtokic <nedim.tokic@secomind.com>
  • Loading branch information
nedimtokic committed May 17, 2024
1 parent 31de215 commit ecfd9d6
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 134 deletions.
2 changes: 1 addition & 1 deletion AstarteDeviceSDKCSharp/Data/AstarteFailedMessageEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

namespace AstarteDeviceSDKCSharp.Data
{
[Index(nameof(Guid), Name = "Index_Guid")]
[Index(nameof(Guid), Name = "Index_Guid", IsUnique = true)]
public class AstarteFailedMessageEntry : IAstarteFailedMessage
{
[Key]
Expand Down
253 changes: 121 additions & 132 deletions AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class AstarteFailedMessageStorage : IAstarteFailedMessageStorage
SqliteConnection sqliteConnection = new SqliteConnection();
SqliteConnection sqliteReadConnection = new SqliteConnection();
private static List<AstarteFailedMessageEntry> _astarteFailedMessageVolatile = new();
readonly HashSet<ManagedMqttApplicationMessage> stored = new HashSet<ManagedMqttApplicationMessage>();

public AstarteFailedMessageStorage(string persistencyDir)
{
Expand All @@ -54,25 +53,27 @@ public async Task InsertStored(string topic, byte[] payload, int qos, Guid guid)
{
try
{

if (sqliteConnection.State != ConnectionState.Open)
lock (_storeLock)
{
sqliteConnection = new SqliteConnection(_dbConnectionString);
sqliteConnection.Open();
}
if (sqliteConnection.State != ConnectionState.Open)
{
sqliteConnection = new SqliteConnection(_dbConnectionString);
sqliteConnection.Open();
}

using (SqliteCommand insertCommand = new SqliteCommand())
{
insertCommand.Connection = sqliteConnection;
insertCommand.CommandText = "INSERT INTO AstarteFailedMessages (Qos, Payload, Topic, absolute_expiry, guid) " +
"VALUES (@qos,@payload,@topic,@absolute_expiry,@guid);";
insertCommand.Parameters.AddWithValue("@qos", qos);
insertCommand.Parameters.AddWithValue("@payload", payload);
insertCommand.Parameters.AddWithValue("@topic", topic);
insertCommand.Parameters.AddWithValue("@absolute_expiry", 0);
insertCommand.Parameters.AddWithValue("@guid", guid);

await insertCommand.ExecuteNonQueryAsync();
using (SqliteCommand insertCommand = new SqliteCommand())
{
insertCommand.Connection = sqliteConnection;
insertCommand.CommandText = "INSERT INTO AstarteFailedMessages (Qos, Payload, Topic, absolute_expiry, guid) " +
"VALUES (@qos,@payload,@topic,@absolute_expiry,@guid) ON CONFLICT DO NOTHING;";
insertCommand.Parameters.AddWithValue("@qos", qos);
insertCommand.Parameters.AddWithValue("@payload", payload);
insertCommand.Parameters.AddWithValue("@topic", topic);
insertCommand.Parameters.AddWithValue("@absolute_expiry", 0);
insertCommand.Parameters.AddWithValue("@guid", guid);

insertCommand.ExecuteNonQueryAsync().Wait();
}
}

Trace.WriteLine($"Insert fallback message in database:"
Expand All @@ -83,31 +84,34 @@ public async Task InsertStored(string topic, byte[] payload, int qos, Guid guid)
{
Trace.WriteLine($"Failed to insert fallback message in database. Error message: {ex.Message}");
}
await Task.CompletedTask;
}

public async Task InsertStored(string topic, byte[] payload, int qos, Guid guid, int relativeExpiry)
{
try
{

if (sqliteConnection.State != ConnectionState.Open)
lock (_storeLock)
{
sqliteConnection = new SqliteConnection(_dbConnectionString);
sqliteConnection.Open();
}
if (sqliteConnection.State != ConnectionState.Open)
{
sqliteConnection = new SqliteConnection(_dbConnectionString);
sqliteConnection.Open();
}

using (SqliteCommand insertCommand = new SqliteCommand())
{
insertCommand.Connection = sqliteConnection;
insertCommand.CommandText = "INSERT INTO AstarteFailedMessages (Qos, Payload, Topic, absolute_expiry, guid) " +
"VALUES (@qos,@payload,@topic,@absolute_expiry,@guid);";
insertCommand.Parameters.AddWithValue("@qos", qos);
insertCommand.Parameters.AddWithValue("@payload", payload);
insertCommand.Parameters.AddWithValue("@topic", topic);
insertCommand.Parameters.AddWithValue("@absolute_expiry", relativeExpiry);
insertCommand.Parameters.AddWithValue("@guid", guid);

await insertCommand.ExecuteNonQueryAsync();
using (SqliteCommand insertCommand = new SqliteCommand())
{
insertCommand.Connection = sqliteConnection;
insertCommand.CommandText = "INSERT INTO AstarteFailedMessages (Qos, Payload, Topic, absolute_expiry, guid) " +
"VALUES (@qos,@payload,@topic,@absolute_expiry,@guid) ON CONFLICT DO NOTHING;";
insertCommand.Parameters.AddWithValue("@qos", qos);
insertCommand.Parameters.AddWithValue("@payload", payload);
insertCommand.Parameters.AddWithValue("@topic", topic);
insertCommand.Parameters.AddWithValue("@absolute_expiry", relativeExpiry);
insertCommand.Parameters.AddWithValue("@guid", guid);

insertCommand.ExecuteNonQueryAsync().Wait();
}
}

Trace.WriteLine($"Insert fallback message in database:"
Expand All @@ -119,6 +123,7 @@ public async Task InsertStored(string topic, byte[] payload, int qos, Guid guid,
{
Trace.WriteLine($"Failed to insert fallback message in database. Error message: {ex.Message}");
}
await Task.CompletedTask;
}

public void InsertVolatile(string topic, byte[] payload, int qos, Guid guid)
Expand Down Expand Up @@ -194,81 +199,71 @@ public async Task SaveQueuedMessagesAsync(IList<ManagedMqttApplicationMessage> m
return;
}

lock (_storeLock)
{
var newMessages = messages
.ExceptBy(stored.Select(x => x.Id), y => y.Id).ToList();
ManagedMqttApplicationMessage message = messages.Last();

MqttUserProperty? retention = null;
foreach (var message in newMessages)
{
MqttUserProperty? retention = null;

if (message.ApplicationMessage.UserProperties is null)
{
continue;
}
if (message.ApplicationMessage.UserProperties is null)
{
return;
}

retention = message.ApplicationMessage.UserProperties.Where(x => x.Name == "Retention").FirstOrDefault();
retention = message.ApplicationMessage.UserProperties.Where(x => x.Name == "Retention").FirstOrDefault();

if (retention == null || retention.Value == "DISCARD")
{
continue;
}

if (retention.Value == "STORED")
{
if (message.ApplicationMessage.MessageExpiryInterval > 0)
{
InsertStored
(
message.ApplicationMessage.Topic,
message.ApplicationMessage.Payload,
(int)message.ApplicationMessage.QualityOfServiceLevel,
message.Id,
(int)message.ApplicationMessage.MessageExpiryInterval
).Wait();
}
else
{
InsertStored
(
message.ApplicationMessage.Topic,
message.ApplicationMessage.Payload,
(int)message.ApplicationMessage.QualityOfServiceLevel,
message.Id
).Wait();
}
}
else if (retention.Value == "VOLATILE")
{
if (retention == null || retention.Value == "DISCARD")
{
return;
}

if (message.ApplicationMessage.MessageExpiryInterval > 0)
{
InsertVolatile
(
message.ApplicationMessage.Topic,
message.ApplicationMessage.Payload,
(int)message.ApplicationMessage.QualityOfServiceLevel,
message.Id,
(int)message.ApplicationMessage.MessageExpiryInterval
);
}
else
{
InsertVolatile
(
message.ApplicationMessage.Topic,
message.ApplicationMessage.Payload,
(int)message.ApplicationMessage.QualityOfServiceLevel,
message.Id
);
}
}
stored.Add(message);
if (retention.Value == "STORED")
{
if (message.ApplicationMessage.MessageExpiryInterval > 0)
{
await InsertStored
(
message.ApplicationMessage.Topic,
message.ApplicationMessage.Payload,
(int)message.ApplicationMessage.QualityOfServiceLevel,
message.Id,
(int)message.ApplicationMessage.MessageExpiryInterval
);
}
else
{
await InsertStored
(
message.ApplicationMessage.Topic,
message.ApplicationMessage.Payload,
(int)message.ApplicationMessage.QualityOfServiceLevel,
message.Id
);
}
}
else if (retention.Value == "VOLATILE")
{

if (message.ApplicationMessage.MessageExpiryInterval > 0)
{
InsertVolatile
(
message.ApplicationMessage.Topic,
message.ApplicationMessage.Payload,
(int)message.ApplicationMessage.QualityOfServiceLevel,
message.Id,
(int)message.ApplicationMessage.MessageExpiryInterval
);
}
else
{
InsertVolatile
(
message.ApplicationMessage.Topic,
message.ApplicationMessage.Payload,
(int)message.ApplicationMessage.QualityOfServiceLevel,
message.Id
);
}
}
await Task.CompletedTask;
}

public async Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync()
Expand Down Expand Up @@ -304,7 +299,6 @@ public async Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync(
};

result.Add(item);
stored.Add(item);
}

foreach (var messageVolatile in _astarteFailedMessageVolatile)
Expand Down Expand Up @@ -337,7 +331,6 @@ public async Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync(
};

result.Add(item);
stored.Add(item);
}

return result;
Expand All @@ -347,43 +340,39 @@ public async Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync(
public async Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMessage)
{

lock (_storeLock)
try
{
if (stored.Contains(applicationMessage))
lock (_storeLock)
{
stored.Remove(applicationMessage);
if (sqliteConnection.State != ConnectionState.Open)
{
sqliteConnection = new SqliteConnection(_dbConnectionString);
sqliteConnection.Open();
}

try
using (SqliteCommand cmd = new SqliteCommand(
"DELETE FROM AstarteFailedMessages WHERE guid = @guid;", sqliteConnection))
{
if (sqliteConnection.State != ConnectionState.Open)
{
sqliteConnection = new SqliteConnection(_dbConnectionString);
sqliteConnection.Open();
}
using (SqliteCommand cmd = new SqliteCommand(
"DELETE FROM AstarteFailedMessages WHERE guid = @guid;", sqliteConnection))
{
cmd.Parameters.AddWithValue("@guid", applicationMessage.Id);
cmd.ExecuteNonQueryAsync();
}

if (_astarteFailedMessageVolatile is not null)
{
var messageVolatile = _astarteFailedMessageVolatile
.Where(x => x.Guid == applicationMessage.Id)
.FirstOrDefault();
if (messageVolatile is not null)
{
_astarteFailedMessageVolatile.Remove(messageVolatile);
}
}
cmd.Parameters.AddWithValue("@guid", applicationMessage.Id);
cmd.ExecuteNonQueryAsync().Wait();
}
catch (Exception ex)
}

if (_astarteFailedMessageVolatile is not null)
{
var messageVolatile = _astarteFailedMessageVolatile
.Where(x => x.Guid == applicationMessage.Id)
.FirstOrDefault();
if (messageVolatile is not null)
{
Trace.WriteLine($"Failed to delete fallback message from database. Error message: {ex.Message}");
_astarteFailedMessageVolatile.Remove(messageVolatile);
}
}
}
catch (Exception ex)
{
Trace.WriteLine($"Failed to delete fallback message from database. Error message: {ex.Message}");
}
await Task.CompletedTask;
}

Expand Down
Loading

0 comments on commit ecfd9d6

Please sign in to comment.