Skip to content

Commit

Permalink
Non clustered index on RowVersion (#620)
Browse files Browse the repository at this point in the history
* No clustered index on `RowVersion`

* review fixes
  • Loading branch information
tmasternak authored May 28, 2020
1 parent eb854b5 commit 319d320
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ namespace NServiceBus.Transport.SQLServer
public static readonly string AddMessageBodyStringColumn;
public static readonly string CheckHeadersColumnType;
public static readonly string CheckIfExpiresIndexIsPresent;
public static readonly string CheckIfNonClusteredRowVersionIndexIsPresent;
public static readonly string CreateDelayedMessageStoreText;
public static readonly string CreateQueueText;
public static readonly string CreateSubscriptionTableText;
Expand Down
11 changes: 9 additions & 2 deletions src/NServiceBus.SqlServer/Queuing/SqlConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ Body varbinary(max),
RowVersion bigint IDENTITY(1,1) NOT NULL
);
CREATE CLUSTERED INDEX Index_RowVersion ON {0}
CREATE NONCLUSTERED INDEX Index_RowVersion ON {0}
(
RowVersion
[RowVersion] ASC
)
CREATE NONCLUSTERED INDEX Index_Expires ON {0}
Expand Down Expand Up @@ -248,6 +248,13 @@ FROM sys.indexes
WHERE name = 'Index_Expires'
AND object_id = OBJECT_ID('{0}')";

public static readonly string CheckIfNonClusteredRowVersionIndexIsPresent = @"
SELECT COUNT(*)
FROM sys.indexes
WHERE name = 'Index_RowVersion'
AND object_id = OBJECT_ID('{0}')
AND type = 2"; // 2 = non-clustered index

public static readonly string CheckHeadersColumnType = @"
SELECT t.name
FROM sys.columns c
Expand Down
17 changes: 14 additions & 3 deletions src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public TableBasedQueue(string qualifiedTableName, string queueName)
sendCommand = Format(SqlConstants.SendText, this.qualifiedTableName);
purgeCommand = Format(SqlConstants.PurgeText, this.qualifiedTableName);
purgeExpiredCommand = Format(SqlConstants.PurgeBatchOfExpiredMessagesText, this.qualifiedTableName);
checkIndexCommand = Format(SqlConstants.CheckIfExpiresIndexIsPresent, this.qualifiedTableName);
checkExpiresIndexCommand = Format(SqlConstants.CheckIfExpiresIndexIsPresent, this.qualifiedTableName);
checkNonClusteredRowVersionIndexCommand = Format(SqlConstants.CheckIfNonClusteredRowVersionIndexIsPresent, this.qualifiedTableName);
checkHeadersColumnTypeCommand = Format(SqlConstants.CheckHeadersColumnType, this.qualifiedTableName);
#pragma warning restore 618
}
Expand Down Expand Up @@ -134,7 +135,16 @@ public async Task<int> PurgeBatchOfExpiredMessages(SqlConnection connection, int

public async Task<bool> CheckExpiresIndexPresence(SqlConnection connection)
{
using (var command = new SqlCommand(checkIndexCommand, connection))
using (var command = new SqlCommand(checkExpiresIndexCommand, connection))
{
var rowsCount = (int) await command.ExecuteScalarAsync().ConfigureAwait(false);
return rowsCount > 0;
}
}

public async Task<bool> CheckNonClusteredRowVersionIndexPresence(SqlConnection connection)
{
using (var command = new SqlCommand(checkNonClusteredRowVersionIndexCommand, connection))
{
var rowsCount = (int) await command.ExecuteScalarAsync().ConfigureAwait(false);
return rowsCount > 0;
Expand All @@ -160,7 +170,8 @@ public override string ToString()
string sendCommand;
string purgeCommand;
string purgeExpiredCommand;
string checkIndexCommand;
string checkExpiresIndexCommand;
string checkNonClusteredRowVersionIndexCommand;
string checkHeadersColumnTypeCommand;
}
}
25 changes: 22 additions & 3 deletions src/NServiceBus.SqlServer/Receiving/SchemaVerification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@ public SchemaInspector(Func<TableBasedQueue, Task<SqlConnection>> openConnection
public async Task PerformInspection(TableBasedQueue queue)
{
await VerifyExpiredIndex(queue).ConfigureAwait(false);
await VerifyNonClusteredRowVersionIndex(queue).ConfigureAwait(false);
await VerifyHeadersColumnType(queue).ConfigureAwait(false);
}

async Task VerifyExpiredIndex(TableBasedQueue queue)
async Task VerifyIndex(TableBasedQueue queue, Func<TableBasedQueue, SqlConnection, Task<bool>> check, string noIndexMessage)
{
try
{
using (var connection = await openConnection(queue).ConfigureAwait(false))
{
var indexExists = await queue.CheckExpiresIndexPresence(connection).ConfigureAwait(false);
var indexExists = await check(queue, connection).ConfigureAwait(false);

if (!indexExists)
{
Logger.Warn($@"Table {queue.Name} does not contain index 'Index_Expires'.{Environment.NewLine}Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information.");
Logger.Warn(noIndexMessage);
}
}
}
Expand All @@ -37,6 +39,23 @@ async Task VerifyExpiredIndex(TableBasedQueue queue)
}
}

Task VerifyNonClusteredRowVersionIndex(TableBasedQueue queue)
{
return VerifyIndex(
queue,
(q, c) => q.CheckNonClusteredRowVersionIndexPresence(c),
$"Table {queue.Name} does not contain non-clustered index 'Index_RowVersion'.{Environment.NewLine}Migrating to this non-clustered index improves performance for send and receive operations.");
}

Task VerifyExpiredIndex(TableBasedQueue queue)
{
return VerifyIndex(
queue,
(q, c) => q.CheckExpiresIndexPresence(c),
$"Table {queue.Name} does not contain index 'Index_Expires'.{Environment.NewLine}Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information."
);
}

async Task VerifyHeadersColumnType(TableBasedQueue queue)
{
try
Expand Down

0 comments on commit 319d320

Please sign in to comment.