From 21acc1490cc944d30749ad2f76416a9974a4ff19 Mon Sep 17 00:00:00 2001 From: Lev <1187448+levb@users.noreply.github.com> Date: Tue, 5 Nov 2024 10:02:40 -0800 Subject: [PATCH] [FIXED] microservice cleanup (flapping MicroServiceStops... tests) (#816) * [FIXED] microservice cleanup (flapping MicroServiceStops... tests) - Removed the global hash of services, it was the most immediate source of the deadlock. - Added the list of microservices to `natsConnection`. **NOTE** increased memory allocation for all connections, need a better way. - Fixed the flow of replacing an existing endpoint in a service. - Got rid of ep->name since it was synonimous with ep->cfg->Name - Adjusted the tests, some minor fixes. * Adjusted test parameters, REPEAT micro on travis * fixes, cleannup * Restored Travis * PR Feedback: include EP name in the error description * PR Feedback: s/not_used/ignored * PR Feedback: better connection handling in AddService * PR Feedback: removed extra indent * PR Feedback: just move the last array element, do not copy * PR feedback: removed unneeded return * PR feedback: quote display names in errors (micro) * PR feedback: removed trailing whitespace in micro* * PR feedback --- examples/micro-sequence.c | 2 +- src/conn.c | 4 + src/glib/glib.c | 41 --- src/glib/glibp.h | 5 - src/micro.c | 640 ++++++++++++++++++-------------------- src/micro_endpoint.c | 98 +----- src/micro_monitoring.c | 2 +- src/microp.h | 8 +- src/natsp.h | 16 +- test/test.c | 195 ++++++------ test/test.h | 9 + 11 files changed, 445 insertions(+), 575 deletions(-) diff --git a/examples/micro-sequence.c b/examples/micro-sequence.c index 2e530b2e2..b2688f367 100644 --- a/examples/micro-sequence.c +++ b/examples/micro-sequence.c @@ -139,7 +139,7 @@ static microError *handle_sequence(microRequest *req) result_len = snprintf(result, sizeof(result), "%Lf", value); if (err == NULL) err = microRequest_Respond(req, result, result_len); - + microArgs_Destroy(args); return err; } diff --git a/src/conn.c b/src/conn.c index 91cea7a11..1294c5cd5 100644 --- a/src/conn.c +++ b/src/conn.c @@ -204,7 +204,9 @@ _freeConn(natsConnection *nc) natsStrHash_Destroy(nc->respMap); natsCondition_Destroy(nc->reconnectCond); natsMutex_Destroy(nc->subsMu); + natsMutex_Destroy(nc->servicesMu); natsMutex_Destroy(nc->mu); + NATS_FREE(nc->services); NATS_FREE(nc); @@ -3256,6 +3258,8 @@ natsConn_create(natsConnection **newConn, natsOptions *options) s = natsMutex_Create(&(nc->mu)); if (s == NATS_OK) s = natsMutex_Create(&(nc->subsMu)); + if (s == NATS_OK) + s = natsMutex_Create(&(nc->servicesMu)); if (s == NATS_OK) s = _setupServerPool(nc); if (s == NATS_OK) diff --git a/src/glib/glib.c b/src/glib/glib.c index 4b8e4bc31..62e004ec7 100644 --- a/src/glib/glib.c +++ b/src/glib/glib.c @@ -119,8 +119,6 @@ _freeLib(void) nats_freeDispatcherPool(&gLib.replyDispatchers); natsNUID_free(); - natsMutex_Destroy(gLib.service_callback_mu); - natsHash_Destroy(gLib.all_services_to_callback); natsCondition_Destroy(gLib.cond); @@ -290,11 +288,6 @@ nats_openLib(natsClientConfig *config) if (s == NATS_OK) s = nats_initDispatcherPool(&(gLib.replyDispatchers), config->ReplyThreadPoolMax); - if (s == NATS_OK) - s = natsMutex_Create(&gLib.service_callback_mu); - if (s == NATS_OK) - s = natsHash_Create(&gLib.all_services_to_callback, 8); - if (s == NATS_OK) gLib.initialized = true; @@ -443,40 +436,6 @@ void nats_ReleaseThreadMemory(void) natsMutex_Unlock(gLib.lock); } -natsStatus -natsLib_startServiceCallbacks(microService *m) -{ - natsStatus s; - - natsMutex_Lock(gLib.service_callback_mu); - s = natsHash_Set(gLib.all_services_to_callback, (int64_t)m, (void *)m, NULL); - natsMutex_Unlock(gLib.service_callback_mu); - - return NATS_UPDATE_ERR_STACK(s); -} - -void natsLib_stopServiceCallbacks(microService *m) -{ - if (m == NULL) - return; - - natsMutex_Lock(gLib.service_callback_mu); - natsHash_Remove(gLib.all_services_to_callback, (int64_t)m); - natsMutex_Unlock(gLib.service_callback_mu); -} - -natsMutex * -natsLib_getServiceCallbackMutex(void) -{ - return gLib.service_callback_mu; -} - -natsHash * -natsLib_getAllServicesToCallback(void) -{ - return gLib.all_services_to_callback; -} - natsClientConfig *nats_testInspectClientConfig(void) { // Immutable after startup diff --git a/src/glib/glibp.h b/src/glib/glibp.h index 9babc60d6..fa658f139 100644 --- a/src/glib/glibp.h +++ b/src/glib/glibp.h @@ -117,11 +117,6 @@ typedef struct __natsLib natsGCList gc; - // For micro services code - natsMutex *service_callback_mu; - // uses `microService*` as the key and the value. - natsHash *all_services_to_callback; - } natsLib; natsLib *nats_lib(void); diff --git a/src/micro.c b/src/micro.c index 354824184..ba4090a37 100644 --- a/src/micro.c +++ b/src/micro.c @@ -23,12 +23,14 @@ static inline void _unlock_service(microService *m) { natsMutex_Unlock(m->servic static microError *_clone_service_config(microServiceConfig **out, microServiceConfig *cfg); static microError *_new_service(microService **ptr, natsConnection *nc); -static microError *_wrap_connection_event_callbacks(microService *m); +static void _on_connection_closed(natsConnection *nc, void *ignored); +static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *ignored); static void _free_cloned_service_config(microServiceConfig *cfg); static void _free_service(microService *m); -static void _release_service(microService *m); -static void _retain_service(microService *m); + +static microError * +_attach_service_to_connection(natsConnection *nc, microService *service); microError * micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *cfg) @@ -53,102 +55,171 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c MICRO_CALL(err, _clone_service_config(&m->cfg, cfg)); - // Wrap the connection callbacks before we subscribe to anything. - MICRO_CALL(err, _wrap_connection_event_callbacks(m)); + // Add the service to the connection. + MICRO_CALL(err, _attach_service_to_connection(m->nc, m)); + // Initialize the monitoring endpoints. MICRO_CALL(err, micro_init_monitoring(m)); + + // Add the default endpoint. MICRO_CALL(err, microService_AddEndpoint(m, cfg->Endpoint)); if (err != NULL) { microError_Ignore(microService_Destroy(m)); - return microError_Wrapf(err, "failed to add microservice %s", cfg->Name); + return microError_Wrapf(err, "failed to add microservice '%s'", cfg->Name); } *new_m = m; return NULL; } +static microError *_subjectWithGroupPrefix(char **dst, microGroup *g, const char *src) +{ + size_t len = strlen(src) + 1; + char *p; + + if (g != NULL) + len += strlen(g->config->Prefix) + 1; + + *dst = NATS_CALLOC(1, len); + if (*dst == NULL) + return micro_ErrorOutOfMemory; + + p = *dst; + if (g != NULL) + { + len = strlen(g->config->Prefix); + memcpy(p, g->config->Prefix, len); + p[len] = '.'; + p += len + 1; + } + memcpy(p, src, strlen(src) + 1); + return NULL; +} + +microError * +_new_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal, char *fullSubject) +{ + microError *err = NULL; + microEndpoint *ep = NULL; + + if (cfg == NULL) + return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint config"); + if (!micro_is_valid_name(cfg->Name)) + return microError_Wrapf(micro_ErrorInvalidArg, "invalid endpoint name '%s'", cfg->Name); + if (cfg->Handler == NULL) + return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint request handler for '%s'", cfg->Name); + + ep = NATS_CALLOC(1, sizeof(microEndpoint)); + if (ep == NULL) + return micro_ErrorOutOfMemory; + ep->is_monitoring_endpoint = is_internal; + ep->m = m; + + MICRO_CALL(err, micro_ErrorFromStatus(natsMutex_Create(&ep->endpoint_mu))); + MICRO_CALL(err, micro_clone_endpoint_config(&ep->config, cfg)); + if (err != NULL) + { + micro_free_endpoint(ep); + return err; + } + + ep->subject = fullSubject; // already malloced, will be freed in micro_free_endpoint + ep->group = g; + *new_ep = ep; + return NULL; +} + + microError * micro_add_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal) { microError *err = NULL; - microEndpoint *ptr = NULL; - microEndpoint *prev_ptr = NULL; microEndpoint *ep = NULL; - microEndpoint *prev_ep = NULL; + bool update = false; if (m == NULL) return micro_ErrorInvalidArg; if (cfg == NULL) return NULL; - err = micro_new_endpoint(&ep, m, g, cfg, is_internal); - if (err != NULL) - return microError_Wrapf(err, "failed to create endpoint %s", cfg->Name); + char *fullSubject = NULL; + if (err = _subjectWithGroupPrefix(&fullSubject, g, nats_IsStringEmpty(cfg->Subject) ? cfg->Name : cfg->Subject), err != NULL) + return microError_Wrapf(err, "failed to create full subject for endpoint '%s'", cfg->Name); + if (!micro_is_valid_subject(fullSubject)) + { + NATS_FREE(fullSubject); + return microError_Wrapf(micro_ErrorInvalidArg, "invalid subject '%s' for endpoint '%s'", fullSubject, cfg->Name); + } _lock_service(m); - if (m->stopped) + err = micro_Errorf("can't add an endpoint '%s' to service '%s': the service is stopped", cfg->Name, m->cfg->Name); + + // See if there is already an endpoint with the same subject. ep->subject is + // immutable after the EP's creation so we don't need to lock it. + for (ep = m->first_ep; (err == NULL) && (ep != NULL); ep = ep->next) { - _unlock_service(m); - return micro_Errorf("can't add an endpoint %s to service %s: the service is stopped", cfg->Name, m->cfg->Name); + if (strcmp(ep->subject, fullSubject) == 0) + { + // Found an existing endpoint with the same subject. We will update + // it as long as we can re-use the existing subscription, which at + // the moment means we can't change the queue group settings. + if (cfg->NoQueueGroup != ep->config->NoQueueGroup) + err = micro_Errorf("can't change the queue group settings for endpoint '%s'", cfg->Name); + else if (!nats_StringEquals(cfg->QueueGroup, ep->config->QueueGroup)) + err = micro_Errorf("can't change the queue group for endpoint '%s'", cfg->Name); + if (err == NULL) + { + NATS_FREE(fullSubject); + update = true; + fullSubject = NULL; + break; + } + } } - if (m->first_ep != NULL) + if (err == NULL) { - if (strcmp(m->first_ep->subject, ep->subject) == 0) + if (update) { - ep->next = m->first_ep->next; - prev_ep = m->first_ep; - m->first_ep = ep; + // If the endpoint already exists, update its config and stats. + // This will make it use the new handler for subsequent + // requests. + micro_lock_endpoint(ep); + micro_free_cloned_endpoint_config(ep->config); + err = micro_clone_endpoint_config(&ep->config, cfg); + if (err == NULL) + memset(&ep->stats, 0, sizeof(ep->stats)); + micro_unlock_endpoint(ep); } else { - prev_ptr = m->first_ep; - for (ptr = m->first_ep->next; ptr != NULL; prev_ptr = ptr, ptr = ptr->next) + err = _new_endpoint(&ep, m, g, cfg, is_internal, fullSubject); + if (err == NULL) { - if (strcmp(ptr->subject, ep->subject) == 0) + // Set up the endpoint in the service before it starts + // processing messages. + ep->next = m->first_ep; + m->first_ep = ep; + m->numEndpoints++; + + err = micro_start_endpoint(ep); + if (err != NULL) { - ep->next = ptr->next; - prev_ptr->next = ep; - prev_ep = ptr; - break; + // Unwind, remove the endpoint from the list. + m->first_ep = ep->next; + m->numEndpoints--; + micro_free_endpoint(ep); } } - if (prev_ep == NULL) - { - prev_ptr->next = ep; - } } } - else - { - m->first_ep = ep; - } _unlock_service(m); - - if (prev_ep != NULL) - { - // Rid of the previous endpoint with the same subject, if any. If this - // fails we can return the error, leave the newly added endpoint in the - // list, not started. A retry with the same subject will clean it up. - if (err = micro_stop_endpoint(prev_ep), err != NULL) - return err; - micro_release_endpoint(prev_ep); - } - - // retain `m` before the endpoint uses it for its on_complete callback. - _retain_service(m); - - if (err = micro_start_endpoint(ep), err != NULL) - { - // Best effort, leave the new endpoint in the list, as is. A retry with - // the same name will clean it up. - _release_service(m); - return microError_Wrapf(err, "failed to start endpoint %s", ep->name); - } + if (err != NULL) + return microError_Wrapf(err, "can't add an endpoint '%s' to service '%s'", cfg->Name, m->cfg->Name); if (new_ep != NULL) *new_ep = ep; @@ -170,88 +241,159 @@ microGroup_AddEndpoint(microGroup *g, microEndpointConfig *cfg) return micro_add_endpoint(NULL, g->m, g, cfg, false); } -microError * -microService_Stop(microService *m) +static microError * +_attach_service_to_connection(natsConnection *nc, microService *service) { - microError *err = NULL; - microEndpoint *ep = NULL; - bool finalize = false; - microDoneHandler doneHandler = NULL; - - if (m == NULL) + natsStatus s = NATS_OK; + if (nc == NULL || service == NULL) return micro_ErrorInvalidArg; - _lock_service(m); + natsConn_Lock(nc); - if (m->stopped) + if (natsConn_isClosed(nc) || natsConn_isDraining(nc)) { - _unlock_service(m); - return NULL; + natsConn_Unlock(nc); + return micro_Errorf("can't add service '%s' to a closed or draining connection", service->cfg->Name); } - ep = m->first_ep; - for (; ep != NULL; ep = ep->next) + // Wrap the connection callbacks before we subscribe to anything. + s = natsOptions_setMicroCallbacks(nc->opts, _on_connection_closed, _on_error); + + if (s == NATS_OK) { - if (err = micro_stop_endpoint(ep), err != NULL) + natsMutex_Lock(nc->servicesMu); + if (nc->services == NULL) { - _unlock_service(m); - return microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->name); + nc->services = NATS_CALLOC(1, sizeof(microService *)); + if (nc->services == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); } + else + { + microService **tmp = NATS_REALLOC(nc->services, (nc->numServices + 1) * sizeof(microService *)); + if (tmp == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + nc->services = tmp; + } + + if (s == NATS_OK) + { + service->refs++; // no lock needed, called from the constructor + nc->services[nc->numServices] = service; + nc->numServices++; + } + natsMutex_Unlock(nc->servicesMu); } - finalize = (m->first_ep == NULL); - if (finalize) + natsConn_Unlock(nc); + + return micro_ErrorFromStatus(s); +} + +static void +_detach_service_from_connection(natsConnection *nc, microService *m) +{ + if (nc == NULL || m == NULL) + return; + + natsMutex_Lock(nc->servicesMu); + for (int i = 0; i < nc->numServices; i++) { - natsLib_stopServiceCallbacks(m); - m->stopped = true; - doneHandler = m->cfg->DoneHandler; + if (nc->services[i] != m) + continue; + + nc->services[i] = nc->services[nc->numServices - 1]; + nc->numServices--; + break; } + natsMutex_Unlock(nc->servicesMu); +} - _unlock_service(m); +static microError * +_stop_service(microService *m, bool unsubscribe, bool release) +{ + microError *err = NULL; + microEndpoint *ep = NULL; + int refs = 0; + int numEndpoints = 0; - if (finalize) + if (m == NULL) + return micro_ErrorInvalidArg; + + _lock_service(m); + if (!m->stopped) { - if (doneHandler != NULL) - doneHandler(m); + m->stopped = true; - // Relase the endpoint's server reference from `micro_add_endpoint`. - _release_service(m); + if (unsubscribe) + { + for (ep = m->first_ep; ep != NULL; ep = ep->next) + { + if (err = micro_stop_endpoint(ep), err != NULL) + { + err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name); + _unlock_service(m); + return err; + } + } + } } + if (release) + m->refs--; + + refs = m->refs; + numEndpoints = m->numEndpoints; + _unlock_service(m); + + if ((refs == 0) && (numEndpoints == 0)) + _free_service(m); + return NULL; } -static bool -_find_endpoint(microEndpoint **prevp, microService *m, microEndpoint *to_find) +microError * +microService_Stop(microService *m) +{ + return _stop_service(m, true, false); +} + +// service lock must be held by the caller. +static void +_detach_endpoint_from_service(microService *m, microEndpoint *toRemove) { microEndpoint *ep = NULL; microEndpoint *prev_ep = NULL; - if ((m == NULL) || (to_find == NULL)) - return false; + if ((m == NULL) || (toRemove == NULL)) + return; + + for (ep = m->first_ep; (ep != NULL) && (ep != toRemove); prev_ep = ep, ep = ep->next) + ; + if (ep == NULL) + return; - for (ep = m->first_ep; ep != NULL; ep = ep->next) + m->numEndpoints--; + if (prev_ep == NULL) + m->first_ep = ep->next; + else { - if (ep == to_find) - { - *prevp = prev_ep; - return true; - } - prev_ep = ep; + micro_lock_endpoint(prev_ep); + prev_ep->next = ep->next; + micro_unlock_endpoint(prev_ep); } - - return false; } -void micro_release_on_endpoint_complete(void *closure) +// Callback for when an endpoint's subscription is finished. It is called in the +// context of the subscription's delivery thread; this is where we want to call +// the service's complete callback when the last subscription is done. +void micro_release_endpoint_when_unsubscribed(void *closure) { - microEndpoint *ep = (microEndpoint *)closure; - microEndpoint *prev_ep = NULL; - microService *m = NULL; - natsSubscription *sub = NULL; - microDoneHandler doneHandler = NULL; - bool free_ep = false; - bool finalize = false; + microEndpoint *ep = (microEndpoint *)closure; + microService *m = NULL; + microDoneHandler doneHandler = NULL; + int refs = 0; if (ep == NULL) return; @@ -260,56 +402,31 @@ void micro_release_on_endpoint_complete(void *closure) if ((m == NULL) || (m->service_mu == NULL)) return; - micro_lock_endpoint(ep); - ep->is_draining = false; - sub = ep->sub; - ep->sub = NULL; - ep->refs--; - free_ep = (ep->refs == 0); - micro_unlock_endpoint(ep); - - // Force the subscription to be destroyed now. - natsSubscription_Destroy(sub); - _lock_service(m); - // Release the service reference for the completed endpoint. It can not be - // the last reference, so no need to free m. - m->refs--; - - // Unlink the endpoint from the service. - if (_find_endpoint(&prev_ep, m, ep)) - { - if (prev_ep != NULL) - { - prev_ep->next = ep->next; - } - else - { - m->first_ep = ep->next; - } - } + micro_lock_endpoint(ep); + _detach_endpoint_from_service(m, ep); + refs = --(ep->refs); + micro_unlock_endpoint(ep); - finalize = (!m->stopped) && (m->first_ep == NULL); - if (finalize) + if (refs == 0) + micro_free_endpoint(ep); + if (m->numEndpoints == 0) { - natsLib_stopServiceCallbacks(m); + // Mark the service as stopped before calling the done handler. m->stopped = true; doneHandler = m->cfg->DoneHandler; } _unlock_service(m); - if (free_ep) - micro_free_endpoint(ep); - - if (finalize) + // Special processing for the last endpoint. + if (doneHandler != NULL) { - if (doneHandler != NULL) - doneHandler(m); + doneHandler(m); - // Relase the endpoint's server reference from `micro_add_endpoint`. - _release_service(m); + _detach_service_from_connection(m->nc, m); + _stop_service(m, false, true); // just release } } @@ -330,14 +447,7 @@ bool microService_IsStopped(microService *m) microError * microService_Destroy(microService *m) { - microError *err = NULL; - - err = microService_Stop(m); - if (err != NULL) - return err; - - _release_service(m); - return NULL; + return _stop_service(m, true, true); } microError * @@ -370,44 +480,12 @@ _new_service(microService **ptr, natsConnection *nc) if (*ptr == NULL) return micro_ErrorOutOfMemory; - natsConn_retain(nc); (*ptr)->refs = 1; (*ptr)->nc = nc; (*ptr)->started = nats_Now() * 1000000; return NULL; } -static void -_retain_service(microService *m) -{ - if (m == NULL) - return; - - _lock_service(m); - - ++(m->refs); - - _unlock_service(m); -} - -static void -_release_service(microService *m) -{ - int refs = 0; - - if (m == NULL) - return; - - _lock_service(m); - - refs = --(m->refs); - - _unlock_service(m); - - if (refs == 0) - _free_service(m); -} - static inline void _free_cloned_group_config(microGroupConfig *cfg) { @@ -450,7 +528,6 @@ _free_service(microService *m) } _free_cloned_service_config(m->cfg); - natsConn_release(m->nc); natsMutex_Destroy(m->service_mu); NATS_FREE(m); } @@ -512,180 +589,83 @@ _free_cloned_service_config(microServiceConfig *cfg) NATS_FREE(cfg); } -static microError * -_start_service_callbacks(microService *m) -{ - natsStatus s = NATS_OK; - - if (m == NULL) - return micro_ErrorInvalidArg; - - // Extra reference to the service as long as its callbacks are registered. - _retain_service(m); - - s = natsLib_startServiceCallbacks(m); - if (s != NATS_OK) - { - _release_service(m); - } - - return micro_ErrorFromStatus(s); -} - -static microError * -_services_for_connection(microService ***to_call, int *num_microservices, natsConnection *nc) -{ - natsMutex *mu = natsLib_getServiceCallbackMutex(); - natsHash *h = natsLib_getAllServicesToCallback(); - microService *m = NULL; - microService **p = NULL; - natsHashIter iter; - int n = 0; - int i; - - natsMutex_Lock(mu); - - natsHashIter_Init(&iter, h); - while (natsHashIter_Next(&iter, NULL, (void **)&m)) - if (m->nc == nc) - n++; - natsHashIter_Done(&iter); - if (n > 0) - { - p = NATS_CALLOC(n, sizeof(microService *)); - if (p == NULL) - { - natsMutex_Unlock(mu); - return micro_ErrorOutOfMemory; - } - - natsHashIter_Init(&iter, h); - i = 0; - while (natsHashIter_Next(&iter, NULL, (void **)&m)) - { - if (m->nc == nc) - { - _retain_service(m); // for the callback - p[i++] = m; - } - } - natsHashIter_Done(&iter); - } - - natsMutex_Unlock(mu); - - *to_call = p; - *num_microservices = n; - return NULL; -} - static void _on_connection_closed(natsConnection *nc, void *ignored) { - microService *m = NULL; - microService **to_call = NULL; - microError *err = NULL; - int n = 0; - int i; - - err = _services_for_connection(&to_call, &n, nc); - if (err != NULL) - { - microError_Ignore(err); - return; - } + natsMutex_Lock(nc->servicesMu); - for (i = 0; i < n; i++) - { - m = to_call[i]; - microError_Ignore(microService_Stop(m)); - - _release_service(m); - } + // Stop all services. They will get detached from the connection when their + // subs are complete. + for (int i = 0; i < nc->numServices; i++) + _stop_service(nc->services[i], false, false); - NATS_FREE(to_call); + natsMutex_Unlock(nc->servicesMu); } -static void +static bool _on_service_error(microService *m, const char *subject, natsStatus s) { - microEndpoint *ep = NULL; - microError *err = NULL; + microEndpoint *found = NULL; + microError *err = NULL; if (m == NULL) - return; + return false; _lock_service(m); - for (ep = m->first_ep; - (ep != NULL) && !micro_match_endpoint_subject(ep->subject, subject); - ep = ep->next) - ; - micro_retain_endpoint(ep); // for the callback - _unlock_service(m); - if (ep != NULL) + for (microEndpoint *ep = m->first_ep; ep != NULL; ep = ep->next) { - if (m->cfg->ErrHandler != NULL) - (*m->cfg->ErrHandler)(m, ep, s); - - err = microError_Wrapf(micro_ErrorFromStatus(s), "NATS error on endpoint %s", ep->subject); - micro_update_last_error(ep, err); - microError_Destroy(err); + if (!micro_match_endpoint_subject(ep->subject, subject)) + continue; + found = ep; + break; } - micro_release_endpoint(ep); // after the callback - // TODO: Should we stop the service? The Go client does. - microError_Ignore(microService_Stop(m)); + if (found != NULL) + micro_retain_endpoint(found); + + _unlock_service(m); + + if (found == NULL) + return false; + + err = microError_Wrapf(micro_ErrorFromStatus(s), "NATS error on endpoint '%s'", subject); + micro_update_last_error(found, err); + microError_Destroy(err); + + if (m->cfg->ErrHandler != NULL) + (*m->cfg->ErrHandler)(m, found, s); + + micro_release_endpoint(found); + return true; } static void -_on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_used) +_on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *ignored) { - microService *m = NULL; - microService **to_call = NULL; - microError *err = NULL; const char *subject = NULL; - int n = 0; - int i; if (sub == NULL) - { return; - } - subject = natsSubscription_GetSubject(sub); - // `to_call` will have a list of retained service pointers. - err = _services_for_connection(&to_call, &n, nc); - if (err != NULL) - { - microError_Ignore(err); - return; - } + subject = natsSubscription_GetSubject(sub); - for (i = 0; i < n; i++) + // TODO: this would be a lot easier if sub had a ref to ep. + natsMutex_Lock(nc->servicesMu); + for (int i = 0; i < nc->numServices; i++) { - m = to_call[i]; - _on_service_error(m, subject, s); - _release_service(m); // release the extra ref in `to_call`. - } - - NATS_FREE(to_call); -} + microService *m = nc->services[i]; -static microError * -_wrap_connection_event_callbacks(microService *m) -{ - microError *err = NULL; - - if ((m == NULL) || (m->nc == NULL) || (m->nc->opts == NULL)) - return micro_ErrorInvalidArg; - - // The new service must be in the list for this to work. - MICRO_CALL(err, _start_service_callbacks(m)); - MICRO_CALL(err, micro_ErrorFromStatus( - natsOptions_setMicroCallbacks(m->nc->opts, _on_connection_closed, _on_error))); + // See if the service owns the affected subscription, based on matching + // the subjects; notify it of the error. + if (!_on_service_error(m, subject, s)) + continue; - return microError_Wrapf(err, "failed to wrap connection event callbacks"); + // Stop the service in error. It will get detached from the connection + // and released when all of its subs are complete. + _stop_service(m, true, false); + } + natsMutex_Unlock(nc->servicesMu); } static inline microError * @@ -848,7 +828,7 @@ microService_GetInfo(microServiceInfo **new_info, microService *m) { if ((!ep->is_monitoring_endpoint) && (ep->subject != NULL)) { - MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Name, ep->name)); + MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Name, ep->config->Name)); MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Subject, ep->subject)); MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].QueueGroup, micro_queue_group_for_endpoint(ep))); MICRO_CALL(err, micro_ErrorFromStatus( @@ -947,7 +927,7 @@ microService_GetStats(microServiceStats **new_stats, microService *m) // copy the entire struct, including the last error buffer. stats->Endpoints[len] = ep->stats; - MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].Name, ep->name)); + MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].Name, ep->config->Name)); MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].Subject, ep->subject)); MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].QueueGroup, micro_queue_group_for_endpoint(ep))); if (err == NULL) diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index efe72f41d..4249d4a57 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -16,53 +16,11 @@ #include "microp.h" #include "util.h" -static microError *_subjectWithGroupPrefix(char **dst, microGroup *g, const char *src); - static void _handle_request(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure); static void _retain_endpoint(microEndpoint *ep, bool lock); static void _release_endpoint(microEndpoint *ep); -microError * -micro_new_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal) -{ - microError *err = NULL; - microEndpoint *ep = NULL; - const char *subj; - - if (cfg == NULL) - return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint config"); - if (!micro_is_valid_name(cfg->Name)) - return microError_Wrapf(micro_ErrorInvalidArg, "invalid endpoint name %s", cfg->Name); - if (cfg->Handler == NULL) - return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint request handler for %s", cfg->Name); - - if ((cfg->Subject != NULL) && !micro_is_valid_subject(cfg->Subject)) - return micro_ErrorInvalidArg; - - subj = nats_IsStringEmpty(cfg->Subject) ? cfg->Name : cfg->Subject; - - ep = NATS_CALLOC(1, sizeof(microEndpoint)); - if (ep == NULL) - return micro_ErrorOutOfMemory; - ep->is_monitoring_endpoint = is_internal; - ep->m = m; - - MICRO_CALL(err, micro_ErrorFromStatus(natsMutex_Create(&ep->endpoint_mu))); - MICRO_CALL(err, micro_clone_endpoint_config(&ep->config, cfg)); - MICRO_CALL(err, micro_strdup(&ep->name, cfg->Name)); - MICRO_CALL(err, _subjectWithGroupPrefix(&ep->subject, g, subj)); - if (err != NULL) - { - micro_free_endpoint(ep); - return err; - } - - ep->group = g; - *new_ep = ep; - return NULL; -} - const char * micro_queue_group_for_endpoint(microEndpoint *ep) { @@ -113,10 +71,9 @@ micro_start_endpoint(microEndpoint *ep) micro_lock_endpoint(ep); ep->refs++; ep->sub = sub; - ep->is_draining = false; micro_unlock_endpoint(ep); - natsSubscription_SetOnCompleteCB(sub, micro_release_on_endpoint_complete, ep); + natsSubscription_SetOnCompleteCB(sub, micro_release_endpoint_when_unsubscribed, ep); } else { @@ -132,31 +89,20 @@ micro_stop_endpoint(microEndpoint *ep) natsStatus s = NATS_OK; natsSubscription *sub = NULL; - if ((ep == NULL) || (ep->m == NULL)) + if (ep == NULL) return NULL; micro_lock_endpoint(ep); sub = ep->sub; - - if (ep->is_draining || natsConnection_IsClosed(ep->m->nc) || !natsSubscription_IsValid(sub)) - { - // If stopping, _release_on_endpoint_complete will take care of - // finalizing, nothing else to do. In other cases - // _release_on_endpoint_complete has already been called. - micro_unlock_endpoint(ep); - return NULL; - } - - ep->is_draining = true; micro_unlock_endpoint(ep); + if (sub == NULL) + return NULL; - // When the drain is complete, will release the final ref on ep. + // When the drain is complete, the callback will free ep. We may get an + // NATS_INVALID_SUBSCRIPTION if the subscription is already closed. s = natsSubscription_Drain(sub); - if (s != NATS_OK) - { - return microError_Wrapf(micro_ErrorFromStatus(s), - "failed to stop endpoint %s: failed to drain subscription", ep->name); - } + if ((s != NATS_OK) && (s != NATS_INVALID_SUBSCRIPTION)) + return microError_Wrapf(micro_ErrorFromStatus(s), "failed to drain subscription"); return NULL; } @@ -195,7 +141,6 @@ void micro_free_endpoint(microEndpoint *ep) if (ep == NULL) return; - NATS_FREE(ep->name); NATS_FREE(ep->subject); natsSubscription_Destroy(ep->sub); natsMutex_Destroy(ep->endpoint_mu); @@ -334,7 +279,7 @@ micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg) microEndpointConfig *new_cfg = NULL; if (out == NULL) - return micro_ErrorInvalidArg; + return microError_Wrapf(micro_ErrorInvalidArg, "failed to clone endpoint config: '%s'", cfg->Name); if (cfg == NULL) { @@ -357,7 +302,7 @@ micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg) if (err != NULL) { micro_free_cloned_endpoint_config(new_cfg); - return err; + return microError_Wrapf(err, "failed to clone endpoint config: '%s'", cfg->Name); } *out = new_cfg; @@ -434,26 +379,3 @@ bool micro_match_endpoint_subject(const char *ep_subject, const char *actual_sub } } -static microError *_subjectWithGroupPrefix(char **dst, microGroup *g, const char *src) -{ - size_t len = strlen(src) + 1; - char *p; - - if (g != NULL) - len += strlen(g->config->Prefix) + 1; - - *dst = NATS_CALLOC(1, len); - if (*dst == NULL) - return micro_ErrorOutOfMemory; - - p = *dst; - if (g != NULL) - { - len = strlen(g->config->Prefix); - memcpy(p, g->config->Prefix, len); - p[len] = '.'; - p += len + 1; - } - memcpy(p, src, strlen(src) + 1); - return NULL; -} diff --git a/src/micro_monitoring.c b/src/micro_monitoring.c index c8639392b..f1a0a16fa 100644 --- a/src/micro_monitoring.c +++ b/src/micro_monitoring.c @@ -161,7 +161,7 @@ micro_new_control_subject(char **newSubject, const char *verb, const char *name, { if (nats_IsStringEmpty(name) && !nats_IsStringEmpty(id)) { - return micro_Errorf("service name is required when id is provided: %s", id); + return micro_Errorf("service name is required when id is provided: '%s'", id); } else if (nats_IsStringEmpty(name) && nats_IsStringEmpty(id)) diff --git a/src/microp.h b/src/microp.h index 8184691c4..63dd77b74 100644 --- a/src/microp.h +++ b/src/microp.h @@ -49,12 +49,11 @@ struct micro_endpoint_s { // The name and subject that the endpoint is listening on (may be different // from one specified in config). - char *name; char *subject; // A copy of the config provided to add_endpoint. microEndpointConfig *config; - + // Retained/released by the service that owns the endpoint to avoid race // conditions. microService *m; @@ -75,7 +74,6 @@ struct micro_endpoint_s // Mutex for starting/stopping the endpoint, and for updating the stats. natsMutex *endpoint_mu; int refs; - bool is_draining; // The subscription for the endpoint. If NULL, the endpoint is stopped. natsSubscription *sub; @@ -110,7 +108,9 @@ struct micro_service_s natsMutex *service_mu; int refs; + // a linked list of endpoints. struct micro_endpoint_s *first_ep; + int numEndpoints; int64_t started; // UTC time expressed as number of nanoseconds since epoch. bool stopped; @@ -153,7 +153,7 @@ void micro_free_cloned_endpoint_config(microEndpointConfig *cfg); void micro_free_endpoint(microEndpoint *ep); void micro_free_request(microRequest *req); void micro_release_endpoint(microEndpoint *ep); -void micro_release_on_endpoint_complete(void *closure); +void micro_release_endpoint_when_unsubscribed(void *closure); void micro_retain_endpoint(microEndpoint *ep); void micro_update_last_error(microEndpoint *ep, microError *err); const char *micro_queue_group_for_endpoint(microEndpoint *ep); diff --git a/src/natsp.h b/src/natsp.h index 2f99de066..80164238b 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -699,6 +699,10 @@ struct __natsConnection natsHash *subs; natsMutex *subsMu; + microService **services; + int numServices; + natsMutex *servicesMu; + natsConnStatus status; bool initc; // true if the connection is performing the initial connect bool ar; // abort reconnect attempts @@ -778,18 +782,6 @@ nats_sslRegisterThreadForCleanup(void); void nats_setNATSThreadKey(void); -natsStatus -natsLib_startServiceCallbacks(microService *m); - -void -natsLib_stopServiceCallbacks(microService *m); - -natsMutex* -natsLib_getServiceCallbackMutex(void); - -natsHash* -natsLib_getAllServicesToCallback(void); - // // Threads // diff --git a/test/test.c b/test/test.c index 96ede2143..57ed6adcb 100644 --- a/test/test.c +++ b/test/test.c @@ -33621,7 +33621,7 @@ _startMicroservice(microService** new_m, natsConnection *nc, microServiceConfig static void _startMicroserviceOK(microService** new_m, natsConnection *nc, microServiceConfig *cfg, microEndpointConfig **eps, int num_eps, struct threadArg *arg) { - char buf[64]; + char buf[256]; snprintf(buf, sizeof(buf), "Start microservice %s: ", cfg->Name); test(buf); @@ -33636,45 +33636,44 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer for (i = 0; i < n; i++) { + char buf[64]; + cfg->Version = "1.0.0"; + cfg->Description = "returns 42"; + + if (nats_IsStringEmpty(cfg->Name)) + { + snprintf(buf, sizeof(buf), "CoolService-%d", i); + cfg->Name = buf; + } _startMicroserviceOK(&(svcs[i]), nc, cfg, eps, num_eps, arg); } testCond(true); } -static void -_waitForMicroservicesAllDone(struct threadArg *arg) -{ - natsStatus s = NATS_OK; - - test("Wait for all microservices to stop: "); - natsMutex_Lock(arg->m); - while ((s != NATS_TIMEOUT) && !arg->microAllDone) - s = natsCondition_TimedWait(arg->c, arg->m, 1000); - natsMutex_Unlock(arg->m); - testCond((NATS_OK == s) && arg->microAllDone); - - // `Done` may be immediately followed by freeing the service, so wait a bit - // to make sure it happens before the test exits. - nats_Sleep(20); -} - -static void -_destroyMicroservicesAndWaitForAllDone(microService** svcs, int n, struct threadArg *arg) -{ - char buf[64]; - - snprintf(buf, sizeof(buf), "Wait for all %d microservices to stop: ", n); - test(buf); - - for (int i = 0; i < n; i++) - { - if (NULL != microService_Destroy(svcs[i])) - FAIL("Unable to destroy microservice!"); - } - - _waitForMicroservicesAllDone(arg); -} +#define _waitForMicroservicesAllDone(_arg) \ + { \ + natsMutex_Lock((_arg)->m); \ + testf("Wait for %d microservices to stop: ", (_arg)->microRunningServiceCount); \ + natsStatus waitStatus = NATS_OK; \ + bool allDone = false; \ + while ((waitStatus != NATS_TIMEOUT) && !(_arg)->microAllDone) \ + waitStatus = natsCondition_TimedWait((_arg)->c, (_arg)->m, 1000); \ + allDone = (_arg)->microAllDone; \ + natsMutex_Unlock((_arg)->m); \ + testCond((NATS_OK == waitStatus) && allDone); \ + } + +#define _destroyMicroservice(_s) \ + testf("Destroy microservice %s: ", (_s)->cfg->Name); \ + microError *_err = microService_Destroy(_s); \ + if (_err != NULL) \ + { \ + char _buf[256]; \ + FAILf("Unable to destroy microservice: %s", microError_String(_err, _buf, sizeof(_buf))); \ + microError_Destroy(_err); \ + } \ + testCond(true); typedef struct { @@ -33904,14 +33903,8 @@ void test_MicroAddService(void) } microServiceInfo_Destroy(info); - - if (m != NULL) - { - snprintf(buf, sizeof(buf), "%s: Destroy service: %d", m->cfg->Name, m->refs); - test(buf); - testCond(NULL == microService_Destroy(m)); - _waitForMicroservicesAllDone(&arg); - } + _destroyMicroservice(m); + _waitForMicroservicesAllDone(&arg); } test("Destroy the test connection: "); @@ -33947,11 +33940,11 @@ void test_MicroGroups(void) }; const char* expected_subjects[] = { - "ep1", - "g1.ep1", - "g1.g2.ep1", - "g1.g2.ep2", "g1.ep2", + "g1.g2.ep2", + "g1.g2.ep1", + "g1.ep1", + "ep1", }; int expected_num_endpoints = sizeof(expected_subjects) / sizeof(expected_subjects[0]); @@ -34001,7 +33994,7 @@ void test_MicroGroups(void) if (err != NULL) FAIL("failed to get service info!") - test("Verify number of endpoints: "); + testf("Verify number of endpoints %d is %d: ", info->EndpointsLen, expected_num_endpoints); testCond(info->EndpointsLen == expected_num_endpoints); test("Verify endpoint subjects: "); @@ -34017,7 +34010,7 @@ void test_MicroGroups(void) microServiceInfo_Destroy(info); - microService_Destroy(m); + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); @@ -34122,14 +34115,14 @@ void test_MicroQueueGroupForEndpoint(void) testCond((err == NULL) && (info != NULL) && (info->EndpointsLen == 3) && (stats != NULL) && (stats->EndpointsLen == 3) && - (_testQueueGroup(tc.expectedServiceLevel, info->Endpoints[0].QueueGroup)) && - (_testQueueGroup(tc.expectedServiceLevel, stats->Endpoints[0].QueueGroup)) && + (_testQueueGroup(tc.expectedServiceLevel, info->Endpoints[2].QueueGroup)) && + (_testQueueGroup(tc.expectedServiceLevel, stats->Endpoints[2].QueueGroup)) && (_testQueueGroup(tc.expectedGroup1Level, stats->Endpoints[1].QueueGroup)) && (_testQueueGroup(tc.expectedGroup1Level, info->Endpoints[1].QueueGroup)) && - (_testQueueGroup(tc.expectedGroup2Level, info->Endpoints[2].QueueGroup)) && - (_testQueueGroup(tc.expectedGroup2Level, stats->Endpoints[2].QueueGroup))); + (_testQueueGroup(tc.expectedGroup2Level, info->Endpoints[0].QueueGroup)) && + (_testQueueGroup(tc.expectedGroup2Level, stats->Endpoints[0].QueueGroup))); - microService_Destroy(service); + _destroyMicroservice(service); _waitForMicroservicesAllDone(&arg); microServiceInfo_Destroy(info); microServiceStats_Destroy(stats); @@ -34173,13 +34166,11 @@ void test_MicroBasics(void) &ep2_cfg, }; microServiceConfig cfg = { - .Version = "1.0.0", - .Name = "CoolService", - .Description = "returns 42", .Metadata = (natsMetadata){ .List = (const char *[]){"skey1", "svalue1", "skey2", "svalue2"}, .Count = 2, }, + .Name = "ManyServicesSameName", .Endpoint = NULL, .State = NULL, }; @@ -34237,7 +34228,7 @@ void test_MicroBasics(void) test(buf); err = microService_GetInfo(&info, svcs[i]); testCond((err == NULL) && - (strcmp(info->Name, "CoolService") == 0) && + (strcmp(info->Name, "ManyServicesSameName") == 0) && (strlen(info->Id) > 0) && (strcmp(info->Description, "returns 42") == 0) && (strcmp(info->Version, "1.0.0") == 0) && @@ -34248,7 +34239,7 @@ void test_MicroBasics(void) // Make sure we can request valid info with $SRV.INFO request. test("Create INFO inbox: "); testCond(NATS_OK == natsInbox_Create(&inbox)); - micro_new_control_subject(&subject, MICRO_INFO_VERB, "CoolService", NULL); + micro_new_control_subject(&subject, MICRO_INFO_VERB, "ManyServicesSameName", NULL); test("Subscribe to INFO inbox: "); testCond(NATS_OK == natsConnection_SubscribeSync(&sub, nc, inbox)); test("Publish INFO request: "); @@ -34274,7 +34265,7 @@ void test_MicroBasics(void) snprintf(buf, sizeof(buf), "Validate INFO response strings#%d: ", i); test(buf); testCond( - (NATS_OK == nats_JSONGetStrPtr(js, "name", &str)) && (strcmp(str, "CoolService") == 0) + (NATS_OK == nats_JSONGetStrPtr(js, "name", &str)) && (strcmp(str, "ManyServicesSameName") == 0) && (NATS_OK == nats_JSONGetStrPtr(js, "description", &str)) && (strcmp(str, "returns 42") == 0) && (NATS_OK == nats_JSONGetStrPtr(js, "version", &str)) && (strcmp(str, "1.0.0") == 0) && (NATS_OK == nats_JSONGetStrPtr(js, "id", &str)) && (strlen(str) > 0) @@ -34294,27 +34285,27 @@ void test_MicroBasics(void) s = nats_JSONGetArrayObject(js, "endpoints", &array, &array_len); testCond((NATS_OK == s) && (array != NULL) && (array_len == 2)); - test("Validate INFO svc.do endpoint: "); - md = NULL; - testCond( - (NATS_OK == nats_JSONGetStrPtr(array[0], "name", &str)) && (strcmp(str, "do") == 0) - && (NATS_OK == nats_JSONGetStrPtr(array[0], "subject", &str)) && (strcmp(str, "svc.do") == 0) - && (NATS_OK == nats_JSONGetStrPtr(array[0], "queue_group", &str)) && (strcmp(str, MICRO_DEFAULT_QUEUE_GROUP) == 0) - && (NATS_OK == nats_JSONGetObject(array[0], "metadata", &md)) && (md == NULL) - ); - test("Validate INFO unused endpoint with metadata: "); md = NULL; testCond( - (NATS_OK == nats_JSONGetStrPtr(array[1], "name", &str)) && (strcmp(str, "unused") == 0) - && (NATS_OK == nats_JSONGetStrPtr(array[1], "subject", &str)) && (strcmp(str, "svc.unused") == 0) + (NATS_OK == nats_JSONGetStrPtr(array[0], "name", &str)) && (strcmp(str, "unused") == 0) + && (NATS_OK == nats_JSONGetStrPtr(array[0], "subject", &str)) && (strcmp(str, "svc.unused") == 0) && (NATS_OK == nats_JSONGetStrPtr(array[0], "queue_group", &str)) && (strcmp(str, MICRO_DEFAULT_QUEUE_GROUP) == 0) - && (NATS_OK == nats_JSONGetObject(array[1], "metadata", &md)) + && (NATS_OK == nats_JSONGetObject(array[0], "metadata", &md)) && (NATS_OK == nats_JSONGetStrPtr(md, "key1", &str)) && (strcmp(str, "value1") == 0) && (NATS_OK == nats_JSONGetStrPtr(md, "key2", &str)) && (strcmp(str, "value2") == 0) && (NATS_OK == nats_JSONGetStrPtr(md, "key3", &str)) && (strcmp(str, "value3") == 0) ); + test("Validate INFO svc.do endpoint: "); + md = NULL; + testCond( + (NATS_OK == nats_JSONGetStrPtr(array[1], "name", &str)) && (strcmp(str, "do") == 0) + && (NATS_OK == nats_JSONGetStrPtr(array[1], "subject", &str)) && (strcmp(str, "svc.do") == 0) + && (NATS_OK == nats_JSONGetStrPtr(array[1], "queue_group", &str)) && (strcmp(str, MICRO_DEFAULT_QUEUE_GROUP) == 0) + && (NATS_OK == nats_JSONGetObject(array[1], "metadata", &md)) && (md == NULL) + ); + nats_JSONDestroy(js); natsMsg_Destroy(reply); NATS_FREE(array); @@ -34326,7 +34317,7 @@ void test_MicroBasics(void) // Make sure we can request SRV.PING. test("Create PING inbox: "); testCond(NATS_OK == natsInbox_Create(&inbox)); - micro_new_control_subject(&subject, MICRO_PING_VERB, "CoolService", NULL); + micro_new_control_subject(&subject, MICRO_PING_VERB, "ManyServicesSameName", NULL); test("Subscribe to PING inbox: "); testCond(NATS_OK == natsConnection_SubscribeSync(&sub, nc, inbox)); test("Publish PING request: "); @@ -34348,7 +34339,7 @@ void test_MicroBasics(void) js = NULL; testCond((NATS_OK == nats_JSONParse(&js, reply->data, reply->dataLen)) && (NATS_OK == nats_JSONGetStrPtr(js, "name", &str)) && - (strcmp(str, "CoolService") == 0)); + (strcmp(str, "ManyServicesSameName") == 0)); nats_JSONDestroy(js); natsMsg_Destroy(reply); } @@ -34359,7 +34350,7 @@ void test_MicroBasics(void) // Get and validate $SRV.STATS from all service instances. test("Create STATS inbox: "); testCond(NATS_OK == natsInbox_Create(&inbox)); - micro_new_control_subject(&subject, MICRO_STATS_VERB, "CoolService", NULL); + micro_new_control_subject(&subject, MICRO_STATS_VERB, "ManyServicesSameName", NULL); test("Subscribe to STATS inbox: "); testCond(NATS_OK == natsConnection_SubscribeSync(&sub, nc, inbox)); test("Publish STATS request: "); @@ -34391,13 +34382,13 @@ void test_MicroBasics(void) test("Ensure endpoint 0 has num_requests: "); n = 0; - s = nats_JSONGetInt(array[0], "num_requests", &n); + s = nats_JSONGetInt(array[1], "num_requests", &n); testCond(NATS_OK == s); num_requests += n; test("Ensure endpoint 0 has num_errors: "); n = 0; - s = nats_JSONGetInt(array[0], "num_errors", &n); + s = nats_JSONGetInt(array[1], "num_errors", &n); testCond(NATS_OK == s); num_errors += n; @@ -34414,7 +34405,11 @@ void test_MicroBasics(void) natsInbox_Destroy(inbox); NATS_FREE(subject); - _destroyMicroservicesAndWaitForAllDone(svcs, NUM_MICRO_SERVICES, &arg); + for (i = 0; i < NUM_MICRO_SERVICES; i++) + { + _destroyMicroservice(svcs[i]); + } + _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); natsConnection_Destroy(nc); @@ -34439,9 +34434,6 @@ void test_MicroStartStop(void) .Handler = _microHandleRequest42, }; microServiceConfig cfg = { - .Version = "1.0.0", - .Name = "CoolService", - .Description = "returns 42", .Endpoint = &ep_cfg, }; natsMsg *reply = NULL; @@ -34482,7 +34474,11 @@ void test_MicroStartStop(void) } testCond(NATS_OK == s); - _destroyMicroservicesAndWaitForAllDone(svcs, NUM_MICRO_SERVICES, &arg); + for (i = 0; i < NUM_MICRO_SERVICES; i++) + { + _destroyMicroservice(svcs[i]); + } + _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); natsConnection_Destroy(nc); @@ -34555,8 +34551,7 @@ void test_MicroServiceStopsOnClosedConn(void) test("Test microservice is stopped: "); testCond(microService_IsStopped(m)); - test("Destroy microservice (final): "); - testCond(NULL == microService_Destroy(m)) + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); natsOptions_Destroy(opts); @@ -34573,6 +34568,9 @@ void test_MicroServiceStopsWhenServerStops(void) natsPid serverPid = NATS_INVALID_PID; struct threadArg arg; microService *m = NULL; + microEndpointConfig ep_cfg = { + .Handler = _microHandleRequest42, + }; microServiceConfig cfg = { .Name = "test", .Version = "1.0.0", @@ -34598,24 +34596,35 @@ void test_MicroServiceStopsWhenServerStops(void) _startMicroservice(&m, nc, &cfg, NULL, 0, &arg); + const int numEndpoints = 50; + for (int i=0; i < numEndpoints; i++) + { + char buf[32]; + testf("Add endpoint %d: ", i); + snprintf(buf, sizeof(buf), "do-%d", i); + ep_cfg.Subject = buf; + ep_cfg.Name = buf; + testCond(NULL == microService_AddEndpoint(m, &ep_cfg)); + } + test("Test microservice is running: "); testCond(!microService_IsStopped(m)) + testf("Check that the service has %d endpoints: ", numEndpoints); + microServiceInfo *info = NULL; + microError *err = microService_GetInfo(&info, m); + testCond((err == NULL) && (info->EndpointsLen == numEndpoints)); + microServiceInfo_Destroy(info); + test("Stop the server: "); testCond((_stopServer(serverPid), true)); - test("Wait for the service to stop: "); - natsMutex_Lock(arg.m); - while ((s != NATS_TIMEOUT) && !arg.microAllDone) - s = natsCondition_TimedWait(arg.c, arg.m, 1000); - testCond(arg.microAllDone); - natsMutex_Unlock(arg.m); + _waitForMicroservicesAllDone(&arg); test("Test microservice is not running: "); testCond(microService_IsStopped(m)) - microService_Destroy(m); - _waitForMicroservicesAllDone(&arg); + _destroyMicroservice(m); test("Destroy the test connection: "); natsConnection_Destroy(nc); @@ -34723,7 +34732,7 @@ void test_MicroAsyncErrorHandlerMaxPendingMsgs(void) testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER)); natsMutex_Unlock(arg.m); - microService_Destroy(m); + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); @@ -34805,7 +34814,7 @@ void test_MicroAsyncErrorHandlerMaxPendingBytes(void) natsMutex_Unlock(arg.m); testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER)); - microService_Destroy(m); + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); diff --git a/test/test.h b/test/test.h index 6a9bc8edf..9e5c32f03 100644 --- a/test/test.h +++ b/test/test.h @@ -33,6 +33,15 @@ static const char *clusterName = "test-cluster"; return; \ } +#define FAILf(f, ...) \ + { \ + printf("@@ "); \ + printf((f), __VA_ARGS__); \ + printf(" @@\n"); \ + failed = true; \ + return; \ + } + #define CHECK_SERVER_STARTED(p) \ if ((p) == NATS_INVALID_PID) \ FAIL("Unable to start or verify that the server was started!")