1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * Asynchronous exceptions
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11
12 #include "sm/Storage.h"
13 #include "Threads.h"
14 #include "Trace.h"
15 #include "RaiseAsync.h"
16 #include "Schedule.h"
17 #include "Updates.h"
18 #include "STM.h"
19 #include "sm/Sanity.h"
20 #include "Profiling.h"
21 #include "Messages.h"
22 #if defined(mingw32_HOST_OS)
23 #include "win32/IOManager.h"
24 #endif
25
26 static void blockedThrowTo (Capability *cap,
27 StgTSO *target, MessageThrowTo *msg);
28
29 static void removeFromQueues(Capability *cap, StgTSO *tso);
30
31 static void removeFromMVarBlockedQueue (StgTSO *tso);
32
33 static void throwToSendMsg (Capability *cap USED_IF_THREADS,
34 Capability *target_cap USED_IF_THREADS,
35 MessageThrowTo *msg USED_IF_THREADS);
36
37 /* -----------------------------------------------------------------------------
38 throwToSingleThreaded
39
40 This version of throwTo is safe to use if and only if one of the
41 following holds:
42
43 - !THREADED_RTS
44
45 - all the other threads in the system are stopped (eg. during GC).
46
47 - we surely own the target TSO (eg. we just took it from the
48 run queue of the current capability, or we are running it).
49
50 It doesn't cater for blocking the source thread until the exception
51 has been raised.
52 -------------------------------------------------------------------------- */
53
54 static void
throwToSingleThreaded__(Capability * cap,StgTSO * tso,StgClosure * exception,bool stop_at_atomically,StgUpdateFrame * stop_here)55 throwToSingleThreaded__ (Capability *cap, StgTSO *tso, StgClosure *exception,
56 bool stop_at_atomically, StgUpdateFrame *stop_here)
57 {
58 // Thread already dead?
59 if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
60 return;
61 }
62
63 // Remove it from any blocking queues
64 removeFromQueues(cap,tso);
65
66 raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
67 }
68
69 void
throwToSingleThreaded(Capability * cap,StgTSO * tso,StgClosure * exception)70 throwToSingleThreaded (Capability *cap, StgTSO *tso, StgClosure *exception)
71 {
72 throwToSingleThreaded__(cap, tso, exception, false, NULL);
73 }
74
75 void
throwToSingleThreaded_(Capability * cap,StgTSO * tso,StgClosure * exception,bool stop_at_atomically)76 throwToSingleThreaded_ (Capability *cap, StgTSO *tso, StgClosure *exception,
77 bool stop_at_atomically)
78 {
79 throwToSingleThreaded__ (cap, tso, exception, stop_at_atomically, NULL);
80 }
81
82 void // cannot return a different TSO
suspendComputation(Capability * cap,StgTSO * tso,StgUpdateFrame * stop_here)83 suspendComputation (Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
84 {
85 throwToSingleThreaded__ (cap, tso, NULL, false, stop_here);
86 }
87
88 /* -----------------------------------------------------------------------------
89 throwToSelf
90
91 Useful for throwing an async exception in a thread from the
92 runtime. It handles unlocking the throwto message returned by
93 throwTo().
94
95 Note [Throw to self when masked]
96
97 When a StackOverflow occurs when the thread is masked, we want to
98 defer the exception to when the thread becomes unmasked/hits an
99 interruptible point. We already have a mechanism for doing this,
100 the blocked_exceptions list, but the use here is a bit unusual,
101 because an exception is normally only added to this list upon
102 an asynchronous 'throwTo' call (with all of the relevant
103 multithreaded nonsense). Morally, a stack overflow should be an
104 asynchronous exception sent by a thread to itself, and it should
105 have the same semantics. But there are a few key differences:
106
107 - If you actually tried to send an asynchronous exception to
108 yourself using throwTo, the exception would actually immediately
109 be delivered. This is because throwTo itself is considered an
110 interruptible point, so the exception is always deliverable. Thus,
111 ordinarily, we never end up with a message to oneself in the
112 blocked_exceptions queue.
113
114 - In the case of a StackOverflow, we don't actually care about the
115 wakeup semantics; when an exception is delivered, the thread that
116 originally threw the exception should be woken up, since throwTo
117 blocks until the exception is successfully thrown. Fortunately,
118 it is harmless to wakeup a thread that doesn't actually need waking
119 up, e.g. ourselves.
120
121 - No synchronization is necessary, because we own the TSO and the
122 capability. You can observe this by tracing through the execution
123 of throwTo. We skip synchronizing the message and inter-capability
124 communication.
125
126 We think this doesn't break any invariants, but do be careful!
127 -------------------------------------------------------------------------- */
128
129 void
throwToSelf(Capability * cap,StgTSO * tso,StgClosure * exception)130 throwToSelf (Capability *cap, StgTSO *tso, StgClosure *exception)
131 {
132 MessageThrowTo *m;
133
134 m = throwTo(cap, tso, tso, exception);
135
136 if (m != NULL) {
137 // throwTo leaves it locked
138 unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
139 }
140 }
141
142 /* -----------------------------------------------------------------------------
143 throwTo
144
145 This function may be used to throw an exception from one thread to
146 another, during the course of normal execution. This is a tricky
147 task: the target thread might be running on another CPU, or it
148 may be blocked and could be woken up at any point by another CPU.
149 We have some delicate synchronisation to do.
150
151 The underlying scheme when multiple Capabilities are in use is
152 message passing: when the target of a throwTo is on another
153 Capability, we send a message (a MessageThrowTo closure) to that
154 Capability.
155
156 If the throwTo needs to block because the target TSO is masking
157 exceptions (the TSO_BLOCKEX flag), then the message is placed on
158 the blocked_exceptions queue attached to the target TSO. When the
159 target TSO enters the unmasked state again, it must check the
160 queue. The blocked_exceptions queue is not locked; only the
161 Capability owning the TSO may modify it.
162
163 To make things simpler for throwTo, we always create the message
164 first before deciding what to do. The message may get sent, or it
165 may get attached to a TSO's blocked_exceptions queue, or the
166 exception may get thrown immediately and the message dropped,
167 depending on the current state of the target.
168
169 Currently we send a message if the target belongs to another
170 Capability, and it is
171
172 - NotBlocked, BlockedOnMsgThrowTo,
173 BlockedOnCCall_Interruptible
174
175 - or it is masking exceptions (TSO_BLOCKEX)
176
177 Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
178 BlockedOnBlackHole then we acquire ownership of the TSO by locking
179 its parent container (e.g. the MVar) and then raise the exception.
180 We might change these cases to be more message-passing-like in the
181 future.
182
183 Returns:
184
185 NULL exception was raised, ok to continue
186
187 MessageThrowTo * exception was not raised; the source TSO
188 should now put itself in the state
189 BlockedOnMsgThrowTo, and when it is ready
190 it should unlock the mssage using
191 unlockClosure(msg, &stg_MSG_THROWTO_info);
192 If it decides not to raise the exception after
193 all, it can revoke it safely with
194 unlockClosure(msg, &stg_MSG_NULL_info);
195
196 -------------------------------------------------------------------------- */
197
198 MessageThrowTo *
throwTo(Capability * cap,StgTSO * source,StgTSO * target,StgClosure * exception)199 throwTo (Capability *cap, // the Capability we hold
200 StgTSO *source, // the TSO sending the exception (or NULL)
201 StgTSO *target, // the TSO receiving the exception
202 StgClosure *exception) // the exception closure
203 {
204 MessageThrowTo *msg;
205
206 msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
207 // the message starts locked; see below
208 SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
209 msg->source = source;
210 msg->target = target;
211 msg->exception = exception;
212
213 switch (throwToMsg(cap, msg))
214 {
215 case THROWTO_SUCCESS:
216 // unlock the message now, otherwise we leave a WHITEHOLE in
217 // the heap (#6103)
218 SET_HDR(msg, &stg_MSG_THROWTO_info, CCS_SYSTEM);
219 return NULL;
220
221 case THROWTO_BLOCKED:
222 default:
223 // the caller will unlock the message when it is ready. We
224 // cannot unlock it yet, because the calling thread will need
225 // to tidy up its state first.
226 return msg;
227 }
228 }
229
230
231 uint32_t
throwToMsg(Capability * cap,MessageThrowTo * msg)232 throwToMsg (Capability *cap, MessageThrowTo *msg)
233 {
234 StgWord status;
235 StgTSO *target = ACQUIRE_LOAD(&msg->target);
236 Capability *target_cap;
237
238 goto check_target;
239
240 retry:
241 write_barrier();
242 debugTrace(DEBUG_sched, "throwTo: retrying...");
243
244 check_target:
245 ASSERT(target != END_TSO_QUEUE);
246
247 // Thread already dead?
248 StgWord16 what_next = SEQ_CST_LOAD(&target->what_next);
249 if (what_next == ThreadComplete
250 || what_next == ThreadKilled) {
251 return THROWTO_SUCCESS;
252 }
253
254 debugTraceCap(DEBUG_sched, cap,
255 "throwTo: from thread %lu to thread %lu",
256 (unsigned long)msg->source->id,
257 (unsigned long)msg->target->id);
258
259 #if defined(DEBUG)
260 traceThreadStatus(DEBUG_sched, target);
261 #endif
262
263 target_cap = target->cap;
264 if (target->cap != cap) {
265 throwToSendMsg(cap, target_cap, msg);
266 return THROWTO_BLOCKED;
267 }
268
269 status = target->why_blocked;
270
271 switch (status) {
272 case NotBlocked:
273 {
274 if ((target->flags & TSO_BLOCKEX) == 0) {
275 // It's on our run queue and not blocking exceptions
276 raiseAsync(cap, target, msg->exception, false, NULL);
277 return THROWTO_SUCCESS;
278 } else {
279 blockedThrowTo(cap,target,msg);
280 return THROWTO_BLOCKED;
281 }
282 }
283
284 case BlockedOnMsgThrowTo:
285 {
286 const StgInfoTable *i;
287 MessageThrowTo *m;
288
289 m = target->block_info.throwto;
290
291 // target is local to this cap, but has sent a throwto
292 // message to another cap.
293 //
294 // The source message is locked. We need to revoke the
295 // target's message so that we can raise the exception, so
296 // we attempt to lock it.
297
298 // There's a possibility of a deadlock if two threads are both
299 // trying to throwTo each other (or more generally, a cycle of
300 // threads). To break the symmetry we compare the addresses
301 // of the MessageThrowTo objects, and the one for which m <
302 // msg gets to spin, while the other can only try to lock
303 // once, but must then back off and unlock both before trying
304 // again.
305 if (m < msg) {
306 i = lockClosure((StgClosure *)m);
307 } else {
308 i = tryLockClosure((StgClosure *)m);
309 if (i == NULL) {
310 // debugBelch("collision\n");
311 throwToSendMsg(cap, target->cap, msg);
312 return THROWTO_BLOCKED;
313 }
314 }
315
316 if (i == &stg_MSG_NULL_info) {
317 // we know there's a MSG_TRY_WAKEUP on the way, so we
318 // might as well just do it now. The message will
319 // be a no-op when it arrives.
320 unlockClosure((StgClosure*)m, i);
321 tryWakeupThread(cap, target);
322 goto retry;
323 }
324
325 if (i != &stg_MSG_THROWTO_info) {
326 // if it's a MSG_NULL, this TSO has been woken up by another Cap
327 unlockClosure((StgClosure*)m, i);
328 goto retry;
329 }
330
331 if ((target->flags & TSO_BLOCKEX) &&
332 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
333 unlockClosure((StgClosure*)m, i);
334 blockedThrowTo(cap,target,msg);
335 return THROWTO_BLOCKED;
336 }
337
338 // nobody else can wake up this TSO after we claim the message
339 doneWithMsgThrowTo(cap, m);
340
341 raiseAsync(cap, target, msg->exception, false, NULL);
342 return THROWTO_SUCCESS;
343 }
344
345 case BlockedOnMVar:
346 case BlockedOnMVarRead:
347 {
348 /*
349 To establish ownership of this TSO, we need to acquire a
350 lock on the MVar that it is blocked on.
351 */
352 StgMVar *mvar;
353 StgInfoTable *info USED_IF_THREADS;
354
355 mvar = (StgMVar *)target->block_info.closure;
356
357 // ASSUMPTION: tso->block_info must always point to a
358 // closure. In the threaded RTS it does.
359 switch (get_itbl((StgClosure *)mvar)->type) {
360 case MVAR_CLEAN:
361 case MVAR_DIRTY:
362 break;
363 default:
364 goto retry;
365 }
366
367 info = lockClosure((StgClosure *)mvar);
368
369 // we have the MVar, let's check whether the thread
370 // is still blocked on the same MVar.
371 if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead)
372 || (StgMVar *)target->block_info.closure != mvar) {
373 unlockClosure((StgClosure *)mvar, info);
374 goto retry;
375 }
376
377 if (target->_link == END_TSO_QUEUE) {
378 // the MVar operation has already completed. There is a
379 // MSG_TRY_WAKEUP on the way, but we can just wake up the
380 // thread now anyway and ignore the message when it
381 // arrives.
382 unlockClosure((StgClosure *)mvar, info);
383 tryWakeupThread(cap, target);
384 goto retry;
385 }
386
387 if ((target->flags & TSO_BLOCKEX) &&
388 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
389 blockedThrowTo(cap,target,msg);
390 unlockClosure((StgClosure *)mvar, info);
391 return THROWTO_BLOCKED;
392 } else {
393 // revoke the MVar operation
394 removeFromMVarBlockedQueue(target);
395 raiseAsync(cap, target, msg->exception, false, NULL);
396 unlockClosure((StgClosure *)mvar, info);
397 return THROWTO_SUCCESS;
398 }
399 }
400
401 case BlockedOnBlackHole:
402 {
403 if (target->flags & TSO_BLOCKEX) {
404 // BlockedOnBlackHole is not interruptible.
405 blockedThrowTo(cap,target,msg);
406 return THROWTO_BLOCKED;
407 } else {
408 // Revoke the message by replacing it with IND. We're not
409 // locking anything here, so we might still get a TRY_WAKEUP
410 // message from the owner of the blackhole some time in the
411 // future, but that doesn't matter.
412 ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
413 OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
414 raiseAsync(cap, target, msg->exception, false, NULL);
415 return THROWTO_SUCCESS;
416 }
417 }
418
419 case BlockedOnSTM:
420 if ((target->flags & TSO_BLOCKEX) &&
421 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
422 blockedThrowTo(cap,target,msg);
423 return THROWTO_BLOCKED;
424 } else {
425 raiseAsync(cap, target, msg->exception, false, NULL);
426 return THROWTO_SUCCESS;
427 }
428
429 case BlockedOnCCall_Interruptible:
430 #if defined(THREADED_RTS)
431 {
432 Task *task = NULL;
433 // walk suspended_ccalls to find the correct worker thread
434 InCall *incall;
435 for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
436 if (incall->suspended_tso == target) {
437 task = incall->task;
438 break;
439 }
440 }
441 if (task != NULL) {
442 blockedThrowTo(cap, target, msg);
443 if (!((target->flags & TSO_BLOCKEX) &&
444 ((target->flags & TSO_INTERRUPTIBLE) == 0))) {
445 interruptWorkerTask(task);
446 }
447 return THROWTO_BLOCKED;
448 } else {
449 debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill");
450 }
451 // fall to next
452 }
453 FALLTHROUGH;
454 #endif
455 case BlockedOnCCall:
456 blockedThrowTo(cap,target,msg);
457 return THROWTO_BLOCKED;
458
459 #if !defined(THREADEDED_RTS)
460 case BlockedOnRead:
461 case BlockedOnWrite:
462 case BlockedOnDelay:
463 #if defined(mingw32_HOST_OS)
464 case BlockedOnDoProc:
465 #endif
466 if ((target->flags & TSO_BLOCKEX) &&
467 ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
468 blockedThrowTo(cap,target,msg);
469 return THROWTO_BLOCKED;
470 } else {
471 removeFromQueues(cap,target);
472 raiseAsync(cap, target, msg->exception, false, NULL);
473 return THROWTO_SUCCESS;
474 }
475 #endif
476
477 case ThreadMigrating:
478 // if it is ThreadMigrating and tso->cap is ours, then it
479 // *must* be migrating *to* this capability. If it were
480 // migrating away from the capability, then tso->cap would
481 // point to the destination.
482 //
483 // There is a MSG_WAKEUP in the message queue for this thread,
484 // but we can just do it preemptively:
485 tryWakeupThread(cap, target);
486 // and now retry, the thread should be runnable.
487 goto retry;
488
489 default:
490 barf("throwTo: unrecognised why_blocked (%d)", target->why_blocked);
491 }
492 barf("throwTo");
493 }
494
495 static void
throwToSendMsg(Capability * cap STG_UNUSED,Capability * target_cap USED_IF_THREADS,MessageThrowTo * msg USED_IF_THREADS)496 throwToSendMsg (Capability *cap STG_UNUSED,
497 Capability *target_cap USED_IF_THREADS,
498 MessageThrowTo *msg USED_IF_THREADS)
499
500 {
501 #if defined(THREADED_RTS)
502 debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
503
504 sendMessage(cap, target_cap, (Message*)msg);
505 #endif
506 }
507
508 // Block a throwTo message on the target TSO's blocked_exceptions
509 // queue. The current Capability must own the target TSO in order to
510 // modify the blocked_exceptions queue.
511 void
blockedThrowTo(Capability * cap,StgTSO * target,MessageThrowTo * msg)512 blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
513 {
514 debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
515 (unsigned long)target->id);
516
517 ASSERT(target->cap == cap);
518
519 dirty_TSO(cap,target); // we will modify the blocked_exceptions queue
520 msg->link = target->blocked_exceptions;
521 target->blocked_exceptions = msg;
522 }
523
524 /* -----------------------------------------------------------------------------
525 Waking up threads blocked in throwTo
526
527 There are two ways to do this: maybePerformBlockedException() will
528 perform the throwTo() for the thread at the head of the queue
529 immediately, and leave the other threads on the queue.
530 maybePerformBlockedException() also checks the TSO_BLOCKEX flag
531 before raising an exception.
532
533 awakenBlockedExceptionQueue() will wake up all the threads in the
534 queue, but not perform any throwTo() immediately. This might be
535 more appropriate when the target thread is the one actually running
536 (see Exception.cmm).
537
538 Returns: non-zero if an exception was raised, zero otherwise.
539 -------------------------------------------------------------------------- */
540
541 int
maybePerformBlockedException(Capability * cap,StgTSO * tso)542 maybePerformBlockedException (Capability *cap, StgTSO *tso)
543 {
544 MessageThrowTo *msg;
545 const StgInfoTable *i;
546 StgTSO *source;
547
548 if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
549 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
550 awakenBlockedExceptionQueue(cap,tso);
551 return 1;
552 } else {
553 return 0;
554 }
555 }
556
557 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
558 (tso->flags & TSO_BLOCKEX) != 0) {
559 debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
560 }
561
562 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
563 && ((tso->flags & TSO_BLOCKEX) == 0
564 || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
565
566 // We unblock just the first thread on the queue, and perform
567 // its throw immediately.
568 loop:
569 msg = tso->blocked_exceptions;
570 if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
571 i = lockClosure((StgClosure*)msg);
572 tso->blocked_exceptions = (MessageThrowTo*)msg->link;
573 if (i == &stg_MSG_NULL_info) {
574 unlockClosure((StgClosure*)msg,i);
575 goto loop;
576 }
577
578 throwToSingleThreaded(cap, msg->target, msg->exception);
579 source = msg->source;
580 doneWithMsgThrowTo(cap, msg);
581 tryWakeupThread(cap, source);
582 return 1;
583 }
584 return 0;
585 }
586
587 // awakenBlockedExceptionQueue(): Just wake up the whole queue of
588 // blocked exceptions.
589
590 void
awakenBlockedExceptionQueue(Capability * cap,StgTSO * tso)591 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
592 {
593 MessageThrowTo *msg;
594 const StgInfoTable *i;
595 StgTSO *source;
596
597 for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
598 msg = (MessageThrowTo*)msg->link) {
599 i = lockClosure((StgClosure *)msg);
600 if (i != &stg_MSG_NULL_info) {
601 source = msg->source;
602 doneWithMsgThrowTo(cap, msg);
603 tryWakeupThread(cap, source);
604 } else {
605 unlockClosure((StgClosure *)msg,i);
606 }
607 }
608 tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
609 }
610
611 /* -----------------------------------------------------------------------------
612 Remove a thread from blocking queues.
613
614 This is for use when we raise an exception in another thread, which
615 may be blocked.
616
617 Precondition: we have exclusive access to the TSO, via the same set
618 of conditions as throwToSingleThreaded() (c.f.).
619 -------------------------------------------------------------------------- */
620
621 static void
removeFromMVarBlockedQueue(StgTSO * tso)622 removeFromMVarBlockedQueue (StgTSO *tso)
623 {
624 StgMVar *mvar = (StgMVar*)tso->block_info.closure;
625 StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
626
627 if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
628 // already removed from this MVar
629 return;
630 }
631
632 // Assume the MVar is locked. (not assertable; sometimes it isn't
633 // actually WHITEHOLE'd).
634
635 // We want to remove the MVAR_TSO_QUEUE object from the queue. It
636 // isn't doubly-linked so we can't actually remove it; instead we
637 // just overwrite it with an IND if possible and let the GC short
638 // it out. However, we have to be careful to maintain the deque
639 // structure:
640
641 if (mvar->head == q) {
642 mvar->head = q->link;
643 OVERWRITE_INFO(q, &stg_IND_info);
644 if (mvar->tail == q) {
645 mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
646 }
647 }
648 else if (mvar->tail == q) {
649 // we can't replace it with an IND in this case, because then
650 // we lose the tail pointer when the GC shorts out the IND.
651 // So we use MSG_NULL as a kind of non-dupable indirection;
652 // these are ignored by takeMVar/putMVar.
653 OVERWRITE_INFO(q, &stg_MSG_NULL_info);
654 }
655 else {
656 OVERWRITE_INFO(q, &stg_IND_info);
657 }
658
659 // revoke the MVar operation
660 tso->_link = END_TSO_QUEUE;
661 }
662
663 static void
removeFromQueues(Capability * cap,StgTSO * tso)664 removeFromQueues(Capability *cap, StgTSO *tso)
665 {
666 switch (tso->why_blocked) {
667
668 case NotBlocked:
669 case ThreadMigrating:
670 return;
671
672 case BlockedOnSTM:
673 // Be careful: nothing to do here! We tell the scheduler that the
674 // thread is runnable and we leave it to the stack-walking code to
675 // abort the transaction while unwinding the stack. We should
676 // perhaps have a debugging test to make sure that this really
677 // happens and that the 'zombie' transaction does not get
678 // committed.
679 goto done;
680
681 case BlockedOnMVar:
682 case BlockedOnMVarRead:
683 removeFromMVarBlockedQueue(tso);
684 goto done;
685
686 case BlockedOnBlackHole:
687 // nothing to do
688 goto done;
689
690 case BlockedOnMsgThrowTo:
691 {
692 MessageThrowTo *m = tso->block_info.throwto;
693 // The message is locked by us, unless we got here via
694 // deleteAllThreads(), in which case we own all the
695 // capabilities.
696 // ASSERT(m->header.info == &stg_WHITEHOLE_info);
697
698 // unlock and revoke it at the same time
699 doneWithMsgThrowTo(cap, m);
700 break;
701 }
702
703 #if !defined(THREADED_RTS)
704 case BlockedOnRead:
705 case BlockedOnWrite:
706 #if defined(mingw32_HOST_OS)
707 case BlockedOnDoProc:
708 #endif
709 removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso);
710 #if defined(mingw32_HOST_OS)
711 /* (Cooperatively) signal that the worker thread should abort
712 * the request.
713 */
714 abandonWorkRequest(tso->block_info.async_result->reqID);
715 #endif
716 goto done;
717
718 case BlockedOnDelay:
719 removeThreadFromQueue(cap, &sleeping_queue, tso);
720 goto done;
721 #endif
722
723 default:
724 barf("removeFromQueues: %d", tso->why_blocked);
725 }
726
727 done:
728 tso->why_blocked = NotBlocked;
729 appendToRunQueue(cap, tso);
730 }
731
732 /* -----------------------------------------------------------------------------
733 * raiseAsync()
734 *
735 * The following function implements the magic for raising an
736 * asynchronous exception in an existing thread.
737 *
738 * We first remove the thread from any queue on which it might be
739 * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and
740 * TSO blocked_exception queues.
741 *
742 * We strip the stack down to the innermost CATCH_FRAME, building
743 * thunks in the heap for all the active computations, so they can
744 * be restarted if necessary. When we reach a CATCH_FRAME, we build
745 * an application of the handler to the exception, and push it on
746 * the top of the stack.
747 *
748 * How exactly do we save all the active computations? We create an
749 * AP_STACK for every UpdateFrame on the stack. Entering one of these
750 * AP_STACKs pushes everything from the corresponding update frame
751 * upwards onto the stack. (Actually, it pushes everything up to the
752 * next update frame plus a pointer to the next AP_STACK object.
753 * Entering the next AP_STACK object pushes more onto the stack until we
754 * reach the last AP_STACK object - at which point the stack should look
755 * exactly as it did when we killed the TSO and we can continue
756 * execution by entering the closure on top of the stack.
757 *
758 * We can also kill a thread entirely - this happens if either (a) the
759 * exception passed to raiseAsync is NULL, or (b) there's no
760 * CATCH_FRAME on the stack. In either case, we strip the entire
761 * stack and replace the thread with a zombie.
762 *
763 * ToDo: in THREADED_RTS mode, this function is only safe if either
764 * (a) we hold all the Capabilities (eg. in GC, or if there is only
765 * one Capability), or (b) we own the Capability that the TSO is
766 * currently blocked on or on the run queue of.
767 *
768 * -------------------------------------------------------------------------- */
769
770 StgTSO *
raiseAsync(Capability * cap,StgTSO * tso,StgClosure * exception,bool stop_at_atomically,StgUpdateFrame * stop_here)771 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
772 bool stop_at_atomically, StgUpdateFrame *stop_here)
773 {
774 const StgRetInfoTable *info;
775 StgPtr sp, frame;
776 StgClosure *updatee;
777 uint32_t i;
778 StgStack *stack;
779
780 debugTraceCap(DEBUG_sched, cap,
781 "raising exception in thread %ld.", (long)tso->id);
782
783 #if defined(PROFILING)
784 /*
785 * Debugging tool: on raising an exception, show where we are.
786 * See also Exception.cmm:stg_raisezh.
787 * This wasn't done for asynchronous exceptions originally; see #1450
788 */
789 if (RtsFlags.ProfFlags.showCCSOnException && exception != NULL)
790 {
791 fprintCCS_stderr(tso->prof.cccs,exception,tso);
792 }
793 #endif
794 // ASSUMES: the thread is not already complete or dead
795 // Upper layers should deal with that.
796 ASSERT(tso->what_next != ThreadComplete &&
797 tso->what_next != ThreadKilled);
798
799 // only if we own this TSO (except that deleteThread() calls this
800 ASSERT(tso->cap == cap);
801
802 stack = tso->stackobj;
803
804 // mark it dirty; we're about to change its stack.
805 dirty_TSO(cap, tso);
806 dirty_STACK(cap, stack);
807
808 sp = stack->sp;
809
810 if (stop_here != NULL) {
811 updatee = stop_here->updatee;
812 } else {
813 updatee = NULL;
814 }
815
816 // The stack freezing code assumes there's a closure pointer on
817 // the top of the stack, so we have to arrange that this is the case...
818 //
819 if (sp[0] == (W_)&stg_enter_info) {
820 sp++;
821 } else {
822 sp--;
823 sp[0] = (W_)&stg_dummy_ret_closure;
824 }
825
826 frame = sp + 1;
827 while (stop_here == NULL || frame < (StgPtr)stop_here) {
828
829 // 1. Let the top of the stack be the "current closure"
830 //
831 // 2. Walk up the stack until we find either an UPDATE_FRAME or a
832 // CATCH_FRAME.
833 //
834 // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
835 // current closure applied to the chunk of stack up to (but not
836 // including) the update frame. This closure becomes the "current
837 // closure". Go back to step 2.
838 //
839 // 4. If it's a CATCH_FRAME, then leave the exception handler on
840 // top of the stack applied to the exception.
841 //
842 // 5. If it's a STOP_FRAME, then kill the thread.
843 //
844 // 6. If it's an UNDERFLOW_FRAME, then continue with the next
845 // stack chunk.
846 //
847 // NB: if we pass an ATOMICALLY_FRAME then abort the associated
848 // transaction
849
850 info = get_ret_itbl((StgClosure *)frame);
851
852 switch (info->i.type) {
853
854 case UPDATE_FRAME:
855 {
856 StgAP_STACK * ap;
857 uint32_t words;
858
859 // First build an AP_STACK consisting of the stack chunk above the
860 // current update frame, with the top word on the stack as the
861 // fun field.
862 //
863 words = frame - sp - 1;
864 ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
865
866 ap->size = words;
867 ap->fun = (StgClosure *)sp[0];
868
869 sp++;
870 for(i=0; i < words; ++i) {
871 ap->payload[i] = (StgClosure *)*sp++;
872 }
873
874 write_barrier(); // XXX: Necessary?
875 SET_HDR(ap,&stg_AP_STACK_info,
876 ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
877 TICK_ALLOC_UP_THK(WDS(words+1),0);
878
879 //IF_DEBUG(scheduler,
880 // debugBelch("sched: Updating ");
881 // printPtr((P_)((StgUpdateFrame *)frame)->updatee);
882 // debugBelch(" with ");
883 // printObj((StgClosure *)ap);
884 // );
885
886 if (((StgUpdateFrame *)frame)->updatee == updatee) {
887 // If this update frame points to the same closure as
888 // the update frame further down the stack
889 // (stop_here), then don't perform the update. We
890 // want to keep the blackhole in this case, so we can
891 // detect and report the loop (#2783).
892 ap = (StgAP_STACK*)updatee;
893 } else {
894 // Perform the update
895 // TODO: this may waste some work, if the thunk has
896 // already been updated by another thread.
897 updateThunk(cap, tso,
898 ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
899 }
900
901 sp += sizeofW(StgUpdateFrame) - 1;
902 sp[0] = (W_)ap; // push onto stack
903 frame = sp + 1;
904 continue; //no need to bump frame
905 }
906
907 case UNDERFLOW_FRAME:
908 {
909 StgAP_STACK * ap;
910 uint32_t words;
911
912 // First build an AP_STACK consisting of the stack chunk above the
913 // current update frame, with the top word on the stack as the
914 // fun field.
915 //
916 words = frame - sp - 1;
917 ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
918
919 ap->size = words;
920 ap->fun = (StgClosure *)sp[0];
921 sp++;
922 for(i=0; i < words; ++i) {
923 ap->payload[i] = (StgClosure *)*sp++;
924 }
925
926 SET_HDR(ap,&stg_AP_STACK_NOUPD_info,stack->header.prof.ccs);
927 TICK_ALLOC_SE_THK(WDS(words+1),0);
928
929 stack->sp = sp;
930 threadStackUnderflow(cap,tso);
931 stack = tso->stackobj;
932 sp = stack->sp;
933
934 sp--;
935 sp[0] = (W_)ap;
936 frame = sp + 1;
937 continue;
938 }
939
940 case STOP_FRAME:
941 {
942 // We've stripped the entire stack, the thread is now dead.
943 tso->what_next = ThreadKilled;
944 stack->sp = frame + sizeofW(StgStopFrame);
945 goto done;
946 }
947
948 case CATCH_FRAME:
949 // If we find a CATCH_FRAME, and we've got an exception to raise,
950 // then build the THUNK raise(exception), and leave it on
951 // top of the CATCH_FRAME ready to enter.
952 //
953 {
954 StgCatchFrame *cf = (StgCatchFrame *)frame;
955 StgThunk *raise;
956
957 if (exception == NULL) break;
958
959 // we've got an exception to raise, so let's pass it to the
960 // handler in this frame.
961 //
962 raise = (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
963 TICK_ALLOC_SE_THK(WDS(1),0);
964 SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
965 raise->payload[0] = exception;
966
967 // throw away the stack from Sp up to the CATCH_FRAME.
968 //
969 sp = frame - 1;
970
971 /* Ensure that async exceptions are blocked now, so we don't get
972 * a surprise exception before we get around to executing the
973 * handler.
974 */
975 tso->flags |= TSO_BLOCKEX;
976 if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) {
977 tso->flags &= ~TSO_INTERRUPTIBLE;
978 } else {
979 tso->flags |= TSO_INTERRUPTIBLE;
980 }
981
982 /* Put the newly-built THUNK on top of the stack, ready to execute
983 * when the thread restarts.
984 */
985 sp[0] = (W_)raise;
986 sp[-1] = (W_)&stg_enter_info;
987 stack->sp = sp-1;
988 RELAXED_STORE(&tso->what_next, ThreadRunGHC);
989 goto done;
990 }
991
992 case ATOMICALLY_FRAME:
993 if (stop_at_atomically) {
994 ASSERT(tso->trec->enclosing_trec == NO_TREC);
995 stmCondemnTransaction(cap, tso -> trec);
996 stack->sp = frame - 2;
997 // The ATOMICALLY_FRAME expects to be returned a
998 // result from the transaction, which it stores in the
999 // stack frame. Hence we arrange to return a dummy
1000 // result, so that the GC doesn't get upset (#3578).
1001 // Perhaps a better way would be to have a different
1002 // ATOMICALLY_FRAME instance for condemned
1003 // transactions, but I don't fully understand the
1004 // interaction with STM invariants.
1005 stack->sp[1] = (W_)&stg_NO_TREC_closure;
1006 stack->sp[0] = (W_)&stg_ret_p_info;
1007 tso->what_next = ThreadRunGHC;
1008 goto done;
1009 }
1010 else
1011 {
1012 // Freezing an STM transaction. Just aborting the
1013 // transaction would be wrong; this is what we used to
1014 // do, and it goes wrong if the ATOMICALLY_FRAME ever
1015 // gets back onto the stack again, which it will do if
1016 // the transaction is inside unsafePerformIO or
1017 // unsafeInterleaveIO and hence inside an UPDATE_FRAME.
1018 //
1019 // So we want to make it so that if the enclosing
1020 // computation is resumed, we will re-execute the
1021 // transaction. We therefore:
1022 //
1023 // 1. abort the current transaction
1024 // 3. replace the stack up to and including the
1025 // atomically frame with a closure representing
1026 // a call to "atomically x", where x is the code
1027 // of the transaction.
1028 // 4. continue stripping the stack
1029 //
1030 StgTRecHeader *trec = tso->trec;
1031 StgTRecHeader *outer = trec->enclosing_trec;
1032
1033 StgThunk *atomically;
1034 StgAtomicallyFrame *af = (StgAtomicallyFrame*)frame;
1035
1036 debugTraceCap(DEBUG_stm, cap,
1037 "raiseAsync: freezing atomically frame")
1038 stmAbortTransaction(cap, trec);
1039 stmFreeAbortedTRec(cap, trec);
1040 tso->trec = outer;
1041
1042 atomically = (StgThunk*)allocate(cap,sizeofW(StgThunk)+1);
1043 TICK_ALLOC_SE_THK(1,0);
1044 SET_HDR(atomically,&stg_atomically_info,af->header.prof.ccs);
1045 atomically->payload[0] = af->code;
1046
1047 // discard stack up to and including the ATOMICALLY_FRAME
1048 frame += sizeofW(StgAtomicallyFrame);
1049 sp = frame - 1;
1050
1051 // replace the ATOMICALLY_FRAME with call to atomically#
1052 sp[0] = (W_)atomically;
1053 continue;
1054 }
1055
1056 case CATCH_STM_FRAME:
1057 case CATCH_RETRY_FRAME:
1058 // CATCH frames within an atomically block: abort the
1059 // inner transaction and continue. Eventually we will
1060 // hit the outer transaction that will get frozen (see
1061 // above).
1062 //
1063 // In this case (unlike ordinary exceptions) we do not care
1064 // whether the transaction is valid or not because its
1065 // possible validity cannot have caused the exception
1066 // and will not be visible after the abort.
1067 {
1068 StgTRecHeader *trec = tso -> trec;
1069 StgTRecHeader *outer = trec -> enclosing_trec;
1070 debugTraceCap(DEBUG_stm, cap,
1071 "found atomically block delivering async exception");
1072 stmAbortTransaction(cap, trec);
1073 stmFreeAbortedTRec(cap, trec);
1074 tso -> trec = outer;
1075 break;
1076 };
1077
1078 default:
1079 break;
1080 }
1081
1082 // move on to the next stack frame
1083 frame += stack_frame_sizeW((StgClosure *)frame);
1084 }
1085
1086 done:
1087 IF_DEBUG(sanity, checkTSO(tso));
1088
1089 // wake it up
1090 if (tso->why_blocked != NotBlocked) {
1091 tso->why_blocked = NotBlocked;
1092 appendToRunQueue(cap,tso);
1093 }
1094
1095 return tso;
1096 }
1097