/* xref: /minix/minix/servers/vfs/worker.c (revision 83133719) */
#include "fs.h"
#include <assert.h>

static void worker_get_work(void);
static void *worker_main(void *arg);
static void worker_sleep(void);
static void worker_wake(struct worker_thread *worker);
static mthread_attr_t tattr;

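/* Stack size for each worker thread. Coverage-instrumented builds
 * (MKCOVERAGE) use a larger stack, presumably to accommodate the
 * instrumentation's extra stack usage.
 */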
#ifdef MKCOVERAGE
# define TH_STACKSIZE (40 * 1024)
#else
# define TH_STACKSIZE (28 * 1024)
#endif

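/* Check that a worker pointer refers to a slot within the global workers
 * array.
 */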
#define ASSERTW(w) assert((w) >= &workers[0] && (w) < &workers[NR_WTHREADS])

/*===========================================================================*
 *				worker_init				     *
 *===========================================================================*/
void worker_init(void)
{
/* Initialize the worker threads. */
  struct worker_thread *wp;
  int i;

  if (mthread_attr_init(&tattr) != 0)
	panic("failed to initialize attribute");
  if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0)
	panic("couldn't set default thread stack size");
  if (mthread_attr_setdetachstate(&tattr, MTHREAD_CREATE_DETACHED) != 0)
	panic("couldn't set default thread detach state");
  pending = 0;

  for (i = 0; i < NR_WTHREADS; i++) {
	wp = &workers[i];

	wp->w_fp = NULL;		/* Mark not in use */
	wp->w_next = NULL;
	wp->w_task = NONE;
	if (mutex_init(&wp->w_event_mutex, NULL) != 0)
		panic("failed to initialize mutex");
	if (cond_init(&wp->w_event, NULL) != 0)
		panic("failed to initialize condition variable");
	if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0)
		panic("unable to start thread");
  }

  /* Let all threads get ready to accept work. */
  yield_all();
}

/*===========================================================================*
 *				worker_get_work				     *
 *===========================================================================*/
static void worker_get_work(void)
{
/* Find new work to do. Work is either 'pending' (a process marked with
 * FP_PENDING) or absent; in the latter case, wait for new work to come in.
 */
  struct fproc *rfp;

  /* Do we have pending work to do? */
  if (pending > 0) {
	/* Find pending work */
	for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) {
		if (rfp->fp_flags & FP_PENDING) {
			self->w_fp = rfp;
			rfp->fp_worker = self;
			rfp->fp_flags &= ~FP_PENDING; /* No longer pending */
			pending--;
			assert(pending >= 0);
			return;
		}
	}
	panic("Pending work inconsistency");
  }

  /* Wait for work to come to us */
  worker_sleep();
}


/*===========================================================================*
 *				worker_available			     *
 *===========================================================================*/
int worker_available(void)
{
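/* Return the number of worker threads that are currently not assigned to any
 * process, and are thus available to take on new work.
 */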
  int busy, i;

  busy = 0;
  for (i = 0; i < NR_WTHREADS; i++) {
	if (workers[i].w_fp != NULL)
		busy++;
  }

  return(NR_WTHREADS - busy);
}

/*===========================================================================*
 *				worker_main				     *
 *===========================================================================*/
static void *worker_main(void *arg)
{
/* Worker thread main loop */

  self = (struct worker_thread *) arg;
  ASSERTW(self);

  while (TRUE) {
	worker_get_work();

	fp = self->w_fp;
	assert(fp->fp_worker == self);

	/* Lock the process. */
	lock_proc(fp);

	/* The following two blocks could be run in a loop until neither
	 * condition holds anymore, but currently it is impossible for more
	 * normal work to be present after postponed PM work has been done.
	 */

	/* Perform normal work, if any. */
	if (fp->fp_func != NULL) {
		self->w_m_in = fp->fp_msg;
		err_code = OK;

		fp->fp_func();

		fp->fp_func = NULL;	/* deliberately unset AFTER the call */
	}

	/* Perform postponed PM work, if any. */
	if (fp->fp_flags & FP_PM_WORK) {
		self->w_m_in = fp->fp_pm_msg;

		service_pm_postponed();

		fp->fp_flags &= ~FP_PM_WORK;
	}

	/* Perform cleanup actions. */
	thread_cleanup();

	unlock_proc(fp);

	fp->fp_worker = NULL;
	self->w_fp = NULL;
  }

  return(NULL);	/* Unreachable */
}

/*===========================================================================*
 *				worker_can_start			     *
 *===========================================================================*/
int worker_can_start(struct fproc *rfp)
{
/* Return whether normal (non-PM) work can be started for the given process.
 * This function is used to serialize invocation of "special" procedures, and
 * is not entirely safe for other cases, as explained in the comments below.
 */
  int is_pending, is_active, has_normal_work, has_pm_work;

  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);
  has_pm_work = (rfp->fp_flags & FP_PM_WORK);

  /* If there is no work scheduled for the process, we can start work. */
  if (!is_pending && !is_active) return TRUE;

  /* If there is already normal work scheduled for the process, we cannot add
   * more, since we support only one normal job per process.
   */
  if (has_normal_work) return FALSE;

  /* If this process has pending PM work but no normal work, we can add the
   * normal work for execution before the worker starts.
   */
  if (is_pending) return TRUE;

  /* However, if a worker is active for PM work, we cannot add normal work
   * either, because the work will not be considered. For this reason, we
   * cannot use this function for processes that can possibly get postponed
   * PM work. It is still safe for core system processes, though.
   */
  return FALSE;
}

/*===========================================================================*
 *				worker_try_activate			     *
 *===========================================================================*/
static void worker_try_activate(struct fproc *rfp, int use_spare)
{
/* See if we can wake up a thread to do the work scheduled for the given
 * process. If not, mark the process as having pending work for later.
 */
  int i, available, needed;
  struct worker_thread *worker;

  /* Use the last available thread only if requested. Otherwise, leave at least
   * one spare thread for deadlock resolution.
   */
  needed = use_spare ? 1 : 2;

  worker = NULL;
  for (i = available = 0; i < NR_WTHREADS; i++) {
	if (workers[i].w_fp == NULL) {
		if (worker == NULL)
			worker = &workers[i];
		if (++available >= needed)
			break;
	}
  }

  if (available >= needed) {
	assert(worker != NULL);
	rfp->fp_worker = worker;
	worker->w_fp = rfp;
	worker_wake(worker);
  } else {
	rfp->fp_flags |= FP_PENDING;
	pending++;
  }
}


/*===========================================================================*
 *				worker_start				     *
 *===========================================================================*/
void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr,
	int use_spare)
{
/* Schedule work to be done by a worker thread. The work is bound to the given
 * process. If a function pointer is given, the work is considered normal work,
 * and the function will be called to handle it. If the function pointer is
 * NULL, the work is considered postponed PM work, and service_pm_postponed
 * will be called to handle it. The input message will be a copy of the given
 * message. Optionally, the last spare (deadlock-resolving) thread may be used
 * to execute the work immediately.
 */
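/* For illustration only: the call sites sketched below are hypothetical and
 * not taken from this file. A dispatcher might start normal work as
 *
 *	worker_start(rfp, do_work, &m_in, FALSE);
 *
 * and postponed PM work, which may claim the spare thread, as
 *
 *	worker_start(rfp, NULL, &m_pm, TRUE);
 */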
  int is_pm_work, is_pending, is_active, has_normal_work, has_pm_work;

  assert(rfp != NULL);

  is_pm_work = (func == NULL);
  is_pending = (rfp->fp_flags & FP_PENDING);
  is_active = (rfp->fp_worker != NULL);
  has_normal_work = (rfp->fp_func != NULL);
  has_pm_work = (rfp->fp_flags & FP_PM_WORK);

  /* Sanity checks. If any of these trigger, someone messed up badly! */
  if (is_pending || is_active) {
	if (is_pending && is_active)
		panic("work cannot be both pending and active");

	/* The process cannot make more than one call at once. */
	if (!is_pm_work && has_normal_work)
		panic("process has two calls (%x, %x)",
			rfp->fp_msg.m_type, m_ptr->m_type);

	/* PM will not send more than one job per process to us at once. */
	if (is_pm_work && has_pm_work)
		panic("got two calls from PM (%x, %x)",
			rfp->fp_pm_msg.m_type, m_ptr->m_type);

	/* Despite PM's sys_delay_stop() system, it is possible that normal
	 * work (in particular, do_pending_pipe) arrives after postponed PM
	 * work has been scheduled for execution, so we don't check for that.
	 */
#if 0
	printf("VFS: adding %s work to %s thread\n",
		is_pm_work ? "PM" : "normal",
		is_pending ? "pending" : "active");
#endif
  } else {
	/* Some cleanup step forgotten somewhere? */
	if (has_normal_work || has_pm_work)
		panic("worker administration error");
  }

  /* Save the work to be performed. */
  if (!is_pm_work) {
	rfp->fp_msg = *m_ptr;
	rfp->fp_func = func;
  } else {
	rfp->fp_pm_msg = *m_ptr;
	rfp->fp_flags |= FP_PM_WORK;
  }

  /* Unless we have merely added work to an already pending or active job, go
   * look for a free thread. Note that we won't be using the spare thread for
   * normal work if there is already PM work pending, but that situation will
   * never occur in practice.
   */
  if (!is_pending && !is_active)
	worker_try_activate(rfp, use_spare);
}

/*===========================================================================*
 *				worker_sleep				     *
 *===========================================================================*/
static void worker_sleep(void)
{
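/* Block the calling worker thread until another thread wakes it up through
 * worker_wake().
 */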
  struct worker_thread *worker = self;
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0)
	panic("could not wait on condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
	panic("unable to unlock event mutex");
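  /* While we were blocked, other workers ran and may have changed the global
   * 'self' pointer; restore it so that it refers to this thread again.
   */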
  self = worker;
}

/*===========================================================================*
 *				worker_wake				     *
 *===========================================================================*/
static void worker_wake(struct worker_thread *worker)
{
/* Signal a worker to wake up */
  ASSERTW(worker);
  if (mutex_lock(&worker->w_event_mutex) != 0)
	panic("unable to lock event mutex");
  if (cond_signal(&worker->w_event) != 0)
	panic("unable to signal condition variable");
  if (mutex_unlock(&worker->w_event_mutex) != 0)
	panic("unable to unlock event mutex");
}

/*===========================================================================*
 *				worker_suspend				     *
 *===========================================================================*/
struct worker_thread *worker_suspend(void)
{
/* Suspend the current thread, saving certain thread variables. Return a
 * pointer to the thread's worker structure for later resumption.
 */

  ASSERTW(self);
  assert(fp != NULL);
  assert(self->w_fp == fp);
  assert(fp->fp_worker == self);

  self->w_err_code = err_code;

  return self;
}

/*===========================================================================*
 *				worker_resume				     *
 *===========================================================================*/
void worker_resume(struct worker_thread *org_self)
{
/* Resume the current thread after suspension, restoring thread variables. */

  ASSERTW(org_self);

  self = org_self;

  fp = self->w_fp;
  assert(fp != NULL);

  err_code = self->w_err_code;
}

/*===========================================================================*
 *				worker_wait				     *
 *===========================================================================*/
void worker_wait(void)
{
/* Put the current thread to sleep until woken up by the main thread. */

  (void) worker_suspend(); /* worker_sleep already saves and restores 'self' */

  worker_sleep();

  /* We continue here after waking up */
  worker_resume(self);
  assert(self->w_next == NULL);
}

/*===========================================================================*
 *				worker_signal				     *
 *===========================================================================*/
void worker_signal(struct worker_thread *worker)
{
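/* Wake up a worker thread, typically one that suspended itself through
 * worker_wait().
 */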
  ASSERTW(worker);		/* Make sure we have a valid thread */
  worker_wake(worker);
}

/*===========================================================================*
 *				worker_stop				     *
 *===========================================================================*/
void worker_stop(struct worker_thread *worker)
{
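/* Stop the given worker thread: fake an EIO reply for the communication it is
 * blocked on, and wake it up so that it processes the failure.
 */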
  ASSERTW(worker);		/* Make sure we have a valid thread */
  if (worker->w_task != NONE) {
	/* This thread is communicating with a driver or file server */
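	/* Force the stored reply message to EIO, so that the thread sees the
	 * request as failed once it wakes up below.
	 */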
	if (worker->w_drv_sendrec != NULL) {			/* Driver */
		worker->w_drv_sendrec->m_type = EIO;
	} else if (worker->w_sendrec != NULL) {		/* FS */
		worker->w_sendrec->m_type = EIO;
	} else {
		panic("reply storage consistency error");	/* Oh dear */
	}
  } else {
	/* This shouldn't happen at all... */
	printf("VFS: stopping worker not blocked on any task?\n");
	util_stacktrace();
  }
  worker_wake(worker);
}

/*===========================================================================*
 *				worker_stop_by_endpt			     *
 *===========================================================================*/
void worker_stop_by_endpt(endpoint_t proc_e)
{
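/* Stop all worker threads that are waiting for a reply from the given
 * endpoint, for instance because the driver or file server behind that
 * endpoint has died.
 */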
  struct worker_thread *worker;
  int i;

  if (proc_e == NONE) return;

  for (i = 0; i < NR_WTHREADS; i++) {
	worker = &workers[i];
	if (worker->w_fp != NULL && worker->w_task == proc_e)
		worker_stop(worker);
  }
}

/*===========================================================================*
 *				worker_get				     *
 *===========================================================================*/
struct worker_thread *worker_get(thread_t worker_tid)
{
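/* Return the worker structure for the thread with the given thread ID, or
 * NULL if no worker thread has that ID.
 */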
  int i;

  for (i = 0; i < NR_WTHREADS; i++)
	if (workers[i].w_tid == worker_tid)
		return(&workers[i]);

  return(NULL);
}

/*===========================================================================*
 *				worker_set_proc				     *
 *===========================================================================*/
void worker_set_proc(struct fproc *rfp)
{
/* Perform an incredibly ugly action that completely violates the threading
 * model: change the current working thread's process context to another
 * process. The caller is expected to hold the lock to both the calling and the
 * target process, and neither process is expected to continue regular
 * operation when done. This code is here *only* and *strictly* for the reboot
 * code, and *must not* be used for anything else.
 */

  if (fp == rfp) return;

  if (rfp->fp_worker != NULL)
	panic("worker_set_proc: target process not idle");

  fp->fp_worker = NULL;

  fp = rfp;

  self->w_fp = rfp;
  fp->fp_worker = self;
}
470