Mailboxes are one of the fundamental parts of the actor model, which originated in 1973:
An actor is an object that carries out its actions in response to communications it receives. Through the mailbox mechanism, actors can decouple the reception of a message from its elaboration. A mailbox is nothing more than the data structure (FIFO) that holds messages.
I first encountered MailBox in the late 80s while working on a real-time system:
"A mailbox is an object that can be used for inter-task communication. When task A wants to send an object to task B, task A must send the object to the mailbox, and task B must visit the mailbox, where, if an object isn't there, it has the option of waiting for any desired length of time..." iRMX 86™ NUCLEUS REFERENCE MANUAL, Copyright © 1980, 1981 Intel Corporation.
Since then I have used it in:
| OS | Language(s) |
|---|---|
| iRMX | PL/M-86 |
| AIX | C |
| Windows | C++/C# |
| Linux | Go |
Now it's Zig time!!!
If your thread runs in "Fire and Forget" mode, you don't need Mailbox.
But in real multithreaded applications, threads communicate with each other as members of a work team.
Mailbox provides convenient and simple inter-thread communication:
- thread safe
- asynchronous
- non-blocking
- cancelable
- no allocations of its own
- unbounded
- fan-out/fan-in

    const std = @import("std");
    const testing = std.testing;
    const Thread = std.Thread;
    // Adjust the import to match how mailbox was installed
    const mailbox = @import("mailbox");

    test "Echo" {
        // Mbx is a Mailbox whose letter (data) is usize
        const Mbx = mailbox.MailBox(usize);

        // Echo runs on its own thread.
        // It has two mailboxes, "TO" and "FROM",
        // named from the client's point of view.
        // It receives a letter via the "TO" mailbox and
        // replies with the same letter (echo) to the "FROM" mailbox.
        const Echo = struct {
            const Self = @This();

            to: Mbx = undefined,
            from: Mbx = undefined,
            thread: Thread = undefined,

            // Creates the mailboxes and starts the thread.
            // Note that client code does not use any thread "API":
            // it is all embedded within Echo.
            pub fn start(echo: *Self) void {
                echo.to = .{};
                echo.from = .{};
                echo.thread = std.Thread.spawn(.{}, run, .{echo}) catch unreachable;
            }

            // Echo thread function
            fn run(echo: *Self) void {
                // Main loop:
                while (true) {
                    // Receive; exit from the thread on any error
                    // (the mailbox was closed or the wait timed out)
                    const envelope = echo.to.receive(100_000_000) catch break;

                    // Reply to the client;
                    // exit from the thread if the mailbox was closed
                    _ = echo.from.send(envelope) catch break;
                }
            }

            // Waits for the thread to exit
            pub fn waitFinish(echo: *Self) void {
                echo.thread.join();
            }

            // Closes the mailboxes.
            // As a result Echo should stop processing
            // and exit from the thread.
            pub fn stop(echo: *Self) void {
                _ = echo.to.close();
                _ = echo.from.close();
            }
        };

        const echo = try std.testing.allocator.create(Echo);

        // Start Echo (on its own thread)
        echo.start();

        // One deferred block keeps the order explicit:
        // close the mailboxes, wait for the thread, then free Echo
        defer {
            echo.stop();
            echo.waitFinish();
            std.testing.allocator.destroy(echo);
        }

        // Because nothing was sent to the 'TO' mailbox, nothing should be
        // received from the 'FROM' mailbox
        try testing.expectError(error.Timeout, echo.from.receive(100));

        // Create a wrapper for the data
        const envl = try std.testing.allocator.create(Mbx.Envelope);
        defer std.testing.allocator.destroy(envl);

        // Send/Receive loop
        for (0..6) |indx| {
            // Set the value to send [0-5]
            envl.letter = indx;

            // Send to the 'TO' mailbox
            try echo.to.send(envl);

            // Wait for the data echoed to the 'FROM' mailbox
            const back = echo.from.receive(1_000_000);

            if (back) |val| {
                // Expected value == index [0-5]
                try testing.expect(val.letter == indx);
            } else |_| {
                try testing.expect(false);
            }
        }
    }

A Mailbox of []const u8 'Letters':

    const Rumors = mailbox.MailBox([]const u8);
    // var, not const: sending and receiving modify the mailbox
    var rmrsMbx: Rumors = .{};

Envelope is a wrapper around the actual user-defined type Letter.

    pub const Envelope = struct {
        prev: ?*Envelope = null,
        next: ?*Envelope = null,
        letter: Letter,
    };

In fact, a Mailbox is a FIFO queue of Envelopes.
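Because the links (prev, next) live inside each Envelope, queueing needs no memory of its own. Below is a minimal single-threaded sketch of such an intrusive FIFO. The names Fifo, enqueue and dequeue are hypothetical and are not taken from the library's source; only the Envelope layout follows the definition above.

```zig
const std = @import("std");

// Hypothetical sketch, not the library's source: an intrusive FIFO
// whose links live inside each caller-owned Envelope.
fn Fifo(comptime Letter: type) type {
    return struct {
        const Self = @This();

        pub const Envelope = struct {
            prev: ?*Envelope = null,
            next: ?*Envelope = null,
            letter: Letter,
        };

        first: ?*Envelope = null,
        last: ?*Envelope = null,
        len: usize = 0,

        // Append at the tail; only pointers are rewired, nothing is allocated.
        pub fn enqueue(fifo: *Self, envelope: *Envelope) void {
            envelope.next = null;
            envelope.prev = fifo.last;
            if (fifo.last) |last| {
                last.next = envelope;
            } else {
                fifo.first = envelope;
            }
            fifo.last = envelope;
            fifo.len += 1;
        }

        // Remove from the head, so envelopes leave in arrival order.
        pub fn dequeue(fifo: *Self) ?*Envelope {
            const envelope = fifo.first orelse return null;
            fifo.first = envelope.next;
            if (fifo.first) |first| {
                first.prev = null;
            } else {
                fifo.last = null;
            }
            envelope.next = null;
            envelope.prev = null;
            fifo.len -= 1;
            return envelope;
        }
    };
}

test "envelopes leave in the order they were enqueued" {
    const Q = Fifo(usize);
    var queue: Q = .{};
    // The caller owns the envelopes; here they simply live on the stack.
    var a: Q.Envelope = .{ .letter = 1 };
    var b: Q.Envelope = .{ .letter = 2 };

    queue.enqueue(&a);
    queue.enqueue(&b);

    try std.testing.expectEqual(@as(usize, 1), queue.dequeue().?.letter);
    try std.testing.expectEqual(@as(usize, 2), queue.dequeue().?.letter);
    try std.testing.expect(queue.dequeue() == null);
}
```

Since the caller allocates (or reuses) every Envelope, the queue itself can stay unbounded without ever touching an allocator.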
MailBox supports the following operations:
- send an Envelope to the MailBox (enqueue) and wake up waiting receiver(s)
- receive an Envelope from the MailBox (dequeue) with a time-out
- close the MailBox:
  - disables further operations
  - the first close returns a list of unprocessed Envelopes for freeing, reuse, etc.
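The operations listed above can be sketched with a mutex and a condition variable. This is an illustration only: the type Box, the error name Closed, and the nanosecond time-out unit are assumptions, not the library's actual implementation. The std calls used (std.Thread.Mutex, std.Thread.Condition.timedWait, std.time.Timer) are those of recent Zig releases and may differ in other versions.

```zig
const std = @import("std");

// Hypothetical sketch of send / receive / close; not the library's API.
const Envelope = struct {
    next: ?*Envelope = null,
    letter: usize,
};

const Box = struct {
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    first: ?*Envelope = null,
    last: ?*Envelope = null,
    closed: bool = false,

    // Enqueue the envelope and wake one waiting receiver.
    pub fn send(box: *Box, envelope: *Envelope) error{Closed}!void {
        box.mutex.lock();
        defer box.mutex.unlock();
        if (box.closed) return error.Closed;
        envelope.next = null;
        if (box.last) |last| {
            last.next = envelope;
        } else {
            box.first = envelope;
        }
        box.last = envelope;
        box.cond.signal();
    }

    // Dequeue, waiting up to timeout_ns nanoseconds for an envelope.
    pub fn receive(box: *Box, timeout_ns: u64) error{ Closed, Timeout }!*Envelope {
        box.mutex.lock();
        defer box.mutex.unlock();
        // Assumes a monotonic clock is available on the target.
        var timer = std.time.Timer.start() catch unreachable;
        while (true) {
            if (box.closed) return error.Closed;
            if (box.first) |envelope| {
                box.first = envelope.next;
                if (box.first == null) box.last = null;
                envelope.next = null;
                return envelope;
            }
            const elapsed = timer.read();
            if (elapsed >= timeout_ns) return error.Timeout;
            // Spurious wake-ups and time-outs are both handled by re-checking.
            box.cond.timedWait(&box.mutex, timeout_ns - elapsed) catch {};
        }
    }

    // Disable further operations and wake all waiters. Only the first
    // close hands back the chain of unprocessed envelopes.
    pub fn close(box: *Box) ?*Envelope {
        box.mutex.lock();
        defer box.mutex.unlock();
        if (box.closed) return null;
        box.closed = true;
        const pending = box.first;
        box.first = null;
        box.last = null;
        box.cond.broadcast();
        return pending;
    }
};

test "send, receive with time-out, close" {
    var box: Box = .{};
    var a: Envelope = .{ .letter = 1 };
    var b: Envelope = .{ .letter = 2 };

    try std.testing.expectError(error.Timeout, box.receive(1_000));
    try box.send(&a);
    try box.send(&b);
    try std.testing.expectEqual(@as(usize, 1), (try box.receive(1_000)).letter);

    // b was never received, so the first close hands it back.
    try std.testing.expect(box.close().? == &b);
    try std.testing.expect(box.close() == null);
    try std.testing.expectError(error.Closed, box.send(&a));
}
```

Returning the leftover envelopes from the first close lets the owner free or reuse them, which fits a queue that never allocates on its own.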
Feel free to suggest improvements in doc and code.
You finally got to installation!
Create the folder 'deps' under 'src' and add mailbox as a submodule:

    mkdir src/deps
    git submodule add https://github.com/g41797/mailbox src/deps/mailbox

Import mailbox:

    const mailbox = @import("deps/mailbox/src/mailbox.zig");

Use mailbox:

    const MsgBlock = struct {
        len: usize = undefined,
        buff: [1024]u8 = undefined,
    };

    const Msgs = mailbox.MailBox(MsgBlock);

    var msgs: Msgs = .{};
    ...................
    _ = msgs.close();

Periodically update the submodule(s):

    git submodule update --remote

Alternatively, with an existing Zig project, adding Mailbox as a package is easy:
- Add mailbox to your build.zig.zon
- Add mailbox to your build.zig

To add mailbox to build.zig.zon, simply run the following in your terminal:

    cd my-example-project
    zig fetch --save=mailbox git+https://github.com/g41797/mailbox

In your build.zig.zon you should then find a new dependency like:

    .{
        .name = "My example project",
        .version = "0.0.1",
        .dependencies = .{
            .mailbox = .{
                .url = "git+https://github.com/g41797/mailbox#3f794f34f5d859e7090c608da998f3b8856f8329",
                .hash = "122068e7811ec1bfc2a81c9250078dd5dafa9dca4eb3f1910191ba060585526f03fe",
            },
        },
        .paths = .{
            "",
        },
    }

Then, in the build function of your build.zig, add the following before b.installArtifact(exe):

    const mailbox = b.dependency("mailbox", .{
        .target = target,
        .optimize = optimize,
    });
    exe.root_module.addImport("mailbox", mailbox.module("mailbox"));

From then on, you can use the Mailbox package in your project.
First rule of multithreading:
If you can do without multithreading - do without.