From 7b5d3d735a62a8ff6e43f56b30efa4e51795cbce Mon Sep 17 00:00:00 2001 From: rolflussi Date: Fri, 11 Oct 2024 15:42:53 +0200 Subject: [PATCH 01/13] Generate cnats-config-version.cmake (#813) * provide cnats-config-version.cmake to be able to define required version in find_package * change from requiring same major version to same minor version --------- Co-authored-by: Rolf Lussi --- CMakeLists.txt | 6 ++++++ src/CMakeLists.txt | 4 ++++ 2 files changed, 10 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 409f21054..413e0523e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,6 +3,7 @@ cmake_minimum_required(VERSION 3.13) project(cnats) include(CTest) include(FindPackageHandleStandardArgs) +include(CMakePackageConfigHelpers) # Uncomment to have the build process verbose # set(CMAKE_VERBOSE_MAKEFILE TRUE) @@ -258,6 +259,11 @@ set(NATS_VERSION_SUFFIX "-beta") set(NATS_VERSION_REQUIRED_NUMBER 0x030900) +write_basic_package_version_file( + "${PROJECT_BINARY_DIR}/cnats-config-version.cmake" + COMPATIBILITY SameMinorVersion + VERSION ${NATS_VERSION_MAJOR}.${NATS_VERSION_MINOR}.${NATS_VERSION_PATCH}${NATS_VERSION_SUFFIX}) + if(NATS_UPDATE_VERSION OR NATS_UPDATE_DOC) configure_file( ${CMAKE_CURRENT_SOURCE_DIR}/src/version.h.in diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b18f72406..971ddf09f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -67,6 +67,8 @@ if(NATS_BUILD_LIB_SHARED) NAMESPACE cnats:: FILE cnats-config.cmake DESTINATION ${NATS_LIBDIR}/cmake/cnats) + install(FILES "${PROJECT_BINARY_DIR}/cnats-config-version.cmake" + DESTINATION ${NATS_LIBDIR}/cmake/cnats) endif(NATS_BUILD_LIB_SHARED) if(NATS_BUILD_LIB_STATIC) @@ -79,6 +81,8 @@ if(NATS_BUILD_LIB_STATIC) NAMESPACE cnats:: FILE cnats-config.cmake DESTINATION ${NATS_LIBDIR}/cmake/cnats) + install(FILES "${PROJECT_BINARY_DIR}/cnats-config-version.cmake" + DESTINATION ${NATS_LIBDIR}/cmake/cnats) endif(NATS_BUILD_LIB_STATIC) install(FILES deprnats.h DESTINATION ${NATS_INCLUDE_DIR} RENAME nats.h) From 036665f44864bcd18f5832ad0961b8c25ca1d19b Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Wed, 30 Oct 2024 18:09:48 -0700 Subject: [PATCH 02/13] [FIXED] microservice cleanup (flapping MicroServiceStops... tests) - Removed the global hash of services, it was the most immediate source of the deadlock. - Added the list of microservices to `natsConnection`. **NOTE** increased memory allocation for all connections, need a better way. - Fixed the flow of replacing an existing endpoint in a service. - Got rid of ep->name since it was synonimous with ep->cfg->Name - Adjusted the tests, some minor fixes. --- src/conn.c | 67 ++++++ src/conn.h | 9 + src/glib/glib.c | 41 ---- src/glib/glibp.h | 5 - src/micro.c | 510 ++++++++++++++++++------------------------- src/micro_endpoint.c | 97 +------- src/microp.h | 6 +- src/natsp.h | 15 +- test/test.c | 166 +++++++------- test/test.h | 9 + 10 files changed, 396 insertions(+), 529 deletions(-) diff --git a/src/conn.c b/src/conn.c index 37fcc1790..415f00e9e 100644 --- a/src/conn.c +++ b/src/conn.c @@ -205,6 +205,7 @@ _freeConn(natsConnection *nc) natsCondition_Destroy(nc->reconnectCond); natsMutex_Destroy(nc->subsMu); natsMutex_Destroy(nc->mu); + NATS_FREE(nc->services); NATS_FREE(nc); @@ -4466,3 +4467,69 @@ natsConn_defaultErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus } fflush(stderr); } + +int natsConn_getServices(microService ***services, natsConnection *nc) +{ + int numServices = 0; + natsConn_Lock(nc); + *services = nc->services; + numServices = nc->numServices; + natsConn_Unlock(nc); + return numServices; +} + +bool natsConn_removeService(natsConnection *nc, microService *service) +{ + bool removed = false; + if (nc == NULL || service == NULL) + return false; + + natsConn_Lock(nc); + for (int i = 0; i < nc->numServices; i++) + { + if (nc->services[i] == service) + { + for (int j = i; j < nc->numServices - 1; j++) + { + nc->services[j] = nc->services[j + 1]; + } + nc->numServices--; + removed = true; + break; + } + } + natsConn_Unlock(nc); + return removed; +} + +natsStatus natsConn_addService(natsConnection *nc, microService *service) +{ + natsStatus s = NATS_OK; + if (nc == NULL || service == NULL) + return nats_setDefaultError(NATS_INVALID_ARG); + + natsConn_Lock(nc); + if (nc->services == NULL) + { + nc->services = NATS_CALLOC(1, sizeof(microService *)); + if (nc->services == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + } + else + { + microService **tmp = NATS_REALLOC(nc->services, (nc->numServices + 1) * sizeof(microService *)); + if (tmp == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + nc->services = tmp; + } + + if (s == NATS_OK) + { + nc->services[nc->numServices] = service; + nc->numServices++; + } + natsConn_Unlock(nc); + + return s; +} diff --git a/src/conn.h b/src/conn.h index 41e93f402..7d4e12492 100644 --- a/src/conn.h +++ b/src/conn.h @@ -160,4 +160,13 @@ natsConn_close(natsConnection *nc); void natsConn_destroy(natsConnection *nc, bool fromPublicDestroy); +int +natsConn_getServices(microService ***services, natsConnection *nc); + +bool +natsConn_removeService(natsConnection *nc, microService *service); + +natsStatus +natsConn_addService(natsConnection *nc, microService *service); + #endif /* CONN_H_ */ diff --git a/src/glib/glib.c b/src/glib/glib.c index 4b8e4bc31..62e004ec7 100644 --- a/src/glib/glib.c +++ b/src/glib/glib.c @@ -119,8 +119,6 @@ _freeLib(void) nats_freeDispatcherPool(&gLib.replyDispatchers); natsNUID_free(); - natsMutex_Destroy(gLib.service_callback_mu); - natsHash_Destroy(gLib.all_services_to_callback); natsCondition_Destroy(gLib.cond); @@ -290,11 +288,6 @@ nats_openLib(natsClientConfig *config) if (s == NATS_OK) s = nats_initDispatcherPool(&(gLib.replyDispatchers), config->ReplyThreadPoolMax); - if (s == NATS_OK) - s = natsMutex_Create(&gLib.service_callback_mu); - if (s == NATS_OK) - s = natsHash_Create(&gLib.all_services_to_callback, 8); - if (s == NATS_OK) gLib.initialized = true; @@ -443,40 +436,6 @@ void nats_ReleaseThreadMemory(void) natsMutex_Unlock(gLib.lock); } -natsStatus -natsLib_startServiceCallbacks(microService *m) -{ - natsStatus s; - - natsMutex_Lock(gLib.service_callback_mu); - s = natsHash_Set(gLib.all_services_to_callback, (int64_t)m, (void *)m, NULL); - natsMutex_Unlock(gLib.service_callback_mu); - - return NATS_UPDATE_ERR_STACK(s); -} - -void natsLib_stopServiceCallbacks(microService *m) -{ - if (m == NULL) - return; - - natsMutex_Lock(gLib.service_callback_mu); - natsHash_Remove(gLib.all_services_to_callback, (int64_t)m); - natsMutex_Unlock(gLib.service_callback_mu); -} - -natsMutex * -natsLib_getServiceCallbackMutex(void) -{ - return gLib.service_callback_mu; -} - -natsHash * -natsLib_getAllServicesToCallback(void) -{ - return gLib.all_services_to_callback; -} - natsClientConfig *nats_testInspectClientConfig(void) { // Immutable after startup diff --git a/src/glib/glibp.h b/src/glib/glibp.h index 9babc60d6..fa658f139 100644 --- a/src/glib/glibp.h +++ b/src/glib/glibp.h @@ -117,11 +117,6 @@ typedef struct __natsLib natsGCList gc; - // For micro services code - natsMutex *service_callback_mu; - // uses `microService*` as the key and the value. - natsHash *all_services_to_callback; - } natsLib; natsLib *nats_lib(void); diff --git a/src/micro.c b/src/micro.c index 354824184..d72812be4 100644 --- a/src/micro.c +++ b/src/micro.c @@ -23,12 +23,11 @@ static inline void _unlock_service(microService *m) { natsMutex_Unlock(m->servic static microError *_clone_service_config(microServiceConfig **out, microServiceConfig *cfg); static microError *_new_service(microService **ptr, natsConnection *nc); -static microError *_wrap_connection_event_callbacks(microService *m); +static void _on_connection_closed(natsConnection *nc, void *ignored); +static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_used); static void _free_cloned_service_config(microServiceConfig *cfg); static void _free_service(microService *m); -static void _release_service(microService *m); -static void _retain_service(microService *m); microError * micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *cfg) @@ -53,10 +52,18 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c MICRO_CALL(err, _clone_service_config(&m->cfg, cfg)); + // Add the service to the connection. + MICRO_CALL(err, micro_ErrorFromStatus(natsConn_addService(m->nc, m))); + MICRO_CALL(err, (m->refs++, NULL)); + // Wrap the connection callbacks before we subscribe to anything. - MICRO_CALL(err, _wrap_connection_event_callbacks(m)); + MICRO_CALL(err, micro_ErrorFromStatus( + natsOptions_setMicroCallbacks(m->nc->opts, _on_connection_closed, _on_error))); + // Initialize the monitoring endpoints. MICRO_CALL(err, micro_init_monitoring(m)); + + // Add the default endpoint. MICRO_CALL(err, microService_AddEndpoint(m, cfg->Endpoint)); if (err != NULL) @@ -69,86 +76,152 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c return NULL; } +static microError *_subjectWithGroupPrefix(char **dst, microGroup *g, const char *src) +{ + size_t len = strlen(src) + 1; + char *p; + + if (g != NULL) + len += strlen(g->config->Prefix) + 1; + + *dst = NATS_CALLOC(1, len); + if (*dst == NULL) + return micro_ErrorOutOfMemory; + + p = *dst; + if (g != NULL) + { + len = strlen(g->config->Prefix); + memcpy(p, g->config->Prefix, len); + p[len] = '.'; + p += len + 1; + } + memcpy(p, src, strlen(src) + 1); + return NULL; +} + +microError * +_new_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal, char *fullSubject) +{ + microError *err = NULL; + microEndpoint *ep = NULL; + + if (cfg == NULL) + return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint config"); + if (!micro_is_valid_name(cfg->Name)) + return microError_Wrapf(micro_ErrorInvalidArg, "invalid endpoint name %s", cfg->Name); + if (cfg->Handler == NULL) + return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint request handler for %s", cfg->Name); + + ep = NATS_CALLOC(1, sizeof(microEndpoint)); + if (ep == NULL) + return micro_ErrorOutOfMemory; + ep->is_monitoring_endpoint = is_internal; + ep->m = m; + + MICRO_CALL(err, micro_ErrorFromStatus(natsMutex_Create(&ep->endpoint_mu))); + MICRO_CALL(err, micro_clone_endpoint_config(&ep->config, cfg)); + if (err != NULL) + { + micro_free_endpoint(ep); + return err; + } + + ep->subject = fullSubject; // already malloced, will be freed in micro_free_endpoint + ep->group = g; + *new_ep = ep; + return NULL; +} + + microError * micro_add_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal) { microError *err = NULL; - microEndpoint *ptr = NULL; - microEndpoint *prev_ptr = NULL; microEndpoint *ep = NULL; - microEndpoint *prev_ep = NULL; + bool update = false; if (m == NULL) return micro_ErrorInvalidArg; if (cfg == NULL) return NULL; - err = micro_new_endpoint(&ep, m, g, cfg, is_internal); - if (err != NULL) - return microError_Wrapf(err, "failed to create endpoint %s", cfg->Name); + char *fullSubject = NULL; + if (err = _subjectWithGroupPrefix(&fullSubject, g, nats_IsStringEmpty(cfg->Subject) ? cfg->Name : cfg->Subject), err != NULL) + return microError_Wrapf(err, "failed to create full subject for endpoint %s", cfg->Name); + if (!micro_is_valid_subject(fullSubject)) + { + NATS_FREE(fullSubject); + return microError_Wrapf(micro_ErrorInvalidArg, "invalid subject %s for endpoint %s", fullSubject, cfg->Name); + } _lock_service(m); - if (m->stopped) + err = micro_Errorf("can't add an endpoint %s to service %s: the service is stopped", cfg->Name, m->cfg->Name); + + // See if there is already an endpoint with the same subject. ep->subject is + // immutable after the EP's creation so we don't need to lock it. + for (ep = m->first_ep; (err == NULL) && (ep != NULL); ep = ep->next) { - _unlock_service(m); - return micro_Errorf("can't add an endpoint %s to service %s: the service is stopped", cfg->Name, m->cfg->Name); + if (strcmp(ep->subject, fullSubject) == 0) + { + // Found an existing endpoint with the same subject. We will update + // it as long as we can re-use the existing subscription, which at + // the moment means we can't change the queue group settings. + if (cfg->NoQueueGroup != ep->config->NoQueueGroup) + err = micro_Errorf("can't change the queue group settings for endpoint %s", cfg->Name); + else if (!nats_StringEquals(cfg->QueueGroup, ep->config->QueueGroup)) + err = micro_Errorf("can't change the queue group for endpoint %s", cfg->Name); + if (err == NULL) + { + NATS_FREE(fullSubject); + update = true; + fullSubject = NULL; + break; + } + } } - if (m->first_ep != NULL) + if (err == NULL) { - if (strcmp(m->first_ep->subject, ep->subject) == 0) + if (update) { - ep->next = m->first_ep->next; - prev_ep = m->first_ep; - m->first_ep = ep; + // If the endpoint already exists, update its config and stats. + // This will make it use the new handler for subsequent + // requests. + micro_lock_endpoint(ep); + micro_free_cloned_endpoint_config(ep->config); + err = micro_clone_endpoint_config(&ep->config, cfg); + if (err == NULL) + memset(&ep->stats, 0, sizeof(ep->stats)); + micro_unlock_endpoint(ep); } else { - prev_ptr = m->first_ep; - for (ptr = m->first_ep->next; ptr != NULL; prev_ptr = ptr, ptr = ptr->next) + err = _new_endpoint(&ep, m, g, cfg, is_internal, fullSubject); + if (err == NULL) { - if (strcmp(ptr->subject, ep->subject) == 0) + // Set up the endpoint in the service before it starts + // processing messages. + ep->next = m->first_ep; + m->first_ep = ep; + m->numEndpoints++; + + err = micro_start_endpoint(ep); + if (err != NULL) { - ep->next = ptr->next; - prev_ptr->next = ep; - prev_ep = ptr; - break; + // Unwind, remove the endpoint from the list. + m->first_ep = ep->next; + m->numEndpoints--; + micro_free_endpoint(ep); } } - if (prev_ep == NULL) - { - prev_ptr->next = ep; - } } } - else - { - m->first_ep = ep; - } _unlock_service(m); - - if (prev_ep != NULL) - { - // Rid of the previous endpoint with the same subject, if any. If this - // fails we can return the error, leave the newly added endpoint in the - // list, not started. A retry with the same subject will clean it up. - if (err = micro_stop_endpoint(prev_ep), err != NULL) - return err; - micro_release_endpoint(prev_ep); - } - - // retain `m` before the endpoint uses it for its on_complete callback. - _retain_service(m); - - if (err = micro_start_endpoint(ep), err != NULL) - { - // Best effort, leave the new endpoint in the list, as is. A retry with - // the same name will clean it up. - _release_service(m); - return microError_Wrapf(err, "failed to start endpoint %s", ep->name); - } + if (err != NULL) + return microError_Wrapf(err, "can't add an endpoint %s to service %s", cfg->Name, m->cfg->Name); if (new_ep != NULL) *new_ep = ep; @@ -170,88 +243,94 @@ microGroup_AddEndpoint(microGroup *g, microEndpointConfig *cfg) return micro_add_endpoint(NULL, g->m, g, cfg, false); } -microError * -microService_Stop(microService *m) +static microError * +_stop_service(microService *m, bool unsubscribe, bool finalRelease) { - microError *err = NULL; - microEndpoint *ep = NULL; - bool finalize = false; - microDoneHandler doneHandler = NULL; + microError *err = NULL; + microEndpoint *ep = NULL; + int refs = 0; + int numEndpoints = 0; if (m == NULL) return micro_ErrorInvalidArg; _lock_service(m); - - if (m->stopped) + if (!m->stopped) { - _unlock_service(m); - return NULL; - } - ep = m->first_ep; + m->stopped = true; - for (; ep != NULL; ep = ep->next) - { - if (err = micro_stop_endpoint(ep), err != NULL) + if (unsubscribe) { - _unlock_service(m); - return microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->name); + for (ep = m->first_ep; ep != NULL; ep = ep->next) + { + if (err = micro_stop_endpoint(ep), err != NULL) + { + err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name); + _unlock_service(m); + return err; + } + } } - } - finalize = (m->first_ep == NULL); - if (finalize) - { - natsLib_stopServiceCallbacks(m); - m->stopped = true; - doneHandler = m->cfg->DoneHandler; + if (natsConn_removeService(m->nc, m)) + m->refs--; } - _unlock_service(m); + if ((m->refs > 0) && finalRelease) + m->refs--; - if (finalize) - { - if (doneHandler != NULL) - doneHandler(m); + refs = m->refs; + numEndpoints = m->numEndpoints; - // Relase the endpoint's server reference from `micro_add_endpoint`. - _release_service(m); - } + _unlock_service(m); + + if ((refs == 0) && (numEndpoints == 0)) + _free_service(m); return NULL; } -static bool -_find_endpoint(microEndpoint **prevp, microService *m, microEndpoint *to_find) +microError * +microService_Stop(microService *m) +{ + // Public API: stop the service, unsubscribe, but don't do the final release. + return _stop_service(m, true, false); +} + +static void +_remove_endpoint(microService *m, microEndpoint *toRemove) { microEndpoint *ep = NULL; microEndpoint *prev_ep = NULL; - if ((m == NULL) || (to_find == NULL)) - return false; + if ((m == NULL) || (toRemove == NULL)) + return; for (ep = m->first_ep; ep != NULL; ep = ep->next) { - if (ep == to_find) + if (ep == toRemove) { - *prevp = prev_ep; - return true; + m->numEndpoints--; + if (prev_ep == NULL) + m->first_ep = ep->next; + else + prev_ep->next = ep->next; + return; } prev_ep = ep; } - - return false; } -void micro_release_on_endpoint_complete(void *closure) +// Callback for when an endpoint's subscription is finished. It is called in the +// context of the subscription's delivery thread; this is where we want to call +// the service's complete callback when the last subscription is done. +void micro_release_endpoint_when_unsubscribed(void *closure) { - microEndpoint *ep = (microEndpoint *)closure; - microEndpoint *prev_ep = NULL; - microService *m = NULL; - natsSubscription *sub = NULL; - microDoneHandler doneHandler = NULL; - bool free_ep = false; - bool finalize = false; + microEndpoint *ep = (microEndpoint *)closure; + microService *m = NULL; + natsSubscription *sub = NULL; + microDoneHandler doneHandler = NULL; + int refs = 0; if (ep == NULL) return; @@ -261,55 +340,35 @@ void micro_release_on_endpoint_complete(void *closure) return; micro_lock_endpoint(ep); - ep->is_draining = false; sub = ep->sub; - ep->sub = NULL; - ep->refs--; - free_ep = (ep->refs == 0); + ep->sub = NULL; // Force the subscription to be destroyed now, so NULL out the pointer to avoid a double free. + refs = --(ep->refs); micro_unlock_endpoint(ep); - // Force the subscription to be destroyed now. natsSubscription_Destroy(sub); + // If this is the last endpoint, we need to notify the service's done + // callback. _lock_service(m); - // Release the service reference for the completed endpoint. It can not be - // the last reference, so no need to free m. - m->refs--; + _remove_endpoint(m, ep); - // Unlink the endpoint from the service. - if (_find_endpoint(&prev_ep, m, ep)) - { - if (prev_ep != NULL) - { - prev_ep->next = ep->next; - } - else - { - m->first_ep = ep->next; - } - } + if (refs == 0) + micro_free_endpoint(ep); - finalize = (!m->stopped) && (m->first_ep == NULL); - if (finalize) - { - natsLib_stopServiceCallbacks(m); - m->stopped = true; + if (m->numEndpoints == 0) doneHandler = m->cfg->DoneHandler; - } + refs = m->refs; _unlock_service(m); - if (free_ep) - micro_free_endpoint(ep); - - if (finalize) + // Special processing for the last endpoint. + if (doneHandler != NULL) { - if (doneHandler != NULL) - doneHandler(m); + doneHandler(m); - // Relase the endpoint's server reference from `micro_add_endpoint`. - _release_service(m); + if (refs == 0) + _free_service(m); } } @@ -330,14 +389,7 @@ bool microService_IsStopped(microService *m) microError * microService_Destroy(microService *m) { - microError *err = NULL; - - err = microService_Stop(m); - if (err != NULL) - return err; - - _release_service(m); - return NULL; + return _stop_service(m, true, true); } microError * @@ -370,44 +422,12 @@ _new_service(microService **ptr, natsConnection *nc) if (*ptr == NULL) return micro_ErrorOutOfMemory; - natsConn_retain(nc); (*ptr)->refs = 1; (*ptr)->nc = nc; (*ptr)->started = nats_Now() * 1000000; return NULL; } -static void -_retain_service(microService *m) -{ - if (m == NULL) - return; - - _lock_service(m); - - ++(m->refs); - - _unlock_service(m); -} - -static void -_release_service(microService *m) -{ - int refs = 0; - - if (m == NULL) - return; - - _lock_service(m); - - refs = --(m->refs); - - _unlock_service(m); - - if (refs == 0) - _free_service(m); -} - static inline void _free_cloned_group_config(microGroupConfig *cfg) { @@ -450,7 +470,6 @@ _free_service(microService *m) } _free_cloned_service_config(m->cfg); - natsConn_release(m->nc); natsMutex_Destroy(m->service_mu); NATS_FREE(m); } @@ -512,98 +531,18 @@ _free_cloned_service_config(microServiceConfig *cfg) NATS_FREE(cfg); } -static microError * -_start_service_callbacks(microService *m) -{ - natsStatus s = NATS_OK; - - if (m == NULL) - return micro_ErrorInvalidArg; - - // Extra reference to the service as long as its callbacks are registered. - _retain_service(m); - - s = natsLib_startServiceCallbacks(m); - if (s != NATS_OK) - { - _release_service(m); - } - - return micro_ErrorFromStatus(s); -} - -static microError * -_services_for_connection(microService ***to_call, int *num_microservices, natsConnection *nc) -{ - natsMutex *mu = natsLib_getServiceCallbackMutex(); - natsHash *h = natsLib_getAllServicesToCallback(); - microService *m = NULL; - microService **p = NULL; - natsHashIter iter; - int n = 0; - int i; - - natsMutex_Lock(mu); - - natsHashIter_Init(&iter, h); - while (natsHashIter_Next(&iter, NULL, (void **)&m)) - if (m->nc == nc) - n++; - natsHashIter_Done(&iter); - if (n > 0) - { - p = NATS_CALLOC(n, sizeof(microService *)); - if (p == NULL) - { - natsMutex_Unlock(mu); - return micro_ErrorOutOfMemory; - } - - natsHashIter_Init(&iter, h); - i = 0; - while (natsHashIter_Next(&iter, NULL, (void **)&m)) - { - if (m->nc == nc) - { - _retain_service(m); // for the callback - p[i++] = m; - } - } - natsHashIter_Done(&iter); - } - - natsMutex_Unlock(mu); - - *to_call = p; - *num_microservices = n; - return NULL; -} - static void _on_connection_closed(natsConnection *nc, void *ignored) { microService *m = NULL; - microService **to_call = NULL; - microError *err = NULL; - int n = 0; - int i; - - err = _services_for_connection(&to_call, &n, nc); - if (err != NULL) - { - microError_Ignore(err); - return; - } + microService **all = NULL; - for (i = 0; i < n; i++) + int n = natsConn_getServices(&all, nc); + for (int i = 0; i < n; i++) { - m = to_call[i]; - microError_Ignore(microService_Stop(m)); - - _release_service(m); + m = all[i]; + _stop_service(m, false, false); // subs will be terminated by the connection close. } - - NATS_FREE(to_call); } static void @@ -633,20 +572,14 @@ _on_service_error(microService *m, const char *subject, natsStatus s) microError_Destroy(err); } micro_release_endpoint(ep); // after the callback - - // TODO: Should we stop the service? The Go client does. - microError_Ignore(microService_Stop(m)); } static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_used) { microService *m = NULL; - microService **to_call = NULL; - microError *err = NULL; + microService **all = NULL; const char *subject = NULL; - int n = 0; - int i; if (sub == NULL) { @@ -654,38 +587,13 @@ _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_use } subject = natsSubscription_GetSubject(sub); - // `to_call` will have a list of retained service pointers. - err = _services_for_connection(&to_call, &n, nc); - if (err != NULL) - { - microError_Ignore(err); - return; - } - - for (i = 0; i < n; i++) + int n = natsConn_getServices(&all, nc); + for (int i = 0; i < n; i++) { - m = to_call[i]; + m = all[i]; _on_service_error(m, subject, s); - _release_service(m); // release the extra ref in `to_call`. + _stop_service(m, true, false); } - - NATS_FREE(to_call); -} - -static microError * -_wrap_connection_event_callbacks(microService *m) -{ - microError *err = NULL; - - if ((m == NULL) || (m->nc == NULL) || (m->nc->opts == NULL)) - return micro_ErrorInvalidArg; - - // The new service must be in the list for this to work. - MICRO_CALL(err, _start_service_callbacks(m)); - MICRO_CALL(err, micro_ErrorFromStatus( - natsOptions_setMicroCallbacks(m->nc->opts, _on_connection_closed, _on_error))); - - return microError_Wrapf(err, "failed to wrap connection event callbacks"); } static inline microError * @@ -848,7 +756,7 @@ microService_GetInfo(microServiceInfo **new_info, microService *m) { if ((!ep->is_monitoring_endpoint) && (ep->subject != NULL)) { - MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Name, ep->name)); + MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Name, ep->config->Name)); MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Subject, ep->subject)); MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].QueueGroup, micro_queue_group_for_endpoint(ep))); MICRO_CALL(err, micro_ErrorFromStatus( @@ -947,7 +855,7 @@ microService_GetStats(microServiceStats **new_stats, microService *m) // copy the entire struct, including the last error buffer. stats->Endpoints[len] = ep->stats; - MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].Name, ep->name)); + MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].Name, ep->config->Name)); MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].Subject, ep->subject)); MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].QueueGroup, micro_queue_group_for_endpoint(ep))); if (err == NULL) diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index efe72f41d..a14cbc830 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -16,53 +16,11 @@ #include "microp.h" #include "util.h" -static microError *_subjectWithGroupPrefix(char **dst, microGroup *g, const char *src); - static void _handle_request(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure); static void _retain_endpoint(microEndpoint *ep, bool lock); static void _release_endpoint(microEndpoint *ep); -microError * -micro_new_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal) -{ - microError *err = NULL; - microEndpoint *ep = NULL; - const char *subj; - - if (cfg == NULL) - return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint config"); - if (!micro_is_valid_name(cfg->Name)) - return microError_Wrapf(micro_ErrorInvalidArg, "invalid endpoint name %s", cfg->Name); - if (cfg->Handler == NULL) - return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint request handler for %s", cfg->Name); - - if ((cfg->Subject != NULL) && !micro_is_valid_subject(cfg->Subject)) - return micro_ErrorInvalidArg; - - subj = nats_IsStringEmpty(cfg->Subject) ? cfg->Name : cfg->Subject; - - ep = NATS_CALLOC(1, sizeof(microEndpoint)); - if (ep == NULL) - return micro_ErrorOutOfMemory; - ep->is_monitoring_endpoint = is_internal; - ep->m = m; - - MICRO_CALL(err, micro_ErrorFromStatus(natsMutex_Create(&ep->endpoint_mu))); - MICRO_CALL(err, micro_clone_endpoint_config(&ep->config, cfg)); - MICRO_CALL(err, micro_strdup(&ep->name, cfg->Name)); - MICRO_CALL(err, _subjectWithGroupPrefix(&ep->subject, g, subj)); - if (err != NULL) - { - micro_free_endpoint(ep); - return err; - } - - ep->group = g; - *new_ep = ep; - return NULL; -} - const char * micro_queue_group_for_endpoint(microEndpoint *ep) { @@ -113,10 +71,9 @@ micro_start_endpoint(microEndpoint *ep) micro_lock_endpoint(ep); ep->refs++; ep->sub = sub; - ep->is_draining = false; micro_unlock_endpoint(ep); - natsSubscription_SetOnCompleteCB(sub, micro_release_on_endpoint_complete, ep); + natsSubscription_SetOnCompleteCB(sub, micro_release_endpoint_when_unsubscribed, ep); } else { @@ -132,31 +89,20 @@ micro_stop_endpoint(microEndpoint *ep) natsStatus s = NATS_OK; natsSubscription *sub = NULL; - if ((ep == NULL) || (ep->m == NULL)) + if (ep == NULL) return NULL; micro_lock_endpoint(ep); sub = ep->sub; - - if (ep->is_draining || natsConnection_IsClosed(ep->m->nc) || !natsSubscription_IsValid(sub)) - { - // If stopping, _release_on_endpoint_complete will take care of - // finalizing, nothing else to do. In other cases - // _release_on_endpoint_complete has already been called. - micro_unlock_endpoint(ep); - return NULL; - } - - ep->is_draining = true; micro_unlock_endpoint(ep); + if (sub == NULL) + return NULL; - // When the drain is complete, will release the final ref on ep. + // When the drain is complete, the callback will free ep. We may get an + // NATS_INVALID_SUBSCRIPTION if the subscription is already closed. s = natsSubscription_Drain(sub); - if (s != NATS_OK) - { - return microError_Wrapf(micro_ErrorFromStatus(s), - "failed to stop endpoint %s: failed to drain subscription", ep->name); - } + if ((s != NATS_OK) && (s != NATS_INVALID_SUBSCRIPTION)) + return microError_Wrapf(micro_ErrorFromStatus(s), "failed to drain subscription"); return NULL; } @@ -195,7 +141,6 @@ void micro_free_endpoint(microEndpoint *ep) if (ep == NULL) return; - NATS_FREE(ep->name); NATS_FREE(ep->subject); natsSubscription_Destroy(ep->sub); natsMutex_Destroy(ep->endpoint_mu); @@ -334,7 +279,7 @@ micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg) microEndpointConfig *new_cfg = NULL; if (out == NULL) - return micro_ErrorInvalidArg; + return microError_Wrapf(micro_ErrorInvalidArg, "failed to clone endpoint config"); if (cfg == NULL) { @@ -357,6 +302,7 @@ micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg) if (err != NULL) { micro_free_cloned_endpoint_config(new_cfg); + return microError_Wrapf(err, "failed to clone endpoint config %s", cfg->Name); return err; } @@ -434,26 +380,3 @@ bool micro_match_endpoint_subject(const char *ep_subject, const char *actual_sub } } -static microError *_subjectWithGroupPrefix(char **dst, microGroup *g, const char *src) -{ - size_t len = strlen(src) + 1; - char *p; - - if (g != NULL) - len += strlen(g->config->Prefix) + 1; - - *dst = NATS_CALLOC(1, len); - if (*dst == NULL) - return micro_ErrorOutOfMemory; - - p = *dst; - if (g != NULL) - { - len = strlen(g->config->Prefix); - memcpy(p, g->config->Prefix, len); - p[len] = '.'; - p += len + 1; - } - memcpy(p, src, strlen(src) + 1); - return NULL; -} diff --git a/src/microp.h b/src/microp.h index 8184691c4..c2262ffa4 100644 --- a/src/microp.h +++ b/src/microp.h @@ -49,7 +49,6 @@ struct micro_endpoint_s { // The name and subject that the endpoint is listening on (may be different // from one specified in config). - char *name; char *subject; // A copy of the config provided to add_endpoint. @@ -75,7 +74,6 @@ struct micro_endpoint_s // Mutex for starting/stopping the endpoint, and for updating the stats. natsMutex *endpoint_mu; int refs; - bool is_draining; // The subscription for the endpoint. If NULL, the endpoint is stopped. natsSubscription *sub; @@ -110,7 +108,9 @@ struct micro_service_s natsMutex *service_mu; int refs; + // a linked list of endpoints. struct micro_endpoint_s *first_ep; + int numEndpoints; int64_t started; // UTC time expressed as number of nanoseconds since epoch. bool stopped; @@ -153,7 +153,7 @@ void micro_free_cloned_endpoint_config(microEndpointConfig *cfg); void micro_free_endpoint(microEndpoint *ep); void micro_free_request(microRequest *req); void micro_release_endpoint(microEndpoint *ep); -void micro_release_on_endpoint_complete(void *closure); +void micro_release_endpoint_when_unsubscribed(void *closure); void micro_retain_endpoint(microEndpoint *ep); void micro_update_last_error(microEndpoint *ep, microError *err); const char *micro_queue_group_for_endpoint(microEndpoint *ep); diff --git a/src/natsp.h b/src/natsp.h index 2f99de066..e2e4a0189 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -699,6 +699,9 @@ struct __natsConnection natsHash *subs; natsMutex *subsMu; + microService **services; + int numServices; + natsConnStatus status; bool initc; // true if the connection is performing the initial connect bool ar; // abort reconnect attempts @@ -778,18 +781,6 @@ nats_sslRegisterThreadForCleanup(void); void nats_setNATSThreadKey(void); -natsStatus -natsLib_startServiceCallbacks(microService *m); - -void -natsLib_stopServiceCallbacks(microService *m); - -natsMutex* -natsLib_getServiceCallbackMutex(void); - -natsHash* -natsLib_getAllServicesToCallback(void); - // // Threads // diff --git a/test/test.c b/test/test.c index c2792f282..8abee5065 100644 --- a/test/test.c +++ b/test/test.c @@ -33582,39 +33582,30 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer testCond(true); } -static void -_waitForMicroservicesAllDone(struct threadArg *arg) -{ - natsStatus s = NATS_OK; - - test("Wait for all microservices to stop: "); - natsMutex_Lock(arg->m); - while ((s != NATS_TIMEOUT) && !arg->microAllDone) - s = natsCondition_TimedWait(arg->c, arg->m, 1000); - natsMutex_Unlock(arg->m); - testCond((NATS_OK == s) && arg->microAllDone); - - // `Done` may be immediately followed by freeing the service, so wait a bit - // to make sure it happens before the test exits. - nats_Sleep(20); -} - -static void -_destroyMicroservicesAndWaitForAllDone(microService** svcs, int n, struct threadArg *arg) -{ - char buf[64]; - - snprintf(buf, sizeof(buf), "Wait for all %d microservices to stop: ", n); - test(buf); - - for (int i = 0; i < n; i++) - { - if (NULL != microService_Destroy(svcs[i])) - FAIL("Unable to destroy microservice!"); - } - - _waitForMicroservicesAllDone(arg); -} +#define _waitForMicroservicesAllDone(_arg) \ + { \ + nats_Sleep(20); \ + natsMutex_Lock((_arg)->m); \ + testf("Wait for %d microservices to stop: ", (_arg)->microRunningServiceCount); \ + natsStatus waitStatus = NATS_OK; \ + bool allDone = false; \ + while ((waitStatus != NATS_TIMEOUT) && !(_arg)->microAllDone) \ + waitStatus = natsCondition_TimedWait((_arg)->c, (_arg)->m, 1000); \ + allDone = (_arg)->microAllDone; \ + natsMutex_Unlock((_arg)->m); \ + testCond((NATS_OK == waitStatus) && allDone); \ + } + +#define _destroyMicroservice(_s) \ + testf("Destroy microservice %s: ", (_s)->cfg->Name); \ + microError *_err = microService_Destroy(_s); \ + if (_err != NULL) \ + { \ + char _buf[256]; \ + FAILf("Unable to destroy microservice: %s", microError_String(_err, _buf, sizeof(_buf))); \ + microError_Destroy(_err); \ + } \ + testCond(true); typedef struct { @@ -33844,14 +33835,8 @@ void test_MicroAddService(void) } microServiceInfo_Destroy(info); - - if (m != NULL) - { - snprintf(buf, sizeof(buf), "%s: Destroy service: %d", m->cfg->Name, m->refs); - test(buf); - testCond(NULL == microService_Destroy(m)); - _waitForMicroservicesAllDone(&arg); - } + _destroyMicroservice(m); + _waitForMicroservicesAllDone(&arg); } test("Destroy the test connection: "); @@ -33887,11 +33872,11 @@ void test_MicroGroups(void) }; const char* expected_subjects[] = { - "ep1", - "g1.ep1", - "g1.g2.ep1", - "g1.g2.ep2", "g1.ep2", + "g1.g2.ep2", + "g1.g2.ep1", + "g1.ep1", + "ep1", }; int expected_num_endpoints = sizeof(expected_subjects) / sizeof(expected_subjects[0]); @@ -33941,7 +33926,7 @@ void test_MicroGroups(void) if (err != NULL) FAIL("failed to get service info!") - test("Verify number of endpoints: "); + testf("Verify number of endpoints %d is %d: ", info->EndpointsLen, expected_num_endpoints); testCond(info->EndpointsLen == expected_num_endpoints); test("Verify endpoint subjects: "); @@ -33957,7 +33942,7 @@ void test_MicroGroups(void) microServiceInfo_Destroy(info); - microService_Destroy(m); + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); @@ -34062,14 +34047,14 @@ void test_MicroQueueGroupForEndpoint(void) testCond((err == NULL) && (info != NULL) && (info->EndpointsLen == 3) && (stats != NULL) && (stats->EndpointsLen == 3) && - (_testQueueGroup(tc.expectedServiceLevel, info->Endpoints[0].QueueGroup)) && - (_testQueueGroup(tc.expectedServiceLevel, stats->Endpoints[0].QueueGroup)) && + (_testQueueGroup(tc.expectedServiceLevel, info->Endpoints[2].QueueGroup)) && + (_testQueueGroup(tc.expectedServiceLevel, stats->Endpoints[2].QueueGroup)) && (_testQueueGroup(tc.expectedGroup1Level, stats->Endpoints[1].QueueGroup)) && (_testQueueGroup(tc.expectedGroup1Level, info->Endpoints[1].QueueGroup)) && - (_testQueueGroup(tc.expectedGroup2Level, info->Endpoints[2].QueueGroup)) && - (_testQueueGroup(tc.expectedGroup2Level, stats->Endpoints[2].QueueGroup))); + (_testQueueGroup(tc.expectedGroup2Level, info->Endpoints[0].QueueGroup)) && + (_testQueueGroup(tc.expectedGroup2Level, stats->Endpoints[0].QueueGroup))); - microService_Destroy(service); + _destroyMicroservice(service); _waitForMicroservicesAllDone(&arg); microServiceInfo_Destroy(info); microServiceStats_Destroy(stats); @@ -34234,27 +34219,27 @@ void test_MicroBasics(void) s = nats_JSONGetArrayObject(js, "endpoints", &array, &array_len); testCond((NATS_OK == s) && (array != NULL) && (array_len == 2)); - test("Validate INFO svc.do endpoint: "); - md = NULL; - testCond( - (NATS_OK == nats_JSONGetStrPtr(array[0], "name", &str)) && (strcmp(str, "do") == 0) - && (NATS_OK == nats_JSONGetStrPtr(array[0], "subject", &str)) && (strcmp(str, "svc.do") == 0) - && (NATS_OK == nats_JSONGetStrPtr(array[0], "queue_group", &str)) && (strcmp(str, MICRO_DEFAULT_QUEUE_GROUP) == 0) - && (NATS_OK == nats_JSONGetObject(array[0], "metadata", &md)) && (md == NULL) - ); - test("Validate INFO unused endpoint with metadata: "); md = NULL; testCond( - (NATS_OK == nats_JSONGetStrPtr(array[1], "name", &str)) && (strcmp(str, "unused") == 0) - && (NATS_OK == nats_JSONGetStrPtr(array[1], "subject", &str)) && (strcmp(str, "svc.unused") == 0) + (NATS_OK == nats_JSONGetStrPtr(array[0], "name", &str)) && (strcmp(str, "unused") == 0) + && (NATS_OK == nats_JSONGetStrPtr(array[0], "subject", &str)) && (strcmp(str, "svc.unused") == 0) && (NATS_OK == nats_JSONGetStrPtr(array[0], "queue_group", &str)) && (strcmp(str, MICRO_DEFAULT_QUEUE_GROUP) == 0) - && (NATS_OK == nats_JSONGetObject(array[1], "metadata", &md)) + && (NATS_OK == nats_JSONGetObject(array[0], "metadata", &md)) && (NATS_OK == nats_JSONGetStrPtr(md, "key1", &str)) && (strcmp(str, "value1") == 0) && (NATS_OK == nats_JSONGetStrPtr(md, "key2", &str)) && (strcmp(str, "value2") == 0) && (NATS_OK == nats_JSONGetStrPtr(md, "key3", &str)) && (strcmp(str, "value3") == 0) ); + test("Validate INFO svc.do endpoint: "); + md = NULL; + testCond( + (NATS_OK == nats_JSONGetStrPtr(array[1], "name", &str)) && (strcmp(str, "do") == 0) + && (NATS_OK == nats_JSONGetStrPtr(array[1], "subject", &str)) && (strcmp(str, "svc.do") == 0) + && (NATS_OK == nats_JSONGetStrPtr(array[1], "queue_group", &str)) && (strcmp(str, MICRO_DEFAULT_QUEUE_GROUP) == 0) + && (NATS_OK == nats_JSONGetObject(array[1], "metadata", &md)) && (md == NULL) + ); + nats_JSONDestroy(js); natsMsg_Destroy(reply); NATS_FREE(array); @@ -34331,13 +34316,13 @@ void test_MicroBasics(void) test("Ensure endpoint 0 has num_requests: "); n = 0; - s = nats_JSONGetInt(array[0], "num_requests", &n); + s = nats_JSONGetInt(array[1], "num_requests", &n); testCond(NATS_OK == s); num_requests += n; test("Ensure endpoint 0 has num_errors: "); n = 0; - s = nats_JSONGetInt(array[0], "num_errors", &n); + s = nats_JSONGetInt(array[1], "num_errors", &n); testCond(NATS_OK == s); num_errors += n; @@ -34354,7 +34339,11 @@ void test_MicroBasics(void) natsInbox_Destroy(inbox); NATS_FREE(subject); - _destroyMicroservicesAndWaitForAllDone(svcs, NUM_MICRO_SERVICES, &arg); + for (i = 0; i < NUM_MICRO_SERVICES; i++) + { + _destroyMicroservice(svcs[i]); + } + _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); natsConnection_Destroy(nc); @@ -34422,7 +34411,11 @@ void test_MicroStartStop(void) } testCond(NATS_OK == s); - _destroyMicroservicesAndWaitForAllDone(svcs, NUM_MICRO_SERVICES, &arg); + for (i = 0; i < NUM_MICRO_SERVICES; i++) + { + _destroyMicroservice(svcs[i]); + } + _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); natsConnection_Destroy(nc); @@ -34495,8 +34488,7 @@ void test_MicroServiceStopsOnClosedConn(void) test("Test microservice is stopped: "); testCond(microService_IsStopped(m)); - test("Destroy microservice (final): "); - testCond(NULL == microService_Destroy(m)) + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); natsOptions_Destroy(opts); @@ -34513,6 +34505,9 @@ void test_MicroServiceStopsWhenServerStops(void) natsPid serverPid = NATS_INVALID_PID; struct threadArg arg; microService *m = NULL; + microEndpointConfig ep_cfg = { + .Handler = _microHandleRequest42, + }; microServiceConfig cfg = { .Name = "test", .Version = "1.0.0", @@ -34538,24 +34533,35 @@ void test_MicroServiceStopsWhenServerStops(void) _startMicroservice(&m, nc, &cfg, NULL, 0, &arg); + const int numEndpoints = 4; + for (int i=0; i < numEndpoints; i++) + { + char buf[32]; + testf("Add endpoint %d: ", i); + snprintf(buf, sizeof(buf), "do-%d", i); + ep_cfg.Subject = buf; + ep_cfg.Name = buf; + testCond(NULL == microService_AddEndpoint(m, &ep_cfg)); + } + test("Test microservice is running: "); testCond(!microService_IsStopped(m)) + testf("Check that the service has %d endpoints: ", numEndpoints); + microServiceInfo *info = NULL; + microError *err = microService_GetInfo(&info, m); + testCond((err == NULL) && (info->EndpointsLen == numEndpoints)); + microServiceInfo_Destroy(info); + test("Stop the server: "); testCond((_stopServer(serverPid), true)); - test("Wait for the service to stop: "); - natsMutex_Lock(arg.m); - while ((s != NATS_TIMEOUT) && !arg.microAllDone) - s = natsCondition_TimedWait(arg.c, arg.m, 1000); - testCond(arg.microAllDone); - natsMutex_Unlock(arg.m); + _waitForMicroservicesAllDone(&arg); test("Test microservice is not running: "); testCond(microService_IsStopped(m)) - microService_Destroy(m); - _waitForMicroservicesAllDone(&arg); + _destroyMicroservice(m); test("Destroy the test connection: "); natsConnection_Destroy(nc); @@ -34663,7 +34669,7 @@ void test_MicroAsyncErrorHandlerMaxPendingMsgs(void) testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER)); natsMutex_Unlock(arg.m); - microService_Destroy(m); + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); @@ -34745,7 +34751,7 @@ void test_MicroAsyncErrorHandlerMaxPendingBytes(void) natsMutex_Unlock(arg.m); testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER)); - microService_Destroy(m); + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); diff --git a/test/test.h b/test/test.h index 6a9bc8edf..9e5c32f03 100644 --- a/test/test.h +++ b/test/test.h @@ -33,6 +33,15 @@ static const char *clusterName = "test-cluster"; return; \ } +#define FAILf(f, ...) \ + { \ + printf("@@ "); \ + printf((f), __VA_ARGS__); \ + printf(" @@\n"); \ + failed = true; \ + return; \ + } + #define CHECK_SERVER_STARTED(p) \ if ((p) == NATS_INVALID_PID) \ FAIL("Unable to start or verify that the server was started!") From bc40ea7bab34110c8cd9c33281f6b9cb6a97598f Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 1 Nov 2024 10:04:18 -0700 Subject: [PATCH 03/13] Adjusted test parameters, REPEAT micro on travis --- buildOnTravis.sh | 3 ++- test/test.c | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/buildOnTravis.sh b/buildOnTravis.sh index 8bc7da678..afa339a2e 100755 --- a/buildOnTravis.sh +++ b/buildOnTravis.sh @@ -75,7 +75,8 @@ fi export NATS_TEST_TRAVIS=yes echo "Using NATS server version: $NATS_TEST_SERVER_VERSION" -ctest -L 'test' --timeout 60 --output-on-failure $4 +ctest -R 'Micro' --timeout 60 --output-on-failure $4 --repeat-until-fail 20 +# ctest -L 'test' --timeout 60 --output-on-failure $4 res=$? if [ $res -ne 0 ]; then exit $res diff --git a/test/test.c b/test/test.c index 8abee5065..ac6843746 100644 --- a/test/test.c +++ b/test/test.c @@ -33584,7 +33584,7 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer #define _waitForMicroservicesAllDone(_arg) \ { \ - nats_Sleep(20); \ + nats_Sleep(50); \ natsMutex_Lock((_arg)->m); \ testf("Wait for %d microservices to stop: ", (_arg)->microRunningServiceCount); \ natsStatus waitStatus = NATS_OK; \ @@ -34533,7 +34533,7 @@ void test_MicroServiceStopsWhenServerStops(void) _startMicroservice(&m, nc, &cfg, NULL, 0, &arg); - const int numEndpoints = 4; + const int numEndpoints = 50; for (int i=0; i < numEndpoints; i++) { char buf[32]; From 8479c081dc5bf9b99f54ed62636ff18a6ab048cb Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 16:25:39 -0800 Subject: [PATCH 04/13] fixes, cleannup --- src/conn.c | 69 +----------------- src/conn.h | 9 --- src/micro.c | 201 ++++++++++++++++++++++++++++++++++------------------ src/natsp.h | 1 + test/test.c | 31 ++++---- 5 files changed, 153 insertions(+), 158 deletions(-) diff --git a/src/conn.c b/src/conn.c index 415f00e9e..c7dbeac31 100644 --- a/src/conn.c +++ b/src/conn.c @@ -204,6 +204,7 @@ _freeConn(natsConnection *nc) natsStrHash_Destroy(nc->respMap); natsCondition_Destroy(nc->reconnectCond); natsMutex_Destroy(nc->subsMu); + natsMutex_Destroy(nc->servicesMu); natsMutex_Destroy(nc->mu); NATS_FREE(nc->services); @@ -3239,6 +3240,8 @@ natsConn_create(natsConnection **newConn, natsOptions *options) s = natsMutex_Create(&(nc->mu)); if (s == NATS_OK) s = natsMutex_Create(&(nc->subsMu)); + if (s == NATS_OK) + s = natsMutex_Create(&(nc->servicesMu)); if (s == NATS_OK) s = _setupServerPool(nc); if (s == NATS_OK) @@ -4467,69 +4470,3 @@ natsConn_defaultErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus } fflush(stderr); } - -int natsConn_getServices(microService ***services, natsConnection *nc) -{ - int numServices = 0; - natsConn_Lock(nc); - *services = nc->services; - numServices = nc->numServices; - natsConn_Unlock(nc); - return numServices; -} - -bool natsConn_removeService(natsConnection *nc, microService *service) -{ - bool removed = false; - if (nc == NULL || service == NULL) - return false; - - natsConn_Lock(nc); - for (int i = 0; i < nc->numServices; i++) - { - if (nc->services[i] == service) - { - for (int j = i; j < nc->numServices - 1; j++) - { - nc->services[j] = nc->services[j + 1]; - } - nc->numServices--; - removed = true; - break; - } - } - natsConn_Unlock(nc); - return removed; -} - -natsStatus natsConn_addService(natsConnection *nc, microService *service) -{ - natsStatus s = NATS_OK; - if (nc == NULL || service == NULL) - return nats_setDefaultError(NATS_INVALID_ARG); - - natsConn_Lock(nc); - if (nc->services == NULL) - { - nc->services = NATS_CALLOC(1, sizeof(microService *)); - if (nc->services == NULL) - s = nats_setDefaultError(NATS_NO_MEMORY); - } - else - { - microService **tmp = NATS_REALLOC(nc->services, (nc->numServices + 1) * sizeof(microService *)); - if (tmp == NULL) - s = nats_setDefaultError(NATS_NO_MEMORY); - else - nc->services = tmp; - } - - if (s == NATS_OK) - { - nc->services[nc->numServices] = service; - nc->numServices++; - } - natsConn_Unlock(nc); - - return s; -} diff --git a/src/conn.h b/src/conn.h index 7d4e12492..41e93f402 100644 --- a/src/conn.h +++ b/src/conn.h @@ -160,13 +160,4 @@ natsConn_close(natsConnection *nc); void natsConn_destroy(natsConnection *nc, bool fromPublicDestroy); -int -natsConn_getServices(microService ***services, natsConnection *nc); - -bool -natsConn_removeService(natsConnection *nc, microService *service); - -natsStatus -natsConn_addService(natsConnection *nc, microService *service); - #endif /* CONN_H_ */ diff --git a/src/micro.c b/src/micro.c index d72812be4..17c1b2861 100644 --- a/src/micro.c +++ b/src/micro.c @@ -29,6 +29,9 @@ static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, v static void _free_cloned_service_config(microServiceConfig *cfg); static void _free_service(microService *m); +static microError * +_attach_service_to_connection(natsConnection *nc, microService *service); + microError * micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *cfg) { @@ -53,7 +56,7 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c MICRO_CALL(err, _clone_service_config(&m->cfg, cfg)); // Add the service to the connection. - MICRO_CALL(err, micro_ErrorFromStatus(natsConn_addService(m->nc, m))); + MICRO_CALL(err, _attach_service_to_connection(m->nc, m)); MICRO_CALL(err, (m->refs++, NULL)); // Wrap the connection callbacks before we subscribe to anything. @@ -244,7 +247,62 @@ microGroup_AddEndpoint(microGroup *g, microEndpointConfig *cfg) } static microError * -_stop_service(microService *m, bool unsubscribe, bool finalRelease) +_attach_service_to_connection(natsConnection *nc, microService *service) +{ + natsStatus s = NATS_OK; + if (nc == NULL || service == NULL) + return micro_ErrorInvalidArg; + + natsMutex_Lock(nc->servicesMu); + if (nc->services == NULL) + { + nc->services = NATS_CALLOC(1, sizeof(microService *)); + if (nc->services == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + } + else + { + microService **tmp = NATS_REALLOC(nc->services, (nc->numServices + 1) * sizeof(microService *)); + if (tmp == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + nc->services = tmp; + } + + if (s == NATS_OK) + { + nc->services[nc->numServices] = service; + nc->numServices++; + } + natsMutex_Unlock(nc->servicesMu); + + return micro_ErrorFromStatus(s); +} + +static void +_detach_service_from_connection(natsConnection *nc, microService *m) +{ + if (nc == NULL || m == NULL) + return; + + natsMutex_Lock(nc->servicesMu); + for (int i = 0; i < nc->numServices; i++) + { + if (nc->services[i] != m) + continue; + + for (int j = i; j < nc->numServices - 1; j++) + nc->services[j] = nc->services[j + 1]; + nc->numServices--; + break; + } + natsMutex_Unlock(nc->servicesMu); + + return; +} + +static microError * +_stop_service(microService *m, bool unsubscribe, bool release) { microError *err = NULL; microEndpoint *ep = NULL; @@ -271,17 +329,13 @@ _stop_service(microService *m, bool unsubscribe, bool finalRelease) } } } - - if (natsConn_removeService(m->nc, m)) - m->refs--; } - if ((m->refs > 0) && finalRelease) + if (release) m->refs--; refs = m->refs; numEndpoints = m->numEndpoints; - _unlock_service(m); if ((refs == 0) && (numEndpoints == 0)) @@ -293,12 +347,12 @@ _stop_service(microService *m, bool unsubscribe, bool finalRelease) microError * microService_Stop(microService *m) { - // Public API: stop the service, unsubscribe, but don't do the final release. return _stop_service(m, true, false); } +// service lock must be held by the caller. static void -_remove_endpoint(microService *m, microEndpoint *toRemove) +_detach_endpoint_from_service(microService *m, microEndpoint *toRemove) { microEndpoint *ep = NULL; microEndpoint *prev_ep = NULL; @@ -306,18 +360,19 @@ _remove_endpoint(microService *m, microEndpoint *toRemove) if ((m == NULL) || (toRemove == NULL)) return; - for (ep = m->first_ep; ep != NULL; ep = ep->next) + for (ep = m->first_ep; (ep != NULL) && (ep != toRemove); prev_ep = ep, ep = ep->next) + ; + if (ep == NULL) + return; + + m->numEndpoints--; + if (prev_ep == NULL) + m->first_ep = ep->next; + else { - if (ep == toRemove) - { - m->numEndpoints--; - if (prev_ep == NULL) - m->first_ep = ep->next; - else - prev_ep->next = ep->next; - return; - } - prev_ep = ep; + micro_lock_endpoint(prev_ep); + prev_ep->next = ep->next; + micro_unlock_endpoint(prev_ep); } } @@ -328,7 +383,6 @@ void micro_release_endpoint_when_unsubscribed(void *closure) { microEndpoint *ep = (microEndpoint *)closure; microService *m = NULL; - natsSubscription *sub = NULL; microDoneHandler doneHandler = NULL; int refs = 0; @@ -339,27 +393,22 @@ void micro_release_endpoint_when_unsubscribed(void *closure) if ((m == NULL) || (m->service_mu == NULL)) return; + _lock_service(m); + micro_lock_endpoint(ep); - sub = ep->sub; - ep->sub = NULL; // Force the subscription to be destroyed now, so NULL out the pointer to avoid a double free. + _detach_endpoint_from_service(m, ep); refs = --(ep->refs); micro_unlock_endpoint(ep); - natsSubscription_Destroy(sub); - - // If this is the last endpoint, we need to notify the service's done - // callback. - _lock_service(m); - - _remove_endpoint(m, ep); - if (refs == 0) micro_free_endpoint(ep); - if (m->numEndpoints == 0) + { + // Mark the service as stopped before calling the done handler. + m->stopped = true; doneHandler = m->cfg->DoneHandler; + } - refs = m->refs; _unlock_service(m); // Special processing for the last endpoint. @@ -367,8 +416,8 @@ void micro_release_endpoint_when_unsubscribed(void *closure) { doneHandler(m); - if (refs == 0) - _free_service(m); + _detach_service_from_connection(m->nc, m); + _stop_service(m, false, true); // just release } } @@ -534,66 +583,80 @@ _free_cloned_service_config(microServiceConfig *cfg) static void _on_connection_closed(natsConnection *nc, void *ignored) { - microService *m = NULL; - microService **all = NULL; + natsMutex_Lock(nc->servicesMu); - int n = natsConn_getServices(&all, nc); - for (int i = 0; i < n; i++) - { - m = all[i]; - _stop_service(m, false, false); // subs will be terminated by the connection close. - } + // Stop all services. They will get detached from the connection when their + // subs are complete. + for (int i = 0; i < nc->numServices; i++) + _stop_service(nc->services[i], false, false); + + natsMutex_Unlock(nc->servicesMu); } -static void +static bool _on_service_error(microService *m, const char *subject, natsStatus s) { - microEndpoint *ep = NULL; - microError *err = NULL; + microEndpoint *found = NULL; + microError *err = NULL; if (m == NULL) - return; + return false; _lock_service(m); - for (ep = m->first_ep; - (ep != NULL) && !micro_match_endpoint_subject(ep->subject, subject); - ep = ep->next) - ; - micro_retain_endpoint(ep); // for the callback - _unlock_service(m); - if (ep != NULL) + for (microEndpoint *ep = m->first_ep; ep != NULL; ep = ep->next) { - if (m->cfg->ErrHandler != NULL) - (*m->cfg->ErrHandler)(m, ep, s); - - err = microError_Wrapf(micro_ErrorFromStatus(s), "NATS error on endpoint %s", ep->subject); - micro_update_last_error(ep, err); - microError_Destroy(err); + if (!micro_match_endpoint_subject(ep->subject, subject)) + continue; + found = ep; + break; } - micro_release_endpoint(ep); // after the callback + + if (found != NULL) + micro_retain_endpoint(found); + + _unlock_service(m); + + if (found == NULL) + return false; + + err = microError_Wrapf(micro_ErrorFromStatus(s), "NATS error on endpoint %s", subject); + micro_update_last_error(found, err); + microError_Destroy(err); + + if (m->cfg->ErrHandler != NULL) + (*m->cfg->ErrHandler)(m, found, s); + + micro_release_endpoint(found); + return true; } static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_used) { - microService *m = NULL; - microService **all = NULL; const char *subject = NULL; if (sub == NULL) - { return; - } + subject = natsSubscription_GetSubject(sub); - int n = natsConn_getServices(&all, nc); - for (int i = 0; i < n; i++) + // TODO: this would be a lot easier if sub had a ref to ep. + natsMutex_Lock(nc->servicesMu); + for (int i = 0; i < nc->numServices; i++) { - m = all[i]; - _on_service_error(m, subject, s); + microService *m = nc->services[i]; + + // See if the service owns the affected subscription, based on matching + // the subjects; notify it of the error. + if (!_on_service_error(m, subject, s)) + continue; + + // Stop the service in error. It will get detached from the connection + // and released when all of its subs are complete. _stop_service(m, true, false); } + natsMutex_Unlock(nc->servicesMu); } static inline microError * diff --git a/src/natsp.h b/src/natsp.h index e2e4a0189..80164238b 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -701,6 +701,7 @@ struct __natsConnection microService **services; int numServices; + natsMutex *servicesMu; natsConnStatus status; bool initc; // true if the connection is performing the initial connect diff --git a/test/test.c b/test/test.c index ac6843746..2ea2a299c 100644 --- a/test/test.c +++ b/test/test.c @@ -33561,7 +33561,7 @@ _startMicroservice(microService** new_m, natsConnection *nc, microServiceConfig static void _startMicroserviceOK(microService** new_m, natsConnection *nc, microServiceConfig *cfg, microEndpointConfig **eps, int num_eps, struct threadArg *arg) { - char buf[64]; + char buf[256]; snprintf(buf, sizeof(buf), "Start microservice %s: ", cfg->Name); test(buf); @@ -33576,6 +33576,15 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer for (i = 0; i < n; i++) { + char buf[64]; + cfg->Version = "1.0.0"; + cfg->Description = "returns 42"; + + if (nats_IsStringEmpty(cfg->Name)) + { + snprintf(buf, sizeof(buf), "CoolService-%d", i); + cfg->Name = buf; + } _startMicroserviceOK(&(svcs[i]), nc, cfg, eps, num_eps, arg); } @@ -33584,7 +33593,6 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer #define _waitForMicroservicesAllDone(_arg) \ { \ - nats_Sleep(50); \ natsMutex_Lock((_arg)->m); \ testf("Wait for %d microservices to stop: ", (_arg)->microRunningServiceCount); \ natsStatus waitStatus = NATS_OK; \ @@ -34098,13 +34106,11 @@ void test_MicroBasics(void) &ep2_cfg, }; microServiceConfig cfg = { - .Version = "1.0.0", - .Name = "CoolService", - .Description = "returns 42", .Metadata = (natsMetadata){ .List = (const char *[]){"skey1", "svalue1", "skey2", "svalue2"}, .Count = 2, }, + .Name = "ManyServicesSameName", .Endpoint = NULL, .State = NULL, }; @@ -34162,7 +34168,7 @@ void test_MicroBasics(void) test(buf); err = microService_GetInfo(&info, svcs[i]); testCond((err == NULL) && - (strcmp(info->Name, "CoolService") == 0) && + (strcmp(info->Name, "ManyServicesSameName") == 0) && (strlen(info->Id) > 0) && (strcmp(info->Description, "returns 42") == 0) && (strcmp(info->Version, "1.0.0") == 0) && @@ -34173,7 +34179,7 @@ void test_MicroBasics(void) // Make sure we can request valid info with $SRV.INFO request. test("Create INFO inbox: "); testCond(NATS_OK == natsInbox_Create(&inbox)); - micro_new_control_subject(&subject, MICRO_INFO_VERB, "CoolService", NULL); + micro_new_control_subject(&subject, MICRO_INFO_VERB, "ManyServicesSameName", NULL); test("Subscribe to INFO inbox: "); testCond(NATS_OK == natsConnection_SubscribeSync(&sub, nc, inbox)); test("Publish INFO request: "); @@ -34199,7 +34205,7 @@ void test_MicroBasics(void) snprintf(buf, sizeof(buf), "Validate INFO response strings#%d: ", i); test(buf); testCond( - (NATS_OK == nats_JSONGetStrPtr(js, "name", &str)) && (strcmp(str, "CoolService") == 0) + (NATS_OK == nats_JSONGetStrPtr(js, "name", &str)) && (strcmp(str, "ManyServicesSameName") == 0) && (NATS_OK == nats_JSONGetStrPtr(js, "description", &str)) && (strcmp(str, "returns 42") == 0) && (NATS_OK == nats_JSONGetStrPtr(js, "version", &str)) && (strcmp(str, "1.0.0") == 0) && (NATS_OK == nats_JSONGetStrPtr(js, "id", &str)) && (strlen(str) > 0) @@ -34251,7 +34257,7 @@ void test_MicroBasics(void) // Make sure we can request SRV.PING. test("Create PING inbox: "); testCond(NATS_OK == natsInbox_Create(&inbox)); - micro_new_control_subject(&subject, MICRO_PING_VERB, "CoolService", NULL); + micro_new_control_subject(&subject, MICRO_PING_VERB, "ManyServicesSameName", NULL); test("Subscribe to PING inbox: "); testCond(NATS_OK == natsConnection_SubscribeSync(&sub, nc, inbox)); test("Publish PING request: "); @@ -34273,7 +34279,7 @@ void test_MicroBasics(void) js = NULL; testCond((NATS_OK == nats_JSONParse(&js, reply->data, reply->dataLen)) && (NATS_OK == nats_JSONGetStrPtr(js, "name", &str)) && - (strcmp(str, "CoolService") == 0)); + (strcmp(str, "ManyServicesSameName") == 0)); nats_JSONDestroy(js); natsMsg_Destroy(reply); } @@ -34284,7 +34290,7 @@ void test_MicroBasics(void) // Get and validate $SRV.STATS from all service instances. test("Create STATS inbox: "); testCond(NATS_OK == natsInbox_Create(&inbox)); - micro_new_control_subject(&subject, MICRO_STATS_VERB, "CoolService", NULL); + micro_new_control_subject(&subject, MICRO_STATS_VERB, "ManyServicesSameName", NULL); test("Subscribe to STATS inbox: "); testCond(NATS_OK == natsConnection_SubscribeSync(&sub, nc, inbox)); test("Publish STATS request: "); @@ -34368,9 +34374,6 @@ void test_MicroStartStop(void) .Handler = _microHandleRequest42, }; microServiceConfig cfg = { - .Version = "1.0.0", - .Name = "CoolService", - .Description = "returns 42", .Endpoint = &ep_cfg, }; natsMsg *reply = NULL; From 026dde1360e1f848b3b18f3c93935584715583c2 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 16:36:46 -0800 Subject: [PATCH 05/13] Restored Travis --- buildOnTravis.sh | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/buildOnTravis.sh b/buildOnTravis.sh index afa339a2e..8bc7da678 100755 --- a/buildOnTravis.sh +++ b/buildOnTravis.sh @@ -75,8 +75,7 @@ fi export NATS_TEST_TRAVIS=yes echo "Using NATS server version: $NATS_TEST_SERVER_VERSION" -ctest -R 'Micro' --timeout 60 --output-on-failure $4 --repeat-until-fail 20 -# ctest -L 'test' --timeout 60 --output-on-failure $4 +ctest -L 'test' --timeout 60 --output-on-failure $4 res=$? if [ $res -ne 0 ]; then exit $res From 89b05fb12590de3fe2e0d6414b68dbdeab49301e Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Tue, 5 Nov 2024 07:16:41 -0800 Subject: [PATCH 06/13] PR Feedback: include EP name in the error description --- src/micro_endpoint.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index a14cbc830..623d68f3a 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -279,7 +279,7 @@ micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg) microEndpointConfig *new_cfg = NULL; if (out == NULL) - return microError_Wrapf(micro_ErrorInvalidArg, "failed to clone endpoint config"); + return microError_Wrapf(err, "failed to clone endpoint config: %s", cfg->Name); if (cfg == NULL) { @@ -302,7 +302,7 @@ micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg) if (err != NULL) { micro_free_cloned_endpoint_config(new_cfg); - return microError_Wrapf(err, "failed to clone endpoint config %s", cfg->Name); + return microError_Wrapf(err, "failed to clone endpoint config: %s", cfg->Name); return err; } From 8c3b921864ef80c1c57a976cf40645043ddaee66 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Tue, 5 Nov 2024 07:18:27 -0800 Subject: [PATCH 07/13] PR Feedback: s/not_used/ignored --- src/micro.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/micro.c b/src/micro.c index 17c1b2861..4b7668b6c 100644 --- a/src/micro.c +++ b/src/micro.c @@ -24,7 +24,7 @@ static inline void _unlock_service(microService *m) { natsMutex_Unlock(m->servic static microError *_clone_service_config(microServiceConfig **out, microServiceConfig *cfg); static microError *_new_service(microService **ptr, natsConnection *nc); static void _on_connection_closed(natsConnection *nc, void *ignored); -static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_used); +static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *ignored); static void _free_cloned_service_config(microServiceConfig *cfg); static void _free_service(microService *m); @@ -632,7 +632,7 @@ _on_service_error(microService *m, const char *subject, natsStatus s) } static void -_on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_used) +_on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *ignored) { const char *subject = NULL; From 53d5cb79fe5363c821428bee4931f9a745f5c901 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Tue, 5 Nov 2024 07:25:05 -0800 Subject: [PATCH 08/13] PR Feedback: better connection handling in AddService --- src/micro.c | 41 ++++++++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/src/micro.c b/src/micro.c index 4b7668b6c..e35f2a960 100644 --- a/src/micro.c +++ b/src/micro.c @@ -57,11 +57,6 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c // Add the service to the connection. MICRO_CALL(err, _attach_service_to_connection(m->nc, m)); - MICRO_CALL(err, (m->refs++, NULL)); - - // Wrap the connection callbacks before we subscribe to anything. - MICRO_CALL(err, micro_ErrorFromStatus( - natsOptions_setMicroCallbacks(m->nc->opts, _on_connection_closed, _on_error))); // Initialize the monitoring endpoints. MICRO_CALL(err, micro_init_monitoring(m)); @@ -253,28 +248,44 @@ _attach_service_to_connection(natsConnection *nc, microService *service) if (nc == NULL || service == NULL) return micro_ErrorInvalidArg; - natsMutex_Lock(nc->servicesMu); - if (nc->services == NULL) + natsConn_Lock(nc); + + if (natsConn_isClosed(nc) || natsConn_isDraining(nc)) { - nc->services = NATS_CALLOC(1, sizeof(microService *)); - if (nc->services == NULL) - s = nats_setDefaultError(NATS_NO_MEMORY); + natsConn_Unlock(nc); + return micro_Errorf("can't add service %s to a closed or draining connection", service->cfg->Name); } - else + + // Wrap the connection callbacks before we subscribe to anything. + s = natsOptions_setMicroCallbacks(nc->opts, _on_connection_closed, _on_error); + + if (s == NATS_OK) { - microService **tmp = NATS_REALLOC(nc->services, (nc->numServices + 1) * sizeof(microService *)); - if (tmp == NULL) - s = nats_setDefaultError(NATS_NO_MEMORY); + natsMutex_Lock(nc->servicesMu); + if (nc->services == NULL) + { + nc->services = NATS_CALLOC(1, sizeof(microService *)); + if (nc->services == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + } else - nc->services = tmp; + { + microService **tmp = NATS_REALLOC(nc->services, (nc->numServices + 1) * sizeof(microService *)); + if (tmp == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + nc->services = tmp; + } } if (s == NATS_OK) { + service->refs++; // no lock needed, called from the constructor nc->services[nc->numServices] = service; nc->numServices++; } natsMutex_Unlock(nc->servicesMu); + natsConn_Unlock(nc); return micro_ErrorFromStatus(s); } From df42953291d33128521b1786c3d1bf89ed6419e0 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Tue, 5 Nov 2024 07:27:01 -0800 Subject: [PATCH 09/13] PR Feedback: removed extra indent --- src/micro.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/micro.c b/src/micro.c index e35f2a960..3e122168f 100644 --- a/src/micro.c +++ b/src/micro.c @@ -184,15 +184,15 @@ micro_add_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, micro { if (update) { - // If the endpoint already exists, update its config and stats. - // This will make it use the new handler for subsequent - // requests. - micro_lock_endpoint(ep); - micro_free_cloned_endpoint_config(ep->config); - err = micro_clone_endpoint_config(&ep->config, cfg); - if (err == NULL) - memset(&ep->stats, 0, sizeof(ep->stats)); - micro_unlock_endpoint(ep); + // If the endpoint already exists, update its config and stats. + // This will make it use the new handler for subsequent + // requests. + micro_lock_endpoint(ep); + micro_free_cloned_endpoint_config(ep->config); + err = micro_clone_endpoint_config(&ep->config, cfg); + if (err == NULL) + memset(&ep->stats, 0, sizeof(ep->stats)); + micro_unlock_endpoint(ep); } else { From 0d9605028f8e325a8550320d77af242663893e76 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Tue, 5 Nov 2024 07:29:48 -0800 Subject: [PATCH 10/13] PR Feedback: just move the last array element, do not copy --- src/micro.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/micro.c b/src/micro.c index 3e122168f..c07377afa 100644 --- a/src/micro.c +++ b/src/micro.c @@ -302,8 +302,7 @@ _detach_service_from_connection(natsConnection *nc, microService *m) if (nc->services[i] != m) continue; - for (int j = i; j < nc->numServices - 1; j++) - nc->services[j] = nc->services[j + 1]; + nc->services[i] = nc->services[nc->numServices - 1]; nc->numServices--; break; } From 5fd559b8e01d5cd74f3b94ece600f2fc02a9b369 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Tue, 5 Nov 2024 07:30:41 -0800 Subject: [PATCH 11/13] PR feedback: removed unneeded return --- src/micro.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/micro.c b/src/micro.c index c07377afa..aebdd757c 100644 --- a/src/micro.c +++ b/src/micro.c @@ -307,8 +307,6 @@ _detach_service_from_connection(natsConnection *nc, microService *m) break; } natsMutex_Unlock(nc->servicesMu); - - return; } static microError * From 0a197ab7ad281746b6a1b059998c01899832ae8c Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Tue, 5 Nov 2024 07:35:39 -0800 Subject: [PATCH 12/13] PR feedback: quote display names in errors (micro) --- src/micro.c | 22 +++++++++++----------- src/micro_endpoint.c | 4 ++-- src/micro_monitoring.c | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/micro.c b/src/micro.c index aebdd757c..7baa06465 100644 --- a/src/micro.c +++ b/src/micro.c @@ -67,7 +67,7 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c if (err != NULL) { microError_Ignore(microService_Destroy(m)); - return microError_Wrapf(err, "failed to add microservice %s", cfg->Name); + return microError_Wrapf(err, "failed to add microservice '%s'", cfg->Name); } *new_m = m; @@ -107,9 +107,9 @@ _new_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpo if (cfg == NULL) return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint config"); if (!micro_is_valid_name(cfg->Name)) - return microError_Wrapf(micro_ErrorInvalidArg, "invalid endpoint name %s", cfg->Name); + return microError_Wrapf(micro_ErrorInvalidArg, "invalid endpoint name '%s'", cfg->Name); if (cfg->Handler == NULL) - return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint request handler for %s", cfg->Name); + return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint request handler for '%s'", cfg->Name); ep = NATS_CALLOC(1, sizeof(microEndpoint)); if (ep == NULL) @@ -146,16 +146,16 @@ micro_add_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, micro char *fullSubject = NULL; if (err = _subjectWithGroupPrefix(&fullSubject, g, nats_IsStringEmpty(cfg->Subject) ? cfg->Name : cfg->Subject), err != NULL) - return microError_Wrapf(err, "failed to create full subject for endpoint %s", cfg->Name); + return microError_Wrapf(err, "failed to create full subject for endpoint '%s'", cfg->Name); if (!micro_is_valid_subject(fullSubject)) { NATS_FREE(fullSubject); - return microError_Wrapf(micro_ErrorInvalidArg, "invalid subject %s for endpoint %s", fullSubject, cfg->Name); + return microError_Wrapf(micro_ErrorInvalidArg, "invalid subject '%s' for endpoint '%s'", fullSubject, cfg->Name); } _lock_service(m); if (m->stopped) - err = micro_Errorf("can't add an endpoint %s to service %s: the service is stopped", cfg->Name, m->cfg->Name); + err = micro_Errorf("can't add an endpoint '%s' to service '%s': the service is stopped", cfg->Name, m->cfg->Name); // See if there is already an endpoint with the same subject. ep->subject is // immutable after the EP's creation so we don't need to lock it. @@ -167,9 +167,9 @@ micro_add_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, micro // it as long as we can re-use the existing subscription, which at // the moment means we can't change the queue group settings. if (cfg->NoQueueGroup != ep->config->NoQueueGroup) - err = micro_Errorf("can't change the queue group settings for endpoint %s", cfg->Name); + err = micro_Errorf("can't change the queue group settings for endpoint '%s'", cfg->Name); else if (!nats_StringEquals(cfg->QueueGroup, ep->config->QueueGroup)) - err = micro_Errorf("can't change the queue group for endpoint %s", cfg->Name); + err = micro_Errorf("can't change the queue group for endpoint '%s'", cfg->Name); if (err == NULL) { NATS_FREE(fullSubject); @@ -219,7 +219,7 @@ micro_add_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, micro _unlock_service(m); if (err != NULL) - return microError_Wrapf(err, "can't add an endpoint %s to service %s", cfg->Name, m->cfg->Name); + return microError_Wrapf(err, "can't add an endpoint '%s' to service '%s'", cfg->Name, m->cfg->Name); if (new_ep != NULL) *new_ep = ep; @@ -253,7 +253,7 @@ _attach_service_to_connection(natsConnection *nc, microService *service) if (natsConn_isClosed(nc) || natsConn_isDraining(nc)) { natsConn_Unlock(nc); - return micro_Errorf("can't add service %s to a closed or draining connection", service->cfg->Name); + return micro_Errorf("can't add service '%s' to a closed or draining connection", service->cfg->Name); } // Wrap the connection callbacks before we subscribe to anything. @@ -628,7 +628,7 @@ _on_service_error(microService *m, const char *subject, natsStatus s) if (found == NULL) return false; - err = microError_Wrapf(micro_ErrorFromStatus(s), "NATS error on endpoint %s", subject); + err = microError_Wrapf(micro_ErrorFromStatus(s), "NATS error on endpoint '%s'", subject); micro_update_last_error(found, err); microError_Destroy(err); diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index 623d68f3a..a1a6d3b5e 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -279,7 +279,7 @@ micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg) microEndpointConfig *new_cfg = NULL; if (out == NULL) - return microError_Wrapf(err, "failed to clone endpoint config: %s", cfg->Name); + return microError_Wrapf(err, "failed to clone endpoint config: '%s'", cfg->Name); if (cfg == NULL) { @@ -302,7 +302,7 @@ micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg) if (err != NULL) { micro_free_cloned_endpoint_config(new_cfg); - return microError_Wrapf(err, "failed to clone endpoint config: %s", cfg->Name); + return microError_Wrapf(err, "failed to clone endpoint config: '%s'", cfg->Name); return err; } diff --git a/src/micro_monitoring.c b/src/micro_monitoring.c index c8639392b..f1a0a16fa 100644 --- a/src/micro_monitoring.c +++ b/src/micro_monitoring.c @@ -161,7 +161,7 @@ micro_new_control_subject(char **newSubject, const char *verb, const char *name, { if (nats_IsStringEmpty(name) && !nats_IsStringEmpty(id)) { - return micro_Errorf("service name is required when id is provided: %s", id); + return micro_Errorf("service name is required when id is provided: '%s'", id); } else if (nats_IsStringEmpty(name) && nats_IsStringEmpty(id)) From 9bc5e7ff01ee4365dd7a169ac94cfc15c23d9d7b Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Tue, 5 Nov 2024 07:40:37 -0800 Subject: [PATCH 13/13] PR feedback: removed trailing whitespace in micro* --- examples/micro-sequence.c | 2 +- src/micro.c | 8 ++++---- src/microp.h | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/micro-sequence.c b/examples/micro-sequence.c index 2e530b2e2..b2688f367 100644 --- a/examples/micro-sequence.c +++ b/examples/micro-sequence.c @@ -139,7 +139,7 @@ static microError *handle_sequence(microRequest *req) result_len = snprintf(result, sizeof(result), "%Lf", value); if (err == NULL) err = microRequest_Respond(req, result, result_len); - + microArgs_Destroy(args); return err; } diff --git a/src/micro.c b/src/micro.c index 7baa06465..94625b609 100644 --- a/src/micro.c +++ b/src/micro.c @@ -301,7 +301,7 @@ _detach_service_from_connection(natsConnection *nc, microService *m) { if (nc->services[i] != m) continue; - + nc->services[i] = nc->services[nc->numServices - 1]; nc->numServices--; break; @@ -372,7 +372,7 @@ _detach_endpoint_from_service(microService *m, microEndpoint *toRemove) ; if (ep == NULL) return; - + m->numEndpoints--; if (prev_ep == NULL) m->first_ep = ep->next; @@ -627,14 +627,14 @@ _on_service_error(microService *m, const char *subject, natsStatus s) if (found == NULL) return false; - + err = microError_Wrapf(micro_ErrorFromStatus(s), "NATS error on endpoint '%s'", subject); micro_update_last_error(found, err); microError_Destroy(err); if (m->cfg->ErrHandler != NULL) (*m->cfg->ErrHandler)(m, found, s); - + micro_release_endpoint(found); return true; } diff --git a/src/microp.h b/src/microp.h index c2262ffa4..63dd77b74 100644 --- a/src/microp.h +++ b/src/microp.h @@ -53,7 +53,7 @@ struct micro_endpoint_s // A copy of the config provided to add_endpoint. microEndpointConfig *config; - + // Retained/released by the service that owns the endpoint to avoid race // conditions. microService *m;