obqueue.h
#include "align.h"
#include "primitives.h"
#define NODE_SIZE (1 << 12)
#define N NODE_SIZE
#define NBITS (N - 1)
#define BOT ((void*)0)
#include <linux/futex.h>
#include <pthread.h>
#include <syscall.h>
struct _node_t {
  struct _node_t* volatile next DOUBLE_CACHE_ALIGNED;
  long id DOUBLE_CACHE_ALIGNED;
  void* volatile cells[NODE_SIZE] DOUBLE_CACHE_ALIGNED;
};
typedef struct _node_t node_t;
// Support 127 threads: registration scans for a free slot and reclamation
// walks each array until it hits NULL, so one slot must stay empty.
#define HANDLES 128
struct _obqueue_t {
  struct _node_t* init_node;
  volatile long init_id DOUBLE_CACHE_ALIGNED;
  volatile long put_index DOUBLE_CACHE_ALIGNED;
  volatile long pop_index DOUBLE_CACHE_ALIGNED;
  struct _handle_t* volatile enq_handles[HANDLES];
  struct _handle_t* volatile deq_handles[HANDLES];
  int threshold;
  pthread_barrier_t enq_barrier;
  pthread_barrier_t deq_barrier;
};
typedef struct _obqueue_t obqueue_t;
struct _handle_t {
  struct _node_t* spare;
  struct _node_t* volatile put_node CACHE_ALIGNED;
  struct _node_t* volatile pop_node CACHE_ALIGNED;
};
typedef struct _handle_t handle_t;
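/*
 * Overview (a sketch of the design as implemented below): the queue is an
 * unbounded linked list of fixed-size nodes. put_index and pop_index hand
 * out global tickets; ticket i maps to cell i % N of the node with id i / N.
 * Each thread's handle caches the last node it touched, so ob_find_cell
 * rarely walks far.
 */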
// Allocate a page-aligned, zero-filled node (all cells start as BOT).
static inline node_t* ob_new_node() {
  node_t* n = align_malloc(PAGE_SIZE, sizeof(node_t));
  memset(n, 0, sizeof(node_t));
  return n;
}
#define ENQ (1 << 1)
#define DEQ (1 << 0)
// Register the enqueuers first, then the dequeuers.
void ob_queue_register(obqueue_t* q, handle_t* th, int flag) {
  th->spare = ob_new_node();
  th->put_node = th->pop_node = q->init_node;
  if (flag & ENQ) {
    // Claim the first free slot in enq_handles with a CAS.
    handle_t** tail = q->enq_handles;
    int i = 0;
    for (;; ++i) {
      handle_t* init = NULL;
      if (tail[i] == NULL && CAS(tail + i, &init, th)) {
        break;
      }
    }
    // Wait for the other enqueuers to register.
    pthread_barrier_wait(&q->enq_barrier);
  }
  if (flag & DEQ) {
    // Claim the first free slot in deq_handles with a CAS.
    handle_t** tail = q->deq_handles;
    int i = 0;
    for (;; ++i) {
      handle_t* init = NULL;
      if (tail[i] == NULL && CAS(tail + i, &init, th)) {
        break;
      }
    }
    // Wait for the other dequeuers to register.
    pthread_barrier_wait(&q->deq_barrier);
  }
}
void ob_init_queue(obqueue_t* q, int enqs, int deqs, int threshold) {
  q->init_node = ob_new_node();
  q->threshold = threshold;
  q->put_index = q->pop_index = q->init_id = 0;
  // Expect exactly enqs enqueuers and deqs dequeuers to register.
  pthread_barrier_init(&q->enq_barrier, NULL, enqs);
  pthread_barrier_init(&q->deq_barrier, NULL, deqs);
}
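/*
 * Setup sketch (counts and names are illustrative, not part of this header):
 * a queue shared by 4 enqueuer threads and 4 dequeuer threads, reclaiming
 * once a dequeuer is 8 nodes past init_node.
 *
 *   obqueue_t q;
 *   ob_init_queue(&q, 4, 4, 8);
 *
 *   // in each enqueuer thread:
 *   handle_t h;
 *   ob_queue_register(&q, &h, ENQ);  // blocks until all 4 have registered
 *
 *   // in each dequeuer thread:
 *   handle_t h;
 *   ob_queue_register(&q, &h, DEQ);  // blocks until all 4 have registered
 */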
/*
 * ob_find_cell: the core operation. Maps a global index i to the node that
 * holds it, walking (and allocating, if needed) nodes along the way, and
 * returns the address of the target cell.
 */
static void* ob_find_cell(node_t* volatile* ptr, long i, handle_t* th) {
  // Start from the thread's current node.
  node_t* curr = *ptr;
  /* j is the id of the thread's local node (put node or pop node); i / N is
   * the id of the node holding cell i. Walk from j to i / N through the
   * 'next' links, appending nodes that do not exist yet. */
  long j = curr->id;
  for (; j < i / N; ++j) {
    node_t* next = curr->next;
    // The next node does not exist yet, so try to append one.
    if (next == NULL) {
      // Reuse the thread's spare node, allocating one if necessary.
      node_t* temp = th->spare;
      if (!temp) {
        temp = ob_new_node();
        th->spare = temp;
      }
      // The next node's id is j + 1.
      temp->id = j + 1;
      /* If the CAS succeeds our spare is linked in and consumed; otherwise
       * another thread already appended a node, and CASra reloaded it into
       * next. */
      if (CASra(&curr->next, &next, temp)) {
        next = temp;
        th->spare = NULL;
      }
    }
    // Advance to the next node.
    curr = next;
  }
  // Publish the thread's new local node; the reclaimer reads it to decide
  // which nodes are still in use.
  *ptr = curr;
  // Store fence so other threads observe '*ptr = curr' before we proceed.
  asm("sfence" ::: "cc", "memory");
  // The target cell lives in curr at offset i % N.
  return &curr->cells[i % N];
}
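/*
 * Worked example: with N == 4096, index i == 8191 resolves to node id
 * 8191 / 4096 == 1 and cell 8191 % 4096 == 4095. A thread whose local node
 * has id 0 walks (or allocates) exactly one 'next' link to reach it.
 */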
// Wake at most val threads blocked on addr.
int ob_futex_wake(void* addr, int val) {
  return syscall(SYS_futex, addr, FUTEX_WAKE, val, NULL, NULL, 0);
}
void ob_enqueue(obqueue_t* q, handle_t* th, void* v) {
  // FAAcs atomically increments put_index; the old value is our cell index.
  void* volatile* c = ob_find_cell(&th->put_node, FAAcs(&q->put_index, 1), th);
  void* cv;
  /* If the atomic exchange (XCHG) returns BOT the cell was empty: our value
   * is stored and no dequeuer is waiting, so we are done. */
  if ((cv = XCHG(c, v)) == BOT) return;
  /* Otherwise a dequeuer is already parked on this cell and cv points at its
   * futex word: clear the word and wake the sleeper. */
  *((int*)cv) = 0;
  ob_futex_wake(cv, 1);
}
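/*
 * Handshake sketch for one cell c (the two possible interleavings):
 *   fast path: the enqueuer's XCHG(c, v) sees BOT; the dequeuer later reads
 *              v from the cell, or swaps in its futex word too late and
 *              receives v as the XCHG result.
 *   slow path: the dequeuer's XCHG(c, &futex_addr) sees BOT, so it sleeps;
 *              the enqueuer's XCHG(c, v) then returns &futex_addr, so it
 *              writes 0 there and calls ob_futex_wake to unpark the dequeuer.
 */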
// Sleep until woken, unless *addr no longer holds val; the kernel rechecks
// *addr atomically with going to sleep, so a racing wake is never lost.
int ob_futex_wait(void* addr, int val) {
  return syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL, NULL, 0);
}
void* ob_dequeue(obqueue_t* q, handle_t* th) {
  int times;
  void* cv;
  int futex_addr = 1;
  // Claim a cell index.
  long index = FAAcs(&q->pop_index, 1);
  // Locate the cell for that index.
  void* volatile* c = ob_find_cell(&th->pop_node, index, th);
  // This is a blocking queue, so spin generously before sleeping.
  times = (1 << 20);
  do {
    cv = *c;
    if (cv) goto over;
    __asm__("pause");
  } while (times-- > 0);
  /* Exchange our futex word into the cell. If the old value is BOT the
   * enqueuer has not arrived yet: sleep until it sets futex_addr to 0.
   * Otherwise the enqueuer beat us and cv already holds the value. */
  if ((cv = XCHG(c, &futex_addr)) == BOT) {
    /* Loop until futex_addr is really 0. Returning on a spurious wakeup
     * would let futex_addr (a stack variable) go out of scope while the
     * enqueuer still writes to it and wakes it: a use-after-free. */
    do {
      ob_futex_wait(&futex_addr, 1);
    } while (futex_addr == 1);
    // The enqueuer has set futex_addr to 0 and stored its value in the cell.
    cv = *c;
  }
over:
  /* If index is the last cell of its node (index & NBITS == NBITS, i.e.
   * 4095), try to reclaim memory. init_node is the oldest node not yet
   * reclaimed; scanning every registered handle yields min_node, the oldest
   * node any thread may still touch. Everything in [init_node, min_node) is
   * then safe to free. */
  if ((index & NBITS) == NBITS) {
    long init_index = ACQUIRE(&q->init_id);
    // Only reclaim once we are threshold nodes past init_node; init_id == -1
    // means another thread holds the reclamation lock, and a successful CASa
    // takes it.
    if ((th->pop_node->id - init_index) >= q->threshold && init_index >= 0 &&
        CASa(&q->init_id, &init_index, -1)) {
      node_t* init_node = q->init_node;
      handle_t* next = q->deq_handles[0];
      node_t* min_node = next->pop_node;
      int i;
      // Scan the dequeuers' local nodes for the minimum id; the arrays are
      // NULL-terminated, and we can stop early once nothing older can appear.
      next = q->deq_handles[i = 1];
      while (next != NULL) {
        node_t* next_min = next->pop_node;
        if (next_min->id < min_node->id) min_node = next_min;
        if (min_node->id <= init_index) break;
        next = q->deq_handles[++i];
      }
      // Do the same for the enqueuers.
      next = q->enq_handles[i = 0];
      while (next != NULL) {
        node_t* next_min = next->put_node;
        if (next_min->id < min_node->id) min_node = next_min;
        if (min_node->id <= init_index) break;
        next = q->enq_handles[++i];
      }
      long new_id = min_node->id;
      if (new_id <= init_index)
        // Nothing to free: release the lock with the old id.
        RELEASE(&q->init_id, init_index);
      else {
        // Advance init_node, release the lock, then free the old nodes.
        q->init_node = min_node;
        RELEASE(&q->init_id, new_id);
        do {
          node_t* tmp = init_node->next;
          free(init_node);
          init_node = tmp;
        } while (init_node != min_node);
      }
    }
  }
return cv;
}
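/*
 * End-to-end sketch, assuming align.h and primitives.h are on the include
 * path and the file is built on Linux with -pthread. The producer/consumer
 * names and counts are illustrative. Note that enqueued values must be
 * non-NULL pointers, since BOT (NULL) marks an empty cell.
 *
 *   #include <stdio.h>
 *   #include "obqueue.h"
 *
 *   static obqueue_t q;
 *   static long vals[100];
 *
 *   static void* producer(void* arg) {
 *     handle_t h;
 *     ob_queue_register(&q, &h, ENQ);
 *     for (long i = 0; i < 100; ++i) {
 *       vals[i] = i + 1;
 *       ob_enqueue(&q, &h, &vals[i]);
 *     }
 *     return NULL;
 *   }
 *
 *   static void* consumer(void* arg) {
 *     handle_t h;
 *     ob_queue_register(&q, &h, DEQ);
 *     for (int i = 0; i < 100; ++i)
 *       printf("%ld\n", *(long*)ob_dequeue(&q, &h));
 *     return NULL;
 *   }
 *
 *   int main(void) {
 *     pthread_t p, c;
 *     ob_init_queue(&q, 1, 1, 8);
 *     pthread_create(&p, NULL, producer, NULL);
 *     pthread_create(&c, NULL, consumer, NULL);
 *     pthread_join(p, NULL);
 *     pthread_join(c, NULL);
 *     return 0;
 *   }
 */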