/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2012
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities,
 * for non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"
#include "Trace.h"
#include "sm/GC.h" // for gcWorkerThread()
#include "STM.h"
#include "RtsUtils.h"
#include "sm/OSMem.h"
#include "sm/BlockAlloc.h" // for countBlocks()

#if !defined(mingw32_HOST_OS)
#include "rts/IOManager.h" // for setIOManagerControlFd()
#endif

#include <string.h>

// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;

uint32_t n_capabilities = 0;
uint32_t enabled_capabilities = 0;

// The array of Capabilities.  It's important that when we need
// to allocate more Capabilities we don't have to move the existing
// Capabilities, because there may be pointers to them in use
// (e.g. threads in waitForCapability(), see #8209), so this is
// an array of Capability* rather than an array of Capability.
Capability **capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
static Capability *last_free_capability[MAX_NUMA_NODES];
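// (Indexed by logical NUMA node, so each node keeps its own hint.)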

/*
 * Indicates that the RTS wants to synchronise all the Capabilities
 * for some reason.  All Capabilities should yieldCapability().
 */
PendingSync * volatile pending_sync = 0;

// Number of logical NUMA nodes
uint32_t n_numa_nodes;

// Map logical NUMA node to OS node numbers
uint32_t numa_map[MAX_NUMA_NODES];
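// For example (illustrative values only): if the NUMA mask selects OS
// nodes 1 and 3, then n_numa_nodes == 2, numa_map[0] == 1 and
// numa_map[1] == 3; logical node numbers are always dense, starting at 0.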

/* Let foreign code get the current Capability -- assuming there is one!
 * This is useful for unsafe foreign calls because they are called with
 * the current Capability held, but they are not passed it. For example,
 * see the integer-gmp package which calls allocate() in its
 * stgAllocForGMP() function (which gets called by gmp functions).
 * */
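/* A minimal usage sketch (hypothetical caller; allocate() is the storage
 * manager's allocation routine, and n_words is assumed to be in scope):
 *
 *   Capability *cap = rts_unsafeGetMyCapability();
 *   StgPtr p = allocate(cap, n_words);
 */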
Capability * rts_unsafeGetMyCapability (void)
{
#if defined(THREADED_RTS)
  return myTask()->cap;
#else
  return &MainCapability;
#endif
}

#if defined(THREADED_RTS)
STATIC_INLINE bool
globalWorkToDo (void)
{
    return RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING
      || RELAXED_LOAD(&recent_activity) == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif

#if defined(THREADED_RTS)
StgClosure *
findSpark (Capability *cap)
{
  Capability *robbed;
  StgClosurePtr spark;
  bool retry;
  uint32_t i = 0;

  if (!emptyRunQueue(cap) || cap->n_returning_tasks != 0) {
      // If there are other threads, don't try to run any new
      // sparks: sparks might be speculative, we don't want to take
      // resources away from the main computation.
      return 0;
  }

  do {
      retry = false;

      // first try to get a spark from our own pool.
      // We should be using reclaimSpark(), because it works without
      // needing any atomic instructions:
      //   spark = reclaimSpark(cap->sparks);
      // However, measurements show that this makes at least one benchmark
      // slower (prsa) and doesn't affect the others.
      spark = tryStealSpark(cap->sparks);
      while (spark != NULL && fizzledSpark(spark)) {
          cap->spark_stats.fizzled++;
          traceEventSparkFizzle(cap);
          spark = tryStealSpark(cap->sparks);
      }
      if (spark != NULL) {
          cap->spark_stats.converted++;

          // Post event for running a spark from capability's own pool.
          traceEventSparkRun(cap);

          return spark;
      }
      if (!emptySparkPoolCap(cap)) {
          retry = true;
      }

      if (n_capabilities == 1) { return NULL; } // makes no sense...

      debugTrace(DEBUG_sched,
                 "cap %d: Trying to steal work from other capabilities",
                 cap->no);

      /* visit capabilities 0..n-1 in sequence until a theft succeeds. We could
      start at a random place instead of 0 as well.  */
      for ( i=0 ; i < n_capabilities ; i++ ) {
          robbed = capabilities[i];
          if (cap == robbed)  // ourselves...
              continue;

          if (emptySparkPoolCap(robbed)) // nothing to steal here
              continue;

          spark = tryStealSpark(robbed->sparks);
          while (spark != NULL && fizzledSpark(spark)) {
              cap->spark_stats.fizzled++;
              traceEventSparkFizzle(cap);
              spark = tryStealSpark(robbed->sparks);
          }
          if (spark == NULL && !emptySparkPoolCap(robbed)) {
              // we conflicted with another thread while trying to steal;
              // try again later.
              retry = true;
          }

          if (spark != NULL) {
              cap->spark_stats.converted++;
              traceEventSparkSteal(cap, robbed->no);

              return spark;
          }
          // otherwise: no success, try next one
      }
  } while (retry);

  debugTrace(DEBUG_sched, "No sparks stolen");
  return NULL;
}

// Returns True if any spark pool is non-empty at this moment in time
// The result is only valid for an instant, of course, so in a sense
// is immediately invalid, and should not be relied upon for
// correctness.
bool
anySparks (void)
{
    uint32_t i;

    for (i=0; i < n_capabilities; i++) {
        if (!emptySparkPoolCap(capabilities[i])) {
            return true;
        }
    }
    return false;
}
#endif

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->next == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->next == NULL);
        cap->returning_tasks_tl->next = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;

    // See Note [Data race in shouldYieldCapability] in Schedule.c.
    RELAXED_ADD(&cap->n_returning_tasks, 1);

    ASSERT_RETURNING_TASKS(cap,task);
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    ASSERT_LOCK_HELD(&cap->lock);
    Task *task;
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->next;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->next = NULL;

    // See Note [Data race in shouldYieldCapability] in Schedule.c.
    RELAXED_ADD(&cap->n_returning_tasks, -1);

    ASSERT_RETURNING_TASKS(cap,task);
    return task;
}
#endif

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability (Capability *cap, uint32_t i)
{
    uint32_t g;

    cap->no = i;
    cap->node = capNoToNumaNode(i);
    cap->in_haskell        = false;
    cap->idle              = 0;
    cap->disabled          = false;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;
    cap->n_run_queue       = 0;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->n_spare_workers   = 0;
    cap->suspended_ccalls  = NULL;
    cap->n_suspended_ccalls = 0;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
    cap->n_returning_tasks  = 0;
    cap->inbox              = (Message*)END_TSO_QUEUE;
    cap->putMVars           = NULL;
    cap->sparks             = allocSparkPool();
    cap->spark_stats.created    = 0;
    cap->spark_stats.dud        = 0;
    cap->spark_stats.overflowed = 0;
    cap->spark_stats.converted  = 0;
    cap->spark_stats.gcd        = 0;
    cap->spark_stats.fizzled    = 0;
#if !defined(mingw32_HOST_OS)
    cap->io_manager_control_wr_fd = -1;
#endif
#endif
    cap->total_allocated        = 0;

    cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
    cap->f.stgGCEnter1     = (StgFunPtr)__stg_gc_enter_1;
    cap->f.stgGCFun        = (StgFunPtr)__stg_gc_fun;

    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
                                     RtsFlags.GcFlags.generations,
                                     "initCapability");
    cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                          RtsFlags.GcFlags.generations,
                                          "initCapability");


    // At this point the storage manager is not initialized yet, so this will be
    // initialized in initStorage().
    cap->upd_rem_set.queue.blocks = NULL;

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }

    cap->weak_ptr_list_hd = NULL;
    cap->weak_ptr_list_tl = NULL;
    cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
    cap->context_switch = 0;
    cap->interrupt = 0;
    cap->pinned_object_block = NULL;
    cap->pinned_object_blocks = NULL;

#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#else
    cap->r.rCCCS = NULL;
#endif

    // cap->r.rCurrentTSO is charged for calls to allocate(), so we
    // don't want it set when not running a Haskell thread.
    cap->r.rCurrentTSO = NULL;

    traceCapCreate(cap);
    traceCapsetAssignCap(CAPSET_OSPROCESS_DEFAULT, i);
    traceCapsetAssignCap(CAPSET_CLOCKDOMAIN_DEFAULT, i);
#if defined(THREADED_RTS)
    traceSparkCounters(cap);
#endif
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void initCapabilities (void)
{
    uint32_t i;

    /* Declare a couple of capability sets representing the process and
       clock domain. Each capability will get added to these capsets. */
    traceCapsetCreate(CAPSET_OSPROCESS_DEFAULT, CapsetTypeOsProcess);
    traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain);

    // Initialise NUMA
    if (!RtsFlags.GcFlags.numa) {
        n_numa_nodes = 1;
        for (i = 0; i < MAX_NUMA_NODES; i++) {
            numa_map[i] = 0;
        }
    } else if (RtsFlags.DebugFlags.numa) {
        // n_numa_nodes was set by RtsFlags.c
    } else {
        uint32_t nNodes = osNumaNodes();
        if (nNodes > MAX_NUMA_NODES) {
            barf("Too many NUMA nodes (max %d)", MAX_NUMA_NODES);
        }
        StgWord mask = RtsFlags.GcFlags.numaMask & osNumaMask();
        uint32_t logical = 0, physical = 0;
        for (; physical < MAX_NUMA_NODES; physical++) {
            if (mask & 1) {
                numa_map[logical++] = physical;
            }
            mask = mask >> 1;
        }
        n_numa_nodes = logical;
        if (logical == 0) {
            barf("available NUMA node set is empty");
        }
    }

#if defined(THREADED_RTS)

#if !defined(REG_Base)
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nCapabilities > 1) {
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
        RtsFlags.ParFlags.nCapabilities = 1;
    }
#endif

    n_capabilities = 0;
    moreCapabilities(0, RtsFlags.ParFlags.nCapabilities);
    n_capabilities = RtsFlags.ParFlags.nCapabilities;

#else /* !THREADED_RTS */

    n_capabilities = 1;
    capabilities = stgMallocBytes(sizeof(Capability*), "initCapabilities");
    capabilities[0] = &MainCapability;

    initCapability(&MainCapability, 0);

#endif

    enabled_capabilities = n_capabilities;

    // There are no free capabilities to begin with.  We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    for (i = 0; i < n_numa_nodes; i++) {
        last_free_capability[i] = capabilities[0];
    }
}

void
moreCapabilities (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    Capability **new_capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");

    // We must disable the timer while we do this since the tick handler may
    // call contextSwitchAllCapabilities, which may see the capabilities array
    // as we free it. The alternative would be to protect the capabilities
    // array with a lock but this seems more expensive than necessary.
    // See #17289.
    stopTimer();

    if (to == 1) {
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
        new_capabilities[0] = &MainCapability;
        initCapability(&MainCapability, 0);
    }
    else
    {
        for (uint32_t i = 0; i < to; i++) {
            if (i < from) {
                new_capabilities[i] = capabilities[i];
            } else {
                new_capabilities[i] = stgMallocBytes(sizeof(Capability),
                                                     "moreCapabilities");
                initCapability(new_capabilities[i], i);
            }
        }
    }

    debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);

    Capability **old_capabilities = ACQUIRE_LOAD(&capabilities);
    RELEASE_STORE(&capabilities, new_capabilities);
    if (old_capabilities != NULL) {
        stgFree(old_capabilities);
    }

    startTimer();
#endif
}

/* ----------------------------------------------------------------------------
 * contextSwitchAllCapabilities: cause all capabilities to context switch as
 * soon as possible.
 * ------------------------------------------------------------------------- */

void contextSwitchAllCapabilities(void)
{
    uint32_t i;
    for (i=0; i < n_capabilities; i++) {
        contextSwitchCapability(capabilities[i]);
    }
}

void interruptAllCapabilities(void)
{
    uint32_t i;
    for (i=0; i < n_capabilities; i++) {
        interruptCapability(capabilities[i]);
    }
}

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    debugTrace(DEBUG_sched, "passing capability %d to %s %#" FMT_HexWord64,
               cap->no, task->incall->tso ? "bound task" : "worker",
               serialisableTaskId(task));
    ACQUIRE_LOCK(&task->lock);
    if (task->wakeup == false) {
        task->wakeup = true;
        // the wakeup flag is needed because signalCondition() doesn't
        // flag the condition if the thread is already running, but we want
        // it to be sticky.
        signalCondition(&task->cond);
    }
    RELEASE_LOCK(&task->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * releaseCapability
 *
 * The current Task (cap->task) releases the Capability.  The Capability is
 * marked free, and if there is any work to do, an appropriate Task is woken up.
 *
 * The caller of releaseCapability_() must hold cap->lock, and still holds it
 * when releaseCapability_() returns; releaseCapability() takes the lock itself.
 *
 * N.B. May need to take all_tasks_mutex.
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap,
                    bool always_wakeup)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT_RETURNING_TASKS(cap,task);
    ASSERT_LOCK_HELD(&cap->lock);

    RELAXED_STORE(&cap->running_task, NULL);

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->n_returning_tasks != 0) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForCapability())
        return;
    }

    // If there is a pending sync, then we should just leave the Capability
    // free.  The thread trying to sync will be about to call
    // waitForCapability().
    //
    // Note: this is *after* we check for a returning task above,
    // because the task attempting to acquire all the capabilities may
    // be currently in waitForCapability() waiting for this
    // capability, in which case simply setting it as free would not
    // wake up the waiting task.
    PendingSync *sync = SEQ_CST_LOAD(&pending_sync);
    if (sync && (sync->type != SYNC_GC_PAR || sync->idle[cap->no])) {
        debugTrace(DEBUG_sched, "sync pending, freeing capability %d", cap->no);
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
        // Make sure we're not about to try to wake ourselves up
        // ASSERT(task != cap->run_queue_hd->bound);
        // assertion is false: in schedule() we force a yield after
        // ThreadBlocked, but the thread may be back on the run queue
        // by now.
        task = peekRunQueue(cap)->bound->task;
        giveCapabilityToTask(cap, task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (RELAXED_LOAD(&sched_state) < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            debugTrace(DEBUG_sched,
                       "starting new worker on capability %d", cap->no);
            startWorkerTask(cap);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (always_wakeup ||
        !emptyRunQueue(cap) || !emptyInbox(cap) ||
        (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap, cap->spare_workers);
            // The worker Task pops itself from the queue;
            return;
        }
    }

#if defined(PROFILING)
    cap->r.rCCCS = CCS_IDLE;
#endif
    RELAXED_STORE(&last_free_capability[cap->node], cap);
    debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}

void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, false);
    RELEASE_LOCK(&cap->lock);
}

void
releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, true);
    RELEASE_LOCK(&cap->lock);
}

static void
enqueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    task = cap->running_task;

    // If the Task is stopped, we shouldn't be yielding, we should
    // be just exiting.
    ASSERT(!task->stopped);
    ASSERT(task->worker);

    if (cap->n_spare_workers < MAX_SPARE_WORKERS)
    {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
        cap->n_spare_workers++;
    }
    else
    {
        debugTrace(DEBUG_sched, "%d spare workers already, exiting",
                   cap->n_spare_workers);
        releaseCapability_(cap,false);
        // hold the lock until after workerTaskStop; c.f. scheduleWorker()
        workerTaskStop(task);
        RELEASE_LOCK(&cap->lock);
        shutdownThread();
    }
}

#endif

/*
 * Note [Benign data race due to work-pushing]
 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 *
 * #17276 points out a tricky data race (noticed by ThreadSanitizer) between
 * waitForWorkerCapability and schedulePushWork. In short, schedulePushWork
 * works as follows:
 *
 *  1. collect the set of all idle capabilities, take cap->lock of each.
 *
 *  2. sort through each TSO on the calling capability's run queue and push
 *     some to idle capabilities. This may (if the TSO is a bound thread)
 *     involve setting tso->bound->task->cap despite not holding
 *     tso->bound->task->lock.
 *
 *  3. release cap->lock of all idle capabilities.
 *
 * Now, step 2 is in principle safe since the capability of the caller of
 * schedulePushWork *owns* the TSO and therefore the Task to which it is bound.
 * Furthermore, step 3 ensures that the write in step (2) will be visible to
 * any core which starts execution of the previously-idle capability.
 *
 * However, this argument doesn't quite work for waitForWorkerCapability, which
 * reads task->cap *without* first owning the capability which owns `task`.
 * For this reason, we check again whether the task has been migrated to
 * another capability after taking task->cap->lock. See Note [migrated bound
 * threads] below.
 *
 */

/* ----------------------------------------------------------------------------
 * waitForWorkerCapability(task)
 *
 * waits to be given a Capability, and then returns the Capability.  The task
 * must be either a worker (and on a cap->spare_workers queue), or a bound Task.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)

static Capability * waitForWorkerCapability (Task *task)
{
    Capability *cap;

    for (;;) {
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        // The happens-after matches the happens-before in
        // schedulePushWork, which owns 'task' when it sets 'task->cap'.
        TSAN_ANNOTATE_HAPPENS_AFTER(&task->cap);
        cap = task->cap;

        // See Note [Benign data race due to work-pushing].
        TSAN_ANNOTATE_BENIGN_RACE(&task->cap, "we will double-check this below");
        task->wakeup = false;
        RELEASE_LOCK(&task->lock);

        debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);

        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task != NULL) {
            debugTrace(DEBUG_sched,
                       "capability %d is owned by another task", cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->cap != cap) {
            // see Note [migrated bound threads]
            debugTrace(DEBUG_sched,
                       "task has been migrated to cap %d", task->cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->incall->tso == NULL) {
            ASSERT(cap->spare_workers != NULL);
            // if we're not at the front of the queue, release it
            // again.  This is unlikely to happen.
            if (cap->spare_workers != task) {
                giveCapabilityToTask(cap,cap->spare_workers);
                RELEASE_LOCK(&cap->lock);
                continue;
            }
            cap->spare_workers = task->next;
            task->next = NULL;
            cap->n_spare_workers--;
        }

        RELAXED_STORE(&cap->running_task, task);
        RELEASE_LOCK(&cap->lock);
        break;
    }

    return cap;
}

#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
 * waitForReturnCapability (Task *task)
 *
 * The Task should be on the cap->returning_tasks queue of a Capability.  This
 * function waits for the Task to be woken up, and returns the Capability that
 * it was woken up on.
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)

static Capability * waitForReturnCapability (Task *task)
{
    Capability *cap;

    for (;;) {
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        cap = task->cap;
        task->wakeup = false;
        RELEASE_LOCK(&task->lock);

        // now check whether we should wake up...
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task == NULL) {
            if (cap->returning_tasks_hd != task) {
                giveCapabilityToTask(cap,cap->returning_tasks_hd);
                RELEASE_LOCK(&cap->lock);
                continue;
            }
            RELAXED_STORE(&cap->running_task, task);
            popReturningTask(cap);
            RELEASE_LOCK(&cap->lock);
            break;
        }
        RELEASE_LOCK(&cap->lock);
    }

    return cap;
}

#endif /* THREADED_RTS */

#if defined(THREADED_RTS)

/* ----------------------------------------------------------------------------
 * capability_is_busy (Capability *cap)
 *
 * A predicate for determining whether the given Capability is currently running
 * a Task. This can be safely called without holding the Capability's lock
 * although the result may be inaccurate if it races with the scheduler.
 * Consequently there is a TSAN suppression for it.
 *
 * ------------------------------------------------------------------------- */
static bool capability_is_busy(const Capability * cap)
{
    return RELAXED_LOAD(&cap->running_task) != NULL;
}


/* ----------------------------------------------------------------------------
 * find_capability_for_task
 *
 * Given a Task, identify a reasonable Capability to run it on. We try to
 * find an idle capability if possible.
 *
 * ------------------------------------------------------------------------- */

static Capability * find_capability_for_task(const Task * task)
{
    if (task->preferred_capability != -1) {
        // Does the task have a preferred capability? If so, use it
        return capabilities[task->preferred_capability %
                            enabled_capabilities];
    } else {
        // Try last_free_capability first
        Capability *cap = RELAXED_LOAD(&last_free_capability[task->node]);

        // N.B. There is a data race here since we are looking at
        // cap->running_task without taking cap->lock. However, this is
        // benign since the result is merely guiding our search heuristic.
        if (!capability_is_busy(cap)) {
            return cap;
        } else {
            // The last_free_capability is already busy, search for a free
            // capability on this node.
            for (uint32_t i = task->node; i < enabled_capabilities;
                  i += n_numa_nodes) {
                // visits all the capabilities on this node, because
                // cap[i]->node == i % n_numa_nodes
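                // Illustration (hypothetical numbers): with 8 enabled
                // capabilities and 2 NUMA nodes, a task on node 1 probes
                // capabilities 1, 3, 5 and 7.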
                if (!RELAXED_LOAD(&capabilities[i]->running_task)) {
                    return capabilities[i];
                }
            }

            // Can't find a free one, use last_free_capability.
            return RELAXED_LOAD(&last_free_capability[task->node]);
        }
    }
}
#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
 * waitForCapability (Capability **pCap, Task *task)
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * pCap is strictly an output.
 *
 * ------------------------------------------------------------------------- */

void waitForCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        cap = find_capability_for_task(task);

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;
    } else {
        ASSERT(task->cap == cap);
    }

    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);

    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        // It's free; just grab it
        RELAXED_STORE(&cap->running_task, task);
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);
        cap = waitForReturnCapability(task);
    }

#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);

    *pCap = cap;
#endif
}

/* ----------------------------------------------------------------------------
 * yieldCapability
 *
 * Give up the Capability, and return when we have it again.  This is called
 * when either we know that the Capability should be given to another Task, or
 * there is nothing to do right now.  One of the following is true:
 *
 *    - The current Task is a worker, and there's a bound thread at the head of
 *      the run queue (or vice versa)
 *
 *    - The run queue is empty.  We'll be woken up again when there's work to
 *      do.
 *
 *    - Another Task is trying to do parallel GC (pending_sync == SYNC_GC_PAR).
 *      We should become a GC worker for a while.
 *
 *    - Another Task is trying to acquire all the Capabilities (pending_sync !=
 *      SYNC_GC_PAR), either to do a sequential GC, forkProcess, or
 *      setNumCapabilities.  We should give up the Capability temporarily.
 *
 * When yieldCapability returns *pCap will have been updated to the new
 * capability held by the caller.
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)

/* See Note [GC livelock] in Schedule.c for why we have gcAllowed
   and return the bool */
bool /* Did we GC? */
yieldCapability (Capability** pCap, Task *task, bool gcAllowed)
{
    Capability *cap = *pCap;

    if (gcAllowed)
    {
        PendingSync *sync = SEQ_CST_LOAD(&pending_sync);

        if (sync) {
            switch (sync->type) {
            case SYNC_GC_PAR:
                if (! sync->idle[cap->no]) {
                    traceEventGcStart(cap);
                    gcWorkerThread(cap);
                    traceEventGcEnd(cap);
                    traceSparkCounters(cap);
                    // See Note [migrated bound threads 2]
                    if (task->cap == cap) {
                        return true;
                    }
                }
                break;

            case SYNC_FLUSH_UPD_REM_SET:
                debugTrace(DEBUG_nonmoving_gc, "Flushing update remembered set blocks...");
                break;

            default:
                break;
            }
        }
    }

    debugTrace(DEBUG_sched, "giving up capability %d", cap->no);

    // We must now release the capability and wait to be woken up again.
    task->wakeup = false;

    ACQUIRE_LOCK(&cap->lock);

    // If this is a worker thread, put it on the spare_workers queue
    if (isWorker(task)) {
        enqueueWorker(cap);
    }

    releaseCapability_(cap, false);

    if (isWorker(task) || isBoundTask(task)) {
        RELEASE_LOCK(&cap->lock);
        cap = waitForWorkerCapability(task);
    } else {
        // Not a worker Task, or a bound Task.  The only way we can be woken up
        // again is to put ourselves on the returning_tasks queue, so that's
        // what we do.  We still hold cap->lock at this point.
        // The Task waiting for this Capability does not have it
        // yet, so we can be sure to be woken up later. (see #10545)
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);
        cap = waitForReturnCapability(task);
    }

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
    ASSERT(cap->running_task == task);

#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return false;
}

#endif /* THREADED_RTS */

/*
 * Note [migrated bound threads]
 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 *
 * There's a tricky case where:
 *    - cap A is running an unbound thread T1
 *    - there is a bound thread T2 at the head of the run queue on cap A
 *    - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
 *    - T1 returns quickly grabbing A again (T2 is still waking up on A)
 *    - T1 blocks, the scheduler migrates T2 to cap B
 *    - the task bound to T2 wakes up on cap B
 *
 * We take advantage of the following invariant:
 *
 *  - A bound thread can only be migrated by the holder of the
 *    Capability on which the bound thread currently lives.  So, if we
 *    hold Capability C, and task->cap == C, then task cannot be
 *    migrated under our feet.
 *
 * See also Note [Benign data race due to work-pushing].
 *
 *
 * Note [migrated bound threads 2]
 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 *
 * Second tricky case:
 *   - A bound Task becomes a GC thread
 *   - scheduleDoGC() migrates the thread belonging to this Task,
 *     because the Capability it is on is disabled
 *   - after GC, gcWorkerThread() returns, but now we are
 *     holding a Capability that is not the same as task->cap
 *   - Hence we must check for this case and immediately give up the
 *     cap we hold.
 *
 */

/* ----------------------------------------------------------------------------
 * prodCapability
 *
 * If a Capability is currently idle, wake up a Task on it.  Used to
 * get every Capability into the GC.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)

void
prodCapability (Capability *cap, Task *task)
{
    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        cap->running_task = task;
        releaseCapability_(cap,true);
    }
    RELEASE_LOCK(&cap->lock);
}

#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)

bool
tryGrabCapability (Capability *cap, Task *task)
{
    int r;
    // N.B. This is benign as we will check again after taking the lock.
    TSAN_ANNOTATE_BENIGN_RACE(&cap->running_task, "tryGrabCapability (cap->running_task)");
    if (RELAXED_LOAD(&cap->running_task) != NULL) return false;

    r = TRY_ACQUIRE_LOCK(&cap->lock);
    if (r != 0) return false;
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return false;
    }
    task->cap = cap;
    RELAXED_STORE(&cap->running_task, task);
    RELEASE_LOCK(&cap->lock);
    return true;
}


#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * sched_state = SCHED_SHUTTING_DOWN, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */

static void
shutdownCapability (Capability *cap USED_IF_THREADS,
                    Task *task USED_IF_THREADS,
                    bool safe USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    uint32_t i;

    task->cap = cap;

    // Loop indefinitely until all the workers have exited and there
    // are no Haskell threads left.  We used to bail out after 50
    // iterations of this loop, but that occasionally left a worker
    // running which caused problems later (the closeMutex() below
    // isn't safe, for one thing).

    for (i = 0; /* i < 50 */; i++) {
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);

        debugTrace(DEBUG_sched,
                   "shutting down capability %d, attempt %d", cap->no, i);
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            debugTrace(DEBUG_sched, "not owner, yielding");
            yieldThread();
            continue;
        }
        cap->running_task = task;

        if (cap->spare_workers) {
            // Look for workers that have died without removing
            // themselves from the list; this could happen if the OS
            // summarily killed the thread, for example.  This
            // actually happens on Windows when the system is
            // terminating the program, and the RTS is running in a
            // DLL.
            Task *t, *prev;
            prev = NULL;
            for (t = cap->spare_workers; t != NULL; t = t->next) {
                if (!osThreadIsAlive(t->id)) {
                    debugTrace(DEBUG_sched,
                               "worker thread %p has died unexpectedly", (void *)(size_t)t->id);
                    cap->n_spare_workers--;
                    if (!prev) {
                        cap->spare_workers = t->next;
                    } else {
                        prev->next = t->next;
                    }
                    prev = t;
                }
            }
        }

        if (!emptyRunQueue(cap) || cap->spare_workers) {
            debugTrace(DEBUG_sched,
                       "runnable threads or workers still alive, yielding");
            releaseCapability_(cap,false); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }

        // If "safe", then busy-wait for any threads currently doing
        // foreign calls.  If we're about to unload this DLL, for
        // example, we need to be sure that there are no OS threads
        // that will try to return to code that has been unloaded.
        // We can be a bit more relaxed when this is a standalone
        // program that is about to terminate, and let safe=false.
        if (cap->suspended_ccalls && safe) {
            debugTrace(DEBUG_sched,
                       "thread(s) are involved in foreign calls, yielding");
            cap->running_task = NULL;
            RELEASE_LOCK(&cap->lock);
            // The IO manager thread might have been slow to start up,
            // so the first attempt to kill it might not have
            // succeeded.  Just in case, try again - the kill message
            // will only be sent once.
            //
            // To reproduce this deadlock: run ffi002(threaded1)
            // repeatedly on a loaded machine.
            ioManagerDie();
            yieldThread();
            continue;
        }

        traceSparkCounters(cap);
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // we now have the Capability; its run queue and spare_workers
    // list are both empty.

    // ToDo: we can't drop this mutex, because there might still be
    // threads performing foreign calls that will eventually try to
    // return via resumeThread() and attempt to grab cap->lock.
    // closeMutex(&cap->lock);
#endif
}

void
shutdownCapabilities(Task *task, bool safe)
{
    uint32_t i;
    for (i=0; i < n_capabilities; i++) {
        ASSERT(task->incall->tso == NULL);
        shutdownCapability(capabilities[i], task, safe);
    }
#if defined(THREADED_RTS)
    ASSERT(checkSparkCountInvariant());
#endif
}

static void
freeCapability (Capability *cap)
{
    stgFree(cap->mut_lists);
    stgFree(cap->saved_mut_lists);
#if defined(THREADED_RTS)
    freeSparkPool(cap->sparks);
#endif
    traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no);
    traceCapsetRemoveCap(CAPSET_CLOCKDOMAIN_DEFAULT, cap->no);
    traceCapDelete(cap);
}

void
freeCapabilities (void)
{
#if defined(THREADED_RTS)
    uint32_t i;
    for (i=0; i < n_capabilities; i++) {
        freeCapability(capabilities[i]);
        if (capabilities[i] != &MainCapability)
            stgFree(capabilities[i]);
    }
#else
    freeCapability(&MainCapability);
#endif
    stgFree(capabilities);
    traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
    traceCapsetDelete(CAPSET_CLOCKDOMAIN_DEFAULT);
}

/* ---------------------------------------------------------------------------
   Mark everything directly reachable from the Capabilities.  When
   using multiple GC threads, GC thread i marks the Capabilities c
   for which (c `mod` n_gc_threads == i).
   ------------------------------------------------------------------------ */
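/* For example (illustrative numbers): with 3 GC threads and 5 Capabilities,
   GC thread 1 marks Capabilities 1 and 4. */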

void
markCapability (evac_fn evac, void *user, Capability *cap,
                bool no_mark_sparks USED_IF_THREADS)
{
    InCall *incall;

    // Each GC thread is responsible for following roots from the
    // Capability of the same number.  There will usually be the same
    // number of Capabilities as GC threads, or fewer, but just in case
    // there are more, we mark every Capability whose number is the GC
    // thread's index plus a multiple of the number of GC threads.
    evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
    evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
    evac(user, (StgClosure **)(void *)&cap->inbox);
#endif
    for (incall = cap->suspended_ccalls; incall != NULL;
         incall=incall->next) {
        evac(user, (StgClosure **)(void *)&incall->suspended_tso);
    }

#if defined(THREADED_RTS)
    if (!no_mark_sparks) {
        traverseSparkQueue (evac, user, cap);
    }
#endif

    // Free STM structures for this Capability
    stmPreGCHook(cap);
}

void
markCapabilities (evac_fn evac, void *user)
{
    uint32_t n;
    for (n = 0; n < n_capabilities; n++) {
        markCapability(evac, user, capabilities[n], false);
    }
}

#if defined(THREADED_RTS)
bool checkSparkCountInvariant (void)
{
    SparkCounters sparks = { 0, 0, 0, 0, 0, 0 };
    StgWord64 remaining = 0;
    uint32_t i;

    for (i = 0; i < n_capabilities; i++) {
        sparks.created   += capabilities[i]->spark_stats.created;
        sparks.dud       += capabilities[i]->spark_stats.dud;
        sparks.overflowed+= capabilities[i]->spark_stats.overflowed;
        sparks.converted += capabilities[i]->spark_stats.converted;
        sparks.gcd       += capabilities[i]->spark_stats.gcd;
        sparks.fizzled   += capabilities[i]->spark_stats.fizzled;
        remaining        += sparkPoolSize(capabilities[i]->sparks);
    }

    /* The invariant is
     *   created = converted + remaining + gcd + fizzled
     */
    debugTrace(DEBUG_sparks,"spark invariant: %ld == %ld + %ld + %ld + %ld "
                            "(created == converted + remaining + gcd + fizzled)",
                            sparks.created, sparks.converted, remaining,
                            sparks.gcd, sparks.fizzled);

    return (sparks.created ==
              sparks.converted + remaining + sparks.gcd + sparks.fizzled);

}
#endif

#if !defined(mingw32_HOST_OS)
void
setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) {
#if defined(THREADED_RTS)
    if (cap_no < n_capabilities) {
        RELAXED_STORE(&capabilities[cap_no]->io_manager_control_wr_fd, fd);
    } else {
        errorBelch("warning: setIOManagerControlFd called with illegal capability number.");
    }
#endif
}
#endif