Multi-threaded task library with user-level schedule supporting both local and remote task execution.
- Multiple task boards supported in a single application.
- Flexibility between local task board and remote tasks.
- Multiple types of tasks supported.
- Designed for multithreaded application.
- Fast, robust and highly efficient while being customizable.
- Readable sources and documentation.
- Support for running in Valgrind.
- Testing library is included.
- Remote MQTT support via flexible user-defined MQTT adapter.
- Virtually any remote protocol is supported via flexibility in remote task adapter implementation.
- Built in logging functions
To create a task board and start it, include #include "tboard.h"
and call the following code at the start of the program:
int secondary_executors = 10; // number of sExec threads to run
tboard_t *tboard = tboard_create(secondary_executors); // create task board
tboard_start(tboard); // start pExec and sExec threads
To join pExec
and sExec
, call tboard_destroy(tboard)
at the end of the function. This will cause the calling thread to wait until pExec
and sExec
terminate. Once all task boards are destroyed, call tboard_exit()
to exit application.
By default, the task board will run indefinitely, with executor threads completing tasks until no tasks are left. Once that occurs, the executor threads will sleep on a condition variable until new tasks are inserted into the task board.
To manually kill the task board, in a separate thread call tboard_kill(tboard)
. In order to capture task board data before task board is destroyed after executor threads terminate, the following structure must be followed:
pthread_mutex_lock(&(tboard->tmutex)); // prevent immediate destruction
tboard_kill(tboard); // kill task board
/*
* capture task board data
*/
pthread_mutex_unlock(&(tboard->tmutex)); // unlock task board mutex
// task board will now be destroyed
Task execution history is saved by default in history.c
. In order to print task execution history to stdout
, simply call history_print_records(tboard, stdout)
.
The task board structure contains the following elements:
- Primary executor thread (
pExec
) - Secondary executor threads (
sExec
) - Primary and Secondary ready queues that hold tasks waiting to execute
- Incoming and outgoing message queues for remote task execution
- Task execution history hash table (type:
history_t
) - Concurrent task count, readable at any time via
int tboard_get_concurrent(tboard);
call. - Various mutex locks and condition variables to ensure consistent data across threads and predictable behavior
Task board structure is type tboard_t
. Definitions can be found in tboard.h
.
In the task board, the task executors (TExec
) runs indefinitely until task board terminates. It runs the Task Sequencer function TSeq
to interface worker and controller communication over MQTT and schedule task execution. If there are no tasks in the executor's task ready queue, executor will go to sleep on a condition variable. Once a task is pulled out of the task ready queue, TExec
will switch to that task, returning only once task has yielded or terminates. If task yields, it will be returned back into the task ready queue to be executed later. If task terminates, execution statistics will be recorded in history hash table and it's stack will be destroyed.
Task executors can be split into two categories:
- Primary task executor (
pExec
): Primary task executor is the main thread of the task board. It will run tasks that are in the primary task ready queue. Should there be no tasks present, it will attempt to run tasks from secondary task ready queues. To prevent the possibility of a deadlock, if no tasks are present, it will initiate a timed wait on condition variable so thatTSeq
can run. - Secondary task executor (
sExec
): Secondary task executor runs pulls tasks from it's secondary task ready queue exclusively. If no tasks are present in ready queue, it will sleep on it's own condition variable, awakening only when a task is placed in it's ready queue.
All essential tasks that need to be run on pExec
will be contained within the primary task ready queue.
There are several different types of tasks. The first kind are local tasks, which can terminate or run indefinitely, yielding at every iteration. Local tasks can be classified into three different types:
PRIORITY_EXEC
: Priority tasks will be placed at the head of the primary ready queue, for execution to happen in a timely manner. Priority tasks will block other tasks from running onpExec
until priority task has terminated. Priority tasks most closely followLIFO
task scheduling. Priority tasks do not block secondary tasks from running insExec
.PRIMARY_EXEC
: Primary tasks will be placed at the tail of the primary ready queue. These tasks will run exclusively on the primary executor thread.SECONDARY_EXEC
: Secondary tasks will be placed at the tail of some secondary ready queue, selected arbitrarily. If task board is made withsecondary_queues = 0
, they will be placed in the primary ready queue. These tasks are defined as tasks without any major dependencies or side effects. Secondary tasks can be run in either the primary execution thread or a secondary execution thread.
Local tasks can be created in any thread or any task. Local tasks must have the function signature void task_func(context_t ctx)
. The following example shows how to create a local primary task:
void task_func(context_t ctx); // task function to run by executor
...
bool res = task_create(tboard, TBOARD_FUNC(task_func), PRIMARY_EXEC, NULL, 0);
if (res)
printf("Task created successfully.\n");
To specify arguments to a local task, one must simply pass the pointer of the argument to task_create()
. If the user wishes for the arguments to be free'd from the memory on task termination, they must specify the size of the allocated data passed as an argument. Task arguments can then be retrieved by the task_get_args()
function call.
void task_func(context_t ctx) {
type_t *args = (type_t *)task_get_args();
...
}
...
type_t *args = calloc(1, sizeof(type_t));
// modify argument
bool res = task_create(tboard, TBOARD_FUNC(task_func), PRIMARY_EXEC, args, sizeof(type_t));
if (res)
printf("Task created successfully.\n");
If size is not specified, then it is the users responsibility to handle garbage collection.
Blocking tasks are local tasks that are created within another parent task that must terminate before parent task will be allowed to resume execution. Within a task, blocking tasks can be created in the following way:
void blocking_task(context_t ctx);
void parent_task(context_t ctx) {
type_t *args;
...
bool res = blocking_task_create(tboard, TBOARD_FUNC(blocking_task), SECONDARY_EXEC, args, 0);
if (res) {
// use args
}
...
}
Should the user wish for argument modifications passed to a blocking child task to persist after blocking task termination, they must specify an argument size of zero (if persisting argument is manually allocated, the user is responsible for garbage collection). More detailed examples can be found provided tests.
Remote tasks are tasks issued by local tasks that are to be sent to controller via the MQTT adapter. There are two kinds of remote tasks: Blocking tasks and non-blocking tasks. Blocking tasks will yield issuing task after sending, preventing issuing task from continuing until a response from controller is received by the MQTT adapter. Non-blocking tasks will send the message via MQTT adapter and continue execution. Creating an remote task via calls to remote_task_create()
. By default, remote tasks are issued as a message with maximum length set in MAX_MSG_LENGTH
macro, with return values being saved to void *response
. Blocking is specified by setting bool blocking
equal to true. Freedom is given to the user in terms of response data type, as it ultimately comes down to the implementation of the MQTT adapter. An example of sending remote tasks from worker to controller can be found in tests/test6_milestone2.c
.
Provided in this package is an example of an MQTT adapter, called dummy_MQTT.c
. Freedom with the actual MQTT Adapter is given to the user, as it is an independent entity from the task board, but the following approaches should be followed:
For controller to worker communication, the MQTT adapter must receive and parse messages. After parsing message, MQTT should create a task_t
object corresponding to the local task that is to be run, including any arguments specified in the message. It will then generate a msg_t
object to send to the task board via processor.c:msg_processor()
. msg_t
has int type
, int subtype
, bool has_side_effects
, void *data
, void *user_data
and size_t ud_allocd
as fields. For local task execution messages, data
field should correspond to an allocated task_t
object. An example of msg_t
requirements can be found in dummy_MQTT.c:MQTT_recv()
function. If MQTT_ADD_BACK_TO_QUEUE_ON_FAILURE
is specified, MQTT will return message to message queue to be added to task board later. This is enabled by default.
For worker to controller communication, the MQTT adapter must pull messages from the tboard->msg_send
message queue, and return responses to the tboard->msg_recv
message queue. The user is responsible for locking mutex tboard->msg_mutex
before accessing these queues. Objects in these queues have type remote_task_t
, with fields int status
, char message[]
, void *data
, size_t data_size
, task_t *calling_task
, and bool blocking
. Responses should be written to data
and status
should be updated before returning a message to the task board. All requests must be returned to the task board in order for proper garbage collection to occur, even if the request is non-blocking. Example implementation can be found in dummy_MQTT.c:MQTT_issue_remote_task()
.
In my implementation of a dummy MQTT, I have two threads running. One thread polls task board for outgoing message requests, sleeping on condition variable tboard->msg_cond
. This thread is responsible for pulling messages out of the outgoing message queue, sending them to the controller, and awaiting a response. The other thread waits to receive requests from the controller as a string, at which point it processes the request and takes appropriate action. Tests 5-8 contain examples of MQTT implementations.
Running tests can be specified in main.h
. Milestone tests are located in tests
, and they show usage for specific achievements associated with each milestone. Functionality tests are found in legacy_tests
and they were designed to test different edge cases of the task board. I have decided to leave the legacy tests intact with brief explanations within the test files for the next person who continues this project where I left off.
A testing library has been including in with the task board, with prototypes and definitions in tests/tests.h
and tests/tests.c
respectively. A template for creating test can be found in tests/test_template.c
.
All tests can find more detailed explanations in the source code files.
To run a milestone test, set RUN_TEST = 1
in main.h
with TEST_NUM
corresponding to desired milestone test to run. Milestone tests are located in /src/tests/
directory
To run a legacy test, set RUN_LTEST = 1
in main.h
with TEST_LNUM
corresponding to the desired legacy test to run. Legacy tests are located in /src/legacy_tests/
directory
Afterwards, running a task can be achieved by calling in make clean && make && ./output/main
in the project root directory.
Milestone 1 is to create a task board object and run a variety of local tasks and we let them run. There are a variety of possible local tests, some run indefinitely yielding at each iteration, and others run and terminate after spawning zero or more local tasks. Although this milestone does not indicate task type, they will adopt the same type which can be specified by TASK_TYPE
.
test1
creates 4 types of local tasks: indefinite tasks, completing tasks, spawning tasks, and blocking tasks. The task board will end at some amount of time after starting, selected randomly between[0, MAX_RUN_TIME]
.test2
creates spawning tasks and sub tasks rapidly to measure task execution rate. IfBLOCKING_TASKS
is specified, then sub tasks will be initiated as blocking tasks. AfterNUM_TASKS
number of sub tasks have completed, task board will terminate and relevant execution information will be printed.
Milestone 1b was to create local tasks with multiple task types, determining whether they are executed by pExec
or sExec
.
test3
creates 3 types of tasks: primary tasks, and secondary tasks, as well as priority tasks ifISSUE_PRIORITY_TASKS
is specified. Max time between priority tasks can be set inMAX_TIME_BETWEEN_PRIORITY
. Task board will terminate onceNUM_TASKS
amount of secondary tasks are issued by primary task. Priority tasks will be issued at random.test4
creates primary tasks and secondary tasks. Primary tasks will issue a single secondary task. Both tasks will terminate rapidly, causing task board to sleep due to no work. Tasks will be issued at<1s
intervals. The task board will end at some amount of time after starting, selected randomly between[0, MAX_RUN_TIME]
.
Milestone 2 was to create remote tasks that connect the worker and controller. Worker-to-controller tasks, can be both blocking and non-blocking. Controller-to-worker tasks can have varying degree of complications, ultimately left to the user to implement.
test5
simulates controller-to-worker tasks exclusively. It does this by creating an independent thread that generates MQTT messages, sending them to MQTT viaMQTT_send()
function call in the form of a string. IfRAPID_GENERATION
is set to 1, then generator thread will rapidly issue commands to MQTT for up to 2 seconds before terminating. Otherwise, test will run for a maximum ofMAX_RUN_TIME
before terminating.test6
tests worker-to-controller exclusively, simulating response from controller viadummy_MQTT
. The two types of remote tasks I have implemented in MQTT is the blocking arithmetic task and non-blocking printing task. For the arithmetic task, task board will issue a request for the controller to perform some type of arithmetic, blocking the issuing task from continuing until the controller has responded with the calculation. The issuing task will then print the result tostdout
. For the printing task, issuing task will continue execution, printing the message from the controller viadummy_MQTT
tostdout
.test7
tests both controller-to-worker tasks and worker-to-controller tasks by combining bothtest5
andtest6
test8
combines all of the aforementioned tests into a single task board. It will create worker-to-controller tasks, controller-to-worker tasks, priority tasks, primary tasks, secondary tasks, and blocking tasks. If RAPID_GENERATION
is specified, it will terminate after up to MAX_RUN_TIME
seconds. Otherwise, it will generate NUM_TASKS
remote and local tasks, terminating once all tasks complete.
The following can be defined to change behavior
MAX_TASKS
will change the maximum number of concurrent tasks that the task board can run. Default is 65536. After the maximum number of concurrent tasks have been reached, no non-blocking local tasks can be created until at least 1 task terminates. The only way the maximum number of concurrent tasks can be exceeded is by MQTT adapter placing blocking worker-to-controller back in a ready queue after response is received.MAX_SECONDARIES
defines the maximum number of secondary executor threads the task board will support. The default is 10. It is good practice to set this number below the maximum number of CPU threads are supported by the hardware running the task board.STACK_SIZE
defines the stack size of task board tasks. Default is 57344 bytes. Task stack size cannot be change after task has been initalized, soSTACK_SIZE
must be large enough for all local task board tasks, otherwise stack overflow will occur leading to unpredictable results. Since task space is heap allocated,STACK_SIZE * MAX_TASKS
should not exceed the maximum amount of heap storage defined inulimits
of the running environment.REINSERT_PRIORITY_AT_HEAD
will dictate whether a yielding priority task will be inserted at the head or tail of the primary task ready queue.
Compile using make
with provided makefile. To create application that uses task board, simply include tboard.h
and link all task board objects in /src/
generated by make
.
- Task board depends on POSIX Threads, therefore task board has
libpthread
as a dependency. Compiling therefore requires-pthread
compiler flag. Undefined behavior may occur on non-POSIX compliant OS's - Task board tests depend on math library. Compiling tests therefore requires
-lm
compiler flag. - Task board depends on coroutine library minicoro, included in
/include/minicoro.h
. - Task board depends on hash table library uthash, included in
/include/uthash.h
.
typedef struct tboard_t {
pthread_t primary, secondary[]; // executor threads
pthread_mutex_t pmutex, smutex[]; // executor mutexes
pthread_cond_t pcond, scond[]; // executor condition vars
struct queue pqueue, squeue[]; // executor ready queues
...
struct queue msg_sent, msg_recv; // remote task wait queues
pthread_mutex_t msg_mutex; // remote task mutex
pthread_cond_t msg_cond; // remote task condition var
...
int status; // taskboard status
int task_count; // number of currently running tasks
}
tboard_t *tboard_create(int sqs); /* create task board with #sqs secondary queues */
void tboard_start(tboard_t *t); /* start task board t */
void tboard_destroy(tboard_t *t); /* join executors, destroy task board t */
void tboard_kill(tboard_t *t); /* kill task board executors */
int tboard_get_concurrent(tboard_t *t); /* query current number of concurrently running tasks */
int tboard_log(char *format, ...); /* log information to same file descriptor across task board */
int tboard_err(char *format, ...); /* report error to same file descriptor across task board */
In order to retrieve task board information, you must lock tboard->tmutex
before initiating shutdown like so
pthread_mutex_lock(&(tboard->tmutex));
tboard_kill(tboard);
history_print_records(tboard, stdout);
int unfinished_tasks = tboard->task_count;
pthread_mutex_unlock(&(tboard->tmutex));
/* task functions must have signature `void func(context_t ctx);` */
typedef void (*tb_task_f)(context_t);
/*** local tasks ***/
typedef struct task_t {
int id, status, type, cpu_time, yields; /* internal information */
function_t fn; /* task function pointer and name */
context_t ctx; /* coroutine context */
context_desc desc; /* coroutine description, including stack and arguments */
size_t data_size; /* size of arguments */
struct history_t *hist; /* entry in task execution history hash table */
struct task_t *parent; /* link to parent task */
} task_t;
/* Note: obtain function_t fn from TBOARD_FUNC(tb_task_f func) function call */
bool task_create(tboard_t *t, function_t fn, int type, void *args, size_t sizeof_args); /* create local task */
bool blocking_task_create(tboard_t *t, function_t fn, int type, void *args, size_t sizeof_args); /* create blocking local task */
void task_yield(); /* yield local task */
void *task_get_args(); /* returns args passed in task_create() */
/** remote tasks **/
typedef struct remote_task_t {
int status; /* remote task status */
char message[]; /* remote task command */
void *data; /* data object passed to MQTT to issue response */
size_t data_size; /* size of data, non-zero if heap allocated */
task_t *calling_task; /* link to parent task */
bool blocking; /* whether or not remote task is blocking is asynchronous */
} remote_task_t;
/* create remote task */
bool remote_task_create(tboard_t *t, char *message, void *args, size_t sizeof_args, bool blocking);
struct MQTT_data {
int imsg_sent; /* controller-to-worker messages sent by controller */
int imsg_recv; /* controller-to-worker messages received by worker */
int omsg_sent; /* worker-to-controller messages sent by worker */
int omsg_recv; /* worker-to-controller messages responded to by controller */
}
void MQTT_init(tboard_t *t); /* initialize MQTT adapter */
void MQTT_kill(struct MQTT_data *data); /* kill MQTT, record statistics to data */
void MQTT_destroy(); /* destroy MQTT */
void MQTT_send(char *message); /* send controller message to worker MQTT */
void MQTT_recv(tboard_t *t); /* worker MQTT receives controller message */
void MQTT_issue_remote_task(tboard_t *t, remote_task_t *rtask); /* worker MQTT send controller message */
void *MQTT_othread(void *args); /* thread that handles worker-to-controller communications */
void *MQTT_ithread(void *args); /* thread that handles controller-to-worker communications */