/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2006
 *
 * Thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "Capability.h"
#include "Updates.h"
#include "Threads.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"
#include "Messages.h"
#include "RaiseAsync.h"
#include "Prelude.h"
#include "Printer.h"
#include "sm/Sanity.h"
#include "sm/Storage.h"

#include <string.h>

/* Next thread ID to allocate.
 * LOCK: sched_mutex
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.
   ------------------------------------------------------------------------ */
StgTSO *
createThread(Capability *cap, W_ size)
{
    StgTSO *tso;
    StgStack *stack;
    uint32_t stack_size;

    /* sched_mutex is *not* required */

    /* catch ridiculously small stack sizes */
    if (size < MIN_STACK_WORDS + sizeofW(StgStack) + sizeofW(StgTSO)) {
        size = MIN_STACK_WORDS + sizeofW(StgStack) + sizeofW(StgTSO);
    }

    /* The size argument we are given includes all the per-thread
     * overheads:
     *
     *    - The TSO structure
     *    - The STACK header
     *
     * This is so that we can use a nice round power of 2 for the
     * default stack size (e.g. 1k), and if we're allocating lots of
     * threads back-to-back they'll fit nicely in a block.  It's a bit
     * of a benchmark hack, but it doesn't do any harm.
     */
    stack_size = round_to_mblocks(size - sizeofW(StgTSO));
    stack = (StgStack *)allocate(cap, stack_size);
    TICK_ALLOC_STACK(stack_size);
    SET_HDR(stack, &stg_STACK_info, cap->r.rCCCS);
    stack->stack_size = stack_size - sizeofW(StgStack);
    stack->sp         = stack->stack + stack->stack_size;
    stack->dirty      = STACK_DIRTY;
    stack->marking    = 0;

    tso = (StgTSO *)allocate(cap, sizeofW(StgTSO));
    TICK_ALLOC_TSO();
    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);

    // Always start with the compiled code evaluator
    tso->what_next = ThreadRunGHC;
    tso->why_blocked = NotBlocked;
    tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
    tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
    tso->flags = 0;
    tso->dirty = 1;
    tso->_link = END_TSO_QUEUE;

    tso->saved_errno = 0;
    tso->bound = NULL;
    tso->cap = cap;

    tso->stackobj       = stack;
    tso->tot_stack_size = stack->stack_size;

    ASSIGN_Int64((W_*)&(tso->alloc_limit), 0);

    tso->trec = NO_TREC;

#if defined(PROFILING)
    tso->prof.cccs = CCS_MAIN;
#endif

    // put a stop frame on the stack
    stack->sp -= sizeofW(StgStopFrame);
    SET_HDR((StgClosure*)stack->sp,
            (StgInfoTable *)&stg_stop_thread_info, CCS_SYSTEM);

    /* Link the new thread on the global thread list.
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++;  // while we have the mutex
    tso->global_link = g0->threads;
    /* Mutations above need no memory barrier since this lock will provide
     * a release barrier */
    g0->threads = tso;
    RELEASE_LOCK(&sched_mutex);

    // ToDo: report the stack size in the event?
    traceEventCreateThread(cap, tso);

    return tso;
}
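
/* As a sketch of how createThread() is typically used (cf. the comment
 * above and createIOThread()): the caller pushes an entry frame and the
 * closure to run onto the new thread's stack before handing it to the
 * scheduler.  This example is illustrative only and is not compiled
 * into the RTS.
 */
#if 0
static StgTSO *
exampleCreateIOThread (Capability *cap, W_ stack_size, StgClosure *closure)
{
    StgTSO *t = createThread(cap, stack_size);
    pushClosure(t, (W_)&stg_ap_v_info);   // apply the closure to a void arg
    pushClosure(t, (W_)closure);          // the closure to enter
    pushClosure(t, (W_)&stg_enter_info);  // enter it when the thread runs
    return t;                             // then: scheduleThread(cap, t);
}
#endif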

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */

int
cmp_thread(StgPtr tso1, StgPtr tso2)
{
    StgThreadID id1 = ((StgTSO *)tso1)->id;
    StgThreadID id2 = ((StgTSO *)tso2)->id;

    if (id1 < id2) return (-1);
    if (id1 > id2) return 1;
    return 0;
}

/* ---------------------------------------------------------------------------
 * Fetching the ThreadID from an StgTSO.
 *
 * This is used in the implementation of Show for ThreadIds.
 * ------------------------------------------------------------------------ */
int
rts_getThreadId(StgPtr tso)
{
    return ((StgTSO *)tso)->id;
}

/* ---------------------------------------------------------------------------
 * Enabling and disabling the thread allocation limit
 * ------------------------------------------------------------------------ */

void rts_enableThreadAllocationLimit(StgPtr tso)
{
    ((StgTSO *)tso)->flags |= TSO_ALLOC_LIMIT;
}

void rts_disableThreadAllocationLimit(StgPtr tso)
{
    ((StgTSO *)tso)->flags &= ~TSO_ALLOC_LIMIT;
}

/* -----------------------------------------------------------------------------
   Remove a thread from a queue.
   Fails fatally if the TSO is not on the queue.
   -------------------------------------------------------------------------- */
bool // returns true if we modified the head of the queue
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
    StgTSO *t, *prev;

    prev = NULL;
    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap, prev, t->_link);
                t->_link = END_TSO_QUEUE;
                return false;
            } else {
                *queue = t->_link;
                t->_link = END_TSO_QUEUE;
                return true;
            }
        }
    }
    barf("removeThreadFromQueue: not found");
}

bool // returns true if we modified head or tail
removeThreadFromDeQueue (Capability *cap,
                         StgTSO **head, StgTSO **tail, StgTSO *tso)
{
    StgTSO *t, *prev;
    bool flag = false;

    prev = NULL;
    for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap, prev, t->_link);
                flag = false;
            } else {
                *head = t->_link;
                flag = true;
            }
            t->_link = END_TSO_QUEUE;
            if (*tail == tso) {
                if (prev) {
                    *tail = prev;
                } else {
                    *tail = END_TSO_QUEUE;
                }
                return true;
            } else {
                return flag;
            }
        }
    }
    barf("removeThreadFromDeQueue: not found");
}

/* ----------------------------------------------------------------------------
   tryWakeupThread()

   Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
   always safe to call it too many times, but it is not safe in
   general to omit a call.

   ------------------------------------------------------------------------- */

void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
    traceEventThreadWakeup (cap, tso, tso->cap->no);

#if defined(THREADED_RTS)
    if (tso->cap != cap)
    {
        MessageWakeup *msg;
        msg = (MessageWakeup *)allocate(cap, sizeofW(MessageWakeup));
        msg->tso = tso;
        SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
        sendMessage(cap, tso->cap, (Message*)msg);
        debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
                      (W_)tso->id, tso->cap->no);
        return;
    }
#endif

    switch (tso->why_blocked)
    {
    case BlockedOnMVar:
    case BlockedOnMVarRead:
    {
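        // A blocked takeMVar/readMVar has completed when another thread
        // unlinked this TSO from the MVar's queue and set its _link to
        // END_TSO_QUEUE (see performTryPutMVar() below); only then is
        // it safe to wake the thread.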
        if (tso->_link == END_TSO_QUEUE) {
            tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
            goto unblock;
        } else {
            return;
        }
    }

    case BlockedOnMsgThrowTo:
    {
        const StgInfoTable *i;

        i = lockClosure(tso->block_info.closure);
        unlockClosure(tso->block_info.closure, i);
        if (i != &stg_MSG_NULL_info) {
            debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
                          (W_)tso->id, tso->block_info.throwto->header.info);
            return;
        }

        // remove the block frame from the stack
        ASSERT(tso->stackobj->sp[0] == (StgWord)&stg_block_throwto_info);
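        // the stg_block_throwto frame is three words: the info pointer
        // plus the target TSO and the exception closure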
        tso->stackobj->sp += 3;
        goto unblock;
    }

    case BlockedOnSTM:
        tso->block_info.closure = &stg_STM_AWOKEN_closure;
        goto unblock;

    case BlockedOnBlackHole:
    case ThreadMigrating:
        goto unblock;

    default:
        // otherwise, do nothing
        return;
    }

unblock:
    // just run the thread now, if the BH is not really available,
    // we'll block again.
    tso->why_blocked = NotBlocked;
    appendToRunQueue(cap, tso);

    // We used to set the context switch flag here, which would
    // trigger a context switch a short time in the future (at the end
    // of the current nursery block).  The idea is that we have just
    // woken up a thread, so we may need to load-balance and migrate
    // threads to other CPUs.  On the other hand, setting the context
    // switch flag here unfairly penalises the current thread by
    // yielding its time slice too early.
    //
    // The synthetic benchmark nofib/smp/chan can be used to show the
    // difference quite clearly.

    // cap->context_switch = 1;
}

/* ----------------------------------------------------------------------------
   migrateThread
   ------------------------------------------------------------------------- */

// Precondition: The caller must own the `from` capability.
void
migrateThread (Capability *from, StgTSO *tso, Capability *to)
{
    // Sadly we can't assert this since migrateThread is called from
    // scheduleDoGC, where we implicitly own all capabilities.
    //ASSERT_FULL_CAPABILITY_INVARIANTS(from, getTask());

    traceEventMigrateThread (from, tso, to->no);
    // ThreadMigrating tells the target cap that it needs to be added to
    // the run queue when it receives the MSG_TRY_WAKEUP.
    tso->why_blocked = ThreadMigrating;
    tso->cap = to;
    tryWakeupThread(from, tso);
}

/* ----------------------------------------------------------------------------
   awakenBlockedQueue

   wakes up all the threads on the specified queue.
   ------------------------------------------------------------------------- */

static void
wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
{
    MessageBlackHole *msg;
    const StgInfoTable *i;

    ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
           bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );

    for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
         msg = msg->link) {
        i = ACQUIRE_LOAD(&msg->header.info);
        if (i != &stg_IND_info) {
            ASSERT(i == &stg_MSG_BLACKHOLE_info);
            tryWakeupThread(cap, msg->tso);
        }
    }

    // overwrite the BQ with an indirection so it will be
    // collected at the next GC.
    OVERWRITE_INFO(bq, &stg_IND_info);
}

// If we update a closure that we know we BLACKHOLE'd, and the closure
// no longer points to the current TSO as its owner, then there may be
// an orphaned BLOCKING_QUEUE closure with blocked threads attached to
// it.  We therefore traverse the BLOCKING_QUEUEs attached to the
// current TSO to see if any can now be woken up.
void
checkBlockingQueues (Capability *cap, StgTSO *tso)
{
    StgBlockingQueue *bq, *next;
    StgClosure *p;

    debugTraceCap(DEBUG_sched, cap,
                  "collision occurred; checking blocking queues for thread %ld",
                  (W_)tso->id);

    for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
        next = bq->link;

        const StgInfoTable *bqinfo = ACQUIRE_LOAD(&bq->header.info);
        if (bqinfo == &stg_IND_info) {
            // ToDo: could short it out right here, to avoid
            // traversing this IND multiple times.
            continue;
        }

        // We need to always ensure we untag bh.  While it might seem a
        // sensible assumption that bh will never be tagged, the GC could
        // shortcut the indirection and put a tagged pointer into the
        // indirection.
        //
        // This blew up on aarch64-darwin with a misaligned access, bh
        // pointing to an address that always ended in 0xa.  On
        // architectures that are a little less strict about alignment,
        // this would instead have read a garbage pinfo, which would very
        // likely not have been equal to stg_BLACKHOLE_info; so while the
        // code would have done the wrong thing, the result would have
        // been the same in almost all cases.  See #20093.
        p = UNTAG_CLOSURE(bq->bh);
        const StgInfoTable *pinfo = ACQUIRE_LOAD(&p->header.info);
        if (pinfo != &stg_BLACKHOLE_info ||
            ((StgInd *)p)->indirectee != (StgClosure*)bq)
        {
            wakeBlockingQueue(cap, bq);
        }
    }
}

/* ----------------------------------------------------------------------------
   updateThunk

   Update a thunk with a value.  In order to do this, we need to know
   which TSO owns (or is evaluating) the thunk, in case we need to
   awaken any threads that are blocked on it.
   ------------------------------------------------------------------------- */

void
updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
{
    StgClosure *v;
    StgTSO *owner;
    const StgInfoTable *i;

    i = ACQUIRE_LOAD(&thunk->header.info);
    if (i != &stg_BLACKHOLE_info &&
        i != &stg_CAF_BLACKHOLE_info &&
        i != &__stg_EAGER_BLACKHOLE_info &&
        i != &stg_WHITEHOLE_info) {
        updateWithIndirection(cap, thunk, val);
        return;
    }

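    // v is the blackhole's current "owner": either the TSO that is
    // evaluating the thunk, or a BLOCKING_QUEUE of threads waiting on it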
    v = UNTAG_CLOSURE(((StgInd*)thunk)->indirectee);

    updateWithIndirection(cap, thunk, val);

    // sometimes the TSO is locked when we reach here, so its header
    // might be WHITEHOLE.  Hence check for the correct owner using
    // pointer equality first.
    if ((StgTSO*)v == tso) {
        return;
    }

    i = ACQUIRE_LOAD(&v->header.info);
    if (i == &stg_TSO_info) {
        checkBlockingQueues(cap, tso);
        return;
    }

    if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
        i != &stg_BLOCKING_QUEUE_DIRTY_info) {
        checkBlockingQueues(cap, tso);
        return;
    }

    owner = ((StgBlockingQueue*)v)->owner;

    if (owner != tso) {
        checkBlockingQueues(cap, tso);
    } else {
        wakeBlockingQueue(cap, (StgBlockingQueue*)v);
    }
}

/* ---------------------------------------------------------------------------
 * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
 * used by Control.Concurrent for error checking.
 * ------------------------------------------------------------------------- */

HsBool
rtsSupportsBoundThreads(void)
{
#if defined(THREADED_RTS)
    return HS_BOOL_TRUE;
#else
    return HS_BOOL_FALSE;
#endif
}

/* ---------------------------------------------------------------------------
 * isThreadBound(tso): check whether tso is bound to an OS thread.
 * ------------------------------------------------------------------------- */

StgBool
isThreadBound(StgTSO* tso USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    return (tso->bound != NULL);
#endif
    return false;
}

/* -----------------------------------------------------------------------------
   Stack overflow

   If the thread has reached its maximum stack size, then raise the
   StackOverflow exception in the offending thread.  Otherwise
   relocate the TSO into a larger chunk of memory and adjust its stack
   size appropriately.
   -------------------------------------------------------------------------- */

void
threadStackOverflow (Capability *cap, StgTSO *tso)
{
    StgStack *new_stack, *old_stack;
    StgUnderflowFrame *frame;
    W_ chunk_size;

    IF_DEBUG(sanity, checkTSO(tso));

    if (RtsFlags.GcFlags.maxStkSize > 0
        && tso->tot_stack_size >= RtsFlags.GcFlags.maxStkSize) {
        // #3677: In a stack overflow situation, stack squeezing may
        // reduce the stack size, but we don't know whether it has been
        // reduced enough for the stack check to succeed if we try
        // again.  Fortunately stack squeezing is idempotent, so all we
        // need to do is record whether *any* squeezing happened.  If we
        // are at the stack's absolute -K limit, and stack squeezing
        // happened, then we try running the thread again.  The
        // TSO_SQUEEZED flag is set by threadPaused() to tell us whether
        // squeezing happened or not.
        if (tso->flags & TSO_SQUEEZED) {
            return;
        }

        debugTrace(DEBUG_gc,
                   "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
                   (long)tso->id, tso, (long)tso->stackobj->stack_size,
                   RtsFlags.GcFlags.maxStkSize);
        IF_DEBUG(gc,
                 /* If we're debugging, just print out the top of the stack */
                 printStackChunk(tso->stackobj->sp,
                                 stg_min(tso->stackobj->stack + tso->stackobj->stack_size,
                                         tso->stackobj->sp+64)));

        // Note [Throw to self when masked], also #767 and #8303.
        throwToSelf(cap, tso, (StgClosure *)stackOverflow_closure);
        return;
    }


    // We also want to avoid enlarging the stack if squeezing has
    // already released some of it.  However, we don't want to get into
    // a pathological situation where a thread has a nearly full stack
    // (near its current limit, but not near the absolute -K limit),
    // keeps allocating a little bit, squeezing removes a little bit,
    // and then it runs again.  So to avoid this, if we squeezed *and*
    // there is still less than BLOCK_SIZE_W words free, then we enlarge
    // the stack anyway.
    //
    // NB: This reasoning only applies if the stack has been squeezed;
    // if no squeezing has occurred, then BLOCK_SIZE_W free space does
    // not mean there is enough stack to run; the thread may have
    // requested a large amount of stack (see below).  If the amount
    // we squeezed is not enough to run the thread, we'll come back
    // here (no squeezing will have occurred and thus we'll enlarge the
    // stack.)
    if ((tso->flags & TSO_SQUEEZED) &&
        ((W_)(tso->stackobj->sp - tso->stackobj->stack) >= BLOCK_SIZE_W)) {
        return;
    }

    old_stack = tso->stackobj;

    // If we used less than half of the previous stack chunk, then we
    // must have failed a stack check for a large amount of stack.  In
    // this case we allocate a double-sized chunk to try to
    // accommodate the large stack request.  If that also fails, the
    // next chunk will be 4x normal size, and so on.
    //
    // It would be better to have the mutator tell us how much stack
    // was needed, as we do with heap allocations, but this works for
    // now.
    //
    if (old_stack->sp > old_stack->stack + old_stack->stack_size / 2)
    {
        chunk_size = stg_max(2 * (old_stack->stack_size + sizeofW(StgStack)),
                             RtsFlags.GcFlags.stkChunkSize);
    }
    else
    {
        chunk_size = RtsFlags.GcFlags.stkChunkSize;
    }
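
    // Worked example (a sketch): a thread that overflowed having used
    // less than half of its current chunk must have made a single large
    // stack request, so the next chunk is at least twice the old chunk's
    // total size; repeated large requests therefore grow chunks
    // geometrically (2x, 4x, ...) until the request fits.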

    debugTraceCap(DEBUG_sched, cap,
                  "allocating new stack chunk of size %" FMT_Word " bytes",
                  chunk_size * sizeof(W_));

    // Charge the current thread for allocating stack.  Stack usage is
    // non-deterministic, because the chunk boundaries might vary from
    // run to run, but accounting for this is better than not
    // accounting for it, since a deep recursion will otherwise not be
    // subject to allocation limits.
    cap->r.rCurrentTSO = tso;
    new_stack = (StgStack*) allocate(cap, chunk_size);
    cap->r.rCurrentTSO = NULL;

    SET_HDR(new_stack, &stg_STACK_info, old_stack->header.prof.ccs);
    TICK_ALLOC_STACK(chunk_size);

    new_stack->dirty = 0; // begin clean, we'll mark it dirty below
    new_stack->marking = 0;
    new_stack->stack_size = chunk_size - sizeofW(StgStack);
    new_stack->sp = new_stack->stack + new_stack->stack_size;

    tso->tot_stack_size += new_stack->stack_size;

    {
        StgWord *sp;
        W_ chunk_words, size;

        // find the boundary of the chunk of old stack we're going to
        // copy to the new stack.  We skip over stack frames until we
        // reach the smaller of
        //
        //   * the chunk buffer size (+RTS -kb)
        //   * the end of the old stack
        //
        for (sp = old_stack->sp;
             sp < stg_min(old_stack->sp + RtsFlags.GcFlags.stkChunkBufferSize,
                          old_stack->stack + old_stack->stack_size); )
        {
            size = stack_frame_sizeW((StgClosure*)sp);

            // if including this frame would exceed the size of the
            // new stack (taking into account the underflow frame),
            // then stop at the previous frame.
            if (sp + size > old_stack->sp + (new_stack->stack_size -
                                             sizeofW(StgUnderflowFrame))) {
                break;
            }
            sp += size;
        }

        if (sp == old_stack->stack + old_stack->stack_size) {
            //
            // the old stack chunk is now empty, so we do *not* insert
            // an underflow frame pointing back to it.  There are two
            // cases: either the old stack chunk was the last one, in
            // which case it ends with a STOP_FRAME, or it is not the
            // last one, and it already ends with an UNDERFLOW_FRAME
            // pointing to the previous chunk.  In the latter case, we
            // will copy the UNDERFLOW_FRAME into the new stack chunk.
            // In both cases, the old chunk will be subsequently GC'd.
            //
            // With the default settings, -ki1k -kb1k, this means the
            // first stack chunk will be discarded after the first
            // overflow, being replaced by a non-moving 32k chunk.
            //
        } else {
            new_stack->sp -= sizeofW(StgUnderflowFrame);
            frame = (StgUnderflowFrame*)new_stack->sp;
            frame->info = &stg_stack_underflow_frame_info;
            frame->next_chunk = old_stack;
        }

        // copy the stack chunk between tso->sp and sp to
        //   new_tso->sp + (tso->sp - sp)
        chunk_words = sp - old_stack->sp;

        memcpy(/* dest   */ new_stack->sp - chunk_words,
               /* source */ old_stack->sp,
               /* size   */ chunk_words * sizeof(W_));

        old_stack->sp += chunk_words;
        new_stack->sp -= chunk_words;
    }

    // No write barriers needed; all of the writes above are to structures
    // owned by our capability.
    tso->stackobj = new_stack;

    // we're about to run it, better mark it dirty
    dirty_STACK(cap, new_stack);

    IF_DEBUG(sanity, checkTSO(tso));
    // IF_DEBUG(scheduler,printTSO(new_tso));
}


/* ---------------------------------------------------------------------------
   Stack underflow - called from the stg_stack_underflow_frame
   ------------------------------------------------------------------------ */

W_ // returns offset to the return address
threadStackUnderflow (Capability *cap, StgTSO *tso)
{
    StgStack *new_stack, *old_stack;
    StgUnderflowFrame *frame;
    uint32_t retvals;

    debugTraceCap(DEBUG_sched, cap, "stack underflow");

    old_stack = tso->stackobj;

    frame = (StgUnderflowFrame*)(old_stack->stack + old_stack->stack_size
                                 - sizeofW(StgUnderflowFrame));
    ASSERT(frame->info == &stg_stack_underflow_frame_info);

    new_stack = (StgStack*)frame->next_chunk;
    tso->stackobj = new_stack;

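    // any words between the old sp and the underflow frame are return
    // values left on top of the stack, which must be copied to the next
    // chunk before we resume it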
    retvals = (P_)frame - old_stack->sp;
    if (retvals != 0)
    {
        // we have some return values to copy to the old stack
        if ((W_)(new_stack->sp - new_stack->stack) < retvals)
        {
            barf("threadStackUnderflow: not enough space for return values");
        }

        memcpy(/* dest */ new_stack->sp - retvals,
               /* src  */ old_stack->sp,
               /* size */ retvals * sizeof(W_));
    }

    // empty the old stack.  The GC may still visit this object
    // because it is on the mutable list.
    old_stack->sp = old_stack->stack + old_stack->stack_size;

    // restore the stack parameters, and update tot_stack_size
    tso->tot_stack_size -= old_stack->stack_size;

    // we're about to run it, better mark it dirty.
    //
    // N.B. the nonmoving collector may mark the stack, meaning that sp must
    // point at a valid stack frame.
    dirty_STACK(cap, new_stack);
    new_stack->sp -= retvals;

    return retvals;
}

/* ----------------------------------------------------------------------------
   Implementation of tryPutMVar#

   NOTE: this should be kept in sync with stg_tryPutMVarzh in PrimOps.cmm
   ------------------------------------------------------------------------- */

bool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value)
{
    const StgInfoTable *info;
    const StgInfoTable *qinfo;
    StgMVarTSOQueue *q;
    StgTSO *tso;

    info = lockClosure((StgClosure*)mvar);

    if (mvar->value != &stg_END_TSO_QUEUE_closure) {
#if defined(THREADED_RTS)
        unlockClosure((StgClosure*)mvar, info);
#endif
        return false;
    }

    q = mvar->head;
loop:
    if (q == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
        /* No further takes, the MVar is now full. */
        if (info == &stg_MVAR_CLEAN_info) {
            dirty_MVAR(&cap->r, (StgClosure*)mvar, mvar->value);
        }

        mvar->value = value;
        unlockClosure((StgClosure*)mvar, &stg_MVAR_DIRTY_info);
        return true;
    }

    qinfo = ACQUIRE_LOAD(&q->header.info);
    if (qinfo == &stg_IND_info ||
        qinfo == &stg_MSG_NULL_info) {
        q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee;
        goto loop;
    }

    // There are takeMVar(s) waiting: wake up the first one
    tso = q->tso;
    mvar->head = q = q->link;
    if (q == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
        mvar->tail = (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure;
    } else {
        if (info == &stg_MVAR_CLEAN_info) {
            // Resolve #18919.
            dirty_MVAR(&cap->r, (StgClosure*)mvar, mvar->value);
            info = &stg_MVAR_DIRTY_info;
        }
    }

    ASSERT(tso->block_info.closure == (StgClosure*)mvar);
    // save why_blocked here, because waking up the thread destroys
    // this information
    StgWord why_blocked = RELAXED_LOAD(&tso->why_blocked);

    // actually perform the takeMVar
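    // by writing the value and a stg_ret_p frame onto the blocked
    // thread's stack, so that when it resumes it returns the value just
    // as a successfully completed takeMVar would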
    StgStack* stack = tso->stackobj;
    RELAXED_STORE(&stack->sp[1], (W_)value);
    RELAXED_STORE(&stack->sp[0], (W_)&stg_ret_p_info);

    // indicate that the MVar operation has now completed.
    RELEASE_STORE(&tso->_link, (StgTSO*)&stg_END_TSO_QUEUE_closure);

    if ((stack->dirty & STACK_DIRTY) == 0) {
        dirty_STACK(cap, stack);
    }

    tryWakeupThread(cap, tso);

    // If it was a readMVar, then we can still do work,
    // so loop back. (XXX: This could take a while)
    if (why_blocked == BlockedOnMVarRead)
        goto loop;

    ASSERT(why_blocked == BlockedOnMVar);

    unlockClosure((StgClosure*)mvar, info);

    return true;
}
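
/* performTryPutMVar() is the workhorse behind, e.g., hs_try_putmvar()
 * in RtsAPI.c, which lets foreign code complete an MVar from a C
 * callback without blocking.  An illustrative (not compiled) C caller:
 */
#if 0
static void example_callback (int cap_no, HsStablePtr mvar_sp)
{
    // wakes the Haskell thread blocked in takeMVar on this MVar; the
    // MVar must have been wrapped in a StablePtr on the Haskell side
    hs_try_putmvar(cap_no, mvar_sp);
}
#endif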

/* ----------------------------------------------------------------------------
 * Debugging: why is a thread blocked
 * ------------------------------------------------------------------------- */

#if defined(DEBUG)
void
printThreadBlockage(StgTSO *tso)
{
    switch (tso->why_blocked) {
#if defined(mingw32_HOST_OS)
    case BlockedOnDoProc:
        debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
        break;
#endif
#if !defined(THREADED_RTS)
    case BlockedOnRead:
        debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
        break;
    case BlockedOnWrite:
        debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
        break;
    case BlockedOnDelay:
        debugBelch("is blocked until %ld", (long)(tso->block_info.target));
        break;
#endif
    case BlockedOnMVar:
        debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
        break;
    case BlockedOnMVarRead:
        debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure);
        break;
    case BlockedOnBlackHole:
        debugBelch("is blocked on a black hole %p",
                   ((StgBlockingQueue*)tso->block_info.bh->bh));
        break;
    case BlockedOnMsgThrowTo:
        debugBelch("is blocked on a throwto message");
        break;
    case NotBlocked:
        debugBelch("is not blocked");
        break;
    case ThreadMigrating:
        debugBelch("is runnable, but not on the run queue");
        break;
    case BlockedOnCCall:
        debugBelch("is blocked on an external call");
        break;
    case BlockedOnCCall_Interruptible:
        debugBelch("is blocked on an external call (but may be interrupted)");
        break;
    case BlockedOnSTM:
        debugBelch("is blocked on an STM operation");
        break;
    default:
        barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%p)",
             tso->why_blocked, tso->id, tso);
    }
}


void
printThreadStatus(StgTSO *t)
{
    debugBelch("\tthread %4lu @ %p ", (unsigned long)t->id, (void *)t);
    {
        void *label = lookupThreadLabel(t->id);
        if (label) debugBelch("[\"%s\"] ", (char *)label);
    }
    switch (t->what_next) {
    case ThreadKilled:
        debugBelch("has been killed");
        break;
    case ThreadComplete:
        debugBelch("has completed");
        break;
    default:
        printThreadBlockage(t);
    }
    if (t->dirty) {
        debugBelch(" (TSO_DIRTY)");
    }
    debugBelch("\n");
}

void
printAllThreads(void)
{
    StgTSO *t, *next;
    uint32_t i, g;
    Capability *cap;

    debugBelch("all threads:\n");

    for (i = 0; i < n_capabilities; i++) {
        cap = capabilities[i];
        debugBelch("threads on capability %d:\n", cap->no);
        for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
            printThreadStatus(t);
        }
    }

    debugBelch("other threads:\n");
    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
            if (t->why_blocked != NotBlocked) {
                printThreadStatus(t);
            }
            next = t->global_link;
        }
    }
}

// useful from gdb
void
printThreadQueue(StgTSO *t)
{
    uint32_t i = 0;
    for (; t != END_TSO_QUEUE; t = t->_link) {
        printThreadStatus(t);
        i++;
    }
    debugBelch("%d threads on queue\n", i);
}

#endif /* DEBUG */