Skip to content

Commit

Permalink
Merge pull request #76 from nedimtokic/stored-lazy
Browse files Browse the repository at this point in the history
Optimize message deletion from db
  • Loading branch information
harlem88 authored Jul 25, 2024
2 parents d44adf6 + 29bc49a commit 272b014
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 7 deletions.
1 change: 1 addition & 0 deletions AstarteDeviceSDKCSharp/Data/AstarteFailedMessageEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class AstarteFailedMessageEntry : IAstarteFailedMessage
[Required]
[Column("guid")]
public Guid Guid { get; set; }
public bool Processed { get; set; }


public AstarteFailedMessageEntry(int qos, byte[] payload, string topic, Guid guid)
Expand Down
70 changes: 64 additions & 6 deletions AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public async Task InsertStored(string topic, byte[] payload, int qos, Guid guid)
using (SqliteCommand insertCommand = new SqliteCommand())
{
insertCommand.Connection = sqliteConnection;
insertCommand.CommandText = "INSERT INTO AstarteFailedMessages (Qos, Payload, Topic, absolute_expiry, guid) " +
"VALUES (@qos,@payload,@topic,@absolute_expiry,@guid) ON CONFLICT DO NOTHING;";
insertCommand.CommandText = "INSERT INTO AstarteFailedMessages (Qos, Payload, Topic, absolute_expiry, guid, processed) " +
"VALUES (@qos,@payload,@topic,@absolute_expiry,@guid, 0) ON CONFLICT DO NOTHING;";
insertCommand.Parameters.AddWithValue("@qos", qos);
insertCommand.Parameters.AddWithValue("@payload", payload);
insertCommand.Parameters.AddWithValue("@topic", topic);
Expand Down Expand Up @@ -102,8 +102,8 @@ public async Task InsertStored(string topic, byte[] payload, int qos, Guid guid,
using (SqliteCommand insertCommand = new SqliteCommand())
{
insertCommand.Connection = sqliteConnection;
insertCommand.CommandText = "INSERT INTO AstarteFailedMessages (Qos, Payload, Topic, absolute_expiry, guid) " +
"VALUES (@qos,@payload,@topic,@absolute_expiry,@guid) ON CONFLICT DO NOTHING;";
insertCommand.CommandText = "INSERT INTO AstarteFailedMessages (Qos, Payload, Topic, absolute_expiry, guid, processed) " +
"VALUES (@qos,@payload,@topic,@absolute_expiry,@guid, 0) ON CONFLICT DO NOTHING;";
insertCommand.Parameters.AddWithValue("@qos", qos);
insertCommand.Parameters.AddWithValue("@payload", payload);
insertCommand.Parameters.AddWithValue("@topic", topic);
Expand Down Expand Up @@ -167,7 +167,7 @@ public async Task Reject(AstarteFailedMessageEntry failedMessages)
"DELETE FROM AstarteFailedMessages WHERE guid = @guid;", sqliteConnection))
{

cmd.Parameters.AddWithValue("@guid", failedMessages.Id);
cmd.Parameters.AddWithValue("@guid", failedMessages.Guid);

await cmd.ExecuteNonQueryAsync();
}
Expand Down Expand Up @@ -365,6 +365,64 @@ public async Task DeleteByGuidAsync(Guid applicationMessageId)
}
}

public async Task MarkAsProcessed(Guid applicationMessageId)
{

try
{
if (sqliteReadConnection.State != ConnectionState.Open)
{
sqliteReadConnection = new SqliteConnection(_dbConnectionString);
sqliteReadConnection.Open();
}

SqliteCommand cmd = new SqliteCommand(
"UPDATE AstarteFailedMessages SET processed = 1 WHERE guid = @guid ;", sqliteReadConnection);

cmd.Parameters.AddWithValue("@guid", applicationMessageId);
await cmd.ExecuteNonQueryAsync();
cmd.Dispose();

if (_astarteFailedMessageVolatile is not null)
{
var messageVolatile = _astarteFailedMessageVolatile
.Where(x => x.Guid == applicationMessageId)
.FirstOrDefault();
if (messageVolatile is not null)
{
_astarteFailedMessageVolatile.Remove(messageVolatile);
}
}
}
catch (Exception ex)
{
Trace.WriteLine($"Failed to mark processed fallback message in database. Error message: {ex.Message}");
}
}

public async Task DeleteProcessed()
{

try
{
if (sqliteReadConnection.State != ConnectionState.Open)
{
sqliteReadConnection = new SqliteConnection(_dbConnectionString);
sqliteReadConnection.Open();
}

SqliteCommand cmd = new SqliteCommand(
"DELETE FROM AstarteFailedMessages WHERE processed = 1;", sqliteReadConnection);
await cmd.ExecuteNonQueryAsync();
cmd.Dispose();

}
catch (Exception ex)
{
Trace.WriteLine($"Failed to delete fallback message from database. Error message: {ex.Message}");
}
}

private async Task<IList<AstarteFailedMessageEntry>> GetAstarteFailedMessageStorage()
{

Expand All @@ -377,7 +435,7 @@ private async Task<IList<AstarteFailedMessageEntry>> GetAstarteFailedMessageStor
}

string query = @"SELECT Qos, Payload, Topic, guid, absolute_expiry
FROM AstarteFailedMessages ORDER BY ID LIMIT 10000;";
FROM AstarteFailedMessages WHERE processed = 0 ORDER BY ID LIMIT 10000;";
using SqliteCommand cmd = new SqliteCommand(query, sqliteReadConnection);
using SqliteDataReader dr = await cmd.ExecuteReaderAsync();

Expand Down
2 changes: 2 additions & 0 deletions AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface IAstarteFailedMessageStorage
bool IsExpired(long expire);

Task DeleteByGuidAsync(Guid applicationMessage);
Task MarkAsProcessed(Guid applicationMessage);
Task DeleteProcessed();

Task<IList<ManagedMqttApplicationMessage>> LoadQueuedMessagesAsync();
Task SaveQueuedMessageAsync(ManagedMqttApplicationMessage message);
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# This file is part of Astarte.
#
# Copyright 2024 SECO Mind Srl
#
# SPDX-License-Identifier: Apache-2.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// This file is part of Astarte.
//
// Copyright 2024 SECO Mind Srl
//
// SPDX-License-Identifier: Apache-2.0
using Microsoft.EntityFrameworkCore.Migrations;

#nullable disable

namespace AstarteDeviceSDKCSharp.Migrations
{
public partial class AddColumnProcessed : Migration
{
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.AddColumn<bool>(
name: "Processed",
table: "AstarteFailedMessages",
type: "INTEGER",
nullable: false,
defaultValue: false);
}

protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropColumn(
name: "Processed",
table: "AstarteFailedMessages");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ protected override void BuildModel(ModelBuilder modelBuilder)
.IsRequired()
.HasColumnType("BLOB");

b.Property<bool>("Processed")
.HasColumnType("INTEGER");

b.Property<int>("Qos")
.HasColumnType("INTEGER");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,14 @@ async Task OnMessageProcessedAsync(ApplicationMessageProcessedEventArgs eventArg
{
if (_failedMessageStorage is not null)
{
await _failedMessageStorage.DeleteByGuidAsync(eventArgs.ApplicationMessage.Id);
if (_resendingInProgress)
{
await _failedMessageStorage.MarkAsProcessed(eventArgs.ApplicationMessage.Id);
}
else
{
await _failedMessageStorage.DeleteByGuidAsync(eventArgs.ApplicationMessage.Id);
}
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ public override void StartResenderTask()
if (!_resendingInProgress)
{
await ResendFailedMessages(cancellationToken);
if (_failedMessageStorage is not null)
{
await _failedMessageStorage.DeleteProcessed();
}
}
});
}
Expand Down

0 comments on commit 272b014

Please sign in to comment.