/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * Asynchronous exceptions
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "sm/Storage.h"
#include "Threads.h"
#include "Trace.h"
#include "RaiseAsync.h"
#include "Schedule.h"
#include "Updates.h"
#include "STM.h"
#include "sm/Sanity.h"
#include "Profiling.h"
#include "Messages.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif

static void blockedThrowTo (Capability *cap,
                            StgTSO *target, MessageThrowTo *msg);

static void removeFromQueues(Capability *cap, StgTSO *tso);

static void removeFromMVarBlockedQueue (StgTSO *tso);

static void throwToSendMsg (Capability *cap USED_IF_THREADS,
                            Capability *target_cap USED_IF_THREADS,
                            MessageThrowTo *msg USED_IF_THREADS);

/* -----------------------------------------------------------------------------
   throwToSingleThreaded

   This version of throwTo is safe to use if and only if one of the
   following holds:

     - !THREADED_RTS

     - all the other threads in the system are stopped (eg. during GC).

     - we surely own the target TSO (eg. we just took it from the
       run queue of the current capability, or we are running it).

   It doesn't cater for blocking the source thread until the exception
   has been raised.
   -------------------------------------------------------------------------- */

static void
throwToSingleThreaded__ (Capability *cap, StgTSO *tso, StgClosure *exception,
                         bool stop_at_atomically, StgUpdateFrame *stop_here)
{
    // Thread already dead?
    if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
        return;
    }

    // Remove it from any blocking queues
    removeFromQueues(cap,tso);

    raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
}

void
throwToSingleThreaded (Capability *cap, StgTSO *tso, StgClosure *exception)
{
    throwToSingleThreaded__(cap, tso, exception, false, NULL);
}

void
throwToSingleThreaded_ (Capability *cap, StgTSO *tso, StgClosure *exception,
                        bool stop_at_atomically)
{
    throwToSingleThreaded__ (cap, tso, exception, stop_at_atomically, NULL);
}

void // cannot return a different TSO
suspendComputation (Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
{
    throwToSingleThreaded__ (cap, tso, NULL, false, stop_here);
}
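
/* For illustration, a typical caller of throwToSingleThreaded() is the
 * scheduler's deleteThread() (in Schedule.c), which kills a thread outright
 * by sending it a NULL exception.  A minimal sketch, assuming we have
 * exclusive access to the TSO as described above (not built; the real
 * implementation lives in Schedule.c):
 */
#if 0
static void
deleteThread_sketch (Capability *cap, StgTSO *tso)
{
    // a NULL exception strips the whole stack and leaves a zombie;
    // threads in foreign calls can't be stripped, so they are skipped
    if (tso->why_blocked != BlockedOnCCall &&
        tso->why_blocked != BlockedOnCCall_Interruptible) {
        throwToSingleThreaded(cap, tso, NULL);
    }
}
#endif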

/* -----------------------------------------------------------------------------
   throwToSelf

   Useful for throwing an async exception in a thread from the
   runtime.  It handles unlocking the throwto message returned by
   throwTo().

   Note [Throw to self when masked]

   When a StackOverflow occurs while the thread is masked, we want to
   defer the exception until the thread becomes unmasked or hits an
   interruptible point.  We already have a mechanism for doing this,
   the blocked_exceptions list, but the use here is a bit unusual,
   because an exception is normally only added to this list upon
   an asynchronous 'throwTo' call (with all of the relevant
   multithreaded nonsense). Morally, a stack overflow should be an
   asynchronous exception sent by a thread to itself, and it should
   have the same semantics.  But there are a few key differences:

   - If you actually tried to send an asynchronous exception to
     yourself using throwTo, the exception would immediately be
     delivered.  This is because throwTo itself is considered an
     interruptible point, so the exception is always deliverable. Thus,
     ordinarily, we never end up with a message to oneself in the
     blocked_exceptions queue.

   - In the case of a StackOverflow, we don't actually care about the
     wakeup semantics; when an exception is delivered, the thread that
     originally threw the exception should be woken up, since throwTo
     blocks until the exception is successfully thrown.  Fortunately,
     it is harmless to wake up a thread that doesn't actually need
     waking up, e.g. ourselves.

   - No synchronization is necessary, because we own the TSO and the
     capability.  You can observe this by tracing through the execution
     of throwTo.  We skip synchronizing the message and inter-capability
     communication.

   We think this doesn't break any invariants, but do be careful!
   -------------------------------------------------------------------------- */

void
throwToSelf (Capability *cap, StgTSO *tso, StgClosure *exception)
{
    MessageThrowTo *m;

    m = throwTo(cap, tso, tso, exception);

    if (m != NULL) {
        // throwTo leaves it locked
        unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
    }
}
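
/* Illustrative only: the stack-overflow path discussed in
 * Note [Throw to self when masked] boils down to a call like the one
 * below (cf. threadStackOverflow() in Threads.c; the closure name is
 * assumed to be the one exported via rts/Prelude.h):
 */
#if 0
    throwToSelf(cap, tso, (StgClosure *)stackOverflow_closure);
#endif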

/* -----------------------------------------------------------------------------
   throwTo

   This function may be used to throw an exception from one thread to
   another, during the course of normal execution.  This is a tricky
   task: the target thread might be running on another CPU, or it
   may be blocked and could be woken up at any point by another CPU.
   We have some delicate synchronisation to do.

   The underlying scheme when multiple Capabilities are in use is
   message passing: when the target of a throwTo is on another
   Capability, we send a message (a MessageThrowTo closure) to that
   Capability.

   If the throwTo needs to block because the target TSO is masking
   exceptions (the TSO_BLOCKEX flag), then the message is placed on
   the blocked_exceptions queue attached to the target TSO.  When the
   target TSO enters the unmasked state again, it must check the
   queue.  The blocked_exceptions queue is not locked; only the
   Capability owning the TSO may modify it.

   To make things simpler for throwTo, we always create the message
   first before deciding what to do.  The message may get sent, or it
   may get attached to a TSO's blocked_exceptions queue, or the
   exception may get thrown immediately and the message dropped,
   depending on the current state of the target.

   Currently we send a message if the target belongs to another
   Capability, and it is

     - NotBlocked, BlockedOnMsgThrowTo,
       BlockedOnCCall_Interruptible

     - or it is masking exceptions (TSO_BLOCKEX)

   Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
   BlockedOnBlackHole then we acquire ownership of the TSO by locking
   its parent container (e.g. the MVar) and then raise the exception.
   We might change these cases to be more message-passing-like in the
   future.

   Returns:

   NULL               exception was raised, ok to continue

   MessageThrowTo *   exception was not raised; the source TSO
                      should now put itself in the state
                      BlockedOnMsgThrowTo, and when it is ready
                      it should unlock the message using
                      unlockClosure(msg, &stg_MSG_THROWTO_info);
                      If it decides not to raise the exception after
                      all, it can revoke it safely with
                      unlockClosure(msg, &stg_MSG_NULL_info);

   -------------------------------------------------------------------------- */

MessageThrowTo *
throwTo (Capability *cap,       // the Capability we hold
         StgTSO *source,        // the TSO sending the exception (or NULL)
         StgTSO *target,        // the TSO receiving the exception
         StgClosure *exception) // the exception closure
{
    MessageThrowTo *msg;

    msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
    // the message starts locked; see below
    SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
    msg->source      = source;
    msg->target      = target;
    msg->exception   = exception;

    switch (throwToMsg(cap, msg))
    {
    case THROWTO_SUCCESS:
        // unlock the message now, otherwise we leave a WHITEHOLE in
        // the heap (#6103)
        SET_HDR(msg, &stg_MSG_THROWTO_info, CCS_SYSTEM);
        return NULL;

    case THROWTO_BLOCKED:
    default:
        // the caller will unlock the message when it is ready.  We
        // cannot unlock it yet, because the calling thread will need
        // to tidy up its state first.
        return msg;
    }
}
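
/* A hedged sketch of the caller-side protocol described above.  (The real
 * caller of throwTo() is stg_killThreadzh in Exception.cmm; the function
 * name and the tidy-up steps here are illustrative only.)
 */
#if 0
static void
killThread_sketch (Capability *cap, StgTSO *source, StgTSO *target,
                   StgClosure *exception)
{
    MessageThrowTo *msg = throwTo(cap, source, target, exception);
    if (msg == NULL) {
        return;                 // exception was raised; carry on
    }
    // exception not raised yet: block the source thread on the message
    source->why_blocked = BlockedOnMsgThrowTo;
    source->block_info.throwto = msg;
    // ... tidy up the source thread's state here ...
    // then publish the message so the target (or a revoker) can act on it:
    unlockClosure((StgClosure *)msg, &stg_MSG_THROWTO_info);
}
#endif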


uint32_t
throwToMsg (Capability *cap, MessageThrowTo *msg)
{
    StgWord status;
    StgTSO *target = ACQUIRE_LOAD(&msg->target);
    Capability *target_cap;

    goto check_target;

retry:
    write_barrier();
    debugTrace(DEBUG_sched, "throwTo: retrying...");

check_target:
    ASSERT(target != END_TSO_QUEUE);

    // Thread already dead?
    StgWord16 what_next = SEQ_CST_LOAD(&target->what_next);
    if (what_next == ThreadComplete
        || what_next == ThreadKilled) {
        return THROWTO_SUCCESS;
    }

    debugTraceCap(DEBUG_sched, cap,
                  "throwTo: from thread %lu to thread %lu",
                  (unsigned long)msg->source->id,
                  (unsigned long)msg->target->id);

#if defined(DEBUG)
    traceThreadStatus(DEBUG_sched, target);
#endif

    target_cap = target->cap;
    if (target->cap != cap) {
        throwToSendMsg(cap, target_cap, msg);
        return THROWTO_BLOCKED;
    }

    status = target->why_blocked;

    switch (status) {
    case NotBlocked:
    {
        if ((target->flags & TSO_BLOCKEX) == 0) {
            // It's on our run queue and not blocking exceptions
            raiseAsync(cap, target, msg->exception, false, NULL);
            return THROWTO_SUCCESS;
        } else {
            blockedThrowTo(cap,target,msg);
            return THROWTO_BLOCKED;
        }
    }

    case BlockedOnMsgThrowTo:
    {
        const StgInfoTable *i;
        MessageThrowTo *m;

        m = target->block_info.throwto;

        // target is local to this cap, but has sent a throwto
        // message to another cap.
        //
        // The source message is locked.  We need to revoke the
        // target's message so that we can raise the exception, so
        // we attempt to lock it.

        // There's a possibility of a deadlock if two threads are both
        // trying to throwTo each other (or more generally, a cycle of
        // threads).  To break the symmetry we compare the addresses
        // of the MessageThrowTo objects, and the one for which m <
        // msg gets to spin, while the other can only try to lock
        // once, but must then back off and unlock both before trying
        // again.
        if (m < msg) {
            i = lockClosure((StgClosure *)m);
        } else {
            i = tryLockClosure((StgClosure *)m);
            if (i == NULL) {
//            debugBelch("collision\n");
                throwToSendMsg(cap, target->cap, msg);
                return THROWTO_BLOCKED;
            }
        }

        if (i == &stg_MSG_NULL_info) {
            // we know there's a MSG_TRY_WAKEUP on the way, so we
            // might as well just do it now.  The message will
            // be a no-op when it arrives.
            unlockClosure((StgClosure*)m, i);
            tryWakeupThread(cap, target);
            goto retry;
        }

        if (i != &stg_MSG_THROWTO_info) {
            // the message is no longer a MSG_THROWTO: it must have
            // changed while we were locking it, so back off and retry
            unlockClosure((StgClosure*)m, i);
            goto retry;
        }

        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
            unlockClosure((StgClosure*)m, i);
            blockedThrowTo(cap,target,msg);
            return THROWTO_BLOCKED;
        }

        // nobody else can wake up this TSO after we claim the message
        doneWithMsgThrowTo(cap, m);

        raiseAsync(cap, target, msg->exception, false, NULL);
        return THROWTO_SUCCESS;
    }

    case BlockedOnMVar:
    case BlockedOnMVarRead:
    {
        /*
          To establish ownership of this TSO, we need to acquire a
          lock on the MVar that it is blocked on.
        */
        StgMVar *mvar;
        StgInfoTable *info USED_IF_THREADS;

        mvar = (StgMVar *)target->block_info.closure;

        // ASSUMPTION: tso->block_info must always point to a
        // closure.  In the threaded RTS it does.
        switch (get_itbl((StgClosure *)mvar)->type) {
        case MVAR_CLEAN:
        case MVAR_DIRTY:
            break;
        default:
            goto retry;
        }

        info = lockClosure((StgClosure *)mvar);

        // we have the MVar, let's check whether the thread
        // is still blocked on the same MVar.
        if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead)
            || (StgMVar *)target->block_info.closure != mvar) {
            unlockClosure((StgClosure *)mvar, info);
            goto retry;
        }

        if (target->_link == END_TSO_QUEUE) {
            // the MVar operation has already completed.  There is a
            // MSG_TRY_WAKEUP on the way, but we can just wake up the
            // thread now anyway and ignore the message when it
            // arrives.
            unlockClosure((StgClosure *)mvar, info);
            tryWakeupThread(cap, target);
            goto retry;
        }

        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
            blockedThrowTo(cap,target,msg);
            unlockClosure((StgClosure *)mvar, info);
            return THROWTO_BLOCKED;
        } else {
            // revoke the MVar operation
            removeFromMVarBlockedQueue(target);
            raiseAsync(cap, target, msg->exception, false, NULL);
            unlockClosure((StgClosure *)mvar, info);
            return THROWTO_SUCCESS;
        }
    }

    case BlockedOnBlackHole:
    {
        if (target->flags & TSO_BLOCKEX) {
            // BlockedOnBlackHole is not interruptible.
            blockedThrowTo(cap,target,msg);
            return THROWTO_BLOCKED;
        } else {
            // Revoke the message by replacing it with IND. We're not
            // locking anything here, so we might still get a TRY_WAKEUP
            // message from the owner of the blackhole some time in the
            // future, but that doesn't matter.
            ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
            OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
            raiseAsync(cap, target, msg->exception, false, NULL);
            return THROWTO_SUCCESS;
        }
    }

    case BlockedOnSTM:
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
            blockedThrowTo(cap,target,msg);
            return THROWTO_BLOCKED;
        } else {
            raiseAsync(cap, target, msg->exception, false, NULL);
            return THROWTO_SUCCESS;
        }

    case BlockedOnCCall_Interruptible:
#if defined(THREADED_RTS)
    {
        Task *task = NULL;
        // walk suspended_ccalls to find the correct worker thread
        InCall *incall;
        for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
            if (incall->suspended_tso == target) {
                task = incall->task;
                break;
            }
        }
        if (task != NULL) {
            blockedThrowTo(cap, target, msg);
            if (!((target->flags & TSO_BLOCKEX) &&
                  ((target->flags & TSO_INTERRUPTIBLE) == 0))) {
                interruptWorkerTask(task);
            }
            return THROWTO_BLOCKED;
        } else {
            debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill");
        }
        // fall through to the BlockedOnCCall case below
    }
    FALLTHROUGH;
#endif
    case BlockedOnCCall:
        blockedThrowTo(cap,target,msg);
        return THROWTO_BLOCKED;

#if !defined(THREADED_RTS)
    case BlockedOnRead:
    case BlockedOnWrite:
    case BlockedOnDelay:
#if defined(mingw32_HOST_OS)
    case BlockedOnDoProc:
#endif
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
            blockedThrowTo(cap,target,msg);
            return THROWTO_BLOCKED;
        } else {
            removeFromQueues(cap,target);
            raiseAsync(cap, target, msg->exception, false, NULL);
            return THROWTO_SUCCESS;
        }
#endif

    case ThreadMigrating:
        // if it is ThreadMigrating and tso->cap is ours, then it
        // *must* be migrating *to* this capability.  If it were
        // migrating away from the capability, then tso->cap would
        // point to the destination.
        //
        // There is a MSG_WAKEUP in the message queue for this thread,
        // but we can just do it preemptively:
        tryWakeupThread(cap, target);
        // and now retry, the thread should be runnable.
        goto retry;

    default:
        barf("throwTo: unrecognised why_blocked (%d)", target->why_blocked);
    }
    barf("throwTo");
}

static void
throwToSendMsg (Capability *cap STG_UNUSED,
                Capability *target_cap USED_IF_THREADS,
                MessageThrowTo *msg USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);

    sendMessage(cap, target_cap, (Message*)msg);
#endif
}

// Block a throwTo message on the target TSO's blocked_exceptions
// queue.  The current Capability must own the target TSO in order to
// modify the blocked_exceptions queue.
void
blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
{
    debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
                  (unsigned long)target->id);

    ASSERT(target->cap == cap);

    dirty_TSO(cap,target); // we will modify the blocked_exceptions queue
    msg->link = target->blocked_exceptions;
    target->blocked_exceptions = msg;
}

/* -----------------------------------------------------------------------------
   Waking up threads blocked in throwTo

   There are two ways to do this: maybePerformBlockedException() will
   perform the throwTo() for the thread at the head of the queue
   immediately, and leave the other threads on the queue.
   maybePerformBlockedException() also checks the TSO_BLOCKEX flag
   before raising an exception.

   awakenBlockedExceptionQueue() will wake up all the threads in the
   queue, but not perform any throwTo() immediately.  This might be
   more appropriate when the target thread is the one actually running
   (see Exception.cmm).

   Returns: non-zero if an exception was raised, zero otherwise.
   -------------------------------------------------------------------------- */

int
maybePerformBlockedException (Capability *cap, StgTSO *tso)
{
    MessageThrowTo *msg;
    const StgInfoTable *i;
    StgTSO *source;

    // what_next can never be ThreadFinished (a scheduler return code),
    // so test ThreadKilled here, matching throwToSingleThreaded__() above.
    if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
        if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
            awakenBlockedExceptionQueue(cap,tso);
            return 1;
        } else {
            return 0;
        }
    }

    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
        (tso->flags & TSO_BLOCKEX) != 0) {
        debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
    }

    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
        && ((tso->flags & TSO_BLOCKEX) == 0
            || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {

        // We unblock just the first thread on the queue, and perform
        // its throw immediately.
    loop:
        msg = tso->blocked_exceptions;
        if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
        i = lockClosure((StgClosure*)msg);
        tso->blocked_exceptions = (MessageThrowTo*)msg->link;
        if (i == &stg_MSG_NULL_info) {
            unlockClosure((StgClosure*)msg,i);
            goto loop;
        }

        throwToSingleThreaded(cap, msg->target, msg->exception);
        source = msg->source;
        doneWithMsgThrowTo(cap, msg);
        tryWakeupThread(cap, source);
        return 1;
    }
    return 0;
}

// awakenBlockedExceptionQueue(): Just wake up the whole queue of
// blocked exceptions.

void
awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
{
    MessageThrowTo *msg;
    const StgInfoTable *i;
    StgTSO *source;

    for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
         msg = (MessageThrowTo*)msg->link) {
        i = lockClosure((StgClosure *)msg);
        if (i != &stg_MSG_NULL_info) {
            source = msg->source;
            doneWithMsgThrowTo(cap, msg);
            tryWakeupThread(cap, source);
        } else {
            unlockClosure((StgClosure *)msg,i);
        }
    }
    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
}
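
/* A hedged sketch of how these are used: when a thread leaves the masked
 * state it must check its blocked_exceptions queue, roughly as the unmask
 * primop does (the real code is stg_unmaskAsyncExceptionszh in
 * Exception.cmm; this C rendering is illustrative only):
 */
#if 0
    tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
    if (maybePerformBlockedException(cap, tso)) {
        // an exception was raised in tso; the handler is now on its stack
    }
#endif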

/* -----------------------------------------------------------------------------
   Remove a thread from blocking queues.

   This is for use when we raise an exception in another thread, which
   may be blocked.

   Precondition: we have exclusive access to the TSO, via the same set
   of conditions as throwToSingleThreaded() (c.f.).
   -------------------------------------------------------------------------- */

static void
removeFromMVarBlockedQueue (StgTSO *tso)
{
    StgMVar *mvar = (StgMVar*)tso->block_info.closure;
    StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;

    if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
        // already removed from this MVar
        return;
    }

    // Assume the MVar is locked. (not assertable; sometimes it isn't
    // actually WHITEHOLE'd).

    // We want to remove the MVAR_TSO_QUEUE object from the queue.  It
    // isn't doubly-linked so we can't actually remove it; instead we
    // just overwrite it with an IND if possible and let the GC short
    // it out.  However, we have to be careful to maintain the deque
    // structure:

    if (mvar->head == q) {
        mvar->head = q->link;
        OVERWRITE_INFO(q, &stg_IND_info);
        if (mvar->tail == q) {
            mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
        }
    }
    else if (mvar->tail == q) {
        // we can't replace it with an IND in this case, because then
        // we lose the tail pointer when the GC shorts out the IND.
        // So we use MSG_NULL as a kind of non-dupable indirection;
        // these are ignored by takeMVar/putMVar.
        OVERWRITE_INFO(q, &stg_MSG_NULL_info);
    }
    else {
        OVERWRITE_INFO(q, &stg_IND_info);
    }

    // revoke the MVar operation
    tso->_link = END_TSO_QUEUE;
}
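
/* For illustration, with three queue nodes (head -> q1 -> q2 -> q3 <- tail):
 *
 *   - removing q1: head := q1->link and q1 becomes an IND (tail is
 *     cleared if q1 was also the tail);
 *   - removing q3: q3 becomes a MSG_NULL, which takeMVar/putMVar skip,
 *     so the tail pointer stays valid;
 *   - removing q2: q2 becomes an IND, which the GC later shorts out.
 */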

static void
removeFromQueues(Capability *cap, StgTSO *tso)
{
  switch (tso->why_blocked) {

  case NotBlocked:
  case ThreadMigrating:
      return;

  case BlockedOnSTM:
    // Be careful: nothing to do here!  We tell the scheduler that the
    // thread is runnable and we leave it to the stack-walking code to
    // abort the transaction while unwinding the stack.  We should
    // perhaps have a debugging test to make sure that this really
    // happens and that the 'zombie' transaction does not get
    // committed.
    goto done;

  case BlockedOnMVar:
  case BlockedOnMVarRead:
      removeFromMVarBlockedQueue(tso);
      goto done;

  case BlockedOnBlackHole:
      // nothing to do
      goto done;

  case BlockedOnMsgThrowTo:
  {
      MessageThrowTo *m = tso->block_info.throwto;
      // The message is locked by us, unless we got here via
      // deleteAllThreads(), in which case we own all the
      // capabilities.
      // ASSERT(m->header.info == &stg_WHITEHOLE_info);

      // unlock and revoke it at the same time
      doneWithMsgThrowTo(cap, m);
      break;
  }

#if !defined(THREADED_RTS)
  case BlockedOnRead:
  case BlockedOnWrite:
#if defined(mingw32_HOST_OS)
  case BlockedOnDoProc:
#endif
      removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso);
#if defined(mingw32_HOST_OS)
      /* (Cooperatively) signal that the worker thread should abort
       * the request.
       */
      abandonWorkRequest(tso->block_info.async_result->reqID);
#endif
      goto done;

  case BlockedOnDelay:
        removeThreadFromQueue(cap, &sleeping_queue, tso);
        goto done;
#endif

  default:
      barf("removeFromQueues: %d", tso->why_blocked);
  }

 done:
  tso->why_blocked = NotBlocked;
  appendToRunQueue(cap, tso);
}

/* -----------------------------------------------------------------------------
 * raiseAsync()
 *
 * The following function implements the magic for raising an
 * asynchronous exception in an existing thread.
 *
 * We first remove the thread from any queue on which it might be
 * blocked.  The possible blockages are MVARs, BLOCKING_QUEUEs, and
 * TSO blocked_exception queues.
 *
 * We strip the stack down to the innermost CATCH_FRAME, building
 * thunks in the heap for all the active computations, so they can
 * be restarted if necessary.  When we reach a CATCH_FRAME, we build
 * an application of the handler to the exception, and push it on
 * the top of the stack.
 *
 * How exactly do we save all the active computations?  We create an
 * AP_STACK for every UpdateFrame on the stack.  Entering one of these
 * AP_STACKs pushes everything from the corresponding update frame
 * upwards onto the stack.  (Actually, it pushes everything up to the
 * next update frame plus a pointer to the next AP_STACK object.)
 * Entering the next AP_STACK object pushes more onto the stack until we
 * reach the last AP_STACK object - at which point the stack should look
 * exactly as it did when we killed the TSO and we can continue
 * execution by entering the closure on top of the stack.
 *
 * We can also kill a thread entirely - this happens if either (a) the
 * exception passed to raiseAsync is NULL, or (b) there's no
 * CATCH_FRAME on the stack.  In either case, we strip the entire
 * stack and replace the thread with a zombie.
 *
 * ToDo: in THREADED_RTS mode, this function is only safe if either
 * (a) we hold all the Capabilities (eg. in GC, or if there is only
 * one Capability), or (b) we own the Capability that the TSO is
 * currently blocked on or on the run queue of.
 *
 * -------------------------------------------------------------------------- */

StgTSO *
raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
           bool stop_at_atomically, StgUpdateFrame *stop_here)
{
    const StgRetInfoTable *info;
    StgPtr sp, frame;
    StgClosure *updatee;
    uint32_t i;
    StgStack *stack;

    debugTraceCap(DEBUG_sched, cap,
                  "raising exception in thread %ld.", (long)tso->id);

#if defined(PROFILING)
    /*
     * Debugging tool: on raising an exception, show where we are.
     * See also Exception.cmm:stg_raisezh.
     * This wasn't done for asynchronous exceptions originally; see #1450
     */
    if (RtsFlags.ProfFlags.showCCSOnException && exception != NULL)
    {
        fprintCCS_stderr(tso->prof.cccs,exception,tso);
    }
#endif
    // ASSUMES: the thread is not already complete or dead
    // Upper layers should deal with that.
    ASSERT(tso->what_next != ThreadComplete &&
           tso->what_next != ThreadKilled);

    // only if we own this TSO (except that deleteThread() calls this
    // when it holds all the capabilities)
    ASSERT(tso->cap == cap);

    stack = tso->stackobj;

    // mark it dirty; we're about to change its stack.
    dirty_TSO(cap, tso);
    dirty_STACK(cap, stack);

    sp = stack->sp;

    if (stop_here != NULL) {
        updatee = stop_here->updatee;
    } else {
        updatee = NULL;
    }

    // The stack freezing code assumes there's a closure pointer on
    // the top of the stack, so we have to arrange that this is the case...
    //
    if (sp[0] == (W_)&stg_enter_info) {
        sp++;
    } else {
        sp--;
        sp[0] = (W_)&stg_dummy_ret_closure;
    }

    frame = sp + 1;
    while (stop_here == NULL || frame < (StgPtr)stop_here) {

        // 1. Let the top of the stack be the "current closure"
        //
        // 2. Walk up the stack until we find either an UPDATE_FRAME or a
        // CATCH_FRAME.
        //
        // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
        // current closure applied to the chunk of stack up to (but not
        // including) the update frame.  This closure becomes the "current
        // closure".  Go back to step 2.
        //
        // 4. If it's a CATCH_FRAME, then leave the exception handler on
        // top of the stack applied to the exception.
        //
        // 5. If it's a STOP_FRAME, then kill the thread.
        //
        // 6. If it's an UNDERFLOW_FRAME, then continue with the next
        //    stack chunk.
        //
        // NB: if we pass an ATOMICALLY_FRAME then abort the associated
        // transaction.

        info = get_ret_itbl((StgClosure *)frame);

        switch (info->i.type) {

        case UPDATE_FRAME:
        {
            StgAP_STACK * ap;
            uint32_t words;

            // First build an AP_STACK consisting of the stack chunk above the
            // current update frame, with the top word on the stack as the
            // fun field.
            //
            words = frame - sp - 1;
            ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));

            ap->size = words;
            ap->fun  = (StgClosure *)sp[0];

            sp++;
            for(i=0; i < words; ++i) {
                ap->payload[i] = (StgClosure *)*sp++;
            }

            write_barrier(); // XXX: Necessary?
            SET_HDR(ap,&stg_AP_STACK_info,
                    ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
            TICK_ALLOC_UP_THK(WDS(words+1),0);

            //IF_DEBUG(scheduler,
            //       debugBelch("sched: Updating ");
            //       printPtr((P_)((StgUpdateFrame *)frame)->updatee);
            //       debugBelch(" with ");
            //       printObj((StgClosure *)ap);
            //  );

            if (((StgUpdateFrame *)frame)->updatee == updatee) {
                // If this update frame points to the same closure as
                // the update frame further down the stack
                // (stop_here), then don't perform the update.  We
                // want to keep the blackhole in this case, so we can
                // detect and report the loop (#2783).
                ap = (StgAP_STACK*)updatee;
            } else {
                // Perform the update
                // TODO: this may waste some work, if the thunk has
                // already been updated by another thread.
                updateThunk(cap, tso,
                            ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
            }

            sp += sizeofW(StgUpdateFrame) - 1;
            sp[0] = (W_)ap; // push onto stack
            frame = sp + 1;
            continue; // no need to bump frame
        }

        case UNDERFLOW_FRAME:
        {
            StgAP_STACK * ap;
            uint32_t words;

            // First build an AP_STACK consisting of the stack chunk above the
            // current update frame, with the top word on the stack as the
            // fun field.
            //
            words = frame - sp - 1;
            ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));

            ap->size = words;
            ap->fun  = (StgClosure *)sp[0];
            sp++;
            for(i=0; i < words; ++i) {
                ap->payload[i] = (StgClosure *)*sp++;
            }

            SET_HDR(ap,&stg_AP_STACK_NOUPD_info,stack->header.prof.ccs);
            TICK_ALLOC_SE_THK(WDS(words+1),0);

            stack->sp = sp;
            threadStackUnderflow(cap,tso);
            stack = tso->stackobj;
            sp = stack->sp;

            sp--;
            sp[0] = (W_)ap;
            frame = sp + 1;
            continue;
        }

        case STOP_FRAME:
        {
            // We've stripped the entire stack, the thread is now dead.
            tso->what_next = ThreadKilled;
            stack->sp = frame + sizeofW(StgStopFrame);
            goto done;
        }

        case CATCH_FRAME:
            // If we find a CATCH_FRAME, and we've got an exception to raise,
            // then build the THUNK raise(exception), and leave it on
            // top of the CATCH_FRAME ready to enter.
            //
        {
            StgCatchFrame *cf = (StgCatchFrame *)frame;
            StgThunk *raise;

            if (exception == NULL) break;

            // we've got an exception to raise, so let's pass it to the
            // handler in this frame.
            //
            raise = (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
            TICK_ALLOC_SE_THK(WDS(1),0);
            SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
            raise->payload[0] = exception;

            // throw away the stack from Sp up to the CATCH_FRAME.
            //
            sp = frame - 1;

            /* Ensure that async exceptions are blocked now, so we don't get
             * a surprise exception before we get around to executing the
             * handler.
             */
            tso->flags |= TSO_BLOCKEX;
            if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) {
                tso->flags &= ~TSO_INTERRUPTIBLE;
            } else {
                tso->flags |= TSO_INTERRUPTIBLE;
            }

            /* Put the newly-built THUNK on top of the stack, ready to execute
             * when the thread restarts.
             */
            sp[0] = (W_)raise;
            sp[-1] = (W_)&stg_enter_info;
            stack->sp = sp-1;
            RELAXED_STORE(&tso->what_next, ThreadRunGHC);
            goto done;
        }

        case ATOMICALLY_FRAME:
            if (stop_at_atomically) {
                ASSERT(tso->trec->enclosing_trec == NO_TREC);
                stmCondemnTransaction(cap, tso -> trec);
                stack->sp = frame - 2;
                // The ATOMICALLY_FRAME expects to be returned a
                // result from the transaction, which it stores in the
                // stack frame.  Hence we arrange to return a dummy
                // result, so that the GC doesn't get upset (#3578).
                // Perhaps a better way would be to have a different
                // ATOMICALLY_FRAME instance for condemned
                // transactions, but I don't fully understand the
                // interaction with STM invariants.
                stack->sp[1] = (W_)&stg_NO_TREC_closure;
                stack->sp[0] = (W_)&stg_ret_p_info;
                tso->what_next = ThreadRunGHC;
                goto done;
            }
            else
            {
                // Freezing an STM transaction.  Just aborting the
                // transaction would be wrong; this is what we used to
                // do, and it goes wrong if the ATOMICALLY_FRAME ever
                // gets back onto the stack again, which it will do if
                // the transaction is inside unsafePerformIO or
                // unsafeInterleaveIO and hence inside an UPDATE_FRAME.
                //
                // So we want to make it so that if the enclosing
                // computation is resumed, we will re-execute the
                // transaction.  We therefore:
                //
                //   1. abort the current transaction
                //   2. replace the stack up to and including the
                //      atomically frame with a closure representing
                //      a call to "atomically x", where x is the code
                //      of the transaction.
                //   3. continue stripping the stack
                //
                StgTRecHeader *trec = tso->trec;
                StgTRecHeader *outer = trec->enclosing_trec;

                StgThunk *atomically;
                StgAtomicallyFrame *af = (StgAtomicallyFrame*)frame;

                debugTraceCap(DEBUG_stm, cap,
                              "raiseAsync: freezing atomically frame");
                stmAbortTransaction(cap, trec);
                stmFreeAbortedTRec(cap, trec);
                tso->trec = outer;

                atomically = (StgThunk*)allocate(cap,sizeofW(StgThunk)+1);
                TICK_ALLOC_SE_THK(1,0);
                SET_HDR(atomically,&stg_atomically_info,af->header.prof.ccs);
                atomically->payload[0] = af->code;

                // discard stack up to and including the ATOMICALLY_FRAME
                frame += sizeofW(StgAtomicallyFrame);
                sp = frame - 1;

                // replace the ATOMICALLY_FRAME with call to atomically#
                sp[0] = (W_)atomically;
                continue;
            }

        case CATCH_STM_FRAME:
        case CATCH_RETRY_FRAME:
            // CATCH frames within an atomically block: abort the
            // inner transaction and continue.  Eventually we will
            // hit the outer transaction that will get frozen (see
            // above).
            //
            // In this case (unlike ordinary exceptions) we do not care
            // whether the transaction is valid or not because its
            // possible validity cannot have caused the exception
            // and will not be visible after the abort.
        {
            StgTRecHeader *trec = tso -> trec;
            StgTRecHeader *outer = trec -> enclosing_trec;
            debugTraceCap(DEBUG_stm, cap,
                          "found atomically block delivering async exception");
            stmAbortTransaction(cap, trec);
            stmFreeAbortedTRec(cap, trec);
            tso -> trec = outer;
            break;
        }

        default:
            break;
        }

        // move on to the next stack frame
        frame += stack_frame_sizeW((StgClosure *)frame);
    }

done:
    IF_DEBUG(sanity, checkTSO(tso));

    // wake it up
    if (tso->why_blocked != NotBlocked) {
        tso->why_blocked = NotBlocked;
        appendToRunQueue(cap,tso);
    }

    return tso;
}
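
/* A worked example of the stripping loop above (illustrative): suppose the
 * target thread's stack looks like
 *
 *      top -> ... | UPDATE_FRAME u | ... | CATCH_FRAME h | ...
 *
 * raiseAsync() first freezes the computation above u into an AP_STACK and
 * updates u's updatee with it, so the suspended work can be resumed later;
 * it then continues down the stack until it reaches the CATCH_FRAME, where
 * it pushes the thunk (raise exception) and makes the thread runnable, so
 * the handler h runs when the thread is next scheduled.
 */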