From 4444714b1e9fbcdb4e0ce28460d7f53974688cf7 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sat, 2 Nov 2024 15:35:13 -0700 Subject: [PATCH 01/39] TRY MORE --- src/conn.c | 66 ------------------- src/conn.h | 4 +- src/micro.c | 183 +++++++++++++++++++++++++++++++++++++--------------- test/test.c | 2 +- 4 files changed, 134 insertions(+), 121 deletions(-) diff --git a/src/conn.c b/src/conn.c index 415f00e9..9a9823ee 100644 --- a/src/conn.c +++ b/src/conn.c @@ -4467,69 +4467,3 @@ natsConn_defaultErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus } fflush(stderr); } - -int natsConn_getServices(microService ***services, natsConnection *nc) -{ - int numServices = 0; - natsConn_Lock(nc); - *services = nc->services; - numServices = nc->numServices; - natsConn_Unlock(nc); - return numServices; -} - -bool natsConn_removeService(natsConnection *nc, microService *service) -{ - bool removed = false; - if (nc == NULL || service == NULL) - return false; - - natsConn_Lock(nc); - for (int i = 0; i < nc->numServices; i++) - { - if (nc->services[i] == service) - { - for (int j = i; j < nc->numServices - 1; j++) - { - nc->services[j] = nc->services[j + 1]; - } - nc->numServices--; - removed = true; - break; - } - } - natsConn_Unlock(nc); - return removed; -} - -natsStatus natsConn_addService(natsConnection *nc, microService *service) -{ - natsStatus s = NATS_OK; - if (nc == NULL || service == NULL) - return nats_setDefaultError(NATS_INVALID_ARG); - - natsConn_Lock(nc); - if (nc->services == NULL) - { - nc->services = NATS_CALLOC(1, sizeof(microService *)); - if (nc->services == NULL) - s = nats_setDefaultError(NATS_NO_MEMORY); - } - else - { - microService **tmp = NATS_REALLOC(nc->services, (nc->numServices + 1) * sizeof(microService *)); - if (tmp == NULL) - s = nats_setDefaultError(NATS_NO_MEMORY); - else - nc->services = tmp; - } - - if (s == NATS_OK) - { - nc->services[nc->numServices] = service; - nc->numServices++; - } - natsConn_Unlock(nc); - - return s; -} diff --git a/src/conn.h b/src/conn.h index 7d4e1249..57ff3de3 100644 --- a/src/conn.h +++ b/src/conn.h @@ -164,9 +164,9 @@ int natsConn_getServices(microService ***services, natsConnection *nc); bool -natsConn_removeService(natsConnection *nc, microService *service); +natsConn_detachService(natsConnection *nc, microService *service); natsStatus -natsConn_addService(natsConnection *nc, microService *service); +natsConn_attachService(natsConnection *nc, microService *service); #endif /* CONN_H_ */ diff --git a/src/micro.c b/src/micro.c index d72812be..564b5358 100644 --- a/src/micro.c +++ b/src/micro.c @@ -29,6 +29,9 @@ static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, v static void _free_cloned_service_config(microServiceConfig *cfg); static void _free_service(microService *m); +static microError * +_attach_service_to_connection(natsConnection *nc, microService *service); + microError * micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *cfg) { @@ -53,7 +56,7 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c MICRO_CALL(err, _clone_service_config(&m->cfg, cfg)); // Add the service to the connection. - MICRO_CALL(err, micro_ErrorFromStatus(natsConn_addService(m->nc, m))); + MICRO_CALL(err, _attach_service_to_connection(m->nc, m)); MICRO_CALL(err, (m->refs++, NULL)); // Wrap the connection callbacks before we subscribe to anything. @@ -244,21 +247,83 @@ microGroup_AddEndpoint(microGroup *g, microEndpointConfig *cfg) } static microError * -_stop_service(microService *m, bool unsubscribe, bool finalRelease) +_attach_service_to_connection(natsConnection *nc, microService *service) +{ + natsStatus s = NATS_OK; + if (nc == NULL || service == NULL) + return micro_ErrorInvalidArg; + + natsConn_Lock(nc); + if (nc->services == NULL) + { + nc->services = NATS_CALLOC(1, sizeof(microService *)); + if (nc->services == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + } + else + { + microService **tmp = NATS_REALLOC(nc->services, (nc->numServices + 1) * sizeof(microService *)); + if (tmp == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + nc->services = tmp; + } + + if (s == NATS_OK) + { + nc->services[nc->numServices] = service; + nc->numServices++; + } + natsConn_Unlock(nc); + + return micro_ErrorFromStatus(s); +} + +static bool +_detach_service_from_connection(natsConnection *nc, microService *m) +{ + bool removed = false; + if (nc == NULL || m == NULL) + return false; + + natsConn_Lock(nc); + for (int i = 0; i < nc->numServices; i++) + { + if (nc->services[i] != m) + continue; + + for (int j = i; j < nc->numServices - 1; j++) + nc->services[j] = nc->services[j + 1]; + nc->numServices--; + removed = true; + break; + } + natsConn_Unlock(nc); + + + return removed; +} + +static microError * +_stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool release) { microError *err = NULL; microEndpoint *ep = NULL; int refs = 0; int numEndpoints = 0; + bool alreadyStopped = false; if (m == NULL) return micro_ErrorInvalidArg; _lock_service(m); if (!m->stopped) - { m->stopped = true; + else + alreadyStopped = true; + if (!alreadyStopped) + { if (unsubscribe) { for (ep = m->first_ep; ep != NULL; ep = ep->next) @@ -271,17 +336,20 @@ _stop_service(microService *m, bool unsubscribe, bool finalRelease) } } } + } - if (natsConn_removeService(m->nc, m)) + if (detachFromConnection) + { + // This is called only from micro_release_endpoint_when_unsubscribed, + // which does not hold the connection lock. + if (_detach_service_from_connection(m->nc, m)) m->refs--; } - - if ((m->refs > 0) && finalRelease) + if ((m->refs > 0) && release) m->refs--; refs = m->refs; numEndpoints = m->numEndpoints; - _unlock_service(m); if ((refs == 0) && (numEndpoints == 0)) @@ -293,12 +361,11 @@ _stop_service(microService *m, bool unsubscribe, bool finalRelease) microError * microService_Stop(microService *m) { - // Public API: stop the service, unsubscribe, but don't do the final release. - return _stop_service(m, true, false); + return _stop_service(m, false, true, false); } static void -_remove_endpoint(microService *m, microEndpoint *toRemove) +_detach_endpoint_from_service(microService *m, microEndpoint *toRemove) { microEndpoint *ep = NULL; microEndpoint *prev_ep = NULL; @@ -351,15 +418,12 @@ void micro_release_endpoint_when_unsubscribed(void *closure) // callback. _lock_service(m); - _remove_endpoint(m, ep); - + _detach_endpoint_from_service(m, ep); if (refs == 0) micro_free_endpoint(ep); - if (m->numEndpoints == 0) doneHandler = m->cfg->DoneHandler; - refs = m->refs; _unlock_service(m); // Special processing for the last endpoint. @@ -367,8 +431,9 @@ void micro_release_endpoint_when_unsubscribed(void *closure) { doneHandler(m); - if (refs == 0) - _free_service(m); + // Stop the service now in case it hasn't already and detach from the + // connection, no need to unsubscribe. + _stop_service(m, true, false, false); } } @@ -389,7 +454,7 @@ bool microService_IsStopped(microService *m) microError * microService_Destroy(microService *m) { - return _stop_service(m, true, true); + return _stop_service(m, false, true, true); } microError * @@ -457,6 +522,8 @@ _free_service(microService *m) if (m == NULL) return; + printf("<>/<> Freeing service %s\n", m->cfg->Name); + // destroy all groups. if (m->groups != NULL) { @@ -534,66 +601,78 @@ _free_cloned_service_config(microServiceConfig *cfg) static void _on_connection_closed(natsConnection *nc, void *ignored) { - microService *m = NULL; - microService **all = NULL; - - int n = natsConn_getServices(&all, nc); - for (int i = 0; i < n; i++) - { - m = all[i]; - _stop_service(m, false, false); // subs will be terminated by the connection close. - } + // Stop all services. They will get detached from the connection when their + // subs are complete. + natsConn_Lock(nc); + for (int i = 0; i < nc->numServices; i++) + _stop_service(nc->services[i], false, false, false); + natsConn_Unlock(nc); } -static void +static bool _on_service_error(microService *m, const char *subject, natsStatus s) { - microEndpoint *ep = NULL; + microEndpoint *found = NULL; microError *err = NULL; if (m == NULL) - return; + return false; _lock_service(m); - for (ep = m->first_ep; - (ep != NULL) && !micro_match_endpoint_subject(ep->subject, subject); - ep = ep->next) - ; - micro_retain_endpoint(ep); // for the callback - _unlock_service(m); - if (ep != NULL) + for (microEndpoint *ep = m->first_ep; ep != NULL; ep = ep->next) { - if (m->cfg->ErrHandler != NULL) - (*m->cfg->ErrHandler)(m, ep, s); - - err = microError_Wrapf(micro_ErrorFromStatus(s), "NATS error on endpoint %s", ep->subject); - micro_update_last_error(ep, err); - microError_Destroy(err); + if (!micro_match_endpoint_subject(ep->subject, subject)) + continue; + found = ep; + break; } - micro_release_endpoint(ep); // after the callback + + if (found != NULL) + micro_retain_endpoint(found); + + _unlock_service(m); + + if (found == NULL) + return false; + + err = microError_Wrapf(micro_ErrorFromStatus(s), "NATS error on endpoint %s", found->subject); + micro_update_last_error(found, err); + microError_Destroy(err); + + if (m->cfg->ErrHandler != NULL) + (*m->cfg->ErrHandler)(m, found, s); + + micro_release_endpoint(found); + return true; } static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_used) { - microService *m = NULL; - microService **all = NULL; const char *subject = NULL; if (sub == NULL) - { return; - } + subject = natsSubscription_GetSubject(sub); - int n = natsConn_getServices(&all, nc); - for (int i = 0; i < n; i++) + // <>/<> TODO: this would be a lot easier if sub had a ref to ep. + natsConn_Lock(nc); + for (int i = 0; i < nc->numServices; i++) { - m = all[i]; - _on_service_error(m, subject, s); - _stop_service(m, true, false); + microService *m = nc->services[i]; + + // See if the service owns the affected subscription, based on matching + // the subjects; notify it of the error. + if (!_on_service_error(m, subject, s)) + continue; + + // Stop the service in error. It will get detached from the connection + // and released when all of its subs are complete. + _stop_service(m, false, true, false); } + natsConn_Unlock(nc); } static inline microError * diff --git a/test/test.c b/test/test.c index ac684374..81dc7e18 100644 --- a/test/test.c +++ b/test/test.c @@ -33584,7 +33584,7 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer #define _waitForMicroservicesAllDone(_arg) \ { \ - nats_Sleep(50); \ + nats_Sleep(valgrind ? 500 : 20); \ natsMutex_Lock((_arg)->m); \ testf("Wait for %d microservices to stop: ", (_arg)->microRunningServiceCount); \ natsStatus waitStatus = NATS_OK; \ From b832eab6c370f23e1c1e095309266d4ad7b0a043 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sat, 2 Nov 2024 15:36:58 -0700 Subject: [PATCH 02/39] TRY MORE, .github --- .github/workflows/build-test.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index cf96b75e..bbc53cc1 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -181,7 +181,8 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" - ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} + ctest -R 'Micro' --timeout 60 --output-on-failure --repeat-until-fail 20 + # ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} - name: Upload coverage reports to Codecov # PRs from external contributors fail: https://github.com/codecov/feedback/issues/301 From ea714ca2332c864c4e459c87f11ee7e8a2e9ef84 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sat, 2 Nov 2024 16:10:09 -0700 Subject: [PATCH 03/39] TRY MORE, logs for MicroServiceStopsOnClosedConn --- .github/workflows/build-test.yml | 2 +- src/micro.c | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index bbc53cc1..3432f48e 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -181,7 +181,7 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" - ctest -R 'Micro' --timeout 60 --output-on-failure --repeat-until-fail 20 + ctest -R 'MicroServiceStopsOnClosedConn' --timeout 60 --output-on-failure --repeat-until-fail 30 # ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} - name: Upload coverage reports to Codecov diff --git a/src/micro.c b/src/micro.c index 564b5358..79081096 100644 --- a/src/micro.c +++ b/src/micro.c @@ -345,6 +345,7 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool if (_detach_service_from_connection(m->nc, m)) m->refs--; } + if ((m->refs > 0) && release) m->refs--; @@ -361,6 +362,7 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool microError * microService_Stop(microService *m) { + printf("<>/<> Stopping service %s from microService_Stop\n", m->cfg->Name); return _stop_service(m, false, true, false); } @@ -433,6 +435,7 @@ void micro_release_endpoint_when_unsubscribed(void *closure) // Stop the service now in case it hasn't already and detach from the // connection, no need to unsubscribe. + printf("<>/<> Stopping service %s from micro_release_endpoint_when_unsubscribed\n", m->cfg->Name); _stop_service(m, true, false, false); } } @@ -454,6 +457,7 @@ bool microService_IsStopped(microService *m) microError * microService_Destroy(microService *m) { + printf("<>/<> Stopping service %s from microService_Destroy\n", m->cfg->Name); return _stop_service(m, false, true, true); } @@ -605,7 +609,10 @@ _on_connection_closed(natsConnection *nc, void *ignored) // subs are complete. natsConn_Lock(nc); for (int i = 0; i < nc->numServices; i++) + { + printf("<>/<> Stopping service %s from on_connection_closed\n", nc->services[i]->cfg->Name); _stop_service(nc->services[i], false, false, false); + } natsConn_Unlock(nc); } @@ -670,6 +677,7 @@ _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_use // Stop the service in error. It will get detached from the connection // and released when all of its subs are complete. + printf("<>/<> Stopping service %s from on_error\n", m->cfg->Name); _stop_service(m, false, true, false); } natsConn_Unlock(nc); From 6344935e0fad5c89abd70ca0eb7fb54c87179050 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 05:10:11 -0800 Subject: [PATCH 04/39] TRY MORE, +1 --- src/micro.c | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/src/micro.c b/src/micro.c index 79081096..54ce7c18 100644 --- a/src/micro.c +++ b/src/micro.c @@ -286,6 +286,7 @@ _detach_service_from_connection(natsConnection *nc, microService *m) if (nc == NULL || m == NULL) return false; + printf("<>/<> _detach_service_from_connection\n"); natsConn_Lock(nc); for (int i = 0; i < nc->numServices; i++) { @@ -299,7 +300,6 @@ _detach_service_from_connection(natsConnection *nc, microService *m) break; } natsConn_Unlock(nc); - return removed; } @@ -312,45 +312,41 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool int refs = 0; int numEndpoints = 0; bool alreadyStopped = false; + bool detached = false; if (m == NULL) return micro_ErrorInvalidArg; + if (detachFromConnection) + detached = _detach_service_from_connection(m->nc, m); + _lock_service(m); + printf("<>/<> _stop_service: 1\n"); if (!m->stopped) m->stopped = true; else alreadyStopped = true; + _unlock_service(m); - if (!alreadyStopped) + if (!alreadyStopped && unsubscribe) { - if (unsubscribe) + for (ep = m->first_ep; ep != NULL; ep = ep->next) { - for (ep = m->first_ep; ep != NULL; ep = ep->next) + if (err = micro_stop_endpoint(ep), err != NULL) { - if (err = micro_stop_endpoint(ep), err != NULL) - { - err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name); - _unlock_service(m); - return err; - } + err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name); + return err; } } } - if (detachFromConnection) - { - // This is called only from micro_release_endpoint_when_unsubscribed, - // which does not hold the connection lock. - if (_detach_service_from_connection(m->nc, m)) - m->refs--; - } - + _lock_service(m); + if (detached) + m->refs--; if ((m->refs > 0) && release) m->refs--; - refs = m->refs; - numEndpoints = m->numEndpoints; + printf("<>/<> _stop_service: 2: refs: %d, numEndpoints: %d\n", refs, numEndpoints); _unlock_service(m); if ((refs == 0) && (numEndpoints == 0)) From e8be798c5dcc4b6ab7ea199adf43b31f8e9f958a Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 05:11:54 -0800 Subject: [PATCH 05/39] TRY MORE, +2 --- .github/workflows/build-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 3432f48e..e59498a7 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -181,7 +181,7 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" - ctest -R 'MicroServiceStopsOnClosedConn' --timeout 60 --output-on-failure --repeat-until-fail 30 + ctest -R 'Micro' --timeout 60 --output-on-failure --repeat-until-fail 30 # ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} - name: Upload coverage reports to Codecov From c66c2642aa9281e183cead8e0cc777d1186f5b7d Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 05:18:04 -0800 Subject: [PATCH 06/39] TRY MORE, +3 --- .github/workflows/on-push-release.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/on-push-release.yml b/.github/workflows/on-push-release.yml index d47c2030..0c1b570e 100644 --- a/.github/workflows/on-push-release.yml +++ b/.github/workflows/on-push-release.yml @@ -22,3 +22,4 @@ jobs: server_version: main ubuntu_version: ${{ matrix.ubuntu_version }} compiler: ${{ matrix.compiler }} + type: RelWithDebInfo From aad38a8ecc9b55f7775d83c38f69ffca83ce40d9 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 05:23:36 -0800 Subject: [PATCH 07/39] TRY MORE, +4 --- .github/workflows/build-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index e59498a7..30a87c0d 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -181,7 +181,7 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" - ctest -R 'Micro' --timeout 60 --output-on-failure --repeat-until-fail 30 + LD_PRELOAD=/lib/x86_64-linux-gnu/libSegFault.so ctest -R 'Micro' --timeout 60 --output-on-failure --repeat-until-fail 30 # ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} - name: Upload coverage reports to Codecov From cef4f7437866c3baae2c30d79fb9c5d20a09cacc Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 05:33:35 -0800 Subject: [PATCH 08/39] TRY MORE, +5 --- .github/workflows/build-test.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 30a87c0d..e46653f0 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -181,7 +181,9 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" - LD_PRELOAD=/lib/x86_64-linux-gnu/libSegFault.so ctest -R 'Micro' --timeout 60 --output-on-failure --repeat-until-fail 30 + set +e + for i in $(seq 1 10); do ./bin/testsuite MicroStartStop; done + # ctest -R 'Micro' --timeout 60 --output-on-failure --repeat-until-fail 30 # ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} - name: Upload coverage reports to Codecov From f4122ed75584dcfe23693b1806acc7e0452f73ed Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 05:44:20 -0800 Subject: [PATCH 09/39] TRY MORE, +6 --- .github/workflows/build-test.yml | 4 +--- test/test.c | 12 ++++++------ 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index e46653f0..a3040af8 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -181,9 +181,7 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" - set +e - for i in $(seq 1 10); do ./bin/testsuite MicroStartStop; done - # ctest -R 'Micro' --timeout 60 --output-on-failure --repeat-until-fail 30 + ctest -R 'MicroStartStop' --timeout 60 --output-on-failure --repeat-until-fail 30 # ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} - name: Upload coverage reports to Codecov diff --git a/test/test.c b/test/test.c index 81dc7e18..14439c87 100644 --- a/test/test.c +++ b/test/test.c @@ -33576,6 +33576,12 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer for (i = 0; i < n; i++) { + char buf[64]; + cfg->Version = "1.0.0"; + cfg->Description = "returns 42", + + snprintf(buf, sizeof(buf), "CoolService-%d", i); + cfg->Name = buf; _startMicroserviceOK(&(svcs[i]), nc, cfg, eps, num_eps, arg); } @@ -34098,9 +34104,6 @@ void test_MicroBasics(void) &ep2_cfg, }; microServiceConfig cfg = { - .Version = "1.0.0", - .Name = "CoolService", - .Description = "returns 42", .Metadata = (natsMetadata){ .List = (const char *[]){"skey1", "svalue1", "skey2", "svalue2"}, .Count = 2, @@ -34368,9 +34371,6 @@ void test_MicroStartStop(void) .Handler = _microHandleRequest42, }; microServiceConfig cfg = { - .Version = "1.0.0", - .Name = "CoolService", - .Description = "returns 42", .Endpoint = &ep_cfg, }; natsMsg *reply = NULL; From ba8a79339339513b94fb0c88a7ff221e86fd6c8b Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 06:18:52 -0800 Subject: [PATCH 10/39] TRY MORE, +7 --- src/micro.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/micro.c b/src/micro.c index 54ce7c18..dbc88414 100644 --- a/src/micro.c +++ b/src/micro.c @@ -283,10 +283,12 @@ static bool _detach_service_from_connection(natsConnection *nc, microService *m) { bool removed = false; + int remaining = 0; + if (nc == NULL || m == NULL) return false; - printf("<>/<> _detach_service_from_connection\n"); + printf("<>/<> _detachING_service_from_connection %s: remaining: %d, removed: %d\n", m->cfg->Name, remaining, removed); natsConn_Lock(nc); for (int i = 0; i < nc->numServices; i++) { @@ -299,8 +301,9 @@ _detach_service_from_connection(natsConnection *nc, microService *m) removed = true; break; } + remaining = nc->numServices; natsConn_Unlock(nc); - + printf("<>/<> _detachED_service_from_connection %s: remaining: %d, removed: %d\n", m->cfg->Name, remaining, removed); return removed; } @@ -404,6 +407,7 @@ void micro_release_endpoint_when_unsubscribed(void *closure) if ((m == NULL) || (m->service_mu == NULL)) return; + printf("<>/<> SUB complete: service %s, endpoint %s\n", m->cfg->Name, ep->subject); micro_lock_endpoint(ep); sub = ep->sub; ep->sub = NULL; // Force the subscription to be destroyed now, so NULL out the pointer to avoid a double free. From 0ada3686b58e5f556213f0b70189558cdb78e998 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 06:23:25 -0800 Subject: [PATCH 11/39] TRY MORE, +8 --- src/micro.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/micro.c b/src/micro.c index dbc88414..616db952 100644 --- a/src/micro.c +++ b/src/micro.c @@ -324,7 +324,7 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool detached = _detach_service_from_connection(m->nc, m); _lock_service(m); - printf("<>/<> _stop_service: 1\n"); + printf("<>/<> _stop_service: %s: 1\n", m->cfg->Name); if (!m->stopped) m->stopped = true; else @@ -349,7 +349,7 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool if ((m->refs > 0) && release) m->refs--; refs = m->refs; - printf("<>/<> _stop_service: 2: refs: %d, numEndpoints: %d\n", refs, numEndpoints); + printf("<>/<> _stop_service: %s: 2: refs: %d, numEndpoints: %d\n", m->cfg->Name, refs, numEndpoints); _unlock_service(m); if ((refs == 0) && (numEndpoints == 0)) From c2e2e2f8dda213fee4bc03189ddd01d206ced446 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 06:28:51 -0800 Subject: [PATCH 12/39] TRY MORE, +9 --- src/micro.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/micro.c b/src/micro.c index 616db952..e4c461d1 100644 --- a/src/micro.c +++ b/src/micro.c @@ -349,6 +349,7 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool if ((m->refs > 0) && release) m->refs--; refs = m->refs; + numEndpoints = m->numEndpoints; printf("<>/<> _stop_service: %s: 2: refs: %d, numEndpoints: %d\n", m->cfg->Name, refs, numEndpoints); _unlock_service(m); From abb09e672cd1cc7ebb0b26f0cbbfea66ed7ddb1e Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 06:37:28 -0800 Subject: [PATCH 13/39] TRY MORE, +10 --- src/micro.c | 3 +-- test/test.c | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/micro.c b/src/micro.c index e4c461d1..48385ec7 100644 --- a/src/micro.c +++ b/src/micro.c @@ -288,7 +288,7 @@ _detach_service_from_connection(natsConnection *nc, microService *m) if (nc == NULL || m == NULL) return false; - printf("<>/<> _detachING_service_from_connection %s: remaining: %d, removed: %d\n", m->cfg->Name, remaining, removed); + printf("<>/<> _detachING_service_from_connection %s\n", m->cfg->Name); natsConn_Lock(nc); for (int i = 0; i < nc->numServices; i++) { @@ -408,7 +408,6 @@ void micro_release_endpoint_when_unsubscribed(void *closure) if ((m == NULL) || (m->service_mu == NULL)) return; - printf("<>/<> SUB complete: service %s, endpoint %s\n", m->cfg->Name, ep->subject); micro_lock_endpoint(ep); sub = ep->sub; ep->sub = NULL; // Force the subscription to be destroyed now, so NULL out the pointer to avoid a double free. diff --git a/test/test.c b/test/test.c index 14439c87..4965fb0a 100644 --- a/test/test.c +++ b/test/test.c @@ -33561,7 +33561,7 @@ _startMicroservice(microService** new_m, natsConnection *nc, microServiceConfig static void _startMicroserviceOK(microService** new_m, natsConnection *nc, microServiceConfig *cfg, microEndpointConfig **eps, int num_eps, struct threadArg *arg) { - char buf[64]; + char buf[256]; snprintf(buf, sizeof(buf), "Start microservice %s: ", cfg->Name); test(buf); @@ -34414,6 +34414,7 @@ void test_MicroStartStop(void) for (i = 0; i < NUM_MICRO_SERVICES; i++) { _destroyMicroservice(svcs[i]); + nats_Sleep(100); // <>/<> } _waitForMicroservicesAllDone(&arg); From dea759e437d09731f575ca129085f04e210ee4b7 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 06:51:52 -0800 Subject: [PATCH 14/39] TRY MORE, +11 --- .github/workflows/build-test.yml | 5 +++++ test/test.c | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index a3040af8..b3dfe418 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -181,6 +181,11 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" + echo "------------------------------------------------------------" + addr2line -e ./bin/testsuite -f -p 0x562bd0d021c2 + echo "------------------------------------------------------------" + nm -l ./bin/testsuite | sort + echo "------------------------------------------------------------" ctest -R 'MicroStartStop' --timeout 60 --output-on-failure --repeat-until-fail 30 # ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} diff --git a/test/test.c b/test/test.c index 4965fb0a..017f11e9 100644 --- a/test/test.c +++ b/test/test.c @@ -34414,7 +34414,6 @@ void test_MicroStartStop(void) for (i = 0; i < NUM_MICRO_SERVICES; i++) { _destroyMicroservice(svcs[i]); - nats_Sleep(100); // <>/<> } _waitForMicroservicesAllDone(&arg); From 1fcbe70114cc0059fb0e3a0ba0e4be7c35b29253 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 07:14:24 -0800 Subject: [PATCH 15/39] TRY MORE, +12 --- src/micro_endpoint.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index a14cbc83..d19713b2 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -93,6 +93,7 @@ micro_stop_endpoint(microEndpoint *ep) return NULL; micro_lock_endpoint(ep); + printf("<>/<> Stopping endpoint %s %s\n", ep->m->cfg->Name, ep->subject); sub = ep->sub; micro_unlock_endpoint(ep); if (sub == NULL) From 7fe1a464352852d3246f9f969652363ccd2c87d0 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 07:20:33 -0800 Subject: [PATCH 16/39] TRY MORE, +13 --- src/micro.c | 20 ++++++++++---------- src/micro_endpoint.c | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/micro.c b/src/micro.c index 48385ec7..114f0cdd 100644 --- a/src/micro.c +++ b/src/micro.c @@ -288,7 +288,7 @@ _detach_service_from_connection(natsConnection *nc, microService *m) if (nc == NULL || m == NULL) return false; - printf("<>/<> _detachING_service_from_connection %s\n", m->cfg->Name); + printf("<>/<> _detachING_service_from_connection %s\n", m->cfg->Name); fflush(stdout); natsConn_Lock(nc); for (int i = 0; i < nc->numServices; i++) { @@ -303,7 +303,7 @@ _detach_service_from_connection(natsConnection *nc, microService *m) } remaining = nc->numServices; natsConn_Unlock(nc); - printf("<>/<> _detachED_service_from_connection %s: remaining: %d, removed: %d\n", m->cfg->Name, remaining, removed); + printf("<>/<> _detachED_service_from_connection %s: remaining: %d, removed: %d\n", m->cfg->Name, remaining, removed); fflush(stdout); return removed; } @@ -324,7 +324,7 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool detached = _detach_service_from_connection(m->nc, m); _lock_service(m); - printf("<>/<> _stop_service: %s: 1\n", m->cfg->Name); + printf("<>/<> _stop_service: %s, detached: %d\n", m->cfg->Name, detached); fflush(stdout); if (!m->stopped) m->stopped = true; else @@ -350,9 +350,10 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool m->refs--; refs = m->refs; numEndpoints = m->numEndpoints; - printf("<>/<> _stop_service: %s: 2: refs: %d, numEndpoints: %d\n", m->cfg->Name, refs, numEndpoints); _unlock_service(m); + printf("<>/<> _stoppED_service: %s: refs: %d, numEndpoints: %d\n", m->cfg->Name, refs, numEndpoints); fflush(stdout); + if ((refs == 0) && (numEndpoints == 0)) _free_service(m); @@ -362,7 +363,6 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool microError * microService_Stop(microService *m) { - printf("<>/<> Stopping service %s from microService_Stop\n", m->cfg->Name); return _stop_service(m, false, true, false); } @@ -435,7 +435,7 @@ void micro_release_endpoint_when_unsubscribed(void *closure) // Stop the service now in case it hasn't already and detach from the // connection, no need to unsubscribe. - printf("<>/<> Stopping service %s from micro_release_endpoint_when_unsubscribed\n", m->cfg->Name); + printf("<>/<> Stopping service %s from micro_release_endpoint_when_unsubscribed\n", m->cfg->Name); fflush(stdout); _stop_service(m, true, false, false); } } @@ -457,7 +457,7 @@ bool microService_IsStopped(microService *m) microError * microService_Destroy(microService *m) { - printf("<>/<> Stopping service %s from microService_Destroy\n", m->cfg->Name); + printf("<>/<> Stopping service %s from microService_Destroy\n", m->cfg->Name); fflush(stdout); return _stop_service(m, false, true, true); } @@ -526,7 +526,7 @@ _free_service(microService *m) if (m == NULL) return; - printf("<>/<> Freeing service %s\n", m->cfg->Name); + printf("<>/<> Freeing service %s\n", m->cfg->Name); fflush(stdout); // destroy all groups. if (m->groups != NULL) @@ -610,7 +610,7 @@ _on_connection_closed(natsConnection *nc, void *ignored) natsConn_Lock(nc); for (int i = 0; i < nc->numServices; i++) { - printf("<>/<> Stopping service %s from on_connection_closed\n", nc->services[i]->cfg->Name); + printf("<>/<> Stopping service %s from on_connection_closed\n", nc->services[i]->cfg->Name); fflush(stdout); _stop_service(nc->services[i], false, false, false); } natsConn_Unlock(nc); @@ -677,7 +677,7 @@ _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_use // Stop the service in error. It will get detached from the connection // and released when all of its subs are complete. - printf("<>/<> Stopping service %s from on_error\n", m->cfg->Name); + printf("<>/<> Stopping service %s from on_error\n", m->cfg->Name); fflush(stdout); _stop_service(m, false, true, false); } natsConn_Unlock(nc); diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index d19713b2..526c85ba 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -93,7 +93,7 @@ micro_stop_endpoint(microEndpoint *ep) return NULL; micro_lock_endpoint(ep); - printf("<>/<> Stopping endpoint %s %s\n", ep->m->cfg->Name, ep->subject); + printf("<>/<> Stopping endpoint %s %s\n", ep->m->cfg->Name, ep->subject); fflush(stdout); sub = ep->sub; micro_unlock_endpoint(ep); if (sub == NULL) From a00c0778fff0591a9589f7482d440fb09933fe0a Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 07:24:04 -0800 Subject: [PATCH 17/39] TRY MORE, +14 --- .github/workflows/build-test.yml | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index b3dfe418..87a4dcf9 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -174,6 +174,18 @@ jobs: mv ./nats-server ../deps/nats-server/nats-server fi + - name: "NC" + if: inputs.benchmark == 'OFF' + working-directory: ./build + run: | + nm -l ./bin/testsuite | sort + + - name: "objdump" + if: inputs.benchmark == 'OFF' + working-directory: ./build + run: | + objdump -d -S ./bin/testsuite + - name: "Test" if: inputs.benchmark == 'OFF' working-directory: ./build @@ -181,11 +193,6 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" - echo "------------------------------------------------------------" - addr2line -e ./bin/testsuite -f -p 0x562bd0d021c2 - echo "------------------------------------------------------------" - nm -l ./bin/testsuite | sort - echo "------------------------------------------------------------" ctest -R 'MicroStartStop' --timeout 60 --output-on-failure --repeat-until-fail 30 # ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} From 7d118e4ac84f8e20ff4ac210b377bbac6aed35b5 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 07:29:48 -0800 Subject: [PATCH 18/39] TRY MORE, +15 --- src/micro_endpoint.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index 526c85ba..44afaa20 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -92,19 +92,28 @@ micro_stop_endpoint(microEndpoint *ep) if (ep == NULL) return NULL; + printf("<>/<> Stopping endpoint %p\n", ep); fflush(stdout); + micro_lock_endpoint(ep); - printf("<>/<> Stopping endpoint %s %s\n", ep->m->cfg->Name, ep->subject); fflush(stdout); + printf("<>/<> Stopping endpoint %p, %s %s\n", ep, ep->m->cfg->Name, ep->subject); fflush(stdout); sub = ep->sub; micro_unlock_endpoint(ep); if (sub == NULL) + { + printf("<>/<> Stopping endpoint %p %s %s: sub is NULL\n", ep, ep->m->cfg->Name, ep->subject); fflush(stdout); return NULL; + } // When the drain is complete, the callback will free ep. We may get an // NATS_INVALID_SUBSCRIPTION if the subscription is already closed. s = natsSubscription_Drain(sub); if ((s != NATS_OK) && (s != NATS_INVALID_SUBSCRIPTION)) + { + printf("<>/<> Stopping endpoint %p %s %s: drain failed\n", ep, ep->m->cfg->Name, ep->subject); fflush(stdout); return microError_Wrapf(micro_ErrorFromStatus(s), "failed to drain subscription"); + } + printf("<>/<> Stoped endpoint %p, %s %s!!!!\n", ep, ep->m->cfg->Name, ep->subject); fflush(stdout); return NULL; } From 96d655ed4f91b757c43091e0f92be9c5586041b8 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 07:40:38 -0800 Subject: [PATCH 19/39] TRY MORE, +16 --- src/micro.c | 2 -- src/micro_endpoint.c | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/micro.c b/src/micro.c index 114f0cdd..dbb5fdf8 100644 --- a/src/micro.c +++ b/src/micro.c @@ -329,7 +329,6 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool m->stopped = true; else alreadyStopped = true; - _unlock_service(m); if (!alreadyStopped && unsubscribe) { @@ -343,7 +342,6 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool } } - _lock_service(m); if (detached) m->refs--; if ((m->refs > 0) && release) diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index 44afaa20..ff7219f9 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -113,7 +113,7 @@ micro_stop_endpoint(microEndpoint *ep) return microError_Wrapf(micro_ErrorFromStatus(s), "failed to drain subscription"); } - printf("<>/<> Stoped endpoint %p, %s %s!!!!\n", ep, ep->m->cfg->Name, ep->subject); fflush(stdout); + printf("<>/<> Stopped endpoint %p, %s %s!!!!\n", ep, ep->m->cfg->Name, ep->subject); fflush(stdout); return NULL; } From 176899f93ba131c25b31fb87ab9b6a7c9af41ec1 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 08:01:56 -0800 Subject: [PATCH 20/39] TRY MORE, +17 --- src/micro.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/micro.c b/src/micro.c index dbb5fdf8..6ea6e42a 100644 --- a/src/micro.c +++ b/src/micro.c @@ -348,9 +348,9 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool m->refs--; refs = m->refs; numEndpoints = m->numEndpoints; + printf("<>/<> _stoppED_service: %s: refs: %d, numEndpoints: %d\n", m->cfg->Name, refs, numEndpoints); fflush(stdout); _unlock_service(m); - printf("<>/<> _stoppED_service: %s: refs: %d, numEndpoints: %d\n", m->cfg->Name, refs, numEndpoints); fflush(stdout); if ((refs == 0) && (numEndpoints == 0)) _free_service(m); From a69c4176729bbcdd257f90f943fad830dabb9fef Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 08:13:19 -0800 Subject: [PATCH 21/39] TRY MORE, +18 --- .github/workflows/build-test.yml | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 87a4dcf9..a5f4080d 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -174,18 +174,6 @@ jobs: mv ./nats-server ../deps/nats-server/nats-server fi - - name: "NC" - if: inputs.benchmark == 'OFF' - working-directory: ./build - run: | - nm -l ./bin/testsuite | sort - - - name: "objdump" - if: inputs.benchmark == 'OFF' - working-directory: ./build - run: | - objdump -d -S ./bin/testsuite - - name: "Test" if: inputs.benchmark == 'OFF' working-directory: ./build @@ -193,7 +181,7 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" - ctest -R 'MicroStartStop' --timeout 60 --output-on-failure --repeat-until-fail 30 + ctest -R 'Micro' --timeout 60 --output-on-failure --repeat-until-fail 100 # ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} - name: Upload coverage reports to Codecov From 5538d7009bbc0b4e99d43ecbc21c684882fbb791 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 08:31:19 -0800 Subject: [PATCH 22/39] TRY MORE, +19 fixed MicroBasics --- test/test.c | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/test/test.c b/test/test.c index 017f11e9..5b075353 100644 --- a/test/test.c +++ b/test/test.c @@ -33578,10 +33578,13 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer { char buf[64]; cfg->Version = "1.0.0"; - cfg->Description = "returns 42", + cfg->Description = "returns 42"; - snprintf(buf, sizeof(buf), "CoolService-%d", i); - cfg->Name = buf; + if (nats_IsStringEmpty(cfg->Name)) + { + snprintf(buf, sizeof(buf), "CoolService-%d", i); + cfg->Name = buf; + } _startMicroserviceOK(&(svcs[i]), nc, cfg, eps, num_eps, arg); } @@ -34108,6 +34111,7 @@ void test_MicroBasics(void) .List = (const char *[]){"skey1", "svalue1", "skey2", "svalue2"}, .Count = 2, }, + .Name = "ManyServicesSameName", .Endpoint = NULL, .State = NULL, }; @@ -34165,7 +34169,7 @@ void test_MicroBasics(void) test(buf); err = microService_GetInfo(&info, svcs[i]); testCond((err == NULL) && - (strcmp(info->Name, "CoolService") == 0) && + (strcmp(info->Name, "ManyServicesSameName") == 0) && (strlen(info->Id) > 0) && (strcmp(info->Description, "returns 42") == 0) && (strcmp(info->Version, "1.0.0") == 0) && @@ -34176,7 +34180,7 @@ void test_MicroBasics(void) // Make sure we can request valid info with $SRV.INFO request. test("Create INFO inbox: "); testCond(NATS_OK == natsInbox_Create(&inbox)); - micro_new_control_subject(&subject, MICRO_INFO_VERB, "CoolService", NULL); + micro_new_control_subject(&subject, MICRO_INFO_VERB, "ManyServicesSameName", NULL); test("Subscribe to INFO inbox: "); testCond(NATS_OK == natsConnection_SubscribeSync(&sub, nc, inbox)); test("Publish INFO request: "); @@ -34202,7 +34206,7 @@ void test_MicroBasics(void) snprintf(buf, sizeof(buf), "Validate INFO response strings#%d: ", i); test(buf); testCond( - (NATS_OK == nats_JSONGetStrPtr(js, "name", &str)) && (strcmp(str, "CoolService") == 0) + (NATS_OK == nats_JSONGetStrPtr(js, "name", &str)) && (strcmp(str, "ManyServicesSameName") == 0) && (NATS_OK == nats_JSONGetStrPtr(js, "description", &str)) && (strcmp(str, "returns 42") == 0) && (NATS_OK == nats_JSONGetStrPtr(js, "version", &str)) && (strcmp(str, "1.0.0") == 0) && (NATS_OK == nats_JSONGetStrPtr(js, "id", &str)) && (strlen(str) > 0) @@ -34254,7 +34258,7 @@ void test_MicroBasics(void) // Make sure we can request SRV.PING. test("Create PING inbox: "); testCond(NATS_OK == natsInbox_Create(&inbox)); - micro_new_control_subject(&subject, MICRO_PING_VERB, "CoolService", NULL); + micro_new_control_subject(&subject, MICRO_PING_VERB, "ManyServicesSameName", NULL); test("Subscribe to PING inbox: "); testCond(NATS_OK == natsConnection_SubscribeSync(&sub, nc, inbox)); test("Publish PING request: "); @@ -34276,7 +34280,7 @@ void test_MicroBasics(void) js = NULL; testCond((NATS_OK == nats_JSONParse(&js, reply->data, reply->dataLen)) && (NATS_OK == nats_JSONGetStrPtr(js, "name", &str)) && - (strcmp(str, "CoolService") == 0)); + (strcmp(str, "ManyServicesSameName") == 0)); nats_JSONDestroy(js); natsMsg_Destroy(reply); } @@ -34287,7 +34291,7 @@ void test_MicroBasics(void) // Get and validate $SRV.STATS from all service instances. test("Create STATS inbox: "); testCond(NATS_OK == natsInbox_Create(&inbox)); - micro_new_control_subject(&subject, MICRO_STATS_VERB, "CoolService", NULL); + micro_new_control_subject(&subject, MICRO_STATS_VERB, "ManyServicesSameName", NULL); test("Subscribe to STATS inbox: "); testCond(NATS_OK == natsConnection_SubscribeSync(&sub, nc, inbox)); test("Publish STATS request: "); From de7564f7818e62833fe650e649a19df7a916d16a Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 09:10:06 -0800 Subject: [PATCH 23/39] TRY MORE, +20 --- src/micro.c | 25 ++++++++++++++++++------- src/micro_endpoint.c | 10 ---------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/micro.c b/src/micro.c index 6ea6e42a..c56e936f 100644 --- a/src/micro.c +++ b/src/micro.c @@ -316,6 +316,7 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool int numEndpoints = 0; bool alreadyStopped = false; bool detached = false; + microEndpoint *EEEE[256]; if (m == NULL) return micro_ErrorInvalidArg; @@ -323,8 +324,9 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool if (detachFromConnection) detached = _detach_service_from_connection(m->nc, m); - _lock_service(m); printf("<>/<> _stop_service: %s, detached: %d\n", m->cfg->Name, detached); fflush(stdout); + + _lock_service(m); if (!m->stopped) m->stopped = true; else @@ -332,13 +334,10 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool if (!alreadyStopped && unsubscribe) { + int i = 0; for (ep = m->first_ep; ep != NULL; ep = ep->next) { - if (err = micro_stop_endpoint(ep), err != NULL) - { - err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name); - return err; - } + EEEE[i++] = ep; } } @@ -348,9 +347,21 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool m->refs--; refs = m->refs; numEndpoints = m->numEndpoints; - printf("<>/<> _stoppED_service: %s: refs: %d, numEndpoints: %d\n", m->cfg->Name, refs, numEndpoints); fflush(stdout); _unlock_service(m); + if (!alreadyStopped && unsubscribe) + { + int i = 0; + for (i = 0; i < numEndpoints; i++) + { + ep = EEEE[i]; + if (err = micro_stop_endpoint(ep), err != NULL) + { + err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name); + return err; + } + } + } if ((refs == 0) && (numEndpoints == 0)) _free_service(m); diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index ff7219f9..a14cbc83 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -92,28 +92,18 @@ micro_stop_endpoint(microEndpoint *ep) if (ep == NULL) return NULL; - printf("<>/<> Stopping endpoint %p\n", ep); fflush(stdout); - micro_lock_endpoint(ep); - printf("<>/<> Stopping endpoint %p, %s %s\n", ep, ep->m->cfg->Name, ep->subject); fflush(stdout); sub = ep->sub; micro_unlock_endpoint(ep); if (sub == NULL) - { - printf("<>/<> Stopping endpoint %p %s %s: sub is NULL\n", ep, ep->m->cfg->Name, ep->subject); fflush(stdout); return NULL; - } // When the drain is complete, the callback will free ep. We may get an // NATS_INVALID_SUBSCRIPTION if the subscription is already closed. s = natsSubscription_Drain(sub); if ((s != NATS_OK) && (s != NATS_INVALID_SUBSCRIPTION)) - { - printf("<>/<> Stopping endpoint %p %s %s: drain failed\n", ep, ep->m->cfg->Name, ep->subject); fflush(stdout); return microError_Wrapf(micro_ErrorFromStatus(s), "failed to drain subscription"); - } - printf("<>/<> Stopped endpoint %p, %s %s!!!!\n", ep, ep->m->cfg->Name, ep->subject); fflush(stdout); return NULL; } From a18e07047f98a4f4ce861ede6a1d0f879fef2e46 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 10:52:40 -0800 Subject: [PATCH 24/39] TRY MORE, chase slowness --- .github/workflows/build-test.yml | 2 +- .github/workflows/on-pr-debug.yml | 240 +++++++++++++------------- .github/workflows/on-push-release.yml | 44 ++--- test/test.c | 76 ++++---- 4 files changed, 181 insertions(+), 181 deletions(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index a5f4080d..5b12a815 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -181,7 +181,7 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" - ctest -R 'Micro' --timeout 60 --output-on-failure --repeat-until-fail 100 + ctest -R 'MicroAddService' --timeout 60 --output-on-failure --repeat-until-fail 100 # ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} - name: Upload coverage reports to Codecov diff --git a/.github/workflows/on-pr-debug.yml b/.github/workflows/on-pr-debug.yml index fe1a1d0e..43c10529 100644 --- a/.github/workflows/on-pr-debug.yml +++ b/.github/workflows/on-pr-debug.yml @@ -6,27 +6,27 @@ permissions: contents: write # so it can comment jobs: - Ubuntu: - name: "Ubuntu" - strategy: - fail-fast: false - matrix: - compiler: [gcc, clang] - uses: ./.github/workflows/build-test.yml - with: - compiler: ${{ matrix.compiler }} - server_version: main - type: Debug + # Ubuntu: + # name: "Ubuntu" + # strategy: + # fail-fast: false + # matrix: + # compiler: [gcc, clang] + # uses: ./.github/workflows/build-test.yml + # with: + # compiler: ${{ matrix.compiler }} + # server_version: main + # type: Debug - dev-mode: - name: "DEV_MODE" - uses: ./.github/workflows/build-test.yml - with: - dev_mode: ON - server_version: main - type: Debug - verbose_test_output: ON - verbose_make_output: ON + # dev-mode: + # name: "DEV_MODE" + # uses: ./.github/workflows/build-test.yml + # with: + # dev_mode: ON + # server_version: main + # type: Debug + # verbose_test_output: ON + # verbose_make_output: ON sanitize: name: "Sanitize" @@ -44,110 +44,110 @@ jobs: sanitize: ${{ matrix.sanitize }} pool_dispatch: ${{ matrix.pooled_dispatch }} - coverage-TLS: - name: "Coverage: TLS" - strategy: - fail-fast: false - matrix: - pooled_dispatch: [pool, NO-pool] - write_deadline: [write_deadline, NO-write_deadline] - uses: ./.github/workflows/build-test.yml - with: - coverage: ON - type: RelWithDebInfo - server_version: main - compiler: gcc - tls: TLS - verify_host: verify_host - pool_dispatch: ${{ matrix.pooled_dispatch }} - write_deadline: ${{ matrix.write_deadline }} - secrets: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + # coverage-TLS: + # name: "Coverage: TLS" + # strategy: + # fail-fast: false + # matrix: + # pooled_dispatch: [pool, NO-pool] + # write_deadline: [write_deadline, NO-write_deadline] + # uses: ./.github/workflows/build-test.yml + # with: + # coverage: ON + # type: RelWithDebInfo + # server_version: main + # compiler: gcc + # tls: TLS + # verify_host: verify_host + # pool_dispatch: ${{ matrix.pooled_dispatch }} + # write_deadline: ${{ matrix.write_deadline }} + # secrets: + # CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - coverage-NO-verify_host: - name: "Coverage: NO-verify_host" - uses: ./.github/workflows/build-test.yml - with: - coverage: ON - type: RelWithDebInfo - server_version: main - compiler: gcc - tls: TLS - verify_host: NO-verify_host - secrets: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + # coverage-NO-verify_host: + # name: "Coverage: NO-verify_host" + # uses: ./.github/workflows/build-test.yml + # with: + # coverage: ON + # type: RelWithDebInfo + # server_version: main + # compiler: gcc + # tls: TLS + # verify_host: NO-verify_host + # secrets: + # CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - coverage-NO-TLS: - name: "Coverage NO-TLS" - uses: ./.github/workflows/build-test.yml - with: - coverage: ON - type: RelWithDebInfo - server_version: main - compiler: gcc - tls: NO-TLS - verify_host: NO-verify_host - secrets: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + # coverage-NO-TLS: + # name: "Coverage NO-TLS" + # uses: ./.github/workflows/build-test.yml + # with: + # coverage: ON + # type: RelWithDebInfo + # server_version: main + # compiler: gcc + # tls: NO-TLS + # verify_host: NO-verify_host + # secrets: + # CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - bench: - name: "Benchmark" - uses: ./.github/workflows/build-test.yml - with: - server_version: main - benchmark: ON - type: Release + # bench: + # name: "Benchmark" + # uses: ./.github/workflows/build-test.yml + # with: + # server_version: main + # benchmark: ON + # type: Release - Windows: - name: "Windows" - runs-on: windows-latest - steps: - - name: Export GitHub Actions cache environment variables - uses: actions/github-script@v7 - with: - script: | - core.exportVariable('ACTIONS_CACHE_URL', process.env.ACTIONS_CACHE_URL || ''); - core.exportVariable('ACTIONS_RUNTIME_TOKEN', process.env.ACTIONS_RUNTIME_TOKEN || ''); + # Windows: + # name: "Windows" + # runs-on: windows-latest + # steps: + # - name: Export GitHub Actions cache environment variables + # uses: actions/github-script@v7 + # with: + # script: | + # core.exportVariable('ACTIONS_CACHE_URL', process.env.ACTIONS_CACHE_URL || ''); + # core.exportVariable('ACTIONS_RUNTIME_TOKEN', process.env.ACTIONS_RUNTIME_TOKEN || ''); - - name: Checkout nats.c - uses: actions/checkout@v4 + # - name: Checkout nats.c + # uses: actions/checkout@v4 - - name: Build - env: - VCPKG_BINARY_SOURCES: "clear;x-gha,readwrite" - run: | - cmake -B build -S . -DCMAKE_BUILD_TYPE=Debug -DNATS_BUILD_STREAMING=OFF - cmake --build build + # - name: Build + # env: + # VCPKG_BINARY_SOURCES: "clear;x-gha,readwrite" + # run: | + # cmake -B build -S . -DCMAKE_BUILD_TYPE=Debug -DNATS_BUILD_STREAMING=OFF + # cmake --build build - - name: Test - shell: bash - run: | - cd build - # Download latest nats-server - rel="latest" # TODO: parameterize - if [ "$rel" = "latest" ]; then - rel=$(curl -s https://api.github.com/repos/nats-io/nats-server/releases/latest | jq -r '.tag_name') - fi - if [ "$rel" != "${rel#v}" ] && wget https://github.com/nats-io/nats-server/releases/download/$rel/nats-server-$rel-windows-amd64.tar.gz; then - tar -xzf nats-server-$rel-linux-amd64.tar.gz - cp nats-server-$rel-windows-amd64/nats-server.exe ../deps/nats-server/nats-server.exe - else - for c in 1 2 3 4 5 - do - echo "Attempt $c to download binary for main" - rm -f ./nats-server - curl -sf "https://binaries.nats.dev/nats-io/nats-server/v2@$rel" | PREFIX=. sh - # We are sometimes getting nats-server of size 0. Make sure we have a - # working nats-server by making sure we get a version number. - mv ./nats-server ./nats-server.exe - v="$(./nats-server.exe -v)" - if [ "$v" != "" ]; then - break - fi - done - mkdir -p ../deps/nats-server - mv ./nats-server.exe ../deps/nats-server/nats-server.exe - fi - export PATH=../deps/nats-server:$PATH - export NATS_TEST_SERVER_VERSION="$(nats-server -v)" - ctest -L test -C Debug --timeout 60 --output-on-failure --repeat until-pass:3 + # - name: Test + # shell: bash + # run: | + # cd build + # # Download latest nats-server + # rel="latest" # TODO: parameterize + # if [ "$rel" = "latest" ]; then + # rel=$(curl -s https://api.github.com/repos/nats-io/nats-server/releases/latest | jq -r '.tag_name') + # fi + # if [ "$rel" != "${rel#v}" ] && wget https://github.com/nats-io/nats-server/releases/download/$rel/nats-server-$rel-windows-amd64.tar.gz; then + # tar -xzf nats-server-$rel-linux-amd64.tar.gz + # cp nats-server-$rel-windows-amd64/nats-server.exe ../deps/nats-server/nats-server.exe + # else + # for c in 1 2 3 4 5 + # do + # echo "Attempt $c to download binary for main" + # rm -f ./nats-server + # curl -sf "https://binaries.nats.dev/nats-io/nats-server/v2@$rel" | PREFIX=. sh + # # We are sometimes getting nats-server of size 0. Make sure we have a + # # working nats-server by making sure we get a version number. + # mv ./nats-server ./nats-server.exe + # v="$(./nats-server.exe -v)" + # if [ "$v" != "" ]; then + # break + # fi + # done + # mkdir -p ../deps/nats-server + # mv ./nats-server.exe ../deps/nats-server/nats-server.exe + # fi + # export PATH=../deps/nats-server:$PATH + # export NATS_TEST_SERVER_VERSION="$(nats-server -v)" + # ctest -L test -C Debug --timeout 60 --output-on-failure --repeat until-pass:3 diff --git a/.github/workflows/on-push-release.yml b/.github/workflows/on-push-release.yml index 0c1b570e..d7ebdd84 100644 --- a/.github/workflows/on-push-release.yml +++ b/.github/workflows/on-push-release.yml @@ -1,25 +1,25 @@ -name: "Release" -on: - push: +# name: "Release" +# on: +# push: -permissions: - contents: write # required by build-test to comment on coverage but not used here. +# permissions: +# contents: write # required by build-test to comment on coverage but not used here. -defaults: - run: - shell: bash --noprofile --norc -x -eo pipefail {0} +# defaults: +# run: +# shell: bash --noprofile --norc -x -eo pipefail {0} -jobs: - quick: - name: "Ubuntu" - strategy: - fail-fast: false - matrix: - compiler: [gcc, clang] - ubuntu_version: [latest, 20.04] - uses: ./.github/workflows/build-test.yml - with: - server_version: main - ubuntu_version: ${{ matrix.ubuntu_version }} - compiler: ${{ matrix.compiler }} - type: RelWithDebInfo +# jobs: + # quick: + # name: "Ubuntu" + # strategy: + # fail-fast: false + # matrix: + # compiler: [gcc, clang] + # ubuntu_version: [latest, 20.04] + # uses: ./.github/workflows/build-test.yml + # with: + # server_version: main + # ubuntu_version: ${{ matrix.ubuntu_version }} + # compiler: ${{ matrix.compiler }} + # type: RelWithDebInfo diff --git a/test/test.c b/test/test.c index 5b075353..09a19c74 100644 --- a/test/test.c +++ b/test/test.c @@ -33689,49 +33689,49 @@ void test_MicroAddService(void) }; add_service_test_case_t tcs[] = { - { - .name = "Minimal", - .cfg = &minimal_cfg, - }, + // { + // .name = "Minimal", + // .cfg = &minimal_cfg, + // }, { .name = "Full", .cfg = &full_cfg, .expected_num_subjects = 1, }, - { - .name = "Full-with-endpoints", - .cfg = &full_cfg, - .endpoints = all_ep_cfgs, - .num_endpoints = sizeof(all_ep_cfgs) / sizeof(all_ep_cfgs[0]), - .expected_num_subjects = 4, - }, - { - .name = "Err-null-connection", - .cfg = &minimal_cfg, - .null_nc = true, - .expected_err = "status 16: invalid function argument", - }, - { - .name = "Err-null-receiver", - .cfg = &minimal_cfg, - .null_receiver = true, - .expected_err = "status 16: invalid function argument", - }, - { - .name = "Err-no-name", - .cfg = &err_no_name_cfg, - .expected_err = "status 16: invalid function argument", - }, - { - .name = "Err-no-version", - .cfg = &err_no_version_cfg, - .expected_err = "status 16: invalid function argument", - }, - { - .name = "Err-invalid-version", - .cfg = &err_invalid_version_cfg, - // TODO: validate the version format. - }, + // { + // .name = "Full-with-endpoints", + // .cfg = &full_cfg, + // .endpoints = all_ep_cfgs, + // .num_endpoints = sizeof(all_ep_cfgs) / sizeof(all_ep_cfgs[0]), + // .expected_num_subjects = 4, + // }, + // { + // .name = "Err-null-connection", + // .cfg = &minimal_cfg, + // .null_nc = true, + // .expected_err = "status 16: invalid function argument", + // }, + // { + // .name = "Err-null-receiver", + // .cfg = &minimal_cfg, + // .null_receiver = true, + // .expected_err = "status 16: invalid function argument", + // }, + // { + // .name = "Err-no-name", + // .cfg = &err_no_name_cfg, + // .expected_err = "status 16: invalid function argument", + // }, + // { + // .name = "Err-no-version", + // .cfg = &err_no_version_cfg, + // .expected_err = "status 16: invalid function argument", + // }, + // { + // .name = "Err-invalid-version", + // .cfg = &err_invalid_version_cfg, + // // TODO: validate the version format. + // }, }; add_service_test_case_t tc; From eca119129f4a5763271369744fe9484a281311ab Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 11:05:42 -0800 Subject: [PATCH 25/39] TRY MORE, found? --- .github/workflows/build-test.yml | 2 +- .github/workflows/on-pr-debug.yml | 78 +++++++++++++-------------- .github/workflows/on-push-release.yml | 44 +++++++-------- test/test.c | 7 ++- 4 files changed, 68 insertions(+), 63 deletions(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 5b12a815..a5f4080d 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -181,7 +181,7 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" - ctest -R 'MicroAddService' --timeout 60 --output-on-failure --repeat-until-fail 100 + ctest -R 'Micro' --timeout 60 --output-on-failure --repeat-until-fail 100 # ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} - name: Upload coverage reports to Codecov diff --git a/.github/workflows/on-pr-debug.yml b/.github/workflows/on-pr-debug.yml index 43c10529..5bbf7f39 100644 --- a/.github/workflows/on-pr-debug.yml +++ b/.github/workflows/on-pr-debug.yml @@ -6,27 +6,27 @@ permissions: contents: write # so it can comment jobs: - # Ubuntu: - # name: "Ubuntu" - # strategy: - # fail-fast: false - # matrix: - # compiler: [gcc, clang] - # uses: ./.github/workflows/build-test.yml - # with: - # compiler: ${{ matrix.compiler }} - # server_version: main - # type: Debug + Ubuntu: + name: "Ubuntu" + strategy: + fail-fast: false + matrix: + compiler: [gcc, clang] + uses: ./.github/workflows/build-test.yml + with: + compiler: ${{ matrix.compiler }} + server_version: main + type: Debug - # dev-mode: - # name: "DEV_MODE" - # uses: ./.github/workflows/build-test.yml - # with: - # dev_mode: ON - # server_version: main - # type: Debug - # verbose_test_output: ON - # verbose_make_output: ON + dev-mode: + name: "DEV_MODE" + uses: ./.github/workflows/build-test.yml + with: + dev_mode: ON + server_version: main + type: Debug + verbose_test_output: ON + verbose_make_output: ON sanitize: name: "Sanitize" @@ -44,25 +44,25 @@ jobs: sanitize: ${{ matrix.sanitize }} pool_dispatch: ${{ matrix.pooled_dispatch }} - # coverage-TLS: - # name: "Coverage: TLS" - # strategy: - # fail-fast: false - # matrix: - # pooled_dispatch: [pool, NO-pool] - # write_deadline: [write_deadline, NO-write_deadline] - # uses: ./.github/workflows/build-test.yml - # with: - # coverage: ON - # type: RelWithDebInfo - # server_version: main - # compiler: gcc - # tls: TLS - # verify_host: verify_host - # pool_dispatch: ${{ matrix.pooled_dispatch }} - # write_deadline: ${{ matrix.write_deadline }} - # secrets: - # CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + coverage-TLS: + name: "Coverage: TLS" + strategy: + fail-fast: false + matrix: + pooled_dispatch: [pool, NO-pool] + write_deadline: [write_deadline, NO-write_deadline] + uses: ./.github/workflows/build-test.yml + with: + coverage: ON + type: RelWithDebInfo + server_version: main + compiler: gcc + tls: TLS + verify_host: verify_host + pool_dispatch: ${{ matrix.pooled_dispatch }} + write_deadline: ${{ matrix.write_deadline }} + secrets: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} # coverage-NO-verify_host: # name: "Coverage: NO-verify_host" diff --git a/.github/workflows/on-push-release.yml b/.github/workflows/on-push-release.yml index d7ebdd84..0c1b570e 100644 --- a/.github/workflows/on-push-release.yml +++ b/.github/workflows/on-push-release.yml @@ -1,25 +1,25 @@ -# name: "Release" -# on: -# push: +name: "Release" +on: + push: -# permissions: -# contents: write # required by build-test to comment on coverage but not used here. +permissions: + contents: write # required by build-test to comment on coverage but not used here. -# defaults: -# run: -# shell: bash --noprofile --norc -x -eo pipefail {0} +defaults: + run: + shell: bash --noprofile --norc -x -eo pipefail {0} -# jobs: - # quick: - # name: "Ubuntu" - # strategy: - # fail-fast: false - # matrix: - # compiler: [gcc, clang] - # ubuntu_version: [latest, 20.04] - # uses: ./.github/workflows/build-test.yml - # with: - # server_version: main - # ubuntu_version: ${{ matrix.ubuntu_version }} - # compiler: ${{ matrix.compiler }} - # type: RelWithDebInfo +jobs: + quick: + name: "Ubuntu" + strategy: + fail-fast: false + matrix: + compiler: [gcc, clang] + ubuntu_version: [latest, 20.04] + uses: ./.github/workflows/build-test.yml + with: + server_version: main + ubuntu_version: ${{ matrix.ubuntu_version }} + compiler: ${{ matrix.compiler }} + type: RelWithDebInfo diff --git a/test/test.c b/test/test.c index 09a19c74..785910d5 100644 --- a/test/test.c +++ b/test/test.c @@ -33593,7 +33593,7 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer #define _waitForMicroservicesAllDone(_arg) \ { \ - nats_Sleep(valgrind ? 500 : 20); \ + nats_Sleep(valgrind ? 20 : 10); \ natsMutex_Lock((_arg)->m); \ testf("Wait for %d microservices to stop: ", (_arg)->microRunningServiceCount); \ natsStatus waitStatus = NATS_OK; \ @@ -33844,8 +33844,13 @@ void test_MicroAddService(void) } microServiceInfo_Destroy(info); + + int64_t start = nats_Now(); + int64_t nn; _destroyMicroservice(m); + printf("<>/<> destroy time: %lld\n", (nn = nats_Now()) - start); _waitForMicroservicesAllDone(&arg); + printf("<>/<> waited for: %lld\n", nats_Now() - nn); } test("Destroy the test connection: "); From 0390c26d4ff635d528cf57cc00f65913658cabdd Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 11:13:32 -0800 Subject: [PATCH 26/39] TRY MORE, +1 --- src/micro.c | 14 +++++----- test/test.c | 80 ++++++++++++++++++++++++++--------------------------- 2 files changed, 46 insertions(+), 48 deletions(-) diff --git a/src/micro.c b/src/micro.c index c56e936f..ac7949fd 100644 --- a/src/micro.c +++ b/src/micro.c @@ -288,7 +288,7 @@ _detach_service_from_connection(natsConnection *nc, microService *m) if (nc == NULL || m == NULL) return false; - printf("<>/<> _detachING_service_from_connection %s\n", m->cfg->Name); fflush(stdout); + // printf("<>/<> _detachING_service_from_connection %s\n", m->cfg->Name); fflush(stdout); natsConn_Lock(nc); for (int i = 0; i < nc->numServices; i++) { @@ -303,7 +303,7 @@ _detach_service_from_connection(natsConnection *nc, microService *m) } remaining = nc->numServices; natsConn_Unlock(nc); - printf("<>/<> _detachED_service_from_connection %s: remaining: %d, removed: %d\n", m->cfg->Name, remaining, removed); fflush(stdout); + // printf("<>/<> _detachED_service_from_connection %s: remaining: %d, removed: %d\n", m->cfg->Name, remaining, removed); fflush(stdout); return removed; } @@ -324,7 +324,7 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool if (detachFromConnection) detached = _detach_service_from_connection(m->nc, m); - printf("<>/<> _stop_service: %s, detached: %d\n", m->cfg->Name, detached); fflush(stdout); + // printf("<>/<> _stop_service: %s, detached: %d\n", m->cfg->Name, detached); fflush(stdout); _lock_service(m); if (!m->stopped) @@ -444,7 +444,7 @@ void micro_release_endpoint_when_unsubscribed(void *closure) // Stop the service now in case it hasn't already and detach from the // connection, no need to unsubscribe. - printf("<>/<> Stopping service %s from micro_release_endpoint_when_unsubscribed\n", m->cfg->Name); fflush(stdout); + // printf("<>/<> Stopping service %s from micro_release_endpoint_when_unsubscribed\n", m->cfg->Name); fflush(stdout); _stop_service(m, true, false, false); } } @@ -466,7 +466,7 @@ bool microService_IsStopped(microService *m) microError * microService_Destroy(microService *m) { - printf("<>/<> Stopping service %s from microService_Destroy\n", m->cfg->Name); fflush(stdout); + // printf("<>/<> Stopping service %s from microService_Destroy\n", m->cfg->Name); fflush(stdout); return _stop_service(m, false, true, true); } @@ -619,7 +619,7 @@ _on_connection_closed(natsConnection *nc, void *ignored) natsConn_Lock(nc); for (int i = 0; i < nc->numServices; i++) { - printf("<>/<> Stopping service %s from on_connection_closed\n", nc->services[i]->cfg->Name); fflush(stdout); + // printf("<>/<> Stopping service %s from on_connection_closed\n", nc->services[i]->cfg->Name); fflush(stdout); _stop_service(nc->services[i], false, false, false); } natsConn_Unlock(nc); @@ -686,7 +686,7 @@ _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_use // Stop the service in error. It will get detached from the connection // and released when all of its subs are complete. - printf("<>/<> Stopping service %s from on_error\n", m->cfg->Name); fflush(stdout); + // printf("<>/<> Stopping service %s from on_error\n", m->cfg->Name); fflush(stdout); _stop_service(m, false, true, false); } natsConn_Unlock(nc); diff --git a/test/test.c b/test/test.c index 785910d5..b4c96c77 100644 --- a/test/test.c +++ b/test/test.c @@ -33593,7 +33593,7 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer #define _waitForMicroservicesAllDone(_arg) \ { \ - nats_Sleep(valgrind ? 20 : 10); \ + nats_Sleep(valgrind ? 0 : 0); \ natsMutex_Lock((_arg)->m); \ testf("Wait for %d microservices to stop: ", (_arg)->microRunningServiceCount); \ natsStatus waitStatus = NATS_OK; \ @@ -33689,49 +33689,49 @@ void test_MicroAddService(void) }; add_service_test_case_t tcs[] = { - // { - // .name = "Minimal", - // .cfg = &minimal_cfg, - // }, + { + .name = "Minimal", + .cfg = &minimal_cfg, + }, { .name = "Full", .cfg = &full_cfg, .expected_num_subjects = 1, }, - // { - // .name = "Full-with-endpoints", - // .cfg = &full_cfg, - // .endpoints = all_ep_cfgs, - // .num_endpoints = sizeof(all_ep_cfgs) / sizeof(all_ep_cfgs[0]), - // .expected_num_subjects = 4, - // }, - // { - // .name = "Err-null-connection", - // .cfg = &minimal_cfg, - // .null_nc = true, - // .expected_err = "status 16: invalid function argument", - // }, - // { - // .name = "Err-null-receiver", - // .cfg = &minimal_cfg, - // .null_receiver = true, - // .expected_err = "status 16: invalid function argument", - // }, - // { - // .name = "Err-no-name", - // .cfg = &err_no_name_cfg, - // .expected_err = "status 16: invalid function argument", - // }, - // { - // .name = "Err-no-version", - // .cfg = &err_no_version_cfg, - // .expected_err = "status 16: invalid function argument", - // }, - // { - // .name = "Err-invalid-version", - // .cfg = &err_invalid_version_cfg, - // // TODO: validate the version format. - // }, + { + .name = "Full-with-endpoints", + .cfg = &full_cfg, + .endpoints = all_ep_cfgs, + .num_endpoints = sizeof(all_ep_cfgs) / sizeof(all_ep_cfgs[0]), + .expected_num_subjects = 4, + }, + { + .name = "Err-null-connection", + .cfg = &minimal_cfg, + .null_nc = true, + .expected_err = "status 16: invalid function argument", + }, + { + .name = "Err-null-receiver", + .cfg = &minimal_cfg, + .null_receiver = true, + .expected_err = "status 16: invalid function argument", + }, + { + .name = "Err-no-name", + .cfg = &err_no_name_cfg, + .expected_err = "status 16: invalid function argument", + }, + { + .name = "Err-no-version", + .cfg = &err_no_version_cfg, + .expected_err = "status 16: invalid function argument", + }, + { + .name = "Err-invalid-version", + .cfg = &err_invalid_version_cfg, + // TODO: validate the version format. + }, }; add_service_test_case_t tc; @@ -33848,9 +33848,7 @@ void test_MicroAddService(void) int64_t start = nats_Now(); int64_t nn; _destroyMicroservice(m); - printf("<>/<> destroy time: %lld\n", (nn = nats_Now()) - start); _waitForMicroservicesAllDone(&arg); - printf("<>/<> waited for: %lld\n", nats_Now() - nn); } test("Destroy the test connection: "); From 6a883d31578d49d4f5090cfe2683c5bddb3b08b0 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 11:27:35 -0800 Subject: [PATCH 27/39] TRY MORE, +2 --- src/micro.c | 8 ++++---- test/test.c | 5 +---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/micro.c b/src/micro.c index ac7949fd..18721c79 100644 --- a/src/micro.c +++ b/src/micro.c @@ -283,7 +283,7 @@ static bool _detach_service_from_connection(natsConnection *nc, microService *m) { bool removed = false; - int remaining = 0; + // int remaining = 0; if (nc == NULL || m == NULL) return false; @@ -301,7 +301,7 @@ _detach_service_from_connection(natsConnection *nc, microService *m) removed = true; break; } - remaining = nc->numServices; + // remaining = nc->numServices; natsConn_Unlock(nc); // printf("<>/<> _detachED_service_from_connection %s: remaining: %d, removed: %d\n", m->cfg->Name, remaining, removed); fflush(stdout); return removed; @@ -440,12 +440,12 @@ void micro_release_endpoint_when_unsubscribed(void *closure) // Special processing for the last endpoint. if (doneHandler != NULL) { - doneHandler(m); - // Stop the service now in case it hasn't already and detach from the // connection, no need to unsubscribe. // printf("<>/<> Stopping service %s from micro_release_endpoint_when_unsubscribed\n", m->cfg->Name); fflush(stdout); _stop_service(m, true, false, false); + + doneHandler(m); } } diff --git a/test/test.c b/test/test.c index b4c96c77..9543d3c8 100644 --- a/test/test.c +++ b/test/test.c @@ -33593,7 +33593,6 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer #define _waitForMicroservicesAllDone(_arg) \ { \ - nats_Sleep(valgrind ? 0 : 0); \ natsMutex_Lock((_arg)->m); \ testf("Wait for %d microservices to stop: ", (_arg)->microRunningServiceCount); \ natsStatus waitStatus = NATS_OK; \ @@ -33844,9 +33843,6 @@ void test_MicroAddService(void) } microServiceInfo_Destroy(info); - - int64_t start = nats_Now(); - int64_t nn; _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); } @@ -34565,6 +34561,7 @@ void test_MicroServiceStopsWhenServerStops(void) _waitForMicroservicesAllDone(&arg); + // nats_Sleep(10); // <>/<> test("Test microservice is not running: "); testCond(microService_IsStopped(m)) From a68eb80c8a929237e99bfbc80e9fc7d2115ab3ba Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 11:40:23 -0800 Subject: [PATCH 28/39] TRY MORE, +3 --- src/micro.c | 4 ++-- test/test.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/micro.c b/src/micro.c index 18721c79..a1dfb6ca 100644 --- a/src/micro.c +++ b/src/micro.c @@ -440,12 +440,12 @@ void micro_release_endpoint_when_unsubscribed(void *closure) // Special processing for the last endpoint. if (doneHandler != NULL) { + doneHandler(m); + // Stop the service now in case it hasn't already and detach from the // connection, no need to unsubscribe. // printf("<>/<> Stopping service %s from micro_release_endpoint_when_unsubscribed\n", m->cfg->Name); fflush(stdout); _stop_service(m, true, false, false); - - doneHandler(m); } } diff --git a/test/test.c b/test/test.c index 9543d3c8..d27a92eb 100644 --- a/test/test.c +++ b/test/test.c @@ -33602,6 +33602,7 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer allDone = (_arg)->microAllDone; \ natsMutex_Unlock((_arg)->m); \ testCond((NATS_OK == waitStatus) && allDone); \ + nats_Sleep(1); \ } #define _destroyMicroservice(_s) \ @@ -34561,7 +34562,6 @@ void test_MicroServiceStopsWhenServerStops(void) _waitForMicroservicesAllDone(&arg); - // nats_Sleep(10); // <>/<> test("Test microservice is not running: "); testCond(microService_IsStopped(m)) From 335b7e38c6e288a0db620643de3ede42c78f1bbf Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 13:03:55 -0800 Subject: [PATCH 29/39] TRY MORE, +4 --- src/micro.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/micro.c b/src/micro.c index a1dfb6ca..8cf31527 100644 --- a/src/micro.c +++ b/src/micro.c @@ -628,8 +628,9 @@ _on_connection_closed(natsConnection *nc, void *ignored) static bool _on_service_error(microService *m, const char *subject, natsStatus s) { - microEndpoint *found = NULL; - microError *err = NULL; + microEndpoint *found = NULL; + const char *subject = NULL; + microError *err = NULL; if (m == NULL) return false; @@ -645,14 +646,17 @@ _on_service_error(microService *m, const char *subject, natsStatus s) } if (found != NULL) + { micro_retain_endpoint(found); + subject = found->subject; + } _unlock_service(m); if (found == NULL) return false; - err = microError_Wrapf(micro_ErrorFromStatus(s), "NATS error on endpoint %s", found->subject); + err = microError_Wrapf(micro_ErrorFromStatus(s), "NATS error on endpoint %s", subject); micro_update_last_error(found, err); microError_Destroy(err); From d565761327823278f33b2ef7eca3f90141f9add1 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 13:06:07 -0800 Subject: [PATCH 30/39] TRY MORE, +5 --- src/micro.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/micro.c b/src/micro.c index 8cf31527..5a91f2da 100644 --- a/src/micro.c +++ b/src/micro.c @@ -629,7 +629,6 @@ static bool _on_service_error(microService *m, const char *subject, natsStatus s) { microEndpoint *found = NULL; - const char *subject = NULL; microError *err = NULL; if (m == NULL) @@ -646,10 +645,7 @@ _on_service_error(microService *m, const char *subject, natsStatus s) } if (found != NULL) - { micro_retain_endpoint(found); - subject = found->subject; - } _unlock_service(m); From c811a80c9402560534d88de8924e1b111c7a13a3 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 13:14:59 -0800 Subject: [PATCH 31/39] cleanup --- src/conn.h | 9 --------- src/micro.c | 21 +++++---------------- 2 files changed, 5 insertions(+), 25 deletions(-) diff --git a/src/conn.h b/src/conn.h index 57ff3de3..41e93f40 100644 --- a/src/conn.h +++ b/src/conn.h @@ -160,13 +160,4 @@ natsConn_close(natsConnection *nc); void natsConn_destroy(natsConnection *nc, bool fromPublicDestroy); -int -natsConn_getServices(microService ***services, natsConnection *nc); - -bool -natsConn_detachService(natsConnection *nc, microService *service); - -natsStatus -natsConn_attachService(natsConnection *nc, microService *service); - #endif /* CONN_H_ */ diff --git a/src/micro.c b/src/micro.c index 5a91f2da..bb42b89c 100644 --- a/src/micro.c +++ b/src/micro.c @@ -283,12 +283,10 @@ static bool _detach_service_from_connection(natsConnection *nc, microService *m) { bool removed = false; - // int remaining = 0; if (nc == NULL || m == NULL) return false; - // printf("<>/<> _detachING_service_from_connection %s\n", m->cfg->Name); fflush(stdout); natsConn_Lock(nc); for (int i = 0; i < nc->numServices; i++) { @@ -301,9 +299,8 @@ _detach_service_from_connection(natsConnection *nc, microService *m) removed = true; break; } - // remaining = nc->numServices; natsConn_Unlock(nc); - // printf("<>/<> _detachED_service_from_connection %s: remaining: %d, removed: %d\n", m->cfg->Name, remaining, removed); fflush(stdout); + return removed; } @@ -324,8 +321,6 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool if (detachFromConnection) detached = _detach_service_from_connection(m->nc, m); - // printf("<>/<> _stop_service: %s, detached: %d\n", m->cfg->Name, detached); fflush(stdout); - _lock_service(m); if (!m->stopped) m->stopped = true; @@ -444,7 +439,6 @@ void micro_release_endpoint_when_unsubscribed(void *closure) // Stop the service now in case it hasn't already and detach from the // connection, no need to unsubscribe. - // printf("<>/<> Stopping service %s from micro_release_endpoint_when_unsubscribed\n", m->cfg->Name); fflush(stdout); _stop_service(m, true, false, false); } } @@ -466,7 +460,6 @@ bool microService_IsStopped(microService *m) microError * microService_Destroy(microService *m) { - // printf("<>/<> Stopping service %s from microService_Destroy\n", m->cfg->Name); fflush(stdout); return _stop_service(m, false, true, true); } @@ -535,8 +528,6 @@ _free_service(microService *m) if (m == NULL) return; - printf("<>/<> Freeing service %s\n", m->cfg->Name); fflush(stdout); - // destroy all groups. if (m->groups != NULL) { @@ -614,14 +605,13 @@ _free_cloned_service_config(microServiceConfig *cfg) static void _on_connection_closed(natsConnection *nc, void *ignored) { + natsConn_Lock(nc); + // Stop all services. They will get detached from the connection when their // subs are complete. - natsConn_Lock(nc); for (int i = 0; i < nc->numServices; i++) - { - // printf("<>/<> Stopping service %s from on_connection_closed\n", nc->services[i]->cfg->Name); fflush(stdout); _stop_service(nc->services[i], false, false, false); - } + natsConn_Unlock(nc); } @@ -673,7 +663,7 @@ _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_use subject = natsSubscription_GetSubject(sub); - // <>/<> TODO: this would be a lot easier if sub had a ref to ep. + // TODO: this would be a lot easier if sub had a ref to ep. natsConn_Lock(nc); for (int i = 0; i < nc->numServices; i++) { @@ -686,7 +676,6 @@ _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_use // Stop the service in error. It will get detached from the connection // and released when all of its subs are complete. - // printf("<>/<> Stopping service %s from on_error\n", m->cfg->Name); fflush(stdout); _stop_service(m, false, true, false); } natsConn_Unlock(nc); From 9df3d27a75cb71a98ebfb47341c2086143a0bc86 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 13:29:45 -0800 Subject: [PATCH 32/39] servicesMu --- src/conn.c | 3 +++ src/micro.c | 38 +++++++++++++------------------------- src/natsp.h | 1 + 3 files changed, 17 insertions(+), 25 deletions(-) diff --git a/src/conn.c b/src/conn.c index 9a9823ee..c7dbeac3 100644 --- a/src/conn.c +++ b/src/conn.c @@ -204,6 +204,7 @@ _freeConn(natsConnection *nc) natsStrHash_Destroy(nc->respMap); natsCondition_Destroy(nc->reconnectCond); natsMutex_Destroy(nc->subsMu); + natsMutex_Destroy(nc->servicesMu); natsMutex_Destroy(nc->mu); NATS_FREE(nc->services); @@ -3239,6 +3240,8 @@ natsConn_create(natsConnection **newConn, natsOptions *options) s = natsMutex_Create(&(nc->mu)); if (s == NATS_OK) s = natsMutex_Create(&(nc->subsMu)); + if (s == NATS_OK) + s = natsMutex_Create(&(nc->servicesMu)); if (s == NATS_OK) s = _setupServerPool(nc); if (s == NATS_OK) diff --git a/src/micro.c b/src/micro.c index bb42b89c..a16c858d 100644 --- a/src/micro.c +++ b/src/micro.c @@ -253,7 +253,7 @@ _attach_service_to_connection(natsConnection *nc, microService *service) if (nc == NULL || service == NULL) return micro_ErrorInvalidArg; - natsConn_Lock(nc); + natsMutex_Lock(nc->servicesMu); if (nc->services == NULL) { nc->services = NATS_CALLOC(1, sizeof(microService *)); @@ -274,7 +274,7 @@ _attach_service_to_connection(natsConnection *nc, microService *service) nc->services[nc->numServices] = service; nc->numServices++; } - natsConn_Unlock(nc); + natsMutex_Unlock(nc->servicesMu); return micro_ErrorFromStatus(s); } @@ -287,7 +287,7 @@ _detach_service_from_connection(natsConnection *nc, microService *m) if (nc == NULL || m == NULL) return false; - natsConn_Lock(nc); + natsMutex_Lock(nc->servicesMu); for (int i = 0; i < nc->numServices; i++) { if (nc->services[i] != m) @@ -299,7 +299,7 @@ _detach_service_from_connection(natsConnection *nc, microService *m) removed = true; break; } - natsConn_Unlock(nc); + natsMutex_Unlock(nc->servicesMu); return removed; } @@ -313,7 +313,6 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool int numEndpoints = 0; bool alreadyStopped = false; bool detached = false; - microEndpoint *EEEE[256]; if (m == NULL) return micro_ErrorInvalidArg; @@ -329,10 +328,13 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool if (!alreadyStopped && unsubscribe) { - int i = 0; for (ep = m->first_ep; ep != NULL; ep = ep->next) { - EEEE[i++] = ep; + if (err = micro_stop_endpoint(ep), err != NULL) + { + err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name); + return err; + } } } @@ -344,20 +346,6 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool numEndpoints = m->numEndpoints; _unlock_service(m); - if (!alreadyStopped && unsubscribe) - { - int i = 0; - for (i = 0; i < numEndpoints; i++) - { - ep = EEEE[i]; - if (err = micro_stop_endpoint(ep), err != NULL) - { - err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name); - return err; - } - } - } - if ((refs == 0) && (numEndpoints == 0)) _free_service(m); @@ -605,14 +593,14 @@ _free_cloned_service_config(microServiceConfig *cfg) static void _on_connection_closed(natsConnection *nc, void *ignored) { - natsConn_Lock(nc); + natsMutex_Lock(nc->servicesMu); // Stop all services. They will get detached from the connection when their // subs are complete. for (int i = 0; i < nc->numServices; i++) _stop_service(nc->services[i], false, false, false); - natsConn_Unlock(nc); + natsMutex_Unlock(nc->servicesMu); } static bool @@ -664,7 +652,7 @@ _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_use subject = natsSubscription_GetSubject(sub); // TODO: this would be a lot easier if sub had a ref to ep. - natsConn_Lock(nc); + natsMutex_Lock(nc->servicesMu); for (int i = 0; i < nc->numServices; i++) { microService *m = nc->services[i]; @@ -678,7 +666,7 @@ _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_use // and released when all of its subs are complete. _stop_service(m, false, true, false); } - natsConn_Unlock(nc); + natsMutex_Unlock(nc->servicesMu); } static inline microError * diff --git a/src/natsp.h b/src/natsp.h index e2e4a018..80164238 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -701,6 +701,7 @@ struct __natsConnection microService **services; int numServices; + natsMutex *servicesMu; natsConnStatus status; bool initc; // true if the connection is performing the initial connect From 2ae33a22a2d726b218bf71a8668cca25d459c2c4 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 13:43:44 -0800 Subject: [PATCH 33/39] adjustments --- src/micro.c | 33 ++++++++++++--------------------- 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/src/micro.c b/src/micro.c index a16c858d..1ad6e0f0 100644 --- a/src/micro.c +++ b/src/micro.c @@ -279,13 +279,11 @@ _attach_service_to_connection(natsConnection *nc, microService *service) return micro_ErrorFromStatus(s); } -static bool +static void _detach_service_from_connection(natsConnection *nc, microService *m) { - bool removed = false; - if (nc == NULL || m == NULL) - return false; + return; natsMutex_Lock(nc->servicesMu); for (int i = 0; i < nc->numServices; i++) @@ -296,30 +294,25 @@ _detach_service_from_connection(natsConnection *nc, microService *m) for (int j = i; j < nc->numServices - 1; j++) nc->services[j] = nc->services[j + 1]; nc->numServices--; - removed = true; break; } natsMutex_Unlock(nc->servicesMu); - return removed; + return; } static microError * -_stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool release) +_stop_service(microService *m, bool unsubscribe, bool release) { microError *err = NULL; microEndpoint *ep = NULL; int refs = 0; int numEndpoints = 0; bool alreadyStopped = false; - bool detached = false; if (m == NULL) return micro_ErrorInvalidArg; - if (detachFromConnection) - detached = _detach_service_from_connection(m->nc, m); - _lock_service(m); if (!m->stopped) m->stopped = true; @@ -338,10 +331,9 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool } } - if (detached) - m->refs--; - if ((m->refs > 0) && release) + if (release) m->refs--; + refs = m->refs; numEndpoints = m->numEndpoints; _unlock_service(m); @@ -355,7 +347,7 @@ _stop_service(microService *m, bool detachFromConnection, bool unsubscribe, bool microError * microService_Stop(microService *m) { - return _stop_service(m, false, true, false); + return _stop_service(m, true, false); } static void @@ -425,9 +417,8 @@ void micro_release_endpoint_when_unsubscribed(void *closure) { doneHandler(m); - // Stop the service now in case it hasn't already and detach from the - // connection, no need to unsubscribe. - _stop_service(m, true, false, false); + _detach_service_from_connection(m->nc, m); + _stop_service(m, false, true); } } @@ -448,7 +439,7 @@ bool microService_IsStopped(microService *m) microError * microService_Destroy(microService *m) { - return _stop_service(m, false, true, true); + return _stop_service(m, true, true); } microError * @@ -598,7 +589,7 @@ _on_connection_closed(natsConnection *nc, void *ignored) // Stop all services. They will get detached from the connection when their // subs are complete. for (int i = 0; i < nc->numServices; i++) - _stop_service(nc->services[i], false, false, false); + _stop_service(nc->services[i], false, false); natsMutex_Unlock(nc->servicesMu); } @@ -664,7 +655,7 @@ _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_use // Stop the service in error. It will get detached from the connection // and released when all of its subs are complete. - _stop_service(m, false, true, false); + _stop_service(m, true, false); } natsMutex_Unlock(nc->servicesMu); } From 05cf5b4b559d7c9dd1b5b1061a3b591907989e79 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 14:25:43 -0800 Subject: [PATCH 34/39] adjustments, +1 --- src/micro.c | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/micro.c b/src/micro.c index 1ad6e0f0..faac139c 100644 --- a/src/micro.c +++ b/src/micro.c @@ -350,6 +350,7 @@ microService_Stop(microService *m) return _stop_service(m, true, false); } +// service lock must be held by the caller. static void _detach_endpoint_from_service(microService *m, microEndpoint *toRemove) { @@ -359,18 +360,19 @@ _detach_endpoint_from_service(microService *m, microEndpoint *toRemove) if ((m == NULL) || (toRemove == NULL)) return; - for (ep = m->first_ep; ep != NULL; ep = ep->next) + for (ep = m->first_ep; (ep != NULL) && (ep != toRemove); prev_ep = ep, ep = ep->next) + ; + if (ep == NULL) + return; + + m->numEndpoints--; + if (prev_ep == NULL) + m->first_ep = ep->next; + else { - if (ep == toRemove) - { - m->numEndpoints--; - if (prev_ep == NULL) - m->first_ep = ep->next; - else - prev_ep->next = ep->next; - return; - } - prev_ep = ep; + micro_lock_endpoint(prev_ep); + prev_ep->next = ep->next; + micro_unlock_endpoint(prev_ep); } } From 1562929561b5e86c4c570c6db99ab383b68eddce Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 15:13:43 -0800 Subject: [PATCH 35/39] adjustments, +2, longer sleep --- test/test.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test.c b/test/test.c index d27a92eb..b54f6455 100644 --- a/test/test.c +++ b/test/test.c @@ -33602,7 +33602,7 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer allDone = (_arg)->microAllDone; \ natsMutex_Unlock((_arg)->m); \ testCond((NATS_OK == waitStatus) && allDone); \ - nats_Sleep(1); \ + nats_Sleep(5); \ } #define _destroyMicroservice(_s) \ From 541e2ea9c8bcb6728f3b8d2b73c7a87ad31d7e63 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 15:36:18 -0800 Subject: [PATCH 36/39] adjustments, +2, no sleep, but mark as stopped before the callback --- src/micro.c | 6 +++++- test/test.c | 1 - 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/micro.c b/src/micro.c index faac139c..1509362a 100644 --- a/src/micro.c +++ b/src/micro.c @@ -410,7 +410,11 @@ void micro_release_endpoint_when_unsubscribed(void *closure) if (refs == 0) micro_free_endpoint(ep); if (m->numEndpoints == 0) + { + // Mark the service as stopped before calling the done handler. + m->stopped = true; doneHandler = m->cfg->DoneHandler; + } _unlock_service(m); @@ -420,7 +424,7 @@ void micro_release_endpoint_when_unsubscribed(void *closure) doneHandler(m); _detach_service_from_connection(m->nc, m); - _stop_service(m, false, true); + _stop_service(m, false, true); // just release } } diff --git a/test/test.c b/test/test.c index b54f6455..2ea2a299 100644 --- a/test/test.c +++ b/test/test.c @@ -33602,7 +33602,6 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer allDone = (_arg)->microAllDone; \ natsMutex_Unlock((_arg)->m); \ testCond((NATS_OK == waitStatus) && allDone); \ - nats_Sleep(5); \ } #define _destroyMicroservice(_s) \ From 11be61c02ed5f457e5fa6cfd6bfce699b08bc012 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 15:49:46 -0800 Subject: [PATCH 37/39] adjustments, +3 --- src/micro.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/micro.c b/src/micro.c index 1509362a..ed8b2280 100644 --- a/src/micro.c +++ b/src/micro.c @@ -394,19 +394,17 @@ void micro_release_endpoint_when_unsubscribed(void *closure) if ((m == NULL) || (m->service_mu == NULL)) return; + _lock_service(m); micro_lock_endpoint(ep); + + _detach_endpoint_from_service(m, ep); sub = ep->sub; ep->sub = NULL; // Force the subscription to be destroyed now, so NULL out the pointer to avoid a double free. refs = --(ep->refs); micro_unlock_endpoint(ep); - natsSubscription_Destroy(sub); + natsSubscription_Destroy(sub); // <>/<> do I need this hack? - // If this is the last endpoint, we need to notify the service's done - // callback. - _lock_service(m); - - _detach_endpoint_from_service(m, ep); if (refs == 0) micro_free_endpoint(ep); if (m->numEndpoints == 0) From c173957cc7a8aa1c78c6cf26f3d571004bee6d41 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 16:08:09 -0800 Subject: [PATCH 38/39] adjustments, +4, removed sub=NULL hack --- src/micro.c | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/micro.c b/src/micro.c index ed8b2280..9fd3326b 100644 --- a/src/micro.c +++ b/src/micro.c @@ -383,7 +383,6 @@ void micro_release_endpoint_when_unsubscribed(void *closure) { microEndpoint *ep = (microEndpoint *)closure; microService *m = NULL; - natsSubscription *sub = NULL; microDoneHandler doneHandler = NULL; int refs = 0; @@ -395,16 +394,12 @@ void micro_release_endpoint_when_unsubscribed(void *closure) return; _lock_service(m); - micro_lock_endpoint(ep); + micro_lock_endpoint(ep); _detach_endpoint_from_service(m, ep); - sub = ep->sub; - ep->sub = NULL; // Force the subscription to be destroyed now, so NULL out the pointer to avoid a double free. refs = --(ep->refs); micro_unlock_endpoint(ep); - natsSubscription_Destroy(sub); // <>/<> do I need this hack? - if (refs == 0) micro_free_endpoint(ep); if (m->numEndpoints == 0) From e430291c430689d61c4ee104e149406e00967a34 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Sun, 3 Nov 2024 16:11:05 -0800 Subject: [PATCH 39/39] adjustments, +5 --- src/micro.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/micro.c b/src/micro.c index 9fd3326b..17c1b286 100644 --- a/src/micro.c +++ b/src/micro.c @@ -308,25 +308,25 @@ _stop_service(microService *m, bool unsubscribe, bool release) microEndpoint *ep = NULL; int refs = 0; int numEndpoints = 0; - bool alreadyStopped = false; if (m == NULL) return micro_ErrorInvalidArg; _lock_service(m); if (!m->stopped) + { m->stopped = true; - else - alreadyStopped = true; - if (!alreadyStopped && unsubscribe) - { - for (ep = m->first_ep; ep != NULL; ep = ep->next) + if (unsubscribe) { - if (err = micro_stop_endpoint(ep), err != NULL) + for (ep = m->first_ep; ep != NULL; ep = ep->next) { - err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name); - return err; + if (err = micro_stop_endpoint(ep), err != NULL) + { + err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name); + _unlock_service(m); + return err; + } } } }