1 /* -----------------------------------------------------------------------------
2 *
3 * (c) The GHC Team 2001-2005
4 *
5 * The task manager subsystem. Tasks execute STG code, with this
6 * module providing the API which the Scheduler uses to control their
7 * creation and destruction.
8 *
9 * -------------------------------------------------------------------------*/
10
11 #include "PosixSource.h"
12 #include "Rts.h"
13
14 #include "RtsUtils.h"
15 #include "Task.h"
16 #include "Capability.h"
17 #include "Stats.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21
22 #include <string.h>
23
24 #if HAVE_SIGNAL_H
25 #include <signal.h>
26 #endif
27
// Task lists and global counters.
// Locks required: all_tasks_mutex.

// Head of the doubly-linked list (linked through all_next / all_prev) of
// Tasks; newTask() pushes every new Task onto the front of it.
Task *all_tasks = NULL;

// current number of bound tasks + total number of worker tasks.
// Locks required: all_tasks_mutex.
//
// In this file: taskCount is incremented by newTask() for every Task and
// decremented only by freeMyTask() (which refuses worker Tasks).
// workerCount is incremented for each worker Task created and is never
// decremented here. currentWorkerCount is incremented with it and
// decremented by workerTaskStop(). peakWorkerCount records the largest
// value currentWorkerCount has reached.
uint32_t taskCount;
uint32_t workerCount;
uint32_t currentWorkerCount;
uint32_t peakWorkerCount;

// Set to 1 by initTaskManager() and back to 0 by freeTaskManager();
// newBoundTask() refuses to run while it is 0.
static int tasksInitialized = 0;

static void   freeTask  (Task *task);
static Task * newTask   (bool);

#if defined(THREADED_RTS)
// Protects all_tasks and the counters above; initialised in
// initTaskManager() and closed in freeTaskManager().
Mutex all_tasks_mutex;
#endif
47
48 /* -----------------------------------------------------------------------------
49 * Remembering the current thread's Task
50 * -------------------------------------------------------------------------- */
51
// A thread-local-storage key that we can use to get access to the
// current thread's Task structure.
#if defined(THREADED_RTS)
# if defined(MYTASK_USE_TLV)
// Threaded RTS with MYTASK_USE_TLV: the pointer lives in a __thread variable.
__thread Task *my_task;
# else
// Threaded RTS without MYTASK_USE_TLV: a ThreadLocalKey is used instead; it
// is created in initTaskManager() and released in freeTaskManager().
ThreadLocalKey currentTaskKey;
# endif
#else
// Non-threaded RTS: an ordinary global.
Task *my_task;
#endif
63
64 /* -----------------------------------------------------------------------------
65 * Rest of the Task API
66 * -------------------------------------------------------------------------- */
67
68 void
initTaskManager(void)69 initTaskManager (void)
70 {
71 if (!tasksInitialized) {
72 taskCount = 0;
73 workerCount = 0;
74 currentWorkerCount = 0;
75 peakWorkerCount = 0;
76 tasksInitialized = 1;
77 #if defined(THREADED_RTS)
78 #if !defined(MYTASK_USE_TLV)
79 newThreadLocalKey(¤tTaskKey);
80 #endif
81 initMutex(&all_tasks_mutex);
82 #endif
83 }
84 }
85
86 uint32_t
freeTaskManager(void)87 freeTaskManager (void)
88 {
89 Task *task, *next;
90 uint32_t tasksRunning = 0;
91
92 ACQUIRE_LOCK(&all_tasks_mutex);
93
94 for (task = all_tasks; task != NULL; task = next) {
95 next = task->all_next;
96 if (task->stopped) {
97 freeTask(task);
98 } else {
99 tasksRunning++;
100 }
101 }
102
103 debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
104 tasksRunning);
105
106 all_tasks = NULL;
107
108 RELEASE_LOCK(&all_tasks_mutex);
109
110 #if defined(THREADED_RTS)
111 closeMutex(&all_tasks_mutex);
112 #if !defined(MYTASK_USE_TLV)
113 freeThreadLocalKey(¤tTaskKey);
114 #endif
115 #endif
116
117 tasksInitialized = 0;
118
119 return tasksRunning;
120 }
121
getTask(void)122 Task* getTask (void)
123 {
124 Task *task;
125
126 task = myTask();
127 if (task != NULL) {
128 return task;
129 } else {
130 task = newTask(false);
131 #if defined(THREADED_RTS)
132 task->id = osThreadId();
133 #endif
134 setMyTask(task);
135 return task;
136 }
137 }
138
freeMyTask(void)139 void freeMyTask (void)
140 {
141 Task *task;
142
143 task = myTask();
144
145 if (task == NULL) return;
146
147 if (!task->stopped) {
148 errorBelch(
149 "freeMyTask() called, but the Task is not stopped; ignoring");
150 return;
151 }
152
153 if (task->worker) {
154 errorBelch("freeMyTask() called on a worker; ignoring");
155 return;
156 }
157
158 ACQUIRE_LOCK(&all_tasks_mutex);
159
160 if (task->all_prev) {
161 task->all_prev->all_next = task->all_next;
162 } else {
163 all_tasks = task->all_next;
164 }
165 if (task->all_next) {
166 task->all_next->all_prev = task->all_prev;
167 }
168
169 taskCount--;
170
171 RELEASE_LOCK(&all_tasks_mutex);
172
173 freeTask(task);
174 setMyTask(NULL);
175 }
176
177 static void
freeTask(Task * task)178 freeTask (Task *task)
179 {
180 InCall *incall, *next;
181
182 // We only free resources if the Task is not in use. A
183 // Task may still be in use if we have a Haskell thread in
184 // a foreign call while we are attempting to shut down the
185 // RTS (see conc059).
186 #if defined(THREADED_RTS)
187 closeCondition(&task->cond);
188 closeMutex(&task->lock);
189 #endif
190
191 for (incall = task->incall; incall != NULL; incall = next) {
192 next = incall->prev_stack;
193 stgFree(incall);
194 }
195 for (incall = task->spare_incalls; incall != NULL; incall = next) {
196 next = incall->next;
197 stgFree(incall);
198 }
199
200 stgFree(task);
201 }
202
203 /* Must take all_tasks_mutex */
204 static Task*
newTask(bool worker)205 newTask (bool worker)
206 {
207 Task *task;
208
209 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
210 task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
211
212 task->cap = NULL;
213 task->worker = worker;
214 task->stopped = true;
215 task->running_finalizers = false;
216 task->n_spare_incalls = 0;
217 task->spare_incalls = NULL;
218 task->incall = NULL;
219 task->preferred_capability = -1;
220
221 #if defined(THREADED_RTS)
222 initCondition(&task->cond);
223 initMutex(&task->lock);
224 task->id = 0;
225 task->wakeup = false;
226 task->node = 0;
227 #endif
228
229 task->next = NULL;
230
231 ACQUIRE_LOCK(&all_tasks_mutex);
232
233 task->all_prev = NULL;
234 task->all_next = all_tasks;
235 if (all_tasks != NULL) {
236 all_tasks->all_prev = task;
237 }
238 all_tasks = task;
239
240 taskCount++;
241 debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
242
243 if (worker) {
244 workerCount++;
245 currentWorkerCount++;
246 if (currentWorkerCount > peakWorkerCount) {
247 peakWorkerCount = currentWorkerCount;
248 }
249 }
250 RELEASE_LOCK(&all_tasks_mutex);
251
252 return task;
253 }
254
255 // avoid the spare_incalls list growing unboundedly
256 #define MAX_SPARE_INCALLS 8
257
258 static void
newInCall(Task * task)259 newInCall (Task *task)
260 {
261 InCall *incall;
262
263 if (task->spare_incalls != NULL) {
264 incall = task->spare_incalls;
265 task->spare_incalls = incall->next;
266 task->n_spare_incalls--;
267 } else {
268 incall = stgMallocBytes((sizeof(InCall)), "newInCall");
269 }
270
271 incall->tso = NULL;
272 incall->task = task;
273 incall->suspended_tso = NULL;
274 incall->suspended_cap = NULL;
275 incall->rstat = NoStatus;
276 incall->ret = NULL;
277 incall->next = NULL;
278 incall->prev = NULL;
279 incall->prev_stack = task->incall;
280 task->incall = incall;
281 }
282
283 static void
endInCall(Task * task)284 endInCall (Task *task)
285 {
286 InCall *incall;
287
288 incall = task->incall;
289 incall->tso = NULL;
290 task->incall = task->incall->prev_stack;
291
292 if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
293 stgFree(incall);
294 } else {
295 incall->next = task->spare_incalls;
296 task->spare_incalls = incall;
297 task->n_spare_incalls++;
298 }
299 }
300
301
302 Task *
newBoundTask(void)303 newBoundTask (void)
304 {
305 Task *task;
306
307 if (!tasksInitialized) {
308 errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
309 stg_exit(EXIT_FAILURE);
310 }
311
312 task = getTask();
313
314 task->stopped = false;
315
316 newInCall(task);
317 return task;
318 }
319
320 void
boundTaskExiting(Task * task)321 boundTaskExiting (Task *task)
322 {
323 #if defined(THREADED_RTS)
324 ASSERT(osThreadId() == task->id);
325 #endif
326 ASSERT(myTask() == task);
327
328 endInCall(task);
329
330 // Set task->stopped, but only if this is the last call (#4850).
331 // Remember that we might have a worker Task that makes a foreign
332 // call and then a callback, so it can transform into a bound
333 // Task for the duration of the callback.
334 if (task->incall == NULL) {
335 task->stopped = true;
336 }
337
338 debugTrace(DEBUG_sched, "task exiting");
339 }
340
341
342 #if defined(THREADED_RTS)
343 #define TASK_ID(t) (t)->id
344 #else
345 #define TASK_ID(t) (t)
346 #endif
347
348 void
discardTasksExcept(Task * keep)349 discardTasksExcept (Task *keep)
350 {
351 Task *task, *next;
352
353 // Wipe the task list, except the current Task.
354 ACQUIRE_LOCK(&all_tasks_mutex);
355 for (task = all_tasks; task != NULL; task=next) {
356 next = task->all_next;
357 if (task != keep) {
358 debugTrace(DEBUG_sched, "discarding task %" FMT_SizeT "", (size_t)TASK_ID(task));
359 #if defined(THREADED_RTS)
360 // It is possible that some of these tasks are currently blocked
361 // (in the parent process) either on their condition variable
362 // `cond` or on their mutex `lock`. If they are we may deadlock
363 // when `freeTask` attempts to call `closeCondition` or
364 // `closeMutex` (the behaviour of these functions is documented to
365 // be undefined in the case that there are threads blocked on
366 // them). To avoid this, we re-initialize both the condition
367 // variable and the mutex before calling `freeTask` (we do
368 // precisely the same for all global locks in `forkProcess`).
369 initCondition(&task->cond);
370 initMutex(&task->lock);
371 #endif
372
373 // Note that we do not traceTaskDelete here because
374 // we are not really deleting a task.
375 // The OS threads for all these tasks do not exist in
376 // this process (since we're currently
377 // in the child of a forkProcess).
378 freeTask(task);
379 }
380 }
381 all_tasks = keep;
382 keep->all_next = NULL;
383 keep->all_prev = NULL;
384 RELEASE_LOCK(&all_tasks_mutex);
385 }
386
387 #if defined(THREADED_RTS)
388
389 void
workerTaskStop(Task * task)390 workerTaskStop (Task *task)
391 {
392 DEBUG_ONLY( OSThreadId id );
393 DEBUG_ONLY( id = osThreadId() );
394 ASSERT(task->id == id);
395 ASSERT(myTask() == task);
396
397 ACQUIRE_LOCK(&all_tasks_mutex);
398
399 if (task->all_prev) {
400 task->all_prev->all_next = task->all_next;
401 } else {
402 all_tasks = task->all_next;
403 }
404 if (task->all_next) {
405 task->all_next->all_prev = task->all_prev;
406 }
407
408 currentWorkerCount--;
409
410 RELEASE_LOCK(&all_tasks_mutex);
411
412 traceTaskDelete(task);
413
414 freeTask(task);
415 }
416
417 #endif
418
419 #if defined(THREADED_RTS)
420
421 static void* OSThreadProcAttr
workerStart(Task * task)422 workerStart(Task *task)
423 {
424 Capability *cap;
425
426 // See startWorkerTask().
427 ACQUIRE_LOCK(&task->lock);
428 cap = task->cap;
429 RELEASE_LOCK(&task->lock);
430
431 if (RtsFlags.ParFlags.setAffinity) {
432 setThreadAffinity(cap->no, n_capabilities);
433 }
434 if (RtsFlags.GcFlags.numa && !RtsFlags.DebugFlags.numa) {
435 setThreadNode(numa_map[task->node]);
436 }
437
438 // set the thread-local pointer to the Task:
439 setMyTask(task);
440
441 newInCall(task);
442
443 // Everything set up; emit the event before the worker starts working.
444 traceTaskCreate(task, cap);
445
446 scheduleWorker(cap,task);
447
448 return NULL;
449 }
450
451 /* N.B. must take all_tasks_mutex */
452 void
startWorkerTask(Capability * cap)453 startWorkerTask (Capability *cap)
454 {
455 int r;
456 OSThreadId tid;
457 Task *task;
458
459 // A worker always gets a fresh Task structure.
460 task = newTask(true);
461 task->stopped = false;
462
463 // The lock here is to synchronise with taskStart(), to make sure
464 // that we have finished setting up the Task structure before the
465 // worker thread reads it.
466 ACQUIRE_LOCK(&task->lock);
467
468 // We don't emit a task creation event here, but in workerStart,
469 // where the kernel thread id is known.
470 task->cap = cap;
471 task->node = cap->node;
472
473 // Give the capability directly to the worker; we can't let anyone
474 // else get in, because the new worker Task has nowhere to go to
475 // sleep so that it could be woken up again.
476 ASSERT_LOCK_HELD(&cap->lock);
477 RELAXED_STORE(&cap->running_task, task);
478
479 // Set the name of the worker thread to the original process name followed by
480 // ":w", but only if we're on Linux where the program_invocation_short_name
481 // global is available.
482 #if defined(linux_HOST_OS)
483 size_t procname_len = strlen(program_invocation_short_name);
484 char worker_name[16];
485 // The kernel only allocates 16 bytes for thread names, so we truncate if the
486 // original name is too long. Process names go in another table that has more
487 // capacity.
488 if (procname_len >= 13) {
489 strncpy(worker_name, program_invocation_short_name, 13);
490 strcpy(worker_name + 13, ":w");
491 } else {
492 strcpy(worker_name, program_invocation_short_name);
493 strcpy(worker_name + procname_len, ":w");
494 }
495 #else
496 char * worker_name = "ghc_worker";
497 #endif
498 r = createOSThread(&tid, worker_name, (OSThreadProc*)workerStart, task);
499 if (r != 0) {
500 sysErrorBelch("failed to create OS thread");
501 stg_exit(EXIT_FAILURE);
502 }
503
504 debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
505
506 task->id = tid;
507
508 // ok, finished with the Task struct.
509 RELEASE_LOCK(&task->lock);
510 }
511
512 void
interruptWorkerTask(Task * task)513 interruptWorkerTask (Task *task)
514 {
515 ASSERT(osThreadId() != task->id); // seppuku not allowed
516 ASSERT(task->incall->suspended_tso); // use this only for FFI calls
517 interruptOSThread(task->id);
518 debugTrace(DEBUG_sched, "interrupted worker task %#" FMT_HexWord64,
519 serialisableTaskId(task));
520 }
521
522 #endif /* THREADED_RTS */
523
/*
 * Record a preferred capability on the calling thread's Task (creating
 * the Task if the thread has none yet).
 *
 * preferred_capability: value stored in the Task's preferred_capability.
 * affinity: threaded RTS only -- when non-zero and the setAffinity RTS
 *           flag is on, the calling thread's affinity is also set using
 *           preferred_capability.
 */
void rts_setInCallCapability (
    int preferred_capability,
    int affinity USED_IF_THREADS)
{
    Task *self = getTask();
    self->preferred_capability = preferred_capability;

#if defined(THREADED_RTS)
    if (affinity && RtsFlags.ParFlags.setAffinity) {
        setThreadAffinity(preferred_capability, n_capabilities);
    }
#endif
}
539
/*
 * Associate the calling thread's Task with a NUMA node.
 *
 * Does nothing in the non-threaded RTS or when the numa GC flag is off.
 * Otherwise task->node is set to capNoToNumaNode(node), and the thread is
 * bound to the corresponding entry of numa_map -- except in a DEBUG build
 * with the numa debug flag set (NUMA is presumably being faked then).
 */
void rts_pinThreadToNumaNode (
    int node USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    if (!RtsFlags.GcFlags.numa) {
        return;
    }

    Task *self = getTask();
    self->node = capNoToNumaNode(node);

    // Skip the real binding only when faking NUMA.
    if (!(DEBUG_IS_ON && RtsFlags.DebugFlags.numa)) {
        setThreadNode(numa_map[self->node]);
    }
#endif
}
553
554 #if defined(DEBUG)
555
556 void printAllTasks(void);
557
558 void
printAllTasks(void)559 printAllTasks(void)
560 {
561 Task *task;
562 for (task = all_tasks; task != NULL; task = task->all_next) {
563 debugBelch("task %#" FMT_HexWord64 " is %s, ", serialisableTaskId(task),
564 task->stopped ? "stopped" : "alive");
565 if (!task->stopped) {
566 if (task->cap) {
567 debugBelch("on capability %d, ", task->cap->no);
568 }
569 if (task->incall->tso) {
570 debugBelch("bound to thread %lu",
571 (unsigned long)task->incall->tso->id);
572 } else {
573 debugBelch("worker");
574 }
575 }
576 debugBelch("\n");
577 }
578 }
579
580 #endif
581