From cca5f3ef5a146cc798d69acce5fe9fcb6fabb6d7 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Wed, 10 Jul 2024 23:53:35 -0700 Subject: [PATCH] leak 1 --- src/dispatch.h | 1 - src/dispatch_pool.c | 6 ++---- src/sub.c | 15 ++++++++------- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/dispatch.h b/src/dispatch.h index 2a56e1df0..0438682ef 100644 --- a/src/dispatch.h +++ b/src/dispatch.h @@ -35,7 +35,6 @@ typedef struct __natsDispatcher_s natsDispatchQueue queue; // Flags. - unsigned running : 1; unsigned shutdown : 1; } natsDispatcher; diff --git a/src/dispatch_pool.c b/src/dispatch_pool.c index 21689e45a..75bacb1f5 100644 --- a/src/dispatch_pool.c +++ b/src/dispatch_pool.c @@ -30,7 +30,7 @@ static natsSharedDispatchers _gReply; static inline void _cleanupSharedDispatcher(natsDispatcher *d) { - if ((d == NULL) || !d->running) + if ((d == NULL) || d->thread == NULL) return; natsThread_Destroy(d->thread); // has already been joined, no need to detach. @@ -90,8 +90,6 @@ static natsStatus _runSharedDispatcher(natsDispatcher *d, bool forReplies) { natsStatus s = NATS_OK; - d->running = true; - natsLib_Retain(); s = natsMutex_Create(&d->mu); @@ -182,7 +180,7 @@ nats_assignSubToSharedDispatcher(natsSubscription *sub, bool forReplies) // Get a running dispatcher. d = &g->dispatchers[g->useNext]; - if (!d->running) + if (d->thread == NULL) { s = _runSharedDispatcher(d, forReplies); if (s == NATS_OK) diff --git a/src/sub.c b/src/sub.c index 8e08ac11d..aa7d8fc4d 100644 --- a/src/sub.c +++ b/src/sub.c @@ -124,12 +124,13 @@ _initOwnDispatcher(natsSubscription *sub) static inline void _cleanupOwnDispatcher(natsSubscription *sub) { - if (!sub->ownDispatcher.running) - return; - printf("<>/<> _cleanupOwnDispatcher\n"); - natsThread_Detach(sub->ownDispatcher.thread); - natsThread_Destroy(sub->ownDispatcher.thread); + if (sub->ownDispatcher.thread != NULL) + { + natsThread_Detach(sub->ownDispatcher.thread); + natsThread_Destroy(sub->ownDispatcher.thread); + sub->ownDispatcher.thread = NULL; + } nats_destroyQueuedMessages(&sub->ownDispatcher.queue); natsCondition_Destroy(sub->ownDispatcher.cond); @@ -301,7 +302,7 @@ void natsSub_close(natsSubscription *sub, bool connectionClosed) static void _asyncTimeoutCb(natsTimer *timer, void *closure) { - natsSubscription *sub = (natsSubscription*) closure; + natsSubscription *sub = (natsSubscription *)closure; // Should not happen, but in case if (sub->dispatcher == NULL) @@ -318,7 +319,7 @@ _asyncTimeoutCb(natsTimer *timer, void *closure) // Set the timer to a very high value, it will be reset from the // worker thread. - natsTimer_Reset(sub->timeoutTimer, 60*60*1000); + natsTimer_Reset(sub->timeoutTimer, 60 * 60 * 1000); // Post a control message to the worker thread. printf("<>/<> natsSub_timeoutCb - posting TIMEOUT message\n");