1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 2001-2005
4  *
5  * The task manager subsystem.  Tasks execute STG code, with this
6  * module providing the API which the Scheduler uses to control their
7  * creation and destruction.
8  *
9  * -------------------------------------------------------------------------*/
10 
11 #include "PosixSource.h"
12 #include "Rts.h"
13 
14 #include "RtsUtils.h"
15 #include "Task.h"
16 #include "Capability.h"
17 #include "Stats.h"
18 #include "Schedule.h"
19 #include "Hash.h"
20 #include "Trace.h"
21 
22 #include <string.h>
23 
24 #if HAVE_SIGNAL_H
25 #include <signal.h>
26 #endif
27 
28 // Task lists and global counters.
29 // Locks required: all_tasks_mutex.
30 Task *all_tasks = NULL;
31 
32 // current number of bound tasks + total number of worker tasks.
33 // Locks required: all_tasks_mutex.
34 uint32_t taskCount;
35 uint32_t workerCount;
36 uint32_t currentWorkerCount;
37 uint32_t peakWorkerCount;
38 
39 static int tasksInitialized = 0;
40 
41 static void   freeTask  (Task *task);
42 static Task * newTask   (bool);
43 
44 #if defined(THREADED_RTS)
45 Mutex all_tasks_mutex;
46 #endif
47 
48 /* -----------------------------------------------------------------------------
49  * Remembering the current thread's Task
50  * -------------------------------------------------------------------------- */
51 
52 // A thread-local-storage key that we can use to get access to the
53 // current thread's Task structure.
54 #if defined(THREADED_RTS)
55 # if defined(MYTASK_USE_TLV)
56 __thread Task *my_task;
57 # else
58 ThreadLocalKey currentTaskKey;
59 # endif
60 #else
61 Task *my_task;
62 #endif
63 
64 /* -----------------------------------------------------------------------------
65  * Rest of the Task API
66  * -------------------------------------------------------------------------- */
67 
68 void
initTaskManager(void)69 initTaskManager (void)
70 {
71     if (!tasksInitialized) {
72         taskCount = 0;
73         workerCount = 0;
74         currentWorkerCount = 0;
75         peakWorkerCount = 0;
76         tasksInitialized = 1;
77 #if defined(THREADED_RTS)
78 #if !defined(MYTASK_USE_TLV)
79         newThreadLocalKey(&currentTaskKey);
80 #endif
81         initMutex(&all_tasks_mutex);
82 #endif
83     }
84 }
85 
86 uint32_t
freeTaskManager(void)87 freeTaskManager (void)
88 {
89     Task *task, *next;
90     uint32_t tasksRunning = 0;
91 
92     ACQUIRE_LOCK(&all_tasks_mutex);
93 
94     for (task = all_tasks; task != NULL; task = next) {
95         next = task->all_next;
96         if (task->stopped) {
97             freeTask(task);
98         } else {
99             tasksRunning++;
100         }
101     }
102 
103     debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
104                tasksRunning);
105 
106     all_tasks = NULL;
107 
108     RELEASE_LOCK(&all_tasks_mutex);
109 
110 #if defined(THREADED_RTS)
111     closeMutex(&all_tasks_mutex);
112 #if !defined(MYTASK_USE_TLV)
113     freeThreadLocalKey(&currentTaskKey);
114 #endif
115 #endif
116 
117     tasksInitialized = 0;
118 
119     return tasksRunning;
120 }
121 
getTask(void)122 Task* getTask (void)
123 {
124     Task *task;
125 
126     task = myTask();
127     if (task != NULL) {
128         return task;
129     } else {
130         task = newTask(false);
131 #if defined(THREADED_RTS)
132         task->id = osThreadId();
133 #endif
134         setMyTask(task);
135         return task;
136     }
137 }
138 
freeMyTask(void)139 void freeMyTask (void)
140 {
141     Task *task;
142 
143     task = myTask();
144 
145     if (task == NULL) return;
146 
147     if (!task->stopped) {
148         errorBelch(
149             "freeMyTask() called, but the Task is not stopped; ignoring");
150         return;
151     }
152 
153     if (task->worker) {
154         errorBelch("freeMyTask() called on a worker; ignoring");
155         return;
156     }
157 
158     ACQUIRE_LOCK(&all_tasks_mutex);
159 
160     if (task->all_prev) {
161         task->all_prev->all_next = task->all_next;
162     } else {
163         all_tasks = task->all_next;
164     }
165     if (task->all_next) {
166         task->all_next->all_prev = task->all_prev;
167     }
168 
169     taskCount--;
170 
171     RELEASE_LOCK(&all_tasks_mutex);
172 
173     freeTask(task);
174     setMyTask(NULL);
175 }
176 
177 static void
freeTask(Task * task)178 freeTask (Task *task)
179 {
180     InCall *incall, *next;
181 
182     // We only free resources if the Task is not in use.  A
183     // Task may still be in use if we have a Haskell thread in
184     // a foreign call while we are attempting to shut down the
185     // RTS (see conc059).
186 #if defined(THREADED_RTS)
187     closeCondition(&task->cond);
188     closeMutex(&task->lock);
189 #endif
190 
191     for (incall = task->incall; incall != NULL; incall = next) {
192         next = incall->prev_stack;
193         stgFree(incall);
194     }
195     for (incall = task->spare_incalls; incall != NULL; incall = next) {
196         next = incall->next;
197         stgFree(incall);
198     }
199 
200     stgFree(task);
201 }
202 
203 /* Must take all_tasks_mutex */
204 static Task*
newTask(bool worker)205 newTask (bool worker)
206 {
207     Task *task;
208 
209 #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
210     task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
211 
212     task->cap           = NULL;
213     task->worker        = worker;
214     task->stopped       = true;
215     task->running_finalizers = false;
216     task->n_spare_incalls = 0;
217     task->spare_incalls = NULL;
218     task->incall        = NULL;
219     task->preferred_capability = -1;
220 
221 #if defined(THREADED_RTS)
222     initCondition(&task->cond);
223     initMutex(&task->lock);
224     task->id = 0;
225     task->wakeup = false;
226     task->node = 0;
227 #endif
228 
229     task->next = NULL;
230 
231     ACQUIRE_LOCK(&all_tasks_mutex);
232 
233     task->all_prev = NULL;
234     task->all_next = all_tasks;
235     if (all_tasks != NULL) {
236         all_tasks->all_prev = task;
237     }
238     all_tasks = task;
239 
240     taskCount++;
241     debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
242 
243     if (worker) {
244         workerCount++;
245         currentWorkerCount++;
246         if (currentWorkerCount > peakWorkerCount) {
247             peakWorkerCount = currentWorkerCount;
248         }
249     }
250     RELEASE_LOCK(&all_tasks_mutex);
251 
252     return task;
253 }
254 
255 // avoid the spare_incalls list growing unboundedly
256 #define MAX_SPARE_INCALLS 8
257 
258 static void
newInCall(Task * task)259 newInCall (Task *task)
260 {
261     InCall *incall;
262 
263     if (task->spare_incalls != NULL) {
264         incall = task->spare_incalls;
265         task->spare_incalls = incall->next;
266         task->n_spare_incalls--;
267     } else {
268         incall = stgMallocBytes((sizeof(InCall)), "newInCall");
269     }
270 
271     incall->tso = NULL;
272     incall->task = task;
273     incall->suspended_tso = NULL;
274     incall->suspended_cap = NULL;
275     incall->rstat         = NoStatus;
276     incall->ret           = NULL;
277     incall->next = NULL;
278     incall->prev = NULL;
279     incall->prev_stack = task->incall;
280     task->incall = incall;
281 }
282 
283 static void
endInCall(Task * task)284 endInCall (Task *task)
285 {
286     InCall *incall;
287 
288     incall = task->incall;
289     incall->tso = NULL;
290     task->incall = task->incall->prev_stack;
291 
292     if (task->n_spare_incalls >= MAX_SPARE_INCALLS) {
293         stgFree(incall);
294     } else {
295         incall->next = task->spare_incalls;
296         task->spare_incalls = incall;
297         task->n_spare_incalls++;
298     }
299 }
300 
301 
302 Task *
newBoundTask(void)303 newBoundTask (void)
304 {
305     Task *task;
306 
307     if (!tasksInitialized) {
308         errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
309         stg_exit(EXIT_FAILURE);
310     }
311 
312     task = getTask();
313 
314     task->stopped = false;
315 
316     newInCall(task);
317     return task;
318 }
319 
320 void
boundTaskExiting(Task * task)321 boundTaskExiting (Task *task)
322 {
323 #if defined(THREADED_RTS)
324     ASSERT(osThreadId() == task->id);
325 #endif
326     ASSERT(myTask() == task);
327 
328     endInCall(task);
329 
330     // Set task->stopped, but only if this is the last call (#4850).
331     // Remember that we might have a worker Task that makes a foreign
332     // call and then a callback, so it can transform into a bound
333     // Task for the duration of the callback.
334     if (task->incall == NULL) {
335         task->stopped = true;
336     }
337 
338     debugTrace(DEBUG_sched, "task exiting");
339 }
340 
341 
342 #if defined(THREADED_RTS)
343 #define TASK_ID(t) (t)->id
344 #else
345 #define TASK_ID(t) (t)
346 #endif
347 
348 void
discardTasksExcept(Task * keep)349 discardTasksExcept (Task *keep)
350 {
351     Task *task, *next;
352 
353     // Wipe the task list, except the current Task.
354     ACQUIRE_LOCK(&all_tasks_mutex);
355     for (task = all_tasks; task != NULL; task=next) {
356         next = task->all_next;
357         if (task != keep) {
358             debugTrace(DEBUG_sched, "discarding task %" FMT_SizeT "", (size_t)TASK_ID(task));
359 #if defined(THREADED_RTS)
360             // It is possible that some of these tasks are currently blocked
361             // (in the parent process) either on their condition variable
362             // `cond` or on their mutex `lock`. If they are we may deadlock
363             // when `freeTask` attempts to call `closeCondition` or
364             // `closeMutex` (the behaviour of these functions is documented to
365             // be undefined in the case that there are threads blocked on
366             // them). To avoid this, we re-initialize both the condition
367             // variable and the mutex before calling `freeTask` (we do
368             // precisely the same for all global locks in `forkProcess`).
369             initCondition(&task->cond);
370             initMutex(&task->lock);
371 #endif
372 
373             // Note that we do not traceTaskDelete here because
374             // we are not really deleting a task.
375             // The OS threads for all these tasks do not exist in
376             // this process (since we're currently
377             // in the child of a forkProcess).
378             freeTask(task);
379         }
380     }
381     all_tasks = keep;
382     keep->all_next = NULL;
383     keep->all_prev = NULL;
384     RELEASE_LOCK(&all_tasks_mutex);
385 }
386 
387 #if defined(THREADED_RTS)
388 
389 void
workerTaskStop(Task * task)390 workerTaskStop (Task *task)
391 {
392     DEBUG_ONLY( OSThreadId id );
393     DEBUG_ONLY( id = osThreadId() );
394     ASSERT(task->id == id);
395     ASSERT(myTask() == task);
396 
397     ACQUIRE_LOCK(&all_tasks_mutex);
398 
399     if (task->all_prev) {
400         task->all_prev->all_next = task->all_next;
401     } else {
402         all_tasks = task->all_next;
403     }
404     if (task->all_next) {
405         task->all_next->all_prev = task->all_prev;
406     }
407 
408     currentWorkerCount--;
409 
410     RELEASE_LOCK(&all_tasks_mutex);
411 
412     traceTaskDelete(task);
413 
414     freeTask(task);
415 }
416 
417 #endif
418 
419 #if defined(THREADED_RTS)
420 
421 static void* OSThreadProcAttr
workerStart(Task * task)422 workerStart(Task *task)
423 {
424     Capability *cap;
425 
426     // See startWorkerTask().
427     ACQUIRE_LOCK(&task->lock);
428     cap = task->cap;
429     RELEASE_LOCK(&task->lock);
430 
431     if (RtsFlags.ParFlags.setAffinity) {
432         setThreadAffinity(cap->no, n_capabilities);
433     }
434     if (RtsFlags.GcFlags.numa && !RtsFlags.DebugFlags.numa) {
435         setThreadNode(numa_map[task->node]);
436     }
437 
438     // set the thread-local pointer to the Task:
439     setMyTask(task);
440 
441     newInCall(task);
442 
443     // Everything set up; emit the event before the worker starts working.
444     traceTaskCreate(task, cap);
445 
446     scheduleWorker(cap,task);
447 
448     return NULL;
449 }
450 
451 /* N.B. must take all_tasks_mutex */
452 void
startWorkerTask(Capability * cap)453 startWorkerTask (Capability *cap)
454 {
455   int r;
456   OSThreadId tid;
457   Task *task;
458 
459   // A worker always gets a fresh Task structure.
460   task = newTask(true);
461   task->stopped = false;
462 
463   // The lock here is to synchronise with taskStart(), to make sure
464   // that we have finished setting up the Task structure before the
465   // worker thread reads it.
466   ACQUIRE_LOCK(&task->lock);
467 
468   // We don't emit a task creation event here, but in workerStart,
469   // where the kernel thread id is known.
470   task->cap = cap;
471   task->node = cap->node;
472 
473   // Give the capability directly to the worker; we can't let anyone
474   // else get in, because the new worker Task has nowhere to go to
475   // sleep so that it could be woken up again.
476   ASSERT_LOCK_HELD(&cap->lock);
477   RELAXED_STORE(&cap->running_task, task);
478 
479   // Set the name of the worker thread to the original process name followed by
480   // ":w", but only if we're on Linux where the program_invocation_short_name
481   // global is available.
482 #if defined(linux_HOST_OS)
483   size_t procname_len = strlen(program_invocation_short_name);
484   char worker_name[16];
485   // The kernel only allocates 16 bytes for thread names, so we truncate if the
486   // original name is too long. Process names go in another table that has more
487   // capacity.
488   if (procname_len >= 13) {
489       strncpy(worker_name, program_invocation_short_name, 13);
490       strcpy(worker_name + 13, ":w");
491   } else {
492       strcpy(worker_name, program_invocation_short_name);
493       strcpy(worker_name + procname_len, ":w");
494   }
495 #else
496   char * worker_name = "ghc_worker";
497 #endif
498   r = createOSThread(&tid, worker_name, (OSThreadProc*)workerStart, task);
499   if (r != 0) {
500     sysErrorBelch("failed to create OS thread");
501     stg_exit(EXIT_FAILURE);
502   }
503 
504   debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
505 
506   task->id = tid;
507 
508   // ok, finished with the Task struct.
509   RELEASE_LOCK(&task->lock);
510 }
511 
512 void
interruptWorkerTask(Task * task)513 interruptWorkerTask (Task *task)
514 {
515   ASSERT(osThreadId() != task->id);    // seppuku not allowed
516   ASSERT(task->incall->suspended_tso); // use this only for FFI calls
517   interruptOSThread(task->id);
518   debugTrace(DEBUG_sched, "interrupted worker task %#" FMT_HexWord64,
519              serialisableTaskId(task));
520 }
521 
522 #endif /* THREADED_RTS */
523 
/*
 * Record a preferred capability on the calling thread's Task (the Task
 * is created if the thread has none).
 *
 * preferred_capability: stored in the Task's preferred_capability field.
 * affinity:             threaded RTS only.  When non-zero and the
 *                       setAffinity RTS flag is on, setThreadAffinity()
 *                       is also called for that capability.
 */
void rts_setInCallCapability (
    int preferred_capability,
    int affinity USED_IF_THREADS)
{
    Task *self = getTask();

    self->preferred_capability = preferred_capability;

#if defined(THREADED_RTS)
    if (affinity && RtsFlags.ParFlags.setAffinity) {
        setThreadAffinity(preferred_capability, n_capabilities);
    }
#endif
}
539 
/*
 * Assign the calling thread's Task to a NUMA node.
 *
 * Only has an effect in the threaded RTS with the numa GC flag
 * enabled.  The Task's node is set to capNoToNumaNode(node), and the
 * thread is then bound with setThreadNode() unless this is a DEBUG
 * build with the numa debug flag set.
 * NOTE(review): the original comment suggests that debug flag means
 * NUMA is being faked - confirm.
 */
void rts_pinThreadToNumaNode (
    int node USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    Task *self;

    if (!RtsFlags.GcFlags.numa) {
        return;
    }

    self = getTask();
    self->node = capNoToNumaNode(node);

    if (DEBUG_IS_ON && RtsFlags.DebugFlags.numa) {
        // faking NUMA: record the node, but skip the setThreadNode() call.
        return;
    }

    setThreadNode(numa_map[self->node]);
#endif
}
553 
554 #if defined(DEBUG)
555 
556 void printAllTasks(void);
557 
558 void
printAllTasks(void)559 printAllTasks(void)
560 {
561     Task *task;
562     for (task = all_tasks; task != NULL; task = task->all_next) {
563         debugBelch("task %#" FMT_HexWord64 " is %s, ", serialisableTaskId(task),
564                    task->stopped ? "stopped" : "alive");
565         if (!task->stopped) {
566             if (task->cap) {
567                 debugBelch("on capability %d, ", task->cap->no);
568             }
569             if (task->incall->tso) {
570               debugBelch("bound to thread %lu",
571                          (unsigned long)task->incall->tso->id);
572             } else {
573                 debugBelch("worker");
574             }
575         }
576         debugBelch("\n");
577     }
578 }
579 
580 #endif
581