Skip to content

Commit

Permalink
Merge branch 'configurable-pools' into max-next
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneia committed Dec 15, 2022
2 parents 442902b + 58be8ee commit 8c1331e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 28 deletions.
27 changes: 13 additions & 14 deletions src/core/group_freelist.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@ struct group_freelist_chunk {

ffi.cdef([[
struct group_freelist {
uint32_t enqueue_pos[1];
uint8_t pad_enqueue_pos[]]..CACHELINE-1*INT..[[];
uint32_t enqueue_pos[1], enqueue_mask;
uint8_t pad_enqueue_pos[]]..CACHELINE-2*INT..[[];
uint32_t dequeue_pos[1];
uint8_t pad_dequeue_pos[]]..CACHELINE-1*INT..[[];
uint32_t dequeue_pos[1], dequeue_mask;
uint8_t pad_dequeue_pos[]]..CACHELINE-2*INT..[[];
uint32_t state[1], size;
uint8_t pad_state[]]..CACHELINE-2*INT..[[];
uint32_t size, state[1];
struct group_freelist_chunk chunk[?];
} __attribute__((packed, aligned(]]..CACHELINE..[[)))]])
Expand All @@ -56,10 +55,12 @@ function freelist_create (name, size)
local fl = shm.create(name, "struct group_freelist", size)
if sync.cas(fl.state, CREATE, INIT) then
fl.size = size
local mask = size - 1
fl.enqueue_mask, fl.dequeue_mask = mask, mask
for i = 0, fl.size-1 do
fl.chunk[i].sequence[0] = i
end
fl.state[0] = READY
assert(sync.cas(fl.state, INIT, READY))
return fl
else
shm.unmap(fl)
Expand All @@ -75,14 +76,11 @@ function freelist_open (name, readonly)
return shm.open(name, "struct group_freelist", readonly, size)
end

local function mask (fl, i)
return band(i, fl.size-1)
end

function start_add (fl)
local pos = fl.enqueue_pos[0]
local mask = fl.enqueue_mask
while true do
local chunk = fl.chunk[mask(fl, pos)]
local chunk = fl.chunk[band(pos, mask)]
local seq = chunk.sequence[0]
local dif = seq - pos
if dif == 0 then
Expand All @@ -100,13 +98,14 @@ end

function start_remove (fl)
local pos = fl.dequeue_pos[0]
local mask = fl.dequeue_mask
while true do
local chunk = fl.chunk[mask(fl, pos)]
local chunk = fl.chunk[band(pos, mask)]
local seq = chunk.sequence[0]
local dif = seq - (pos+1)
if dif == 0 then
if sync.cas(fl.dequeue_pos, pos, pos+1) then
return chunk, pos+fl.size
return chunk, pos+mask+1
end
elseif dif < 0 then
return
Expand Down
28 changes: 14 additions & 14 deletions src/lib/interlink.lua
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,19 @@ local waitfor = require("core.lib").waitfor
local sync = require("core.sync")

local CACHELINE = 64 -- XXX - make dynamic
local INT = ffi.sizeof("int")
local INT = ffi.sizeof("uint32_t")

-- Based on MCRingBuffer, see
-- http://www.cse.cuhk.edu.hk/%7Epclee/www/pubs/ipdps10.pdf

ffi.cdef([[
struct interlink {
int size;
char pad0[]]..CACHELINE-1*INT..[[];
int read, write, state[1];
char pad1[]]..CACHELINE-3*INT..[[];
int lwrite, nread;
char pad2[]]..CACHELINE-2*INT..[[];
int lread, nwrite;
char pad3[]]..CACHELINE-2*INT..[[];
uint32_t read, write, size, state[1];
char pad1[]]..CACHELINE-4*INT..[[];
uint32_t lwrite, nread, rmask;
char pad2[]]..CACHELINE-3*INT..[[];
uint32_t lread, nwrite, wmask;
char pad3[]]..CACHELINE-3*INT..[[];
struct packet *packets[?];
} __attribute__((packed, aligned(]]..CACHELINE..[[)))
]])
Expand Down Expand Up @@ -151,6 +149,8 @@ local function attach (name, size, transitions)
-- (only one process can set size).
if sync.cas(r.state, INIT, CONF) then
r.size = size
local mask = size - 1
r.rmask, r.wmask = mask, mask
assert(sync.cas(r.state, CONF, FREE))
end
-- Return if we succeed to attach.
Expand Down Expand Up @@ -224,12 +224,12 @@ end

-- Queue operations follow below.

local function NEXT (size, i)
return band(i + 1, size - 1)
local function NEXT (mask, i)
return band(i + 1, mask)
end

function full (r)
local after_nwrite = NEXT(r.size, r.nwrite)
local after_nwrite = NEXT(r.wmask, r.nwrite)
if after_nwrite == r.lread then
if after_nwrite == r.read then
return true
Expand All @@ -240,7 +240,7 @@ end

function insert (r, p)
r.packets[r.nwrite] = p
r.nwrite = NEXT(r.size, r.nwrite)
r.nwrite = NEXT(r.wmask, r.nwrite)
end

function push (r)
Expand All @@ -259,7 +259,7 @@ end

function extract (r)
local p = r.packets[r.nread]
r.nread = NEXT(r.size, r.nread)
r.nread = NEXT(r.rmask, r.nread)
return p
end

Expand Down

0 comments on commit 8c1331e

Please sign in to comment.