From 893ea7a40b97a9b21cc73071eb29ab85bf8ff921 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Wed, 9 Oct 2024 09:48:52 -0700 Subject: [PATCH] wip more --- src/micro_endpoint.c | 12 ++++----- test/test.c | 63 ++++++++++++++++++++------------------------ test/test.h | 9 +++++++ 3 files changed, 44 insertions(+), 40 deletions(-) diff --git a/src/micro_endpoint.c b/src/micro_endpoint.c index f33e3995d..6693868b7 100644 --- a/src/micro_endpoint.c +++ b/src/micro_endpoint.c @@ -96,14 +96,14 @@ micro_drain_endpoint(microEndpoint *ep) micro_lock_endpoint(ep); sub = ep->sub; micro_unlock_endpoint(ep); + if (sub == NULL) + return NULL; - // When the drain is complete, the callback will free ep. + // When the drain is complete, the callback will free ep. We may get an + // NATS_INVALID_SUBSCRIPTION if the subscription is already closed. s = natsSubscription_Drain(sub); - if (s != NATS_OK) - { - return microError_Wrapf(micro_ErrorFromStatus(s), - "failed to stop endpoint %s: failed to drain subscription", ep->config->Name); - } + if ((s != NATS_OK) && (s != NATS_INVALID_SUBSCRIPTION)) + return microError_Wrapf(micro_ErrorFromStatus(s), "failed to drain subscription"); return NULL; } diff --git a/test/test.c b/test/test.c index d9234b210..2b5809c9e 100644 --- a/test/test.c +++ b/test/test.c @@ -33589,8 +33589,8 @@ _waitForMicroservicesAllDone(struct threadArg *arg) { natsStatus s = NATS_OK; - test("Wait for all microservices to stop: "); natsMutex_Lock(arg->m); + testf("Wait for %d microservices to stop: ", arg->microRunningServiceCount); while ((s != NATS_TIMEOUT) && !arg->microAllDone) s = natsCondition_TimedWait(arg->c, arg->m, 1000); testCond((NATS_OK == s) && arg->microAllDone); @@ -33602,22 +33602,16 @@ _waitForMicroservicesAllDone(struct threadArg *arg) nats_Sleep(100); } -static void -_destroyMicroservicesAndWaitForAllDone(microService** svcs, int n, struct threadArg *arg) -{ - char buf[64]; - - snprintf(buf, sizeof(buf), "Wait for all %d microservices to stop: ", n); - test(buf); - - for (int i = 0; i < n; i++) - { - if (NULL != microService_Destroy(svcs[i])) - FAIL("Unable to destroy microservice!"); - } - - _waitForMicroservicesAllDone(arg); -} +#define _destroyMicroservice(_s) \ + testf("Destroy microservice %s: ", (_s)->cfg->Name); \ + microError *_err = microService_Destroy(_s); \ + if (_err != NULL) \ + { \ + char _buf[256]; \ + FAILf("Unable to destroy microservice: %s", microError_String(_err, _buf, sizeof(_buf))); \ + microError_Destroy(_err); \ + } \ + testCond(true); typedef struct { @@ -33847,14 +33841,8 @@ void test_MicroAddService(void) } microServiceInfo_Destroy(info); - - if (m != NULL) - { - snprintf(buf, sizeof(buf), "%s: Destroy service: %d", m->cfg->Name, m->refs); - test(buf); - testCond(NULL == microService_Destroy(m)); - _waitForMicroservicesAllDone(&arg); - } + _destroyMicroservice(m); + _waitForMicroservicesAllDone(&arg); } test("Destroy the test connection: "); @@ -33960,7 +33948,7 @@ void test_MicroGroups(void) microServiceInfo_Destroy(info); - microService_Destroy(m); + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); @@ -34072,7 +34060,7 @@ void test_MicroQueueGroupForEndpoint(void) (_testQueueGroup(tc.expectedGroup2Level, info->Endpoints[0].QueueGroup)) && (_testQueueGroup(tc.expectedGroup2Level, stats->Endpoints[0].QueueGroup))); - microService_Destroy(service); + _destroyMicroservice(service); _waitForMicroservicesAllDone(&arg); microServiceInfo_Destroy(info); microServiceStats_Destroy(stats); @@ -34357,7 +34345,11 @@ void test_MicroBasics(void) natsInbox_Destroy(inbox); NATS_FREE(subject); - _destroyMicroservicesAndWaitForAllDone(svcs, NUM_MICRO_SERVICES, &arg); + for (i = 0; i < NUM_MICRO_SERVICES; i++) + { + _destroyMicroservice(svcs[i]); + } + _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); natsConnection_Destroy(nc); @@ -34425,7 +34417,11 @@ void test_MicroStartStop(void) } testCond(NATS_OK == s); - _destroyMicroservicesAndWaitForAllDone(svcs, NUM_MICRO_SERVICES, &arg); + for (i = 0; i < NUM_MICRO_SERVICES; i++) + { + _destroyMicroservice(svcs[i]); + } + _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); natsConnection_Destroy(nc); @@ -34498,8 +34494,7 @@ void test_MicroServiceStopsOnClosedConn(void) test("Test microservice is stopped: "); testCond(microService_IsStopped(m)); - test("Destroy microservice (final): "); - testCond(NULL == microService_Destroy(m)) + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); natsOptions_Destroy(opts); @@ -34572,7 +34567,7 @@ void test_MicroServiceStopsWhenServerStops(void) test("Test microservice is not running: "); testCond(microService_IsStopped(m)) - microService_Destroy(m); + _destroyMicroservice(m); test("Destroy the test connection: "); natsConnection_Destroy(nc); @@ -34680,7 +34675,7 @@ void test_MicroAsyncErrorHandlerMaxPendingMsgs(void) testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER)); natsMutex_Unlock(arg.m); - microService_Destroy(m); + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); @@ -34762,7 +34757,7 @@ void test_MicroAsyncErrorHandlerMaxPendingBytes(void) natsMutex_Unlock(arg.m); testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER)); - microService_Destroy(m); + _destroyMicroservice(m); _waitForMicroservicesAllDone(&arg); test("Destroy the test connection: "); diff --git a/test/test.h b/test/test.h index 6a9bc8edf..9e5c32f03 100644 --- a/test/test.h +++ b/test/test.h @@ -33,6 +33,15 @@ static const char *clusterName = "test-cluster"; return; \ } +#define FAILf(f, ...) \ + { \ + printf("@@ "); \ + printf((f), __VA_ARGS__); \ + printf(" @@\n"); \ + failed = true; \ + return; \ + } + #define CHECK_SERVER_STARTED(p) \ if ((p) == NATS_INVALID_PID) \ FAIL("Unable to start or verify that the server was started!")