Skip to content

Commit

Permalink
wip more
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Oct 9, 2024
1 parent 5892839 commit 893ea7a
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 40 deletions.
12 changes: 6 additions & 6 deletions src/micro_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ micro_drain_endpoint(microEndpoint *ep)
micro_lock_endpoint(ep);
sub = ep->sub;
micro_unlock_endpoint(ep);
if (sub == NULL)
return NULL;

// When the drain is complete, the callback will free ep.
// When the drain is complete, the callback will free ep. We may get an
// NATS_INVALID_SUBSCRIPTION if the subscription is already closed.
s = natsSubscription_Drain(sub);
if (s != NATS_OK)
{
return microError_Wrapf(micro_ErrorFromStatus(s),
"failed to stop endpoint %s: failed to drain subscription", ep->config->Name);
}
if ((s != NATS_OK) && (s != NATS_INVALID_SUBSCRIPTION))
return microError_Wrapf(micro_ErrorFromStatus(s), "failed to drain subscription");

return NULL;
}
Expand Down
63 changes: 29 additions & 34 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -33589,8 +33589,8 @@ _waitForMicroservicesAllDone(struct threadArg *arg)
{
natsStatus s = NATS_OK;

test("Wait for all microservices to stop: ");
natsMutex_Lock(arg->m);
testf("Wait for %d microservices to stop: ", arg->microRunningServiceCount);
while ((s != NATS_TIMEOUT) && !arg->microAllDone)
s = natsCondition_TimedWait(arg->c, arg->m, 1000);
testCond((NATS_OK == s) && arg->microAllDone);
Expand All @@ -33602,22 +33602,16 @@ _waitForMicroservicesAllDone(struct threadArg *arg)
nats_Sleep(100);
}

static void
_destroyMicroservicesAndWaitForAllDone(microService** svcs, int n, struct threadArg *arg)
{
char buf[64];

snprintf(buf, sizeof(buf), "Wait for all %d microservices to stop: ", n);
test(buf);

for (int i = 0; i < n; i++)
{
if (NULL != microService_Destroy(svcs[i]))
FAIL("Unable to destroy microservice!");
}

_waitForMicroservicesAllDone(arg);
}
#define _destroyMicroservice(_s) \
testf("Destroy microservice %s: ", (_s)->cfg->Name); \
microError *_err = microService_Destroy(_s); \
if (_err != NULL) \
{ \
char _buf[256]; \
FAILf("Unable to destroy microservice: %s", microError_String(_err, _buf, sizeof(_buf))); \
microError_Destroy(_err); \
} \
testCond(true);

typedef struct
{
Expand Down Expand Up @@ -33847,14 +33841,8 @@ void test_MicroAddService(void)
}

microServiceInfo_Destroy(info);

if (m != NULL)
{
snprintf(buf, sizeof(buf), "%s: Destroy service: %d", m->cfg->Name, m->refs);
test(buf);
testCond(NULL == microService_Destroy(m));
_waitForMicroservicesAllDone(&arg);
}
_destroyMicroservice(m);
_waitForMicroservicesAllDone(&arg);
}

test("Destroy the test connection: ");
Expand Down Expand Up @@ -33960,7 +33948,7 @@ void test_MicroGroups(void)

microServiceInfo_Destroy(info);

microService_Destroy(m);
_destroyMicroservice(m);
_waitForMicroservicesAllDone(&arg);

test("Destroy the test connection: ");
Expand Down Expand Up @@ -34072,7 +34060,7 @@ void test_MicroQueueGroupForEndpoint(void)
(_testQueueGroup(tc.expectedGroup2Level, info->Endpoints[0].QueueGroup)) &&
(_testQueueGroup(tc.expectedGroup2Level, stats->Endpoints[0].QueueGroup)));

microService_Destroy(service);
_destroyMicroservice(service);
_waitForMicroservicesAllDone(&arg);
microServiceInfo_Destroy(info);
microServiceStats_Destroy(stats);
Expand Down Expand Up @@ -34357,7 +34345,11 @@ void test_MicroBasics(void)
natsInbox_Destroy(inbox);
NATS_FREE(subject);

_destroyMicroservicesAndWaitForAllDone(svcs, NUM_MICRO_SERVICES, &arg);
for (i = 0; i < NUM_MICRO_SERVICES; i++)
{
_destroyMicroservice(svcs[i]);
}
_waitForMicroservicesAllDone(&arg);

test("Destroy the test connection: ");
natsConnection_Destroy(nc);
Expand Down Expand Up @@ -34425,7 +34417,11 @@ void test_MicroStartStop(void)
}
testCond(NATS_OK == s);

_destroyMicroservicesAndWaitForAllDone(svcs, NUM_MICRO_SERVICES, &arg);
for (i = 0; i < NUM_MICRO_SERVICES; i++)
{
_destroyMicroservice(svcs[i]);
}
_waitForMicroservicesAllDone(&arg);

test("Destroy the test connection: ");
natsConnection_Destroy(nc);
Expand Down Expand Up @@ -34498,8 +34494,7 @@ void test_MicroServiceStopsOnClosedConn(void)
test("Test microservice is stopped: ");
testCond(microService_IsStopped(m));

test("Destroy microservice (final): ");
testCond(NULL == microService_Destroy(m))
_destroyMicroservice(m);
_waitForMicroservicesAllDone(&arg);

natsOptions_Destroy(opts);
Expand Down Expand Up @@ -34572,7 +34567,7 @@ void test_MicroServiceStopsWhenServerStops(void)
test("Test microservice is not running: ");
testCond(microService_IsStopped(m))

microService_Destroy(m);
_destroyMicroservice(m);

test("Destroy the test connection: ");
natsConnection_Destroy(nc);
Expand Down Expand Up @@ -34680,7 +34675,7 @@ void test_MicroAsyncErrorHandlerMaxPendingMsgs(void)
testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER));
natsMutex_Unlock(arg.m);

microService_Destroy(m);
_destroyMicroservice(m);
_waitForMicroservicesAllDone(&arg);

test("Destroy the test connection: ");
Expand Down Expand Up @@ -34762,7 +34757,7 @@ void test_MicroAsyncErrorHandlerMaxPendingBytes(void)
natsMutex_Unlock(arg.m);
testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER));

microService_Destroy(m);
_destroyMicroservice(m);
_waitForMicroservicesAllDone(&arg);

test("Destroy the test connection: ");
Expand Down
9 changes: 9 additions & 0 deletions test/test.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ static const char *clusterName = "test-cluster";
return; \
}

#define FAILf(f, ...) \
{ \
printf("@@ "); \
printf((f), __VA_ARGS__); \
printf(" @@\n"); \
failed = true; \
return; \
}

#define CHECK_SERVER_STARTED(p) \
if ((p) == NATS_INVALID_PID) \
FAIL("Unable to start or verify that the server was started!")
Expand Down

0 comments on commit 893ea7a

Please sign in to comment.