Skip to content

Commit

Permalink
wip ++5
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Nov 1, 2024
1 parent ba500f3 commit b779569
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 76 deletions.
7 changes: 5 additions & 2 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -4478,10 +4478,11 @@ int natsConn_getServices(microService ***services, natsConnection *nc)
return numServices;
}

void natsConn_removeService(natsConnection *nc, microService *service)
bool natsConn_removeService(natsConnection *nc, microService *service)
{
bool removed = false;
if (nc == NULL || service == NULL)
return;
return false;

natsConn_Lock(nc);
for (int i = 0; i < nc->numServices; i++)
Expand All @@ -4493,10 +4494,12 @@ void natsConn_removeService(natsConnection *nc, microService *service)
nc->services[j] = nc->services[j + 1];
}
nc->numServices--;
removed = true;
break;
}
}
natsConn_Unlock(nc);
return removed;
}

natsStatus natsConn_addService(natsConnection *nc, microService *service)
Expand Down
2 changes: 1 addition & 1 deletion src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ natsConn_destroy(natsConnection *nc, bool fromPublicDestroy);
int
natsConn_getServices(microService ***services, natsConnection *nc);

void
bool
natsConn_removeService(natsConnection *nc, microService *service);

natsStatus
Expand Down
103 changes: 30 additions & 73 deletions src/micro.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, v

static void _free_cloned_service_config(microServiceConfig *cfg);
static void _free_service(microService *m);
static void _release_service(microService *m);
static void _retain_service(microService *m);

microError *
micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *cfg)
Expand All @@ -55,8 +53,8 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c
MICRO_CALL(err, _clone_service_config(&m->cfg, cfg));

// Add the service to the connection.
m->refs++;
MICRO_CALL(err, micro_ErrorFromStatus(natsConn_addService(m->nc, m)));
MICRO_CALL(err, (m->refs++, NULL));

// Wrap the connection callbacks before we subscribe to anything.
MICRO_CALL(err, micro_ErrorFromStatus(
Expand Down Expand Up @@ -246,43 +244,48 @@ microGroup_AddEndpoint(microGroup *g, microEndpointConfig *cfg)
}

static microError *
_stop_service(microService *m, bool unsubscribe)
_stop_service(microService *m, bool unsubscribe, bool finalRelease)
{
microError *err = NULL;
microEndpoint *ep = NULL;
int refs = 0;
int refs = 0;
int numEndpoints = 0;

if (m == NULL)
return micro_ErrorInvalidArg;

_lock_service(m);
if (m->stopped)
if (!m->stopped)
{
_unlock_service(m);
return NULL;
}
m->stopped = true;

m->stopped = true;

if (unsubscribe)
{
for (ep = m->first_ep; ep != NULL; ep = ep->next)
if (unsubscribe)
{
if (err = micro_stop_endpoint(ep), err != NULL)
for (ep = m->first_ep; ep != NULL; ep = ep->next)
{
err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name);
_unlock_service(m);
return err;
if (err = micro_stop_endpoint(ep), err != NULL)
{
err = microError_Wrapf(err, "failed to stop service '%s', stopping endpoint '%s'", m->cfg->Name, ep->config->Name);
_unlock_service(m);
return err;

Check warning on line 270 in src/micro.c

View check run for this annotation

Codecov / codecov/patch

src/micro.c#L268-L270

Added lines #L268 - L270 were not covered by tests
}
}
}

if (natsConn_removeService(m->nc, m))
m->refs--;
}

natsConn_removeService(m->nc, m);
refs = --(m->refs);
if ((m->refs > 0) && finalRelease)
m->refs--;

refs = m->refs;
numEndpoints = m->numEndpoints;

_unlock_service(m);

if (refs == 0)

if ((refs == 0) && (numEndpoints == 0))
_free_service(m);

return NULL;
Expand All @@ -291,7 +294,8 @@ _stop_service(microService *m, bool unsubscribe)
microError *
microService_Stop(microService *m)

Check warning on line 295 in src/micro.c

View check run for this annotation

Codecov / codecov/patch

src/micro.c#L295

Added line #L295 was not covered by tests
{
return _stop_service(m, true);
// Public API: stop the service, unsubscribe, but don't do the final release.
return _stop_service(m, true, false);

Check warning on line 298 in src/micro.c

View check run for this annotation

Codecov / codecov/patch

src/micro.c#L298

Added line #L298 was not covered by tests
}

static void
Expand Down Expand Up @@ -326,7 +330,6 @@ void micro_release_endpoint_when_unsubscribed(void *closure)
microEndpoint *ep = (microEndpoint *)closure;
microService *m = NULL;
natsSubscription *sub = NULL;
natsConnection *nc = NULL;
microDoneHandler doneHandler = NULL;
int refs = 0;

Expand Down Expand Up @@ -355,22 +358,16 @@ void micro_release_endpoint_when_unsubscribed(void *closure)
micro_free_endpoint(ep);

if (m->numEndpoints == 0)
{
m->stopped = true;
doneHandler = m->cfg->DoneHandler;
}

refs = m->refs;
nc = m->nc;
_unlock_service(m);

// Special processing for the last endpoint.
if (doneHandler != NULL)
{
doneHandler(m);

natsConn_removeService(nc, m);
_release_service(m);

if (refs == 0)
_free_service(m);
}
Expand All @@ -393,14 +390,7 @@ bool microService_IsStopped(microService *m)
microError *
microService_Destroy(microService *m)
{
microError *err = NULL;

err = _stop_service(m, true);
if (err != NULL)
return err;

_release_service(m);
return NULL;
return _stop_service(m, true, true);
}

microError *
Expand Down Expand Up @@ -440,39 +430,6 @@ _new_service(microService **ptr, natsConnection *nc)
return NULL;
}

static void
_retain_service(microService *m)
{
if (m == NULL)
return;

_lock_service(m);

++(m->refs);

_unlock_service(m);
}

static void
_release_service(microService *m)
{
int refs = 0;
int numEndpoints = 0;

if (m == NULL)
return;

_lock_service(m);

refs = --(m->refs);
numEndpoints = m->numEndpoints;

_unlock_service(m);

if ((refs == 0) && (numEndpoints == 0))
_free_service(m);
}

static inline void
_free_cloned_group_config(microGroupConfig *cfg)
{
Expand Down Expand Up @@ -587,7 +544,7 @@ _on_connection_closed(natsConnection *nc, void *ignored)
for (int i = 0; i < n; i++)
{
m = all[i];
_stop_service(m, false); // subs will be terminated by the connection close.
_stop_service(m, false, false); // subs will be terminated by the connection close.
}
}

Expand Down Expand Up @@ -638,7 +595,7 @@ _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_use
{
m = all[i];
_on_service_error(m, subject, s);
_stop_service(m, true);
_stop_service(m, true, false);
}
}

Expand Down

0 comments on commit b779569

Please sign in to comment.