1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2009
4  *
5  * Work-stealing Deque data structure
6  *
7  * The implementation uses Double-Ended Queues with lock-free access
8  * (thereby often called "deque") as described in
9  *
10  * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
11  * SPAA'05, July 2005, Las Vegas, USA.
12  * ACM 1-58113-986-1/05/0007
13  *
14  * This implementation closely follows the C11 implementation presented in
15  *
16  * N.M. Lê, A. Pop, A.Cohen, and F.Z. Nardelli. "Correct and Efficient
17  * Work-Stealing for Weak Memory Models". PPoPP'13, February 2013,
18  * ACM 978-1-4503-1922/13/02.
19  *
20  * Author: Jost Berthold MSRC 07-09/2008
21  * Rewritten by: Ben Gamari (Well-Typed)
22  *
23  *
24  * The DeQue is held as a circular array with known length. Positions
25  * of top (read-end) and bottom (write-end) always increase, and the
26  * array is accessed with indices modulo array-size. While this bears
27  * the risk of overflow, we assume that (with 64 bit indices), a
28  * program must run very long to reach that point.
29  *
30  * The write end of the queue (position bottom) can only be used with
31  * mutual exclusion, i.e. by exactly one caller at a time.  At this
32  * end, new items can be enqueued using pushWSDeque()/newSpark(), and
33  * removed using popWSDeque()/reclaimSpark() (the latter implying a cas
34  * synchronisation with potential concurrent readers for the case of
35  * just one element).
36  *
37  * Multiple readers can steal from the read end (position top), and
38  * are synchronised without a lock, based on a cas of the top
39  * position. One reader wins, the others return NULL for a failure.
40  *
41  * Both popWSDeque and stealWSDeque also return NULL when the queue is empty.
42  *
43  * Testing: see testsuite/tests/rts/testwsdeque.c.  If
44  * there's anything wrong with the deque implementation, this test
45  * will probably catch it.
46  *
47  * ---------------------------------------------------------------------------*/
48 
49 #include "PosixSource.h"
50 #include "Rts.h"
51 
52 #include "RtsUtils.h"
53 #include "WSDeque.h"
54 
55 // Returns true on success.
56 static inline bool
cas_top(WSDeque * q,StgInt old,StgInt new)57 cas_top(WSDeque *q, StgInt old, StgInt new)
58 {
59     return (StgWord) old == cas((StgPtr) &q->top, (StgWord) old, (StgWord) new);
60 }
61 
62 
63 /* -----------------------------------------------------------------------------
64  * newWSDeque
65  * -------------------------------------------------------------------------- */
66 
67 /* internal helpers ... */
68 
69 static StgWord
roundUp2(StgWord val)70 roundUp2(StgWord val)
71 {
72     StgWord rounded = 1;
73 
74     /* StgWord is unsigned anyway, only catch 0 */
75     if (val == 0) {
76         barf("DeQue,roundUp2: invalid size 0 requested");
77     }
78     /* at least 1 bit set, shift up to its place */
79     do {
80         rounded = rounded << 1;
81     } while (0 != (val = val>>1));
82     return rounded;
83 }
84 
85 WSDeque *
newWSDeque(uint32_t size)86 newWSDeque (uint32_t size)
87 {
88     StgWord realsize;
89     WSDeque *q;
90 
91     realsize = roundUp2(size); /* to compute modulo as a bitwise & */
92 
93     q = (WSDeque*) stgMallocBytes(sizeof(WSDeque),   /* admin fields */
94                                   "newWSDeque");
95     q->elements = stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
96                                  "newWSDeque:data space");
97     q->size = realsize;  /* power of 2 */
98     q->moduloSize = realsize - 1; /* n % size == n & moduloSize  */
99 
100     q->top=0;
101     RELEASE_STORE(&q->bottom, 0); /* read by writer, updated each time top is read */
102 
103     ASSERT_WSDEQUE_INVARIANTS(q);
104     return q;
105 }
106 
107 /* -----------------------------------------------------------------------------
108  * freeWSDeque
109  * -------------------------------------------------------------------------- */
110 
111 void
freeWSDeque(WSDeque * q)112 freeWSDeque (WSDeque *q)
113 {
114     stgFree(q->elements);
115     stgFree(q);
116 }
117 
118 /* -----------------------------------------------------------------------------
119  *
120  * popWSDeque: remove an element from the write end of the queue.
121  * Returns the removed spark, and NULL if a race is lost or the pool
122  * empty.
123  *
124  * If only one spark is left in the pool, we synchronise with
125  * concurrently stealing threads by using cas to modify the top field.
126  * This routine should NEVER be called by a task which does not own
127  * this deque.
128  *
129  * -------------------------------------------------------------------------- */
130 
131 void *
popWSDeque(WSDeque * q)132 popWSDeque (WSDeque *q)
133 {
134     StgInt b = RELAXED_LOAD(&q->bottom) - 1;
135     RELAXED_STORE(&q->bottom, b);
136     SEQ_CST_FENCE();
137     StgInt t = RELAXED_LOAD(&q->top);
138 
139     void *result;
140     if (t <= b) {
141         /* Non-empty */
142         result = RELAXED_LOAD(&q->elements[b & q->moduloSize]);
143         if (t == b) {
144             /* Single last element in queue */
145             if (!cas_top(q, t, t+1)) {
146                 /* Failed race */
147                 result = NULL;
148             }
149 
150             RELAXED_STORE(&q->bottom, b+1);
151         }
152     } else {
153         /* Empty queue */
154         result = NULL;
155         RELAXED_STORE(&q->bottom, b+1);
156     }
157 
158     return result;
159 }
160 
161 /* -----------------------------------------------------------------------------
162  * stealWSDeque
163  * -------------------------------------------------------------------------- */
164 
165 void *
stealWSDeque_(WSDeque * q)166 stealWSDeque_ (WSDeque *q)
167 {
168     StgInt t = ACQUIRE_LOAD(&q->top);
169     SEQ_CST_FENCE();
170     StgInt b = ACQUIRE_LOAD(&q->bottom);
171 
172     void *result = NULL;
173     if (t < b) {
174         /* Non-empty queue */
175         result = RELAXED_LOAD(&q->elements[t % q->size]);
176         if (!cas_top(q, t, t+1)) {
177             return NULL;
178         }
179     }
180     return result;
181 }
182 
183 void *
stealWSDeque(WSDeque * q)184 stealWSDeque (WSDeque *q)
185 {
186     void *stolen;
187 
188     do {
189         stolen = stealWSDeque_(q);
190     } while (stolen == NULL && !looksEmptyWSDeque(q));
191 
192     return stolen;
193 }
194 
195 /* -----------------------------------------------------------------------------
196  * pushWSDeque
197  * -------------------------------------------------------------------------- */
198 
199 /* Enqueue an element. Must only be called by owner. Returns true if element was
200  * pushed, false if queue is full
201  */
202 bool
pushWSDeque(WSDeque * q,void * elem)203 pushWSDeque (WSDeque* q, void * elem)
204 {
205     StgInt b = ACQUIRE_LOAD(&q->bottom);
206     StgInt t = ACQUIRE_LOAD(&q->top);
207 
208     if ( b - t > q->size - 1 ) {
209         /* Full queue */
210         /* We don't implement resizing, just say we didn't push anything. */
211         return false;
212     }
213 
214     RELAXED_STORE(&q->elements[b & q->moduloSize], elem);
215 #if defined(TSAN_ENABLED)
216     // ThreadSanizer doesn't know about release fences, so we need to
217     // strengthen this to a release store lest we get spurious data race
218     // reports.
219     RELEASE_STORE(&q->bottom, b+1);
220 #else
221     RELEASE_FENCE();
222     RELAXED_STORE(&q->bottom, b+1);
223 #endif
224     return true;
225 }
226