xref: /minix/minix/servers/vfs/worker.c (revision 03de4d97)
#include "fs.h"
#include <string.h>
#include <assert.h>

static void *worker_main(void *arg);
static void worker_sleep(void);
static void worker_wake(struct worker_thread *worker);

static mthread_attr_t tattr;
static unsigned int pending;
static unsigned int busy;
static int block_all;

#if defined(_MINIX_MAGIC)
# define TH_STACKSIZE (64 * 1024)
#elif defined(MKCOVERAGE)
# define TH_STACKSIZE (40 * 1024)
#else
# define TH_STACKSIZE (28 * 1024)
#endif
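
/* Note: the instrumented build variants above (live-update "magic"
 * instrumentation and MKCOVERAGE coverage builds) add per-function overhead,
 * which is presumably why they get a larger stack per worker thread.
 */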

#define ASSERTW(w) assert((w) >= &workers[0] && (w) < &workers[NR_WTHREADS])

/*===========================================================================*
 *				worker_init				     *
 *===========================================================================*/
void worker_init(void)
{
/* Initialize worker threads */
  struct worker_thread *wp;
  int i;

  if (mthread_attr_init(&tattr) != 0)
	panic("failed to initialize attribute");
  if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0)
	panic("couldn't set default thread stack size");

  pending = 0;
  busy = 0;
  block_all = FALSE;

  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	wp->w_fp = NULL;		/* Mark not in use */
	wp->w_next = NULL;
	wp->w_task = NONE;
	if (mutex_init(&wp->w_event_mutex, NULL) != 0)
		panic("failed to initialize mutex");
	if (cond_init(&wp->w_event, NULL) != 0)
		panic("failed to initialize condition variable");
	if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0)
		panic("unable to start thread");
  }

  /* Let all threads get ready to accept work. */
  worker_yield();
}

/*===========================================================================*
 *				worker_cleanup				     *
 *===========================================================================*/
void worker_cleanup(void)
{
/* Clean up worker threads, reversing the actions of worker_init() such that
 * we can safely call worker_init() again later. All worker threads are
 * expected to be idle already. Used for live updates, because transferring
 * the thread stacks from one version to another is currently not feasible.
 */
  struct worker_thread *wp;
  int i;

  assert(worker_idle());

  /* First terminate all threads. */
  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	assert(wp->w_fp == NULL);

	/* Waking up the thread with no w_fp will cause it to exit. */
	worker_wake(wp);
  }

  worker_yield();

  /* Then clean up their resources. */
  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	if (mthread_join(wp->w_tid, NULL) != 0)
		panic("worker_cleanup: could not join thread %d", i);
	if (cond_destroy(&wp->w_event) != 0)
		panic("failed to destroy condition variable");
	if (mutex_destroy(&wp->w_event_mutex) != 0)
		panic("failed to destroy mutex");
  }

  /* Finally, clean up global resources. */
  if (mthread_attr_destroy(&tattr) != 0)
	panic("failed to destroy attribute");

  memset(workers, 0, sizeof(workers));
}

/*===========================================================================*
 *				worker_idle				     *
 *===========================================================================*/
int worker_idle(void)
{
/* Return whether all worker threads are idle. */

  return (pending == 0 && busy == 0);
}

/*===========================================================================*
 *				worker_assign				     *
 *===========================================================================*/
static void worker_assign(struct fproc *rfp)
{
/* Assign the work for the given process to a free thread. The caller must
 * ensure that there is in fact at least one free thread.
 */
  struct worker_thread *worker;
  int i;

  /* Find a free worker thread. */
  for (i = 0; i < NR_WTHREADS; i++) {
	worker = &workers[i];

	if (worker->w_fp == NULL)
		break;
  }
  assert(worker->w_fp == NULL);	/* the caller guaranteed a free thread */

  /* Assign work to it. */
  rfp->fp_worker = worker;
  worker->w_fp = rfp;
  busy++;

  worker_wake(worker);
}

/*===========================================================================*
 *				worker_may_do_pending			     *
 *===========================================================================*/
static int worker_may_do_pending(void)
{
/* Return whether there is a free thread that may do pending work. This is true
 * only if there is pending work at all, and there is a free non-spare thread
 * (the spare thread is never used for pending work), and VFS is currently
 * processing new requests at all (this may not be true during initialization).
 */

  /* Ordered by likelihood to be false. */
  return (pending > 0 && worker_available() > 1 && !block_all);
}

/*===========================================================================*
 *				worker_allow				     *
 *===========================================================================*/
void worker_allow(int allow)
{
/* Allow or disallow workers to process new work. If disallowed, any new work
 * will be stored as pending, even when there are free worker threads. There is
 * no facility to stop active workers. To be used only during initialization!
 */
  struct fproc *rfp;

  block_all = !allow;

  if (!worker_may_do_pending())
	return;

  /* Assign any pending work to workers. */
  for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
	if (rfp->fp_flags & FP_PENDING) {
		rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
		assert(pending > 0);
		pending--;
		worker_assign(rfp);

		if (!worker_may_do_pending())
			return;
	}
  }
}
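
/* Usage sketch (hypothetical; the actual callers live elsewhere in VFS):
 * initialization code would disallow new work until VFS is ready, then
 * re-allow it, which also drains any work queued as pending in the meantime:
 *
 *	worker_allow(FALSE);
 *	...initialize...
 *	worker_allow(TRUE);
 */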

/*===========================================================================*
 *				worker_get_work				     *
 *===========================================================================*/
static int worker_get_work(void)
{
/* Find new work to do. Work can be 'queued', 'pending', or absent. In the
 * latter case, wait for new work to come in. Return TRUE if there is work to
 * do, or FALSE if the current thread is requested to shut down.
 */
  struct fproc *rfp;

  assert(self->w_fp == NULL);

  /* Is there pending work, and should we do it? */
  if (worker_may_do_pending()) {
	/* Find pending work */
	for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
		if (rfp->fp_flags & FP_PENDING) {
			self->w_fp = rfp;
			rfp->fp_worker = self;
			busy++;
			rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
			assert(pending > 0);
			pending--;
			return TRUE;
		}
	}
	panic("Pending work inconsistency");
  }

  /* Wait for work to come to us */
  worker_sleep();

  return (self->w_fp != NULL);
}

/*===========================================================================*
 *				worker_available			     *
 *===========================================================================*/
int worker_available(void)
{
/* Return the number of threads that are available, including the spare thread.
 */

  return(NR_WTHREADS - busy);
}
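
/* Note: one of these threads is reserved as a spare for deadlock resolution;
 * see worker_try_activate(), which leaves it unused unless use_spare is set.
 */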

/*===========================================================================*
 *				worker_main				     *
 *===========================================================================*/
static void *worker_main(void *arg)
{
/* Worker thread main loop */

  self = (struct worker_thread *) arg;
  ASSERTW(self);

  while (worker_get_work()) {

	fp = self->w_fp;
	assert(fp->fp_worker == self);

	/* Lock the process. */
	lock_proc(fp);

	/* The following two blocks could be run in a loop until both
	 * conditions are no longer met, but it is currently impossible that
	 * more normal work is present after postponed PM work has been done.
	 */

	/* Perform normal work, if any. */
	if (fp->fp_func != NULL) {
		self->w_m_in = fp->fp_msg;
		err_code = OK;

		fp->fp_func();

		fp->fp_func = NULL;	/* deliberately unset AFTER the call */
	}

	/* Perform postponed PM work, if any. */
	if (fp->fp_flags & FP_PM_WORK) {
		self->w_m_in = fp->fp_pm_msg;

		service_pm_postponed();

		fp->fp_flags &= ~FP_PM_WORK;
	}

	/* Perform cleanup actions. */
	thread_cleanup();

	unlock_proc(fp);

	fp->fp_worker = NULL;
	self->w_fp = NULL;
	assert(busy > 0);
	busy--;
  }

  return(NULL);
}

/*===========================================================================*
 *				worker_can_start			     *
 *===========================================================================*/
int worker_can_start(struct fproc *rfp)
{
/* Return whether normal (non-PM) work can be started for the given process.
 * This function is used to serialize invocation of "special" procedures, and
 * is not entirely safe for other cases, as explained in the comments below.
 */
  int is_pending, is_active, has_normal_work;

  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);

  /* If there is no work scheduled for the process, we can start work. */
  if (!is_pending && !is_active) return TRUE;

  /* If there is already normal work scheduled for the process, we cannot add
   * more, since we support only one normal job per process.
   */
  if (has_normal_work) return FALSE;

  /* If this process has pending PM work but no normal work, we can add the
   * normal work for execution before the worker starts.
   */
  if (is_pending) return TRUE;

  /* However, if a worker is active for PM work, we cannot add normal work
   * either, because the work will not be considered. For this reason, we
   * cannot use this function for processes that can possibly get postponed PM
   * work. It is still safe for core system processes, though.
   */
  return FALSE;
}
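
/* Usage sketch (hypothetical caller; do_special is a made-up handler): the
 * serialization guarantee is obtained by checking before starting the work:
 *
 *	if (worker_can_start(rfp))
 *		worker_start(rfp, do_special, &m_in, FALSE);
 */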

/*===========================================================================*
 *				worker_try_activate			     *
 *===========================================================================*/
static void worker_try_activate(struct fproc *rfp, int use_spare)
{
/* See if we can wake up a thread to do the work scheduled for the given
 * process. If not, mark the process as having pending work for later.
 */
  int needed;

  /* Use the last available thread only if requested. Otherwise, leave at least
   * one spare thread for deadlock resolution.
   */
  needed = use_spare ? 1 : 2;

  /* Also make sure that doing new work is allowed at all right now, which may
   * not be the case during VFS initialization. We always allow callback
   * calls, i.e., calls that may use the spare thread. The reason is that we do
   * not support callback calls being marked as pending, so the (entirely
   * theoretical) exception here may (entirely theoretically) avoid deadlocks.
   */
  if (needed <= worker_available() && (!block_all || use_spare)) {
	worker_assign(rfp);
  } else {
	rfp->fp_flags |= FP_PENDING;
	pending++;
  }
}

/*===========================================================================*
 *				worker_start				     *
 *===========================================================================*/
void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr,
	int use_spare)
{
/* Schedule work to be done by a worker thread. The work is bound to the given
 * process. If a function pointer is given, the work is considered normal work,
 * and the function will be called to handle it. If the function pointer is
 * NULL, the work is considered postponed PM work, and service_pm_postponed
 * will be called to handle it. The input message will be a copy of the given
 * message. Optionally, the last spare (deadlock-resolving) thread may be used
 * to execute the work immediately.
 */
  int is_pm_work, is_pending, is_active, has_normal_work, has_pm_work;

  assert(rfp != NULL);

  is_pm_work = (func == NULL);
  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);
  has_pm_work = (rfp->fp_flags & FP_PM_WORK);

  /* Sanity checks. If any of these trigger, someone messed up badly! */
  if (is_pending || is_active) {
	if (is_pending && is_active)
		panic("work cannot be both pending and active");

	/* The process cannot make more than one call at once. */
	if (!is_pm_work && has_normal_work)
		panic("process has two calls (%x, %x)",
			rfp->fp_msg.m_type, m_ptr->m_type);

	/* PM will not send more than one job per process to us at once. */
	if (is_pm_work && has_pm_work)
		panic("got two calls from PM (%x, %x)",
			rfp->fp_pm_msg.m_type, m_ptr->m_type);

	/* Despite PM's sys_delay_stop() system, it is possible that normal
	 * work (in particular, do_pending_pipe) arrives after postponed PM
	 * work has been scheduled for execution, so we don't check for that.
	 */
#if 0
	printf("VFS: adding %s work to %s thread\n",
		is_pm_work ? "PM" : "normal",
		is_pending ? "pending" : "active");
#endif
  } else {
	/* Some cleanup step forgotten somewhere? */
	if (has_normal_work || has_pm_work)
		panic("worker administration error");
  }

  /* Save the work to be performed. */
  if (!is_pm_work) {
	rfp->fp_msg = *m_ptr;
	rfp->fp_func = func;
  } else {
	rfp->fp_pm_msg = *m_ptr;
	rfp->fp_flags |= FP_PM_WORK;
  }

  /* If we have done more than just add to existing work, go look for a free
   * thread. Note that we won't be using the spare thread for normal work if
   * there is already PM work pending, but that situation will never occur in
   * practice.
   */
  if (!is_pending && !is_active)
	worker_try_activate(rfp, use_spare);
}
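
/* Usage sketch (hypothetical): normal work passes a handler function, while
 * postponed PM work passes a NULL function pointer; do_work is made up here,
 * and a handler receives its request via self->w_m_in (see worker_main):
 *
 *	worker_start(rfp, do_work, &m_in, FALSE);	(normal work)
 *	worker_start(rfp, NULL, &m_in, FALSE);		(postponed PM work)
 */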

/*===========================================================================*
 *				worker_yield				     *
 *===========================================================================*/
void worker_yield(void)
{
/* Yield to all worker threads. To be called from the main thread only. */

  mthread_yield_all();

  self = NULL;
}

/*===========================================================================*
 *				worker_sleep				     *
 *===========================================================================*/
static void worker_sleep(void)
{
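/* Block the calling worker thread on its condition variable until a
 * worker_wake() call signals it. 'self' is saved and restored around the
 * wait, since another thread may have changed it while we were not running.
 */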
  struct worker_thread *worker = self;
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0)
	panic("could not wait on condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
	panic("unable to unlock event mutex");
  self = worker;
}

/*===========================================================================*
 *				worker_wake				     *
 *===========================================================================*/
static void worker_wake(struct worker_thread *worker)
{
/* Signal a worker to wake up */
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_signal(&worker->w_event) != 0)
	panic("unable to signal condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
	panic("unable to unlock event mutex");
}

/*===========================================================================*
 *				worker_suspend				     *
 *===========================================================================*/
struct worker_thread *worker_suspend(void)
{
/* Suspend the current thread, saving certain thread variables. Return a
 * pointer to the thread's worker structure for later resumption.
 */

  ASSERTW(self);
  assert(fp != NULL);
  assert(self->w_fp == fp);
  assert(fp->fp_worker == self);

  self->w_err_code = err_code;

  return self;
}

/*===========================================================================*
 *				worker_resume				     *
 *===========================================================================*/
void worker_resume(struct worker_thread *org_self)
{
/* Resume the current thread after suspension, restoring thread variables. */

  ASSERTW(org_self);

  self = org_self;

  fp = self->w_fp;
  assert(fp != NULL);

  err_code = self->w_err_code;
}
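
/* Usage sketch (hypothetical): code that blocks a worker on behalf of a
 * request brackets the blocking part with suspend and resume, so that the
 * per-thread state ('self', 'fp', 'err_code') survives the switch:
 *
 *	struct worker_thread *wp = worker_suspend();
 *	...block until the operation completes...
 *	worker_resume(wp);
 */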

/*===========================================================================*
 *				worker_wait				     *
 *===========================================================================*/
void worker_wait(void)
{
/* Put the current thread to sleep until woken up by the main thread. */

  (void) worker_suspend(); /* worker_sleep already saves and restores 'self' */

  worker_sleep();

  /* We continue here after waking up */
  worker_resume(self);
  assert(self->w_next == NULL);
}
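
/* Note: worker_wait() pairs with worker_signal(); a worker typically blocks
 * here after sending a request, and the main thread calls worker_signal() on
 * it once the corresponding reply has arrived.
 */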

/*===========================================================================*
 *				worker_signal				     *
 *===========================================================================*/
void worker_signal(struct worker_thread *worker)
{
  ASSERTW(worker);		/* Make sure we have a valid thread */
  worker_wake(worker);
}

/*===========================================================================*
 *				worker_stop				     *
 *===========================================================================*/
void worker_stop(struct worker_thread *worker)
{
  ASSERTW(worker);		/* Make sure we have a valid thread */
  /* This thread is communicating with a driver or file server */
  if (worker->w_drv_sendrec != NULL) {			/* Driver */
	assert(worker->w_task != NONE);
	worker->w_drv_sendrec->m_type = EIO;
	worker->w_drv_sendrec = NULL;
  } else if (worker->w_sendrec != NULL) {		/* FS */
	/* worker->w_task may be NONE if the FS message was still queued */
	worker->w_sendrec->m_type = EIO;
	worker->w_sendrec = NULL;
  } else
	panic("reply storage consistency error");	/* Oh dear */
  worker_wake(worker);
}

/*===========================================================================*
 *				worker_stop_by_endpt			     *
 *===========================================================================*/
void worker_stop_by_endpt(endpoint_t proc_e)
{
  struct worker_thread *worker;
  int i;

  if (proc_e == NONE) return;

  for (i = 0; i < NR_WTHREADS; i++) {
	worker = &workers[i];
	if (worker->w_fp != NULL && worker->w_task == proc_e)
		worker_stop(worker);
  }
}
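
/* Note: worker_stop_by_endpt() is presumably invoked when a driver or file
 * server endpoint dies; each worker blocked on that endpoint has its stored
 * reply faked to EIO and is woken up, so it can fail the request gracefully.
 */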

/*===========================================================================*
 *				worker_get				     *
 *===========================================================================*/
struct worker_thread *worker_get(thread_t worker_tid)
{
  int i;

  for (i = 0; i < NR_WTHREADS; i++)
	if (workers[i].w_tid == worker_tid)
		return(&workers[i]);

  return(NULL);
}

/*===========================================================================*
 *				worker_set_proc				     *
 *===========================================================================*/
void worker_set_proc(struct fproc *rfp)
{
/* Perform an incredibly ugly action that completely violates the threading
 * model: change the current working thread's process context to another
 * process. The caller is expected to hold the lock to both the calling and the
 * target process, and neither process is expected to continue regular
 * operation when done. This code is here *only* and *strictly* for the reboot
 * code, and *must not* be used for anything else.
 */

  if (fp == rfp) return;

  if (rfp->fp_worker != NULL)
	panic("worker_set_proc: target process not idle");

  fp->fp_worker = NULL;

  fp = rfp;

  self->w_fp = rfp;
  fp->fp_worker = self;
}
608