Skip to content

Commit

Permalink
Implement thread stopping.
Browse files Browse the repository at this point in the history
  • Loading branch information
pbeyssac committed Sep 1, 2024
1 parent 122a48c commit 1b5ac37
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 11 deletions.
5 changes: 4 additions & 1 deletion caster/caster.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ caster_new(struct config *config, const char *config_file) {
}

void caster_free(struct caster_state *this) {
if (threads)
jobs_stop_threads(this->joblist);

if (this->signalpipe_event)
event_free(this->signalpipe_event);
if (this->signalhup_event)
Expand Down Expand Up @@ -649,7 +652,7 @@ int caster_main(char *config_file) {
return 1;
}

if (threads && jobs_start_threads(caster, nthreads) < 0) {
if (threads && jobs_start_threads(caster->joblist, nthreads) < 0) {
caster_free(caster);
fprintf(stderr, "Could not create threads!\n");
return 1;
Expand Down
66 changes: 58 additions & 8 deletions caster/jobs.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <pthread.h>
#include <signal.h>
#include <unistd.h>

#include "conf.h"
#include "caster.h"
Expand All @@ -22,6 +24,8 @@ struct joblist *joblist_new(struct caster_state *caster) {
this->njobs = 0;
this->append_njobs = 0;
this->caster = caster;
this->nthreads = 0;
this->threads = NULL;
STAILQ_INIT(&this->ntrip_queue);
STAILQ_INIT(&this->append_queue);
STAILQ_INIT(&this->jobq);
Expand Down Expand Up @@ -121,6 +125,8 @@ void joblist_run(struct joblist *this) {
j->ntrip_unlocked.cb(j->ntrip_unlocked.st);
else if (j->type == JOB_NTRIP_UNLOCKED_CONTENT)
j->ntrip_unlocked_content.cb(j->ntrip_unlocked_content.st, j->ntrip_unlocked_content.content_cb);
else if (j->type == JOB_STOP_THREAD)
pthread_exit(NULL);
free(j);
P_MUTEX_LOCK(&this->mutex);
}
Expand Down Expand Up @@ -414,6 +420,12 @@ void joblist_append_ntrip_unlocked_content(struct joblist *this, void (*cb)(stru
cb(st, content_cb);
}

void joblist_append_stop(struct joblist *this) {
struct job tmpj;
tmpj.type = JOB_STOP_THREAD;
_joblist_append_generic(this, NULL, &tmpj);
}

/*
* Drain the job queue for a ntrip_state
*
Expand Down Expand Up @@ -446,31 +458,69 @@ void *jobs_start_routine(void *arg) {
return NULL;
}

int jobs_start_threads(struct caster_state *caster, int nthreads) {
int jobs_start_threads(struct joblist *this, int nthreads) {
int err = 0;
pthread_t *p = (pthread_t *)malloc(sizeof(pthread_t)*nthreads);
if (p == NULL) {
return -1;
}

pthread_key_create(&caster->thread_id, NULL);
pthread_setspecific(caster->thread_id, 0);
pthread_key_create(&this->caster->thread_id, NULL);
pthread_setspecific(this->caster->thread_id, 0);

// Set stack size to the configured value
size_t stacksize = caster->config->threads[0].stacksize;
size_t stacksize = this->caster->config->threads[0].stacksize;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, stacksize);
printf("Setting thread stack size to %zu bytes\n", stacksize);

for (int i = 0; i < nthreads; i++) {
int i;
for (i = 0; i < nthreads; i++) {
struct thread_start_args *args = (struct thread_start_args *)malloc(sizeof(struct thread_start_args));
args->thread_id = i+1;
args->caster = caster;
args->caster = this->caster;
int r = pthread_create(&p[i], &attr, jobs_start_routine, args);
if (r < 0) {
return -1;
if (r != 0) {
err = 1;
free(args);
break;
}
}
pthread_attr_destroy(&attr);

if (err) {
this->nthreads = i;
jobs_stop_threads(this);
return -1;
}

this->threads = p;
this->nthreads = nthreads;
return 0;
}

void jobs_stop_threads(struct joblist *this) {
for (int i = 0; i < this->nthreads; i++) {
joblist_append_stop(this);
pthread_yield();
}

int r, nlive;

do {
nlive = 0;
for (int i = 0; i < this->nthreads; i++) {
r = pthread_kill(this->threads[i], 0);
if (r == 0)
nlive++;
}
if (nlive != 0) {
logfmt(&this->caster->flog, "%d thread(s) still active, waiting\n", nlive);
sleep(1);
}
} while (nlive);
free(this->threads);
this->threads = NULL;
this->nthreads = 0;
}
11 changes: 9 additions & 2 deletions caster/jobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ enum job_type {
JOB_NTRIP_LOCK,
JOB_NTRIP_UNLOCKED,
JOB_NTRIP_UNLOCKED_CONTENT,
JOB_REDISTRIBUTE
JOB_REDISTRIBUTE,
JOB_STOP_THREAD
};

struct ntrip_state;
Expand Down Expand Up @@ -117,6 +118,10 @@ struct joblist {

/* The associated caster */
struct caster_state *caster;

/* Pointer to threads */
pthread_t *threads;
int nthreads; // number of threads
};

struct joblist *joblist_new(struct caster_state *caster);
Expand All @@ -127,8 +132,10 @@ void joblist_append_ntrip_locked(struct joblist *this, struct ntrip_state *st, v
void joblist_append_redistribute(struct joblist *this, void (*cb)(struct redistribute_cb_args *redis_args), struct redistribute_cb_args *redis_args);
void joblist_append_ntrip_unlocked(struct joblist *this, void (*cb)(struct ntrip_state *st), struct ntrip_state *st);
void joblist_append_ntrip_unlocked_content(struct joblist *this, void (*cb)(struct ntrip_state *st, struct mime_content *(*content_cb)(struct caster_state *caster)), struct ntrip_state *st, struct mime_content *(*content_cb)(struct caster_state *caster));
void joblist_append_stop(struct joblist *this);
void joblist_drain(struct ntrip_state *st);
void *jobs_start_routine(void *arg);
int jobs_start_threads(struct caster_state *caster, int nthreads);
int jobs_start_threads(struct joblist *this, int nthreads);
void jobs_stop_threads(struct joblist *this);

#endif

0 comments on commit 1b5ac37

Please sign in to comment.