Skip to content

Commit

Permalink
refactor MemoryStorageConnection to not inlcude static fields and ret…
Browse files Browse the repository at this point in the history
…urn the same conenction instance when calling GetConnection() in MemoryStorage
  • Loading branch information
Spacefish authored and perrich committed Jul 31, 2023
1 parent b48943c commit 25b5c2e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
10 changes: 9 additions & 1 deletion src/Hangfire.MemoryStorage/MemoryStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ public class MemoryStorage : JobStorage
{
private readonly MemoryStorageOptions _options;

private readonly object _connectionLock = new object();
private MemoryStorageConnection _connection;

public Data Data { get; }

public MemoryStorage() : this(new MemoryStorageOptions(), new Data())
Expand All @@ -27,7 +30,12 @@ public MemoryStorage(MemoryStorageOptions options, Data data)

public override IStorageConnection GetConnection()
{
return new MemoryStorageConnection(Data, _options.FetchNextJobTimeout);
lock (_connectionLock)
{
if (_connection == null)
_connection = new MemoryStorageConnection(Data, _options.FetchNextJobTimeout);
}
return _connection;
}

public override IMonitoringApi GetMonitoringApi()
Expand Down
11 changes: 5 additions & 6 deletions src/Hangfire.MemoryStorage/MemoryStorageConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,18 @@ namespace Hangfire.MemoryStorage
{
public class MemoryStorageConnection : JobStorageConnection
{
private static readonly object FetchJobsLock = new object();
private readonly object FetchJobsLock = new object();
private readonly TimeSpan _fetchNextJobTimeout;
private readonly Data _data;
private static readonly ConcurrentDictionary<string, SemaphoreSlim> _locks = new ConcurrentDictionary<string, SemaphoreSlim>();
private readonly ConcurrentDictionary<string, SemaphoreSlim> _locks = new ConcurrentDictionary<string, SemaphoreSlim>();
private readonly AutoResetEvent _newItemInQueueEvent = new AutoResetEvent(true);

public MemoryStorageConnection(Data data, TimeSpan fetchNextJobTimeout)
{
_fetchNextJobTimeout = fetchNextJobTimeout;
_data = data;
}

internal static readonly AutoResetEvent NewItemInQueueEvent = new AutoResetEvent(true);

public override IDisposable AcquireDistributedLock(string resource, TimeSpan timeout)
{
// try to acquire an existing lock or create a new one if none exists for the resource
Expand Down Expand Up @@ -102,7 +101,7 @@ public override string CreateExpiredJob(Job job, IDictionary<string, string> par

public override IWriteOnlyTransaction CreateWriteTransaction()
{
return new MemoryStorageWriteOnlyTransaction(_data, NewItemInQueueEvent);
return new MemoryStorageWriteOnlyTransaction(_data, _newItemInQueueEvent);
}

public override IFetchedJob FetchNextJob(string[] queues, CancellationToken cancellationToken)
Expand Down Expand Up @@ -139,7 +138,7 @@ orderby q.AddedAt descending
}
}

WaitHandle.WaitAny(new[] { cancellationToken.WaitHandle, NewItemInQueueEvent }, TimeSpan.FromSeconds(15));
WaitHandle.WaitAny(new[] { cancellationToken.WaitHandle, _newItemInQueueEvent }, TimeSpan.FromSeconds(15));
cancellationToken.ThrowIfCancellationRequested();
}

Expand Down

0 comments on commit 25b5c2e

Please sign in to comment.