1 #include "fs.h" 2 #include <assert.h> 3 4 static void worker_get_work(void); 5 static void *worker_main(void *arg); 6 static void worker_sleep(void); 7 static void worker_wake(struct worker_thread *worker); 8 static mthread_attr_t tattr; 9 10 #ifdef MKCOVERAGE 11 # define TH_STACKSIZE (40 * 1024) 12 #else 13 # define TH_STACKSIZE (28 * 1024) 14 #endif 15 16 #define ASSERTW(w) assert((w) >= &workers[0] && (w) < &workers[NR_WTHREADS]) 17 18 /*===========================================================================* 19 * worker_init * 20 *===========================================================================*/ 21 void worker_init(void) 22 { 23 /* Initialize worker thread */ 24 struct worker_thread *wp; 25 int i; 26 27 if (mthread_attr_init(&tattr) != 0) 28 panic("failed to initialize attribute"); 29 if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0) 30 panic("couldn't set default thread stack size"); 31 if (mthread_attr_setdetachstate(&tattr, MTHREAD_CREATE_DETACHED) != 0) 32 panic("couldn't set default thread detach state"); 33 pending = 0; 34 35 for (i = 0; i < NR_WTHREADS; i++) { 36 wp = &workers[i]; 37 38 wp->w_fp = NULL; /* Mark not in use */ 39 wp->w_next = NULL; 40 wp->w_task = NONE; 41 if (mutex_init(&wp->w_event_mutex, NULL) != 0) 42 panic("failed to initialize mutex"); 43 if (cond_init(&wp->w_event, NULL) != 0) 44 panic("failed to initialize conditional variable"); 45 if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0) 46 panic("unable to start thread"); 47 } 48 49 /* Let all threads get ready to accept work. */ 50 yield_all(); 51 } 52 53 /*===========================================================================* 54 * worker_get_work * 55 *===========================================================================*/ 56 static void worker_get_work(void) 57 { 58 /* Find new work to do. Work can be 'queued', 'pending', or absent. In the 59 * latter case wait for new work to come in. 60 */ 61 struct fproc *rfp; 62 63 /* Do we have queued work to do? */ 64 if (pending > 0) { 65 /* Find pending work */ 66 for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) { 67 if (rfp->fp_flags & FP_PENDING) { 68 self->w_fp = rfp; 69 rfp->fp_worker = self; 70 rfp->fp_flags &= ~FP_PENDING; /* No longer pending */ 71 pending--; 72 assert(pending >= 0); 73 return; 74 } 75 } 76 panic("Pending work inconsistency"); 77 } 78 79 /* Wait for work to come to us */ 80 worker_sleep(); 81 } 82 83 /*===========================================================================* 84 * worker_available * 85 *===========================================================================*/ 86 int worker_available(void) 87 { 88 int busy, i; 89 90 busy = 0; 91 for (i = 0; i < NR_WTHREADS; i++) { 92 if (workers[i].w_fp != NULL) 93 busy++; 94 } 95 96 return(NR_WTHREADS - busy); 97 } 98 99 /*===========================================================================* 100 * worker_main * 101 *===========================================================================*/ 102 static void *worker_main(void *arg) 103 { 104 /* Worker thread main loop */ 105 106 self = (struct worker_thread *) arg; 107 ASSERTW(self); 108 109 while(TRUE) { 110 worker_get_work(); 111 112 fp = self->w_fp; 113 assert(fp->fp_worker == self); 114 115 /* Lock the process. */ 116 lock_proc(fp); 117 118 /* The following two blocks could be run in a loop until both the 119 * conditions are no longer met, but it is currently impossible that 120 * more normal work is present after postponed PM work has been done. 121 */ 122 123 /* Perform normal work, if any. */ 124 if (fp->fp_func != NULL) { 125 self->w_m_in = fp->fp_msg; 126 err_code = OK; 127 128 fp->fp_func(); 129 130 fp->fp_func = NULL; /* deliberately unset AFTER the call */ 131 } 132 133 /* Perform postponed PM work, if any. */ 134 if (fp->fp_flags & FP_PM_WORK) { 135 self->w_m_in = fp->fp_pm_msg; 136 137 service_pm_postponed(); 138 139 fp->fp_flags &= ~FP_PM_WORK; 140 } 141 142 /* Perform cleanup actions. */ 143 thread_cleanup(); 144 145 unlock_proc(fp); 146 147 fp->fp_worker = NULL; 148 self->w_fp = NULL; 149 } 150 151 return(NULL); /* Unreachable */ 152 } 153 154 /*===========================================================================* 155 * worker_can_start * 156 *===========================================================================*/ 157 int worker_can_start(struct fproc *rfp) 158 { 159 /* Return whether normal (non-PM) work can be started for the given process. 160 * This function is used to serialize invocation of "special" procedures, and 161 * not entirely safe for other cases, as explained in the comments below. 162 */ 163 int is_pending, is_active, has_normal_work, has_pm_work; 164 165 is_pending = (rfp->fp_flags & FP_PENDING); 166 is_active = (rfp->fp_worker != NULL); 167 has_normal_work = (rfp->fp_func != NULL); 168 has_pm_work = (rfp->fp_flags & FP_PM_WORK); 169 170 /* If there is no work scheduled for the process, we can start work. */ 171 if (!is_pending && !is_active) return TRUE; 172 173 /* If there is already normal work scheduled for the process, we cannot add 174 * more, since we support only one normal job per process. 175 */ 176 if (has_normal_work) return FALSE; 177 178 /* If this process has pending PM work but no normal work, we can add the 179 * normal work for execution before the worker will start. 180 */ 181 if (is_pending) return TRUE; 182 183 /* However, if a worker is active for PM work, we cannot add normal work 184 * either, because the work will not be considered. For this reason, we can 185 * not use this function for processes that can possibly get postponed PM 186 * work. It is still safe for core system processes, though. 187 */ 188 return FALSE; 189 } 190 191 /*===========================================================================* 192 * worker_try_activate * 193 *===========================================================================*/ 194 static void worker_try_activate(struct fproc *rfp, int use_spare) 195 { 196 /* See if we can wake up a thread to do the work scheduled for the given 197 * process. If not, mark the process as having pending work for later. 198 */ 199 int i, available, needed; 200 struct worker_thread *worker; 201 202 /* Use the last available thread only if requested. Otherwise, leave at least 203 * one spare thread for deadlock resolution. 204 */ 205 needed = use_spare ? 1 : 2; 206 207 worker = NULL; 208 for (i = available = 0; i < NR_WTHREADS; i++) { 209 if (workers[i].w_fp == NULL) { 210 if (worker == NULL) 211 worker = &workers[i]; 212 if (++available >= needed) 213 break; 214 } 215 } 216 217 if (available >= needed) { 218 assert(worker != NULL); 219 rfp->fp_worker = worker; 220 worker->w_fp = rfp; 221 worker_wake(worker); 222 } else { 223 rfp->fp_flags |= FP_PENDING; 224 pending++; 225 } 226 } 227 228 /*===========================================================================* 229 * worker_start * 230 *===========================================================================*/ 231 void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr, 232 int use_spare) 233 { 234 /* Schedule work to be done by a worker thread. The work is bound to the given 235 * process. If a function pointer is given, the work is considered normal work, 236 * and the function will be called to handle it. If the function pointer is 237 * NULL, the work is considered postponed PM work, and service_pm_postponed 238 * will be called to handle it. The input message will be a copy of the given 239 * message. Optionally, the last spare (deadlock-resolving) thread may be used 240 * to execute the work immediately. 241 */ 242 int is_pm_work, is_pending, is_active, has_normal_work, has_pm_work; 243 244 assert(rfp != NULL); 245 246 is_pm_work = (func == NULL); 247 is_pending = (rfp->fp_flags & FP_PENDING); 248 is_active = (rfp->fp_worker != NULL); 249 has_normal_work = (rfp->fp_func != NULL); 250 has_pm_work = (rfp->fp_flags & FP_PM_WORK); 251 252 /* Sanity checks. If any of these trigger, someone messed up badly! */ 253 if (is_pending || is_active) { 254 if (is_pending && is_active) 255 panic("work cannot be both pending and active"); 256 257 /* The process cannot make more than one call at once. */ 258 if (!is_pm_work && has_normal_work) 259 panic("process has two calls (%x, %x)", 260 rfp->fp_msg.m_type, m_ptr->m_type); 261 262 /* PM will not send more than one job per process to us at once. */ 263 if (is_pm_work && has_pm_work) 264 panic("got two calls from PM (%x, %x)", 265 rfp->fp_pm_msg.m_type, m_ptr->m_type); 266 267 /* Despite PM's sys_delay_stop() system, it is possible that normal 268 * work (in particular, do_pending_pipe) arrives after postponed PM 269 * work has been scheduled for execution, so we don't check for that. 270 */ 271 #if 0 272 printf("VFS: adding %s work to %s thread\n", 273 is_pm_work ? "PM" : "normal", 274 is_pending ? "pending" : "active"); 275 #endif 276 } else { 277 /* Some cleanup step forgotten somewhere? */ 278 if (has_normal_work || has_pm_work) 279 panic("worker administration error"); 280 } 281 282 /* Save the work to be performed. */ 283 if (!is_pm_work) { 284 rfp->fp_msg = *m_ptr; 285 rfp->fp_func = func; 286 } else { 287 rfp->fp_pm_msg = *m_ptr; 288 rfp->fp_flags |= FP_PM_WORK; 289 } 290 291 /* If we have not only added to existing work, go look for a free thread. 292 * Note that we won't be using the spare thread for normal work if there is 293 * already PM work pending, but that situation will never occur in practice. 294 */ 295 if (!is_pending && !is_active) 296 worker_try_activate(rfp, use_spare); 297 } 298 299 /*===========================================================================* 300 * worker_sleep * 301 *===========================================================================*/ 302 static void worker_sleep(void) 303 { 304 struct worker_thread *worker = self; 305 ASSERTW(worker); 306 if (mutex_lock(&worker->w_event_mutex) != 0) 307 panic("unable to lock event mutex"); 308 if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0) 309 panic("could not wait on conditional variable"); 310 if (mutex_unlock(&worker->w_event_mutex) != 0) 311 panic("unable to unlock event mutex"); 312 self = worker; 313 } 314 315 /*===========================================================================* 316 * worker_wake * 317 *===========================================================================*/ 318 static void worker_wake(struct worker_thread *worker) 319 { 320 /* Signal a worker to wake up */ 321 ASSERTW(worker); 322 if (mutex_lock(&worker->w_event_mutex) != 0) 323 panic("unable to lock event mutex"); 324 if (cond_signal(&worker->w_event) != 0) 325 panic("unable to signal conditional variable"); 326 if (mutex_unlock(&worker->w_event_mutex) != 0) 327 panic("unable to unlock event mutex"); 328 } 329 330 /*===========================================================================* 331 * worker_suspend * 332 *===========================================================================*/ 333 struct worker_thread *worker_suspend(void) 334 { 335 /* Suspend the current thread, saving certain thread variables. Return a 336 * pointer to the thread's worker structure for later resumption. 337 */ 338 339 ASSERTW(self); 340 assert(fp != NULL); 341 assert(self->w_fp == fp); 342 assert(fp->fp_worker == self); 343 344 self->w_err_code = err_code; 345 346 return self; 347 } 348 349 /*===========================================================================* 350 * worker_resume * 351 *===========================================================================*/ 352 void worker_resume(struct worker_thread *org_self) 353 { 354 /* Resume the current thread after suspension, restoring thread variables. */ 355 356 ASSERTW(org_self); 357 358 self = org_self; 359 360 fp = self->w_fp; 361 assert(fp != NULL); 362 363 err_code = self->w_err_code; 364 } 365 366 /*===========================================================================* 367 * worker_wait * 368 *===========================================================================*/ 369 void worker_wait(void) 370 { 371 /* Put the current thread to sleep until woken up by the main thread. */ 372 373 (void) worker_suspend(); /* worker_sleep already saves and restores 'self' */ 374 375 worker_sleep(); 376 377 /* We continue here after waking up */ 378 worker_resume(self); 379 assert(self->w_next == NULL); 380 } 381 382 /*===========================================================================* 383 * worker_signal * 384 *===========================================================================*/ 385 void worker_signal(struct worker_thread *worker) 386 { 387 ASSERTW(worker); /* Make sure we have a valid thread */ 388 worker_wake(worker); 389 } 390 391 /*===========================================================================* 392 * worker_stop * 393 *===========================================================================*/ 394 void worker_stop(struct worker_thread *worker) 395 { 396 ASSERTW(worker); /* Make sure we have a valid thread */ 397 if (worker->w_task != NONE) { 398 /* This thread is communicating with a driver or file server */ 399 if (worker->w_drv_sendrec != NULL) { /* Driver */ 400 worker->w_drv_sendrec->m_type = EIO; 401 } else if (worker->w_sendrec != NULL) { /* FS */ 402 worker->w_sendrec->m_type = EIO; 403 } else { 404 panic("reply storage consistency error"); /* Oh dear */ 405 } 406 } else { 407 /* This shouldn't happen at all... */ 408 printf("VFS: stopping worker not blocked on any task?\n"); 409 util_stacktrace(); 410 } 411 worker_wake(worker); 412 } 413 414 /*===========================================================================* 415 * worker_stop_by_endpt * 416 *===========================================================================*/ 417 void worker_stop_by_endpt(endpoint_t proc_e) 418 { 419 struct worker_thread *worker; 420 int i; 421 422 if (proc_e == NONE) return; 423 424 for (i = 0; i < NR_WTHREADS; i++) { 425 worker = &workers[i]; 426 if (worker->w_fp != NULL && worker->w_task == proc_e) 427 worker_stop(worker); 428 } 429 } 430 431 /*===========================================================================* 432 * worker_get * 433 *===========================================================================*/ 434 struct worker_thread *worker_get(thread_t worker_tid) 435 { 436 int i; 437 438 for (i = 0; i < NR_WTHREADS; i++) 439 if (workers[i].w_tid == worker_tid) 440 return(&workers[i]); 441 442 return(NULL); 443 } 444 445 /*===========================================================================* 446 * worker_set_proc * 447 *===========================================================================*/ 448 void worker_set_proc(struct fproc *rfp) 449 { 450 /* Perform an incredibly ugly action that completely violates the threading 451 * model: change the current working thread's process context to another 452 * process. The caller is expected to hold the lock to both the calling and the 453 * target process, and neither process is expected to continue regular 454 * operation when done. This code is here *only* and *strictly* for the reboot 455 * code, and *must not* be used for anything else. 456 */ 457 458 if (fp == rfp) return; 459 460 if (rfp->fp_worker != NULL) 461 panic("worker_set_proc: target process not idle"); 462 463 fp->fp_worker = NULL; 464 465 fp = rfp; 466 467 self->w_fp = rfp; 468 fp->fp_worker = self; 469 } 470