

Merge pull request #48 from nkallima/v3.1.0-dev
Releasing v3.1.0
nkallima authored Jan 16, 2022
2 parents 3c7d7a5 + 9eb214d commit c4cdeda
Showing 16 changed files with 716 additions and 23 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,11 @@
Changelog
---------

v3.1.0
------
- A new high-performance, optimized implementation of the flat-combining synchronization technique. This new implementation, provided by the Synch framework, is written from scratch.
- A few API fixes.

v3.0.1
------
- README.md updated.
16 changes: 13 additions & 3 deletions README.md
@@ -14,10 +14,10 @@ The current version of this code is optimized for x86_64 machine architecture, b

# Collection

The Synch framework provides a large set of highly efficient concurrent data-structures, such as combining objects, concurrent queues and stacks, concurrent hash-tables and locks. The cornerstone of the Synch framework is its combining objects. A combining object is a concurrent object/data-structure that is able to simulate any other concurrent object, e.g. stacks, queues, atomic counters, barriers, etc. The Synch framework provides the PSim wait-free combining object [2,10], the blocking combining objects CC-Synch, DSM-Synch and H-Synch [1], and the blocking combining object based on the technique presented in [4]. Moreover, the Synch framework provides the Osci blocking combining technique [3], which achieves good performance using user-level threads.
The Synch framework provides a large set of highly efficient concurrent data-structures, such as combining objects, concurrent queues and stacks, concurrent hash-tables and locks. The cornerstone of the Synch framework is its combining objects. A combining object is a concurrent object/data-structure that is able to simulate any other concurrent object, e.g. stacks, queues, atomic counters, barriers, etc. The Synch framework provides the PSim wait-free combining object [2,10], the blocking combining objects CC-Synch, DSM-Synch and H-Synch [1], and the blocking combining object based on the technique presented in [4]. Moreover, the Synch framework provides the Osci blocking combining technique [3], which achieves good performance using user-level threads. Since v3.1.0, the Synch framework offers a new high-performance implementation of the flat-combining synchronization technique [14]. This new version is implemented from scratch and is not just an optimized version of the original code provided in [15].

In terms of concurrent queues, the Synch framework provides the SimQueue [2,10] wait-free queue implementation, which is based on the PSim combining object, and the CC-Queue, DSM-Queue and H-Queue [1] blocking queue implementations, which are based on the CC-Synch, DSM-Synch and H-Synch combining objects. A blocking queue implementation based on the CLH locks [5,6] and the lock-free implementation presented in [7] are also provided.
Since v2.4.0, the Synch framework provides the LCRQ [11,12] queue implementation. In terms of concurrent stacks, the Synch framework provides the SimStack [2,10] wait-free stack implementation, which is based on the PSim combining object, and the CC-Stack, DSM-Stack and H-Stack [1] blocking stack implementations, which are based on the CC-Synch, DSM-Synch and H-Synch combining objects. Moreover, the lock-free stack implementation of [8] and the blocking implementation based on the CLH locks [5,6] are provided. The Synch framework also provides concurrent queue and stack implementations (i.e. the OsciQueue and OsciStack implementations) that achieve very high performance using user-level threads [3].
Since v2.4.0, the Synch framework provides the LCRQ [11,12] queue implementation. In terms of concurrent stacks, the Synch framework provides the SimStack [2,10] wait-free stack implementation, which is based on the PSim combining object, and the CC-Stack, DSM-Stack and H-Stack [1] blocking stack implementations, which are based on the CC-Synch, DSM-Synch and H-Synch combining objects. Moreover, the lock-free stack implementation of [8] and the blocking implementation based on the CLH locks [5,6] are provided. The Synch framework also provides concurrent queue and stack implementations (i.e. the OsciQueue and OsciStack implementations) that achieve very high performance using user-level threads [3]. Since v3.1.0, the Synch framework provides stack and queue implementations (i.e. FC-Stack and FC-Queue) based on the flat-combining implementation provided by the Synch framework.

Furthermore, the Synch framework provides a few scalable lock implementations, i.e. the MCS queue-lock presented in [9] and the CLH queue-lock presented in [5,6]. Finally, the Synch framework provides two example-implementations of concurrent hash-tables. More specifically, it provides a simple implementation based on CLH queue-locks [5,6] and an implementation based on the DSM-Synch [1] combining technique.

@@ -28,17 +28,20 @@ The following table presents a summary of the concurrent data-structures offered
| | PSim [2,10] |
| | Osci [3] |
| | Oyama [4] |
| | FC: a new implementation of flat-combining [14] |
| Concurrent Queues | CC-Queue, DSM-Queue and H-Queue [1] |
| | SimQueue [2,10] |
| | OsciQueue [3] |
| | CLH-Queue [5,6] |
| | MS-Queue [7] |
| | LCRQ [11,12] |
| | FC-Queue [14] |
| Concurrent Stacks | CC-Stack, DSM-Stack and H-Stack [1] |
| | SimStack [2,10] |
| | OsciStack [3] |
| | CLH-Stack [5,6] |
| | LF-Stack [8] |
| | FC-Stack [14] |
| Locks | CLH [5,6] |
| | MCS [9] |
| Hash Tables | CLH-Hash [5,6] |
@@ -161,11 +164,13 @@ The following table shows the memory reclamation characteristics of the provided
| | CLH-Queue [5,6] | Supported |
| | MS-Queue [7] | Hazard Pointers (not provided by Synch) |
| | LCRQ [11,12] | Hazard Pointers (not provided by Synch) |
| | FC-Queue [14] | Supported |
| Concurrent Stacks | CC-Stack, DSM-Stack and H-Stack [1] | Supported |
| | SimStack [2,10] | Supported (since v2.4.0) |
| | OsciStack [3] | Supported |
| | CLH-Stack [5,6] | Supported |
| | LF-Stack [8] | Hazard Pointers (not provided by Synch) |
| | FC-Stack [14] | Supported |


## Memory reclamation limitations
@@ -226,7 +231,7 @@ int main(int argc, char *argv[]) {
This example-benchmark creates `N_THREADS` threads, each of which executes `RUNS` Fetch&Add operations on a shared 64-bit integer. At the end of the benchmark, the throughput (i.e. Fetch&Add operations per second) is calculated. By setting various values for `N_THREADS`, this benchmark is able to measure strong scaling.
The `synchStartThreadsN` function (provided by the API defined in `threadtools.h`) in `main` creates `N_THREADS` threads, each of which executes the `Execute` function declared in the same file. The `_DONT_USE_UTHREADS_` argument forces `synchStartThreadsN` to create only Posix threads; if the user instead sets the corresponding fibers argument to some `M` > 0, then `StartThreadsN` creates `N_THREADS` Posix threads, each of which creates `M` user-level (i.e. fiber) threads. The `synchJoinThreadsN` function (also provided by `threadtools.h`) waits until all Posix and fiber (if any) threads finish the execution of the `Execute` function. The Fetch&Add instruction on 64-bit integers is performed by the `synchFAA64` function provided by the API of `primitives.h`.
The `synchStartThreadsN` function (provided by the API defined in `threadtools.h`) in `main` creates `N_THREADS` threads, each of which executes the `Execute` function declared in the same file. The `SYNCH_DONT_USE_UTHREADS_` argument forces `synchStartThreadsN` to create only Posix threads; if the user instead sets the corresponding fibers argument to some `M` > 0, then `synchStartThreadsN` creates `N_THREADS` Posix threads, each of which creates `M` user-level (i.e. fiber) threads. The `synchJoinThreadsN` function (also provided by `threadtools.h`) waits until all Posix and fiber (if any) threads finish the execution of the `Execute` function. The Fetch&Add instruction on 64-bit integers is performed by the `synchFAA64` function provided by the API of `primitives.h`.
The threads executing the `Execute` function use the `SynchBarrier` re-entrant barrier object so that all of them start performing Fetch&Add instructions on the shared variable `object` simultaneously. This barrier is re-used before the end of the `Execute` function in order to allow the thread with `id = 0` to measure the time the benchmark needed for completion. The `synchBarrierSet` function in `main` initializes the `SynchBarrier` object; it takes as arguments a pointer to the barrier object and the number of threads `N_THREADS` that are going to use it. Both `synchBarrierSet` and `synchBarrierWait` are provided by the API of `barrier.h`.
@@ -290,6 +295,11 @@ The Synch framework is provided under the [LGPL-2.1 License](https://github.com/

[13]. Guy E. Blelloch, and Yuanhao Wei. "Brief Announcement: Concurrent Fixed-Size Allocation and Free in Constant Time." 34th International Symposium on Distributed Computing (DISC 2020). Schloss Dagstuhl-Leibniz-Zentrum für Informatik, 2020.

[14]. Danny Hendler, Itai Incze, Nir Shavit, and Moran Tzafrir. Flat combining and the synchronization-parallelism tradeoff. In Proceedings of the twenty-second annual ACM symposium on Parallelism in algorithms and architectures (SPAA 2010), pp. 355-364.

[15]. Danny Hendler, Itai Incze, Nir Shavit, and Moran Tzafrir. Source code for flat-combining. https://github.com/mit-carbon/Flat-Combining


# Contact

For any further information, please do not hesitate to
70 changes: 70 additions & 0 deletions benchmarks/fcbench.c
@@ -0,0 +1,70 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <stdint.h>

#include <config.h>
#include <primitives.h>
#include <fastrand.h>
#include <threadtools.h>
#include <fc.h>
#include <barrier.h>
#include <bench_args.h>
#include <fam.h>

ObjectState *object CACHE_ALIGN;
FCStruct *object_combiner;
int64_t d1, d2;
SynchBarrier bar CACHE_ALIGN;
SynchBenchArgs bench_args CACHE_ALIGN;

inline static void *Execute(void *Arg) {
    FCThreadState *th_state;
    long i, rnum;
    volatile long j;
    long id = (long)Arg;

    synchFastRandomSetSeed(id + 1);
    th_state = synchGetAlignedMemory(CACHE_LINE_SIZE, sizeof(FCThreadState));
    FCThreadStateInit(object_combiner, th_state, (int)id);
    synchBarrierWait(&bar);
    if (id == 0) d1 = synchGetTimeMillis();

    for (i = 0; i < bench_args.runs; i++) {
        // perform a fetchAndMultiply operation
        FCApplyOp(object_combiner, th_state, fetchAndMultiply, (void *)object, (ArgVal)id, id);
        rnum = synchFastRandomRange(1, bench_args.max_work);
        for (j = 0; j < rnum; j++)
            ;
    }
    synchBarrierWait(&bar);
    if (id == 0) d2 = synchGetTimeMillis();

    return NULL;
}

int main(int argc, char *argv[]) {
    synchParseArguments(&bench_args, argc, argv);
    object_combiner = synchGetAlignedMemory(S_CACHE_LINE_SIZE, sizeof(FCStruct));
    object = synchGetAlignedMemory(CACHE_LINE_SIZE, sizeof(ObjectState));
    object->state_f = 1.0;
    FCStructInit(object_combiner, bench_args.nthreads);

    synchBarrierSet(&bar, bench_args.nthreads);
    synchStartThreadsN(bench_args.nthreads, Execute, bench_args.fibers_per_thread);
    synchJoinThreadsN(bench_args.nthreads - 1);

    printf("time: %d (ms)\tthroughput: %.2f (millions ops/sec)\t", (int)(d2 - d1), bench_args.runs * bench_args.nthreads / (1000.0 * (d2 - d1)));
    synchPrintStats(bench_args.nthreads, bench_args.total_runs);

#ifdef DEBUG
    fprintf(stderr, "DEBUG: Object float state: %f\n", object->state_f);
    fprintf(stderr, "DEBUG: Object state: %ld\n", object_combiner->counter);
    fprintf(stderr, "DEBUG: rounds: %ld\n", object_combiner->rounds);
    fprintf(stderr, "DEBUG: Average helping: %.2f\n", (float)object_combiner->counter / object_combiner->rounds);
    fprintf(stderr, "\n");
#endif

    return 0;
}
79 changes: 79 additions & 0 deletions benchmarks/fcqueuebench.c
@@ -0,0 +1,79 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <stdint.h>
#include <sched.h>

#include <config.h>
#include <primitives.h>
#include <fastrand.h>
#include <threadtools.h>
#include <fcqueue.h>
#include <barrier.h>
#include <bench_args.h>

FCQueueStruct *queue_object CACHE_ALIGN;
int64_t d1 CACHE_ALIGN, d2;
SynchBarrier bar CACHE_ALIGN;
SynchBenchArgs bench_args CACHE_ALIGN;

inline static void *Execute(void *Arg) {
    FCQueueThreadState *th_state;
    long i, rnum;
    volatile int j;
    long id = (long)Arg;

    synchFastRandomSetSeed(id + 1);
    th_state = synchGetAlignedMemory(CACHE_LINE_SIZE, sizeof(FCQueueThreadState));
    FCQueueThreadStateInit(queue_object, th_state, (int)id);

    synchBarrierWait(&bar);
    if (id == 0) d1 = synchGetTimeMillis();

    for (i = 0; i < bench_args.runs; i++) {
        // perform an enqueue operation
        FCQueueApplyEnqueue(queue_object, th_state, (ArgVal)id, id);
        rnum = synchFastRandomRange(1, bench_args.max_work);
        for (j = 0; j < rnum; j++)
            ;
        // perform a dequeue operation
        FCQueueApplyDequeue(queue_object, th_state, id);
        rnum = synchFastRandomRange(1, bench_args.max_work);
        for (j = 0; j < rnum; j++)
            ;
    }
    synchBarrierWait(&bar);
    if (id == 0) d2 = synchGetTimeMillis();

    return NULL;
}

int main(int argc, char *argv[]) {
    synchParseArguments(&bench_args, argc, argv);
    queue_object = synchGetAlignedMemory(S_CACHE_LINE_SIZE, sizeof(FCQueueStruct));
    FCQueueStructInit(queue_object, bench_args.nthreads);

    synchBarrierSet(&bar, bench_args.nthreads);
    synchStartThreadsN(bench_args.nthreads, Execute, bench_args.fibers_per_thread);
    synchJoinThreadsN(bench_args.nthreads - 1);

    printf("time: %d (ms)\tthroughput: %.2f (millions ops/sec)\t", (int)(d2 - d1), 2 * bench_args.runs * bench_args.nthreads / (1000.0 * (d2 - d1)));
    synchPrintStats(bench_args.nthreads, bench_args.total_runs);
#ifdef DEBUG
    fprintf(stderr, "DEBUG: Enqueue: Object state: %ld\n", queue_object->enqueue_struct.counter);
    fprintf(stderr, "DEBUG: Enqueue: rounds: %ld\n", queue_object->enqueue_struct.rounds);
    fprintf(stderr, "DEBUG: Dequeue: Object state: %ld\n", queue_object->dequeue_struct.counter);
    fprintf(stderr, "DEBUG: Dequeue: rounds: %ld\n", queue_object->dequeue_struct.rounds);
    volatile Node *first = (Node *)queue_object->first;
    long counter = 0;

    while (first->next != NULL) {
        first = first->next;
        counter++;
    }
    fprintf(stderr, "DEBUG: %ld nodes were left in the queue\n", counter);
#endif

    return 0;
}
75 changes: 75 additions & 0 deletions benchmarks/fcstackbench.c
@@ -0,0 +1,75 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <stdint.h>

#include <config.h>
#include <primitives.h>
#include <fastrand.h>
#include <threadtools.h>
#include <fcstack.h>
#include <barrier.h>
#include <bench_args.h>

FCStackStruct *object_struct CACHE_ALIGN;
int64_t d1 CACHE_ALIGN, d2;
SynchBarrier bar CACHE_ALIGN;
SynchBenchArgs bench_args CACHE_ALIGN;

inline static void *Execute(void *Arg) {
    FCStackThreadState *th_state;
    long i, rnum;
    volatile int j;
    long id = (long)Arg;

    synchFastRandomSetSeed(id + 1);
    th_state = synchGetAlignedMemory(CACHE_LINE_SIZE, sizeof(FCStackThreadState));
    FCStackThreadStateInit(object_struct, th_state, (int)id);
    synchBarrierWait(&bar);
    if (id == 0) d1 = synchGetTimeMillis();

    for (i = 0; i < bench_args.runs; i++) {
        // perform a push operation
        FCStackPush(object_struct, th_state, id, id);
        rnum = synchFastRandomRange(1, bench_args.max_work);
        for (j = 0; j < rnum; j++)
            ;
        // perform a pop operation
        FCStackPop(object_struct, th_state, id);
        rnum = synchFastRandomRange(1, bench_args.max_work);
        for (j = 0; j < rnum; j++)
            ;
    }
    synchBarrierWait(&bar);
    if (id == 0) d2 = synchGetTimeMillis();

    return NULL;
}

int main(int argc, char *argv[]) {
    synchParseArguments(&bench_args, argc, argv);
    object_struct = synchGetAlignedMemory(S_CACHE_LINE_SIZE, sizeof(FCStackStruct));
    FCStackInit(object_struct, bench_args.nthreads);
    synchBarrierSet(&bar, bench_args.nthreads);
    synchStartThreadsN(bench_args.nthreads, Execute, bench_args.fibers_per_thread);
    synchJoinThreadsN(bench_args.nthreads - 1);

    printf("time: %d (ms)\tthroughput: %.2f (millions ops/sec)\t", (int)(d2 - d1), 2 * bench_args.runs * bench_args.nthreads / (1000.0 * (d2 - d1)));
    synchPrintStats(bench_args.nthreads, bench_args.total_runs);

#ifdef DEBUG
    fprintf(stderr, "DEBUG: Object state: %ld\n", object_struct->object_struct.counter);
    fprintf(stderr, "DEBUG: rounds: %ld\n", object_struct->object_struct.rounds);
    volatile Node *top = object_struct->top;
    long counter = 0;

    while (top != NULL) {
        top = top->next;
        counter++;
    }
    fprintf(stderr, "DEBUG: %ld nodes were left in the stack\n", counter);
#endif

    return 0;
}