-
Notifications
You must be signed in to change notification settings - Fork 109
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test/perf/msgpass: ubench a producer-consumer app
Approximate a message-passing application as a set of producers, a set of consumers, and a set of proxies that do both. We'll use this for some initial insight for #634 but it seems worth having in general.
- Loading branch information
Showing
1 changed file
with
235 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,235 @@ | ||
/** | ||
* A simulation of a message-passing application workload for snmalloc. | ||
* | ||
* - N_PRODUCER producer threads allocate and queue spans of messages randomly, | ||
* - to N_CONSUMER consumer threads, which dequeue messages and free() them. | ||
* | ||
* Optionally, N_PROXY threads act as both producers and consumers, forwarding | ||
* received messages back to another queue rather than freeing them. | ||
*/ | ||
|
||
#include "test/opt.h" | ||
#include "test/setup.h" | ||
#include "test/usage.h" | ||
#include "test/xoroshiro.h" | ||
|
||
constexpr static bool use_malloc = false; | ||
constexpr static bool be_chatty = false; | ||
|
||
#include <chrono> | ||
#include <iomanip> | ||
#include <iostream> | ||
#include <snmalloc/snmalloc.h> | ||
#include <stdarg.h> | ||
#include <thread> | ||
#include <vector> | ||
|
||
constexpr static size_t N_PRODUCER = 3; | ||
constexpr static size_t N_CONSUMER = 3; | ||
constexpr static size_t N_PROXY = 2; | ||
constexpr static size_t N_QUEUE = N_CONSUMER + N_PROXY; | ||
|
||
constexpr static size_t N_PRODUCER_BATCH = 256 * 1024; | ||
constexpr static size_t N_MAX_OUTSTANDING = 4 * 1024; | ||
|
||
using namespace snmalloc; | ||
|
||
void chatty(const char* p, ...) | ||
{ | ||
if constexpr (be_chatty) | ||
{ | ||
va_list va; | ||
va_start(va, p); | ||
vfprintf(stderr, p, va); | ||
va_end(va); | ||
} | ||
} | ||
|
||
/* | ||
* RemoteAllocators make for convenient MPSC queues, so we use those for sending | ||
* "messages". Each consumer or proxy has its own (source) queue. | ||
*/ | ||
RemoteAllocator msgqueue[N_QUEUE]; | ||
|
||
std::atomic<bool> producers_live; | ||
std::atomic<size_t> queue_gate; | ||
std::atomic<size_t> messages_outstanding; | ||
|
||
freelist::HeadPtr domesticate_nop(freelist::QueuePtr p) | ||
{ | ||
return freelist::HeadPtr::unsafe_from(p.unsafe_ptr()); | ||
}; | ||
|
||
void consumer(size_t qix) | ||
{ | ||
auto& a = ThreadAlloc::get(); | ||
auto& myq = msgqueue[qix]; | ||
size_t reap; | ||
|
||
chatty("Cl %zu q is %p\n", qix, &myq); | ||
|
||
do | ||
{ | ||
reap = 0; | ||
|
||
if (myq.can_dequeue(domesticate_nop)) | ||
{ | ||
myq.dequeue( | ||
domesticate_nop, | ||
domesticate_nop, | ||
[qix, &a, &reap](freelist::HeadPtr o) { | ||
UNUSED(qix); | ||
auto p = o.as_void().unsafe_ptr(); | ||
chatty("Cl %zu free %p\n", qix, p); | ||
(use_malloc ? free(p) : a.dealloc(p)); | ||
reap++; | ||
return true; | ||
}); | ||
} | ||
|
||
messages_outstanding -= reap; | ||
|
||
if (reap == 0) | ||
{ | ||
std::this_thread::yield(); | ||
} | ||
else | ||
{ | ||
chatty("Cl %zu reap %zu\n", qix, reap); | ||
} | ||
|
||
} while (reap || producers_live || (queue_gate > N_CONSUMER)); | ||
|
||
chatty("Cl %zu fini\n", qix); | ||
} | ||
|
||
void proxy(size_t qix) | ||
{ | ||
auto& myq = msgqueue[qix]; | ||
bool any; | ||
|
||
chatty("Px %zu q is %p\n", qix, &myq); | ||
|
||
xoroshiro::p128r32 r(1234 + qix, qix); | ||
do | ||
{ | ||
any = false; | ||
|
||
if (myq.can_dequeue(domesticate_nop)) | ||
{ | ||
myq.dequeue( | ||
domesticate_nop, domesticate_nop, [qix, &r, &any](freelist::HeadPtr o) { | ||
auto rcptqix = r.next() % qix; | ||
|
||
chatty( | ||
"Px %zu send %p to %zu\n", qix, o.as_void().unsafe_ptr(), rcptqix); | ||
|
||
msgqueue[rcptqix].enqueue(o, o, domesticate_nop); | ||
any = true; | ||
return true; | ||
}); | ||
} | ||
|
||
std::this_thread::yield(); | ||
} while (any || producers_live || (queue_gate > qix + 1)); | ||
|
||
chatty("Px %zu fini\n", qix); | ||
queue_gate--; | ||
} | ||
|
||
freelist::Builder<false> batch[N_PRODUCER]; | ||
|
||
void producer(size_t pix) | ||
{ | ||
auto& a = ThreadAlloc::get(); | ||
static constexpr size_t msgsizes[] = {48, 64, 96, 128}; | ||
static constexpr size_t nmsgsizes = sizeof(msgsizes) / sizeof(msgsizes[0]); | ||
|
||
xoroshiro::p128r32 r(5489 + pix, pix); | ||
batch[pix].init(0, RemoteAllocator::key_global); | ||
|
||
for (size_t batchix = N_PRODUCER_BATCH; batchix > 0; batchix--) | ||
{ | ||
while (messages_outstanding >= N_MAX_OUTSTANDING) | ||
{ | ||
std::this_thread::yield(); | ||
} | ||
|
||
size_t nmsg = (r.next() & 15) + 1; | ||
size_t msgsize = msgsizes[r.next() % nmsgsizes]; | ||
|
||
/* Allocate batch and form list */ | ||
for (size_t msgix = 0; msgix < nmsg; msgix++) | ||
{ | ||
auto msg = (use_malloc ? malloc(msgsize) : a.alloc(msgsize)); | ||
chatty("Pd %zu make %p\n", pix, msg); | ||
|
||
auto msgc = capptr::Alloc<void>::unsafe_from(msg) | ||
.template as_reinterpret<freelist::Object::T<>>(); | ||
batch[pix].add(msgc, RemoteAllocator::key_global); | ||
} | ||
|
||
/* Post to random queue */ | ||
auto [bfirst, blast] = | ||
batch[pix].extract_segment(RemoteAllocator::key_global); | ||
auto rcptqix = r.next() % N_QUEUE; | ||
msgqueue[rcptqix].enqueue(bfirst, blast, domesticate_nop); | ||
messages_outstanding += nmsg; | ||
|
||
chatty("Pd %zu send %zu to %zu\n", pix, nmsg, rcptqix); | ||
|
||
/* Occasionally yield the CPU */ | ||
if ((batchix & 0xF) == 1) | ||
std::this_thread::yield(); | ||
} | ||
|
||
chatty("Pd %zu fini\n", pix); | ||
} | ||
|
||
int main() | ||
{ | ||
std::thread producer_threads[N_PRODUCER]; | ||
std::thread queue_threads[N_QUEUE]; | ||
|
||
for (size_t i = 0; i < N_QUEUE; i++) | ||
{ | ||
msgqueue[i].init(); | ||
} | ||
|
||
producers_live = true; | ||
queue_gate = N_QUEUE; | ||
messages_outstanding = 0; | ||
|
||
/* Spawn consumers */ | ||
for (size_t i = 0; i < N_CONSUMER; i++) | ||
{ | ||
queue_threads[i] = std::thread(consumer, i); | ||
} | ||
|
||
/* Spawn proxies */ | ||
for (size_t i = N_CONSUMER; i < N_QUEUE; i++) | ||
{ | ||
queue_threads[i] = std::thread(proxy, i); | ||
} | ||
|
||
/* Spawn producers */ | ||
for (size_t i = 0; i < N_PRODUCER; i++) | ||
{ | ||
producer_threads[i] = std::thread(producer, i); | ||
} | ||
|
||
/* Wait for producers to finish */ | ||
for (size_t i = 0; i < N_PRODUCER; i++) | ||
{ | ||
producer_threads[i].join(); | ||
} | ||
producers_live = false; | ||
|
||
/* Wait for proxies and consumers to finish */ | ||
for (size_t i = 0; i < N_QUEUE; i++) | ||
{ | ||
queue_threads[N_QUEUE - 1 - i].join(); | ||
} | ||
|
||
return 0; | ||
} |