1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2010
4  *
5  * Inter-Capability message passing
6  *
7  * --------------------------------------------------------------------------*/
8 
9 #include "Rts.h"
10 #include "Messages.h"
11 #include "Trace.h"
12 #include "Capability.h"
13 #include "Schedule.h"
14 #include "Threads.h"
15 #include "RaiseAsync.h"
16 #include "sm/Storage.h"
17 
18 /* ----------------------------------------------------------------------------
19    Send a message to another Capability
20    ------------------------------------------------------------------------- */
21 
22 #if defined(THREADED_RTS)
23 
/*
 * Push msg onto the front of to_cap's inbox, then either start to_cap
 * running (if it has no running task) or interrupt it.
 *
 *   from_cap - the sending Capability; msg is recorded as mutated on it
 *   to_cap   - the receiving Capability; to_cap->lock is held throughout
 *   msg      - the message to enqueue
 */
void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
{
    ACQUIRE_LOCK(&to_cap->lock);

#if defined(DEBUG)
    {
        // Sanity check: refuse to enqueue anything whose info pointer
        // is not one of the kinds listed below.
        const StgInfoTable *info = msg->header.info;
        if (!(info == &stg_MSG_THROWTO_info ||
              info == &stg_MSG_BLACKHOLE_info ||
              info == &stg_MSG_TRY_WAKEUP_info ||
              // can happen if a MSG_BLACKHOLE is revoked
              info == &stg_IND_info ||
              info == &stg_WHITEHOLE_info)) {
            barf("sendMessage: %p", info);
        }
    }
#endif

    // Link the message in at the head of the receiver's inbox.
    msg->link = to_cap->inbox;
    RELAXED_STORE(&to_cap->inbox, msg);

    recordClosureMutated(from_cap, (StgClosure *)msg);

    if (to_cap->running_task != NULL) {
        // A task currently holds to_cap: interrupt it.
        interruptCapability(to_cap);
    } else {
        // Nobody holds to_cap.  Temporarily install ourselves as its
        // running task, which is a precondition for
        // releaseCapability_(), and then release it.
        to_cap->running_task = myTask();
        releaseCapability_(to_cap, false);
    }

    RELEASE_LOCK(&to_cap->lock);
}
56 
57 #endif /* THREADED_RTS */
58 
59 /* ----------------------------------------------------------------------------
60    Handle a message
61    ------------------------------------------------------------------------- */
62 
63 #if defined(THREADED_RTS)
64 
65 void
executeMessage(Capability * cap,Message * m)66 executeMessage (Capability *cap, Message *m)
67 {
68     const StgInfoTable *i;
69 
70 loop:
71     i = ACQUIRE_LOAD(&m->header.info);
72     if (i == &stg_MSG_TRY_WAKEUP_info)
73     {
74         StgTSO *tso = ((MessageWakeup *)m)->tso;
75         debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
76                       (W_)tso->id);
77         tryWakeupThread(cap, tso);
78     }
79     else if (i == &stg_MSG_THROWTO_info)
80     {
81         MessageThrowTo *t = (MessageThrowTo *)m;
82         uint32_t r;
83         const StgInfoTable *i;
84 
85         i = lockClosure((StgClosure*)m);
86         if (i != &stg_MSG_THROWTO_info) {
87             unlockClosure((StgClosure*)m, i);
88             goto loop;
89         }
90 
91         debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
92                       (W_)t->source->id, (W_)t->target->id);
93 
94         ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
95         ASSERT(t->source->block_info.closure == (StgClosure *)m);
96 
97         r = throwToMsg(cap, t);
98 
99         switch (r) {
100         case THROWTO_SUCCESS: {
101             // this message is done
102             StgTSO *source = t->source;
103             doneWithMsgThrowTo(cap, t);
104             tryWakeupThread(cap, source);
105             break;
106         }
107         case THROWTO_BLOCKED:
108             // unlock the message
109             unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
110             break;
111         }
112     }
113     else if (i == &stg_MSG_BLACKHOLE_info)
114     {
115         uint32_t r;
116         MessageBlackHole *b = (MessageBlackHole*)m;
117 
118         r = messageBlackHole(cap, b);
119         if (r == 0) {
120             tryWakeupThread(cap, b->tso);
121         }
122         return;
123     }
124     else if (i == &stg_IND_info || i == &stg_MSG_NULL_info)
125     {
126         // message was revoked
127         return;
128     }
129     else if (i == &stg_WHITEHOLE_info)
130     {
131 #if defined(PROF_SPIN)
132         NONATOMIC_ADD(&whitehole_executeMessage_spin, 1);
133 #endif
134         goto loop;
135     }
136     else
137     {
138         barf("executeMessage: %p", i);
139     }
140 }
141 
142 #endif
143 
144 /* ----------------------------------------------------------------------------
145    Handle a MSG_BLACKHOLE message
146 
147    This is called from two places: either we just entered a BLACKHOLE
148    (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our
149    cap->inbox.
150 
151    We need to establish whether the BLACKHOLE belongs to
152    this Capability, and
153      - if so, arrange to block the current thread on it
154      - otherwise, forward the message to the right place
155 
156    Returns:
157      - 0 if the blocked thread can be woken up by the caller
158      - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP
159        at some point in the future.
160 
161    ------------------------------------------------------------------------- */
162 
messageBlackHole(Capability * cap,MessageBlackHole * msg)163 uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
164 {
165     const StgInfoTable *info;
166     StgClosure *p;
167     StgBlockingQueue *bq;
168     StgClosure *bh = UNTAG_CLOSURE(msg->bh);
169     StgTSO *owner;
170 
171     debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on "
172                   "blackhole %p", (W_)msg->tso->id, msg->bh);
173 
174     info = ACQUIRE_LOAD(&bh->header.info);
175 
176     // If we got this message in our inbox, it might be that the
177     // BLACKHOLE has already been updated, and GC has shorted out the
178     // indirection, so the pointer no longer points to a BLACKHOLE at
179     // all.
180     if (info != &stg_BLACKHOLE_info &&
181         info != &stg_CAF_BLACKHOLE_info &&
182         info != &__stg_EAGER_BLACKHOLE_info &&
183         info != &stg_WHITEHOLE_info) {
184         // if it is a WHITEHOLE, then a thread is in the process of
185         // trying to BLACKHOLE it.  But we know that it was once a
186         // BLACKHOLE, so there is at least a valid pointer in the
187         // payload, so we can carry on.
188         return 0;
189     }
190 
191     // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
192     // or a value.
193 loop:
194     // If we are being called from stg_BLACKHOLE then TSAN won't know about the
195     // previous read barrier that makes the following access safe.
196     TSAN_ANNOTATE_BENIGN_RACE(&((StgInd*)bh)->indirectee, "messageBlackHole");
197     p = UNTAG_CLOSURE(ACQUIRE_LOAD(&((StgInd*)bh)->indirectee));
198     info = RELAXED_LOAD(&p->header.info);
199 
200     if (info == &stg_IND_info)
201     {
202         // This could happen, if e.g. we got a BLOCKING_QUEUE that has
203         // just been replaced with an IND by another thread in
204         // updateThunk().  In which case, if we read the indirectee
205         // again we should get the value.
206         // See Note [BLACKHOLE pointing to IND] in sm/Evac.c
207         goto loop;
208     }
209 
210     else if (info == &stg_TSO_info)
211     {
212         owner = (StgTSO*)p;
213 
214 #if defined(THREADED_RTS)
215         if (owner->cap != cap) {
216             sendMessage(cap, owner->cap, (Message*)msg);
217             debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d",
218                           owner->cap->no);
219             return 1;
220         }
221 #endif
222         // owner is the owner of the BLACKHOLE, and resides on this
223         // Capability.  msg->tso is the first thread to block on this
224         // BLACKHOLE, so we first create a BLOCKING_QUEUE object.
225 
226         bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));
227 
228         // initialise the BLOCKING_QUEUE object
229         bq->bh = bh;
230         bq->queue = msg;
231         bq->owner = owner;
232 
233         msg->link = (MessageBlackHole*)END_TSO_QUEUE;
234 
235         // All BLOCKING_QUEUES are linked in a list on owner->bq, so
236         // that we can search through them in the event that there is
237         // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
238         // becomes orphaned (see updateThunk()).
239         bq->link = owner->bq;
240         SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
241         // We are about to make the newly-constructed message visible to other cores;
242         // a barrier is necessary to ensure that all writes are visible.
243         // See Note [Heap memory barriers] in SMP.h.
244         dirty_TSO(cap, owner); // we will modify owner->bq
245         RELEASE_STORE(&owner->bq, bq);
246 
247         // If the owner of the blackhole is currently runnable, then
248         // bump it to the front of the run queue.  This gives the
249         // blocked-on thread a little boost which should help unblock
250         // this thread, and may avoid a pile-up of other threads
251         // becoming blocked on the same BLACKHOLE (#3838).
252         //
253         // NB. we check to make sure that the owner is not the same as
254         // the current thread, since in that case it will not be on
255         // the run queue.
256         if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
257             promoteInRunQueue(cap, owner);
258         }
259 
260         // point to the BLOCKING_QUEUE from the BLACKHOLE
261         // RELEASE to make the BQ visible, see Note [Heap memory barriers].
262         RELEASE_STORE(&((StgInd*)bh)->indirectee, (StgClosure *)bq);
263         IF_NONMOVING_WRITE_BARRIER_ENABLED {
264             updateRemembSetPushClosure(cap, (StgClosure*)p);
265         }
266         recordClosureMutated(cap,bh); // bh was mutated
267 
268         debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
269                       (W_)msg->tso->id, (W_)owner->id);
270 
271         return 1; // blocked
272     }
273     else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
274              info == &stg_BLOCKING_QUEUE_DIRTY_info)
275     {
276         StgBlockingQueue *bq = (StgBlockingQueue *)p;
277 
278         ASSERT(bq->bh == bh);
279 
280         owner = bq->owner;
281 
282         ASSERT(owner != END_TSO_QUEUE);
283 
284 #if defined(THREADED_RTS)
285         if (owner->cap != cap) {
286             sendMessage(cap, owner->cap, (Message*)msg);
287             debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d",
288                           owner->cap->no);
289             return 1;
290         }
291 #endif
292 
293         IF_NONMOVING_WRITE_BARRIER_ENABLED {
294             // We are about to overwrite bq->queue; make sure its current value
295             // makes it into the update remembered set
296             updateRemembSetPushClosure(cap, (StgClosure*)bq->queue);
297         }
298         RELAXED_STORE(&msg->link, bq->queue);
299         bq->queue = msg;
300         // No barrier is necessary here: we are only exposing the
301         // closure to the GC. See Note [Heap memory barriers] in SMP.h.
302         recordClosureMutated(cap,(StgClosure*)msg);
303 
304         if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
305             RELAXED_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_DIRTY_info);
306             // No barrier is necessary here: we are only exposing the
307             // closure to the GC. See Note [Heap memory barriers] in SMP.h.
308             recordClosureMutated(cap,(StgClosure*)bq);
309         }
310 
311         debugTraceCap(DEBUG_sched, cap,
312                       "thread %d blocked on existing BLOCKING_QUEUE "
313                       "owned by thread %d",
314                       (W_)msg->tso->id, (W_)owner->id);
315 
316         // See above, #3838
317         if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
318             promoteInRunQueue(cap, owner);
319         }
320 
321         return 1; // blocked
322     }
323 
324     return 0; // not blocked
325 }
326 
327 // A shorter version of messageBlackHole(), that just returns the
328 // owner (or NULL if the owner cannot be found, because the blackhole
329 // has been updated in the meantime).
330 
blackHoleOwner(StgClosure * bh)331 StgTSO * blackHoleOwner (StgClosure *bh)
332 {
333     const StgInfoTable *info;
334     StgClosure *p;
335 
336     info = RELAXED_LOAD(&bh->header.info);
337 
338     if (info != &stg_BLACKHOLE_info &&
339         info != &stg_CAF_BLACKHOLE_info &&
340         info != &__stg_EAGER_BLACKHOLE_info &&
341         info != &stg_WHITEHOLE_info) {
342         return NULL;
343     }
344 
345     // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
346     // or a value.
347 loop:
348     p = UNTAG_CLOSURE(ACQUIRE_LOAD(&((StgInd*)bh)->indirectee));
349     info = RELAXED_LOAD(&p->header.info);
350 
351     if (info == &stg_IND_info) goto loop;
352 
353     else if (info == &stg_TSO_info)
354     {
355         return (StgTSO*)p;
356     }
357     else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
358              info == &stg_BLOCKING_QUEUE_DIRTY_info)
359     {
360         StgBlockingQueue *bq = (StgBlockingQueue *)p;
361         return RELAXED_LOAD(&bq->owner);
362     }
363 
364     return NULL; // not blocked
365 }
366