From b779569d27d64f99228ef5f750557e54ffc2b193 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 31 Oct 2024 17:53:54 -0700 Subject: [PATCH] wip ++5 --- src/conn.c | 7 +++- src/conn.h | 2 +- src/micro.c | 103 +++++++++++++++------------------------------------- 3 files changed, 36 insertions(+), 76 deletions(-) diff --git a/src/conn.c b/src/conn.c index 66d9b938..0a1a1c9e 100644 --- a/src/conn.c +++ b/src/conn.c @@ -4478,10 +4478,11 @@ int natsConn_getServices(microService ***services, natsConnection *nc) return numServices; } -void natsConn_removeService(natsConnection *nc, microService *service) +bool natsConn_removeService(natsConnection *nc, microService *service) { + bool removed = false; if (nc == NULL || service == NULL) - return; + return false; natsConn_Lock(nc); for (int i = 0; i < nc->numServices; i++) @@ -4493,10 +4494,12 @@ void natsConn_removeService(natsConnection *nc, microService *service) nc->services[j] = nc->services[j + 1]; } nc->numServices--; + removed = true; break; } } natsConn_Unlock(nc); + return removed; } natsStatus natsConn_addService(natsConnection *nc, microService *service) diff --git a/src/conn.h b/src/conn.h index 61dff978..7d4e1249 100644 --- a/src/conn.h +++ b/src/conn.h @@ -163,7 +163,7 @@ natsConn_destroy(natsConnection *nc, bool fromPublicDestroy); int natsConn_getServices(microService ***services, natsConnection *nc); -void +bool natsConn_removeService(natsConnection *nc, microService *service); natsStatus diff --git a/src/micro.c b/src/micro.c index 24e3b75e..b42caada 100644 --- a/src/micro.c +++ b/src/micro.c @@ -28,8 +28,6 @@ static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, v static void _free_cloned_service_config(microServiceConfig *cfg); static void _free_service(microService *m); -static void _release_service(microService *m); -static void _retain_service(microService *m); microError * micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *cfg) @@ -55,8 +53,8 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c MICRO_CALL(err, _clone_service_config(&m->cfg, cfg)); // Add the service to the connection. - m->refs++; MICRO_CALL(err, micro_ErrorFromStatus(natsConn_addService(m->nc, m))); + MICRO_CALL(err, (m->refs++, NULL)); // Wrap the connection callbacks before we subscribe to anything. MICRO_CALL(err, micro_ErrorFromStatus( @@ -246,43 +244,48 @@ microGroup_AddEndpoint(microGroup *g, microEndpointConfig *cfg) } static microError * -_stop_service(microService *m, bool unsubscribe) +_stop_service(microService *m, bool unsubscribe, bool finalRelease) { microError *err = NULL; microEndpoint *ep = NULL; - int refs = 0; + int refs = 0; + int numEndpoints = 0; if (m == NULL) return micro_ErrorInvalidArg; _lock_service(m); - if (m->stopped) + if (!m->stopped) { - _unlock_service(m); - return NULL; - } + m->stopped = true; - m->stopped = true; - - if (unsubscribe) - { - for (ep = m->first_ep; ep != NULL; ep = ep->next) + if (unsubscribe) { - if (err = micro_stop_endpoint(ep), err != NULL) + for (ep = m->first_ep; ep != NULL; ep = ep->next) { - err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name); - _unlock_service(m); - return err; + if (err = micro_stop_endpoint(ep), err != NULL) + { + err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name); + _unlock_service(m); + return err; + } } } + + if (natsConn_removeService(m->nc, m)) + m->refs--; } - natsConn_removeService(m->nc, m); - refs = --(m->refs); + if ((m->refs > 0) && finalRelease) + m->refs--; + + refs = m->refs; + numEndpoints = m->numEndpoints; _unlock_service(m); - if (refs == 0) + + if ((refs == 0) && (numEndpoints == 0)) _free_service(m); return NULL; @@ -291,7 +294,8 @@ _stop_service(microService *m, bool unsubscribe) microError * microService_Stop(microService *m) { - return _stop_service(m, true); + // Public API: stop the service, unsubscribe, but don't do the final release. + return _stop_service(m, true, false); } static void @@ -326,7 +330,6 @@ void micro_release_endpoint_when_unsubscribed(void *closure) microEndpoint *ep = (microEndpoint *)closure; microService *m = NULL; natsSubscription *sub = NULL; - natsConnection *nc = NULL; microDoneHandler doneHandler = NULL; int refs = 0; @@ -355,12 +358,9 @@ void micro_release_endpoint_when_unsubscribed(void *closure) micro_free_endpoint(ep); if (m->numEndpoints == 0) - { - m->stopped = true; doneHandler = m->cfg->DoneHandler; - } + refs = m->refs; - nc = m->nc; _unlock_service(m); // Special processing for the last endpoint. @@ -368,9 +368,6 @@ void micro_release_endpoint_when_unsubscribed(void *closure) { doneHandler(m); - natsConn_removeService(nc, m); - _release_service(m); - if (refs == 0) _free_service(m); } @@ -393,14 +390,7 @@ bool microService_IsStopped(microService *m) microError * microService_Destroy(microService *m) { - microError *err = NULL; - - err = _stop_service(m, true); - if (err != NULL) - return err; - - _release_service(m); - return NULL; + return _stop_service(m, true, true); } microError * @@ -440,39 +430,6 @@ _new_service(microService **ptr, natsConnection *nc) return NULL; } -static void -_retain_service(microService *m) -{ - if (m == NULL) - return; - - _lock_service(m); - - ++(m->refs); - - _unlock_service(m); -} - -static void -_release_service(microService *m) -{ - int refs = 0; - int numEndpoints = 0; - - if (m == NULL) - return; - - _lock_service(m); - - refs = --(m->refs); - numEndpoints = m->numEndpoints; - - _unlock_service(m); - - if ((refs == 0) && (numEndpoints == 0)) - _free_service(m); -} - static inline void _free_cloned_group_config(microGroupConfig *cfg) { @@ -587,7 +544,7 @@ _on_connection_closed(natsConnection *nc, void *ignored) for (int i = 0; i < n; i++) { m = all[i]; - _stop_service(m, false); // subs will be terminated by the connection close. + _stop_service(m, false, false); // subs will be terminated by the connection close. } } @@ -638,7 +595,7 @@ _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_use { m = all[i]; _on_service_error(m, subject, s); - _stop_service(m, true); + _stop_service(m, true, false); } }