Skip to content

Commit

Permalink
leak 1
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Jul 11, 2024
1 parent d7b458c commit cca5f3e
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 12 deletions.
1 change: 0 additions & 1 deletion src/dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ typedef struct __natsDispatcher_s
natsDispatchQueue queue;

// Flags.
unsigned running : 1;
unsigned shutdown : 1;
} natsDispatcher;

Expand Down
6 changes: 2 additions & 4 deletions src/dispatch_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ static natsSharedDispatchers _gReply;

static inline void _cleanupSharedDispatcher(natsDispatcher *d)
{
if ((d == NULL) || !d->running)
if ((d == NULL) || d->thread == NULL)
return;

natsThread_Destroy(d->thread); // has already been joined, no need to detach.
Expand Down Expand Up @@ -90,8 +90,6 @@ static natsStatus _runSharedDispatcher(natsDispatcher *d, bool forReplies)
{
natsStatus s = NATS_OK;

d->running = true;

natsLib_Retain();

s = natsMutex_Create(&d->mu);
Expand Down Expand Up @@ -182,7 +180,7 @@ nats_assignSubToSharedDispatcher(natsSubscription *sub, bool forReplies)

// Get a running dispatcher.
d = &g->dispatchers[g->useNext];
if (!d->running)
if (d->thread == NULL)
{
s = _runSharedDispatcher(d, forReplies);
if (s == NATS_OK)
Expand Down
15 changes: 8 additions & 7 deletions src/sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,13 @@ _initOwnDispatcher(natsSubscription *sub)

static inline void _cleanupOwnDispatcher(natsSubscription *sub)
{
if (!sub->ownDispatcher.running)
return;

printf("<>/<> _cleanupOwnDispatcher\n");
natsThread_Detach(sub->ownDispatcher.thread);
natsThread_Destroy(sub->ownDispatcher.thread);
if (sub->ownDispatcher.thread != NULL)
{
natsThread_Detach(sub->ownDispatcher.thread);
natsThread_Destroy(sub->ownDispatcher.thread);
sub->ownDispatcher.thread = NULL;
}

nats_destroyQueuedMessages(&sub->ownDispatcher.queue);
natsCondition_Destroy(sub->ownDispatcher.cond);
Expand Down Expand Up @@ -301,7 +302,7 @@ void natsSub_close(natsSubscription *sub, bool connectionClosed)
static void
_asyncTimeoutCb(natsTimer *timer, void *closure)
{
natsSubscription *sub = (natsSubscription*) closure;
natsSubscription *sub = (natsSubscription *)closure;

// Should not happen, but in case
if (sub->dispatcher == NULL)
Expand All @@ -318,7 +319,7 @@ _asyncTimeoutCb(natsTimer *timer, void *closure)

// Set the timer to a very high value, it will be reset from the
// worker thread.
natsTimer_Reset(sub->timeoutTimer, 60*60*1000);
natsTimer_Reset(sub->timeoutTimer, 60 * 60 * 1000);

// Post a control message to the worker thread.
printf("<>/<> natsSub_timeoutCb - posting TIMEOUT message\n");
Expand Down

0 comments on commit cca5f3e

Please sign in to comment.