1 #include "fs.h" 2 #include <string.h> 3 #include <assert.h> 4 5 static void *worker_main(void *arg); 6 static void worker_sleep(void); 7 static void worker_wake(struct worker_thread *worker); 8 9 static mthread_attr_t tattr; 10 static unsigned int pending; 11 static unsigned int busy; 12 static int block_all; 13 14 #if defined(_MINIX_MAGIC) 15 # define TH_STACKSIZE (64 * 1024) 16 #elif defined(MKCOVERAGE) 17 # define TH_STACKSIZE (40 * 1024) 18 #else 19 # define TH_STACKSIZE (28 * 1024) 20 #endif 21 22 #define ASSERTW(w) assert((w) >= &workers[0] && (w) < &workers[NR_WTHREADS]) 23 24 /*===========================================================================* 25 * worker_init * 26 *===========================================================================*/ 27 void worker_init(void) 28 { 29 /* Initialize worker threads */ 30 struct worker_thread *wp; 31 int i; 32 33 if (mthread_attr_init(&tattr) != 0) 34 panic("failed to initialize attribute"); 35 if (mthread_attr_setstacksize(&tattr, TH_STACKSIZE) != 0) 36 panic("couldn't set default thread stack size"); 37 38 pending = 0; 39 busy = 0; 40 block_all = FALSE; 41 42 for (i = 0; i < NR_WTHREADS; i++) { 43 wp = &workers[i]; 44 45 wp->w_fp = NULL; /* Mark not in use */ 46 wp->w_next = NULL; 47 wp->w_task = NONE; 48 if (mutex_init(&wp->w_event_mutex, NULL) != 0) 49 panic("failed to initialize mutex"); 50 if (cond_init(&wp->w_event, NULL) != 0) 51 panic("failed to initialize condition variable"); 52 if (mthread_create(&wp->w_tid, &tattr, worker_main, (void *) wp) != 0) 53 panic("unable to start thread"); 54 } 55 56 /* Let all threads get ready to accept work. */ 57 worker_yield(); 58 } 59 60 /*===========================================================================* 61 * worker_cleanup * 62 *===========================================================================*/ 63 void worker_cleanup(void) 64 { 65 /* Clean up worker threads, reversing the actions of worker_init() such that 66 * we can safely call worker_init() again later. All worker threads are 67 * expected to be idle already. Used for live updates, because transferring 68 * the thread stacks from one version to another is currently not feasible. 69 */ 70 struct worker_thread *wp; 71 int i; 72 73 assert(worker_idle()); 74 75 /* First terminate all threads. */ 76 for (i = 0; i < NR_WTHREADS; i++) { 77 wp = &workers[i]; 78 79 assert(wp->w_fp == NULL); 80 81 /* Waking up the thread with no w_fp will cause it to exit. */ 82 worker_wake(wp); 83 } 84 85 worker_yield(); 86 87 /* Then clean up their resources. */ 88 for (i = 0; i < NR_WTHREADS; i++) { 89 wp = &workers[i]; 90 91 if (mthread_join(wp->w_tid, NULL) != 0) 92 panic("worker_cleanup: could not join thread %d", i); 93 if (cond_destroy(&wp->w_event) != 0) 94 panic("failed to destroy condition variable"); 95 if (mutex_destroy(&wp->w_event_mutex) != 0) 96 panic("failed to destroy mutex"); 97 } 98 99 /* Finally, clean up global resources. */ 100 if (mthread_attr_destroy(&tattr) != 0) 101 panic("failed to destroy attribute"); 102 103 memset(workers, 0, sizeof(workers)); 104 } 105 106 /*===========================================================================* 107 * worker_idle * 108 *===========================================================================*/ 109 int worker_idle(void) 110 { 111 /* Return whether all worker threads are idle. */ 112 113 return (pending == 0 && busy == 0); 114 } 115 116 /*===========================================================================* 117 * worker_assign * 118 *===========================================================================*/ 119 static void worker_assign(struct fproc *rfp) 120 { 121 /* Assign the work for the given process to a free thread. The caller must 122 * ensure that there is in fact at least one free thread. 123 */ 124 struct worker_thread *worker; 125 int i; 126 127 /* Find a free worker thread. */ 128 for (i = 0; i < NR_WTHREADS; i++) { 129 worker = &workers[i]; 130 131 if (worker->w_fp == NULL) 132 break; 133 } 134 assert(worker != NULL); 135 136 /* Assign work to it. */ 137 rfp->fp_worker = worker; 138 worker->w_fp = rfp; 139 busy++; 140 141 worker_wake(worker); 142 } 143 144 /*===========================================================================* 145 * worker_may_do_pending * 146 *===========================================================================*/ 147 static int worker_may_do_pending(void) 148 { 149 /* Return whether there is a free thread that may do pending work. This is true 150 * only if there is pending work at all, and there is a free non-spare thread 151 * (the spare thread is never used for pending work), and VFS is currently 152 * processing new requests at all (this may not be true during initialization). 153 */ 154 155 /* Ordered by likelihood to be false. */ 156 return (pending > 0 && worker_available() > 1 && !block_all); 157 } 158 159 /*===========================================================================* 160 * worker_allow * 161 *===========================================================================*/ 162 void worker_allow(int allow) 163 { 164 /* Allow or disallow workers to process new work. If disallowed, any new work 165 * will be stored as pending, even when there are free worker threads. There is 166 * no facility to stop active workers. To be used only during initialization! 167 */ 168 struct fproc *rfp; 169 170 block_all = !allow; 171 172 if (!worker_may_do_pending()) 173 return; 174 175 /* Assign any pending work to workers. */ 176 for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) { 177 if (rfp->fp_flags & FP_PENDING) { 178 rfp->fp_flags &= ~FP_PENDING; /* No longer pending */ 179 assert(pending > 0); 180 pending--; 181 worker_assign(rfp); 182 183 if (!worker_may_do_pending()) 184 return; 185 } 186 } 187 } 188 189 /*===========================================================================* 190 * worker_get_work * 191 *===========================================================================*/ 192 static int worker_get_work(void) 193 { 194 /* Find new work to do. Work can be 'queued', 'pending', or absent. In the 195 * latter case wait for new work to come in. Return TRUE if there is work to 196 * do, or FALSE if the current thread is requested to shut down. 197 */ 198 struct fproc *rfp; 199 200 assert(self->w_fp == NULL); 201 202 /* Is there pending work, and should we do it? */ 203 if (worker_may_do_pending()) { 204 /* Find pending work */ 205 for (rfp = &fproc[0]; rfp < &fproc[NR_PROCS]; rfp++) { 206 if (rfp->fp_flags & FP_PENDING) { 207 self->w_fp = rfp; 208 rfp->fp_worker = self; 209 busy++; 210 rfp->fp_flags &= ~FP_PENDING; /* No longer pending */ 211 assert(pending > 0); 212 pending--; 213 return TRUE; 214 } 215 } 216 panic("Pending work inconsistency"); 217 } 218 219 /* Wait for work to come to us */ 220 worker_sleep(); 221 222 return (self->w_fp != NULL); 223 } 224 225 /*===========================================================================* 226 * worker_available * 227 *===========================================================================*/ 228 int worker_available(void) 229 { 230 /* Return the number of threads that are available, including the spare thread. 231 */ 232 233 return(NR_WTHREADS - busy); 234 } 235 236 /*===========================================================================* 237 * worker_main * 238 *===========================================================================*/ 239 static void *worker_main(void *arg) 240 { 241 /* Worker thread main loop */ 242 243 self = (struct worker_thread *) arg; 244 ASSERTW(self); 245 246 while (worker_get_work()) { 247 248 fp = self->w_fp; 249 assert(fp->fp_worker == self); 250 251 /* Lock the process. */ 252 lock_proc(fp); 253 254 /* The following two blocks could be run in a loop until both the 255 * conditions are no longer met, but it is currently impossible that 256 * more normal work is present after postponed PM work has been done. 257 */ 258 259 /* Perform normal work, if any. */ 260 if (fp->fp_func != NULL) { 261 self->w_m_in = fp->fp_msg; 262 err_code = OK; 263 264 fp->fp_func(); 265 266 fp->fp_func = NULL; /* deliberately unset AFTER the call */ 267 } 268 269 /* Perform postponed PM work, if any. */ 270 if (fp->fp_flags & FP_PM_WORK) { 271 self->w_m_in = fp->fp_pm_msg; 272 273 service_pm_postponed(); 274 275 fp->fp_flags &= ~FP_PM_WORK; 276 } 277 278 /* Perform cleanup actions. */ 279 thread_cleanup(); 280 281 unlock_proc(fp); 282 283 fp->fp_worker = NULL; 284 self->w_fp = NULL; 285 assert(busy > 0); 286 busy--; 287 } 288 289 return(NULL); 290 } 291 292 /*===========================================================================* 293 * worker_can_start * 294 *===========================================================================*/ 295 int worker_can_start(struct fproc *rfp) 296 { 297 /* Return whether normal (non-PM) work can be started for the given process. 298 * This function is used to serialize invocation of "special" procedures, and 299 * not entirely safe for other cases, as explained in the comments below. 300 */ 301 int is_pending, is_active, has_normal_work; 302 303 is_pending = (rfp->fp_flags & FP_PENDING); 304 is_active = (rfp->fp_worker != NULL); 305 has_normal_work = (rfp->fp_func != NULL); 306 307 /* If there is no work scheduled for the process, we can start work. */ 308 if (!is_pending && !is_active) return TRUE; 309 310 /* If there is already normal work scheduled for the process, we cannot add 311 * more, since we support only one normal job per process. 312 */ 313 if (has_normal_work) return FALSE; 314 315 /* If this process has pending PM work but no normal work, we can add the 316 * normal work for execution before the worker will start. 317 */ 318 if (is_pending) return TRUE; 319 320 /* However, if a worker is active for PM work, we cannot add normal work 321 * either, because the work will not be considered. For this reason, we can 322 * not use this function for processes that can possibly get postponed PM 323 * work. It is still safe for core system processes, though. 324 */ 325 return FALSE; 326 } 327 328 /*===========================================================================* 329 * worker_try_activate * 330 *===========================================================================*/ 331 static void worker_try_activate(struct fproc *rfp, int use_spare) 332 { 333 /* See if we can wake up a thread to do the work scheduled for the given 334 * process. If not, mark the process as having pending work for later. 335 */ 336 int needed; 337 338 /* Use the last available thread only if requested. Otherwise, leave at least 339 * one spare thread for deadlock resolution. 340 */ 341 needed = use_spare ? 1 : 2; 342 343 /* Also make sure that doing new work is allowed at all right now, which may 344 * not be the case during VFS initialization. We do always allow callback 345 * calls, i.e., calls that may use the spare thread. The reason is that we do 346 * not support callback calls being marked as pending, so the (entirely 347 * theoretical) exception here may (entirely theoretically) avoid deadlocks. 348 */ 349 if (needed <= worker_available() && (!block_all || use_spare)) { 350 worker_assign(rfp); 351 } else { 352 rfp->fp_flags |= FP_PENDING; 353 pending++; 354 } 355 } 356 357 /*===========================================================================* 358 * worker_start * 359 *===========================================================================*/ 360 void worker_start(struct fproc *rfp, void (*func)(void), message *m_ptr, 361 int use_spare) 362 { 363 /* Schedule work to be done by a worker thread. The work is bound to the given 364 * process. If a function pointer is given, the work is considered normal work, 365 * and the function will be called to handle it. If the function pointer is 366 * NULL, the work is considered postponed PM work, and service_pm_postponed 367 * will be called to handle it. The input message will be a copy of the given 368 * message. Optionally, the last spare (deadlock-resolving) thread may be used 369 * to execute the work immediately. 370 */ 371 int is_pm_work, is_pending, is_active, has_normal_work, has_pm_work; 372 373 assert(rfp != NULL); 374 375 is_pm_work = (func == NULL); 376 is_pending = (rfp->fp_flags & FP_PENDING); 377 is_active = (rfp->fp_worker != NULL); 378 has_normal_work = (rfp->fp_func != NULL); 379 has_pm_work = (rfp->fp_flags & FP_PM_WORK); 380 381 /* Sanity checks. If any of these trigger, someone messed up badly! */ 382 if (is_pending || is_active) { 383 if (is_pending && is_active) 384 panic("work cannot be both pending and active"); 385 386 /* The process cannot make more than one call at once. */ 387 if (!is_pm_work && has_normal_work) 388 panic("process has two calls (%x, %x)", 389 rfp->fp_msg.m_type, m_ptr->m_type); 390 391 /* PM will not send more than one job per process to us at once. */ 392 if (is_pm_work && has_pm_work) 393 panic("got two calls from PM (%x, %x)", 394 rfp->fp_pm_msg.m_type, m_ptr->m_type); 395 396 /* Despite PM's sys_delay_stop() system, it is possible that normal 397 * work (in particular, do_pending_pipe) arrives after postponed PM 398 * work has been scheduled for execution, so we don't check for that. 399 */ 400 #if 0 401 printf("VFS: adding %s work to %s thread\n", 402 is_pm_work ? "PM" : "normal", 403 is_pending ? "pending" : "active"); 404 #endif 405 } else { 406 /* Some cleanup step forgotten somewhere? */ 407 if (has_normal_work || has_pm_work) 408 panic("worker administration error"); 409 } 410 411 /* Save the work to be performed. */ 412 if (!is_pm_work) { 413 rfp->fp_msg = *m_ptr; 414 rfp->fp_func = func; 415 } else { 416 rfp->fp_pm_msg = *m_ptr; 417 rfp->fp_flags |= FP_PM_WORK; 418 } 419 420 /* If we have not only added to existing work, go look for a free thread. 421 * Note that we won't be using the spare thread for normal work if there is 422 * already PM work pending, but that situation will never occur in practice. 423 */ 424 if (!is_pending && !is_active) 425 worker_try_activate(rfp, use_spare); 426 } 427 428 /*===========================================================================* 429 * worker_yield * 430 *===========================================================================*/ 431 void worker_yield(void) 432 { 433 /* Yield to all worker threads. To be called from the main thread only. */ 434 435 mthread_yield_all(); 436 437 self = NULL; 438 } 439 440 /*===========================================================================* 441 * worker_sleep * 442 *===========================================================================*/ 443 static void worker_sleep(void) 444 { 445 struct worker_thread *worker = self; 446 ASSERTW(worker); 447 if (mutex_lock(&worker->w_event_mutex) != 0) 448 panic("unable to lock event mutex"); 449 if (cond_wait(&worker->w_event, &worker->w_event_mutex) != 0) 450 panic("could not wait on conditional variable"); 451 if (mutex_unlock(&worker->w_event_mutex) != 0) 452 panic("unable to unlock event mutex"); 453 self = worker; 454 } 455 456 /*===========================================================================* 457 * worker_wake * 458 *===========================================================================*/ 459 static void worker_wake(struct worker_thread *worker) 460 { 461 /* Signal a worker to wake up */ 462 ASSERTW(worker); 463 if (mutex_lock(&worker->w_event_mutex) != 0) 464 panic("unable to lock event mutex"); 465 if (cond_signal(&worker->w_event) != 0) 466 panic("unable to signal conditional variable"); 467 if (mutex_unlock(&worker->w_event_mutex) != 0) 468 panic("unable to unlock event mutex"); 469 } 470 471 /*===========================================================================* 472 * worker_suspend * 473 *===========================================================================*/ 474 struct worker_thread *worker_suspend(void) 475 { 476 /* Suspend the current thread, saving certain thread variables. Return a 477 * pointer to the thread's worker structure for later resumption. 478 */ 479 480 ASSERTW(self); 481 assert(fp != NULL); 482 assert(self->w_fp == fp); 483 assert(fp->fp_worker == self); 484 485 self->w_err_code = err_code; 486 487 return self; 488 } 489 490 /*===========================================================================* 491 * worker_resume * 492 *===========================================================================*/ 493 void worker_resume(struct worker_thread *org_self) 494 { 495 /* Resume the current thread after suspension, restoring thread variables. */ 496 497 ASSERTW(org_self); 498 499 self = org_self; 500 501 fp = self->w_fp; 502 assert(fp != NULL); 503 504 err_code = self->w_err_code; 505 } 506 507 /*===========================================================================* 508 * worker_wait * 509 *===========================================================================*/ 510 void worker_wait(void) 511 { 512 /* Put the current thread to sleep until woken up by the main thread. */ 513 514 (void) worker_suspend(); /* worker_sleep already saves and restores 'self' */ 515 516 worker_sleep(); 517 518 /* We continue here after waking up */ 519 worker_resume(self); 520 assert(self->w_next == NULL); 521 } 522 523 /*===========================================================================* 524 * worker_signal * 525 *===========================================================================*/ 526 void worker_signal(struct worker_thread *worker) 527 { 528 ASSERTW(worker); /* Make sure we have a valid thread */ 529 worker_wake(worker); 530 } 531 532 /*===========================================================================* 533 * worker_stop * 534 *===========================================================================*/ 535 void worker_stop(struct worker_thread *worker) 536 { 537 ASSERTW(worker); /* Make sure we have a valid thread */ 538 /* This thread is communicating with a driver or file server */ 539 if (worker->w_drv_sendrec != NULL) { /* Driver */ 540 assert(worker->w_task != NONE); 541 worker->w_drv_sendrec->m_type = EIO; 542 worker->w_drv_sendrec = NULL; 543 } else if (worker->w_sendrec != NULL) { /* FS */ 544 /* worker->w_task may be NONE if the FS message was still queued */ 545 worker->w_sendrec->m_type = EIO; 546 worker->w_sendrec = NULL; 547 } else 548 panic("reply storage consistency error"); /* Oh dear */ 549 worker_wake(worker); 550 } 551 552 /*===========================================================================* 553 * worker_stop_by_endpt * 554 *===========================================================================*/ 555 void worker_stop_by_endpt(endpoint_t proc_e) 556 { 557 struct worker_thread *worker; 558 int i; 559 560 if (proc_e == NONE) return; 561 562 for (i = 0; i < NR_WTHREADS; i++) { 563 worker = &workers[i]; 564 if (worker->w_fp != NULL && worker->w_task == proc_e) 565 worker_stop(worker); 566 } 567 } 568 569 /*===========================================================================* 570 * worker_get * 571 *===========================================================================*/ 572 struct worker_thread *worker_get(thread_t worker_tid) 573 { 574 int i; 575 576 for (i = 0; i < NR_WTHREADS; i++) 577 if (workers[i].w_tid == worker_tid) 578 return(&workers[i]); 579 580 return(NULL); 581 } 582 583 /*===========================================================================* 584 * worker_set_proc * 585 *===========================================================================*/ 586 void worker_set_proc(struct fproc *rfp) 587 { 588 /* Perform an incredibly ugly action that completely violates the threading 589 * model: change the current working thread's process context to another 590 * process. The caller is expected to hold the lock to both the calling and the 591 * target process, and neither process is expected to continue regular 592 * operation when done. This code is here *only* and *strictly* for the reboot 593 * code, and *must not* be used for anything else. 594 */ 595 596 if (fp == rfp) return; 597 598 if (rfp->fp_worker != NULL) 599 panic("worker_set_proc: target process not idle"); 600 601 fp->fp_worker = NULL; 602 603 fp = rfp; 604 605 self->w_fp = rfp; 606 fp->fp_worker = self; 607 } 608