1 /* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * The scheduler and thread-related functionality
6 *
7 * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "sm/GCThread.h"
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43 #include "Messages.h"
44 #include "StablePtr.h"
45 #include "StableName.h"
46 #include "TopHandler.h"
47 #include "sm/NonMoving.h"
48 #include "sm/NonMovingMark.h"
49
50 #if defined(HAVE_SYS_TYPES_H)
51 #include <sys/types.h>
52 #endif
53 #if defined(HAVE_UNISTD_H)
54 #include <unistd.h>
55 #endif
56
57 #include <string.h>
58 #include <stdlib.h>
59 #include <stdarg.h>
60
61 #if defined(HAVE_ERRNO_H)
62 #include <errno.h>
63 #endif
64
65 #if defined(TRACING)
66 #include "eventlog/EventLog.h"
67 #endif
68 /* -----------------------------------------------------------------------------
69 * Global variables
70 * -------------------------------------------------------------------------- */
71
72 #if !defined(THREADED_RTS)
73 // Blocked/sleeping threads
74 StgTSO *blocked_queue_hd = NULL;
75 StgTSO *blocked_queue_tl = NULL;
76 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
77 #endif
78
79 // Bytes allocated since the last time a HeapOverflow exception was thrown by
80 // the RTS
81 uint64_t allocated_bytes_at_heapoverflow = 0;
82
83 /* Set to true when the latest garbage collection failed to reclaim enough
84 * space, and the runtime should proceed to shut itself down in an orderly
85 * fashion (emitting profiling info etc.), OR throw an exception to the main
86 * thread, if it is still alive.
87 */
88 bool heap_overflow = false;
89
90 /* flag that tracks whether we have done any execution in this time slice.
91 * LOCK: currently none, perhaps we should lock (but needs to be
92 * updated in the fast path of the scheduler).
93 *
94 * NB. must be StgWord, we do xchg() on it.
95 */
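// The possible values (defined in Schedule.h) are, roughly:
//   ACTIVITY_YES      -- some Haskell code has run in the current time slice
//   ACTIVITY_MAYBE_NO -- set by the timer tick; no activity seen yet this slice
//   ACTIVITY_INACTIVE -- idle for a whole slice; the RTS may force an idle GC
//   ACTIVITY_DONE_GC  -- the idle GC has run and the timer has been switched off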
96 volatile StgWord recent_activity = ACTIVITY_YES;
97
98 /* if this flag is set as well, give up execution
99 * LOCK: none (changes monotonically)
100 */
101 volatile StgWord sched_state = SCHED_RUNNING;
102
103 /*
104 * This mutex protects most of the global scheduler data in
105 * the THREADED_RTS runtime.
106 */
107 #if defined(THREADED_RTS)
108 Mutex sched_mutex;
109 #endif
110
111 #if !defined(mingw32_HOST_OS)
112 #define FORKPROCESS_PRIMOP_SUPPORTED
113 #endif
114
115 /*
116 * sync_finished_cond allows threads which do not own any capability (e.g. the
117 * concurrent mark thread) to participate in the sync protocol. In particular,
118 * if such a thread requests a sync while sync is already in progress it will
119 * block on sync_finished_cond, which will be signalled when the sync is
120 * finished (by releaseAllCapabilities).
121 */
122 #if defined(THREADED_RTS)
123 static Condition sync_finished_cond;
124 static Mutex sync_finished_mutex;
125 #endif
126
127
128 /* -----------------------------------------------------------------------------
129 * static function prototypes
130 * -------------------------------------------------------------------------- */
131
132 static Capability *schedule (Capability *initialCapability, Task *task);
133
134 //
135 // These functions all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void scheduleFindWork (Capability **pcap);
140 #if defined(THREADED_RTS)
141 static void scheduleYield (Capability **pcap, Task *task);
142 #endif
143 #if defined(THREADED_RTS)
144 static bool requestSync (Capability **pcap, Task *task,
145 PendingSync *sync_type, SyncType *prev_sync_type);
146 static void acquireAllCapabilities(Capability *cap, Task *task);
147 static void startWorkerTasks (uint32_t from USED_IF_THREADS,
148 uint32_t to USED_IF_THREADS);
149 #endif
150 static void scheduleStartSignalHandlers (Capability *cap);
151 static void scheduleCheckBlockedThreads (Capability *cap);
152 static void scheduleProcessInbox(Capability **cap);
153 static void scheduleDetectDeadlock (Capability **pcap, Task *task);
154 static void schedulePushWork(Capability *cap, Task *task);
155 #if defined(THREADED_RTS)
156 static void scheduleActivateSpark(Capability *cap);
157 #endif
158 static void schedulePostRunThread(Capability *cap, StgTSO *t);
159 static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
160 static bool scheduleHandleYield( Capability *cap, StgTSO *t,
161 uint32_t prev_what_next );
162 static void scheduleHandleThreadBlocked( StgTSO *t );
163 static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
164 StgTSO *t );
165 static bool scheduleNeedHeapProfile(bool ready_to_gc);
166 static void scheduleDoGC( Capability **pcap, Task *task,
167 bool force_major, bool deadlock_detect );
168
169 static void deleteThread (StgTSO *tso);
170 static void deleteAllThreads (void);
171
172 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
173 static void deleteThread_(StgTSO *tso);
174 #endif
175
176 /* ---------------------------------------------------------------------------
177 Main scheduling loop.
178
179 We use round-robin scheduling, each thread returning to the
180 scheduler loop when one of these conditions is detected:
181
182 * out of heap space
183 * timer expires (thread yields)
184 * thread blocks
185 * thread ends
186 * stack overflow
187
188 ------------------------------------------------------------------------ */
189
190 static Capability *
191 schedule (Capability *initialCapability, Task *task)
192 {
193 StgTSO *t;
194 Capability *cap;
195 StgThreadReturnCode ret;
196 uint32_t prev_what_next;
197 bool ready_to_gc;
198
199 cap = initialCapability;
200
201 // Pre-condition: this task owns initialCapability.
202 // The sched_mutex is *NOT* held
203 // NB. on return, we still hold a capability.
204 ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
205
206 debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
207
208 // -----------------------------------------------------------
209 // Scheduler loop starts here:
210
211 while (1) {
212 ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
213
214 // Check whether we have re-entered the RTS from Haskell without
215 // going via suspendThread()/resumeThread (i.e. not via a 'safe'
216 // foreign call).
217 if (cap->in_haskell) {
218 errorBelch("schedule: re-entered unsafely.\n"
219 " Perhaps a 'foreign import unsafe' should be 'safe'?");
220 stg_exit(EXIT_FAILURE);
221 }
222
223 // Note [shutdown]: The interruption / shutdown sequence.
224 //
225 // In order to cleanly shut down the runtime, we want to:
226 // * make sure that all main threads return to their callers
227 // with the state 'Interrupted'.
228 // * clean up all OS threads associated with the runtime
229 // * free all memory etc.
230 //
231 // So the sequence goes like this:
232 //
233 // * The shutdown sequence is initiated by calling hs_exit(),
234 // interruptStgRts(), or running out of memory in the GC.
235 //
236 // * Set sched_state = SCHED_INTERRUPTING
237 //
238 // * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
239 // scheduleDoGC(), which halts the whole runtime by acquiring all the
240 // capabilities, does a GC and then calls deleteAllThreads() to kill all
241 // the remaining threads. The zombies are left on the run queue for
242 // cleaning up. We can't kill threads involved in foreign calls.
243 //
244 // * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
245 //
246 // * After this point, there can be NO MORE HASKELL EXECUTION. This is
247 // enforced by the scheduler, which won't run any Haskell code when
248 // sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
249 // other capabilities by doing the GC earlier.
250 //
251 // * all workers exit when the run queue on their capability
252 // drains. All main threads will also exit when their TSO
253 // reaches the head of the run queue and they can return.
254 //
255 // * eventually all Capabilities will shut down, and the RTS can
256 // exit.
257 //
258 // * We might be left with threads blocked in foreign calls,
259 // we should really attempt to kill these somehow (TODO).
260
261 switch (RELAXED_LOAD(&sched_state)) {
262 case SCHED_RUNNING:
263 break;
264 case SCHED_INTERRUPTING:
265 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
266 /* scheduleDoGC() deletes all the threads */
267 scheduleDoGC(&cap,task,true,false);
268
269 // after scheduleDoGC(), we must be shutting down. Either some
270 // other Capability did the final GC, or we did it above,
271 // either way we can fall through to the SCHED_SHUTTING_DOWN
272 // case now.
273 ASSERT(RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN);
274 // fall through
275
276 case SCHED_SHUTTING_DOWN:
277 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
278 // If we are a worker, just exit. If we're a bound thread
279 // then we will exit below when we've removed our TSO from
280 // the run queue.
281 if (!isBoundTask(task) && emptyRunQueue(cap)) {
282 return cap;
283 }
284 break;
285 default:
286 barf("sched_state: %" FMT_Word, sched_state);
287 }
288
289 scheduleFindWork(&cap);
290
291 /* work pushing, currently relevant only for THREADED_RTS:
292 (pushes threads, wakes up idle capabilities for stealing) */
293 schedulePushWork(cap,task);
294
295 scheduleDetectDeadlock(&cap,task);
296
297 // Normally, the only way we can get here with no threads to
298 // run is if a keyboard interrupt was received during
299 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
300 // Additionally, it is not fatal for the
301 // threaded RTS to reach here with no threads to run.
302 //
303 // win32: might be here due to awaitEvent() being abandoned
304 // as a result of a console event having been delivered.
305
306 #if defined(THREADED_RTS)
307 scheduleYield(&cap,task);
308
309 if (emptyRunQueue(cap)) continue; // look for work again
310 #endif
311
312 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
313 if ( emptyRunQueue(cap) ) {
314 ASSERT(sched_state >= SCHED_INTERRUPTING);
315 }
316 #endif
317
318 //
319 // Get a thread to run
320 //
321 t = popRunQueue(cap);
322
323 // Sanity check the thread we're about to run. This can be
324 // expensive if there is lots of thread switching going on...
325 IF_DEBUG(sanity,checkTSO(t));
326
327 #if defined(THREADED_RTS)
328 // Check whether we can run this thread in the current task.
329 // If not, we have to pass our capability to the right task.
330 {
331 InCall *bound = t->bound;
332
333 if (bound) {
334 if (bound->task == task) {
335 // yes, the Haskell thread is bound to the current native thread
336 } else {
337 debugTrace(DEBUG_sched,
338 "thread %lu bound to another OS thread",
339 (unsigned long)t->id);
340 // no, bound to a different Haskell thread: pass to that thread
341 pushOnRunQueue(cap,t);
342 continue;
343 }
344 } else {
345 // The thread we want to run is unbound.
346 if (task->incall->tso) {
347 debugTrace(DEBUG_sched,
348 "this OS thread cannot run thread %lu",
349 (unsigned long)t->id);
350 // no, the current native thread is bound to a different
351 // Haskell thread, so pass it to any worker thread
352 pushOnRunQueue(cap,t);
353 continue;
354 }
355 }
356 }
357 #endif
358
359 // If we're shutting down, and this thread has not yet been
360 // killed, kill it now. This sometimes happens when a finalizer
361 // thread is created by the final GC, or a thread previously
362 // in a foreign call returns.
363 if (RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING &&
364 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
365 deleteThread(t);
366 }
367
368 // If this capability is disabled, migrate the thread away rather
369 // than running it. NB. but not if the thread is bound: it is
370 // really hard for a bound thread to migrate itself. Believe me,
371 // I tried several ways and couldn't find a way to do it.
372 // Instead, when everything is stopped for GC, we migrate all the
373 // threads on the run queue then (see scheduleDoGC()).
374 //
375 // ToDo: what about TSO_LOCKED? Currently we're migrating those
376 // when the number of capabilities drops, but we never migrate
377 // them back if it rises again. Presumably we should, but after
378 // the thread has been migrated we no longer know what capability
379 // it was originally on.
380 #if defined(THREADED_RTS)
381 if (cap->disabled && !t->bound) {
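// Pick a destination from the still-enabled capabilities (a simple
// modulo mapping; any enabled capability will do).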
382 Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
383 migrateThread(cap, t, dest_cap);
384 continue;
385 }
386 #endif
387
388 /* context switches are initiated by the timer signal, unless
389 * the user specified "context switch as often as possible", with
390 * +RTS -C0
391 */
392 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
393 && !emptyThreadQueues(cap)) {
394 RELAXED_STORE(&cap->context_switch, 1);
395 }
396
397 run_thread:
398
399 // CurrentTSO is the thread to run. It might be different if we
400 // loop back to run_thread, so make sure to set CurrentTSO after
401 // that.
402 cap->r.rCurrentTSO = t;
403
404 startHeapProfTimer();
405
406 // ----------------------------------------------------------------------
407 // Run the current thread
408
409 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
410 ASSERT(t->cap == cap);
411 ASSERT(t->bound ? t->bound->task->cap == cap : 1);
412
413 prev_what_next = t->what_next;
414
415 errno = t->saved_errno;
416 #if defined(mingw32_HOST_OS)
417 SetLastError(t->saved_winerror);
418 #endif
419
420 // reset the interrupt flag before running Haskell code
421 RELAXED_STORE(&cap->interrupt, false);
422
423 cap->in_haskell = true;
424 RELAXED_STORE(&cap->idle, false);
425
426 dirty_TSO(cap,t);
427 dirty_STACK(cap,t->stackobj);
428
429 switch (SEQ_CST_LOAD(&recent_activity))
430 {
431 case ACTIVITY_DONE_GC: {
432 // ACTIVITY_DONE_GC means we turned off the timer signal to
433 // conserve power (see #1623). Re-enable it here.
434 uint32_t prev;
435 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
436 if (prev == ACTIVITY_DONE_GC) {
437 #if !defined(PROFILING)
438 startTimer();
439 #endif
440 }
441 break;
442 }
443 case ACTIVITY_INACTIVE:
444 // If we reached ACTIVITY_INACTIVE, then don't reset it until
445 // we've done the GC. The thread running here might just be
446 // the IO manager thread that handle_tick() woke up via
447 // wakeUpRts().
448 break;
449 default:
450 SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
451 }
452
453 traceEventRunThread(cap, t);
454
455 switch (prev_what_next) {
456
457 case ThreadKilled:
458 case ThreadComplete:
459 /* Thread already finished, return to scheduler. */
460 ret = ThreadFinished;
461 break;
462
463 case ThreadRunGHC:
464 {
465 StgRegTable *r;
466 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
467 cap = regTableToCapability(r);
468 ret = r->rRet;
469 break;
470 }
471
472 case ThreadInterpret:
473 cap = interpretBCO(cap);
474 ret = cap->r.rRet;
475 break;
476
477 default:
478 barf("schedule: invalid prev_what_next=%u field", prev_what_next);
479 }
480
481 cap->in_haskell = false;
482
483 // The TSO might have moved, eg. if it re-entered the RTS and a GC
484 // happened. So find the new location:
485 t = cap->r.rCurrentTSO;
486
487 // cap->r.rCurrentTSO is charged for calls to allocate(), so we
488 // don't want it set when not running a Haskell thread.
489 cap->r.rCurrentTSO = NULL;
490
491 // And save the current errno in this thread.
492 // XXX: possibly bogus for SMP because this thread might already
493 // be running again, see code below.
494 t->saved_errno = errno;
495 #if defined(mingw32_HOST_OS)
496 // Similarly for Windows error code
497 t->saved_winerror = GetLastError();
498 #endif
499
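// Trace why the thread stopped. For blocked threads the eventlog status
// is why_blocked + 6: the first six status codes correspond to the
// StgThreadReturnCode-style reasons (heap overflow, stack overflow,
// yielding, blocked, finished, foreign call), so the specific blocked
// reasons follow them (see EventLogFormat.h).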
500 if (ret == ThreadBlocked) {
501 if (t->why_blocked == BlockedOnBlackHole) {
502 StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
503 traceEventStopThread(cap, t, t->why_blocked + 6,
504 owner != NULL ? owner->id : 0);
505 } else {
506 traceEventStopThread(cap, t, t->why_blocked + 6, 0);
507 }
508 } else {
509 if (ret == StackOverflow) {
510 traceEventStopThread(cap, t, ret, t->tot_stack_size);
511 } else {
512 traceEventStopThread(cap, t, ret, 0);
513 }
514 }
515
516 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
517 ASSERT(t->cap == cap);
518
519 // ----------------------------------------------------------------------
520
521 // Costs for the scheduler are assigned to CCS_SYSTEM
522 stopHeapProfTimer();
523 #if defined(PROFILING)
524 cap->r.rCCCS = CCS_SYSTEM;
525 #endif
526
527 schedulePostRunThread(cap,t);
528
529 ready_to_gc = false;
530
531 switch (ret) {
532 case HeapOverflow:
533 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
534 break;
535
536 case StackOverflow:
537 // just adjust the stack for this thread, then pop it back
538 // on the run queue.
539 threadStackOverflow(cap, t);
540 pushOnRunQueue(cap,t);
541 break;
542
543 case ThreadYielding:
544 if (scheduleHandleYield(cap, t, prev_what_next)) {
545 // shortcut for switching between compiler/interpreter:
546 goto run_thread;
547 }
548 break;
549
550 case ThreadBlocked:
551 scheduleHandleThreadBlocked(t);
552 break;
553
554 case ThreadFinished:
555 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
556 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
557 break;
558
559 default:
560 barf("schedule: invalid thread return code %d", (int)ret);
561 }
562
563 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
564 scheduleDoGC(&cap,task,false,false);
565 }
566 } /* end of while() */
567 }
568
569 /* -----------------------------------------------------------------------------
570 * Run queue operations
571 * -------------------------------------------------------------------------- */
572
573 static void
574 removeFromRunQueue (Capability *cap, StgTSO *tso)
575 {
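// Unlink tso from the doubly-linked run queue. While a TSO sits on a run
// queue its block_info.prev field is reused as the back pointer.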
576 if (tso->block_info.prev == END_TSO_QUEUE) {
577 ASSERT(cap->run_queue_hd == tso);
578 cap->run_queue_hd = tso->_link;
579 } else {
580 setTSOLink(cap, tso->block_info.prev, tso->_link);
581 }
582 if (tso->_link == END_TSO_QUEUE) {
583 ASSERT(cap->run_queue_tl == tso);
584 cap->run_queue_tl = tso->block_info.prev;
585 } else {
586 setTSOPrev(cap, tso->_link, tso->block_info.prev);
587 }
588 tso->_link = tso->block_info.prev = END_TSO_QUEUE;
589 cap->n_run_queue--;
590
591 IF_DEBUG(sanity, checkRunQueue(cap));
592 }
593
594 void
595 promoteInRunQueue (Capability *cap, StgTSO *tso)
596 {
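// Move tso to the front of the run queue so that it is the next thread
// chosen by popRunQueue().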
597 removeFromRunQueue(cap, tso);
598 pushOnRunQueue(cap, tso);
599 }
600
601 /* -----------------------------------------------------------------------------
602 * scheduleFindWork()
603 *
604 * Search for work to do, and handle messages from elsewhere.
605 * -------------------------------------------------------------------------- */
606
607 static void
608 scheduleFindWork (Capability **pcap)
609 {
610 scheduleStartSignalHandlers(*pcap);
611
612 scheduleProcessInbox(pcap);
613
614 scheduleCheckBlockedThreads(*pcap);
615
616 #if defined(THREADED_RTS)
617 if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
618 #endif
619 }
620
621 #if defined(THREADED_RTS)
622 STATIC_INLINE bool
623 shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
624 {
625 // we need to yield this capability to someone else if..
626 // - another thread is initiating a GC, and we didn't just do a GC
627 // (see Note [GC livelock])
628 // - another Task is returning from a foreign call
629 // - the thread at the head of the run queue cannot be run
630 // by this Task (it is bound to another Task, or it is unbound
631 // and this task is bound).
632 //
633 // Note [GC livelock]
634 //
635 // If we are interrupted to do a GC, then we do not immediately do
636 // another one. This avoids a starvation situation where one
637 // Capability keeps forcing a GC and the other Capabilities make no
638 // progress at all.
639
640 // Note [Data race in shouldYieldCapability]
641 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
642 // We would usually need to hold cap->lock to look at n_returning_tasks but
643 // we don't here since this is just an approximate predicate. We
644 // consequently need to use atomic accesses whenever touching
645 // n_returning_tasks. However, since this is an approximate predicate we can
646 // use a RELAXED ordering.
647
648 return ((RELAXED_LOAD(&pending_sync) && !didGcLast) ||
649 RELAXED_LOAD(&cap->n_returning_tasks) != 0 ||
650 (!emptyRunQueue(cap) && (task->incall->tso == NULL
651 ? peekRunQueue(cap)->bound != NULL
652 : peekRunQueue(cap)->bound != task->incall)));
653 }
654
655 // This is the single place where a Task goes to sleep. There are
656 // two reasons it might need to sleep:
657 // - there are no threads to run
658 // - we need to yield this Capability to someone else
659 // (see shouldYieldCapability())
660 //
661 // Careful: the scheduler loop is quite delicate. Make sure you run
662 // the tests in testsuite/concurrent (all ways) after modifying this,
663 // and also check the benchmarks in nofib/parallel for regressions.
664
665 static void
666 scheduleYield (Capability **pcap, Task *task)
667 {
668 Capability *cap = *pcap;
669 bool didGcLast = false;
670
671 // if we have work, and we don't need to give up the Capability, continue.
672 //
673 if (!shouldYieldCapability(cap,task,false) &&
674 (!emptyRunQueue(cap) ||
675 !emptyInbox(cap) ||
676 RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING)) {
677 return;
678 }
679
680 // otherwise yield (sleep), and keep yielding if necessary.
681 do {
682 if (doIdleGCWork(cap, false)) {
683 // there's more idle GC work to do
684 didGcLast = false;
685 } else {
686 // no more idle GC work to do
687 didGcLast = yieldCapability(&cap,task, !didGcLast);
688 }
689 }
690 while (shouldYieldCapability(cap,task,didGcLast));
691
692 // note there may still be no threads on the run queue at this
693 // point, the caller has to check.
694
695 *pcap = cap;
696 return;
697 }
698 #endif
699
700 /* -----------------------------------------------------------------------------
701 * schedulePushWork()
702 *
703 * Push work to other Capabilities if we have some.
704 * -------------------------------------------------------------------------- */
705
706 static void
707 schedulePushWork(Capability *cap USED_IF_THREADS,
708 Task *task USED_IF_THREADS)
709 {
710 #if defined(THREADED_RTS)
711
712 Capability *free_caps[n_capabilities], *cap0;
713 uint32_t i, n_wanted_caps, n_free_caps;
714
715 uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
716
717 // migration can be turned off with +RTS -qm
718 if (!RtsFlags.ParFlags.migrate) {
719 spare_threads = 0;
720 }
721
722 // Figure out how many capabilities we want to wake up. We need at least
723 // sparkPoolSizeCap(cap) plus the number of spare threads we have.
724 n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
725 if (n_wanted_caps == 0) return;
726
727 // First grab as many free Capabilities as we can. ToDo: we should use
728 // capabilities on the same NUMA node preferably, but not exclusively.
729 for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
730 n_free_caps < n_wanted_caps && i != cap->no;
731 i = (i + 1) % n_capabilities) {
732 cap0 = capabilities[i];
733 if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
734 if (!emptyRunQueue(cap0)
735 || RELAXED_LOAD(&cap0->n_returning_tasks) != 0
736 || !emptyInbox(cap0)) {
737 // it already has some work, we just grabbed it at
738 // the wrong moment. Or maybe it's deadlocked!
739 releaseCapability(cap0);
740 } else {
741 free_caps[n_free_caps++] = cap0;
742 }
743 }
744 }
745
746 // We now have n_free_caps free capabilities stashed in
747 // free_caps[]. Attempt to share our run queue equally with them.
748 // This is complicated slightly by the fact that we can't move
749 // some threads:
750 //
751 // - threads that have TSO_LOCKED cannot migrate
752 // - a thread that is bound to the current Task cannot be migrated
753 //
754 // This is about the simplest thing we could do; improvements we
755 // might want to do include:
756 //
757 // - giving high priority to moving relatively new threads, on
758 // the grounds that they haven't had time to build up a
759 // working set in the cache on this CPU/Capability.
760 //
761 // - giving low priority to moving long-lived threads
762
763 if (n_free_caps > 0) {
764 StgTSO *prev, *t, *next;
765
766 debugTrace(DEBUG_sched,
767 "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
768 cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
769 n_free_caps);
770
771 // There are n_free_caps+1 caps in total. We will share the threads
772 // evenly between them, *except* that if the run queue does not divide
773 // evenly by n_free_caps+1 then we bias towards the current capability.
774 // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
775 uint32_t keep_threads =
776 (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
777
778 // This also ensures that we don't give away all our threads, since
779 // (x + y) / (y + 1) >= 1 when x >= 1.
780
781 // The number of threads we have left.
782 uint32_t n = cap->n_run_queue;
783
784 // prev = the previous thread on this cap's run queue
785 prev = END_TSO_QUEUE;
786
787 // We're going to walk through the run queue, migrating threads to other
788 // capabilities until we have only keep_threads left. We might
789 // encounter a thread that cannot be migrated, in which case we add it
790 // to the current run queue and decrement keep_threads.
791 for (t = cap->run_queue_hd, i = 0;
792 t != END_TSO_QUEUE && n > keep_threads;
793 t = next)
794 {
795 next = t->_link;
796 t->_link = END_TSO_QUEUE;
797
798 // Should we keep this thread?
799 if (t->bound == task->incall // don't move my bound thread
800 || tsoLocked(t) // don't move a locked thread
801 ) {
802 if (prev == END_TSO_QUEUE) {
803 cap->run_queue_hd = t;
804 } else {
805 setTSOLink(cap, prev, t);
806 }
807 setTSOPrev(cap, t, prev);
808 prev = t;
809 if (keep_threads > 0) keep_threads--;
810 }
811
812 // Or migrate it?
813 else {
814 appendToRunQueue(free_caps[i],t);
815 traceEventMigrateThread (cap, t, free_caps[i]->no);
816
817 // See Note [Benign data race due to work-pushing].
818 if (t->bound) {
819 t->bound->task->cap = free_caps[i];
820 }
821 t->cap = free_caps[i];
822 n--; // we have one fewer thread now
823 i++; // move on to the next free_cap
824 if (i == n_free_caps) i = 0;
825 }
826 }
827
828 // Join up the beginning of the queue (prev)
829 // with the rest of the queue (t)
830 if (t == END_TSO_QUEUE) {
831 cap->run_queue_tl = prev;
832 } else {
833 setTSOPrev(cap, t, prev);
834 }
835 if (prev == END_TSO_QUEUE) {
836 cap->run_queue_hd = t;
837 } else {
838 setTSOLink(cap, prev, t);
839 }
840 cap->n_run_queue = n;
841
842 IF_DEBUG(sanity, checkRunQueue(cap));
843
844 // release the capabilities
845 for (i = 0; i < n_free_caps; i++) {
846 task->cap = free_caps[i];
847 if (sparkPoolSizeCap(cap) > 0) {
848 // If we have sparks to steal, wake up a worker on the
849 // capability, even if it has no threads to run.
850 releaseAndWakeupCapability(free_caps[i]);
851 } else {
852 releaseCapability(free_caps[i]);
853 }
854 }
855 }
856 task->cap = cap; // reset to point to our Capability.
857
858 #endif /* THREADED_RTS */
859
860 }
861
862 /* ----------------------------------------------------------------------------
863 * Start any pending signal handlers
864 * ------------------------------------------------------------------------- */
865
866 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
867 static void
868 scheduleStartSignalHandlers(Capability *cap)
869 {
870 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
871 // safe outside the lock
872 startSignalHandlers(cap);
873 }
874 }
875 #else
876 static void
877 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
878 {
879 }
880 #endif
881
882 /* ----------------------------------------------------------------------------
883 * Check for blocked threads that can be woken up.
884 * ------------------------------------------------------------------------- */
885
886 static void
887 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
888 {
889 #if !defined(THREADED_RTS)
890 //
891 // Check whether any waiting threads need to be woken up. If the
892 // run queue is empty, and there are no other tasks running, we
893 // can wait indefinitely for something to happen.
894 //
895 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
896 {
897 awaitEvent (emptyRunQueue(cap));
898 }
899 #endif
900 }
901
902 /* ----------------------------------------------------------------------------
903 * Detect deadlock conditions and attempt to resolve them.
904 * ------------------------------------------------------------------------- */
905
906 static void
907 scheduleDetectDeadlock (Capability **pcap, Task *task)
908 {
909 Capability *cap = *pcap;
910 /*
911 * Detect deadlock: when we have no threads to run, there are no
912 * threads blocked, waiting for I/O, or sleeping, and all the
913 * other tasks are waiting for work, we must have a deadlock of
914 * some description.
915 */
916 if ( emptyThreadQueues(cap) )
917 {
918 #if defined(THREADED_RTS)
919 /*
920 * In the threaded RTS, we only check for deadlock if there
921 * has been no activity in a complete timeslice. This means
922 * we won't eagerly start a full GC just because we don't have
923 * any threads to run currently.
924 */
925 if (SEQ_CST_LOAD(&recent_activity) != ACTIVITY_INACTIVE) return;
926 #endif
927
928 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
929
930 // Garbage collection can release some new threads due to
931 // either (a) finalizers or (b) threads resurrected because
932 // they are unreachable and will therefore be sent an
933 // exception. Any threads thus released will be immediately
934 // runnable.
935 scheduleDoGC (pcap, task, true/*force major GC*/, true/*deadlock detection*/);
936 cap = *pcap;
937 // when force_major == true, scheduleDoGC sets
938 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
939 // signal.
940
941 if ( !emptyRunQueue(cap) ) return;
942
943 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
944 /* If we have user-installed signal handlers, then wait
945 * for signals to arrive rather than bombing out with a
946 * deadlock.
947 */
948 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
949 debugTrace(DEBUG_sched,
950 "still deadlocked, waiting for signals...");
951
952 awaitUserSignals();
953
954 if (signals_pending()) {
955 startSignalHandlers(cap);
956 }
957
958 // either we have threads to run, or we were interrupted:
959 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
960
961 return;
962 }
963 #endif
964
965 #if !defined(THREADED_RTS)
966 /* Probably a real deadlock. Send the current main thread the
967 * Deadlock exception.
968 */
969 if (task->incall->tso) {
970 switch (task->incall->tso->why_blocked) {
971 case BlockedOnSTM:
972 case BlockedOnBlackHole:
973 case BlockedOnMsgThrowTo:
974 case BlockedOnMVar:
975 case BlockedOnMVarRead:
976 throwToSingleThreaded(cap, task->incall->tso,
977 (StgClosure *)nonTermination_closure);
978 return;
979 default:
980 barf("deadlock: main thread blocked in a strange way");
981 }
982 }
983 return;
984 #endif
985 }
986 }
987
988
989 /* ----------------------------------------------------------------------------
990 * Process message in the current Capability's inbox
991 * ------------------------------------------------------------------------- */
992
993 static void
994 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
995 {
996 #if defined(THREADED_RTS)
997 Message *m, *next;
998 PutMVar *p, *pnext;
999 int r;
1000 Capability *cap = *pcap;
1001
1002 while (!emptyInbox(cap)) {
1003 // Executing messages might use heap, so we should check for GC.
1004 if (doYouWantToGC(cap)) {
1005 scheduleDoGC(pcap, cap->running_task, false, false);
1006 cap = *pcap;
1007 }
1008
1009 // don't use a blocking acquire; if the lock is held by
1010 // another thread then just carry on. This seems to avoid
1011 // getting stuck in a message ping-pong situation with other
1012 // processors. We'll check the inbox again later anyway.
1013 //
1014 // We should really use a more efficient queue data structure
1015 // here. The trickiness is that we must ensure a Capability
1016 // never goes idle if the inbox is non-empty, which is why we
1017 // use cap->lock (cap->lock is released as the last thing
1018 // before going idle; see Capability.c:releaseCapability()).
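// (TRY_ACQUIRE_LOCK returns zero on success and non-zero if the lock
// is currently held by someone else.)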
1019 r = TRY_ACQUIRE_LOCK(&cap->lock);
1020 if (r != 0) return;
1021
1022 m = cap->inbox;
1023 p = cap->putMVars;
1024 cap->inbox = (Message*)END_TSO_QUEUE;
1025 cap->putMVars = NULL;
1026
1027 RELEASE_LOCK(&cap->lock);
1028
1029 while (m != (Message*)END_TSO_QUEUE) {
1030 next = m->link;
1031 executeMessage(cap, m);
1032 m = next;
1033 }
1034
1035 while (p != NULL) {
1036 pnext = p->link;
1037 performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
1038 Unit_closure);
1039 freeStablePtr(p->mvar);
1040 stgFree(p);
1041 p = pnext;
1042 }
1043 }
1044 #endif
1045 }
1046
1047
1048 /* ----------------------------------------------------------------------------
1049 * Activate spark threads (THREADED_RTS)
1050 * ------------------------------------------------------------------------- */
1051
1052 #if defined(THREADED_RTS)
1053 static void
1054 scheduleActivateSpark(Capability *cap)
1055 {
1056 if (anySparks() && !cap->disabled)
1057 {
1058 createSparkThread(cap);
1059 debugTrace(DEBUG_sched, "creating a spark thread");
1060 }
1061 }
1062 #endif // THREADED_RTS
1063
1064 /* ----------------------------------------------------------------------------
1065 * After running a thread...
1066 * ------------------------------------------------------------------------- */
1067
1068 static void
1069 schedulePostRunThread (Capability *cap, StgTSO *t)
1070 {
1071 // We have to be able to catch transactions that are in an
1072 // infinite loop as a result of seeing an inconsistent view of
1073 // memory, e.g.
1074 //
1075 // atomically $ do
1076 // [a,b] <- mapM readTVar [ta,tb]
1077 // when (a == b) loop
1078 //
1079 // and a is never equal to b given a consistent view of memory.
1080 //
1081 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1082 if (!stmValidateNestOfTransactions(cap, t -> trec)) {
1083 debugTrace(DEBUG_sched | DEBUG_stm,
1084 "trec %p found wasting its time", t);
1085
1086 // strip the stack back to the
1087 // ATOMICALLY_FRAME, aborting the (nested)
1088 // transaction, and saving the stack of any
1089 // partially-evaluated thunks on the heap.
1090 throwToSingleThreaded_(cap, t, NULL, true);
1091
1092 // ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1093 }
1094 }
1095
1096 //
1097 // If the current thread's allocation limit has run out, send it
1098 // the AllocationLimitExceeded exception.
1099
1100 if (PK_Int64((W_*)&(t->alloc_limit)) < 0 && (t->flags & TSO_ALLOC_LIMIT)) {
1101 // Use a throwToSelf rather than a throwToSingleThreaded, because
1102 // it correctly handles the case where the thread is currently
1103 // inside mask. Also the thread might be blocked (e.g. on an
1104 // MVar), and throwToSingleThreaded doesn't unblock it
1105 // correctly in that case.
1106 throwToSelf(cap, t, allocationLimitExceeded_closure);
1107 ASSIGN_Int64((W_*)&(t->alloc_limit),
1108 (StgInt64)RtsFlags.GcFlags.allocLimitGrace * BLOCK_SIZE);
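// Give the thread a fresh allowance (allocLimitGrace, settable with
// +RTS -xq) so that the exception handler itself has room to run.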
1109 }
1110
1111 /* some statistics gathering in the parallel case */
1112 }
1113
1114 /* -----------------------------------------------------------------------------
1115 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1116 * -------------------------------------------------------------------------- */
1117
1118 static bool
1119 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1120 {
1121 if (cap->r.rHpLim == NULL || RELAXED_LOAD(&cap->context_switch)) {
1122 // Sometimes we miss a context switch, e.g. when calling
1123 // primitives in a tight loop, MAYBE_GC() doesn't check the
1124 // context switch flag, and we end up waiting for a GC.
1125 // See #1984, and concurrent/should_run/1984
1126 RELAXED_STORE(&cap->context_switch, 0);
1127 appendToRunQueue(cap,t);
1128 } else {
1129 pushOnRunQueue(cap,t);
1130 }
1131
1132 // did the task ask for a large block?
1133 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1134 // if so, get one and push it on the front of the nursery.
1135 bdescr *bd;
1136 W_ blocks;
1137
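// round the request up to a whole number of blocks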
1138 blocks = (W_)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1139
1140 if (blocks > BLOCKS_PER_MBLOCK) {
1141 barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
1142 }
1143
1144 debugTrace(DEBUG_sched,
1145 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1146 (long)t->id, what_next_strs[t->what_next], blocks);
1147
1148 // don't do this if the nursery is (nearly) full, we'll GC first.
1149 if (cap->r.rCurrentNursery->link != NULL ||
1150 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent
1151 // infinite loop if the
1152 // nursery has only one
1153 // block.
1154
1155 bd = allocGroupOnNode_lock(cap->node,blocks);
1156 cap->r.rNursery->n_blocks += blocks;
1157
1158 // link the new group after CurrentNursery
1159 dbl_link_insert_after(bd, cap->r.rCurrentNursery);
1160
1161 // initialise it as a nursery block. We initialise the
1162 // step, gen_no, and flags field of *every* sub-block in
1163 // this large block, because this is easier than making
1164 // sure that we always find the block head of a large
1165 // block whenever we call Bdescr() (eg. evacuate() and
1166 // isAlive() in the GC would both have to do this, at
1167 // least).
1168 {
1169 bdescr *x;
1170 for (x = bd; x < bd + blocks; x++) {
1171 initBdescr(x,g0,g0);
1172 x->free = x->start;
1173 x->flags = 0;
1174 }
1175 }
1176
1177 // This assert can be a killer if the app is doing lots
1178 // of large block allocations.
1179 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1180
1181 // now update the nursery to point to the new block
1182 finishedNurseryBlock(cap, cap->r.rCurrentNursery);
1183 cap->r.rCurrentNursery = bd;
1184
1185 // we might be unlucky and have another thread get on the
1186 // run queue before us and steal the large block, but in that
1187 // case the thread will just end up requesting another large
1188 // block.
1189 return false; /* not actually GC'ing */
1190 }
1191 }
1192
1193 return doYouWantToGC(cap);
1194 /* actual GC is done at the end of the while loop in schedule() */
1195 }
1196
1197 /* -----------------------------------------------------------------------------
1198 * Handle a thread that returned to the scheduler with ThreadYielding
1199 * -------------------------------------------------------------------------- */
1200
1201 static bool
1202 scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
1203 {
1204 /* put the thread back on the run queue. Then, if we're ready to
1205 * GC, check whether this is the last task to stop. If so, wake
1206 * up the GC thread. getThread will block during a GC until the
1207 * GC is finished.
1208 */
1209
1210 ASSERT(t->_link == END_TSO_QUEUE);
1211
1212 // Shortcut if we're just switching evaluators: just run the thread. See
1213 // Note [avoiding threadPaused] in Interpreter.c.
1214 if (t->what_next != prev_what_next) {
1215 debugTrace(DEBUG_sched,
1216 "--<< thread %ld (%s) stopped to switch evaluators",
1217 (long)t->id, what_next_strs[t->what_next]);
1218 return true;
1219 }
1220
1221 // Reset the context switch flag. We don't do this just before
1222 // running the thread, because that would mean we would lose ticks
1223 // during GC, which can lead to unfair scheduling (a thread hogs
1224 // the CPU because the tick always arrives during GC). This way
1225 // penalises threads that do a lot of allocation, but that seems
1226 // better than the alternative.
1227 if (RELAXED_LOAD(&cap->context_switch) != 0) {
1228 RELAXED_STORE(&cap->context_switch, 0);
1229 appendToRunQueue(cap,t);
1230 } else {
1231 pushOnRunQueue(cap,t);
1232 }
1233
1234 IF_DEBUG(sanity,
1235 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1236 checkTSO(t));
1237
1238 return false;
1239 }
1240
1241 /* -----------------------------------------------------------------------------
1242 * Handle a thread that returned to the scheduler with ThreadBlocked
1243 * -------------------------------------------------------------------------- */
1244
1245 static void
1246 scheduleHandleThreadBlocked( StgTSO *t
1247 #if !defined(DEBUG)
1248 STG_UNUSED
1249 #endif
1250 )
1251 {
1252
1253 // We don't need to do anything. The thread is blocked, and it
1254 // has tidied up its stack and placed itself on whatever queue
1255 // it needs to be on.
1256
1257 // ASSERT(t->why_blocked != NotBlocked);
1258 // Not true: for example,
1259 // - the thread may have woken itself up already, because
1260 // threadPaused() might have raised a blocked throwTo
1261 // exception, see maybePerformBlockedException().
1262
1263 #if defined(DEBUG)
1264 traceThreadStatus(DEBUG_sched, t);
1265 #endif
1266 }
1267
1268 /* -----------------------------------------------------------------------------
1269 * Handle a thread that returned to the scheduler with ThreadFinished
1270 * -------------------------------------------------------------------------- */
1271
1272 static bool
1273 scheduleHandleThreadFinished (Capability *cap, Task *task, StgTSO *t)
1274 {
1275 /* Need to check whether this was a main thread, and if so,
1276 * return with the return value.
1277 *
1278 * We also end up here if the thread kills itself with an
1279 * uncaught exception, see Exception.cmm.
1280 */
1281
1282 // blocked exceptions can now complete, even if the thread was in
1283 // blocked mode (see #2910).
1284 awakenBlockedExceptionQueue (cap, t);
1285
1286 //
1287 // Check whether the thread that just completed was a bound
1288 // thread, and if so return with the result.
1289 //
1290 // There is an assumption here that all thread completion goes
1291 // through this point; we need to make sure that if a thread
1292 // ends up in the ThreadKilled state, that it stays on the run
1293 // queue so it can be dealt with here.
1294 //
1295
1296 if (t->bound) {
1297
1298 if (t->bound != task->incall) {
1299 #if !defined(THREADED_RTS)
1300 // Must be a bound thread that is not the topmost one. Leave
1301 // it on the run queue until the stack has unwound to the
1302 // point where we can deal with this. Leaving it on the run
1303 // queue also ensures that the garbage collector knows about
1304 // this thread and its return value (it gets dropped from the
1305 // step->threads list so there's no other way to find it).
1306 appendToRunQueue(cap,t);
1307 return false;
1308 #else
1309 // this cannot happen in the threaded RTS, because a
1310 // bound thread can only be run by the appropriate Task.
1311 barf("finished bound thread that isn't mine");
1312 #endif
1313 }
1314
1315 ASSERT(task->incall->tso == t);
1316
1317 if (t->what_next == ThreadComplete) {
1318 if (task->incall->ret) {
1319 // NOTE: return val is stack->sp[1] (see StgStartup.cmm)
1320 *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
1321 }
1322 task->incall->rstat = Success;
1323 } else {
1324 if (task->incall->ret) {
1325 *(task->incall->ret) = NULL;
1326 }
1327 if (RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING) {
1328 if (heap_overflow) {
1329 task->incall->rstat = HeapExhausted;
1330 } else {
1331 task->incall->rstat = Interrupted;
1332 }
1333 } else {
1334 task->incall->rstat = Killed;
1335 }
1336 }
1337 #if defined(DEBUG)
1338 removeThreadLabel((StgWord)task->incall->tso->id);
1339 #endif
1340
1341 // We no longer consider this thread and task to be bound to
1342 // each other. The TSO lives on until it is GC'd, but the
1343 // task is about to be released by the caller, and we don't
1344 // want anyone following the pointer from the TSO to the
1345 // defunct task (which might have already been
1346 // re-used). This was a real bug: the GC updated
1347 // tso->bound->tso which led to a deadlock.
1348 t->bound = NULL;
1349 task->incall->tso = NULL;
1350
1351 return true; // tells schedule() to return
1352 }
1353
1354 return false;
1355 }
1356
1357 /* -----------------------------------------------------------------------------
1358 * Perform a heap census
1359 * -------------------------------------------------------------------------- */
1360
1361 static bool
1362 scheduleNeedHeapProfile( bool ready_to_gc )
1363 {
1364 // When we have +RTS -i0 and we're heap profiling, do a census at
1365 // every GC. This lets us get repeatable runs for debugging.
1366 if (performHeapProfile ||
1367 (RtsFlags.ProfFlags.heapProfileInterval==0 &&
1368 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1369 return true;
1370 } else {
1371 return false;
1372 }
1373 }
1374
1375 /* -----------------------------------------------------------------------------
1376 * stopAllCapabilities()
1377 *
1378 * Stop all Haskell execution. This is used when we need to make some global
1379 * change to the system, such as altering the number of capabilities, or
1380 * forking.
1381 *
1382 * pCap may be NULL in the event that the caller doesn't yet own a capability.
1383 *
1384 * To resume after stopAllCapabilities(), use releaseAllCapabilities().
1385 * -------------------------------------------------------------------------- */
1386
1387 #if defined(THREADED_RTS)
1388 void stopAllCapabilities (Capability **pCap, Task *task)
1389 {
1390 stopAllCapabilitiesWith(pCap, task, SYNC_OTHER);
1391 }
1392
1393 void stopAllCapabilitiesWith (Capability **pCap, Task *task, SyncType sync_type)
1394 {
1395 bool was_syncing;
1396 SyncType prev_sync_type;
1397
1398 PendingSync sync = {
1399 .type = sync_type,
1400 .idle = NULL,
1401 .task = task
1402 };
1403
1404 do {
1405 was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
1406 } while (was_syncing);
1407
1408 acquireAllCapabilities(pCap ? *pCap : NULL, task);
1409
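// All capabilities are now stopped and owned by this task: clear
// pending_sync and wake any capability-less waiters blocked in
// requestSync() on sync_finished_cond.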
1410 pending_sync = 0;
1411 signalCondition(&sync_finished_cond);
1412 }
1413 #endif
1414
1415 /* -----------------------------------------------------------------------------
1416 * requestSync()
1417 *
1418 * Commence a synchronisation between all capabilities. Normally not called
1419 * directly, instead use stopAllCapabilities(). This is used by the GC, which
1420 * has some special synchronisation requirements.
1421 *
1422 * Note that this can be called in two ways:
1423 *
1424 * - where *pcap points to a capability owned by the caller: in this case
1425 * *prev_sync_type will reflect the in-progress sync type on return, if one
1426 * was found.
1427 *
1428 * - where pcap == NULL: in this case the caller doesn't hold a capability.
1429 * we only return whether or not a pending sync was found and prev_sync_type
1430 * is unchanged.
1431 *
1432 * Returns:
1433 * false if we successfully got a sync
1434 * true if there was another sync request in progress,
1435 * and we yielded to it; the type of the other sync
1436 * request is returned in *prev_sync_type.
1437 * -------------------------------------------------------------------------- */
1438
1439 #if defined(THREADED_RTS)
1440 static bool requestSync (
1441 Capability **pcap, Task *task, PendingSync *new_sync,
1442 SyncType *prev_sync_type)
1443 {
1444 PendingSync *sync;
1445
1446 sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
1447 (StgWord)NULL,
1448 (StgWord)new_sync);
1449
1450 if (sync != NULL)
1451 {
1452 // sync is valid until we have called yieldCapability().
1453 // After the sync is completed, we cannot read that struct any
1454 // more because it has been freed.
1455 *prev_sync_type = sync->type;
1456 if (pcap == NULL) {
1457 // The caller does not hold a capability (e.g. may be a concurrent
1458 // mark thread). Consequently we must wait until the pending sync is
1459 // finished before proceeding to ensure we don't loop.
1460 // TODO: Don't busy-wait
1461 ACQUIRE_LOCK(&sync_finished_mutex);
1462 while (pending_sync) {
1463 waitCondition(&sync_finished_cond, &sync_finished_mutex);
1464 }
1465 RELEASE_LOCK(&sync_finished_mutex);
1466 } else {
1467 do {
1468 debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
1469 sync->type);
1470 ASSERT(*pcap);
1471 yieldCapability(pcap,task,true);
1472 sync = SEQ_CST_LOAD(&pending_sync);
1473 } while (sync != NULL);
1474 }
1475
1476 // NOTE: task->cap might have changed now
1477 return true;
1478 }
1479 else
1480 {
1481 return false;
1482 }
1483 }
1484 #endif
1485
1486 /* -----------------------------------------------------------------------------
1487 * acquireAllCapabilities()
1488 *
1489 * Grab all the capabilities except the one we already hold (cap may be NULL if
1490 * the caller does not currently hold a capability). Used when synchronising
1491 * before a single-threaded GC (SYNC_SEQ_GC), and before a fork (SYNC_OTHER).
1492 *
1493 * Only call this after requestSync(), otherwise a deadlock might
1494 * ensue if another thread is trying to synchronise.
1495 * -------------------------------------------------------------------------- */
1496
1497 #if defined(THREADED_RTS)
1498 static void acquireAllCapabilities(Capability *cap, Task *task)
1499 {
1500 Capability *tmpcap;
1501 uint32_t i;
1502
1503 ASSERT(SEQ_CST_LOAD(&pending_sync) != NULL);
1504 for (i=0; i < n_capabilities; i++) {
1505 debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)",
1506 i, n_capabilities);
1507 tmpcap = capabilities[i];
1508 if (tmpcap != cap) {
1509 // we better hope this task doesn't get migrated to
1510 // another Capability while we're waiting for this one.
1511 // It won't, because load balancing happens while we have
1512 // all the Capabilities, but even so it's a slightly
1513 // unsavoury invariant.
1514 task->cap = tmpcap;
1515 waitForCapability(&tmpcap, task);
1516 if (tmpcap->no != i) {
1517 barf("acquireAllCapabilities: got the wrong capability");
1518 }
1519 }
1520 }
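// Record which capability this task nominally holds: the caller's own
// one, or (if it came in with none) the last one we acquired.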
1521 task->cap = cap == NULL ? tmpcap : cap;
1522 }
1523 #endif
1524
1525 /* -----------------------------------------------------------------------------
1526 * releaseAllCapabilities()
1527 *
1528 * Assuming this thread holds all the capabilities, release them all (except for
1529 * the one passed in as keep_cap, if non-NULL).
1530 * -------------------------------------------------------------------------- */
1531
1532 #if defined(THREADED_RTS)
1533 void releaseAllCapabilities(uint32_t n, Capability *keep_cap, Task *task)
1534 {
1535 uint32_t i;
1536
1537 for (i = 0; i < n; i++) {
1538 Capability *tmpcap = capabilities[i];
1539 if (keep_cap != tmpcap) {
1540 task->cap = tmpcap;
1541 releaseCapability(tmpcap);
1542 }
1543 }
1544 task->cap = keep_cap;
1545 }
1546 #endif
1547
1548 /* -----------------------------------------------------------------------------
1549 * Perform a garbage collection if necessary
1550 * -------------------------------------------------------------------------- */
1551
1552 // N.B. See Note [Deadlock detection under nonmoving collector] for rationale
1553 // behind deadlock_detect argument.
1554 static void
1555 scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
1556 bool force_major, bool deadlock_detect)
1557 {
1558 Capability *cap = *pcap;
1559 bool heap_census;
1560 uint32_t collect_gen;
1561 bool major_gc;
1562 #if defined(THREADED_RTS)
1563 uint32_t gc_type;
1564 uint32_t i;
1565 uint32_t need_idle;
1566 uint32_t n_gc_threads;
1567 uint32_t n_idle_caps = 0, n_failed_trygrab_idles = 0;
1568 StgTSO *tso;
1569 bool *idle_cap;
1570 // idle_cap is an array (allocated later) of size n_capabilities, where
1571 // idle_cap[i] is true if capability i will be idle during this GC
1572 // cycle.
1573 #endif
1574
1575 if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
1576 // The final GC has already been done, and the system is
1577 // shutting down. We'll probably deadlock if we try to GC
1578 // now.
1579 return;
1580 }
1581
1582 heap_census = scheduleNeedHeapProfile(true);
1583
1584 // Figure out which generation we are collecting, so that we can
1585 // decide whether this is a parallel GC or not.
1586 collect_gen = calcNeeded(force_major || heap_census, NULL);
1587 major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
1588
1589 #if defined(THREADED_RTS)
1590 if (RELAXED_LOAD(&sched_state) < SCHED_INTERRUPTING
1591 && RtsFlags.ParFlags.parGcEnabled
1592 && collect_gen >= RtsFlags.ParFlags.parGcGen
1593 && ! oldest_gen->mark)
1594 {
1595 gc_type = SYNC_GC_PAR;
1596 } else {
1597 gc_type = SYNC_GC_SEQ;
1598 }
1599
1600 // In order to GC, there must be no threads running Haskell code.
1601 // Therefore, for single-threaded GC, the GC thread needs to hold *all* the
1602 // capabilities, and release them after the GC has completed. For parallel
1603 // GC, we synchronise all the running threads using requestSync().
1604 //
1605 // Other capabilities are prevented from running yet more Haskell threads if
1606 // pending_sync is set. Tested inside yieldCapability() and
1607 // releaseCapability() in Capability.c
1608
1609 PendingSync sync = {
1610 .type = gc_type,
1611 .idle = NULL,
1612 .task = task
1613 };
1614
1615 {
1616 SyncType prev_sync = 0;
1617 bool was_syncing;
1618 do {
1619 // If -qn is not set and we have more capabilities than cores, set
1620 // the number of GC threads to #cores. We do this here rather than
1621 // in normaliseRtsOpts() because here it will work if the program
1622 // calls setNumCapabilities.
1623 //
1624 n_gc_threads = RtsFlags.ParFlags.parGcThreads;
1625 if (n_gc_threads == 0 &&
1626 enabled_capabilities > getNumberOfProcessors()) {
1627 n_gc_threads = getNumberOfProcessors();
1628 }
1629
1630 // This calculation must be inside the loop because
1631 // enabled_capabilities may change if requestSync() below fails and
1632 // we retry.
1633 if (gc_type == SYNC_GC_PAR && n_gc_threads > 0) {
1634 if (n_gc_threads >= enabled_capabilities) {
1635 need_idle = 0;
1636 } else {
1637 need_idle = enabled_capabilities - n_gc_threads;
1638 }
1639 } else {
1640 need_idle = 0;
1641 }
1642
1643 // We need an array of size n_capabilities, but since this may
1644 // change each time around the loop we must allocate it afresh.
1645 idle_cap = (bool *)stgMallocBytes(n_capabilities *
1646 sizeof(bool),
1647 "scheduleDoGC");
1648 sync.idle = idle_cap;
1649
1650 // When using +RTS -qn, we need some capabilities to be idle during
1651 // GC. The best bet is to choose some inactive ones, so we look for
1652 // those first:
1653 uint32_t n_idle = need_idle;
1654 for (i=0; i < n_capabilities; i++) {
1655 if (capabilities[i]->disabled) {
1656 idle_cap[i] = true;
1657 } else if (n_idle > 0 &&
1658 capabilities[i]->running_task == NULL) {
1659 debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
1660 n_idle--;
1661 idle_cap[i] = true;
1662 } else {
1663 idle_cap[i] = false;
1664 }
1665 }
1666 // If we didn't find enough inactive capabilities, just pick some
1667 // more to be idle.
1668 for (i=0; n_idle > 0 && i < n_capabilities; i++) {
1669 if (!idle_cap[i] && i != cap->no) {
1670 idle_cap[i] = true;
1671 n_idle--;
1672 }
1673 }
1674 ASSERT(n_idle == 0);
1675
1676 was_syncing = requestSync(pcap, task, &sync, &prev_sync);
1677 cap = *pcap;
1678 if (was_syncing) {
1679 stgFree(idle_cap);
1680 }
1681 if (was_syncing &&
1682 (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
1683 !(RELAXED_LOAD(&sched_state) == SCHED_INTERRUPTING && force_major)) {
1684 // someone else had a pending sync request for a GC, so
1685 // let's assume GC has been done and we don't need to GC
1686 // again.
1687 // Exception to this: if SCHED_INTERRUPTING, then we still
1688 // need to do the final GC.
1689 return;
1690 }
1691 if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
1692 // The scheduler might now be shutting down. We tested
1693 // this above, but it might have become true since then as
1694 // we yielded the capability in requestSync().
1695 return;
1696 }
1697 } while (was_syncing);
1698 }
1699
1700 stat_startGCSync(gc_threads[cap->no]);
1701
1702 #if defined(DEBUG)
1703 unsigned int old_n_capabilities = n_capabilities;
1704 #endif
1705
1706 interruptAllCapabilities();
1707
1708 // The final shutdown GC is always single-threaded, because it's
1709 // possible that some of the Capabilities have no worker threads.
1710
1711 if (gc_type == SYNC_GC_SEQ) {
1712 traceEventRequestSeqGc(cap);
1713 } else {
1714 traceEventRequestParGc(cap);
1715 }
1716
1717 if (gc_type == SYNC_GC_SEQ) {
1718 // single-threaded GC: grab all the capabilities
1719 acquireAllCapabilities(cap,task);
1720 }
1721 else
1722 {
1723 // If we are load-balancing collections in this
1724 // generation, then we require all GC threads to participate
1725 // in the collection. Otherwise, we only require active
1726 // threads to participate, and we set gc_threads[i]->idle for
1727 // any idle capabilities. The rationale here is that waking
1728 // up an idle Capability takes much longer than just doing any
1729 // GC work on its behalf.
1730
1731 if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
1732 || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
1733 collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
1734 {
1735 for (i=0; i < n_capabilities; i++) {
1736 if (capabilities[i]->disabled) {
1737 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1738 if (idle_cap[i]) {
1739 n_idle_caps++;
1740 }
1741 } else {
1742 if (i != cap->no && idle_cap[i]) {
1743 Capability *tmpcap = capabilities[i];
1744 task->cap = tmpcap;
1745 waitForCapability(&tmpcap, task);
1746 n_idle_caps++;
1747 }
1748 }
1749 }
1750 }
1751 else
1752 {
1753 for (i=0; i < n_capabilities; i++) {
1754 if (capabilities[i]->disabled) {
1755 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1756 if (idle_cap[i]) {
1757 n_idle_caps++;
1758 }
1759 } else if (i != cap->no &&
1760 capabilities[i]->idle >=
1761 RtsFlags.ParFlags.parGcNoSyncWithIdle) {
1762 idle_cap[i] = tryGrabCapability(capabilities[i], task);
1763 if (idle_cap[i]) {
1764 n_idle_caps++;
1765 } else {
1766 n_failed_trygrab_idles++;
1767 }
1768 }
1769 }
1770 }
1771 debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
1772
1773 for (i=0; i < n_capabilities; i++) {
1774 NONATOMIC_ADD(&capabilities[i]->idle, 1);
1775 }
1776
1777 // For all capabilities participating in this GC, wait until
1778 // they have stopped mutating and are standing by for GC.
1779 waitForGcThreads(cap, idle_cap);
1780
1781 // Stable point where we can do a global check on our spark counters
1782 ASSERT(checkSparkCountInvariant());
1783 }
1784
1785 #endif
1786
1787 IF_DEBUG(scheduler, printAllThreads());
1788
1789 delete_threads_and_gc:
1790 /*
1791 * We now have all the capabilities; if we're in an interrupting
1792 * state, then we should take the opportunity to delete all the
1793 * threads in the system.
1794 * Checking for major_gc ensures that the last GC is major.
1795 */
1796 if (RELAXED_LOAD(&sched_state) == SCHED_INTERRUPTING && major_gc) {
1797 deleteAllThreads();
1798 #if defined(THREADED_RTS)
1799 // Discard all the sparks from every Capability. Why?
1800 // They'll probably be GC'd anyway since we've killed all the
1801 // threads. It just avoids the GC having to do any work to
1802 // figure out that any remaining sparks are garbage.
1803 for (i = 0; i < n_capabilities; i++) {
1804 capabilities[i]->spark_stats.gcd +=
1805 sparkPoolSize(capabilities[i]->sparks);
1806 // No race here since all Caps are stopped.
1807 discardSparksCap(capabilities[i]);
1808 }
1809 #endif
1810 RELAXED_STORE(&sched_state, SCHED_SHUTTING_DOWN);
1811 }
1812
1813 /*
1814 * When there are disabled capabilities, we want to migrate any
1815 * threads away from them. Normally this happens in the
1816 * scheduler's loop, but only for unbound threads - it's really
1817 * hard for a bound thread to migrate itself. So we have another
1818 * go here.
1819 */
1820 #if defined(THREADED_RTS)
1821 for (i = enabled_capabilities; i < n_capabilities; i++) {
1822 Capability *tmp_cap, *dest_cap;
1823 tmp_cap = capabilities[i];
1824 ASSERT(tmp_cap->disabled);
1825 if (i != cap->no) {
1826 dest_cap = capabilities[i % enabled_capabilities];
1827 while (!emptyRunQueue(tmp_cap)) {
1828 tso = popRunQueue(tmp_cap);
1829 migrateThread(tmp_cap, tso, dest_cap);
1830 if (tso->bound) {
1831 traceTaskMigrate(tso->bound->task,
1832 tso->bound->task->cap,
1833 dest_cap);
1834 tso->bound->task->cap = dest_cap;
1835 }
1836 }
1837 }
1838 }
1839 #endif
1840
1841 // Do any remaining idle GC work from the previous GC
1842 doIdleGCWork(cap, true /* all of it */);
1843
1844 #if defined(THREADED_RTS)
1845 // reset pending_sync *before* GC, so that when the GC threads
1846 // emerge they don't immediately re-enter the GC.
1847 pending_sync = 0;
1848 signalCondition(&sync_finished_cond);
1849 GarbageCollect(collect_gen, heap_census, deadlock_detect, gc_type, cap, idle_cap);
1850 #else
1851 GarbageCollect(collect_gen, heap_census, deadlock_detect, 0, cap, NULL);
1852 #endif
1853
1854 // If we're shutting down, don't leave any idle GC work to do.
1855 if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
1856 doIdleGCWork(cap, true /* all of it */);
1857 }
1858
1859 traceSparkCounters(cap);
1860
1861 switch (SEQ_CST_LOAD(&recent_activity)) {
1862 case ACTIVITY_INACTIVE:
1863 if (force_major) {
1864 // We are doing a GC because the system has been idle for a
1865 // timeslice and we need to check for deadlock. Record the
1866 // fact that we've done a GC and turn off the timer signal;
1867 // it will get re-enabled if we run any threads after the GC.
1868 SEQ_CST_STORE(&recent_activity, ACTIVITY_DONE_GC);
1869 #if !defined(PROFILING)
1870 stopTimer();
1871 #endif
1872 break;
1873 }
1874 // fall through...
1875
1876 case ACTIVITY_MAYBE_NO:
1877 // the GC might have taken long enough for the timer to set
1878 // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
1879 // but we aren't necessarily deadlocked:
1880 SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
1881 break;
1882
1883 case ACTIVITY_DONE_GC:
1884 // If we are actually active, the scheduler will reset the
1885 // recent_activity flag and re-enable the timer.
1886 break;
1887 }
1888
1889 #if defined(THREADED_RTS)
1890 // Stable point where we can do a global check on our spark counters
1891 ASSERT(checkSparkCountInvariant());
1892 #endif
1893
1894 // The heap census itself is done during GarbageCollect().
1895 if (heap_census) {
1896 performHeapProfile = false;
1897 }
1898
1899 #if defined(THREADED_RTS)
1900
1901 // If n_capabilities has changed during GC, we're in trouble.
1902 ASSERT(n_capabilities == old_n_capabilities);
1903
1904 if (gc_type == SYNC_GC_PAR)
1905 {
1906 for (i = 0; i < n_capabilities; i++) {
1907 if (i != cap->no) {
1908 if (idle_cap[i]) {
1909 ASSERT(capabilities[i]->running_task == task);
1910 task->cap = capabilities[i];
1911 releaseCapability(capabilities[i]);
1912 } else {
1913 ASSERT(capabilities[i]->running_task != task);
1914 }
1915 }
1916 }
1917 task->cap = cap;
1918
1919 // releaseGCThreads() happens *after* we have released idle
1920 // capabilities. Otherwise what can happen is one of the released
1921 // threads starts a new GC, and finds that it can't acquire some of
1922 // the disabled capabilities, because the previous GC still holds
1923 // them, so those disabled capabilities will not be idle during the
1924 // next GC round. However, if we release the capabilities first,
1925 // then they will be free (because they're disabled) when the next
1926 // GC cycle happens.
1927 releaseGCThreads(cap, idle_cap);
1928 }
1929 #endif
1930 if (heap_overflow && RELAXED_LOAD(&sched_state) == SCHED_RUNNING) {
1931 // GC set the heap_overflow flag. We should throw an exception if we
1932 // can, or shut down otherwise.
1933
1934 // Get the thread to which Ctrl-C is thrown
1935 StgTSO *main_thread = getTopHandlerThread();
1936 if (main_thread == NULL) {
1937 // GC set the heap_overflow flag, and there is no main thread to
1938 // throw an exception to, so we should proceed with an orderly
1939 // shutdown now. Ultimately we want the main thread to return to
1940 // its caller with HeapExhausted, at which point the caller should
1941 // call hs_exit(). The first step is to delete all the threads.
1942 RELAXED_STORE(&sched_state, SCHED_INTERRUPTING);
1943 goto delete_threads_and_gc;
1944 }
1945
1946 heap_overflow = false;
1947 const uint64_t allocation_count = getAllocations();
1948 if (RtsFlags.GcFlags.heapLimitGrace <
1949 allocation_count - allocated_bytes_at_heapoverflow ||
1950 allocated_bytes_at_heapoverflow == 0) {
1951 allocated_bytes_at_heapoverflow = allocation_count;
1952 // We used to simply exit, but throwing an exception gives the
1953 // program a chance to clean up. It also lets the exception be
1954 // caught.
1955
1956 // FIXME this is not a good way to tell a program to release
1957 // resources. It is neither reliable (the RTS crashes if it fails
1958 // to allocate memory from the OS) nor very usable (it is always
1959 // thrown to the main thread, which might not be able to do anything
1960 // useful with it). We really should have a more general way to
1961 // release resources in low-memory conditions. Nevertheless, this
1962 // is still a big improvement over just exiting.
1963
1964 // FIXME again: perhaps we should throw a synchronous exception
1965 // instead of an asynchronous one, or have a way for the program to
1966 // register a handler to be called when heap overflow happens.
1967 throwToSelf(cap, main_thread, heapOverflow_closure);
1968 }
1969 }
1970
1971 #if defined(THREADED_RTS)
1972 stgFree(idle_cap);
1973
1974 if (gc_type == SYNC_GC_SEQ) {
1975 // release our stash of capabilities.
1976 releaseAllCapabilities(n_capabilities, cap, task);
1977 }
1978 #endif
1979
1980 return;
1981 }
1982
1983 /* ---------------------------------------------------------------------------
1984 * Singleton fork(). Do not copy any running threads.
1985 * ------------------------------------------------------------------------- */
1986
1987 pid_t
1988 forkProcess(HsStablePtr *entry
1989 #if !defined(FORKPROCESS_PRIMOP_SUPPORTED)
1990 STG_UNUSED
1991 #endif
1992 )
1993 {
1994 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
1995 pid_t pid;
1996 StgTSO* t,*next;
1997 Capability *cap;
1998 uint32_t g;
1999 Task *task = NULL;
2000 uint32_t i;
2001
2002 debugTrace(DEBUG_sched, "forking!");
2003
2004 task = newBoundTask();
2005
2006 cap = NULL;
2007 waitForCapability(&cap, task);
2008
2009 #if defined(THREADED_RTS)
2010 stopAllCapabilities(&cap, task);
2011 #endif
2012
2013 // no funny business: hold locks while we fork, otherwise if some
2014 // other thread is holding a lock when the fork happens, the data
2015 // structure protected by the lock will forever be in an
2016 // inconsistent state in the child. See also #1391.
2017 ACQUIRE_LOCK(&sched_mutex);
2018 ACQUIRE_LOCK(&sm_mutex);
2019 ACQUIRE_LOCK(&stable_ptr_mutex);
2020 ACQUIRE_LOCK(&stable_name_mutex);
2021
2022 for (i=0; i < n_capabilities; i++) {
2023 ACQUIRE_LOCK(&capabilities[i]->lock);
2024 }
2025
2026 // Take task lock after capability lock to avoid order inversion (#17275).
2027 ACQUIRE_LOCK(&task->lock);
2028
2029 #if defined(THREADED_RTS)
2030 ACQUIRE_LOCK(&all_tasks_mutex);
2031 #endif
2032
2033 stopTimer(); // See #4074
2034
2035 #if defined(TRACING)
2036 flushEventLog(); // so that child won't inherit dirty file buffers
2037 #endif
2038
2039 pid = fork();
2040
2041 if (pid) { // parent
2042
2043 startTimer(); // #4074
2044
2045 RELEASE_LOCK(&sched_mutex);
2046 RELEASE_LOCK(&sm_mutex);
2047 RELEASE_LOCK(&stable_ptr_mutex);
2048 RELEASE_LOCK(&stable_name_mutex);
2049 RELEASE_LOCK(&task->lock);
2050
2051 #if defined(THREADED_RTS)
2052 /* N.B. releaseCapability_ below may need to take all_tasks_mutex */
2053 RELEASE_LOCK(&all_tasks_mutex);
2054 #endif
2055
2056 for (i=0; i < n_capabilities; i++) {
2057 releaseCapability_(capabilities[i],false);
2058 RELEASE_LOCK(&capabilities[i]->lock);
2059 }
2060
2061 boundTaskExiting(task);
2062
2063 // just return the pid
2064 return pid;
2065
2066 } else { // child
2067
2068 // Current process times reset in the child process, so we should reset
2069 // the stats too. See #16102.
2070 resetChildProcessStats();
2071
2072 #if defined(THREADED_RTS)
2073 initMutex(&sched_mutex);
2074 initMutex(&sm_mutex);
2075 initMutex(&stable_ptr_mutex);
2076 initMutex(&stable_name_mutex);
2077 initMutex(&task->lock);
2078
2079 for (i=0; i < n_capabilities; i++) {
2080 initMutex(&capabilities[i]->lock);
2081 }
2082
2083 initMutex(&all_tasks_mutex);
2084 #endif
2085
2086 #if defined(TRACING)
2087 resetTracing();
2088 #endif
2089
2090 // Now, all OS threads except the thread that forked are
2091 // stopped. We need to stop all Haskell threads, including
2092 // those involved in foreign calls. Also we need to delete
2093 // all Tasks, because they correspond to OS threads that are
2094 // now gone.
2095
2096 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2097 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2098 next = t->global_link;
2099 // don't allow threads to catch the ThreadKilled
2100 // exception, but we do want to raiseAsync() because these
2101 // threads may be evaluating thunks that we need later.
2102 deleteThread_(t);
2103
2104 // stop the GC from updating the InCall to point to
2105 // the TSO. This is only necessary because the
2106 // OSThread bound to the TSO has been killed, and
2107 // won't get a chance to exit in the usual way (see
2108 // also scheduleHandleThreadFinished).
2109 t->bound = NULL;
2110 }
2111 }
2112
2113 discardTasksExcept(task);
2114
2115 for (i=0; i < n_capabilities; i++) {
2116 cap = capabilities[i];
2117
2118 // Empty the run queue. It seems tempting to let all the
2119 // killed threads stay on the run queue as zombies to be
2120 // cleaned up later, but some of them may correspond to
2121 // bound threads for which the corresponding Task does not
2122 // exist.
2123 truncateRunQueue(cap);
2124 cap->n_run_queue = 0;
2125
2126 // Any suspended C-calling Tasks are no more, their OS threads
2127 // don't exist now:
2128 cap->suspended_ccalls = NULL;
2129 cap->n_suspended_ccalls = 0;
2130
2131 #if defined(THREADED_RTS)
2132 // Wipe our spare workers list, they no longer exist. New
2133 // workers will be created if necessary.
2134 cap->spare_workers = NULL;
2135 cap->n_spare_workers = 0;
2136 cap->returning_tasks_hd = NULL;
2137 cap->returning_tasks_tl = NULL;
2138 cap->n_returning_tasks = 0;
2139 #endif
2140
2141 // Release all caps except 0, we'll use that for starting
2142 // the IO manager and running the client action below.
2143 if (cap->no != 0) {
2144 task->cap = cap;
2145 releaseCapability(cap);
2146 }
2147 }
2148 cap = capabilities[0];
2149 task->cap = cap;
2150
2151 // Empty the threads lists. Otherwise, the garbage
2152 // collector may attempt to resurrect some of these threads.
2153 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2154 generations[g].threads = END_TSO_QUEUE;
2155 }
2156
2157 // On Unix, all timers are reset in the child, so we need to start
2158 // the timer again.
2159 initTimer();
2160
2161 // TODO: need to trace various other things in the child
2162 // like startup event, capabilities, process info etc
2163 traceTaskCreate(task, cap);
2164
2165 #if defined(THREADED_RTS)
2166 ioManagerStartCap(&cap);
2167 #endif
2168
2169 // start timer after the IOManager is initialized
2170 // (the idle GC may wake up the IOManager)
2171 startTimer();
2172
2173 // Install toplevel exception handlers, so interruption
2174 // signal will be sent to the main thread.
2175 // See #12903
2176 rts_evalStableIOMain(&cap, entry, NULL); // run the action
2177 rts_checkSchedStatus("forkProcess",cap);
2178
2179 rts_unlock(cap);
2180 shutdownHaskellAndExit(EXIT_SUCCESS, 0 /* !fastExit */);
2181 }
2182 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2183 barf("forkProcess#: primop not supported on this platform, sorry!\n");
2184 #endif
2185 }
2186
2187 /* ---------------------------------------------------------------------------
2188 * Changing the number of Capabilities
2189 *
2190 * Changing the number of Capabilities is very tricky! We can only do
2191 * it with the system fully stopped, so we do a full sync with
2192 * requestSync(SYNC_OTHER) and grab all the capabilities.
2193 *
2194 * Then we resize the appropriate data structures, and update all
2195 * references to the old data structures which have now moved.
2196 * Finally we release the Capabilities we are holding, and start
2197 * worker Tasks on the new Capabilities we created.
2198 *
2199 * ------------------------------------------------------------------------- */
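/*
 * Illustrative sketch only (not part of the RTS): a program linked with the
 * threaded RTS can resize the capability pool at run time.  On the Haskell
 * side this is GHC.Conc.setNumCapabilities; a foreign caller that holds no
 * capability could equally do:
 *
 *     setNumCapabilities(4);   // grow (or shrink) to 4 capabilities
 *
 * The call returns only after the full sync described above has completed.
 */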
2200
2201 void
2202 setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
2203 {
2204 #if !defined(THREADED_RTS)
2205 if (new_n_capabilities != 1) {
2206 errorBelch("setNumCapabilities: not supported in the non-threaded RTS");
2207 }
2208 return;
2209 #elif defined(NOSMP)
2210 if (new_n_capabilities != 1) {
2211 errorBelch("setNumCapabilities: not supported on this platform");
2212 }
2213 return;
2214 #else
2215 Task *task;
2216 Capability *cap;
2217 uint32_t n;
2218 Capability *old_capabilities = NULL;
2219 uint32_t old_n_capabilities = n_capabilities;
2220
2221 if (new_n_capabilities == enabled_capabilities) {
2222 return;
2223 } else if (new_n_capabilities <= 0) {
2224 errorBelch("setNumCapabilities: Capability count must be positive");
2225 return;
2226 }
2227
2228
2229 debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
2230 enabled_capabilities, new_n_capabilities);
2231
2232 cap = rts_lock();
2233 task = cap->running_task;
2234
2235
2236 // N.B. We must stop the interval timer while we are changing the
2237 // capabilities array lest handle_tick may try to context switch
2238 // an old capability. See #17289.
2239 stopTimer();
2240
2241 stopAllCapabilities(&cap, task);
2242
2243 if (new_n_capabilities < enabled_capabilities)
2244 {
2245 // Reducing the number of capabilities: we do not actually
2246 // remove the extra capabilities, we just mark them as
2247 // "disabled". This has the following effects:
2248 //
2249 // - threads on a disabled capability are migrated away by the
2250 // scheduler loop
2251 //
2252 // - disabled capabilities do not participate in GC
2253 // (see scheduleDoGC())
2254 //
2255 // - No spark threads are created on this capability
2256 // (see scheduleActivateSpark())
2257 //
2258 // - We do not attempt to migrate threads *to* a disabled
2259 // capability (see schedulePushWork()).
2260 //
2261 // but in other respects, a disabled capability remains
2262 // alive. Threads may be woken up on a disabled capability,
2263 // but they will be immediately migrated away.
2264 //
2265 // This approach is much easier than trying to actually remove
2266 // the capability; we don't have to worry about GC data
2267 // structures, the nursery, etc.
2268 //
2269 for (n = new_n_capabilities; n < enabled_capabilities; n++) {
2270 capabilities[n]->disabled = true;
2271 traceCapDisable(capabilities[n]);
2272 }
2273 enabled_capabilities = new_n_capabilities;
2274 }
2275 else
2276 {
2277 // Increasing the number of enabled capabilities.
2278 //
2279 // enable any disabled capabilities, up to the required number
2280 for (n = enabled_capabilities;
2281 n < new_n_capabilities && n < n_capabilities; n++) {
2282 capabilities[n]->disabled = false;
2283 traceCapEnable(capabilities[n]);
2284 }
2285 enabled_capabilities = n;
2286
2287 if (new_n_capabilities > n_capabilities) {
2288 #if defined(TRACING)
2289 // Allocate eventlog buffers for the new capabilities. Note this
2290 // must be done before calling moreCapabilities(), because that
2291 // will emit events about creating the new capabilities and adding
2292 // them to existing capsets.
2293 tracingAddCapapilities(n_capabilities, new_n_capabilities);
2294 #endif
2295
2296 // Resize the capabilities array
2297 // NB. after this, capabilities points somewhere new. Any pointers
2298 // of type (Capability *) are now invalid.
2299 moreCapabilities(n_capabilities, new_n_capabilities);
2300
2301 // Resize and update storage manager data structures
2302 storageAddCapabilities(n_capabilities, new_n_capabilities);
2303 }
2304 }
2305
2306 // update n_capabilities before things start running
2307 if (new_n_capabilities > n_capabilities) {
2308 n_capabilities = enabled_capabilities = new_n_capabilities;
2309 }
2310
2311 // We're done: release the original Capabilities
2312 releaseAllCapabilities(old_n_capabilities, cap,task);
2313
2314 // We can't free the old array until now, because we access it
2315 // while updating pointers in updateCapabilityRefs().
2316 if (old_capabilities) {
2317 stgFree(old_capabilities);
2318 }
2319
2320 // Notify IO manager that the number of capabilities has changed.
2321 rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
2322
2323 startTimer();
2324
2325 rts_unlock(cap);
2326
2327 #endif // THREADED_RTS
2328 }
2329
2330
2331
2332 /* ---------------------------------------------------------------------------
2333 * Delete all the threads in the system
2334 * ------------------------------------------------------------------------- */
2335
2336 static void
2337 deleteAllThreads ()
2338 {
2339 // NOTE: only safe to call if we own all capabilities.
2340
2341 StgTSO* t, *next;
2342 uint32_t g;
2343
2344 debugTrace(DEBUG_sched,"deleting all threads");
2345 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
2346 for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
2347 next = t->global_link;
2348 deleteThread(t);
2349 }
2350 }
2351
2352 // The run queue now contains a bunch of ThreadKilled threads. We
2353 // must not throw these away: the main thread(s) will be in there
2354 // somewhere, and the main scheduler loop has to deal with it.
2355 // Also, the run queue is the only thing keeping these threads from
2356 // being GC'd, and we don't want the "main thread has been GC'd" panic.
2357
2358 #if !defined(THREADED_RTS)
2359 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2360 ASSERT(sleeping_queue == END_TSO_QUEUE);
2361 #endif
2362 }
2363
2364 /* -----------------------------------------------------------------------------
2365 Managing the suspended_ccalls list.
2366 Locks required: sched_mutex
2367 -------------------------------------------------------------------------- */
2368
2369 STATIC_INLINE void
2370 suspendTask (Capability *cap, Task *task)
2371 {
2372 InCall *incall;
2373
2374 incall = task->incall;
2375 ASSERT(incall->next == NULL && incall->prev == NULL);
2376 incall->next = cap->suspended_ccalls;
2377 incall->prev = NULL;
2378 if (cap->suspended_ccalls) {
2379 cap->suspended_ccalls->prev = incall;
2380 }
2381 cap->suspended_ccalls = incall;
2382 cap->n_suspended_ccalls++;
2383 }
2384
2385 STATIC_INLINE void
2386 recoverSuspendedTask (Capability *cap, Task *task)
2387 {
2388 InCall *incall;
2389
2390 incall = task->incall;
2391 if (incall->prev) {
2392 incall->prev->next = incall->next;
2393 } else {
2394 ASSERT(cap->suspended_ccalls == incall);
2395 cap->suspended_ccalls = incall->next;
2396 }
2397 if (incall->next) {
2398 incall->next->prev = incall->prev;
2399 }
2400 incall->next = incall->prev = NULL;
2401 cap->n_suspended_ccalls--;
2402 }
2403
2404 /* ---------------------------------------------------------------------------
2405 * Suspending & resuming Haskell threads.
2406 *
2407 * When making a "safe" call to C (aka _ccall_GC), the task gives back
2408 * its capability before calling the C function. This allows another
2409 * task to pick up the capability and carry on running Haskell
2410 * threads. It also means that if the C call blocks, it won't lock
2411 * the whole system.
2412 *
2413 * The Haskell thread making the C call is put to sleep for the
2414 * duration of the call, on the suspended_ccalling_threads queue. We
2415 * give out a token to the task, which it can use to resume the thread
2416 * on return from the C function.
2417 *
2418 * If this is an interruptible C call, this means that the FFI call may be
2419 * unceremoniously terminated and should be scheduled on an
2420 * unbound worker thread.
2421 * ------------------------------------------------------------------------- */
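/*
 * Minimal sketch, for illustration only, of the pattern generated around a
 * "safe" foreign call (written here as C rather than Cmm;
 * some_blocking_c_call is a placeholder name):
 *
 *     void *token = suspendThread(&cap->r, interruptible);  // give up the cap
 *     r = some_blocking_c_call(args);                       // may block
 *     StgRegTable *reg = resumeThread(token);               // regain a cap
 *
 * The token is really the Task, and resumeThread() may hand back a different
 * Capability from the one that was given up.
 */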
2422
2423 void *
2424 suspendThread (StgRegTable *reg, bool interruptible)
2425 {
2426 Capability *cap;
2427 int saved_errno;
2428 StgTSO *tso;
2429 Task *task;
2430 #if defined(mingw32_HOST_OS)
2431 StgWord32 saved_winerror;
2432 #endif
2433
2434 saved_errno = errno;
2435 #if defined(mingw32_HOST_OS)
2436 saved_winerror = GetLastError();
2437 #endif
2438
2439 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2440 */
2441 cap = regTableToCapability(reg);
2442
2443 task = cap->running_task;
2444 tso = cap->r.rCurrentTSO;
2445
2446 traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
2447
2448 // XXX this might not be necessary --SDM
2449 tso->what_next = ThreadRunGHC;
2450
2451 threadPaused(cap,tso);
2452
2453 if (interruptible) {
2454 tso->why_blocked = BlockedOnCCall_Interruptible;
2455 } else {
2456 tso->why_blocked = BlockedOnCCall;
2457 }
2458
2459 // Hand back capability
2460 task->incall->suspended_tso = tso;
2461 task->incall->suspended_cap = cap;
2462
2463 // Otherwise allocate() will write to invalid memory.
2464 cap->r.rCurrentTSO = NULL;
2465
2466 ACQUIRE_LOCK(&cap->lock);
2467
2468 suspendTask(cap,task);
2469 cap->in_haskell = false;
2470 releaseCapability_(cap,false);
2471
2472 RELEASE_LOCK(&cap->lock);
2473
2474 errno = saved_errno;
2475 #if defined(mingw32_HOST_OS)
2476 SetLastError(saved_winerror);
2477 #endif
2478 return task;
2479 }
2480
2481 StgRegTable *
2482 resumeThread (void *task_)
2483 {
2484 StgTSO *tso;
2485 InCall *incall;
2486 Capability *cap;
2487 Task *task = task_;
2488 int saved_errno;
2489 #if defined(mingw32_HOST_OS)
2490 StgWord32 saved_winerror;
2491 #endif
2492
2493 saved_errno = errno;
2494 #if defined(mingw32_HOST_OS)
2495 saved_winerror = GetLastError();
2496 #endif
2497
2498 incall = task->incall;
2499 cap = incall->suspended_cap;
2500 task->cap = cap;
2501
2502 // Wait for permission to re-enter the RTS with the result.
2503 waitForCapability(&cap,task);
2504 // we might be on a different capability now... but if so, our
2505 // entry on the suspended_ccalls list will also have been
2506 // migrated.
2507
2508 // Remove the thread from the suspended list
2509 recoverSuspendedTask(cap,task);
2510
2511 tso = incall->suspended_tso;
2512 incall->suspended_tso = NULL;
2513 incall->suspended_cap = NULL;
2514 // we will modify tso->_link
2515 IF_NONMOVING_WRITE_BARRIER_ENABLED {
2516 updateRemembSetPushClosure(cap, (StgClosure *)tso->_link);
2517 }
2518 tso->_link = END_TSO_QUEUE;
2519
2520 traceEventRunThread(cap, tso);
2521
2522 /* Reset blocking status */
2523 tso->why_blocked = NotBlocked;
2524
2525 if ((tso->flags & TSO_BLOCKEX) == 0) {
2526 // avoid locking the TSO if we don't have to
2527 if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
2528 maybePerformBlockedException(cap,tso);
2529 }
2530 }
2531
2532 cap->r.rCurrentTSO = tso;
2533 cap->in_haskell = true;
2534 errno = saved_errno;
2535 #if defined(mingw32_HOST_OS)
2536 SetLastError(saved_winerror);
2537 #endif
2538
2539 /* We might have GC'd, mark the TSO dirty again */
2540 dirty_TSO(cap,tso);
2541 dirty_STACK(cap,tso->stackobj);
2542
2543 IF_DEBUG(sanity, checkTSO(tso));
2544
2545 return &cap->r;
2546 }
2547
2548 /* ---------------------------------------------------------------------------
2549 * scheduleThread()
2550 *
2551 * scheduleThread puts a thread on the end of the runnable queue.
2552 * This will usually be done immediately after a thread is created.
2553 * The caller of scheduleThread must create the thread using e.g.
2554 * createThread and push an appropriate closure
2555 * on this thread's stack before the scheduler is invoked.
2556 * ------------------------------------------------------------------------ */
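/*
 * A minimal sketch of the usual calling sequence, in the style of the
 * rts_eval* entry points in RtsAPI.c (the "action" closure is a placeholder
 * here):
 *
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  (StgClosure *)action);
 *     scheduleWaitThread(tso, &ret, &cap);   // or scheduleThread(cap, tso)
 */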
2557
2558 void
2559 scheduleThread(Capability *cap, StgTSO *tso)
2560 {
2561 // The thread goes at the *end* of the run-queue, to avoid possible
2562 // starvation of any threads already on the queue.
2563 appendToRunQueue(cap,tso);
2564 }
2565
2566 void
2567 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2568 {
2569 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2570 // move this thread from now on.
2571 #if defined(THREADED_RTS)
2572 cpu %= enabled_capabilities;
2573 if (cpu == cap->no) {
2574 appendToRunQueue(cap,tso);
2575 } else {
2576 migrateThread(cap, tso, capabilities[cpu]);
2577 }
2578 #else
2579 appendToRunQueue(cap,tso);
2580 #endif
2581 }
2582
2583 void
2584 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
2585 {
2586 Task *task;
2587 DEBUG_ONLY( StgThreadID id );
2588 Capability *cap;
2589
2590 cap = *pcap;
2591
2592 // We already created/initialised the Task
2593 task = cap->running_task;
2594
2595 // This TSO is now a bound thread; make the Task and TSO
2596 // point to each other.
2597 tso->bound = task->incall;
2598 tso->cap = cap;
2599
2600 task->incall->tso = tso;
2601 task->incall->ret = ret;
2602 task->incall->rstat = NoStatus;
2603
2604 appendToRunQueue(cap,tso);
2605
2606 DEBUG_ONLY( id = tso->id );
2607 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
2608
2609 cap = schedule(cap,task);
2610
2611 ASSERT(task->incall->rstat != NoStatus);
2612 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2613
2614 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
2615 *pcap = cap;
2616 }
2617
2618 /* ----------------------------------------------------------------------------
2619 * Starting Tasks
2620 * ------------------------------------------------------------------------- */
2621
2622 #if defined(THREADED_RTS)
2623 void scheduleWorker (Capability *cap, Task *task)
2624 {
2625 ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
2626 cap = schedule(cap,task);
2627 ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
2628
2629 // On exit from schedule(), we have a Capability, but possibly not
2630 // the same one we started with.
2631
2632 // During shutdown, the requirement is that after all the
2633 // Capabilities are shut down, all workers that are shutting down
2634 // have finished workerTaskStop(). This is why we hold on to
2635 // cap->lock until we've finished workerTaskStop() below.
2636 //
2637 // There may be workers still involved in foreign calls; those
2638 // will just block in waitForCapability() because the
2639 // Capability has been shut down.
2640 //
2641 ACQUIRE_LOCK(&cap->lock);
2642 releaseCapability_(cap,false);
2643 workerTaskStop(task);
2644 RELEASE_LOCK(&cap->lock);
2645 }
2646 #endif
2647
2648 /* ---------------------------------------------------------------------------
2649 * Start new worker tasks on Capabilities from--to
2650 * -------------------------------------------------------------------------- */
2651
2652 static void
2653 startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
2654 {
2655 #if defined(THREADED_RTS)
2656 uint32_t i;
2657 Capability *cap;
2658
2659 for (i = from; i < to; i++) {
2660 cap = capabilities[i];
2661 ACQUIRE_LOCK(&cap->lock);
2662 startWorkerTask(cap);
2663 RELEASE_LOCK(&cap->lock);
2664 }
2665 #endif
2666 }
2667
2668 /* ---------------------------------------------------------------------------
2669 * initScheduler()
2670 *
2671 * Initialise the scheduler. This resets all the queues - if the
2672 * queues contained any threads, they'll be garbage collected at the
2673 * next pass.
2674 *
2675 * ------------------------------------------------------------------------ */
2676
2677 void
2678 initScheduler(void)
2679 {
2680 #if !defined(THREADED_RTS)
2681 blocked_queue_hd = END_TSO_QUEUE;
2682 blocked_queue_tl = END_TSO_QUEUE;
2683 sleeping_queue = END_TSO_QUEUE;
2684 #endif
2685
2686 sched_state = SCHED_RUNNING;
2687 SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
2688
2689 #if defined(THREADED_RTS)
2690 /* Initialise the mutex and condition variables used by
2691 * the scheduler. */
2692 initMutex(&sched_mutex);
2693 initMutex(&sync_finished_mutex);
2694 initCondition(&sync_finished_cond);
2695 #endif
2696
2697 ACQUIRE_LOCK(&sched_mutex);
2698
2699 allocated_bytes_at_heapoverflow = 0;
2700
2701 /* A capability holds the state a native thread needs in
2702 * order to execute STG code. At least one capability is
2703 * floating around (only THREADED_RTS builds have more than one).
2704 */
2705 initCapabilities();
2706
2707 initTaskManager();
2708
2709 /*
2710 * Eagerly start one worker to run each Capability, except for
2711 * Capability 0. The idea is that we're probably going to start a
2712 * bound thread on Capability 0 pretty soon, so we don't want a
2713 * worker task hogging it.
2714 */
2715 startWorkerTasks(1, n_capabilities);
2716
2717 RELEASE_LOCK(&sched_mutex);
2718
2719 }
2720
2721 void
2722 exitScheduler (bool wait_foreign USED_IF_THREADS)
2723 /* see Capability.c, shutdownCapability() */
2724 {
2725 Task *task = newBoundTask();
2726
2727 // If we haven't killed all the threads yet, do it now.
2728 if (RELAXED_LOAD(&sched_state) < SCHED_SHUTTING_DOWN) {
2729 RELAXED_STORE(&sched_state, SCHED_INTERRUPTING);
2730 nonmovingStop();
2731 Capability *cap = task->cap;
2732 waitForCapability(&cap,task);
2733 scheduleDoGC(&cap,task,true,false);
2734 ASSERT(task->incall->tso == NULL);
2735 releaseCapability(cap);
2736 }
2737 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
2738
2739 shutdownCapabilities(task, wait_foreign);
2740
2741 // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
2742 // n_failed_trygrab_idles, n_idle_caps);
2743
2744 boundTaskExiting(task);
2745 }
2746
2747 void
2748 freeScheduler( void )
2749 {
2750 uint32_t still_running;
2751
2752 ACQUIRE_LOCK(&sched_mutex);
2753 still_running = freeTaskManager();
2754 // We can only free the Capabilities if there are no Tasks still
2755 // running. We might have a Task about to return from a foreign
2756 // call into waitForCapability(), for example (actually,
2757 // this should be the *only* thing that a still-running Task can
2758 // do at this point, and it will block waiting for the
2759 // Capability).
2760 if (still_running == 0) {
2761 freeCapabilities();
2762 }
2763 RELEASE_LOCK(&sched_mutex);
2764 #if defined(THREADED_RTS)
2765 closeMutex(&sched_mutex);
2766 #endif
2767 }
2768
2769 void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
2770 void *user USED_IF_NOT_THREADS)
2771 {
2772 #if !defined(THREADED_RTS)
2773 evac(user, (StgClosure **)(void *)&blocked_queue_hd);
2774 evac(user, (StgClosure **)(void *)&blocked_queue_tl);
2775 evac(user, (StgClosure **)(void *)&sleeping_queue);
2776 #endif
2777 }
2778
2779 /* -----------------------------------------------------------------------------
2780 performGC
2781
2782 This is the interface to the garbage collector from Haskell land.
2783 We provide this so that external C code can allocate and garbage
2784 collect when called from Haskell via _ccall_GC.
2785 -------------------------------------------------------------------------- */
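/*
 * For example (sketch only): Haskell's System.Mem.performGC is a foreign
 * call that ends up in performMajorGC() below, and external C code that
 * holds no capability may call the same entry points directly:
 *
 *     performMajorGC();   // force a collection of the oldest generation
 *     performGC();        // let calcNeeded() choose the generation
 */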
2786
2787 static void
2788 performGC_(bool force_major)
2789 {
2790 Task *task;
2791 Capability *cap = NULL;
2792
2793 // We must grab a new Task here, because the existing Task may be
2794 // associated with a particular Capability, and chained onto the
2795 // suspended_ccalls queue.
2796 task = newBoundTask();
2797
2798 // TODO: do we need to traceTask*() here?
2799
2800 waitForCapability(&cap,task);
2801 scheduleDoGC(&cap,task,force_major,false);
2802 releaseCapability(cap);
2803 boundTaskExiting(task);
2804 }
2805
2806 void
2807 performGC(void)
2808 {
2809 performGC_(false);
2810 }
2811
2812 void
2813 performMajorGC(void)
2814 {
2815 performGC_(true);
2816 }
2817
2818 /* ---------------------------------------------------------------------------
2819 Interrupt execution.
2820 Might be called inside a signal handler so it mustn't do anything fancy.
2821 ------------------------------------------------------------------------ */
2822
2823 void
2824 interruptStgRts(void)
2825 {
2826 ASSERT(sched_state != SCHED_SHUTTING_DOWN);
2827 sched_state = SCHED_INTERRUPTING;
2828 interruptAllCapabilities();
2829 #if defined(THREADED_RTS)
2830 wakeUpRts();
2831 #endif
2832 }
2833
2834 /* -----------------------------------------------------------------------------
2835 Wake up the RTS
2836
2837 This function causes at least one OS thread to wake up and run the
2838 scheduler loop. It is invoked when the RTS might be deadlocked, or
2839 an external event has arrived that may need servicing (eg. a
2840 keyboard interrupt).
2841
2842 In the single-threaded RTS we don't do anything here; we only have
2843 one thread anyway, and the event that caused us to want to wake up
2844 will have interrupted any blocking system call in progress anyway.
2845 -------------------------------------------------------------------------- */
2846
2847 #if defined(THREADED_RTS)
2848 void wakeUpRts(void)
2849 {
2850 // This forces the IO Manager thread to wakeup, which will
2851 // in turn ensure that some OS thread wakes up and runs the
2852 // scheduler loop, which will cause a GC and deadlock check.
2853 ioManagerWakeup();
2854 }
2855 #endif
2856
2857 /* -----------------------------------------------------------------------------
2858 Deleting threads
2859
2860 This is used for interruption (^C) and forking, and corresponds to
2861 raising an exception but without letting the thread catch the
2862 exception.
2863 -------------------------------------------------------------------------- */
2864
2865 static void
2866 deleteThread (StgTSO *tso)
2867 {
2868 // NOTE: must only be called on a TSO that we have exclusive
2869 // access to, because we will call throwToSingleThreaded() below.
2870 // The TSO must be on the run queue of the Capability we own, or
2871 // we must own all Capabilities.
2872
2873 if (tso->why_blocked != BlockedOnCCall &&
2874 tso->why_blocked != BlockedOnCCall_Interruptible) {
2875 throwToSingleThreaded(tso->cap,tso,NULL);
2876 }
2877 }
2878
2879 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
2880 static void
2881 deleteThread_(StgTSO *tso)
2882 { // for forkProcess only:
2883 // like deleteThread(), but we delete threads in foreign calls, too.
2884
2885 if (tso->why_blocked == BlockedOnCCall ||
2886 tso->why_blocked == BlockedOnCCall_Interruptible) {
2887 tso->what_next = ThreadKilled;
2888 appendToRunQueue(tso->cap, tso);
2889 } else {
2890 deleteThread(tso);
2891 }
2892 }
2893 #endif
2894
2895 /* -----------------------------------------------------------------------------
2896 raiseExceptionHelper
2897
2898 This function is called by the raise# primitive, just so that we can
2899 move some of the tricky bits of raising an exception from C-- into
2900 C. Who knows, it might be a useful reusable thing here too.
2901 -------------------------------------------------------------------------- */
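/*
 * Rough sketch of the Cmm-side caller (stg_raisezh in rts/Exception.cmm),
 * shown only to illustrate the interface; details may differ:
 *
 *     (frame_type) = ccall raiseExceptionHelper(BaseReg "ptr",
 *                                               CurrentTSO "ptr",
 *                                               exception "ptr");
 *     // ...then dispatch on frame_type: CATCH_FRAME, ATOMICALLY_FRAME, ...
 */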
2902
2903 StgWord
2904 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2905 {
2906 Capability *cap = regTableToCapability(reg);
2907 StgThunk *raise_closure = NULL;
2908 StgPtr p, next;
2909 const StgRetInfoTable *info;
2910 //
2911 // This closure represents the expression 'raise# E' where E
2912 // is the exception raised. It is used to overwrite all the
2913 // thunks which are currently under evaluation.
2914 //
2915
2916 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2917 // LDV profiling: stg_raise_info has THUNK as its closure
2918 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2919 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2920 // 1 does not cause any problem unless profiling is performed.
2921 // However, when LDV profiling goes on, we need to linearly scan
2922 // small object pool, where raise_closure is stored, so we should
2923 // use MIN_UPD_SIZE.
2924 //
2925 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2926 // sizeofW(StgClosure)+1);
2927 //
2928
2929 //
2930 // Walk up the stack, looking for the catch frame. On the way,
2931 // we update any closures pointed to from update frames with the
2932 // raise closure that we just built.
2933 //
2934 p = tso->stackobj->sp;
2935 while(1) {
2936 info = get_ret_itbl((StgClosure *)p);
2937 next = p + stack_frame_sizeW((StgClosure *)p);
2938 switch (info->i.type) {
2939
2940 case UPDATE_FRAME:
2941 // Only create raise_closure if we need to.
2942 if (raise_closure == NULL) {
2943 raise_closure =
2944 (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2945 SET_HDR(raise_closure, &stg_raise_info, cap->r.rCCCS);
2946 raise_closure->payload[0] = exception;
2947 }
2948 updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2949 (StgClosure *)raise_closure);
2950 p = next;
2951 continue;
2952
2953 case ATOMICALLY_FRAME:
2954 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2955 tso->stackobj->sp = p;
2956 return ATOMICALLY_FRAME;
2957
2958 case CATCH_FRAME:
2959 tso->stackobj->sp = p;
2960 return CATCH_FRAME;
2961
2962 case CATCH_STM_FRAME:
2963 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2964 tso->stackobj->sp = p;
2965 return CATCH_STM_FRAME;
2966
2967 case UNDERFLOW_FRAME:
2968 tso->stackobj->sp = p;
2969 threadStackUnderflow(cap,tso);
2970 p = tso->stackobj->sp;
2971 continue;
2972
2973 case STOP_FRAME:
2974 tso->stackobj->sp = p;
2975 return STOP_FRAME;
2976
2977 case CATCH_RETRY_FRAME: {
2978 StgTRecHeader *trec = tso -> trec;
2979 StgTRecHeader *outer = trec -> enclosing_trec;
2980 debugTrace(DEBUG_stm,
2981 "found CATCH_RETRY_FRAME at %p during raise", p);
2982 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2983 stmAbortTransaction(cap, trec);
2984 stmFreeAbortedTRec(cap, trec);
2985 tso -> trec = outer;
2986 p = next;
2987 continue;
2988 }
2989
2990 default:
2991 p = next;
2992 continue;
2993 }
2994 }
2995 }
2996
2997
2998 /* -----------------------------------------------------------------------------
2999 findRetryFrameHelper
3000
3001 This function is called by the retry# primitive. It traverses the stack
3002 leaving tso->sp referring to the frame which should handle the retry.
3003
3004 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
3005 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
3006
3007 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3008 create) because retries are not considered to be exceptions, despite the
3009 similar implementation.
3010
3011 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3012 not be created within memory transactions.
3013 -------------------------------------------------------------------------- */
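/*
 * Rough sketch of the caller (stg_retryzh in rts/PrimOps.cmm), shown only to
 * illustrate the interface; details may differ:
 *
 *     (frame_type) = ccall findRetryFrameHelper(MyCapability() "ptr",
 *                                               CurrentTSO "ptr");
 *     // frame_type is then CATCH_RETRY_FRAME or ATOMICALLY_FRAME
 */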
3014
3015 StgWord
3016 findRetryFrameHelper (Capability *cap, StgTSO *tso)
3017 {
3018 const StgRetInfoTable *info;
3019 StgPtr p, next;
3020
3021 p = tso->stackobj->sp;
3022 while (1) {
3023 info = get_ret_itbl((const StgClosure *)p);
3024 next = p + stack_frame_sizeW((StgClosure *)p);
3025 switch (info->i.type) {
3026
3027 case ATOMICALLY_FRAME:
3028 debugTrace(DEBUG_stm,
3029 "found ATOMICALLY_FRAME at %p during retry", p);
3030 tso->stackobj->sp = p;
3031 return ATOMICALLY_FRAME;
3032
3033 case CATCH_RETRY_FRAME:
3034 debugTrace(DEBUG_stm,
3035 "found CATCH_RETRY_FRAME at %p during retry", p);
3036 tso->stackobj->sp = p;
3037 return CATCH_RETRY_FRAME;
3038
3039 case CATCH_STM_FRAME: {
3040 StgTRecHeader *trec = tso -> trec;
3041 StgTRecHeader *outer = trec -> enclosing_trec;
3042 debugTrace(DEBUG_stm,
3043 "found CATCH_STM_FRAME at %p during retry", p);
3044 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3045 stmAbortTransaction(cap, trec);
3046 stmFreeAbortedTRec(cap, trec);
3047 tso -> trec = outer;
3048 p = next;
3049 continue;
3050 }
3051
3052 case UNDERFLOW_FRAME:
3053 tso->stackobj->sp = p;
3054 threadStackUnderflow(cap,tso);
3055 p = tso->stackobj->sp;
3056 continue;
3057
3058 default:
3059 ASSERT(info->i.type != CATCH_FRAME);
3060 ASSERT(info->i.type != STOP_FRAME);
3061 p = next;
3062 continue;
3063 }
3064 }
3065 }
3066
3067 /* -----------------------------------------------------------------------------
3068 findAtomicallyFrameHelper
3069
3070 This function is called by stg_abort via catch_retry_frame primitive. It is
3071 like findRetryFrameHelper but it will only stop at ATOMICALLY_FRAME.
3072 -------------------------------------------------------------------------- */
3073
3074 StgWord
3075 findAtomicallyFrameHelper (Capability *cap, StgTSO *tso)
3076 {
3077 const StgRetInfoTable *info;
3078 StgPtr p, next;
3079
3080 p = tso->stackobj->sp;
3081 while (1) {
3082 info = get_ret_itbl((const StgClosure *)p);
3083 next = p + stack_frame_sizeW((StgClosure *)p);
3084 switch (info->i.type) {
3085
3086 case ATOMICALLY_FRAME:
3087 debugTrace(DEBUG_stm,
3088 "found ATOMICALLY_FRAME at %p while aborting after orElse", p);
3089 tso->stackobj->sp = p;
3090 return ATOMICALLY_FRAME;
3091
3092 case CATCH_RETRY_FRAME: {
3093 StgTRecHeader *trec = tso -> trec;
3094 StgTRecHeader *outer = trec -> enclosing_trec;
3095 debugTrace(DEBUG_stm,
3096 "found CATCH_RETRY_FRAME at %p while aborting after orElse", p);
3097 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3098 stmAbortTransaction(cap, trec);
3099 stmFreeAbortedTRec(cap, trec);
3100 tso -> trec = outer;
3101 p = next;
3102 continue;
3103 }
3104
3105 case CATCH_STM_FRAME: {
3106 StgTRecHeader *trec = tso -> trec;
3107 StgTRecHeader *outer = trec -> enclosing_trec;
3108 debugTrace(DEBUG_stm,
3109 "found CATCH_STM_FRAME at %p while aborting after orElse", p);
3110 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3111 stmAbortTransaction(cap, trec);
3112 stmFreeAbortedTRec(cap, trec);
3113 tso -> trec = outer;
3114 p = next;
3115 continue;
3116 }
3117
3118 case UNDERFLOW_FRAME:
3119 tso->stackobj->sp = p;
3120 threadStackUnderflow(cap,tso);
3121 p = tso->stackobj->sp;
3122 continue;
3123
3124 default:
3125 ASSERT(info->i.type != CATCH_FRAME);
3126 ASSERT(info->i.type != STOP_FRAME);
3127 p = next;
3128 continue;
3129 }
3130 }
3131 }
3132
3133 /* -----------------------------------------------------------------------------
3134 resurrectThreads is called after garbage collection on the list of
3135 threads found to be garbage. Each of these threads will be woken
3136 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3137 on an MVar, or NonTermination if the thread was blocked on a Black
3138 Hole.
3139
3140 Locks: assumes we hold *all* the capabilities.
3141 -------------------------------------------------------------------------- */
3142
3143 void
3144 resurrectThreads (StgTSO *threads)
3145 {
3146 StgTSO *tso, *next;
3147 Capability *cap;
3148 generation *gen;
3149
3150 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3151 next = tso->global_link;
3152
3153 gen = Bdescr((P_)tso)->gen;
3154 tso->global_link = gen->threads;
3155 gen->threads = tso;
3156
3157 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3158
3159 // Wake up the thread on the Capability it was last on
3160 cap = tso->cap;
3161
3162 switch (tso->why_blocked) {
3163 case BlockedOnMVar:
3164 case BlockedOnMVarRead:
3165 /* Called by GC - sched_mutex lock is currently held. */
3166 throwToSingleThreaded(cap, tso,
3167 (StgClosure *)blockedIndefinitelyOnMVar_closure);
3168 break;
3169 case BlockedOnBlackHole:
3170 throwToSingleThreaded(cap, tso,
3171 (StgClosure *)nonTermination_closure);
3172 break;
3173 case BlockedOnSTM:
3174 throwToSingleThreaded(cap, tso,
3175 (StgClosure *)blockedIndefinitelyOnSTM_closure);
3176 break;
3177 case NotBlocked:
3178 /* This might happen if the thread was blocked on a black hole
3179 * belonging to a thread that we've just woken up (raiseAsync
3180 * can wake up threads, remember...).
3181 */
3182 continue;
3183 case BlockedOnMsgThrowTo:
3184 // This can happen if the target is masking, blocks on a
3185 // black hole, and then is found to be unreachable. In
3186 // this case, we want to let the target wake up and carry
3187 // on, and do nothing to this thread.
3188 continue;
3189 default:
3190 barf("resurrectThreads: thread blocked in a strange way: %d",
3191 tso->why_blocked);
3192 }
3193 }
3194 }
3195