diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index cf96b75e6..d2d318835 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -181,7 +181,8 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" - ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} + ctest -R '^Micro' --timeout 60 --output-on-failure --repeat-until-fail 20 # ${{ inputs.repeat }} + # ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} - name: Upload coverage reports to Codecov # PRs from external contributors fail: https://github.com/codecov/feedback/issues/301 diff --git a/.github/workflows/on-pr-debug.yml b/.github/workflows/on-pr-debug.yml index fe1a1d0e7..43c10529e 100644 --- a/.github/workflows/on-pr-debug.yml +++ b/.github/workflows/on-pr-debug.yml @@ -6,27 +6,27 @@ permissions: contents: write # so it can comment jobs: - Ubuntu: - name: "Ubuntu" - strategy: - fail-fast: false - matrix: - compiler: [gcc, clang] - uses: ./.github/workflows/build-test.yml - with: - compiler: ${{ matrix.compiler }} - server_version: main - type: Debug + # Ubuntu: + # name: "Ubuntu" + # strategy: + # fail-fast: false + # matrix: + # compiler: [gcc, clang] + # uses: ./.github/workflows/build-test.yml + # with: + # compiler: ${{ matrix.compiler }} + # server_version: main + # type: Debug - dev-mode: - name: "DEV_MODE" - uses: ./.github/workflows/build-test.yml - with: - dev_mode: ON - server_version: main - type: Debug - verbose_test_output: ON - verbose_make_output: ON + # dev-mode: + # name: "DEV_MODE" + # uses: ./.github/workflows/build-test.yml + # with: + # dev_mode: ON + # server_version: main + # type: Debug + # verbose_test_output: ON + # verbose_make_output: ON sanitize: name: "Sanitize" @@ -44,110 +44,110 @@ jobs: sanitize: ${{ matrix.sanitize }} pool_dispatch: ${{ matrix.pooled_dispatch }} - coverage-TLS: - name: "Coverage: TLS" - strategy: - fail-fast: false - matrix: - pooled_dispatch: [pool, NO-pool] - write_deadline: [write_deadline, NO-write_deadline] - uses: ./.github/workflows/build-test.yml - with: - coverage: ON - type: RelWithDebInfo - server_version: main - compiler: gcc - tls: TLS - verify_host: verify_host - pool_dispatch: ${{ matrix.pooled_dispatch }} - write_deadline: ${{ matrix.write_deadline }} - secrets: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + # coverage-TLS: + # name: "Coverage: TLS" + # strategy: + # fail-fast: false + # matrix: + # pooled_dispatch: [pool, NO-pool] + # write_deadline: [write_deadline, NO-write_deadline] + # uses: ./.github/workflows/build-test.yml + # with: + # coverage: ON + # type: RelWithDebInfo + # server_version: main + # compiler: gcc + # tls: TLS + # verify_host: verify_host + # pool_dispatch: ${{ matrix.pooled_dispatch }} + # write_deadline: ${{ matrix.write_deadline }} + # secrets: + # CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - coverage-NO-verify_host: - name: "Coverage: NO-verify_host" - uses: ./.github/workflows/build-test.yml - with: - coverage: ON - type: RelWithDebInfo - server_version: main - compiler: gcc - tls: TLS - verify_host: NO-verify_host - secrets: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + # coverage-NO-verify_host: + # name: "Coverage: NO-verify_host" + # uses: ./.github/workflows/build-test.yml + # with: + # coverage: ON + # type: RelWithDebInfo + # server_version: main + # compiler: gcc + # tls: TLS + # verify_host: NO-verify_host + # secrets: + # CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - coverage-NO-TLS: - name: "Coverage NO-TLS" - uses: ./.github/workflows/build-test.yml - with: - coverage: ON - type: RelWithDebInfo - server_version: main - compiler: gcc - tls: NO-TLS - verify_host: NO-verify_host - secrets: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + # coverage-NO-TLS: + # name: "Coverage NO-TLS" + # uses: ./.github/workflows/build-test.yml + # with: + # coverage: ON + # type: RelWithDebInfo + # server_version: main + # compiler: gcc + # tls: NO-TLS + # verify_host: NO-verify_host + # secrets: + # CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - bench: - name: "Benchmark" - uses: ./.github/workflows/build-test.yml - with: - server_version: main - benchmark: ON - type: Release + # bench: + # name: "Benchmark" + # uses: ./.github/workflows/build-test.yml + # with: + # server_version: main + # benchmark: ON + # type: Release - Windows: - name: "Windows" - runs-on: windows-latest - steps: - - name: Export GitHub Actions cache environment variables - uses: actions/github-script@v7 - with: - script: | - core.exportVariable('ACTIONS_CACHE_URL', process.env.ACTIONS_CACHE_URL || ''); - core.exportVariable('ACTIONS_RUNTIME_TOKEN', process.env.ACTIONS_RUNTIME_TOKEN || ''); + # Windows: + # name: "Windows" + # runs-on: windows-latest + # steps: + # - name: Export GitHub Actions cache environment variables + # uses: actions/github-script@v7 + # with: + # script: | + # core.exportVariable('ACTIONS_CACHE_URL', process.env.ACTIONS_CACHE_URL || ''); + # core.exportVariable('ACTIONS_RUNTIME_TOKEN', process.env.ACTIONS_RUNTIME_TOKEN || ''); - - name: Checkout nats.c - uses: actions/checkout@v4 + # - name: Checkout nats.c + # uses: actions/checkout@v4 - - name: Build - env: - VCPKG_BINARY_SOURCES: "clear;x-gha,readwrite" - run: | - cmake -B build -S . -DCMAKE_BUILD_TYPE=Debug -DNATS_BUILD_STREAMING=OFF - cmake --build build + # - name: Build + # env: + # VCPKG_BINARY_SOURCES: "clear;x-gha,readwrite" + # run: | + # cmake -B build -S . -DCMAKE_BUILD_TYPE=Debug -DNATS_BUILD_STREAMING=OFF + # cmake --build build - - name: Test - shell: bash - run: | - cd build - # Download latest nats-server - rel="latest" # TODO: parameterize - if [ "$rel" = "latest" ]; then - rel=$(curl -s https://api.github.com/repos/nats-io/nats-server/releases/latest | jq -r '.tag_name') - fi - if [ "$rel" != "${rel#v}" ] && wget https://github.com/nats-io/nats-server/releases/download/$rel/nats-server-$rel-windows-amd64.tar.gz; then - tar -xzf nats-server-$rel-linux-amd64.tar.gz - cp nats-server-$rel-windows-amd64/nats-server.exe ../deps/nats-server/nats-server.exe - else - for c in 1 2 3 4 5 - do - echo "Attempt $c to download binary for main" - rm -f ./nats-server - curl -sf "https://binaries.nats.dev/nats-io/nats-server/v2@$rel" | PREFIX=. sh - # We are sometimes getting nats-server of size 0. Make sure we have a - # working nats-server by making sure we get a version number. - mv ./nats-server ./nats-server.exe - v="$(./nats-server.exe -v)" - if [ "$v" != "" ]; then - break - fi - done - mkdir -p ../deps/nats-server - mv ./nats-server.exe ../deps/nats-server/nats-server.exe - fi - export PATH=../deps/nats-server:$PATH - export NATS_TEST_SERVER_VERSION="$(nats-server -v)" - ctest -L test -C Debug --timeout 60 --output-on-failure --repeat until-pass:3 + # - name: Test + # shell: bash + # run: | + # cd build + # # Download latest nats-server + # rel="latest" # TODO: parameterize + # if [ "$rel" = "latest" ]; then + # rel=$(curl -s https://api.github.com/repos/nats-io/nats-server/releases/latest | jq -r '.tag_name') + # fi + # if [ "$rel" != "${rel#v}" ] && wget https://github.com/nats-io/nats-server/releases/download/$rel/nats-server-$rel-windows-amd64.tar.gz; then + # tar -xzf nats-server-$rel-linux-amd64.tar.gz + # cp nats-server-$rel-windows-amd64/nats-server.exe ../deps/nats-server/nats-server.exe + # else + # for c in 1 2 3 4 5 + # do + # echo "Attempt $c to download binary for main" + # rm -f ./nats-server + # curl -sf "https://binaries.nats.dev/nats-io/nats-server/v2@$rel" | PREFIX=. sh + # # We are sometimes getting nats-server of size 0. Make sure we have a + # # working nats-server by making sure we get a version number. + # mv ./nats-server ./nats-server.exe + # v="$(./nats-server.exe -v)" + # if [ "$v" != "" ]; then + # break + # fi + # done + # mkdir -p ../deps/nats-server + # mv ./nats-server.exe ../deps/nats-server/nats-server.exe + # fi + # export PATH=../deps/nats-server:$PATH + # export NATS_TEST_SERVER_VERSION="$(nats-server -v)" + # ctest -L test -C Debug --timeout 60 --output-on-failure --repeat until-pass:3 diff --git a/.github/workflows/on-push-release.yml b/.github/workflows/on-push-release.yml index d47c20308..54c36d47a 100644 --- a/.github/workflows/on-push-release.yml +++ b/.github/workflows/on-push-release.yml @@ -1,24 +1,24 @@ -name: "Release" -on: - push: +# name: "Release" +# on: +# push: -permissions: - contents: write # required by build-test to comment on coverage but not used here. +# permissions: +# contents: write # required by build-test to comment on coverage but not used here. -defaults: - run: - shell: bash --noprofile --norc -x -eo pipefail {0} +# defaults: +# run: +# shell: bash --noprofile --norc -x -eo pipefail {0} -jobs: - quick: - name: "Ubuntu" - strategy: - fail-fast: false - matrix: - compiler: [gcc, clang] - ubuntu_version: [latest, 20.04] - uses: ./.github/workflows/build-test.yml - with: - server_version: main - ubuntu_version: ${{ matrix.ubuntu_version }} - compiler: ${{ matrix.compiler }} +# jobs: +# quick: +# name: "Ubuntu" +# strategy: +# fail-fast: false +# matrix: +# compiler: [gcc, clang] +# ubuntu_version: [latest, 20.04] +# uses: ./.github/workflows/build-test.yml +# with: +# server_version: main +# ubuntu_version: ${{ matrix.ubuntu_version }} +# compiler: ${{ matrix.compiler }} diff --git a/src/conn.c b/src/conn.c index 37fcc1790..5c3d9e67a 100644 --- a/src/conn.c +++ b/src/conn.c @@ -2506,6 +2506,8 @@ _removeAllSubscriptions(natsConnection *nc) } natsHashIter_Done(&iter); natsMutex_Unlock(nc->subsMu); + + printf("<>/<> removeAllSubscriptions\n"); } // Low level close call that will do correct cleanup and set @@ -2986,6 +2988,7 @@ natsConn_removeSubscription(natsConnection *nc, natsSubscription *removedSub) natsSub_close(sub, false); natsMutex_Unlock(nc->subsMu); + nats_Sleep(1); // <>/<> // If we really removed the subscription, then release it. if (sub != NULL) diff --git a/src/dispatch.c b/src/dispatch.c index bf9f8c0f7..bbf5dccbc 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -279,7 +279,11 @@ nats_dispatchThreadPool(void *arg) } if (completeCB != NULL) + { + nats_Sleep(100); + printf("<>/<> completeCB (pool) on %s\n", sub->subject); fflush(stdout); (*completeCB)(completeCBClosure); + } // Subscription closed, just release natsSub_release(sub); @@ -586,7 +590,11 @@ nats_dispatchThreadOwn(void *arg) } if (completeCB != NULL) + { + nats_Sleep(100); + printf("<>/<> completeCB on %s\n", sub->subject); fflush(stdout); (*completeCB)(completeCBClosure); + } natsSub_release(sub); } diff --git a/src/micro.c b/src/micro.c index 354824184..5a8c1f2af 100644 --- a/src/micro.c +++ b/src/micro.c @@ -69,86 +69,148 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c return NULL; } +static microError *_subjectWithGroupPrefix(char **dst, microGroup *g, const char *src) +{ + size_t len = strlen(src) + 1; + char *p; + + if (g != NULL) + len += strlen(g->config->Prefix) + 1; + + *dst = NATS_CALLOC(1, len); + if (*dst == NULL) + return micro_ErrorOutOfMemory; + + p = *dst; + if (g != NULL) + { + len = strlen(g->config->Prefix); + memcpy(p, g->config->Prefix, len); + p[len] = '.'; + p += len + 1; + } + memcpy(p, src, strlen(src) + 1); + return NULL; +} + +microError * +_new_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal, char *fullSubject) +{ + microError *err = NULL; + microEndpoint *ep = NULL; + + if (cfg == NULL) + return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint config"); + if (!micro_is_valid_name(cfg->Name)) + return microError_Wrapf(micro_ErrorInvalidArg, "invalid endpoint name %s", cfg->Name); + if (cfg->Handler == NULL) + return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint request handler for %s", cfg->Name); + + ep = NATS_CALLOC(1, sizeof(microEndpoint)); + if (ep == NULL) + return micro_ErrorOutOfMemory; + ep->is_monitoring_endpoint = is_internal; + ep->m = m; + + MICRO_CALL(err, micro_ErrorFromStatus(natsMutex_Create(&ep->endpoint_mu))); + MICRO_CALL(err, micro_clone_endpoint_config(&ep->config, cfg)); + if (err != NULL) + { + micro_destroy_endpoint(ep); + return err; + } + + ep->subject = fullSubject; // already malloced, will be freed in micro_destroy_endpoint + ep->group = g; + *new_ep = ep; + return NULL; +} + + microError * micro_add_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal) { microError *err = NULL; - microEndpoint *ptr = NULL; - microEndpoint *prev_ptr = NULL; microEndpoint *ep = NULL; - microEndpoint *prev_ep = NULL; + bool update = false; if (m == NULL) return micro_ErrorInvalidArg; if (cfg == NULL) return NULL; - err = micro_new_endpoint(&ep, m, g, cfg, is_internal); - if (err != NULL) - return microError_Wrapf(err, "failed to create endpoint %s", cfg->Name); + char *fullSubject = NULL; + if (err = _subjectWithGroupPrefix(&fullSubject, g, nats_IsStringEmpty(cfg->Subject) ? cfg->Name : cfg->Subject), err != NULL) + return microError_Wrapf(err, "failed to create full subject for endpoint %s", cfg->Name); + if (!micro_is_valid_subject(fullSubject)) + { + NATS_FREE(fullSubject); + return microError_Wrapf(micro_ErrorInvalidArg, "invalid subject %s for endpoint %s", fullSubject, cfg->Name); + } _lock_service(m); - if (m->stopped) + err = micro_Errorf("can't add an endpoint %s to service %s: the service is stopped", cfg->Name, m->cfg->Name); + + // See if there is already an endpoint with the same subject. ep->subject is + // immutable after the EP's creation so we don't need to lock it. + for (ep = m->first_ep; (err == NULL) && (ep != NULL); ep = ep->next) { - _unlock_service(m); - return micro_Errorf("can't add an endpoint %s to service %s: the service is stopped", cfg->Name, m->cfg->Name); + if (strcmp(ep->subject, fullSubject) == 0) + { + // Found an existing endpoint with the same subject. We will update + // it as long as we can re-use the existing subscription, which at + // the moment means we can't change the queue group settings. + if (cfg->NoQueueGroup != ep->config->NoQueueGroup) + err = micro_Errorf("can't change the queue group settings for endpoint %s", cfg->Name); + else if (!nats_StringEquals(cfg->QueueGroup, ep->config->QueueGroup)) + err = micro_Errorf("can't change the queue group for endpoint %s", cfg->Name); + if (err == NULL) + { + NATS_FREE(fullSubject); + update = true; + fullSubject = NULL; + break; + } + } } - if (m->first_ep != NULL) + if (err == NULL) { - if (strcmp(m->first_ep->subject, ep->subject) == 0) + if (update) { - ep->next = m->first_ep->next; - prev_ep = m->first_ep; - m->first_ep = ep; + // If the endpoint already exists, update its config and stats. + // This will make it use the new handler for subsequent + // requests. + micro_lock_endpoint(ep); + micro_free_cloned_endpoint_config(ep->config); + err = micro_clone_endpoint_config(&ep->config, cfg); + if (err == NULL) + memset(&ep->stats, 0, sizeof(ep->stats)); + micro_unlock_endpoint(ep); } else { - prev_ptr = m->first_ep; - for (ptr = m->first_ep->next; ptr != NULL; prev_ptr = ptr, ptr = ptr->next) - { - if (strcmp(ptr->subject, ep->subject) == 0) - { - ep->next = ptr->next; - prev_ptr->next = ep; - prev_ep = ptr; - break; - } - } - if (prev_ep == NULL) + err = _new_endpoint(&ep, m, g, cfg, is_internal, fullSubject); + ep->next = m->first_ep; + m->first_ep = ep; + + // retain `m` before the endpoint uses it for its on_complete callback. + _retain_service(m); + + if (err = micro_subscribe_endpoint(ep), err != NULL) { - prev_ptr->next = ep; + // Best effort, leave the new endpoint in the list, as is. A retry with + // the same name will clean it up. + _release_service(m); + return microError_Wrapf(err, "failed to start endpoint %s", ep->config->Name); } } } - else - { - m->first_ep = ep; - } _unlock_service(m); - - if (prev_ep != NULL) - { - // Rid of the previous endpoint with the same subject, if any. If this - // fails we can return the error, leave the newly added endpoint in the - // list, not started. A retry with the same subject will clean it up. - if (err = micro_stop_endpoint(prev_ep), err != NULL) - return err; - micro_release_endpoint(prev_ep); - } - - // retain `m` before the endpoint uses it for its on_complete callback. - _retain_service(m); - - if (err = micro_start_endpoint(ep), err != NULL) - { - // Best effort, leave the new endpoint in the list, as is. A retry with - // the same name will clean it up. - _release_service(m); - return microError_Wrapf(err, "failed to start endpoint %s", ep->name); - } + if (err != NULL) + return microError_Wrapf(err, "can't add an endpoint %s to service %s", cfg->Name, m->cfg->Name); if (new_ep != NULL) *new_ep = ep; @@ -192,10 +254,11 @@ microService_Stop(microService *m) for (; ep != NULL; ep = ep->next) { - if (err = micro_stop_endpoint(ep), err != NULL) + if (err = micro_drain_endpoint(ep), err != NULL) { + err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name); _unlock_service(m); - return microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->name); + return err; } } @@ -250,7 +313,6 @@ void micro_release_on_endpoint_complete(void *closure) microService *m = NULL; natsSubscription *sub = NULL; microDoneHandler doneHandler = NULL; - bool free_ep = false; bool finalize = false; if (ep == NULL) @@ -261,20 +323,15 @@ void micro_release_on_endpoint_complete(void *closure) return; micro_lock_endpoint(ep); - ep->is_draining = false; sub = ep->sub; - ep->sub = NULL; - ep->refs--; - free_ep = (ep->refs == 0); + ep->sub = NULL; // Force the subscription to be destroyed now, so NULL out the pointer to avoid a double free. micro_unlock_endpoint(ep); - - // Force the subscription to be destroyed now. natsSubscription_Destroy(sub); _lock_service(m); // Release the service reference for the completed endpoint. It can not be - // the last reference, so no need to free m. + // the last reference, so no need to check for 0. m->refs--; // Unlink the endpoint from the service. @@ -298,10 +355,11 @@ void micro_release_on_endpoint_complete(void *closure) doneHandler = m->cfg->DoneHandler; } + printf("<>/<> micro: release completed endpoint %s\n", sub->subject); + _unlock_service(m); - if (free_ep) - micro_free_endpoint(ep); + micro_release_endpoint(ep); if (finalize) { @@ -588,6 +646,8 @@ _on_connection_closed(natsConnection *nc, void *ignored) int n = 0; int i; + printf("<>/<> micro: connection closed\n"); + err = _services_for_connection(&to_call, &n, nc); if (err != NULL) { @@ -612,6 +672,8 @@ _on_service_error(microService *m, const char *subject, natsStatus s) microEndpoint *ep = NULL; microError *err = NULL; + printf("<>/<> micro: service error\n"); + if (m == NULL) return; @@ -648,6 +710,8 @@ _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_use int n = 0; int i; + printf("<>/<> micro: on error\n"); + if (sub == NULL) { return; @@ -848,7 +912,7 @@ microService_GetInfo(microServiceInfo **new_info, microService *m) { if ((!ep->is_monitoring_endpoint) && (ep->subject != NULL)) { - MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Name, ep->name)); + MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Name, ep->config->Name)); MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].Subject, ep->subject)); MICRO_CALL(err, micro_strdup((char **)&info->Endpoints[len].QueueGroup, micro_queue_group_for_endpoint(ep))); MICRO_CALL(err, micro_ErrorFromStatus( @@ -947,7 +1011,7 @@ microService_GetStats(microServiceStats **new_stats, microService *m) // copy the entire struct, including the last error buffer. stats->Endpoints[len] = ep->stats; - MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].Name, ep->name)); + MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].Name, ep->config->Name)); MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].Subject, ep->subject)); MICRO_CALL(err, micro_strdup((char **)&stats->Endpoints[len].QueueGroup, micro_queue_group_for_endpoint(ep))); if (err == NULL) diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index efe72f41d..3fb9373c3 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -16,53 +16,11 @@ #include "microp.h" #include "util.h" -static microError *_subjectWithGroupPrefix(char **dst, microGroup *g, const char *src); - static void _handle_request(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure); static void _retain_endpoint(microEndpoint *ep, bool lock); static void _release_endpoint(microEndpoint *ep); -microError * -micro_new_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal) -{ - microError *err = NULL; - microEndpoint *ep = NULL; - const char *subj; - - if (cfg == NULL) - return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint config"); - if (!micro_is_valid_name(cfg->Name)) - return microError_Wrapf(micro_ErrorInvalidArg, "invalid endpoint name %s", cfg->Name); - if (cfg->Handler == NULL) - return microError_Wrapf(micro_ErrorInvalidArg, "NULL endpoint request handler for %s", cfg->Name); - - if ((cfg->Subject != NULL) && !micro_is_valid_subject(cfg->Subject)) - return micro_ErrorInvalidArg; - - subj = nats_IsStringEmpty(cfg->Subject) ? cfg->Name : cfg->Subject; - - ep = NATS_CALLOC(1, sizeof(microEndpoint)); - if (ep == NULL) - return micro_ErrorOutOfMemory; - ep->is_monitoring_endpoint = is_internal; - ep->m = m; - - MICRO_CALL(err, micro_ErrorFromStatus(natsMutex_Create(&ep->endpoint_mu))); - MICRO_CALL(err, micro_clone_endpoint_config(&ep->config, cfg)); - MICRO_CALL(err, micro_strdup(&ep->name, cfg->Name)); - MICRO_CALL(err, _subjectWithGroupPrefix(&ep->subject, g, subj)); - if (err != NULL) - { - micro_free_endpoint(ep); - return err; - } - - ep->group = g; - *new_ep = ep; - return NULL; -} - const char * micro_queue_group_for_endpoint(microEndpoint *ep) { @@ -88,7 +46,7 @@ micro_queue_group_for_endpoint(microEndpoint *ep) } microError * -micro_start_endpoint(microEndpoint *ep) +micro_subscribe_endpoint(microEndpoint *ep) { natsStatus s = NATS_OK; natsSubscription *sub = NULL; @@ -113,7 +71,6 @@ micro_start_endpoint(microEndpoint *ep) micro_lock_endpoint(ep); ep->refs++; ep->sub = sub; - ep->is_draining = false; micro_unlock_endpoint(ep); natsSubscription_SetOnCompleteCB(sub, micro_release_on_endpoint_complete, ep); @@ -127,36 +84,25 @@ micro_start_endpoint(microEndpoint *ep) } microError * -micro_stop_endpoint(microEndpoint *ep) +micro_drain_endpoint(microEndpoint *ep) { natsStatus s = NATS_OK; natsSubscription *sub = NULL; - if ((ep == NULL) || (ep->m == NULL)) + if (ep == NULL) return NULL; micro_lock_endpoint(ep); sub = ep->sub; - - if (ep->is_draining || natsConnection_IsClosed(ep->m->nc) || !natsSubscription_IsValid(sub)) - { - // If stopping, _release_on_endpoint_complete will take care of - // finalizing, nothing else to do. In other cases - // _release_on_endpoint_complete has already been called. - micro_unlock_endpoint(ep); - return NULL; - } - - ep->is_draining = true; micro_unlock_endpoint(ep); + if (sub == NULL) + return NULL; - // When the drain is complete, will release the final ref on ep. + // When the drain is complete, the callback will free ep. We may get an + // NATS_INVALID_SUBSCRIPTION if the subscription is already closed. s = natsSubscription_Drain(sub); - if (s != NATS_OK) - { - return microError_Wrapf(micro_ErrorFromStatus(s), - "failed to stop endpoint %s: failed to drain subscription", ep->name); - } + if ((s != NATS_OK) && (s != NATS_INVALID_SUBSCRIPTION)) + return microError_Wrapf(micro_ErrorFromStatus(s), "failed to drain subscription"); return NULL; } @@ -187,15 +133,14 @@ void micro_release_endpoint(microEndpoint *ep) micro_unlock_endpoint(ep); if (refs == 0) - micro_free_endpoint(ep); + micro_destroy_endpoint(ep); } -void micro_free_endpoint(microEndpoint *ep) +void micro_destroy_endpoint(microEndpoint *ep) { if (ep == NULL) return; - NATS_FREE(ep->name); NATS_FREE(ep->subject); natsSubscription_Destroy(ep->sub); natsMutex_Destroy(ep->endpoint_mu); @@ -334,7 +279,7 @@ micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg) microEndpointConfig *new_cfg = NULL; if (out == NULL) - return micro_ErrorInvalidArg; + return microError_Wrapf(micro_ErrorInvalidArg, "failed to clone endpoint config"); if (cfg == NULL) { @@ -357,6 +302,7 @@ micro_clone_endpoint_config(microEndpointConfig **out, microEndpointConfig *cfg) if (err != NULL) { micro_free_cloned_endpoint_config(new_cfg); + return microError_Wrapf(err, "failed to clone endpoint config %s", cfg->Name); return err; } @@ -434,26 +380,3 @@ bool micro_match_endpoint_subject(const char *ep_subject, const char *actual_sub } } -static microError *_subjectWithGroupPrefix(char **dst, microGroup *g, const char *src) -{ - size_t len = strlen(src) + 1; - char *p; - - if (g != NULL) - len += strlen(g->config->Prefix) + 1; - - *dst = NATS_CALLOC(1, len); - if (*dst == NULL) - return micro_ErrorOutOfMemory; - - p = *dst; - if (g != NULL) - { - len = strlen(g->config->Prefix); - memcpy(p, g->config->Prefix, len); - p[len] = '.'; - p += len + 1; - } - memcpy(p, src, strlen(src) + 1); - return NULL; -} diff --git a/src/microp.h b/src/microp.h index 8184691c4..d83f6e5ae 100644 --- a/src/microp.h +++ b/src/microp.h @@ -49,7 +49,6 @@ struct micro_endpoint_s { // The name and subject that the endpoint is listening on (may be different // from one specified in config). - char *name; char *subject; // A copy of the config provided to add_endpoint. @@ -75,7 +74,6 @@ struct micro_endpoint_s // Mutex for starting/stopping the endpoint, and for updating the stats. natsMutex *endpoint_mu; int refs; - bool is_draining; // The subscription for the endpoint. If NULL, the endpoint is stopped. natsSubscription *sub; @@ -146,11 +144,11 @@ microError *micro_is_error_message(natsStatus s, natsMsg *msg); microError *micro_new_control_subject(char **newSubject, const char *verb, const char *name, const char *id); microError *micro_new_endpoint(microEndpoint **new_ep, microService *m, microGroup *g, microEndpointConfig *cfg, bool is_internal); microError *micro_new_request(microRequest **new_request, microService *m, microEndpoint *ep, natsMsg *msg); -microError *micro_start_endpoint(microEndpoint *ep); -microError *micro_stop_endpoint(microEndpoint *ep); +microError *micro_subscribe_endpoint(microEndpoint *ep); +microError *micro_drain_endpoint(microEndpoint *ep); void micro_free_cloned_endpoint_config(microEndpointConfig *cfg); -void micro_free_endpoint(microEndpoint *ep); +void micro_destroy_endpoint(microEndpoint *ep); void micro_free_request(microRequest *req); void micro_release_endpoint(microEndpoint *ep); void micro_release_on_endpoint_complete(void *closure); diff --git a/test/test.c b/test/test.c index c2792f282..0a555b121 100644 --- a/test/test.c +++ b/test/test.c @@ -5970,6 +5970,8 @@ _closedCb(natsConnection *nc, void *closure) arg->closed = true; natsCondition_Broadcast(arg->c); natsMutex_Unlock(arg->m); + + // nats_Sleep(1000); // <>/<> } static natsStatus @@ -33582,39 +33584,30 @@ _startManyMicroservices(microService** svcs, int n, natsConnection *nc, microSer testCond(true); } -static void -_waitForMicroservicesAllDone(struct threadArg *arg) -{ - natsStatus s = NATS_OK; - - test("Wait for all microservices to stop: "); - natsMutex_Lock(arg->m); - while ((s != NATS_TIMEOUT) && !arg->microAllDone) - s = natsCondition_TimedWait(arg->c, arg->m, 1000); - natsMutex_Unlock(arg->m); - testCond((NATS_OK == s) && arg->microAllDone); - - // `Done` may be immediately followed by freeing the service, so wait a bit - // to make sure it happens before the test exits. - nats_Sleep(20); -} - -static void -_destroyMicroservicesAndWaitForAllDone(microService** svcs, int n, struct threadArg *arg) -{ - char buf[64]; - - snprintf(buf, sizeof(buf), "Wait for all %d microservices to stop: ", n); - test(buf); - - for (int i = 0; i < n; i++) - { - if (NULL != microService_Destroy(svcs[i])) - FAIL("Unable to destroy microservice!"); - } +#define _waitForMicroservicesAllDone(_arg) \ + { \ + testf("Wait for %d microservices to stop: ", (_arg)->microRunningServiceCount); \ + natsStatus waitStatus = NATS_OK; \ + bool allDone = false; \ + natsMutex_Lock((_arg)->m); \ + while ((waitStatus != NATS_TIMEOUT) && !(_arg)->microAllDone) \ + waitStatus = natsCondition_TimedWait((_arg)->c, (_arg)->m, 1000); \ + allDone = (_arg)->microAllDone; \ + natsMutex_Unlock((_arg)->m); \ + testCond((NATS_OK == waitStatus) && allDone); \ + } \ + nats_Sleep(1); - _waitForMicroservicesAllDone(arg); -} +#define _destroyMicroservice(_s) \ + testf("Destroy microservice %s: ", (_s)->cfg->Name); \ + microError *_err = microService_Destroy(_s); \ + if (_err != NULL) \ + { \ + char _buf[256]; \ + FAILf("Unable to destroy microservice: %s", microError_String(_err, _buf, sizeof(_buf))); \ + microError_Destroy(_err); \ + } \ + testCond(true); typedef struct { @@ -33844,14 +33837,8 @@ void test_MicroAddService(void) } microServiceInfo_Destroy(info); - - if (m != NULL) - { - snprintf(buf, sizeof(buf), "%s: Destroy service: %d", m->cfg->Name, m->refs); - test(buf); - testCond(NULL == microService_Destroy(m)); - _waitForMicroservicesAllDone(&arg); - } + _destroyMicroservice(m); + _waitForMicroservicesAllDone(&arg); } test("Destroy the test connection: "); @@ -33887,11 +33874,11 @@ void test_MicroGroups(void) }; const char* expected_subjects[] = { - "ep1", - "g1.ep1", - "g1.g2.ep1", - "g1.g2.ep2", "g1.ep2", + "g1.g2.ep2", + "g1.g2.ep1", + "g1.ep1", + "ep1", }; int expected_num_endpoints = sizeof(expected_subjects) / sizeof(expected_subjects[0]); @@ -33941,7 +33928,7 @@ void test_MicroGroups(void) if (err != NULL) FAIL("failed to get service info!") - test("Verify number of endpoints: "); + testf("Verify number of endpoints %d is %d: ", info->EndpointsLen, expected_num_endpoints); testCond(info->EndpointsLen == expected_num_endpoints); test("Verify endpoint subjects: "); @@ -33957,7 +33944,7 @@ void test_MicroGroups(void) microServiceInfo_Destroy(info); - microService_Destroy(m); + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); @@ -34062,14 +34049,14 @@ void test_MicroQueueGroupForEndpoint(void) testCond((err == NULL) && (info != NULL) && (info->EndpointsLen == 3) && (stats != NULL) && (stats->EndpointsLen == 3) && - (_testQueueGroup(tc.expectedServiceLevel, info->Endpoints[0].QueueGroup)) && - (_testQueueGroup(tc.expectedServiceLevel, stats->Endpoints[0].QueueGroup)) && + (_testQueueGroup(tc.expectedServiceLevel, info->Endpoints[2].QueueGroup)) && + (_testQueueGroup(tc.expectedServiceLevel, stats->Endpoints[2].QueueGroup)) && (_testQueueGroup(tc.expectedGroup1Level, stats->Endpoints[1].QueueGroup)) && (_testQueueGroup(tc.expectedGroup1Level, info->Endpoints[1].QueueGroup)) && - (_testQueueGroup(tc.expectedGroup2Level, info->Endpoints[2].QueueGroup)) && - (_testQueueGroup(tc.expectedGroup2Level, stats->Endpoints[2].QueueGroup))); + (_testQueueGroup(tc.expectedGroup2Level, info->Endpoints[0].QueueGroup)) && + (_testQueueGroup(tc.expectedGroup2Level, stats->Endpoints[0].QueueGroup))); - microService_Destroy(service); + _destroyMicroservice(service); _waitForMicroservicesAllDone(&arg); microServiceInfo_Destroy(info); microServiceStats_Destroy(stats); @@ -34234,27 +34221,27 @@ void test_MicroBasics(void) s = nats_JSONGetArrayObject(js, "endpoints", &array, &array_len); testCond((NATS_OK == s) && (array != NULL) && (array_len == 2)); - test("Validate INFO svc.do endpoint: "); - md = NULL; - testCond( - (NATS_OK == nats_JSONGetStrPtr(array[0], "name", &str)) && (strcmp(str, "do") == 0) - && (NATS_OK == nats_JSONGetStrPtr(array[0], "subject", &str)) && (strcmp(str, "svc.do") == 0) - && (NATS_OK == nats_JSONGetStrPtr(array[0], "queue_group", &str)) && (strcmp(str, MICRO_DEFAULT_QUEUE_GROUP) == 0) - && (NATS_OK == nats_JSONGetObject(array[0], "metadata", &md)) && (md == NULL) - ); - test("Validate INFO unused endpoint with metadata: "); md = NULL; testCond( - (NATS_OK == nats_JSONGetStrPtr(array[1], "name", &str)) && (strcmp(str, "unused") == 0) - && (NATS_OK == nats_JSONGetStrPtr(array[1], "subject", &str)) && (strcmp(str, "svc.unused") == 0) + (NATS_OK == nats_JSONGetStrPtr(array[0], "name", &str)) && (strcmp(str, "unused") == 0) + && (NATS_OK == nats_JSONGetStrPtr(array[0], "subject", &str)) && (strcmp(str, "svc.unused") == 0) && (NATS_OK == nats_JSONGetStrPtr(array[0], "queue_group", &str)) && (strcmp(str, MICRO_DEFAULT_QUEUE_GROUP) == 0) - && (NATS_OK == nats_JSONGetObject(array[1], "metadata", &md)) + && (NATS_OK == nats_JSONGetObject(array[0], "metadata", &md)) && (NATS_OK == nats_JSONGetStrPtr(md, "key1", &str)) && (strcmp(str, "value1") == 0) && (NATS_OK == nats_JSONGetStrPtr(md, "key2", &str)) && (strcmp(str, "value2") == 0) && (NATS_OK == nats_JSONGetStrPtr(md, "key3", &str)) && (strcmp(str, "value3") == 0) ); + test("Validate INFO svc.do endpoint: "); + md = NULL; + testCond( + (NATS_OK == nats_JSONGetStrPtr(array[1], "name", &str)) && (strcmp(str, "do") == 0) + && (NATS_OK == nats_JSONGetStrPtr(array[1], "subject", &str)) && (strcmp(str, "svc.do") == 0) + && (NATS_OK == nats_JSONGetStrPtr(array[1], "queue_group", &str)) && (strcmp(str, MICRO_DEFAULT_QUEUE_GROUP) == 0) + && (NATS_OK == nats_JSONGetObject(array[1], "metadata", &md)) && (md == NULL) + ); + nats_JSONDestroy(js); natsMsg_Destroy(reply); NATS_FREE(array); @@ -34331,13 +34318,13 @@ void test_MicroBasics(void) test("Ensure endpoint 0 has num_requests: "); n = 0; - s = nats_JSONGetInt(array[0], "num_requests", &n); + s = nats_JSONGetInt(array[1], "num_requests", &n); testCond(NATS_OK == s); num_requests += n; test("Ensure endpoint 0 has num_errors: "); n = 0; - s = nats_JSONGetInt(array[0], "num_errors", &n); + s = nats_JSONGetInt(array[1], "num_errors", &n); testCond(NATS_OK == s); num_errors += n; @@ -34354,7 +34341,11 @@ void test_MicroBasics(void) natsInbox_Destroy(inbox); NATS_FREE(subject); - _destroyMicroservicesAndWaitForAllDone(svcs, NUM_MICRO_SERVICES, &arg); + for (i = 0; i < NUM_MICRO_SERVICES; i++) + { + _destroyMicroservice(svcs[i]); + } + _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); natsConnection_Destroy(nc); @@ -34422,7 +34413,11 @@ void test_MicroStartStop(void) } testCond(NATS_OK == s); - _destroyMicroservicesAndWaitForAllDone(svcs, NUM_MICRO_SERVICES, &arg); + for (i = 0; i < NUM_MICRO_SERVICES; i++) + { + _destroyMicroservice(svcs[i]); + } + _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); natsConnection_Destroy(nc); @@ -34495,8 +34490,7 @@ void test_MicroServiceStopsOnClosedConn(void) test("Test microservice is stopped: "); testCond(microService_IsStopped(m)); - test("Destroy microservice (final): "); - testCond(NULL == microService_Destroy(m)) + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); natsOptions_Destroy(opts); @@ -34513,6 +34507,9 @@ void test_MicroServiceStopsWhenServerStops(void) natsPid serverPid = NATS_INVALID_PID; struct threadArg arg; microService *m = NULL; + microEndpointConfig ep_cfg = { + .Handler = _microHandleRequest42, + }; microServiceConfig cfg = { .Name = "test", .Version = "1.0.0", @@ -34538,24 +34535,39 @@ void test_MicroServiceStopsWhenServerStops(void) _startMicroservice(&m, nc, &cfg, NULL, 0, &arg); + const int numEndpoints = 50; + for (int i=0; i < numEndpoints; i++) + { + char buf[32]; + testf("Add endpoint %d: ", i); + snprintf(buf, sizeof(buf), "do-%d", i); + ep_cfg.Subject = buf; + ep_cfg.Name = buf; + testCond(NULL == microService_AddEndpoint(m, &ep_cfg)); + } + test("Test microservice is running: "); testCond(!microService_IsStopped(m)) + testf("Check that the service has %d endpoints: ", numEndpoints); + microServiceInfo *info = NULL; + microError *err = microService_GetInfo(&info, m); + testCond((err == NULL) && (info->EndpointsLen == numEndpoints)); + microServiceInfo_Destroy(info); + test("Stop the server: "); testCond((_stopServer(serverPid), true)); - test("Wait for the service to stop: "); - natsMutex_Lock(arg.m); - while ((s != NATS_TIMEOUT) && !arg.microAllDone) - s = natsCondition_TimedWait(arg.c, arg.m, 1000); - testCond(arg.microAllDone); - natsMutex_Unlock(arg.m); + _waitForMicroservicesAllDone(&arg); test("Test microservice is not running: "); testCond(microService_IsStopped(m)) - microService_Destroy(m); - _waitForMicroservicesAllDone(&arg); + // printf("\n\n-----------------------\n\n"); + // nats_Sleep(10000); + // printf("\n\n-----------------------\n\n"); + + _destroyMicroservice(m); test("Destroy the test connection: "); natsConnection_Destroy(nc); @@ -34663,7 +34675,7 @@ void test_MicroAsyncErrorHandlerMaxPendingMsgs(void) testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER)); natsMutex_Unlock(arg.m); - microService_Destroy(m); + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); @@ -34745,7 +34757,7 @@ void test_MicroAsyncErrorHandlerMaxPendingBytes(void) natsMutex_Unlock(arg.m); testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER)); - microService_Destroy(m); + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); diff --git a/test/test.h b/test/test.h index 6a9bc8edf..9e5c32f03 100644 --- a/test/test.h +++ b/test/test.h @@ -33,6 +33,15 @@ static const char *clusterName = "test-cluster"; return; \ } +#define FAILf(f, ...) \ + { \ + printf("@@ "); \ + printf((f), __VA_ARGS__); \ + printf(" @@\n"); \ + failed = true; \ + return; \ + } + #define CHECK_SERVER_STARTED(p) \ if ((p) == NATS_INVALID_PID) \ FAIL("Unable to start or verify that the server was started!")