/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2012
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities;
 * in non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"
#include "Trace.h"
#include "sm/GC.h" // for gcWorkerThread()
#include "STM.h"
#include "RtsUtils.h"
#include "sm/OSMem.h"
#include "sm/BlockAlloc.h" // for countBlocks()

#if !defined(mingw32_HOST_OS)
#include "rts/IOManager.h" // for setIOManagerControlFd()
#endif

#include <string.h>

// The one global capability: this is the Capability for non-threaded
// builds, and for +RTS -N1.
Capability MainCapability;

uint32_t n_capabilities = 0;
uint32_t enabled_capabilities = 0;

// The array of Capabilities. It's important that when we need
// to allocate more Capabilities we don't have to move the existing
// Capabilities, because there may be pointers to them in use
// (e.g. threads in waitForCapability(), see #8209), so this is
// an array of Capability* rather than an array of Capability.
Capability **capabilities = NULL;

// Holds the Capability which last became free. This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
static Capability *last_free_capability[MAX_NUMA_NODES];

/*
 * Indicates that the RTS wants to synchronise all the Capabilities
 * for some reason. All Capabilities should yieldCapability().
 */
PendingSync * volatile pending_sync = 0;

// Number of logical NUMA nodes
uint32_t n_numa_nodes;

// Map logical NUMA node to OS node numbers
uint32_t numa_map[MAX_NUMA_NODES];

/* Let foreign code get the current Capability -- assuming there is one!
 * This is useful for unsafe foreign calls because they are called with
 * the current Capability held, but they are not passed it. For example,
 * see the integer-gmp package, which calls allocate() in its
 * stgAllocForGMP() function (which gets called by gmp functions).
 */
Capability * rts_unsafeGetMyCapability (void)
{
#if defined(THREADED_RTS)
  return myTask()->cap;
#else
  return &MainCapability;
#endif
}
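
/* A hedged sketch of how unsafe foreign code might use this: allocate from
 * the current capability's allocation area, in the style of integer-gmp's
 * stgAllocForGMP() mentioned above. The helper name is hypothetical,
 * purely for illustration:
 *
 *   void * exampleAllocForForeignCode (size_t size_in_bytes)
 *   {
 *       Capability *cap = rts_unsafeGetMyCapability();
 *       return allocate(cap, ROUNDUP_BYTES_TO_WDS(size_in_bytes));
 *   }
 */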

#if defined(THREADED_RTS)
STATIC_INLINE bool
globalWorkToDo (void)
{
    return RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING
        || RELAXED_LOAD(&recent_activity) == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif

#if defined(THREADED_RTS)
StgClosure *
findSpark (Capability *cap)
{
    Capability *robbed;
    StgClosurePtr spark;
    bool retry;
    uint32_t i = 0;

    if (!emptyRunQueue(cap) || cap->n_returning_tasks != 0) {
        // If there are other threads, don't try to run any new
        // sparks: sparks might be speculative, we don't want to take
        // resources away from the main computation.
        return 0;
    }

    do {
        retry = false;

        // first try to get a spark from our own pool.
        // We should be using reclaimSpark(), because it works without
        // needing any atomic instructions:
        //   spark = reclaimSpark(cap->sparks);
        // However, measurements show that this makes at least one benchmark
        // slower (prsa) and doesn't affect the others.
        spark = tryStealSpark(cap->sparks);
        while (spark != NULL && fizzledSpark(spark)) {
            cap->spark_stats.fizzled++;
            traceEventSparkFizzle(cap);
            spark = tryStealSpark(cap->sparks);
        }
        if (spark != NULL) {
            cap->spark_stats.converted++;

            // Post event for running a spark from capability's own pool.
            traceEventSparkRun(cap);

            return spark;
        }
        if (!emptySparkPoolCap(cap)) {
            retry = true;
        }

        if (n_capabilities == 1) { return NULL; } // makes no sense...

        debugTrace(DEBUG_sched,
                   "cap %d: Trying to steal work from other capabilities",
                   cap->no);

        /* visit caps 0..n-1 in sequence until a theft succeeds. We could
           start at a random place instead of 0 as well. */
        for ( i=0 ; i < n_capabilities ; i++ ) {
            robbed = capabilities[i];
            if (cap == robbed)  // ourselves...
                continue;

            if (emptySparkPoolCap(robbed)) // nothing to steal here
                continue;

            spark = tryStealSpark(robbed->sparks);
            while (spark != NULL && fizzledSpark(spark)) {
                cap->spark_stats.fizzled++;
                traceEventSparkFizzle(cap);
                spark = tryStealSpark(robbed->sparks);
            }
            if (spark == NULL && !emptySparkPoolCap(robbed)) {
                // we conflicted with another thread while trying to steal;
                // try again later.
                retry = true;
            }

            if (spark != NULL) {
                cap->spark_stats.converted++;
                traceEventSparkSteal(cap, robbed->no);

                return spark;
            }
            // otherwise: no success, try next one
        }
    } while (retry);

    debugTrace(DEBUG_sched, "No sparks stolen");
    return NULL;
}
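
/* A hedged sketch of how a scheduler might consume findSpark's result
 * (illustrative only; the real call site lives in Schedule.c):
 *
 *   StgClosure *spark = findSpark(cap);
 *   if (spark != NULL) {
 *       // got a runnable spark: arrange for it to be evaluated,
 *       // e.g. by creating a spark-evaluating thread on this cap.
 *   }
 */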

// Returns true if any spark pool is non-empty at this moment in time.
// The result is only valid for an instant, of course, so in a sense
// it is immediately invalid, and should not be relied upon for
// correctness.
bool
anySparks (void)
{
    uint32_t i;

    for (i=0; i < n_capabilities; i++) {
        if (!emptySparkPoolCap(capabilities[i])) {
            return true;
        }
    }
    return false;
}
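
/* A hedged sketch of the intended use: consult anySparks() before going
 * idle, and re-check after waking, since the answer is instantly stale
 * (hypothetical; not the actual scheduler logic):
 *
 *   if (emptyRunQueue(cap) && !anySparks()) {
 *       // nothing runnable anywhere right now; consider sleeping
 *   }
 */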
#endif

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->next == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->next == NULL);
        cap->returning_tasks_tl->next = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;

    // See Note [Data race in shouldYieldCapability] in Schedule.c.
    RELAXED_ADD(&cap->n_returning_tasks, 1);

    ASSERT_RETURNING_TASKS(cap,task);
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    ASSERT_LOCK_HELD(&cap->lock);
    Task *task;
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->next;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->next = NULL;

    // See Note [Data race in shouldYieldCapability] in Schedule.c.
    RELAXED_ADD(&cap->n_returning_tasks, -1);

    ASSERT_RETURNING_TASKS(cap,task);
    return task;
}
#endif

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability (Capability *cap, uint32_t i)
{
    uint32_t g;

    cap->no = i;
    cap->node = capNoToNumaNode(i);
    cap->in_haskell = false;
    cap->idle = 0;
    cap->disabled = false;

    cap->run_queue_hd = END_TSO_QUEUE;
    cap->run_queue_tl = END_TSO_QUEUE;
    cap->n_run_queue = 0;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task = NULL; // indicates cap is free
    cap->spare_workers = NULL;
    cap->n_spare_workers = 0;
    cap->suspended_ccalls = NULL;
    cap->n_suspended_ccalls = 0;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
    cap->n_returning_tasks = 0;
    cap->inbox = (Message*)END_TSO_QUEUE;
    cap->putMVars = NULL;
    cap->sparks = allocSparkPool();
    cap->spark_stats.created = 0;
    cap->spark_stats.dud = 0;
    cap->spark_stats.overflowed = 0;
    cap->spark_stats.converted = 0;
    cap->spark_stats.gcd = 0;
    cap->spark_stats.fizzled = 0;
#if !defined(mingw32_HOST_OS)
    cap->io_manager_control_wr_fd = -1;
#endif
#endif
    cap->total_allocated = 0;

    cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
    cap->f.stgGCEnter1 = (StgFunPtr)__stg_gc_enter_1;
    cap->f.stgGCFun = (StgFunPtr)__stg_gc_fun;

    cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                    RtsFlags.GcFlags.generations,
                                    "initCapability");
    cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                          RtsFlags.GcFlags.generations,
                                          "initCapability");

    // At this point the storage manager is not initialized yet, so this
    // will be initialized in initStorage().
    cap->upd_rem_set.queue.blocks = NULL;

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }

    cap->weak_ptr_list_hd = NULL;
    cap->weak_ptr_list_tl = NULL;
    cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
    cap->context_switch = 0;
    cap->interrupt = 0;
    cap->pinned_object_block = NULL;
    cap->pinned_object_blocks = NULL;

#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#else
    cap->r.rCCCS = NULL;
#endif

    // cap->r.rCurrentTSO is charged for calls to allocate(), so we
    // don't want it set when not running a Haskell thread.
    cap->r.rCurrentTSO = NULL;

    traceCapCreate(cap);
    traceCapsetAssignCap(CAPSET_OSPROCESS_DEFAULT, i);
    traceCapsetAssignCap(CAPSET_CLOCKDOMAIN_DEFAULT, i);
#if defined(THREADED_RTS)
    traceSparkCounters(cap);
#endif
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void initCapabilities (void)
{
    uint32_t i;

    /* Declare a couple of capability sets representing the process and
       clock domain. Each capability will get added to these capsets. */
    traceCapsetCreate(CAPSET_OSPROCESS_DEFAULT, CapsetTypeOsProcess);
    traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain);

    // Initialise NUMA
    if (!RtsFlags.GcFlags.numa) {
        n_numa_nodes = 1;
        for (i = 0; i < MAX_NUMA_NODES; i++) {
            numa_map[i] = 0;
        }
    } else if (RtsFlags.DebugFlags.numa) {
        // n_numa_nodes was set by RtsFlags.c
    } else {
        uint32_t nNodes = osNumaNodes();
        if (nNodes > MAX_NUMA_NODES) {
            barf("Too many NUMA nodes (max %d)", MAX_NUMA_NODES);
        }
        StgWord mask = RtsFlags.GcFlags.numaMask & osNumaMask();
        uint32_t logical = 0, physical = 0;
        for (; physical < MAX_NUMA_NODES; physical++) {
            if (mask & 1) {
                numa_map[logical++] = physical;
            }
            mask = mask >> 1;
        }
        n_numa_nodes = logical;
        if (logical == 0) {
            barf("available NUMA node set is empty");
        }
    }

#if defined(THREADED_RTS)

#if !defined(REG_Base)
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nCapabilities > 1) {
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
        RtsFlags.ParFlags.nCapabilities = 1;
    }
#endif

    n_capabilities = 0;
    moreCapabilities(0, RtsFlags.ParFlags.nCapabilities);
    n_capabilities = RtsFlags.ParFlags.nCapabilities;

#else /* !THREADED_RTS */

    n_capabilities = 1;
    capabilities = stgMallocBytes(sizeof(Capability*), "initCapabilities");
    capabilities[0] = &MainCapability;

    initCapability(&MainCapability, 0);

#endif

    enabled_capabilities = n_capabilities;

    // There are no free capabilities to begin with. We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    for (i = 0; i < n_numa_nodes; i++) {
        last_free_capability[i] = capabilities[0];
    }
}

void
moreCapabilities (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    Capability **new_capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");

    // We must disable the timer while we do this since the tick handler may
    // call contextSwitchAllCapabilities, which may see the capabilities array
    // as we free it. The alternative would be to protect the capabilities
    // array with a lock but this seems more expensive than necessary.
    // See #17289.
    stopTimer();

    if (to == 1) {
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
        new_capabilities[0] = &MainCapability;
        initCapability(&MainCapability, 0);
    }
    else
    {
        for (uint32_t i = 0; i < to; i++) {
            if (i < from) {
                new_capabilities[i] = capabilities[i];
            } else {
                new_capabilities[i] = stgMallocBytes(sizeof(Capability),
                                                     "moreCapabilities");
                initCapability(new_capabilities[i], i);
            }
        }
    }

    debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);

    Capability **old_capabilities = ACQUIRE_LOAD(&capabilities);
    RELEASE_STORE(&capabilities, new_capabilities);
    if (old_capabilities != NULL) {
        stgFree(old_capabilities);
    }

    startTimer();
#endif
}
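
/* A hedged sketch of how the array is grown at runtime, mirroring the call
 * sequence in initCapabilities() above (illustrative; the real caller,
 * setNumCapabilities, performs additional synchronisation):
 *
 *   moreCapabilities(n_capabilities, new_n_capabilities);
 *   n_capabilities = new_n_capabilities;
 */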

/* ----------------------------------------------------------------------------
 * contextSwitchAllCapabilities: cause all capabilities to context switch as
 * soon as possible.
 * ------------------------------------------------------------------------- */

void contextSwitchAllCapabilities(void)
{
    uint32_t i;
    for (i=0; i < n_capabilities; i++) {
        contextSwitchCapability(capabilities[i]);
    }
}

void interruptAllCapabilities(void)
{
    uint32_t i;
    for (i=0; i < n_capabilities; i++) {
        interruptCapability(capabilities[i]);
    }
}

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task. The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    debugTrace(DEBUG_sched, "passing capability %d to %s %#" FMT_HexWord64,
               cap->no, task->incall->tso ? "bound task" : "worker",
               serialisableTaskId(task));
    ACQUIRE_LOCK(&task->lock);
    if (task->wakeup == false) {
        task->wakeup = true;
        // the wakeup flag is needed because signalCondition() doesn't
        // flag the condition if the thread is already running, but we want
        // it to be sticky.
        signalCondition(&task->cond);
    }
    RELEASE_LOCK(&task->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * releaseCapability
 *
 * The current Task (cap->task) releases the Capability. The Capability is
 * marked free, and if there is any work to do, an appropriate Task is woken up.
 *
 * The caller must hold cap->lock and will still hold it after
 * releaseCapability returns.
 *
 * N.B. May need to take all_tasks_mutex.
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap,
                    bool always_wakeup)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT_RETURNING_TASKS(cap,task);
    ASSERT_LOCK_HELD(&cap->lock);

    RELAXED_STORE(&cap->running_task, NULL);

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->n_returning_tasks != 0) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForCapability())
        return;
    }

    // If there is a pending sync, then we should just leave the Capability
    // free. The thread trying to sync will be about to call
    // waitForCapability().
    //
    // Note: this is *after* we check for a returning task above,
    // because the task attempting to acquire all the capabilities may
    // be currently in waitForCapability() waiting for this
    // capability, in which case simply setting it as free would not
    // wake up the waiting task.
    PendingSync *sync = SEQ_CST_LOAD(&pending_sync);
    if (sync && (sync->type != SYNC_GC_PAR || sync->idle[cap->no])) {
        debugTrace(DEBUG_sched, "sync pending, freeing capability %d", cap->no);
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
        // Make sure we're not about to try to wake ourselves up
        // ASSERT(task != cap->run_queue_hd->bound);
        // assertion is false: in schedule() we force a yield after
        // ThreadBlocked, but the thread may be back on the run queue
        // by now.
        task = peekRunQueue(cap)->bound->task;
        giveCapabilityToTask(cap, task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one. If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed. If the system is
        // shutting down, we never create a new worker.
        if (RELAXED_LOAD(&sched_state) < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            debugTrace(DEBUG_sched,
                       "starting new worker on capability %d", cap->no);
            startWorkerTask(cap);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (always_wakeup ||
        !emptyRunQueue(cap) || !emptyInbox(cap) ||
        (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap, cap->spare_workers);
            // The worker Task pops itself from the queue;
            return;
        }
    }

#if defined(PROFILING)
    cap->r.rCCCS = CCS_IDLE;
#endif
    RELAXED_STORE(&last_free_capability[cap->node], cap);
    debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}

void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, false);
    RELEASE_LOCK(&cap->lock);
}

void
releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, true);
    RELEASE_LOCK(&cap->lock);
}

static void
enqueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    task = cap->running_task;

    // If the Task is stopped, we shouldn't be yielding, we should
    // be just exiting.
    ASSERT(!task->stopped);
    ASSERT(task->worker);

    if (cap->n_spare_workers < MAX_SPARE_WORKERS)
    {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
        cap->n_spare_workers++;
    }
    else
    {
        debugTrace(DEBUG_sched, "%d spare workers already, exiting",
                   cap->n_spare_workers);
        releaseCapability_(cap,false);
        // hold the lock until after workerTaskStop; c.f. scheduleWorker()
        workerTaskStop(task);
        RELEASE_LOCK(&cap->lock);
        shutdownThread();
    }
}

#endif

/*
 * Note [Benign data race due to work-pushing]
 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 *
 * #17276 points out a tricky data race (noticed by ThreadSanitizer) between
 * waitForWorkerCapability and schedulePushWork. In short, schedulePushWork
 * works as follows:
 *
 *  1. collect the set of all idle capabilities, take cap->lock of each.
 *
 *  2. sort through each TSO on the calling capability's run queue and push
 *     some to idle capabilities. This may (if the TSO is a bound thread)
 *     involve setting tso->bound->task->cap despite not holding
 *     tso->bound->task->lock.
 *
 *  3. release cap->lock of all idle capabilities.
 *
 * Now, step 2 is in principle safe since the capability of the caller of
 * schedulePushWork *owns* the TSO and therefore the Task to which it is bound.
 * Furthermore, step 3 ensures that the write in step (2) will be visible to
 * any core which starts execution of the previously-idle capability.
 *
 * However, this argument doesn't quite work for waitForWorkerCapability, which
 * reads task->cap *without* first owning the capability which owns `task`.
 * For this reason, we check again whether the task has been migrated to
 * another capability after taking task->cap->lock. See Note [migrated bound
 * threads] below.
 *
 */

/* ----------------------------------------------------------------------------
 * waitForWorkerCapability(task)
 *
 * waits to be given a Capability, and then returns the Capability. The task
 * must be either a worker (and on a cap->spare_workers queue), or a bound Task.
 * ------------------------------------------------------------------------- */
#if defined(THREADED_RTS)

static Capability * waitForWorkerCapability (Task *task)
{
    Capability *cap;

    for (;;) {
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        // The happens-after matches the happens-before in
        // schedulePushWork, which owns 'task' when it sets 'task->cap'.
        TSAN_ANNOTATE_HAPPENS_AFTER(&task->cap);
        cap = task->cap;

        // See Note [Benign data race due to work-pushing].
        TSAN_ANNOTATE_BENIGN_RACE(&task->cap, "we will double-check this below");
        task->wakeup = false;
        RELEASE_LOCK(&task->lock);

        debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);

        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task != NULL) {
            debugTrace(DEBUG_sched,
                       "capability %d is owned by another task", cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->cap != cap) {
            // see Note [migrated bound threads]
            debugTrace(DEBUG_sched,
                       "task has been migrated to cap %d", task->cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->incall->tso == NULL) {
            ASSERT(cap->spare_workers != NULL);
            // if we're not at the front of the queue, release it
            // again. This is unlikely to happen.
            if (cap->spare_workers != task) {
                giveCapabilityToTask(cap,cap->spare_workers);
                RELEASE_LOCK(&cap->lock);
                continue;
            }
            cap->spare_workers = task->next;
            task->next = NULL;
            cap->n_spare_workers--;
        }

        RELAXED_STORE(&cap->running_task, task);
        RELEASE_LOCK(&cap->lock);
        break;
    }

    return cap;
}

#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
 * waitForReturnCapability (Task *task)
 *
 * The Task should be on the cap->returning_tasks queue of a Capability. This
 * function waits for the Task to be woken up, and returns the Capability that
 * it was woken up on.
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)

static Capability * waitForReturnCapability (Task *task)
{
    Capability *cap;

    for (;;) {
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        cap = task->cap;
        task->wakeup = false;
        RELEASE_LOCK(&task->lock);

        // now check whether we should wake up...
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task == NULL) {
            if (cap->returning_tasks_hd != task) {
                giveCapabilityToTask(cap,cap->returning_tasks_hd);
                RELEASE_LOCK(&cap->lock);
                continue;
            }
            RELAXED_STORE(&cap->running_task, task);
            popReturningTask(cap);
            RELEASE_LOCK(&cap->lock);
            break;
        }
        RELEASE_LOCK(&cap->lock);
    }

    return cap;
}

#endif /* THREADED_RTS */

#if defined(THREADED_RTS)

/* ----------------------------------------------------------------------------
 * capability_is_busy (Capability *cap)
 *
 * A predicate for determining whether the given Capability is currently running
 * a Task. This can be safely called without holding the Capability's lock
 * although the result may be inaccurate if it races with the scheduler.
 * Consequently there is a TSAN suppression for it.
 *
 * ------------------------------------------------------------------------- */
static bool capability_is_busy(const Capability * cap)
{
    return RELAXED_LOAD(&cap->running_task) != NULL;
}


/* ----------------------------------------------------------------------------
 * find_capability_for_task
 *
 * Given a Task, identify a reasonable Capability to run it on. We try to
 * find an idle capability if possible.
 *
 * ------------------------------------------------------------------------- */

static Capability * find_capability_for_task(const Task * task)
{
    if (task->preferred_capability != -1) {
        // Does the task have a preferred capability? If so, use it
        return capabilities[task->preferred_capability %
                            enabled_capabilities];
    } else {
        // Try last_free_capability first
        Capability *cap = RELAXED_LOAD(&last_free_capability[task->node]);

        // N.B. There is a data race here since we are looking at
        // cap->running_task without taking cap->lock. However, this is
        // benign since the result is merely guiding our search heuristic.
        if (!capability_is_busy(cap)) {
            return cap;
        } else {
            // The last_free_capability is already busy, search for a free
            // capability on this node.
            for (uint32_t i = task->node; i < enabled_capabilities;
                 i += n_numa_nodes) {
                // visits all the capabilities on this node, because
                // cap[i]->node == i % n_numa_nodes
                if (!RELAXED_LOAD(&capabilities[i]->running_task)) {
                    return capabilities[i];
                }
            }

            // Can't find a free one, use last_free_capability.
            return RELAXED_LOAD(&last_free_capability[task->node]);
        }
    }
}
#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
 * waitForCapability (Capability **pCap, Task *task)
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * pCap is strictly an output.
 *
 * ------------------------------------------------------------------------- */

void waitForCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        cap = find_capability_for_task(task);

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;
    } else {
        ASSERT(task->cap == cap);
    }

    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);

    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        // It's free; just grab it
        RELAXED_STORE(&cap->running_task, task);
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);
        cap = waitForReturnCapability(task);
    }

#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);

    *pCap = cap;
#endif
}

/* ----------------------------------------------------------------------------
 * yieldCapability
 *
 * Give up the Capability, and return when we have it again. This is called
 * when either we know that the Capability should be given to another Task, or
 * there is nothing to do right now. One of the following is true:
 *
 *  - The current Task is a worker, and there's a bound thread at the head of
 *    the run queue (or vice versa)
 *
 *  - The run queue is empty. We'll be woken up again when there's work to
 *    do.
 *
 *  - Another Task is trying to do parallel GC (pending_sync == SYNC_GC_PAR).
 *    We should become a GC worker for a while.
 *
 *  - Another Task is trying to acquire all the Capabilities (pending_sync !=
 *    SYNC_GC_PAR), either to do a sequential GC, forkProcess, or
 *    setNumCapabilities. We should give up the Capability temporarily.
 *
 * When yieldCapability returns *pCap will have been updated to the new
 * capability held by the caller.
 *
 * ------------------------------------------------------------------------- */
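
/* A hedged sketch of the calling pattern (illustrative; the real loop is
 * scheduleYield() in Schedule.c, which consults shouldYieldCapability):
 *
 *   bool didGcLast = false;
 *   do {
 *       didGcLast = yieldCapability(&cap, task, !didGcLast);
 *   } while (shouldYieldCapability(cap, task, didGcLast));
 */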

#if defined(THREADED_RTS)

/* See Note [GC livelock] in Schedule.c for why we have gcAllowed
   and return the bool */
bool /* Did we GC? */
yieldCapability (Capability** pCap, Task *task, bool gcAllowed)
{
    Capability *cap = *pCap;

    if (gcAllowed)
    {
        PendingSync *sync = SEQ_CST_LOAD(&pending_sync);

        if (sync) {
            switch (sync->type) {
            case SYNC_GC_PAR:
                if (! sync->idle[cap->no]) {
                    traceEventGcStart(cap);
                    gcWorkerThread(cap);
                    traceEventGcEnd(cap);
                    traceSparkCounters(cap);
                    // See Note [migrated bound threads 2]
                    if (task->cap == cap) {
                        return true;
                    }
                }
                break;

            case SYNC_FLUSH_UPD_REM_SET:
                debugTrace(DEBUG_nonmoving_gc, "Flushing update remembered set blocks...");
                break;

            default:
                break;
            }
        }
    }

    debugTrace(DEBUG_sched, "giving up capability %d", cap->no);

    // We must now release the capability and wait to be woken up again.
    task->wakeup = false;

    ACQUIRE_LOCK(&cap->lock);

    // If this is a worker thread, put it on the spare_workers queue
    if (isWorker(task)) {
        enqueueWorker(cap);
    }

    releaseCapability_(cap, false);

    if (isWorker(task) || isBoundTask(task)) {
        RELEASE_LOCK(&cap->lock);
        cap = waitForWorkerCapability(task);
    } else {
        // Neither a worker Task nor a bound Task. The only way we can be
        // woken up again is to put ourselves on the returning_tasks queue,
        // so that's what we do. We still hold cap->lock at this point.
        // The Task waiting for this Capability does not have it
        // yet, so we can be sure to be woken up later. (see #10545)
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);
        cap = waitForReturnCapability(task);
    }

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
    ASSERT(cap->running_task == task);

#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return false;
}

#endif /* THREADED_RTS */

/*
 * Note [migrated bound threads]
 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 *
 * There's a tricky case where:
 *  - cap A is running an unbound thread T1
 *  - there is a bound thread T2 at the head of the run queue on cap A
 *  - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
 *  - T1 returns quickly, grabbing A again (T2 is still waking up on A)
 *  - T1 blocks, the scheduler migrates T2 to cap B
 *  - the task bound to T2 wakes up on cap B
 *
 * We take advantage of the following invariant:
 *
 *  - A bound thread can only be migrated by the holder of the
 *    Capability on which the bound thread currently lives. So, if we
 *    hold Capability C, and task->cap == C, then task cannot be
 *    migrated under our feet.
 *
 * See also Note [Benign data race due to work-pushing].
 *
 *
 * Note [migrated bound threads 2]
 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 *
 * Second tricky case:
 *  - A bound Task becomes a GC thread
 *  - scheduleDoGC() migrates the thread belonging to this Task,
 *    because the Capability it is on is disabled
 *  - after GC, gcWorkerThread() returns, but now we are
 *    holding a Capability that is not the same as task->cap
 *  - Hence we must check for this case and immediately give up the
 *    cap we hold.
 *
 */

/* ----------------------------------------------------------------------------
 * prodCapability
 *
 * If a Capability is currently idle, wake up a Task on it. Used to
 * get every Capability into the GC.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)

void
prodCapability (Capability *cap, Task *task)
{
    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        cap->running_task = task;
        releaseCapability_(cap,true);
    }
    RELEASE_LOCK(&cap->lock);
}

#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)

bool
tryGrabCapability (Capability *cap, Task *task)
{
    int r;
    // N.B. This is benign as we will check again after taking the lock.
    TSAN_ANNOTATE_BENIGN_RACE(&cap->running_task, "tryGrabCapability (cap->running_task)");
    if (RELAXED_LOAD(&cap->running_task) != NULL) return false;

    r = TRY_ACQUIRE_LOCK(&cap->lock);
    if (r != 0) return false;
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return false;
    }
    task->cap = cap;
    RELAXED_STORE(&cap->running_task, task);
    RELEASE_LOCK(&cap->lock);
    return true;
}
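
/* A hedged sketch of the typical probe over the capability array
 * (illustrative; schedulePushWork in Schedule.c does something similar):
 *
 *   for (uint32_t i = 0; i < n_capabilities; i++) {
 *       Capability *free_cap = capabilities[i];
 *       if (free_cap != cap && tryGrabCapability(free_cap, task)) {
 *           // we now own free_cap: push some work to it, then release it
 *       }
 *   }
 */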

#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible. For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * sched_state = SCHED_SHUTTING_DOWN, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller. Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */

static void
shutdownCapability (Capability *cap USED_IF_THREADS,
                    Task *task USED_IF_THREADS,
                    bool safe USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    uint32_t i;

    task->cap = cap;

    // Loop indefinitely until all the workers have exited and there
    // are no Haskell threads left. We used to bail out after 50
    // iterations of this loop, but that occasionally left a worker
    // running which caused problems later (the closeMutex() below
    // isn't safe, for one thing).

    for (i = 0; /* i < 50 */; i++) {
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);

        debugTrace(DEBUG_sched,
                   "shutting down capability %d, attempt %d", cap->no, i);
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            debugTrace(DEBUG_sched, "not owner, yielding");
            yieldThread();
            continue;
        }
        cap->running_task = task;

        if (cap->spare_workers) {
            // Look for workers that have died without removing
            // themselves from the list; this could happen if the OS
            // summarily killed the thread, for example. This
            // actually happens on Windows when the system is
            // terminating the program, and the RTS is running in a
            // DLL.
            Task *t, *prev;
            prev = NULL;
            for (t = cap->spare_workers; t != NULL; t = t->next) {
                if (!osThreadIsAlive(t->id)) {
                    debugTrace(DEBUG_sched,
                               "worker thread %p has died unexpectedly", (void *)(size_t)t->id);
                    cap->n_spare_workers--;
                    if (!prev) {
                        cap->spare_workers = t->next;
                    } else {
                        prev->next = t->next;
                    }
                } else {
                    // only advance prev past nodes we keep, so that
                    // unlinking a later dead worker splices correctly
                    prev = t;
                }
            }
        }

        if (!emptyRunQueue(cap) || cap->spare_workers) {
            debugTrace(DEBUG_sched,
                       "runnable threads or workers still alive, yielding");
            releaseCapability_(cap,false); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }

        // If "safe", then busy-wait for any threads currently doing
        // foreign calls. If we're about to unload this DLL, for
        // example, we need to be sure that there are no OS threads
        // that will try to return to code that has been unloaded.
        // We can be a bit more relaxed when this is a standalone
        // program that is about to terminate, and pass safe=false.
        if (cap->suspended_ccalls && safe) {
            debugTrace(DEBUG_sched,
                       "thread(s) are involved in foreign calls, yielding");
            cap->running_task = NULL;
            RELEASE_LOCK(&cap->lock);
            // The IO manager thread might have been slow to start up,
            // so the first attempt to kill it might not have
            // succeeded. Just in case, try again - the kill message
            // will only be sent once.
            //
            // To reproduce this deadlock: run ffi002(threaded1)
            // repeatedly on a loaded machine.
            ioManagerDie();
            yieldThread();
            continue;
        }

        traceSparkCounters(cap);
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // we now have the Capability; its run queue and spare workers
    // list are both empty.

    // ToDo: we can't drop this mutex, because there might still be
    // threads performing foreign calls that will eventually try to
    // return via resumeThread() and attempt to grab cap->lock.
    // closeMutex(&cap->lock);
#endif
}

void
shutdownCapabilities(Task *task, bool safe)
{
    uint32_t i;
    for (i=0; i < n_capabilities; i++) {
        ASSERT(task->incall->tso == NULL);
        shutdownCapability(capabilities[i], task, safe);
    }
#if defined(THREADED_RTS)
    ASSERT(checkSparkCountInvariant());
#endif
}

static void
freeCapability (Capability *cap)
{
    stgFree(cap->mut_lists);
    stgFree(cap->saved_mut_lists);
#if defined(THREADED_RTS)
    freeSparkPool(cap->sparks);
#endif
    traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no);
    traceCapsetRemoveCap(CAPSET_CLOCKDOMAIN_DEFAULT, cap->no);
    traceCapDelete(cap);
}

void
freeCapabilities (void)
{
#if defined(THREADED_RTS)
    uint32_t i;
    for (i=0; i < n_capabilities; i++) {
        freeCapability(capabilities[i]);
        if (capabilities[i] != &MainCapability)
            stgFree(capabilities[i]);
    }
#else
    freeCapability(&MainCapability);
#endif
    stgFree(capabilities);
    traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
    traceCapsetDelete(CAPSET_CLOCKDOMAIN_DEFAULT);
}

/* ---------------------------------------------------------------------------
   Mark everything directly reachable from the Capabilities. When
   using multiple GC threads, GC thread n marks the Capabilities
   whose number satisfies (c `mod` n_gc_threads == n).
   ------------------------------------------------------------------------ */

void
markCapability (evac_fn evac, void *user, Capability *cap,
                bool no_mark_sparks USED_IF_THREADS)
{
    InCall *incall;

    // Each GC thread is responsible for following roots from the
    // Capability of the same number. There will usually be the same
    // or fewer Capabilities as GC threads, but just in case there
    // are more, we mark every Capability whose number is the GC
    // thread's index plus a multiple of the number of GC threads.
    evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
    evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
    evac(user, (StgClosure **)(void *)&cap->inbox);
#endif
    for (incall = cap->suspended_ccalls; incall != NULL;
         incall=incall->next) {
        evac(user, (StgClosure **)(void *)&incall->suspended_tso);
    }

#if defined(THREADED_RTS)
    if (!no_mark_sparks) {
        traverseSparkQueue (evac, user, cap);
    }
#endif

    // Free STM structures for this Capability
    stmPreGCHook(cap);
}

void
markCapabilities (evac_fn evac, void *user)
{
    uint32_t n;
    for (n = 0; n < n_capabilities; n++) {
        markCapability(evac, user, capabilities[n], false);
    }
}

#if defined(THREADED_RTS)
bool checkSparkCountInvariant (void)
{
    SparkCounters sparks = { 0, 0, 0, 0, 0, 0 };
    StgWord64 remaining = 0;
    uint32_t i;

    for (i = 0; i < n_capabilities; i++) {
        sparks.created    += capabilities[i]->spark_stats.created;
        sparks.dud        += capabilities[i]->spark_stats.dud;
        sparks.overflowed += capabilities[i]->spark_stats.overflowed;
        sparks.converted  += capabilities[i]->spark_stats.converted;
        sparks.gcd        += capabilities[i]->spark_stats.gcd;
        sparks.fizzled    += capabilities[i]->spark_stats.fizzled;
        remaining         += sparkPoolSize(capabilities[i]->sparks);
    }

    /* The invariant is
     *   created = converted + remaining + gcd + fizzled
     */
    debugTrace(DEBUG_sparks,"spark invariant: %ld == %ld + %ld + %ld + %ld "
                            "(created == converted + remaining + gcd + fizzled)",
               sparks.created, sparks.converted, remaining,
               sparks.gcd, sparks.fizzled);

    return (sparks.created ==
            sparks.converted + remaining + sparks.gcd + sparks.fizzled);
}
#endif

#if !defined(mingw32_HOST_OS)
void
setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) {
#if defined(THREADED_RTS)
    if (cap_no < n_capabilities) {
        RELAXED_STORE(&capabilities[cap_no]->io_manager_control_wr_fd, fd);
    } else {
        errorBelch("warning: setIOManagerControlFd called with illegal capability number.");
    }
#endif
}
#endif