diff --git a/src/test/perf/msgpass/msgpass.cc b/src/test/perf/msgpass/msgpass.cc
new file mode 100644
index 000000000..1aa116754
--- /dev/null
+++ b/src/test/perf/msgpass/msgpass.cc
@@ -0,0 +1,244 @@
+/**
+ * A simulation of a message-passing application workload for snmalloc.
+ *
+ * - N_PRODUCER producer threads allocate and queue spans of messages randomly,
+ * - to N_CONSUMER consumer threads, which dequeue messages and free() them.
+ *
+ * Optionally, N_PROXY threads act as both producers and consumers, forwarding
+ * received messages back to another queue rather than freeing them.
+ */
+
+#include "test/opt.h"
+#include "test/setup.h"
+#include "test/usage.h"
+#include "test/xoroshiro.h"
+
+constexpr static bool use_malloc = false;
+constexpr static bool be_chatty = false;
+
+#include <atomic>
+#include <cstdarg>
+#include <cstddef>
+#include <cstdio>
+#include <cstdlib>
+#include <snmalloc/snmalloc.h>
+#include <thread>
+
+constexpr static size_t N_PRODUCER = 3;
+constexpr static size_t N_CONSUMER = 3;
+constexpr static size_t N_PROXY = 2;
+constexpr static size_t N_QUEUE = N_CONSUMER + N_PROXY;
+
+constexpr static size_t N_PRODUCER_BATCH = 256 * 1024;
+constexpr static size_t N_MAX_OUTSTANDING = 4 * 1024;
+
+using namespace snmalloc;
+
+void chatty(const char* p, ...)
+{
+  if constexpr (be_chatty)
+  {
+    va_list va;
+    va_start(va, p);
+    vfprintf(stderr, p, va);
+    va_end(va);
+  }
+}
+
+/*
+ * RemoteAllocators make for convenient MPSC queues, so we use those for
+ * sending "messages".  Each consumer or proxy has its own (source) queue.
+ */
+RemoteAllocator msgqueue[N_QUEUE];
+
+std::atomic<bool> producers_live;
+std::atomic<size_t> queue_gate;
+std::atomic<size_t> messages_outstanding;
+
+freelist::HeadPtr domesticate_nop(freelist::QueuePtr p)
+{
+  return freelist::HeadPtr::unsafe_from(p.unsafe_ptr());
+};
+
+void consumer(size_t qix)
+{
+  auto& a = ThreadAlloc::get();
+  auto& myq = msgqueue[qix];
+  size_t reap;
+
+  chatty("Cl %zu q is %p\n", qix, &myq);
+
+  do
+  {
+    reap = 0;
+
+    if (myq.can_dequeue(domesticate_nop))
+    {
+      myq.dequeue(
+        domesticate_nop,
+        domesticate_nop,
+        [qix, &a, &reap](freelist::HeadPtr o) {
+          UNUSED(qix);
+          auto p = o.as_void().unsafe_ptr();
+          chatty("Cl %zu free %p\n", qix, p);
+          (use_malloc ? free(p) : a.dealloc(p));
+          reap++;
+          return true;
+        });
+    }
+
+    messages_outstanding -= reap;
+
+    if (reap == 0)
+    {
+      std::this_thread::yield();
+    }
+    else
+    {
+      chatty("Cl %zu reap %zu\n", qix, reap);
+    }
+
+  } while (reap || producers_live || (queue_gate > N_CONSUMER));
+
+  chatty("Cl %zu fini\n", qix);
+}
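+
+/*
+ * Proxy threads drain their own queue and forward each message on to a
+ * lower-numbered queue (another proxy or, eventually, a consumer).  A proxy
+ * decrements queue_gate as it exits, so downstream queues can tell when no
+ * further traffic can arrive.
+ */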
+void proxy(size_t qix)
+{
+  auto& myq = msgqueue[qix];
+  bool any;
+
+  chatty("Px %zu q is %p\n", qix, &myq);
+
+  xoroshiro::p128r32 r(1234 + qix, qix);
+  do
+  {
+    any = false;
+
+    if (myq.can_dequeue(domesticate_nop))
+    {
+      myq.dequeue(
+        domesticate_nop, domesticate_nop, [qix, &r, &any](freelist::HeadPtr o) {
+          auto rcptqix = r.next() % qix;
+
+          chatty(
+            "Px %zu send %p to %zu\n", qix, o.as_void().unsafe_ptr(), rcptqix);
+
+          msgqueue[rcptqix].enqueue(o, o, domesticate_nop);
+          any = true;
+          return true;
+        });
+    }
+
+    std::this_thread::yield();
+  } while (any || producers_live || (queue_gate > qix + 1));
+
+  chatty("Px %zu fini\n", qix);
+  queue_gate--;
+}
+
+freelist::Builder<false> batch[N_PRODUCER];
+
+void producer(size_t pix)
+{
+  auto& a = ThreadAlloc::get();
+  static constexpr size_t msgsizes[] = {48, 64, 96, 128};
+  static constexpr size_t nmsgsizes = sizeof(msgsizes) / sizeof(msgsizes[0]);
+
+  xoroshiro::p128r32 r(5489 + pix, pix);
+  batch[pix].init(0, RemoteAllocator::key_global);
+
+  for (size_t batchix = N_PRODUCER_BATCH; batchix > 0; batchix--)
+  {
+    while (messages_outstanding >= N_MAX_OUTSTANDING)
+    {
+      std::this_thread::yield();
+    }
+
+    size_t nmsg = (r.next() & 15) + 1;
+    size_t msgsize = msgsizes[r.next() % nmsgsizes];
+
+    /* Allocate batch and form list */
+    for (size_t msgix = 0; msgix < nmsg; msgix++)
+    {
+      auto msg = (use_malloc ? malloc(msgsize) : a.alloc(msgsize));
+      chatty("Pd %zu make %p\n", pix, msg);
+
+      auto msgc = capptr::Alloc<void>::unsafe_from(msg)
+                    .template as_reinterpret<freelist::Object::T<>>();
+      batch[pix].add(msgc, RemoteAllocator::key_global);
+    }
+
+    /* Post to random queue */
+    auto [bfirst, blast] =
+      batch[pix].extract_segment(RemoteAllocator::key_global);
+    auto rcptqix = r.next() % N_QUEUE;
+    msgqueue[rcptqix].enqueue(bfirst, blast, domesticate_nop);
+    messages_outstanding += nmsg;
+
+    chatty("Pd %zu send %zu to %zu\n", pix, nmsg, rcptqix);
+
+    /* Occasionally yield the CPU */
+    if ((batchix & 0xF) == 1)
+      std::this_thread::yield();
+  }
+
+  chatty("Pd %zu fini\n", pix);
+}
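+
+/*
+ * Spawn consumers, then proxies, then producers.  Producers are joined first;
+ * once producers_live drops, the proxies retire from the highest queue index
+ * down (each decrementing queue_gate), and finally the consumers drain
+ * whatever remains.
+ */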
+int main()
+{
+  std::thread producer_threads[N_PRODUCER];
+  std::thread queue_threads[N_QUEUE];
+
+  /*
+   * Force snmalloc to initialize itself, and especially
+   * RemoteAllocator::key_global, now, before we initialize our message queues.
+   * (Otherwise, we'll pick up the wrong key and pointers will fail to decode
+   * correctly and we will be sad for hours.)
+   */
+  auto& a = ThreadAlloc::get();
+  a.dealloc(a.alloc(32));
+
+  for (size_t i = 0; i < N_QUEUE; i++)
+  {
+    msgqueue[i].init();
+  }
+
+  producers_live = true;
+  queue_gate = N_QUEUE;
+  messages_outstanding = 0;
+
+  /* Spawn consumers */
+  for (size_t i = 0; i < N_CONSUMER; i++)
+  {
+    queue_threads[i] = std::thread(consumer, i);
+  }
+
+  /* Spawn proxies */
+  for (size_t i = N_CONSUMER; i < N_QUEUE; i++)
+  {
+    queue_threads[i] = std::thread(proxy, i);
+  }
+
+  /* Spawn producers */
+  for (size_t i = 0; i < N_PRODUCER; i++)
+  {
+    producer_threads[i] = std::thread(producer, i);
+  }
+
+  /* Wait for producers to finish */
+  for (size_t i = 0; i < N_PRODUCER; i++)
+  {
+    producer_threads[i].join();
+  }
+  producers_live = false;
+
+  /* Wait for proxies and consumers to finish */
+  for (size_t i = 0; i < N_QUEUE; i++)
+  {
+    queue_threads[N_QUEUE - 1 - i].join();
+  }
+
+  return 0;
+}
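
The pattern above (a RemoteAllocator doubling as an MPSC message queue) can be
condensed into the sketch below.  It reuses only the snmalloc-internal calls
that already appear in the test (RemoteAllocator, freelist::Builder, and a
no-op domesticator); these are internal interfaces rather than a stable public
API, and the sketch assumes snmalloc and the channel have been initialised
first, as main() does above.  The names channel, send_one, and receive_all are
illustrative only.

#include <snmalloc/snmalloc.h>

using namespace snmalloc;

RemoteAllocator channel;

freelist::HeadPtr domesticate_nop(freelist::QueuePtr p)
{
  return freelist::HeadPtr::unsafe_from(p.unsafe_ptr());
}

/* Wrap one allocation as a single-element span and post it to the channel. */
void send_one(void* msg)
{
  freelist::Builder<false> b;
  b.init(0, RemoteAllocator::key_global);
  b.add(
    capptr::Alloc<void>::unsafe_from(msg)
      .template as_reinterpret<freelist::Object::T<>>(),
    RemoteAllocator::key_global);
  auto [first, last] = b.extract_segment(RemoteAllocator::key_global);
  channel.enqueue(first, last, domesticate_nop);
}

/* Drain the channel, handing every message found on it back to snmalloc. */
void receive_all()
{
  if (channel.can_dequeue(domesticate_nop))
  {
    channel.dequeue(
      domesticate_nop, domesticate_nop, [](freelist::HeadPtr o) {
        ThreadAlloc::get().dealloc(o.as_void().unsafe_ptr());
        return true;
      });
  }
}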