Skip to content

Commit

Permalink
wip ++3
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Oct 31, 2024
1 parent c3f7a93 commit c48d4c6
Showing 1 changed file with 15 additions and 20 deletions.
35 changes: 15 additions & 20 deletions src/micro.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ static inline void _unlock_service(microService *m) { natsMutex_Unlock(m->servic

static microError *_clone_service_config(microServiceConfig **out, microServiceConfig *cfg);
static microError *_new_service(microService **ptr, natsConnection *nc);
static microError *_add_service_to_connection(microService *m);
static void _on_connection_closed(natsConnection *nc, void *ignored);
static void _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_used);

static void _free_cloned_service_config(microServiceConfig *cfg);
static void _free_service(microService *m);
Expand Down Expand Up @@ -53,10 +54,18 @@ micro_AddService(microService **new_m, natsConnection *nc, microServiceConfig *c

MICRO_CALL(err, _clone_service_config(&m->cfg, cfg));

// Add the service to the connection.
m->refs++;
MICRO_CALL(err, micro_ErrorFromStatus(natsConn_addService(m->nc, m)));

// Wrap the connection callbacks before we subscribe to anything.
MICRO_CALL(err, _add_service_to_connection(m));
MICRO_CALL(err, micro_ErrorFromStatus(
natsOptions_setMicroCallbacks(m->nc->opts, _on_connection_closed, _on_error)));

// Initialize the monitoring endpoints.
MICRO_CALL(err, micro_init_monitoring(m));

// Add the default endpoint.
MICRO_CALL(err, microService_AddEndpoint(m, cfg->Endpoint));

if (err != NULL)
Expand Down Expand Up @@ -269,6 +278,7 @@ _stop_service(microService *m, bool unsubscribe)
_unlock_service(m);

natsConn_removeService(m->nc, m);
_release_service(m);
return NULL;
}

Expand Down Expand Up @@ -347,10 +357,13 @@ void micro_release_endpoint_when_unsubscribed(void *closure)
nc = m->nc;
_unlock_service(m);

// Special processing for the last endpoint.
if (doneHandler != NULL)
{
doneHandler(m);

natsConn_removeService(nc, m);
_release_service(m);

if (refs == 0)
_free_service(m);
Expand Down Expand Up @@ -623,24 +636,6 @@ _on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *not_use
}
}

static microError *
_add_service_to_connection(microService *m)
{
microError *err = NULL;

if ((m == NULL) || (m->nc == NULL) || (m->nc->opts == NULL))
return micro_ErrorInvalidArg;

// The new service must be in the list for this to work.
natsStatus s = natsConn_addService(m->nc, m);
if (s != NATS_OK)
err = micro_ErrorFromStatus(s);
MICRO_CALL(err, micro_ErrorFromStatus(
natsOptions_setMicroCallbacks(m->nc->opts, _on_connection_closed, _on_error)));

return microError_Wrapf(err, "failed to wrap connection event callbacks");
}

static inline microError *
_new_group_config(microGroupConfig **ptr)
{
Expand Down

0 comments on commit c48d4c6

Please sign in to comment.