1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8 
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "StablePtr.h"
45 #include "StableName.h"
46 #include "TopHandler.h"
47 #include "sm/NonMoving.h"
48 #include "sm/NonMovingMark.h"
49 
50 #if defined(HAVE_SYS_TYPES_H)
51 #include <sys/types.h>
52 #endif
53 #if defined(HAVE_UNISTD_H)
54 #include <unistd.h>
55 #endif
56 
57 #include <string.h>
58 #include <stdlib.h>
59 #include <stdarg.h>
60 
61 #if defined(HAVE_ERRNO_H)
62 #include <errno.h>
63 #endif
64 
65 #if defined(TRACING)
66 #include "eventlog/EventLog.h"
67 #endif
68 /* -----------------------------------------------------------------------------
69  * Global variables
70  * -------------------------------------------------------------------------- */
71 
72 #if !defined(THREADED_RTS)
73 // Blocked/sleeping threads
74 StgTSO *blocked_queue_hd = NULL;
75 StgTSO *blocked_queue_tl = NULL;
76 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
77 #endif
78 
79 // Bytes allocated since the last time a HeapOverflow exception was thrown by
80 // the RTS
81 uint64_t allocated_bytes_at_heapoverflow = 0;
82 
83 /* Set to true when the latest garbage collection failed to reclaim enough
84  * space, and the runtime should proceed to shut itself down in an orderly
85  * fashion (emitting profiling info etc.), OR throw an exception to the main
86  * thread, if it is still alive.
87  */
88 bool heap_overflow = false;
89 
90 /* flag that tracks whether we have done any execution in this time slice.
91  * LOCK: currently none, perhaps we should lock (but needs to be
92  * updated in the fast path of the scheduler).
93  *
94  * NB. must be StgWord, we do xchg() on it.
95  */
96 volatile StgWord recent_activity = ACTIVITY_YES;
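// Values used below: ACTIVITY_YES (something has run recently),
// ACTIVITY_INACTIVE (a whole timeslice passed with no activity; checked by
// scheduleDetectDeadlock()), and ACTIVITY_DONE_GC (the idle GC has run and
// the timer signal was switched off; see the ACTIVITY_DONE_GC case in
// schedule()).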
97 
98 /* if this flag is set as well, give up execution
99  * LOCK: none (changes monotonically)
100  */
101 volatile StgWord sched_state = SCHED_RUNNING;
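// Progresses monotonically: SCHED_RUNNING -> SCHED_INTERRUPTING ->
// SCHED_SHUTTING_DOWN; see Note [shutdown] in schedule() below.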
102 
103 /*
104  * This mutex protects most of the global scheduler data in
105  * the THREADED_RTS runtime.
106  */
107 #if defined(THREADED_RTS)
108 Mutex sched_mutex;
109 #endif
110 
111 #if !defined(mingw32_HOST_OS)
112 #define FORKPROCESS_PRIMOP_SUPPORTED
113 #endif
114 
115 /*
116  * sync_finished_cond allows threads which do not own any capability (e.g. the
117  * concurrent mark thread) to participate in the sync protocol. In particular,
118  * if such a thread requests a sync while a sync is already in progress it will
119  * block on sync_finished_cond, which will be signalled when the sync is
120  * finished (by releaseAllCapabilities).
121  */
122 #if defined(THREADED_RTS)
123 static Condition sync_finished_cond;
124 static Mutex sync_finished_mutex;
125 #endif
126 
127 
128 /* -----------------------------------------------------------------------------
129  * static function prototypes
130  * -------------------------------------------------------------------------- */
131 
132 static Capability *schedule (Capability *initialCapability, Task *task);
133 
134 //
135 // These functions all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void scheduleFindWork (Capability **pcap);
140 #if defined(THREADED_RTS)
141 static void scheduleYield (Capability **pcap, Task *task);
142 #endif
143 #if defined(THREADED_RTS)
144 static bool requestSync (Capability **pcap, Task *task,
145                          PendingSync *sync_type, SyncType *prev_sync_type);
146 static void acquireAllCapabilities(Capability *cap, Task *task);
147 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
148                               uint32_t to USED_IF_THREADS);
149 #endif
150 static void scheduleStartSignalHandlers (Capability *cap);
151 static void scheduleCheckBlockedThreads (Capability *cap);
152 static void scheduleProcessInbox(Capability **cap);
153 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
154 static void schedulePushWork(Capability *cap, Task *task);
155 #if defined(THREADED_RTS)
156 static void scheduleActivateSpark(Capability *cap);
157 #endif
158 static void schedulePostRunThread(Capability *cap, StgTSO *t);
159 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
160 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
161                                  uint32_t prev_what_next );
162 static void scheduleHandleThreadBlocked( StgTSO *t );
163 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
164                                           StgTSO *t );
165 static bool scheduleNeedHeapProfile(bool ready_to_gc);
166 static void scheduleDoGC( Capability **pcap, Task *task,
167                           bool force_major, bool deadlock_detect );
168 
169 static void deleteThread (StgTSO *tso);
170 static void deleteAllThreads (void);
171 
172 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
173 static void deleteThread_(StgTSO *tso);
174 #endif
175 
176 /* ---------------------------------------------------------------------------
177    Main scheduling loop.
178 
179    We use round-robin scheduling, each thread returning to the
180    scheduler loop when one of these conditions is detected:
181 
182       * out of heap space
183       * timer expires (thread yields)
184       * thread blocks
185       * thread ends
186       * stack overflow
187 
188    ------------------------------------------------------------------------ */
189 
190 static Capability *
191 schedule (Capability *initialCapability, Task *task)
192 {
193   StgTSO *t;
194   Capability *cap;
195   StgThreadReturnCode ret;
196   uint32_t prev_what_next;
197   bool ready_to_gc;
198 
199   cap = initialCapability;
200 
201   // Pre-condition: this task owns initialCapability.
202   // The sched_mutex is *NOT* held
203   // NB. on return, we still hold a capability.
204   ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
205 
206   debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
207 
208   // -----------------------------------------------------------
209   // Scheduler loop starts here:
210 
211   while (1) {
212     ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
213 
214     // Check whether we have re-entered the RTS from Haskell without
215     // going via suspendThread()/resumeThread() (which is what a 'safe'
216     // foreign call would do).
217     if (cap->in_haskell) {
218           errorBelch("schedule: re-entered unsafely.\n"
219                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
220           stg_exit(EXIT_FAILURE);
221     }
222 
223     // Note [shutdown]: The interruption / shutdown sequence.
224     //
225     // In order to cleanly shut down the runtime, we want to:
226     //   * make sure that all main threads return to their callers
227     //     with the state 'Interrupted'.
228     //   * clean up all OS threads associated with the runtime
229     //   * free all memory etc.
230     //
231     // So the sequence goes like this:
232     //
233     //   * The shutdown sequence is initiated by calling hs_exit(),
234     //     interruptStgRts(), or running out of memory in the GC.
235     //
236     //   * Set sched_state = SCHED_INTERRUPTING
237     //
238     //   * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
239     //     scheduleDoGC(), which halts the whole runtime by acquiring all the
240     //     capabilities, does a GC and then calls deleteAllThreads() to kill all
241     //     the remaining threads.  The zombies are left on the run queue for
242     //     cleaning up.  We can't kill threads involved in foreign calls.
243     //
244     //   * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
245     //
246     //   * After this point, there can be NO MORE HASKELL EXECUTION.  This is
247     //     enforced by the scheduler, which won't run any Haskell code when
248     //     sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
249     //     other capabilities by doing the GC earlier.
250     //
251     //   * all workers exit when the run queue on their capability
252     //     drains.  All main threads will also exit when their TSO
253     //     reaches the head of the run queue and they can return.
254     //
255     //   * eventually all Capabilities will shut down, and the RTS can
256     //     exit.
257     //
258     //   * We might be left with threads blocked in foreign calls,
259     //     we should really attempt to kill these somehow (TODO).
260 
261     switch (RELAXED_LOAD(&sched_state)) {
262     case SCHED_RUNNING:
263         break;
264     case SCHED_INTERRUPTING:
265         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
266         /* scheduleDoGC() deletes all the threads */
267         scheduleDoGC(&cap,task,true,false);
268 
269         // after scheduleDoGC(), we must be shutting down.  Either some
270         // other Capability did the final GC, or we did it above,
271         // either way we can fall through to the SCHED_SHUTTING_DOWN
272         // case now.
273         ASSERT(RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN);
274         // fall through
275 
276     case SCHED_SHUTTING_DOWN:
277         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
278         // If we are a worker, just exit.  If we're a bound thread
279         // then we will exit below when we've removed our TSO from
280         // the run queue.
281         if (!isBoundTask(task) && emptyRunQueue(cap)) {
282             return cap;
283         }
284         break;
285     default:
286         barf("sched_state: %" FMT_Word, sched_state);
287     }
288 
289     scheduleFindWork(&cap);
290 
291     /* work pushing, currently relevant only for THREADED_RTS:
292        (pushes threads, wakes up idle capabilities for stealing) */
293     schedulePushWork(cap,task);
294 
295     scheduleDetectDeadlock(&cap,task);
296 
297     // Normally, the only way we can get here with no threads to
298     // run is if a keyboard interrupt was received during
299     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
300     // Additionally, it is not fatal for the
301     // threaded RTS to reach here with no threads to run.
302     //
303     // win32: might be here due to awaitEvent() being abandoned
304     // as a result of a console event having been delivered.
305 
306 #if defined(THREADED_RTS)
307     scheduleYield(&cap,task);
308 
309     if (emptyRunQueue(cap)) continue; // look for work again
310 #endif
311 
312 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
313     if ( emptyRunQueue(cap) ) {
314         ASSERT(sched_state >= SCHED_INTERRUPTING);
315     }
316 #endif
317 
318     //
319     // Get a thread to run
320     //
321     t = popRunQueue(cap);
322 
323     // Sanity check the thread we're about to run.  This can be
324     // expensive if there is lots of thread switching going on...
325     IF_DEBUG(sanity,checkTSO(t));
326 
327 #if defined(THREADED_RTS)
328     // Check whether we can run this thread in the current task.
329     // If not, we have to pass our capability to the right task.
330     {
331         InCall *bound = t->bound;
332 
333         if (bound) {
334             if (bound->task == task) {
335                 // yes, the Haskell thread is bound to the current native thread
336             } else {
337                 debugTrace(DEBUG_sched,
338                            "thread %lu bound to another OS thread",
339                            (unsigned long)t->id);
340                 // no, bound to a different Task (OS thread): leave it for that Task
341                 pushOnRunQueue(cap,t);
342                 continue;
343             }
344         } else {
345             // The thread we want to run is unbound.
346             if (task->incall->tso) {
347                 debugTrace(DEBUG_sched,
348                            "this OS thread cannot run thread %lu",
349                            (unsigned long)t->id);
350                 // no, the current native thread is bound to a different
351                 // Haskell thread, so pass it to any worker thread
352                 pushOnRunQueue(cap,t);
353                 continue;
354             }
355         }
356     }
357 #endif
358 
359     // If we're shutting down, and this thread has not yet been
360     // killed, kill it now.  This sometimes happens when a finalizer
361     // thread is created by the final GC, or a thread previously
362     // in a foreign call returns.
363     if (RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING &&
364         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
365         deleteThread(t);
366     }
367 
368     // If this capability is disabled, migrate the thread away rather
369     // than running it.  NB. but not if the thread is bound: it is
370     // really hard for a bound thread to migrate itself.  Believe me,
371     // I tried several ways and couldn't find a way to do it.
372     // Instead, when everything is stopped for GC, we migrate all the
373     // threads on the run queue then (see scheduleDoGC()).
374     //
375     // ToDo: what about TSO_LOCKED?  Currently we're migrating those
376     // when the number of capabilities drops, but we never migrate
377     // them back if it rises again.  Presumably we should, but after
378     // the thread has been migrated we no longer know what capability
379     // it was originally on.
380 #if defined(THREADED_RTS)
381     if (cap->disabled && !t->bound) {
382         Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
383         migrateThread(cap, t, dest_cap);
384         continue;
385     }
386 #endif
387 
388     /* context switches are initiated by the timer signal, unless
389      * the user specified "context switch as often as possible", with
390      * +RTS -C0
391      */
392     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
393         && !emptyThreadQueues(cap)) {
394         RELAXED_STORE(&cap->context_switch, 1);
395     }
396 
397 run_thread:
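    // (We also come back here via 'goto run_thread' from the ThreadYielding
    // case below, when the thread has merely switched between the compiled
    // and interpreted evaluators; see scheduleHandleYield().)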
398 
399     // CurrentTSO is the thread to run. It might be different if we
400     // loop back to run_thread, so make sure to set CurrentTSO after
401     // that.
402     cap->r.rCurrentTSO = t;
403 
404     startHeapProfTimer();
405 
406     // ----------------------------------------------------------------------
407     // Run the current thread
408 
409     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
410     ASSERT(t->cap == cap);
411     ASSERT(t->bound ? t->bound->task->cap == cap : 1);
412 
413     prev_what_next = t->what_next;
414 
415     errno = t->saved_errno;
416 #if defined(mingw32_HOST_OS)
417     SetLastError(t->saved_winerror);
418 #endif
419 
420     // reset the interrupt flag before running Haskell code
421     RELAXED_STORE(&cap->interrupt, false);
422 
423     cap->in_haskell = true;
424     RELAXED_STORE(&cap->idle, false);
425 
426     dirty_TSO(cap,t);
427     dirty_STACK(cap,t->stackobj);
428 
429     switch (SEQ_CST_LOAD(&recent_activity))
430     {
431     case ACTIVITY_DONE_GC: {
432         // ACTIVITY_DONE_GC means we turned off the timer signal to
433         // conserve power (see #1623).  Re-enable it here.
434         uint32_t prev;
435         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
436         if (prev == ACTIVITY_DONE_GC) {
437 #if !defined(PROFILING)
438             startTimer();
439 #endif
440         }
441         break;
442     }
443     case ACTIVITY_INACTIVE:
444         // If we reached ACTIVITY_INACTIVE, then don't reset it until
445         // we've done the GC.  The thread running here might just be
446         // the IO manager thread that handle_tick() woke up via
447         // wakeUpRts().
448         break;
449     default:
450         SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
451     }
452 
453     traceEventRunThread(cap, t);
454 
455     switch (prev_what_next) {
456 
457     case ThreadKilled:
458     case ThreadComplete:
459         /* Thread already finished, return to scheduler. */
460         ret = ThreadFinished;
461         break;
462 
463     case ThreadRunGHC:
464     {
465         StgRegTable *r;
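        // StgRun() enters compiled Haskell code via stg_returnToStackTop and
        // runs it until the mutator returns to the scheduler; from the
        // returned register table we recover the Capability and the
        // thread's return code (rRet).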
466         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
467         cap = regTableToCapability(r);
468         ret = r->rRet;
469         break;
470     }
471 
472     case ThreadInterpret:
473         cap = interpretBCO(cap);
474         ret = cap->r.rRet;
475         break;
476 
477     default:
478         barf("schedule: invalid prev_what_next=%u field", prev_what_next);
479     }
480 
481     cap->in_haskell = false;
482 
483     // The TSO might have moved, eg. if it re-entered the RTS and a GC
484     // happened.  So find the new location:
485     t = cap->r.rCurrentTSO;
486 
487     // cap->r.rCurrentTSO is charged for calls to allocate(), so we
488     // don't want it set when not running a Haskell thread.
489     cap->r.rCurrentTSO = NULL;
490 
491     // And save the current errno in this thread.
492     // XXX: possibly bogus for SMP because this thread might already
493     // be running again, see code below.
494     t->saved_errno = errno;
495 #if defined(mingw32_HOST_OS)
496     // Similarly for Windows error code
497     t->saved_winerror = GetLastError();
498 #endif
499 
500     if (ret == ThreadBlocked) {
501         if (t->why_blocked == BlockedOnBlackHole) {
502             StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
503             traceEventStopThread(cap, t, t->why_blocked + 6,
504                                  owner != NULL ? owner->id : 0);
505         } else {
506             traceEventStopThread(cap, t, t->why_blocked + 6, 0);
507         }
508     } else {
509         if (ret == StackOverflow) {
510           traceEventStopThread(cap, t, ret, t->tot_stack_size);
511         } else {
512           traceEventStopThread(cap, t, ret, 0);
513         }
514     }
515 
516     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
517     ASSERT(t->cap == cap);
518 
519     // ----------------------------------------------------------------------
520 
521     // Costs for the scheduler are assigned to CCS_SYSTEM
522     stopHeapProfTimer();
523 #if defined(PROFILING)
524     cap->r.rCCCS = CCS_SYSTEM;
525 #endif
526 
527     schedulePostRunThread(cap,t);
528 
529     ready_to_gc = false;
530 
531     switch (ret) {
532     case HeapOverflow:
533         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
534         break;
535 
536     case StackOverflow:
537         // just adjust the stack for this thread, then pop it back
538         // on the run queue.
539         threadStackOverflow(cap, t);
540         pushOnRunQueue(cap,t);
541         break;
542 
543     case ThreadYielding:
544         if (scheduleHandleYield(cap, t, prev_what_next)) {
545             // shortcut for switching between compiler/interpreter:
546             goto run_thread;
547         }
548         break;
549 
550     case ThreadBlocked:
551         scheduleHandleThreadBlocked(t);
552         break;
553 
554     case ThreadFinished:
555         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
556         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
557         break;
558 
559     default:
560       barf("schedule: invalid thread return code %d", (int)ret);
561     }
562 
563     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
564       scheduleDoGC(&cap,task,false,false);
565     }
566   } /* end of while() */
567 }
568 
569 /* -----------------------------------------------------------------------------
570  * Run queue operations
571  * -------------------------------------------------------------------------- */
572 
573 static void
574 removeFromRunQueue (Capability *cap, StgTSO *tso)
575 {
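    // The run queue is a doubly-linked list: tso->_link points forwards and
    // tso->block_info.prev points backwards (maintained with setTSOLink()
    // and setTSOPrev()).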
576     if (tso->block_info.prev == END_TSO_QUEUE) {
577         ASSERT(cap->run_queue_hd == tso);
578         cap->run_queue_hd = tso->_link;
579     } else {
580         setTSOLink(cap, tso->block_info.prev, tso->_link);
581     }
582     if (tso->_link == END_TSO_QUEUE) {
583         ASSERT(cap->run_queue_tl == tso);
584         cap->run_queue_tl = tso->block_info.prev;
585     } else {
586         setTSOPrev(cap, tso->_link, tso->block_info.prev);
587     }
588     tso->_link = tso->block_info.prev = END_TSO_QUEUE;
589     cap->n_run_queue--;
590 
591     IF_DEBUG(sanity, checkRunQueue(cap));
592 }
593 
594 void
595 promoteInRunQueue (Capability *cap, StgTSO *tso)
596 {
597     removeFromRunQueue(cap, tso);
598     pushOnRunQueue(cap, tso);
599 }
600 
601 /* -----------------------------------------------------------------------------
602  * scheduleFindWork()
603  *
604  * Search for work to do, and handle messages from elsewhere.
605  * -------------------------------------------------------------------------- */
606 
607 static void
608 scheduleFindWork (Capability **pcap)
609 {
610     scheduleStartSignalHandlers(*pcap);
611 
612     scheduleProcessInbox(pcap);
613 
614     scheduleCheckBlockedThreads(*pcap);
615 
616 #if defined(THREADED_RTS)
617     if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
618 #endif
619 }
620 
621 #if defined(THREADED_RTS)
622 STATIC_INLINE bool
623 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
624 {
625     // we need to yield this capability to someone else if..
626     //   - another thread is initiating a GC, and we didn't just do a GC
627     //     (see Note [GC livelock])
628     //   - another Task is returning from a foreign call
629     //   - the thread at the head of the run queue cannot be run
630     //     by this Task (it is bound to another Task, or it is unbound
631     //     and this Task is bound).
632     //
633     // Note [GC livelock]
634     //
635     // If we are interrupted to do a GC, then we do not immediately do
636     // another one.  This avoids a starvation situation where one
637     // Capability keeps forcing a GC and the other Capabilities make no
638     // progress at all.
639 
640     // Note [Data race in shouldYieldCapability]
641     // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
642     // We would usually need to hold cap->lock to look at n_returning_tasks but
643     // we don't here since this is just an approximate predicate. We
644     // consequently need to use atomic accesses whenever touching
645     // n_returning_tasks. However, since this is an approximate predicate we can
646     // use a RELAXED ordering.
647 
648     return ((RELAXED_LOAD(&pending_sync) && !didGcLast) ||
649             RELAXED_LOAD(&cap->n_returning_tasks) != 0 ||
650             (!emptyRunQueue(cap) && (task->incall->tso == NULL
651                                      ? peekRunQueue(cap)->bound != NULL
652                                      : peekRunQueue(cap)->bound != task->incall)));
653 }
654 
655 // This is the single place where a Task goes to sleep.  There are
656 // two reasons it might need to sleep:
657 //    - there are no threads to run
658 //    - we need to yield this Capability to someone else
659 //      (see shouldYieldCapability())
660 //
661 // Careful: the scheduler loop is quite delicate.  Make sure you run
662 // the tests in testsuite/concurrent (all ways) after modifying this,
663 // and also check the benchmarks in nofib/parallel for regressions.
664 
665 static void
666 scheduleYield (Capability **pcap, Task *task)
667 {
668     Capability *cap = *pcap;
669     bool didGcLast = false;
670 
671     // if we have work, and we don't need to give up the Capability, continue.
672     //
673     if (!shouldYieldCapability(cap,task,false) &&
674         (!emptyRunQueue(cap) ||
675          !emptyInbox(cap) ||
676          RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING)) {
677         return;
678     }
679 
680     // otherwise yield (sleep), and keep yielding if necessary.
681     do {
682         if (doIdleGCWork(cap, false)) {
683             // there's more idle GC work to do
684             didGcLast = false;
685         } else {
686             // no more idle GC work to do
687             didGcLast = yieldCapability(&cap,task, !didGcLast);
688         }
689     }
690     while (shouldYieldCapability(cap,task,didGcLast));
691 
692     // note there may still be no threads on the run queue at this
693     // point, the caller has to check.
694 
695     *pcap = cap;
696     return;
697 }
698 #endif
699 
700 /* -----------------------------------------------------------------------------
701  * schedulePushWork()
702  *
703  * Push work to other Capabilities if we have some.
704  * -------------------------------------------------------------------------- */
705 
706 static void
707 schedulePushWork(Capability *cap USED_IF_THREADS,
708                  Task *task      USED_IF_THREADS)
709 {
710 #if defined(THREADED_RTS)
711 
712     Capability *free_caps[n_capabilities], *cap0;
713     uint32_t i, n_wanted_caps, n_free_caps;
714 
715     uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
716 
717     // migration can be turned off with +RTS -qm
718     if (!RtsFlags.ParFlags.migrate) {
719         spare_threads = 0;
720     }
721 
722     // Figure out how many capabilities we want to wake up.  We need at least
723     // sparkPoolSizeCap(cap) plus the number of spare threads we have.
724     n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
725     if (n_wanted_caps == 0) return;
726 
727     // First grab as many free Capabilities as we can.  ToDo: we should use
728     // capabilities on the same NUMA node preferably, but not exclusively.
729     for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
730          n_free_caps < n_wanted_caps && i != cap->no;
731          i = (i + 1) % n_capabilities) {
732         cap0 = capabilities[i];
733         if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
734             if (!emptyRunQueue(cap0)
735                 || RELAXED_LOAD(&cap0->n_returning_tasks) != 0
736                 || !emptyInbox(cap0)) {
737                 // it already has some work, we just grabbed it at
738                 // the wrong moment.  Or maybe it's deadlocked!
739                 releaseCapability(cap0);
740             } else {
741                 free_caps[n_free_caps++] = cap0;
742             }
743         }
744     }
745 
746     // We now have n_free_caps free capabilities stashed in
747     // free_caps[].  Attempt to share our run queue equally with them.
748     // This is complicated slightly by the fact that we can't move
749     // some threads:
750     //
751     //  - threads that have TSO_LOCKED cannot migrate
752     //  - a thread that is bound to the current Task cannot be migrated
753     //
754     // This is about the simplest thing we could do; improvements we
755     // might want to do include:
756     //
757     //   - giving high priority to moving relatively new threads, on
758     //     the grounds that they haven't had time to build up a
759     //     working set in the cache on this CPU/Capability.
760     //
761     //   - giving low priority to moving long-lived threads
762 
763     if (n_free_caps > 0) {
764         StgTSO *prev, *t, *next;
765 
766         debugTrace(DEBUG_sched,
767                    "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
768                    cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
769                    n_free_caps);
770 
771         // There are n_free_caps+1 caps in total.  We will share the threads
772         // evenly between them, *except* that if the run queue does not divide
773         // evenly by n_free_caps+1 then we bias towards the current capability.
774         // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
775         uint32_t keep_threads =
776             (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
777 
778         // This also ensures that we don't give away all our threads, since
779         // (x + y) / (y + 1) >= 1 when x >= 1.
780 
781         // The number of threads we have left.
782         uint32_t n = cap->n_run_queue;
783 
784         // prev = the previous thread on this cap's run queue
785         prev = END_TSO_QUEUE;
786 
787         // We're going to walk through the run queue, migrating threads to other
788         // capabilities until we have only keep_threads left.  We might
789         // encounter a thread that cannot be migrated, in which case we add it
790         // to the current run queue and decrement keep_threads.
791         for (t = cap->run_queue_hd, i = 0;
792              t != END_TSO_QUEUE && n > keep_threads;
793              t = next)
794         {
795             next = t->_link;
796             t->_link = END_TSO_QUEUE;
797 
798             // Should we keep this thread?
799             if (t->bound == task->incall // don't move my bound thread
800                 || tsoLocked(t) // don't move a locked thread
801                 ) {
802                 if (prev == END_TSO_QUEUE) {
803                     cap->run_queue_hd = t;
804                 } else {
805                     setTSOLink(cap, prev, t);
806                 }
807                 setTSOPrev(cap, t, prev);
808                 prev = t;
809                 if (keep_threads > 0) keep_threads--;
810             }
811 
812             // Or migrate it?
813             else {
814                 appendToRunQueue(free_caps[i],t);
815                 traceEventMigrateThread (cap, t, free_caps[i]->no);
816 
817                 // See Note [Benign data race due to work-pushing].
818                 if (t->bound) {
819                     t->bound->task->cap = free_caps[i];
820                 }
821                 t->cap = free_caps[i];
822                 n--; // we have one fewer thread now
823                 i++; // move on to the next free_cap
824                 if (i == n_free_caps) i = 0;
825             }
826         }
827 
828         // Join up the beginning of the queue (prev)
829         // with the rest of the queue (t)
830         if (t == END_TSO_QUEUE) {
831             cap->run_queue_tl = prev;
832         } else {
833             setTSOPrev(cap, t, prev);
834         }
835         if (prev == END_TSO_QUEUE) {
836             cap->run_queue_hd = t;
837         } else {
838             setTSOLink(cap, prev, t);
839         }
840         cap->n_run_queue = n;
841 
842         IF_DEBUG(sanity, checkRunQueue(cap));
843 
844         // release the capabilities
845         for (i = 0; i < n_free_caps; i++) {
846             task->cap = free_caps[i];
847             if (sparkPoolSizeCap(cap) > 0) {
848                 // If we have sparks to steal, wake up a worker on the
849                 // capability, even if it has no threads to run.
850                 releaseAndWakeupCapability(free_caps[i]);
851             } else {
852                 releaseCapability(free_caps[i]);
853             }
854         }
855     }
856     task->cap = cap; // reset to point to our Capability.
857 
858 #endif /* THREADED_RTS */
859 
860 }
861 
862 /* ----------------------------------------------------------------------------
863  * Start any pending signal handlers
864  * ------------------------------------------------------------------------- */
865 
866 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
867 static void
868 scheduleStartSignalHandlers(Capability *cap)
869 {
870     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
871         // safe outside the lock
872         startSignalHandlers(cap);
873     }
874 }
875 #else
876 static void
877 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
878 {
879 }
880 #endif
881 
882 /* ----------------------------------------------------------------------------
883  * Check for blocked threads that can be woken up.
884  * ------------------------------------------------------------------------- */
885 
886 static void
887 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
888 {
889 #if !defined(THREADED_RTS)
890     //
891     // Check whether any waiting threads need to be woken up.  If the
892     // run queue is empty, and there are no other tasks running, we
893     // can wait indefinitely for something to happen.
894     //
895     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
896     {
897         awaitEvent (emptyRunQueue(cap));
898     }
899 #endif
900 }
901 
902 /* ----------------------------------------------------------------------------
903  * Detect deadlock conditions and attempt to resolve them.
904  * ------------------------------------------------------------------------- */
905 
906 static void
907 scheduleDetectDeadlock (Capability **pcap, Task *task)
908 {
909     Capability *cap = *pcap;
910     /*
911      * Detect deadlock: when we have no threads to run, there are no
912      * threads blocked, waiting for I/O, or sleeping, and all the
913      * other tasks are waiting for work, we must have a deadlock of
914      * some description.
915      */
916     if ( emptyThreadQueues(cap) )
917     {
918 #if defined(THREADED_RTS)
919         /*
920          * In the threaded RTS, we only check for deadlock if there
921          * has been no activity in a complete timeslice.  This means
922          * we won't eagerly start a full GC just because we don't have
923          * any threads to run currently.
924          */
925         if (SEQ_CST_LOAD(&recent_activity) != ACTIVITY_INACTIVE) return;
926 #endif
927 
928         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
929 
930         // Garbage collection can release some new threads due to
931         // either (a) finalizers or (b) threads resurrected because
932         // they are unreachable and will therefore be sent an
933         // exception.  Any threads thus released will be immediately
934         // runnable.
935         scheduleDoGC (pcap, task, true/*force major GC*/, true/*deadlock detection*/);
936         cap = *pcap;
937         // when force_major == true, scheduleDoGC sets
938         // recent_activity to ACTIVITY_DONE_GC and turns off the timer
939         // signal.
940 
941         if ( !emptyRunQueue(cap) ) return;
942 
943 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
944         /* If we have user-installed signal handlers, then wait
945          * for signals to arrive rather than bombing out with a
946          * deadlock.
947          */
948         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
949             debugTrace(DEBUG_sched,
950                        "still deadlocked, waiting for signals...");
951 
952             awaitUserSignals();
953 
954             if (signals_pending()) {
955                 startSignalHandlers(cap);
956             }
957 
958             // either we have threads to run, or we were interrupted:
959             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
960 
961             return;
962         }
963 #endif
964 
965 #if !defined(THREADED_RTS)
966         /* Probably a real deadlock.  Send the current main thread the
967          * Deadlock exception.
968          */
969         if (task->incall->tso) {
970             switch (task->incall->tso->why_blocked) {
971             case BlockedOnSTM:
972             case BlockedOnBlackHole:
973             case BlockedOnMsgThrowTo:
974             case BlockedOnMVar:
975             case BlockedOnMVarRead:
976                 throwToSingleThreaded(cap, task->incall->tso,
977                                       (StgClosure *)nonTermination_closure);
978                 return;
979             default:
980                 barf("deadlock: main thread blocked in a strange way");
981             }
982         }
983         return;
984 #endif
985     }
986 }
987 
988 
989 /* ----------------------------------------------------------------------------
990  * Process messages in the current Capability's inbox
991  * ------------------------------------------------------------------------- */
992 
993 static void
994 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
995 {
996 #if defined(THREADED_RTS)
997     Message *m, *next;
998     PutMVar *p, *pnext;
999     int r;
1000     Capability *cap = *pcap;
1001 
1002     while (!emptyInbox(cap)) {
1003         // Executing messages might use heap, so we should check for GC.
1004         if (doYouWantToGC(cap)) {
1005             scheduleDoGC(pcap, cap->running_task, false, false);
1006             cap = *pcap;
1007         }
1008 
1009         // don't use a blocking acquire; if the lock is held by
1010         // another thread then just carry on.  This seems to avoid
1011         // getting stuck in a message ping-pong situation with other
1012         // processors.  We'll check the inbox again later anyway.
1013         //
1014         // We should really use a more efficient queue data structure
1015         // here.  The trickiness is that we must ensure a Capability
1016         // never goes idle if the inbox is non-empty, which is why we
1017         // use cap->lock (cap->lock is released as the last thing
1018         // before going idle; see Capability.c:releaseCapability()).
1019         r = TRY_ACQUIRE_LOCK(&cap->lock);
1020         if (r != 0) return;
1021 
1022         m = cap->inbox;
1023         p = cap->putMVars;
1024         cap->inbox = (Message*)END_TSO_QUEUE;
1025         cap->putMVars = NULL;
1026 
1027         RELEASE_LOCK(&cap->lock);
1028 
1029         while (m != (Message*)END_TSO_QUEUE) {
1030             next = m->link;
1031             executeMessage(cap, m);
1032             m = next;
1033         }
1034 
1035         while (p != NULL) {
1036             pnext = p->link;
1037             performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1038                               Unit_closure);
1039             freeStablePtr(p->mvar);
1040             stgFree(p);
1041             p = pnext;
1042         }
1043     }
1044 #endif
1045 }
1046 
1047 
1048 /* ----------------------------------------------------------------------------
1049  * Activate spark threads (THREADED_RTS)
1050  * ------------------------------------------------------------------------- */
1051 
1052 #if defined(THREADED_RTS)
1053 static void
1054 scheduleActivateSpark(Capability *cap)
1055 {
1056     if (anySparks() && !cap->disabled)
1057     {
1058         createSparkThread(cap);
1059         debugTrace(DEBUG_sched, "creating a spark thread");
1060     }
1061 }
1062 #endif // THREADED_RTS
1063 
1064 /* ----------------------------------------------------------------------------
1065  * After running a thread...
1066  * ------------------------------------------------------------------------- */
1067 
1068 static void
1069 schedulePostRunThread (Capability *cap, StgTSO *t)
1070 {
1071     // We have to be able to catch transactions that are in an
1072     // infinite loop as a result of seeing an inconsistent view of
1073     // memory, e.g.
1074     //
1075     //   atomically $ do
1076     //       [a,b] <- mapM readTVar [ta,tb]
1077     //       when (a == b) loop
1078     //
1079     // and a is never equal to b given a consistent view of memory.
1080     //
1081     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1082         if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1083             debugTrace(DEBUG_sched | DEBUG_stm,
1084                        "trec %p found wasting its time", t);
1085 
1086             // strip the stack back to the
1087             // ATOMICALLY_FRAME, aborting the (nested)
1088             // transaction, and saving the stack of any
1089             // partially-evaluated thunks on the heap.
1090             throwToSingleThreaded_(cap, t, NULL, true);
1091 
1092 //            ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1093         }
1094     }
1095 
1096     //
1097     // If the current thread's allocation limit has run out, send it
1098     // the AllocationLimitExceeded exception.
1099 
1100     if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1101         // Use a throwToSelf rather than a throwToSingleThreaded, because
1102         // it correctly handles the case where the thread is currently
1103         // inside mask.  Also the thread might be blocked (e.g. on an
1104         // MVar), and throwToSingleThreaded doesn't unblock it
1105         // correctly in that case.
1106         throwToSelf(cap, t, allocationLimitExceeded_closure);
1107         ASSIGN_Int64((W_*)&(t->alloc_limit),
1108                      (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
1109     }
1110 
1111   /* some statistics gathering in the parallel case */
1112 }
1113 
1114 /* -----------------------------------------------------------------------------
1115  * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1116  * -------------------------------------------------------------------------- */
1117 
1118 static bool
1119 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1120 {
1121     if (cap->r.rHpLim == NULL || RELAXED_LOAD(&cap->context_switch)) {
1122         // Sometimes we miss a context switch, e.g. when calling
1123         // primitives in a tight loop, MAYBE_GC() doesn't check the
1124         // context switch flag, and we end up waiting for a GC.
1125         // See #1984, and concurrent/should_run/1984
1126         RELAXED_STORE(&cap->context_switch, 0);
1127         appendToRunQueue(cap,t);
1128     } else {
1129         pushOnRunQueue(cap,t);
1130     }
1131 
1132     // did the task ask for a large block?
1133     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1134         // if so, get one and push it on the front of the nursery.
1135         bdescr *bd;
1136         W_ blocks;
1137 
1138         blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1139 
1140         if (blocks > BLOCKS_PER_MBLOCK) {
1141             barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1142         }
1143 
1144         debugTrace(DEBUG_sched,
1145                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1146                    (long)t->id, what_next_strs[t->what_next], blocks);
1147 
1148         // don't do this if the nursery is (nearly) full, we'll GC first.
1149         if (cap->r.rCurrentNursery->link != NULL ||
1150             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent
1151                                                // infinite loop if the
1152                                                // nursery has only one
1153                                                // block.
1154 
1155             bd = allocGroupOnNode_lock(cap->node,blocks);
1156             cap->r.rNursery->n_blocks += blocks;
1157 
1158             // link the new group after CurrentNursery
1159             dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1160 
1161             // initialise it as a nursery block.  We initialise the
1162             // step, gen_no, and flags field of *every* sub-block in
1163             // this large block, because this is easier than making
1164             // sure that we always find the block head of a large
1165             // block whenever we call Bdescr() (eg. evacuate() and
1166             // isAlive() in the GC would both have to do this, at
1167             // least).
1168             {
1169                 bdescr *x;
1170                 for (x = bd; x < bd + blocks; x++) {
1171                     initBdescr(x,g0,g0);
1172                     x->free = x->start;
1173                     x->flags = 0;
1174                 }
1175             }
1176 
1177             // This assert can be a killer if the app is doing lots
1178             // of large block allocations.
1179             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1180 
1181             // now update the nursery to point to the new block
1182             finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1183             cap->r.rCurrentNursery = bd;
1184 
1185             // we might be unlucky and have another thread get on the
1186             // run queue before us and steal the large block, but in that
1187             // case the thread will just end up requesting another large
1188             // block.
1189             return false;  /* not actually GC'ing */
1190         }
1191     }
1192 
1193     return doYouWantToGC(cap);
1194     /* actual GC is done at the end of the while loop in schedule() */
1195 }
1196 
1197 /* -----------------------------------------------------------------------------
1198  * Handle a thread that returned to the scheduler with ThreadYielding
1199  * -------------------------------------------------------------------------- */
1200 
1201 static bool
1202 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1203 {
1204     /* put the thread back on the run queue.  Then, if we're ready to
1205      * GC, check whether this is the last task to stop.  If so, wake
1206      * up the GC thread.  getThread will block during a GC until the
1207      * GC is finished.
1208      */
1209 
1210     ASSERT(t->_link == END_TSO_QUEUE);
1211 
1212     // Shortcut if we're just switching evaluators: just run the thread.  See
1213     // Note [avoiding threadPaused] in Interpreter.c.
1214     if (t->what_next != prev_what_next) {
1215         debugTrace(DEBUG_sched,
1216                    "--<< thread %ld (%s) stopped to switch evaluators",
1217                    (long)t->id, what_next_strs[t->what_next]);
1218         return true;
1219     }
1220 
1221     // Reset the context switch flag.  We don't do this just before
1222     // running the thread, because that would mean we would lose ticks
1223     // during GC, which can lead to unfair scheduling (a thread hogs
1224     // the CPU because the tick always arrives during GC).  This way
1225     // penalises threads that do a lot of allocation, but that seems
1226     // better than the alternative.
1227     if (RELAXED_LOAD(&cap->context_switch) != 0) {
1228         RELAXED_STORE(&cap->context_switch, 0);
1229         appendToRunQueue(cap,t);
1230     } else {
1231         pushOnRunQueue(cap,t);
1232     }
1233 
1234     IF_DEBUG(sanity,
1235              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1236              checkTSO(t));
1237 
1238     return false;
1239 }
1240 
1241 /* -----------------------------------------------------------------------------
1242  * Handle a thread that returned to the scheduler with ThreadBlocked
1243  * -------------------------------------------------------------------------- */
1244 
1245 static void
1246 scheduleHandleThreadBlocked( StgTSO *t
1247 #if !defined(DEBUG)
1248     STG_UNUSED
1249 #endif
1250     )
1251 {
1252 
1253       // We don't need to do anything.  The thread is blocked, and it
1254       // has tidied up its stack and placed itself on whatever queue
1255       // it needs to be on.
1256 
1257     // ASSERT(t->why_blocked != NotBlocked);
1258     // Not true: for example,
1259     //    - the thread may have woken itself up already, because
1260     //      threadPaused() might have raised a blocked throwTo
1261     //      exception, see maybePerformBlockedException().
1262 
1263 #if defined(DEBUG)
1264     traceThreadStatus(DEBUG_sched, t);
1265 #endif
1266 }
1267 
1268 /* -----------------------------------------------------------------------------
1269  * Handle a thread that returned to the scheduler with ThreadFinished
1270  * -------------------------------------------------------------------------- */
1271 
1272 static bool
1273 scheduleHandleThreadFinished (Capability *cap, Task *task, StgTSO *t)
1274 {
1275     /* Need to check whether this was a main thread, and if so,
1276      * return with the return value.
1277      *
1278      * We also end up here if the thread kills itself with an
1279      * uncaught exception, see Exception.cmm.
1280      */
1281 
1282     // blocked exceptions can now complete, even if the thread was in
1283     // blocked mode (see #2910).
1284     awakenBlockedExceptionQueue (cap, t);
1285 
1286       //
1287       // Check whether the thread that just completed was a bound
1288       // thread, and if so return with the result.
1289       //
1290       // There is an assumption here that all thread completion goes
1291       // through this point; we need to make sure that if a thread
1292       // ends up in the ThreadKilled state, that it stays on the run
1293       // queue so it can be dealt with here.
1294       //
1295 
1296       if (t->bound) {
1297 
1298           if (t->bound != task->incall) {
1299 #if !defined(THREADED_RTS)
1300               // Must be a bound thread that is not the topmost one.  Leave
1301               // it on the run queue until the stack has unwound to the
1302               // point where we can deal with this.  Leaving it on the run
1303               // queue also ensures that the garbage collector knows about
1304               // this thread and its return value (it gets dropped from the
1305               // step->threads list so there's no other way to find it).
1306               appendToRunQueue(cap,t);
1307               return false;
1308 #else
1309               // this cannot happen in the threaded RTS, because a
1310               // bound thread can only be run by the appropriate Task.
1311               barf("finished bound thread that isn't mine");
1312 #endif
1313           }
1314 
1315           ASSERT(task->incall->tso == t);
1316 
1317           if (t->what_next == ThreadComplete) {
1318               if (task->incall->ret) {
1319                   // NOTE: return val is stack->sp[1] (see StgStartup.cmm)
1320                   *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1321               }
1322               task->incall->rstat = Success;
1323           } else {
1324               if (task->incall->ret) {
1325                   *(task->incall->ret) = NULL;
1326               }
1327               if (RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING) {
1328                   if (heap_overflow) {
1329                       task->incall->rstat = HeapExhausted;
1330                   } else {
1331                       task->incall->rstat = Interrupted;
1332                   }
1333               } else {
1334                   task->incall->rstat = Killed;
1335               }
1336           }
1337 #if defined(DEBUG)
1338           removeThreadLabel((StgWord)task->incall->tso->id);
1339 #endif
1340 
1341           // We no longer consider this thread and task to be bound to
1342           // each other.  The TSO lives on until it is GC'd, but the
1343           // task is about to be released by the caller, and we don't
1344           // want anyone following the pointer from the TSO to the
1345           // defunct task (which might have already been
1346           // re-used). This was a real bug: the GC updated
1347           // tso->bound->tso which led to a deadlock.
1348           t->bound = NULL;
1349           task->incall->tso = NULL;
1350 
1351           return true; // tells schedule() to return
1352       }
1353 
1354       return false;
1355 }
1356 
1357 /* -----------------------------------------------------------------------------
1358  * Perform a heap census
1359  * -------------------------------------------------------------------------- */
1360 
1361 static bool
1362 scheduleNeedHeapProfile( bool ready_to_gc )
1363 {
1364     // When we have +RTS -i0 and we're heap profiling, do a census at
1365     // every GC.  This lets us get repeatable runs for debugging.
1366     if (performHeapProfile ||
1367         (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1368          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1369         return true;
1370     } else {
1371         return false;
1372     }
1373 }
1374 
1375 /* -----------------------------------------------------------------------------
1376  * stopAllCapabilities()
1377  *
1378  * Stop all Haskell execution.  This is used when we need to make some global
1379  * change to the system, such as altering the number of capabilities, or
1380  * forking.
1381  *
1382  * pCap may be NULL in the event that the caller doesn't yet own a capability.
1383  *
1384  * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1385  * -------------------------------------------------------------------------- */
1386 
1387 #if defined(THREADED_RTS)
1388 void stopAllCapabilities (Capability **pCap, Task *task)
1389 {
1390     stopAllCapabilitiesWith(pCap, task, SYNC_OTHER);
1391 }
1392 
1393 void stopAllCapabilitiesWith (Capability **pCap, Task *task, SyncType sync_type)
1394 {
1395     bool was_syncing;
1396     SyncType prev_sync_type;
1397 
1398     PendingSync sync = {
1399         .type = sync_type,
1400         .idle = NULL,
1401         .task = task
1402     };
1403 
1404     do {
1405         was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1406     } while (was_syncing);
1407 
1408     acquireAllCapabilities(pCap ? *pCap : NULL, task);
1409 
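    // We now hold every Capability.  Clear pending_sync and wake up any
    // threads that blocked in requestSync() waiting on sync_finished_cond.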
1410     pending_sync = 0;
1411     signalCondition(&sync_finished_cond);
1412 }
1413 #endif
1414 
1415 /* -----------------------------------------------------------------------------
1416  * requestSync()
1417  *
1418  * Commence a synchronisation between all capabilities.  Normally not called
1419  * directly, instead use stopAllCapabilities().  This is used by the GC, which
1420  * has some special synchronisation requirements.
1421  *
1422  * Note that this can be called in two ways:
1423  *
1424  * - where *pcap points to a capability owned by the caller: in this case
1425  *   *prev_sync_type will reflect the in-progress sync type on return, if one
1426  *   was found.
1427  *
1428  *  - where pcap == NULL: in this case the caller doesn't hold a capability.
1429  *    we only return whether or not a pending sync was found and prev_sync_type
1430  *    is unchanged.
1431  *
1432  * Returns:
1433  *    false if we successfully got a sync
1434  *    true  if there was another sync request in progress,
1435  *             and we yielded to it.  The type of the other sync
1436  *             request is returned in *prev_sync_type.
1437  * -------------------------------------------------------------------------- */
1438 
1439 #if defined(THREADED_RTS)
1440 static bool requestSync (
1441     Capability **pcap, Task *task, PendingSync *new_sync,
1442     SyncType *prev_sync_type)
1443 {
1444     PendingSync *sync;
1445 
1446     sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1447                              (StgWord)NULL,
1448                              (StgWord)new_sync);
1449 
1450     if (sync != NULL)
1451     {
1452         // sync is valid until we have called yieldCapability().
1453         // After the sync is completed, we cannot read that struct any
1454         // more because it has been freed.
1455         *prev_sync_type = sync->type;
1456         if (pcap == NULL) {
1457             // The caller does not hold a capability (e.g. may be a concurrent
1458             // mark thread). Consequently we must wait until the pending sync is
1459             // finished before proceeding to ensure we don't loop.
1460             // TODO: Don't busy-wait
1461             ACQUIRE_LOCK(&sync_finished_mutex);
1462             while (pending_sync) {
1463                 waitCondition(&sync_finished_cond, &sync_finished_mutex);
1464             }
1465             RELEASE_LOCK(&sync_finished_mutex);
1466         } else {
1467             do {
1468                 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1469                           sync->type);
1470                 ASSERT(*pcap);
1471                 yieldCapability(pcap,task,true);
1472                 sync = SEQ_CST_LOAD(&pending_sync);
1473             } while (sync != NULL);
1474         }
1475 
1476         // NOTE: task->cap might have changed now
1477         return true;
1478     }
1479     else
1480     {
1481         return false;
1482     }
1483 }
1484 #endif
1485 
1486 /* -----------------------------------------------------------------------------
1487  * acquireAllCapabilities()
1488  *
1489  * Grab all the capabilities except the one we already hold (cap may be NULL if
1490  * the caller does not currently hold a capability). Used when synchronising
1491  * before a single-threaded GC (SYNC_SEQ_GC), and before a fork (SYNC_OTHER).
1492  *
1493  * Only call this after requestSync(), otherwise a deadlock might
1494  * ensue if another thread is trying to synchronise.
1495  * -------------------------------------------------------------------------- */
1496 
1497 #if defined(THREADED_RTS)
1498 static void acquireAllCapabilities(Capability *cap, Task *task)
1499 {
1500     Capability *tmpcap;
1501     uint32_t i;
1502 
1503     ASSERT(SEQ_CST_LOAD(&pending_sync) != NULL);
1504     for (i=0; i < n_capabilities; i++) {
1505         debugTrace(DEBUG_sched, "grabbing all the capabilities (%d/%d)",
1506                    i, n_capabilities);
1507         tmpcap = capabilities[i];
1508         if (tmpcap != cap) {
1509             // we better hope this task doesn't get migrated to
1510             // another Capability while we're waiting for this one.
1511             // It won't, because load balancing happens while we have
1512             // all the Capabilities, but even so it's a slightly
1513             // unsavoury invariant.
1514             task->cap = tmpcap;
1515             waitForCapability(&tmpcap, task);
1516             if (tmpcap->no != i) {
1517                 barf("acquireAllCapabilities: got the wrong capability");
1518             }
1519         }
1520     }
1521     task->cap = cap == NULL ? tmpcap : cap;
1522 }
1523 #endif
1524 
1525 /* -----------------------------------------------------------------------------
1526  * releaseAllCapabilities()
1527  *
1528  * Assuming this thread holds all the capabilities, release them all (except for
1529  * the one passed in as keep_cap, if non-NULL).
1530  * -------------------------------------------------------------------------- */
1531 
1532 #if defined(THREADED_RTS)
1533 void releaseAllCapabilities(uint32_t n, Capability *keep_cap, Task *task)
1534 {
1535     uint32_t i;
1536 
1537     for (i = 0; i < n; i++) {
1538         Capability *tmpcap = capabilities[i];
1539         if (keep_cap != tmpcap) {
1540             task->cap = tmpcap;
1541             releaseCapability(tmpcap);
1542         }
1543     }
1544     task->cap = keep_cap;
1545 }
1546 #endif
1547 
1548 /* -----------------------------------------------------------------------------
1549  * Perform a garbage collection if necessary
1550  * -------------------------------------------------------------------------- */
1551 
1552 // N.B. See Note [Deadlock detection under nonmoving collector] for rationale
1553 // behind deadlock_detect argument.
1554 static void
1555 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1556               bool force_major, bool deadlock_detect)
1557 {
1558     Capability *cap = *pcap;
1559     bool heap_census;
1560     uint32_t collect_gen;
1561     bool major_gc;
1562 #if defined(THREADED_RTS)
1563     uint32_t gc_type;
1564     uint32_t i;
1565     uint32_t need_idle;
1566     uint32_t n_gc_threads;
1567     uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1568     StgTSO *tso;
1569     bool *idle_cap;
1570       // idle_cap is an array (allocated later) of size n_capabilities, where
1571       // idle_cap[i] is true if capability i will be idle during this GC
1572       // cycle.
1573 #endif
1574 
1575     if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
1576         // The final GC has already been done, and the system is
1577         // shutting down.  We'll probably deadlock if we try to GC
1578         // now.
1579         return;
1580     }
1581 
1582     heap_census = scheduleNeedHeapProfile(true);
1583 
1584     // Figure out which generation we are collecting, so that we can
1585     // decide whether this is a parallel GC or not.
1586     collect_gen = calcNeeded(force_major || heap_census, NULL);
1587     major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1588 
1589 #if defined(THREADED_RTS)
1590     if (RELAXED_LOAD(&sched_state) < SCHED_INTERRUPTING
1591         && RtsFlags.ParFlags.parGcEnabled
1592         && collect_gen >= RtsFlags.ParFlags.parGcGen
1593         && ! oldest_gen->mark)
1594     {
1595         gc_type = SYNC_GC_PAR;
1596     } else {
1597         gc_type = SYNC_GC_SEQ;
1598     }
1599 
1600     // In order to GC, there must be no threads running Haskell code.
1601     // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1602     // capabilities, and release them after the GC has completed.  For parallel
1603     // GC, we synchronise all the running threads using requestSync().
1604     //
1605     // Other capabilities are prevented from running yet more Haskell threads if
1606     // pending_sync is set. Tested inside yieldCapability() and
1607     // releaseCapability() in Capability.c
1608 
1609     PendingSync sync = {
1610         .type = gc_type,
1611         .idle = NULL,
1612         .task = task
1613     };
1614 
1615     {
1616         SyncType prev_sync = 0;
1617         bool was_syncing;
1618         do {
1619             // If -qn is not set and we have more capabilities than cores, set
1620             // the number of GC threads to #cores.  We do this here rather than
1621             // in normaliseRtsOpts() because here it will work if the program
1622             // calls setNumCapabilities.
1623             //
1624             n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1625             if (n_gc_threads == 0 &&
1626                 enabled_capabilities > getNumberOfProcessors()) {
1627                 n_gc_threads = getNumberOfProcessors();
1628             }
1629 
1630             // This calculation must be inside the loop because
1631             // enabled_capabilities may change if requestSync() below fails and
1632             // we retry.
1633             if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1634                 if (n_gc_threads >= enabled_capabilities) {
1635                     need_idle = 0;
1636                 } else {
1637                     need_idle = enabled_capabilities - n_gc_threads;
1638                 }
1639             } else {
1640                 need_idle = 0;
1641             }
1642 
1643             // We need an array of size n_capabilities, but since this may
1644             // change each time around the loop we must allocate it afresh.
1645             idle_cap = (bool *)stgMallocBytes(n_capabilities *
1646                                               sizeof(bool),
1647                                               "scheduleDoGC");
1648             sync.idle = idle_cap;
1649 
1650             // When using +RTS -qn, we need some capabilities to be idle during
1651             // GC.  The best bet is to choose some inactive ones, so we look for
1652             // those first:
1653             uint32_t n_idle = need_idle;
1654             for (i=0; i < n_capabilities; i++) {
1655                 if (capabilities[i]->disabled) {
1656                     idle_cap[i] = true;
1657                 } else if (n_idle > 0 &&
1658                            capabilities[i]->running_task == NULL) {
1659                     debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1660                     n_idle--;
1661                     idle_cap[i] = true;
1662                 } else {
1663                     idle_cap[i] = false;
1664                 }
1665             }
1666             // If we didn't find enough inactive capabilities, just pick some
1667             // more to be idle.
1668             for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1669                 if (!idle_cap[i] && i != cap->no) {
1670                     idle_cap[i] = true;
1671                     n_idle--;
1672                 }
1673             }
1674             ASSERT(n_idle == 0);
1675 
1676             was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1677             cap = *pcap;
1678             if (was_syncing) {
1679                 stgFree(idle_cap);
1680             }
1681             if (was_syncing &&
1682                 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1683                 !(RELAXED_LOAD(&sched_state) == SCHED_INTERRUPTING && force_major)) {
1684                 // someone else had a pending sync request for a GC, so
1685                 // let's assume GC has been done and we don't need to GC
1686                 // again.
1687                 // Exception to this: if SCHED_INTERRUPTING, then we still
1688                 // need to do the final GC.
1689                 return;
1690             }
1691             if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
1692                 // The scheduler might now be shutting down.  We tested
1693                 // this above, but it might have become true since then as
1694                 // we yielded the capability in requestSync().
1695                 return;
1696             }
1697         } while (was_syncing);
1698     }
1699 
1700     stat_startGCSync(gc_threads[cap->no]);
1701 
1702 #if defined(DEBUG)
1703     unsigned int old_n_capabilities = n_capabilities;
1704 #endif
1705 
1706     interruptAllCapabilities();
1707 
1708     // The final shutdown GC is always single-threaded, because it's
1709     // possible that some of the Capabilities have no worker threads.
1710 
1711     if (gc_type == SYNC_GC_SEQ) {
1712         traceEventRequestSeqGc(cap);
1713     } else {
1714         traceEventRequestParGc(cap);
1715     }
1716 
1717     if (gc_type == SYNC_GC_SEQ) {
1718         // single-threaded GC: grab all the capabilities
1719         acquireAllCapabilities(cap,task);
1720     }
1721     else
1722     {
1723         // If we are load-balancing collections in this
1724         // generation, then we require all GC threads to participate
1725         // in the collection.  Otherwise, we only require active
1726         // threads to participate, and we set gc_threads[i]->idle for
1727         // any idle capabilities.  The rationale here is that waking
1728         // up an idle Capability takes much longer than just doing any
1729         // GC work on its behalf.
1730 
1731         if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1732             || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1733                 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1734         {
1735             for (i=0; i < n_capabilities; i++) {
1736                 if (capabilities[i]->disabled) {
1737                     idle_cap[i] = tryGrabCapability(capabilities[i], task);
1738                     if (idle_cap[i]) {
1739                         n_idle_caps++;
1740                     }
1741                 } else {
1742                     if (i != cap->no && idle_cap[i]) {
1743                         Capability *tmpcap = capabilities[i];
1744                         task->cap = tmpcap;
1745                         waitForCapability(&tmpcap, task);
1746                         n_idle_caps++;
1747                     }
1748                 }
1749             }
1750         }
1751         else
1752         {
1753             for (i=0; i < n_capabilities; i++) {
1754                 if (capabilities[i]->disabled) {
1755                     idle_cap[i] = tryGrabCapability(capabilities[i], task);
1756                     if (idle_cap[i]) {
1757                         n_idle_caps++;
1758                     }
1759                 } else if (i != cap->no &&
1760                            capabilities[i]->idle >=
1761                            RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1762                     idle_cap[i] = tryGrabCapability(capabilities[i], task);
1763                     if (idle_cap[i]) {
1764                         n_idle_caps++;
1765                     } else {
1766                         n_failed_trygrab_idles++;
1767                     }
1768                 }
1769             }
1770         }
1771         debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1772 
1773         for (i=0; i < n_capabilities; i++) {
1774             NONATOMIC_ADD(&capabilities[i]->idle, 1);
1775         }
1776 
1777         // For all capabilities participating in this GC, wait until
1778         // they have stopped mutating and are standing by for GC.
1779         waitForGcThreads(cap, idle_cap);
1780 
1781         // Stable point where we can do a global check on our spark counters
1782         ASSERT(checkSparkCountInvariant());
1783     }
1784 
1785 #endif
1786 
1787     IF_DEBUG(scheduler, printAllThreads());
1788 
1789 delete_threads_and_gc:
1790     /*
1791      * We now have all the capabilities; if we're in an interrupting
1792      * state, then we should take the opportunity to delete all the
1793      * threads in the system.
1794      * Checking for major_gc ensures that the last GC is major.
1795      */
1796     if (RELAXED_LOAD(&sched_state) == SCHED_INTERRUPTING && major_gc) {
1797         deleteAllThreads();
1798 #if defined(THREADED_RTS)
1799         // Discard all the sparks from every Capability.  Why?
1800         // They'll probably be GC'd anyway since we've killed all the
1801         // threads.  It just avoids the GC having to do any work to
1802         // figure out that any remaining sparks are garbage.
1803         for (i = 0; i < n_capabilities; i++) {
1804             capabilities[i]->spark_stats.gcd +=
1805                 sparkPoolSize(capabilities[i]->sparks);
1806             // No race here since all Caps are stopped.
1807             discardSparksCap(capabilities[i]);
1808         }
1809 #endif
1810         RELAXED_STORE(&sched_state, SCHED_SHUTTING_DOWN);
1811     }
1812 
1813     /*
1814      * When there are disabled capabilities, we want to migrate any
1815      * threads away from them.  Normally this happens in the
1816      * scheduler's loop, but only for unbound threads - it's really
1817      * hard for a bound thread to migrate itself.  So we have another
1818      * go here.
1819      */
1820 #if defined(THREADED_RTS)
1821     for (i = enabled_capabilities; i < n_capabilities; i++) {
1822         Capability *tmp_cap, *dest_cap;
1823         tmp_cap = capabilities[i];
1824         ASSERT(tmp_cap->disabled);
1825         if (i != cap->no) {
1826             dest_cap = capabilities[i % enabled_capabilities];
1827             while (!emptyRunQueue(tmp_cap)) {
1828                 tso = popRunQueue(tmp_cap);
1829                 migrateThread(tmp_cap, tso, dest_cap);
1830                 if (tso->bound) {
1831                   traceTaskMigrate(tso->bound->task,
1832                                    tso->bound->task->cap,
1833                                    dest_cap);
1834                   tso->bound->task->cap = dest_cap;
1835                 }
1836             }
1837         }
1838     }
1839 #endif
1840 
1841     // Do any remaining idle GC work from the previous GC
1842     doIdleGCWork(cap, true /* all of it */);
1843 
1844 #if defined(THREADED_RTS)
1845     // reset pending_sync *before* GC, so that when the GC threads
1846     // emerge they don't immediately re-enter the GC.
1847     pending_sync = 0;
1848     signalCondition(&sync_finished_cond);
1849     GarbageCollect(collect_gen, heap_census, deadlock_detect, gc_type, cap, idle_cap);
1850 #else
1851     GarbageCollect(collect_gen, heap_census, deadlock_detect, 0, cap, NULL);
1852 #endif
1853 
1854     // If we're shutting down, don't leave any idle GC work to do.
1855     if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
1856         doIdleGCWork(cap, true /* all of it */);
1857     }
1858 
1859     traceSparkCounters(cap);
1860 
1861     switch (SEQ_CST_LOAD(&recent_activity)) {
1862     case ACTIVITY_INACTIVE:
1863         if (force_major) {
1864             // We are doing a GC because the system has been idle for a
1865             // timeslice and we need to check for deadlock.  Record the
1866             // fact that we've done a GC and turn off the timer signal;
1867             // it will get re-enabled if we run any threads after the GC.
1868             SEQ_CST_STORE(&recent_activity, ACTIVITY_DONE_GC);
1869 #if !defined(PROFILING)
1870             stopTimer();
1871 #endif
1872             break;
1873         }
1874         // fall through...
1875 
1876     case ACTIVITY_MAYBE_NO:
1877         // the GC might have taken long enough for the timer to set
1878         // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1879         // but we aren't necessarily deadlocked:
1880         SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
1881         break;
1882 
1883     case ACTIVITY_DONE_GC:
1884         // If we are actually active, the scheduler will reset the
1885         // recent_activity flag and re-enable the timer.
1886         break;
1887     }
1888 
1889 #if defined(THREADED_RTS)
1890     // Stable point where we can do a global check on our spark counters
1891     ASSERT(checkSparkCountInvariant());
1892 #endif
1893 
1894     // The heap census itself is done during GarbageCollect().
1895     if (heap_census) {
1896         performHeapProfile = false;
1897     }
1898 
1899 #if defined(THREADED_RTS)
1900 
1901     // If n_capabilities has changed during GC, we're in trouble.
1902     ASSERT(n_capabilities == old_n_capabilities);
1903 
1904     if (gc_type == SYNC_GC_PAR)
1905     {
1906         for (i = 0; i < n_capabilities; i++) {
1907             if (i != cap->no) {
1908                 if (idle_cap[i]) {
1909                     ASSERT(capabilities[i]->running_task == task);
1910                     task->cap = capabilities[i];
1911                     releaseCapability(capabilities[i]);
1912                 } else {
1913                     ASSERT(capabilities[i]->running_task != task);
1914                 }
1915             }
1916         }
1917         task->cap = cap;
1918 
1919         // releaseGCThreads() happens *after* we have released idle
1920         // capabilities.  Otherwise what can happen is one of the released
1921         // threads starts a new GC, and finds that it can't acquire some of
1922         // the disabled capabilities, because the previous GC still holds
1923         // them, so those disabled capabilities will not be idle during the
1924         // next GC round.  However, if we release the capabilities first,
1925         // then they will be free (because they're disabled) when the next
1926         // GC cycle happens.
1927         releaseGCThreads(cap, idle_cap);
1928     }
1929 #endif
1930     if (heap_overflow && RELAXED_LOAD(&sched_state) == SCHED_RUNNING) {
1931         // GC set the heap_overflow flag.  We should throw an exception if we
1932         // can, or shut down otherwise.
1933 
1934         // Get the thread to which Ctrl-C is thrown
1935         StgTSO *main_thread = getTopHandlerThread();
1936         if (main_thread == NULL) {
1937             // GC set the heap_overflow flag, and there is no main thread to
1938             // throw an exception to, so we should proceed with an orderly
1939             // shutdown now.  Ultimately we want the main thread to return to
1940             // its caller with HeapExhausted, at which point the caller should
1941             // call hs_exit().  The first step is to delete all the threads.
1942             RELAXED_STORE(&sched_state, SCHED_INTERRUPTING);
1943             goto delete_threads_and_gc;
1944         }
1945 
1946         heap_overflow = false;
1947         const uint64_t allocation_count = getAllocations();
1948         if (RtsFlags.GcFlags.heapLimitGrace <
1949               allocation_count - allocated_bytes_at_heapoverflow ||
1950               allocated_bytes_at_heapoverflow == 0) {
1951             allocated_bytes_at_heapoverflow = allocation_count;
1952             // We used to simply exit, but throwing an exception gives the
1953             // program a chance to clean up.  It also lets the exception be
1954             // caught.
1955 
1956             // FIXME this is not a good way to tell a program to release
1957             // resources.  It is neither reliable (the RTS crashes if it fails
1958             // to allocate memory from the OS) nor very usable (it is always
1959             // thrown to the main thread, which might not be able to do anything
1960             // useful with it).  We really should have a more general way to
1961             // release resources in low-memory conditions.  Nevertheless, this
1962             // is still a big improvement over just exiting.
1963 
1964             // FIXME again: perhaps we should throw a synchronous exception
1965             // instead of an asynchronous one, or have a way for the program to
1966             // register a handler to be called when heap overflow happens.
1967             throwToSelf(cap, main_thread, heapOverflow_closure);
1968         }
1969     }
1970 
1971 #if defined(THREADED_RTS)
1972     stgFree(idle_cap);
1973 
1974     if (gc_type == SYNC_GC_SEQ) {
1975         // release our stash of capabilities.
1976         releaseAllCapabilities(n_capabilities, cap, task);
1977     }
1978 #endif
1979 
1980     return;
1981 }
1982 
1983 /* ---------------------------------------------------------------------------
1984  * Singleton fork(). Do not copy any running threads.
1985  * ------------------------------------------------------------------------- */
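/* This backs the forkProcess# primop.  In practice it is reached from
 * Haskell via System.Posix.Process.forkProcess in the unix package, roughly
 * (Haskell-side sketch, not part of this file):
 *
 *     pid <- forkProcess childAction
 */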
1986 
1987 pid_t
1988 forkProcess(HsStablePtr *entry
1989 #if !defined(FORKPROCESS_PRIMOP_SUPPORTED)
1990             STG_UNUSED
1991 #endif
1992            )
1993 {
1994 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
1995     pid_t pid;
1996     StgTSO* t,*next;
1997     Capability *cap;
1998     uint32_t g;
1999     Task *task = NULL;
2000     uint32_t i;
2001 
2002     debugTrace(DEBUG_sched, "forking!");
2003 
2004     task = newBoundTask();
2005 
2006     cap = NULL;
2007     waitForCapability(&cap, task);
2008 
2009 #if defined(THREADED_RTS)
2010     stopAllCapabilities(&cap, task);
2011 #endif
2012 
2013     // no funny business: hold locks while we fork, otherwise if some
2014     // other thread is holding a lock when the fork happens, the data
2015     // structure protected by the lock will forever be in an
2016     // inconsistent state in the child.  See also #1391.
2017     ACQUIRE_LOCK(&sched_mutex);
2018     ACQUIRE_LOCK(&sm_mutex);
2019     ACQUIRE_LOCK(&stable_ptr_mutex);
2020     ACQUIRE_LOCK(&stable_name_mutex);
2021 
2022     for (i=0; i < n_capabilities; i++) {
2023         ACQUIRE_LOCK(&capabilities[i]->lock);
2024     }
2025 
2026     // Take task lock after capability lock to avoid order inversion (#17275).
2027     ACQUIRE_LOCK(&task->lock);
2028 
2029 #if defined(THREADED_RTS)
2030     ACQUIRE_LOCK(&all_tasks_mutex);
2031 #endif
2032 
2033     stopTimer(); // See #4074
2034 
2035 #if defined(TRACING)
2036     flushEventLog(); // so that child won't inherit dirty file buffers
2037 #endif
2038 
2039     pid = fork();
2040 
2041     if (pid) { // parent
2042 
2043         startTimer(); // #4074
2044 
2045         RELEASE_LOCK(&sched_mutex);
2046         RELEASE_LOCK(&sm_mutex);
2047         RELEASE_LOCK(&stable_ptr_mutex);
2048         RELEASE_LOCK(&stable_name_mutex);
2049         RELEASE_LOCK(&task->lock);
2050 
2051 #if defined(THREADED_RTS)
2052         /* N.B. releaseCapability_ below may need to take all_tasks_mutex */
2053         RELEASE_LOCK(&all_tasks_mutex);
2054 #endif
2055 
2056         for (i=0; i < n_capabilities; i++) {
2057             releaseCapability_(capabilities[i],false);
2058             RELEASE_LOCK(&capabilities[i]->lock);
2059         }
2060 
2061         boundTaskExiting(task);
2062 
2063         // just return the pid
2064         return pid;
2065 
2066     } else { // child
2067 
2068         // Current process times are reset in the child process, so we should reset
2069         // the stats too. See #16102.
2070         resetChildProcessStats();
2071 
2072 #if defined(THREADED_RTS)
2073         initMutex(&sched_mutex);
2074         initMutex(&sm_mutex);
2075         initMutex(&stable_ptr_mutex);
2076         initMutex(&stable_name_mutex);
2077         initMutex(&task->lock);
2078 
2079         for (i=0; i < n_capabilities; i++) {
2080             initMutex(&capabilities[i]->lock);
2081         }
2082 
2083         initMutex(&all_tasks_mutex);
2084 #endif
2085 
2086 #if defined(TRACING)
2087         resetTracing();
2088 #endif
2089 
2090         // Now, all OS threads except the thread that forked are
2091         // stopped.  We need to stop all Haskell threads, including
2092         // those involved in foreign calls.  Also we need to delete
2093         // all Tasks, because they correspond to OS threads that are
2094         // now gone.
2095 
2096         for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2097           for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2098                 next = t->global_link;
2099                 // don't allow threads to catch the ThreadKilled
2100                 // exception, but we do want to raiseAsync() because these
2101                 // threads may be evaluating thunks that we need later.
2102                 deleteThread_(t);
2103 
2104                 // stop the GC from updating the InCall to point to
2105                 // the TSO.  This is only necessary because the
2106                 // OSThread bound to the TSO has been killed, and
2107                 // won't get a chance to exit in the usual way (see
2108                 // also scheduleHandleThreadFinished).
2109                 t->bound = NULL;
2110           }
2111         }
2112 
2113         discardTasksExcept(task);
2114 
2115         for (i=0; i < n_capabilities; i++) {
2116             cap = capabilities[i];
2117 
2118             // Empty the run queue.  It seems tempting to let all the
2119             // killed threads stay on the run queue as zombies to be
2120             // cleaned up later, but some of them may correspond to
2121             // bound threads for which the corresponding Task does not
2122             // exist.
2123             truncateRunQueue(cap);
2124             cap->n_run_queue = 0;
2125 
2126             // Any suspended C-calling Tasks are no more, their OS threads
2127             // don't exist now:
2128             cap->suspended_ccalls = NULL;
2129             cap->n_suspended_ccalls = 0;
2130 
2131 #if defined(THREADED_RTS)
2132             // Wipe our spare workers list, they no longer exist.  New
2133             // workers will be created if necessary.
2134             cap->spare_workers = NULL;
2135             cap->n_spare_workers = 0;
2136             cap->returning_tasks_hd = NULL;
2137             cap->returning_tasks_tl = NULL;
2138             cap->n_returning_tasks = 0;
2139 #endif
2140 
2141             // Release all caps except 0, we'll use that for starting
2142             // the IO manager and running the client action below.
2143             if (cap->no != 0) {
2144                 task->cap = cap;
2145                 releaseCapability(cap);
2146             }
2147         }
2148         cap = capabilities[0];
2149         task->cap = cap;
2150 
2151         // Empty the threads lists.  Otherwise, the garbage
2152         // collector may attempt to resurrect some of these threads.
2153         for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2154             generations[g].threads = END_TSO_QUEUE;
2155         }
2156 
2157         // On Unix, all timers are reset in the child, so we need to start
2158         // the timer again.
2159         initTimer();
2160 
2161         // TODO: need to trace various other things in the child
2162         // like startup event, capabilities, process info etc
2163         traceTaskCreate(task, cap);
2164 
2165 #if defined(THREADED_RTS)
2166         ioManagerStartCap(&cap);
2167 #endif
2168 
2169         // start timer after the IOManager is initialized
2170         // (the idle GC may wake up the IOManager)
2171         startTimer();
2172 
2173         // Install toplevel exception handlers, so interruption
2174         // signal will be sent to the main thread.
2175         // See #12903
2176         rts_evalStableIOMain(&cap, entry, NULL);  // run the action
2177         rts_checkSchedStatus("forkProcess",cap);
2178 
2179         rts_unlock(cap);
2180         shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2181     }
2182 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2183     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2184 #endif
2185 }
2186 
2187 /* ---------------------------------------------------------------------------
2188  * Changing the number of Capabilities
2189  *
2190  * Changing the number of Capabilities is very tricky!  We can only do
2191  * it with the system fully stopped, so we do a full sync with
2192  * requestSync(SYNC_OTHER) and grab all the capabilities.
2193  *
2194  * Then we resize the appropriate data structures, and update all
2195  * references to the old data structures which have now moved.
2196  * Finally we release the Capabilities we are holding, and start
2197  * worker Tasks on the new Capabilities we created.
2198  *
2199  * ------------------------------------------------------------------------- */
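/* From Haskell this is normally reached via GHC.Conc.setNumCapabilities
 * (re-exported from Control.Concurrent); the initial value comes from the
 * +RTS -N<n> flag.  A direct call from C land is simply (illustrative):
 *
 *     setNumCapabilities(4);
 */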
2200 
2201 void
2202 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2203 {
2204 #if !defined(THREADED_RTS)
2205     if (new_n_capabilities != 1) {
2206         errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2207     }
2208     return;
2209 #elif defined(NOSMP)
2210     if (new_n_capabilities != 1) {
2211         errorBelch("setNumCapabilities: not supported on this platform");
2212     }
2213     return;
2214 #else
2215     Task *task;
2216     Capability *cap;
2217     uint32_t n;
2218     Capability *old_capabilities = NULL;
2219     uint32_t old_n_capabilities = n_capabilities;
2220 
2221     if (new_n_capabilities == enabled_capabilities) {
2222         return;
2223     } else if (new_n_capabilities <= 0) {
2224         errorBelch("setNumCapabilities: Capability count must be positive");
2225         return;
2226     }
2227 
2228 
2229     debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2230                enabled_capabilities, new_n_capabilities);
2231 
2232     cap = rts_lock();
2233     task = cap->running_task;
2234 
2235 
2236     // N.B. We must stop the interval timer while we are changing the
2237     // capabilities array lest handle_tick may try to context switch
2238     // an old capability. See #17289.
2239     stopTimer();
2240 
2241     stopAllCapabilities(&cap, task);
2242 
2243     if (new_n_capabilities < enabled_capabilities)
2244     {
2245         // Reducing the number of capabilities: we do not actually
2246         // remove the extra capabilities, we just mark them as
2247         // "disabled". This has the following effects:
2248         //
2249         //   - threads on a disabled capability are migrated away by the
2250         //     scheduler loop
2251         //
2252         //   - disabled capabilities do not participate in GC
2253         //     (see scheduleDoGC())
2254         //
2255         //   - No spark threads are created on this capability
2256         //     (see scheduleActivateSpark())
2257         //
2258         //   - We do not attempt to migrate threads *to* a disabled
2259         //     capability (see schedulePushWork()).
2260         //
2261         // but in other respects, a disabled capability remains
2262         // alive.  Threads may be woken up on a disabled capability,
2263         // but they will be immediately migrated away.
2264         //
2265         // This approach is much easier than trying to actually remove
2266         // the capability; we don't have to worry about GC data
2267         // structures, the nursery, etc.
2268         //
2269         for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2270             capabilities[n]->disabled = true;
2271             traceCapDisable(capabilities[n]);
2272         }
2273         enabled_capabilities = new_n_capabilities;
2274     }
2275     else
2276     {
2277         // Increasing the number of enabled capabilities.
2278         //
2279         // enable any disabled capabilities, up to the required number
2280         for (n = enabled_capabilities;
2281              n < new_n_capabilities && n < n_capabilities; n++) {
2282             capabilities[n]->disabled = false;
2283             traceCapEnable(capabilities[n]);
2284         }
2285         enabled_capabilities = n;
2286 
2287         if (new_n_capabilities > n_capabilities) {
2288 #if defined(TRACING)
2289             // Allocate eventlog buffers for the new capabilities.  Note this
2290             // must be done before calling moreCapabilities(), because that
2291             // will emit events about creating the new capabilities and adding
2292             // them to existing capsets.
2293             tracingAddCapapilities(n_capabilities, new_n_capabilities);
2294 #endif
2295 
2296             // Resize the capabilities array
2297             // NB. after this, capabilities points somewhere new.  Any pointers
2298             // of type (Capability *) are now invalid.
2299             moreCapabilities(n_capabilities, new_n_capabilities);
2300 
2301             // Resize and update storage manager data structures
2302             storageAddCapabilities(n_capabilities, new_n_capabilities);
2303         }
2304     }
2305 
2306     // update n_capabilities before things start running
2307     if (new_n_capabilities > n_capabilities) {
2308         n_capabilities = enabled_capabilities = new_n_capabilities;
2309     }
2310 
2311     // We're done: release the original Capabilities
2312     releaseAllCapabilities(old_n_capabilities, cap,task);
2313 
2314     // We can't free the old array until now, because we access it
2315     // while updating pointers in updateCapabilityRefs().
2316     if (old_capabilities) {
2317         stgFree(old_capabilities);
2318     }
2319 
2320     // Notify IO manager that the number of capabilities has changed.
2321     rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2322 
2323     startTimer();
2324 
2325     rts_unlock(cap);
2326 
2327 #endif // THREADED_RTS
2328 }
2329 
2330 
2331 
2332 /* ---------------------------------------------------------------------------
2333  * Delete all the threads in the system
2334  * ------------------------------------------------------------------------- */
2335 
2336 static void
2337 deleteAllThreads ()
2338 {
2339     // NOTE: only safe to call if we own all capabilities.
2340 
2341     StgTSO* t, *next;
2342     uint32_t g;
2343 
2344     debugTrace(DEBUG_sched,"deleting all threads");
2345     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2346         for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2347                 next = t->global_link;
2348                 deleteThread(t);
2349         }
2350     }
2351 
2352     // The run queue now contains a bunch of ThreadKilled threads.  We
2353     // must not throw these away: the main thread(s) will be in there
2354     // somewhere, and the main scheduler loop has to deal with it.
2355     // Also, the run queue is the only thing keeping these threads from
2356     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2357 
2358 #if !defined(THREADED_RTS)
2359     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2360     ASSERT(sleeping_queue == END_TSO_QUEUE);
2361 #endif
2362 }
2363 
2364 /* -----------------------------------------------------------------------------
2365    Managing the suspended_ccalls list.
2366    Locks required: sched_mutex
2367    -------------------------------------------------------------------------- */
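/* The list is a doubly-linked list of InCalls threaded through
 * incall->next / incall->prev, with cap->suspended_ccalls pointing at the
 * head and cap->n_suspended_ccalls tracking its length, roughly:
 *
 *     cap->suspended_ccalls -> incall_1 <-> incall_2 <-> ... -> NULL
 */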
2368 
2369 STATIC_INLINE void
2370 suspendTask (Capability *cap, Task *task)
2371 {
2372     InCall *incall;
2373 
2374     incall = task->incall;
2375     ASSERT(incall->next == NULL && incall->prev == NULL);
2376     incall->next = cap->suspended_ccalls;
2377     incall->prev = NULL;
2378     if (cap->suspended_ccalls) {
2379         cap->suspended_ccalls->prev = incall;
2380     }
2381     cap->suspended_ccalls = incall;
2382     cap->n_suspended_ccalls++;
2383 }
2384 
2385 STATIC_INLINE void
2386 recoverSuspendedTask (Capability *cap, Task *task)
2387 {
2388     InCall *incall;
2389 
2390     incall = task->incall;
2391     if (incall->prev) {
2392         incall->prev->next = incall->next;
2393     } else {
2394         ASSERT(cap->suspended_ccalls == incall);
2395         cap->suspended_ccalls = incall->next;
2396     }
2397     if (incall->next) {
2398         incall->next->prev = incall->prev;
2399     }
2400     incall->next = incall->prev = NULL;
2401     cap->n_suspended_ccalls--;
2402 }
2403 
2404 /* ---------------------------------------------------------------------------
2405  * Suspending & resuming Haskell threads.
2406  *
2407  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2408  * its capability before calling the C function.  This allows another
2409  * task to pick up the capability and carry on running Haskell
2410  * threads.  It also means that if the C call blocks, it won't lock
2411  * the whole system.
2412  *
2413  * The Haskell thread making the C call is put to sleep for the
2414  * duration of the call, on the suspended_ccalling_threads queue.  We
2415  * give out a token to the task, which it can use to resume the thread
2416  * on return from the C function.
2417  *
2418  * If this is an interruptible C call, this means that the FFI call may be
2419  * unceremoniously terminated and should be scheduled on an
2420  * unbound worker thread.
2421  * ------------------------------------------------------------------------- */
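/* Illustrative shape of the code emitted around a "safe" foreign call (a
 * sketch only; the real sequence is produced by the code generator):
 *
 *     token = suspendThread(baseReg, interruptible);
 *     r = foreign_fn(args);                // may block or call back in
 *     baseReg = resumeThread(token);
 *
 * The token is in fact the Task (see suspendThread() below), but callers
 * must treat it as opaque.
 */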
2422 
2423 void *
2424 suspendThread (StgRegTable *reg, bool interruptible)
2425 {
2426   Capability *cap;
2427   int saved_errno;
2428   StgTSO *tso;
2429   Task *task;
2430 #if defined(mingw32_HOST_OS)
2431   StgWord32 saved_winerror;
2432 #endif
2433 
2434   saved_errno = errno;
2435 #if defined(mingw32_HOST_OS)
2436   saved_winerror = GetLastError();
2437 #endif
2438 
2439   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2440    */
2441   cap = regTableToCapability(reg);
2442 
2443   task = cap->running_task;
2444   tso = cap->r.rCurrentTSO;
2445 
2446   traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2447 
2448   // XXX this might not be necessary --SDM
2449   tso->what_next = ThreadRunGHC;
2450 
2451   threadPaused(cap,tso);
2452 
2453   if (interruptible) {
2454     tso->why_blocked = BlockedOnCCall_Interruptible;
2455   } else {
2456     tso->why_blocked = BlockedOnCCall;
2457   }
2458 
2459   // Hand back capability
2460   task->incall->suspended_tso = tso;
2461   task->incall->suspended_cap = cap;
2462 
2463   // Otherwise allocate() will write to invalid memory.
2464   cap->r.rCurrentTSO = NULL;
2465 
2466   ACQUIRE_LOCK(&cap->lock);
2467 
2468   suspendTask(cap,task);
2469   cap->in_haskell = false;
2470   releaseCapability_(cap,false);
2471 
2472   RELEASE_LOCK(&cap->lock);
2473 
2474   errno = saved_errno;
2475 #if defined(mingw32_HOST_OS)
2476   SetLastError(saved_winerror);
2477 #endif
2478   return task;
2479 }
2480 
2481 StgRegTable *
2482 resumeThread (void *task_)
2483 {
2484     StgTSO *tso;
2485     InCall *incall;
2486     Capability *cap;
2487     Task *task = task_;
2488     int saved_errno;
2489 #if defined(mingw32_HOST_OS)
2490     StgWord32 saved_winerror;
2491 #endif
2492 
2493     saved_errno = errno;
2494 #if defined(mingw32_HOST_OS)
2495     saved_winerror = GetLastError();
2496 #endif
2497 
2498     incall = task->incall;
2499     cap = incall->suspended_cap;
2500     task->cap = cap;
2501 
2502     // Wait for permission to re-enter the RTS with the result.
2503     waitForCapability(&cap,task);
2504     // we might be on a different capability now... but if so, our
2505     // entry on the suspended_ccalls list will also have been
2506     // migrated.
2507 
2508     // Remove the thread from the suspended list
2509     recoverSuspendedTask(cap,task);
2510 
2511     tso = incall->suspended_tso;
2512     incall->suspended_tso = NULL;
2513     incall->suspended_cap = NULL;
2514     // we will modify tso->_link
2515     IF_NONMOVING_WRITE_BARRIER_ENABLED {
2516         updateRemembSetPushClosure(cap, (StgClosure *)tso->_link);
2517     }
2518     tso->_link = END_TSO_QUEUE;
2519 
2520     traceEventRunThread(cap, tso);
2521 
2522     /* Reset blocking status */
2523     tso->why_blocked  = NotBlocked;
2524 
2525     if ((tso->flags & TSO_BLOCKEX) == 0) {
2526         // avoid locking the TSO if we don't have to
2527         if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2528             maybePerformBlockedException(cap,tso);
2529         }
2530     }
2531 
2532     cap->r.rCurrentTSO = tso;
2533     cap->in_haskell = true;
2534     errno = saved_errno;
2535 #if defined(mingw32_HOST_OS)
2536     SetLastError(saved_winerror);
2537 #endif
2538 
2539     /* We might have GC'd, mark the TSO dirty again */
2540     dirty_TSO(cap,tso);
2541     dirty_STACK(cap,tso->stackobj);
2542 
2543     IF_DEBUG(sanity, checkTSO(tso));
2544 
2545     return &cap->r;
2546 }
2547 
2548 /* ---------------------------------------------------------------------------
2549  * scheduleThread()
2550  *
2551  * scheduleThread puts a thread on the end  of the runnable queue.
2552  * This will usually be done immediately after a thread is created.
2553  * The caller of scheduleThread must create the thread using e.g.
2554  * createThread and push an appropriate closure
2555  * on this thread's stack before the scheduler is invoked.
2556  * ------------------------------------------------------------------------ */
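/* A sketch of the usual creation sequence (cf. createIOThread() and the
 * rts_eval*() entry points elsewhere in the RTS; the exact helper shown is
 * illustrative):
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  closure);
 *     scheduleThread(cap, tso);
 */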
2557 
2558 void
2559 scheduleThread(Capability *cap, StgTSO *tso)
2560 {
2561     // The thread goes at the *end* of the run-queue, to avoid possible
2562     // starvation of any threads already on the queue.
2563     appendToRunQueue(cap,tso);
2564 }
2565 
2566 void
2567 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2568 {
2569     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2570                               // move this thread from now on.
2571 #if defined(THREADED_RTS)
2572     cpu %= enabled_capabilities;
2573     if (cpu == cap->no) {
2574         appendToRunQueue(cap,tso);
2575     } else {
2576         migrateThread(cap, tso, capabilities[cpu]);
2577     }
2578 #else
2579     appendToRunQueue(cap,tso);
2580 #endif
2581 }
2582 
2583 void
2584 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2585 {
2586     Task *task;
2587     DEBUG_ONLY( StgThreadID id );
2588     Capability *cap;
2589 
2590     cap = *pcap;
2591 
2592     // We already created/initialised the Task
2593     task = cap->running_task;
2594 
2595     // This TSO is now a bound thread; make the Task and TSO
2596     // point to each other.
2597     tso->bound = task->incall;
2598     tso->cap = cap;
2599 
2600     task->incall->tso = tso;
2601     task->incall->ret = ret;
2602     task->incall->rstat = NoStatus;
2603 
2604     appendToRunQueue(cap,tso);
2605 
2606     DEBUG_ONLY( id = tso->id );
2607     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2608 
2609     cap = schedule(cap,task);
2610 
2611     ASSERT(task->incall->rstat != NoStatus);
2612     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2613 
2614     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2615     *pcap = cap;
2616 }
2617 
2618 /* ----------------------------------------------------------------------------
2619  * Starting Tasks
2620  * ------------------------------------------------------------------------- */
2621 
2622 #if defined(THREADED_RTS)
2623 void scheduleWorker (Capability *cap, Task *task)
2624 {
2625     ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
2626     cap = schedule(cap,task);
2627     ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
2628 
2629     // On exit from schedule(), we have a Capability, but possibly not
2630     // the same one we started with.
2631 
2632     // During shutdown, the requirement is that after all the
2633     // Capabilities are shut down, all workers that are shutting down
2634     // have finished workerTaskStop().  This is why we hold on to
2635     // cap->lock until we've finished workerTaskStop() below.
2636     //
2637     // There may be workers still involved in foreign calls; those
2638     // will just block in waitForCapability() because the
2639     // Capability has been shut down.
2640     //
2641     ACQUIRE_LOCK(&cap->lock);
2642     releaseCapability_(cap,false);
2643     workerTaskStop(task);
2644     RELEASE_LOCK(&cap->lock);
2645 }
2646 #endif
2647 
2648 /* ---------------------------------------------------------------------------
2649  * Start new worker tasks on Capabilities from--to
2650  * -------------------------------------------------------------------------- */
2651 
2652 static void
2653 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2654 {
2655 #if defined(THREADED_RTS)
2656     uint32_t i;
2657     Capability *cap;
2658 
2659     for (i = from; i < to; i++) {
2660         cap = capabilities[i];
2661         ACQUIRE_LOCK(&cap->lock);
2662         startWorkerTask(cap);
2663         RELEASE_LOCK(&cap->lock);
2664     }
2665 #endif
2666 }
2667 
2668 /* ---------------------------------------------------------------------------
2669  * initScheduler()
2670  *
2671  * Initialise the scheduler.  This resets all the queues - if the
2672  * queues contained any threads, they'll be garbage collected at the
2673  * next pass.
2674  *
2675  * ------------------------------------------------------------------------ */
2676 
2677 void
2678 initScheduler(void)
2679 {
2680 #if !defined(THREADED_RTS)
2681   blocked_queue_hd  = END_TSO_QUEUE;
2682   blocked_queue_tl  = END_TSO_QUEUE;
2683   sleeping_queue    = END_TSO_QUEUE;
2684 #endif
2685 
2686   sched_state    = SCHED_RUNNING;
2687   SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
2688 
2689 #if defined(THREADED_RTS)
2690   /* Initialise the mutex and condition variables used by
2691    * the scheduler. */
2692   initMutex(&sched_mutex);
2693   initMutex(&sync_finished_mutex);
2694   initCondition(&sync_finished_cond);
2695 #endif
2696 
2697   ACQUIRE_LOCK(&sched_mutex);
2698 
2699   allocated_bytes_at_heapoverflow = 0;
2700 
2701   /* A capability holds the state a native thread needs in
2702    * order to execute STG code. At least one capability is
2703    * floating around (only THREADED_RTS builds have more than one).
2704    */
2705   initCapabilities();
2706 
2707   initTaskManager();
2708 
2709   /*
2710    * Eagerly start one worker to run each Capability, except for
2711    * Capability 0.  The idea is that we're probably going to start a
2712    * bound thread on Capability 0 pretty soon, so we don't want a
2713    * worker task hogging it.
2714    */
2715   startWorkerTasks(1, n_capabilities);
2716 
2717   RELEASE_LOCK(&sched_mutex);
2718 
2719 }
2720 
2721 void
2722 exitScheduler (bool wait_foreign USED_IF_THREADS)
2723                /* see Capability.c, shutdownCapability() */
2724 {
2725     Task *task = newBoundTask();
2726 
2727     // If we haven't killed all the threads yet, do it now.
2728     if (RELAXED_LOAD(&sched_state) < SCHED_SHUTTING_DOWN) {
2729         RELAXED_STORE(&sched_state, SCHED_INTERRUPTING);
2730         nonmovingStop();
2731         Capability *cap = task->cap;
2732         waitForCapability(&cap,task);
2733         scheduleDoGC(&cap,task,true,false);
2734         ASSERT(task->incall->tso == NULL);
2735         releaseCapability(cap);
2736     }
2737     ASSERT(sched_state == SCHED_SHUTTING_DOWN);
2738 
2739     shutdownCapabilities(task, wait_foreign);
2740 
2741     // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2742     //            n_failed_trygrab_idles, n_idle_caps);
2743 
2744     boundTaskExiting(task);
2745 }
2746 
2747 void
2748 freeScheduler( void )
2749 {
2750     uint32_t still_running;
2751 
2752     ACQUIRE_LOCK(&sched_mutex);
2753     still_running = freeTaskManager();
2754     // We can only free the Capabilities if there are no Tasks still
2755     // running.  We might have a Task about to return from a foreign
2756     // call into waitForCapability(), for example (actually,
2757     // this should be the *only* thing that a still-running Task can
2758     // do at this point, and it will block waiting for the
2759     // Capability).
2760     if (still_running == 0) {
2761         freeCapabilities();
2762     }
2763     RELEASE_LOCK(&sched_mutex);
2764 #if defined(THREADED_RTS)
2765     closeMutex(&sched_mutex);
2766 #endif
2767 }
2768 
2769 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2770                     void *user USED_IF_NOT_THREADS)
2771 {
2772 #if !defined(THREADED_RTS)
2773     evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2774     evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2775     evac(user, (StgClosure **)(void *)&sleeping_queue);
2776 #endif
2777 }
2778 
2779 /* -----------------------------------------------------------------------------
2780    performGC
2781 
2782    This is the interface to the garbage collector from Haskell land.
2783    We provide this so that external C code can allocate and garbage
2784    collect when called from Haskell via _ccall_GC.
2785    -------------------------------------------------------------------------- */
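/* External C code normally reaches this via hs_perform_gc() (declared in
 * HsFFI.h), and Haskell code via System.Mem.performGC / performMajorGC;
 * both routes end up in the entry points below, e.g.:
 *
 *     hs_perform_gc();        // from C land
 */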
2786 
2787 static void
2788 performGC_(bool force_major)
2789 {
2790     Task *task;
2791     Capability *cap = NULL;
2792 
2793     // We must grab a new Task here, because the existing Task may be
2794     // associated with a particular Capability, and chained onto the
2795     // suspended_ccalls queue.
2796     task = newBoundTask();
2797 
2798     // TODO: do we need to traceTask*() here?
2799 
2800     waitForCapability(&cap,task);
2801     scheduleDoGC(&cap,task,force_major,false);
2802     releaseCapability(cap);
2803     boundTaskExiting(task);
2804 }
2805 
2806 void
2807 performGC(void)
2808 {
2809     performGC_(false);
2810 }
2811 
2812 void
2813 performMajorGC(void)
2814 {
2815     performGC_(true);
2816 }
2817 
2818 /* ---------------------------------------------------------------------------
2819    Interrupt execution.
2820    Might be called inside a signal handler so it mustn't do anything fancy.
2821    ------------------------------------------------------------------------ */
2822 
2823 void
2824 interruptStgRts(void)
2825 {
2826     ASSERT(sched_state != SCHED_SHUTTING_DOWN);
2827     sched_state = SCHED_INTERRUPTING;
2828     interruptAllCapabilities();
2829 #if defined(THREADED_RTS)
2830     wakeUpRts();
2831 #endif
2832 }
2833 
2834 /* -----------------------------------------------------------------------------
2835    Wake up the RTS
2836 
2837    This function causes at least one OS thread to wake up and run the
2838    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2839    an external event has arrived that may need servicing (eg. a
2840    keyboard interrupt).
2841 
2842    In the single-threaded RTS we don't do anything here; we only have
2843    one thread anyway, and the event that caused us to want to wake up
2844    will have interrupted any blocking system call in progress anyway.
2845    -------------------------------------------------------------------------- */
2846 
2847 #if defined(THREADED_RTS)
2848 void wakeUpRts(void)
2849 {
2850     // This forces the IO Manager thread to wakeup, which will
2851     // in turn ensure that some OS thread wakes up and runs the
2852     // scheduler loop, which will cause a GC and deadlock check.
2853     ioManagerWakeup();
2854 }
2855 #endif
2856 
2857 /* -----------------------------------------------------------------------------
2858    Deleting threads
2859 
2860    This is used for interruption (^C) and forking, and corresponds to
2861    raising an exception but without letting the thread catch the
2862    exception.
2863    -------------------------------------------------------------------------- */
2864 
2865 static void
2866 deleteThread (StgTSO *tso)
2867 {
2868     // NOTE: must only be called on a TSO that we have exclusive
2869     // access to, because we will call throwToSingleThreaded() below.
2870     // The TSO must be on the run queue of the Capability we own, or
2871     // we must own all Capabilities.
2872 
2873     if (tso->why_blocked != BlockedOnCCall &&
2874         tso->why_blocked != BlockedOnCCall_Interruptible) {
2875         throwToSingleThreaded(tso->cap,tso,NULL);
2876     }
2877 }
2878 
2879 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
2880 static void
2881 deleteThread_(StgTSO *tso)
2882 { // for forkProcess only:
2883   // like deleteThread(), but we delete threads in foreign calls, too.
2884 
2885     if (tso->why_blocked == BlockedOnCCall ||
2886         tso->why_blocked == BlockedOnCCall_Interruptible) {
2887         tso->what_next = ThreadKilled;
2888         appendToRunQueue(tso->cap, tso);
2889     } else {
2890         deleteThread(tso);
2891     }
2892 }
2893 #endif
2894 
2895 /* -----------------------------------------------------------------------------
2896    raiseExceptionHelper
2897 
2898    This function is called by the raise# primitive, just so that we can
2899    move some of the tricky bits of raising an exception from C-- into
2900    C.  Who knows, it might be a useful reusable thing here too.
2901    -------------------------------------------------------------------------- */

StgWord
raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
{
    Capability *cap = regTableToCapability(reg);
    StgThunk *raise_closure = NULL;
    StgPtr p, next;
    const StgRetInfoTable *info;
    //
    // This closure represents the expression 'raise# E' where E
    // is the exception being raised.  It is used to overwrite all the
    // thunks which are currently under evaluation.
    //

    // OLD COMMENT (we don't have MIN_UPD_SIZE now):
    // LDV profiling: stg_raise_info has THUNK as its closure
    // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
    // payload, MIN_UPD_SIZE is more appropriate than 1.  It seems that
    // 1 does not cause any problem unless profiling is performed.
    // However, when LDV profiling goes on, we need to linearly scan
    // the small object pool, where raise_closure is stored, so we should
    // use MIN_UPD_SIZE.
    //
    // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
    //                                 sizeofW(StgClosure)+1);
    //

    //
    // Walk up the stack, looking for the catch frame.  On the way,
    // we update any closures pointed to from update frames with the
    // raise closure that we just built.
    //
    p = tso->stackobj->sp;
    while(1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case UPDATE_FRAME:
            // Only create raise_closure if we need to.
            if (raise_closure == NULL) {
                raise_closure =
                    (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
                SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
                raise_closure->payload[0] = exception;
            }
            updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
                        (StgClosure *)raise_closure);
            p = next;
            continue;

        case ATOMICALLY_FRAME:
            debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
            tso->stackobj->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_FRAME:
            tso->stackobj->sp = p;
            return CATCH_FRAME;

        case CATCH_STM_FRAME:
            debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
            tso->stackobj->sp = p;
            return CATCH_STM_FRAME;

        case UNDERFLOW_FRAME:
            tso->stackobj->sp = p;
            threadStackUnderflow(cap,tso);
            p = tso->stackobj->sp;
            continue;

        case STOP_FRAME:
            tso->stackobj->sp = p;
            return STOP_FRAME;

        case CATCH_RETRY_FRAME: {
            StgTRecHeader *trec = tso -> trec;
            StgTRecHeader *outer = trec -> enclosing_trec;
            debugTrace(DEBUG_stm,
                       "found CATCH_RETRY_FRAME at %p during raise", p);
            debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
            stmAbortTransaction(cap, trec);
            stmFreeAbortedTRec(cap, trec);
            tso -> trec = outer;
            p = next;
            continue;
        }

        default:
            p = next;
            continue;
        }
    }
}
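
/* The value returned above names the frame type now at the top of the
 * stack; the C-- caller behind raise# is expected to dispatch on it.
 * Roughly (a sketch, not the actual C-- source):
 *
 *     switch (raiseExceptionHelper(BaseReg, CurrentTSO, exception)) {
 *     case CATCH_FRAME:      run the handler found on the stack
 *     case CATCH_STM_FRAME:  likewise, inside a transaction
 *     case ATOMICALLY_FRAME: deal with the enclosing transaction
 *                            (abort or restart it) before continuing
 *     case STOP_FRAME:       no handler: the thread dies with this
 *                            exception
 *     }
 */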


/* -----------------------------------------------------------------------------
   findRetryFrameHelper

   This function is called by the retry# primitive.  It traverses the stack
   leaving tso->stackobj->sp referring to the frame which should handle the
   retry.

   This should either be a CATCH_RETRY_FRAME (if the retry# is within an
   orElse#) or an ATOMICALLY_FRAME (if the retry# reaches the top level).

   We skip CATCH_STM_FRAMEs (aborting and rolling back the nested transaction
   that they create) because retries are not considered to be exceptions,
   despite the similar implementation.

   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
   not be created within memory transactions.
   -------------------------------------------------------------------------- */

StgWord
findRetryFrameHelper (Capability *cap, StgTSO *tso)
{
  const StgRetInfoTable *info;
  StgPtr    p, next;

  p = tso->stackobj->sp;
  while (1) {
    info = get_ret_itbl((const StgClosure *)p);
    next = p + stack_frame_sizeW((StgClosure *)p);
    switch (info->i.type) {

    case ATOMICALLY_FRAME:
        debugTrace(DEBUG_stm,
                   "found ATOMICALLY_FRAME at %p during retry", p);
        tso->stackobj->sp = p;
        return ATOMICALLY_FRAME;

    case CATCH_RETRY_FRAME:
        debugTrace(DEBUG_stm,
                   "found CATCH_RETRY_FRAME at %p during retry", p);
        tso->stackobj->sp = p;
        return CATCH_RETRY_FRAME;

    case CATCH_STM_FRAME: {
        StgTRecHeader *trec = tso -> trec;
        StgTRecHeader *outer = trec -> enclosing_trec;
        debugTrace(DEBUG_stm,
                   "found CATCH_STM_FRAME at %p during retry", p);
        debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
        stmAbortTransaction(cap, trec);
        stmFreeAbortedTRec(cap, trec);
        tso -> trec = outer;
        p = next;
        continue;
    }

    case UNDERFLOW_FRAME:
        tso->stackobj->sp = p;
        threadStackUnderflow(cap,tso);
        p = tso->stackobj->sp;
        continue;

    default:
      ASSERT(info->i.type != CATCH_FRAME);
      ASSERT(info->i.type != STOP_FRAME);
      p = next;
      continue;
    }
  }
}
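
/* The caller (the C-- code implementing retry#) is then expected to act on
 * the frame type: for CATCH_RETRY_FRAME it tries the other branch of the
 * orElse#, and for ATOMICALLY_FRAME it blocks the thread on the
 * transaction's read set until one of the TVars changes.  (A description,
 * not a verbatim account of the C-- source.)
 */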

/* -----------------------------------------------------------------------------
   findAtomicallyFrameHelper

   This function is called by stg_abort via the catch_retry_frame code.  It
   is like findRetryFrameHelper, but it stops only at ATOMICALLY_FRAME.
   -------------------------------------------------------------------------- */

StgWord
findAtomicallyFrameHelper (Capability *cap, StgTSO *tso)
{
  const StgRetInfoTable *info;
  StgPtr    p, next;

  p = tso->stackobj->sp;
  while (1) {
    info = get_ret_itbl((const StgClosure *)p);
    next = p + stack_frame_sizeW((StgClosure *)p);
    switch (info->i.type) {

    case ATOMICALLY_FRAME:
        debugTrace(DEBUG_stm,
                   "found ATOMICALLY_FRAME at %p while aborting after orElse", p);
        tso->stackobj->sp = p;
        return ATOMICALLY_FRAME;

    case CATCH_RETRY_FRAME: {
        StgTRecHeader *trec = tso -> trec;
        StgTRecHeader *outer = trec -> enclosing_trec;
        debugTrace(DEBUG_stm,
                   "found CATCH_RETRY_FRAME at %p while aborting after orElse", p);
        debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
        stmAbortTransaction(cap, trec);
        stmFreeAbortedTRec(cap, trec);
        tso -> trec = outer;
        p = next;
        continue;
    }

    case CATCH_STM_FRAME: {
        StgTRecHeader *trec = tso -> trec;
        StgTRecHeader *outer = trec -> enclosing_trec;
        debugTrace(DEBUG_stm,
                   "found CATCH_STM_FRAME at %p while aborting after orElse", p);
        debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
        stmAbortTransaction(cap, trec);
        stmFreeAbortedTRec(cap, trec);
        tso -> trec = outer;
        p = next;
        continue;
    }

    case UNDERFLOW_FRAME:
        tso->stackobj->sp = p;
        threadStackUnderflow(cap,tso);
        p = tso->stackobj->sp;
        continue;

    default:
      ASSERT(info->i.type != CATCH_FRAME);
      ASSERT(info->i.type != STOP_FRAME);
      p = next;
      continue;
    }
  }
}

/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent an exception: BlockedIndefinitelyOnMVar if the thread
   was blocked on an MVar, BlockedIndefinitelyOnSTM if it was blocked
   on an STM transaction, or NonTermination if the thread was blocked
   on a black hole.

   Locks: assumes we hold *all* the capabilities.
   -------------------------------------------------------------------------- */

void
resurrectThreads (StgTSO *threads)
{
    StgTSO *tso, *next;
    Capability *cap;
    generation *gen;

    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;

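        // Put the thread back onto its generation's list of threads, so
        // that it is once again visible to the GC and the rest of the RTS.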
        gen = Bdescr((P_)tso)->gen;
        tso->global_link = gen->threads;
        gen->threads = tso;

        debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);

        // Wake up the thread on the Capability it was last on
        cap = tso->cap;

        switch (tso->why_blocked) {
        case BlockedOnMVar:
        case BlockedOnMVarRead:
            /* Called by GC - sched_mutex lock is currently held. */
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)blockedIndefinitelyOnMVar_closure);
            break;
        case BlockedOnBlackHole:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)nonTermination_closure);
            break;
        case BlockedOnSTM:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)blockedIndefinitelyOnSTM_closure);
            break;
        case NotBlocked:
            /* This might happen if the thread was blocked on a black hole
             * belonging to a thread that we've just woken up (raiseAsync
             * can wake up threads, remember...).
             */
            continue;
        case BlockedOnMsgThrowTo:
            // This can happen if the target is masking, blocks on a
            // black hole, and then is found to be unreachable.  In
            // this case, we want to let the target wake up and carry
            // on, and do nothing to this thread.
            continue;
        default:
            barf("resurrectThreads: thread blocked in a strange way: %d",
                 tso->why_blocked);
        }
    }
}