/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2010
 *
 * Inter-Capability message passing
 *
 * --------------------------------------------------------------------------*/
8
9 #include "Rts.h"
10 #include "Messages.h"
11 #include "Trace.h"
12 #include "Capability.h"
13 #include "Schedule.h"
14 #include "Threads.h"
15 #include "RaiseAsync.h"
16 #include "sm/Storage.h"
17
/* ----------------------------------------------------------------------------
   Send a message to another Capability
   ------------------------------------------------------------------------- */
21
22 #if defined(THREADED_RTS)
23
/*
 * Push msg onto the front of to_cap's inbox and prod to_cap so that the
 * message gets looked at.
 *
 *   from_cap  the sending Capability; msg is recorded as mutated against it
 *   to_cap    the receiving Capability; to_cap->lock is held for the
 *             duration of the call
 *   msg       the message to deliver
 */
void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
{
    ACQUIRE_LOCK(&to_cap->lock);

#if defined(DEBUG)
    {
        // Sanity check: refuse any message whose header is not one of the
        // kinds we expect to see in transit.
        const StgInfoTable *hdr = msg->header.info;
        if (!(hdr == &stg_MSG_THROWTO_info ||
              hdr == &stg_MSG_BLACKHOLE_info ||
              hdr == &stg_MSG_TRY_WAKEUP_info ||
              hdr == &stg_IND_info || // can happen if a MSG_BLACKHOLE is revoked
              hdr == &stg_WHITEHOLE_info)) {
            barf("sendMessage: %p", hdr);
        }
    }
#endif

    // Link the message in at the head of the inbox list.
    msg->link = to_cap->inbox;
    RELAXED_STORE(&to_cap->inbox, msg);

    recordClosureMutated(from_cap,(StgClosure*)msg);

    if (to_cap->running_task != NULL) {
        // Somebody is running on to_cap: interrupt it.
        interruptCapability(to_cap);
    } else {
        // Nobody is running on to_cap: install ourselves as its running
        // task, which is the precondition for releaseCapability_(), and
        // release it.
        to_cap->running_task = myTask();
        releaseCapability_(to_cap,false);
    }

    RELEASE_LOCK(&to_cap->lock);
}
56
57 #endif /* THREADED_RTS */
58
/* ----------------------------------------------------------------------------
   Handle a message
   ------------------------------------------------------------------------- */
62
63 #if defined(THREADED_RTS)
64
65 void
executeMessage(Capability * cap,Message * m)66 executeMessage (Capability *cap, Message *m)
67 {
68 const StgInfoTable *i;
69
70 loop:
71 i = ACQUIRE_LOAD(&m->header.info);
72 if (i == &stg_MSG_TRY_WAKEUP_info)
73 {
74 StgTSO *tso = ((MessageWakeup *)m)->tso;
75 debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
76 (W_)tso->id);
77 tryWakeupThread(cap, tso);
78 }
79 else if (i == &stg_MSG_THROWTO_info)
80 {
81 MessageThrowTo *t = (MessageThrowTo *)m;
82 uint32_t r;
83 const StgInfoTable *i;
84
85 i = lockClosure((StgClosure*)m);
86 if (i != &stg_MSG_THROWTO_info) {
87 unlockClosure((StgClosure*)m, i);
88 goto loop;
89 }
90
91 debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
92 (W_)t->source->id, (W_)t->target->id);
93
94 ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
95 ASSERT(t->source->block_info.closure == (StgClosure *)m);
96
97 r = throwToMsg(cap, t);
98
99 switch (r) {
100 case THROWTO_SUCCESS: {
101 // this message is done
102 StgTSO *source = t->source;
103 doneWithMsgThrowTo(cap, t);
104 tryWakeupThread(cap, source);
105 break;
106 }
107 case THROWTO_BLOCKED:
108 // unlock the message
109 unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
110 break;
111 }
112 }
113 else if (i == &stg_MSG_BLACKHOLE_info)
114 {
115 uint32_t r;
116 MessageBlackHole *b = (MessageBlackHole*)m;
117
118 r = messageBlackHole(cap, b);
119 if (r == 0) {
120 tryWakeupThread(cap, b->tso);
121 }
122 return;
123 }
124 else if (i == &stg_IND_info || i == &stg_MSG_NULL_info)
125 {
126 // message was revoked
127 return;
128 }
129 else if (i == &stg_WHITEHOLE_info)
130 {
131 #if defined(PROF_SPIN)
132 NONATOMIC_ADD(&whitehole_executeMessage_spin, 1);
133 #endif
134 goto loop;
135 }
136 else
137 {
138 barf("executeMessage: %p", i);
139 }
140 }
141
142 #endif
143
/* ----------------------------------------------------------------------------
   Handle a MSG_BLACKHOLE message

   This is called from two places: either we just entered a BLACKHOLE
   (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our
   cap->inbox.

   We need to establish whether the BLACKHOLE belongs to
   this Capability, and
     - if so, arrange to block the current thread on it
     - otherwise, forward the message to the right place

   Returns:
     - 0 if the blocked thread can be woken up by the caller
     - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP
       at some point in the future.

   ------------------------------------------------------------------------- */
162
/*
 * Block msg->tso on the blackhole msg->bh, or pass the request on to the
 * Capability of the thread that owns the blackhole.
 *
 *   cap  the Capability we are running on
 *   msg  the MSG_BLACKHOLE message; msg->bh is the closure being waited on
 *        and msg->tso the thread that wants to block
 *
 * Returns 0 when there is nothing to block on (bh is no longer a blackhole,
 * or its indirectee is neither a TSO nor a BLOCKING_QUEUE), so the caller
 * may wake msg->tso.  Returns 1 when msg was queued on a BLOCKING_QUEUE or
 * (THREADED_RTS only) forwarded to the owner's Capability.
 */
uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
{
    const StgInfoTable *info;
    StgClosure *p;
    StgBlockingQueue *bq;
    StgClosure *bh = UNTAG_CLOSURE(msg->bh);
    StgTSO *owner;

    // Thread ids are cast to W_, so print them with a long-sized specifier.
    debugTraceCap(DEBUG_sched, cap, "message: thread %ld blocking on "
                  "blackhole %p", (W_)msg->tso->id, msg->bh);

    info = ACQUIRE_LOAD(&bh->header.info);

    // If we got this message in our inbox, it might be that the
    // BLACKHOLE has already been updated, and GC has shorted out the
    // indirection, so the pointer no longer points to a BLACKHOLE at
    // all.
    //
    // A WHITEHOLE is deliberately let through: a thread is in the
    // process of trying to BLACKHOLE it.  But we know that it was once
    // a BLACKHOLE, so there is at least a valid pointer in the payload,
    // so we can carry on.
    if (info != &stg_BLACKHOLE_info &&
        info != &stg_CAF_BLACKHOLE_info &&
        info != &__stg_EAGER_BLACKHOLE_info &&
        info != &stg_WHITEHOLE_info) {
        return 0;
    }

    // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
    // or a value.
loop:
    // If we are being called from stg_BLACKHOLE then TSAN won't know about the
    // previous read barrier that makes the following access safe.
    TSAN_ANNOTATE_BENIGN_RACE(&((StgInd*)bh)->indirectee, "messageBlackHole");
    p = UNTAG_CLOSURE(ACQUIRE_LOAD(&((StgInd*)bh)->indirectee));
    info = RELAXED_LOAD(&p->header.info);

    if (info == &stg_IND_info)
    {
        // This could happen, if e.g. we got a BLOCKING_QUEUE that has
        // just been replaced with an IND by another thread in
        // updateThunk().  In which case, if we read the indirectee
        // again we should get the value.
        // See Note [BLACKHOLE pointing to IND] in sm/Evac.c
        goto loop;
    }

    else if (info == &stg_TSO_info)
    {
        owner = (StgTSO*)p;

#if defined(THREADED_RTS)
        if (owner->cap != cap) {
            // The owner lives on another Capability: let that one deal
            // with the message.
            sendMessage(cap, owner->cap, (Message*)msg);
            debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d",
                          owner->cap->no);
            return 1;
        }
#endif
        // owner is the owner of the BLACKHOLE, and resides on this
        // Capability.  msg->tso is the first thread to block on this
        // BLACKHOLE, so we first create a BLOCKING_QUEUE object.

        bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));

        // initialise the BLOCKING_QUEUE object
        bq->bh = bh;
        bq->queue = msg;
        bq->owner = owner;

        msg->link = (MessageBlackHole*)END_TSO_QUEUE;

        // All BLOCKING_QUEUES are linked in a list on owner->bq, so
        // that we can search through them in the event that there is
        // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
        // becomes orphaned (see updateThunk()).
        bq->link = owner->bq;
        SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
        dirty_TSO(cap, owner); // we will modify owner->bq
        // We are about to make the newly-constructed message visible to
        // other cores; a barrier is necessary to ensure that all writes
        // are visible.  See Note [Heap memory barriers] in SMP.h.
        RELEASE_STORE(&owner->bq, bq);

        // If the owner of the blackhole is currently runnable, then
        // bump it to the front of the run queue.  This gives the
        // blocked-on thread a little boost which should help unblock
        // this thread, and may avoid a pile-up of other threads
        // becoming blocked on the same BLACKHOLE (#3838).
        //
        // NB. we check to make sure that the owner is not the same as
        // the current thread, since in that case it will not be on
        // the run queue.
        if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
            promoteInRunQueue(cap, owner);
        }

        // point to the BLOCKING_QUEUE from the BLACKHOLE
        // RELEASE to make the BQ visible, see Note [Heap memory barriers].
        RELEASE_STORE(&((StgInd*)bh)->indirectee, (StgClosure *)bq);
        IF_NONMOVING_WRITE_BARRIER_ENABLED {
            // p is the indirectee we have just overwritten; push it to the
            // update remembered set.
            updateRemembSetPushClosure(cap, (StgClosure*)p);
        }
        recordClosureMutated(cap,bh); // bh was mutated

        debugTraceCap(DEBUG_sched, cap, "thread %ld blocked on thread %ld",
                      (W_)msg->tso->id, (W_)owner->id);

        return 1; // blocked
    }
    else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
             info == &stg_BLOCKING_QUEUE_DIRTY_info)
    {
        // Somebody is already blocked on this BLACKHOLE: add msg to the
        // front of the existing queue.
        bq = (StgBlockingQueue *)p;

        ASSERT(bq->bh == bh);

        owner = bq->owner;

        ASSERT(owner != END_TSO_QUEUE);

#if defined(THREADED_RTS)
        if (owner->cap != cap) {
            sendMessage(cap, owner->cap, (Message*)msg);
            debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d",
                          owner->cap->no);
            return 1;
        }
#endif

        IF_NONMOVING_WRITE_BARRIER_ENABLED {
            // We are about to overwrite bq->queue; make sure its current value
            // makes it into the update remembered set
            updateRemembSetPushClosure(cap, (StgClosure*)bq->queue);
        }
        RELAXED_STORE(&msg->link, bq->queue);
        bq->queue = msg;
        // No barrier is necessary here: we are only exposing the
        // closure to the GC. See Note [Heap memory barriers] in SMP.h.
        recordClosureMutated(cap,(StgClosure*)msg);

        if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
            RELAXED_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_DIRTY_info);
            // No barrier is necessary here: we are only exposing the
            // closure to the GC. See Note [Heap memory barriers] in SMP.h.
            recordClosureMutated(cap,(StgClosure*)bq);
        }

        debugTraceCap(DEBUG_sched, cap,
                      "thread %ld blocked on existing BLOCKING_QUEUE "
                      "owned by thread %ld",
                      (W_)msg->tso->id, (W_)owner->id);

        // See above, #3838
        if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
            promoteInRunQueue(cap, owner);
        }

        return 1; // blocked
    }

    // The indirectee is neither a TSO nor a BLOCKING_QUEUE.
    return 0; // not blocked
}
326
// A shorter version of messageBlackHole(), that just returns the
// owner (or NULL if the owner cannot be found, because the blackhole
// has been updated in the meantime).
330
blackHoleOwner(StgClosure * bh)331 StgTSO * blackHoleOwner (StgClosure *bh)
332 {
333 const StgInfoTable *info;
334 StgClosure *p;
335
336 info = RELAXED_LOAD(&bh->header.info);
337
338 if (info != &stg_BLACKHOLE_info &&
339 info != &stg_CAF_BLACKHOLE_info &&
340 info != &__stg_EAGER_BLACKHOLE_info &&
341 info != &stg_WHITEHOLE_info) {
342 return NULL;
343 }
344
345 // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
346 // or a value.
347 loop:
348 p = UNTAG_CLOSURE(ACQUIRE_LOAD(&((StgInd*)bh)->indirectee));
349 info = RELAXED_LOAD(&p->header.info);
350
351 if (info == &stg_IND_info) goto loop;
352
353 else if (info == &stg_TSO_info)
354 {
355 return (StgTSO*)p;
356 }
357 else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
358 info == &stg_BLOCKING_QUEUE_DIRTY_info)
359 {
360 StgBlockingQueue *bq = (StgBlockingQueue *)p;
361 return RELAXED_LOAD(&bq->owner);
362 }
363
364 return NULL; // not blocked
365 }
366