1 /* -----------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 2009
4 *
5 * Work-stealing Deque data structure
6 *
7 * The implementation uses Double-Ended Queues with lock-free access
8 * (thereby often called "deque") as described in
9 *
10 * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
11 * SPAA'05, July 2005, Las Vegas, USA.
12 * ACM 1-58113-986-1/05/0007
13 *
14 * This implementation closely follows the C11 implementation presented in
15 *
16 * N.M. Lê, A. Pop, A.Cohen, and F.Z. Nardelli. "Correct and Efficient
17 * Work-Stealing for Weak Memory Models". PPoPP'13, February 2013,
18 * ACM 978-1-4503-1922/13/02.
19 *
20 * Author: Jost Berthold MSRC 07-09/2008
21 * Rewritten by: Ben Gamari (Well-Typed)
22 *
23 *
24 * The DeQue is held as a circular array with known length. Positions
25 * of top (read-end) and bottom (write-end) always increase, and the
26 * array is accessed with indices modulo array-size. While this bears
27 * the risk of overflow, we assume that (with 64 bit indices), a
28 * program must run very long to reach that point.
29 *
30 * The write end of the queue (position bottom) can only be used with
31 * mutual exclusion, i.e. by exactly one caller at a time. At this
32 * end, new items can be enqueued using pushBottom()/newSpark(), and
33 * removed using popBottom()/reclaimSpark() (the latter implying a cas
34 * synchronisation with potential concurrent readers for the case of
35 * just one element).
36 *
37 * Multiple readers can steal from the read end (position top), and
38 * are synchronised without a lock, based on a cas of the top
39 * position. One reader wins, the others return NULL for a failure.
40 *
41 * Both popWSDeque and stealWSDeque also return NULL when the queue is empty.
42 *
43 * Testing: see testsuite/tests/rts/testwsdeque.c. If
44 * there's anything wrong with the deque implementation, this test
45 * will probably catch it.
46 *
47 * ---------------------------------------------------------------------------*/
48
49 #include "PosixSource.h"
50 #include "Rts.h"
51
52 #include "RtsUtils.h"
53 #include "WSDeque.h"
54
55 // Returns true on success.
56 static inline bool
cas_top(WSDeque * q,StgInt old,StgInt new)57 cas_top(WSDeque *q, StgInt old, StgInt new)
58 {
59 return (StgWord) old == cas((StgPtr) &q->top, (StgWord) old, (StgWord) new);
60 }
61
62
63 /* -----------------------------------------------------------------------------
64 * newWSDeque
65 * -------------------------------------------------------------------------- */
66
67 /* internal helpers ... */
68
69 static StgWord
roundUp2(StgWord val)70 roundUp2(StgWord val)
71 {
72 StgWord rounded = 1;
73
74 /* StgWord is unsigned anyway, only catch 0 */
75 if (val == 0) {
76 barf("DeQue,roundUp2: invalid size 0 requested");
77 }
78 /* at least 1 bit set, shift up to its place */
79 do {
80 rounded = rounded << 1;
81 } while (0 != (val = val>>1));
82 return rounded;
83 }
84
85 WSDeque *
newWSDeque(uint32_t size)86 newWSDeque (uint32_t size)
87 {
88 StgWord realsize;
89 WSDeque *q;
90
91 realsize = roundUp2(size); /* to compute modulo as a bitwise & */
92
93 q = (WSDeque*) stgMallocBytes(sizeof(WSDeque), /* admin fields */
94 "newWSDeque");
95 q->elements = stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
96 "newWSDeque:data space");
97 q->size = realsize; /* power of 2 */
98 q->moduloSize = realsize - 1; /* n % size == n & moduloSize */
99
100 q->top=0;
101 RELEASE_STORE(&q->bottom, 0); /* read by writer, updated each time top is read */
102
103 ASSERT_WSDEQUE_INVARIANTS(q);
104 return q;
105 }
106
107 /* -----------------------------------------------------------------------------
108 * freeWSDeque
109 * -------------------------------------------------------------------------- */
110
111 void
freeWSDeque(WSDeque * q)112 freeWSDeque (WSDeque *q)
113 {
114 stgFree(q->elements);
115 stgFree(q);
116 }
117
118 /* -----------------------------------------------------------------------------
119 *
120 * popWSDeque: remove an element from the write end of the queue.
121 * Returns the removed spark, and NULL if a race is lost or the pool
122 * empty.
123 *
124 * If only one spark is left in the pool, we synchronise with
125 * concurrently stealing threads by using cas to modify the top field.
126 * This routine should NEVER be called by a task which does not own
127 * this deque.
128 *
129 * -------------------------------------------------------------------------- */
130
131 void *
popWSDeque(WSDeque * q)132 popWSDeque (WSDeque *q)
133 {
134 StgInt b = RELAXED_LOAD(&q->bottom) - 1;
135 RELAXED_STORE(&q->bottom, b);
136 SEQ_CST_FENCE();
137 StgInt t = RELAXED_LOAD(&q->top);
138
139 void *result;
140 if (t <= b) {
141 /* Non-empty */
142 result = RELAXED_LOAD(&q->elements[b & q->moduloSize]);
143 if (t == b) {
144 /* Single last element in queue */
145 if (!cas_top(q, t, t+1)) {
146 /* Failed race */
147 result = NULL;
148 }
149
150 RELAXED_STORE(&q->bottom, b+1);
151 }
152 } else {
153 /* Empty queue */
154 result = NULL;
155 RELAXED_STORE(&q->bottom, b+1);
156 }
157
158 return result;
159 }
160
161 /* -----------------------------------------------------------------------------
162 * stealWSDeque
163 * -------------------------------------------------------------------------- */
164
165 void *
stealWSDeque_(WSDeque * q)166 stealWSDeque_ (WSDeque *q)
167 {
168 StgInt t = ACQUIRE_LOAD(&q->top);
169 SEQ_CST_FENCE();
170 StgInt b = ACQUIRE_LOAD(&q->bottom);
171
172 void *result = NULL;
173 if (t < b) {
174 /* Non-empty queue */
175 result = RELAXED_LOAD(&q->elements[t % q->size]);
176 if (!cas_top(q, t, t+1)) {
177 return NULL;
178 }
179 }
180 return result;
181 }
182
183 void *
stealWSDeque(WSDeque * q)184 stealWSDeque (WSDeque *q)
185 {
186 void *stolen;
187
188 do {
189 stolen = stealWSDeque_(q);
190 } while (stolen == NULL && !looksEmptyWSDeque(q));
191
192 return stolen;
193 }
194
195 /* -----------------------------------------------------------------------------
 * pushWSDeque
197 * -------------------------------------------------------------------------- */
198
199 /* Enqueue an element. Must only be called by owner. Returns true if element was
200 * pushed, false if queue is full
201 */
202 bool
pushWSDeque(WSDeque * q,void * elem)203 pushWSDeque (WSDeque* q, void * elem)
204 {
205 StgInt b = ACQUIRE_LOAD(&q->bottom);
206 StgInt t = ACQUIRE_LOAD(&q->top);
207
208 if ( b - t > q->size - 1 ) {
209 /* Full queue */
210 /* We don't implement resizing, just say we didn't push anything. */
211 return false;
212 }
213
214 RELAXED_STORE(&q->elements[b & q->moduloSize], elem);
215 #if defined(TSAN_ENABLED)
216 // ThreadSanizer doesn't know about release fences, so we need to
217 // strengthen this to a release store lest we get spurious data race
218 // reports.
219 RELEASE_STORE(&q->bottom, b+1);
220 #else
221 RELEASE_FENCE();
222 RELAXED_STORE(&q->bottom, b+1);
223 #endif
224 return true;
225 }
226