1 /* This file contains the multithreaded driver interface. 2 * 3 * Changes: 4 * Aug 27, 2011 created (A. Welzel) 5 * 6 * The entry points into this file are: 7 * blockdriver_mt_task: the main message loop of the driver 8 * blockdriver_mt_terminate: break out of the main message loop 9 * blockdriver_mt_sleep: put the current thread to sleep 10 * blockdriver_mt_wakeup: wake up a sleeping thread 11 * blockdriver_mt_set_workers:set the number of worker threads 12 */ 13 14 #include <minix/blockdriver_mt.h> 15 #include <minix/mthread.h> 16 #include <assert.h> 17 18 #include "const.h" 19 #include "driver.h" 20 #include "driver_mt.h" 21 #include "mq.h" 22 23 /* A thread ID is composed of a device ID and a per-device worker thread ID. 24 * All thread IDs must be in the range 0..(MAX_THREADS-1) inclusive. 25 */ 26 #define MAKE_TID(did, wid) ((did) * MAX_WORKERS + (wid)) 27 #define TID_DEVICE(tid) ((tid) / MAX_WORKERS) 28 #define TID_WORKER(tid) ((tid) % MAX_WORKERS) 29 30 typedef unsigned int worker_id_t; 31 32 typedef enum { 33 STATE_DEAD, 34 STATE_RUNNING, 35 STATE_BUSY, 36 STATE_EXITED 37 } worker_state; 38 39 /* Structure with information about a worker thread. */ 40 typedef struct { 41 device_id_t device_id; 42 worker_id_t worker_id; 43 worker_state state; 44 mthread_thread_t mthread; 45 mthread_event_t sleep_event; 46 } worker_t; 47 48 /* Structure with information about a device. */ 49 typedef struct { 50 device_id_t id; 51 unsigned int workers; 52 worker_t worker[MAX_WORKERS]; 53 mthread_event_t queue_event; 54 mthread_rwlock_t barrier; 55 } device_t; 56 57 static struct blockdriver *bdtab; 58 static int running = FALSE; 59 60 static mthread_key_t worker_key; 61 62 static device_t device[MAX_DEVICES]; 63 64 static worker_t *exited[MAX_THREADS]; 65 static int num_exited = 0; 66 67 /*===========================================================================* 68 * enqueue * 69 *===========================================================================*/ 70 static void enqueue(device_t *dp, const message *m_src, int ipc_status) 71 { 72 /* Enqueue a message into the device's queue, and signal the event. 73 * Must be called from the master thread. 74 */ 75 76 if (!mq_enqueue(dp->id, m_src, ipc_status)) 77 panic("blockdriver_mt: enqueue failed (message queue full)"); 78 79 mthread_event_fire(&dp->queue_event); 80 } 81 82 /*===========================================================================* 83 * try_dequeue * 84 *===========================================================================*/ 85 static int try_dequeue(device_t *dp, message *m_dst, int *ipc_status) 86 { 87 /* See if a message can be dequeued from the current worker thread's device 88 * queue. If so, dequeue the message and return TRUE. If not, return FALSE. 89 * Must be called from a worker thread. Does not block. 90 */ 91 92 return mq_dequeue(dp->id, m_dst, ipc_status); 93 } 94 95 /*===========================================================================* 96 * dequeue * 97 *===========================================================================*/ 98 static int dequeue(device_t *dp, worker_t *wp, message *m_dst, 99 int *ipc_status) 100 { 101 /* Dequeue a message from the current worker thread's device queue. Block the 102 * current thread if necessary. Must be called from a worker thread. Either 103 * succeeds with a message (TRUE) or indicates that the thread should be 104 * terminated (FALSE). 105 */ 106 107 do { 108 mthread_event_wait(&dp->queue_event); 109 110 /* If we were woken up as a result of terminate or set_workers, break 111 * out of the loop and terminate the thread. 112 */ 113 if (!running || wp->worker_id >= dp->workers) 114 return FALSE; 115 } while (!try_dequeue(dp, m_dst, ipc_status)); 116 117 return TRUE; 118 } 119 120 /*===========================================================================* 121 * is_transfer_req * 122 *===========================================================================*/ 123 static int is_transfer_req(int type) 124 { 125 /* Return whether the given block device request is a transfer request. 126 */ 127 128 switch (type) { 129 case BDEV_READ: 130 case BDEV_WRITE: 131 case BDEV_GATHER: 132 case BDEV_SCATTER: 133 return TRUE; 134 135 default: 136 return FALSE; 137 } 138 } 139 140 /*===========================================================================* 141 * worker_thread * 142 *===========================================================================*/ 143 static void *worker_thread(void *param) 144 { 145 /* The worker thread loop. Set up the thread-specific reference to itself and 146 * start looping. The loop consists of blocking dequeing and handling messages. 147 * After handling a message, the thread might have been stopped, so we check 148 * for this condition and exit if so. 149 */ 150 worker_t *wp; 151 device_t *dp; 152 thread_id_t tid; 153 message m; 154 int ipc_status; 155 156 wp = (worker_t *) param; 157 assert(wp != NULL); 158 dp = &device[wp->device_id]; 159 tid = MAKE_TID(wp->device_id, wp->worker_id); 160 161 if (mthread_setspecific(worker_key, wp)) 162 panic("blockdriver_mt: could not save local thread pointer"); 163 164 while (running && wp->worker_id < dp->workers) { 165 166 /* See if a new message is available right away. */ 167 if (!try_dequeue(dp, &m, &ipc_status)) { 168 169 /* If not, block waiting for a new message or a thread 170 * termination event. 171 */ 172 if (!dequeue(dp, wp, &m, &ipc_status)) 173 break; 174 } 175 176 /* Even if the thread was stopped before, a new message resumes it. */ 177 wp->state = STATE_BUSY; 178 179 /* If the request is a transfer request, we acquire the read barrier 180 * lock. Otherwise, we acquire the write lock. 181 */ 182 if (is_transfer_req(m.m_type)) 183 mthread_rwlock_rdlock(&dp->barrier); 184 else 185 mthread_rwlock_wrlock(&dp->barrier); 186 187 /* Handle the request and send a reply. */ 188 blockdriver_process_on_thread(bdtab, &m, ipc_status, tid); 189 190 /* Switch the thread back to running state, and unlock the barrier. */ 191 wp->state = STATE_RUNNING; 192 mthread_rwlock_unlock(&dp->barrier); 193 } 194 195 /* Clean up and terminate this thread. */ 196 if (mthread_setspecific(worker_key, NULL)) 197 panic("blockdriver_mt: could not delete local thread pointer"); 198 199 wp->state = STATE_EXITED; 200 201 exited[num_exited++] = wp; 202 203 return NULL; 204 } 205 206 /*===========================================================================* 207 * master_create_worker * 208 *===========================================================================*/ 209 static void master_create_worker(worker_t *wp, worker_id_t worker_id, 210 device_id_t device_id) 211 { 212 /* Start a new worker thread. 213 */ 214 mthread_attr_t attr; 215 int r; 216 217 wp->device_id = device_id; 218 wp->worker_id = worker_id; 219 wp->state = STATE_RUNNING; 220 221 /* Initialize synchronization primitives. */ 222 mthread_event_init(&wp->sleep_event); 223 224 r = mthread_attr_init(&attr); 225 if (r != 0) 226 panic("blockdriver_mt: could not initialize attributes (%d)", r); 227 228 r = mthread_attr_setstacksize(&attr, STACK_SIZE); 229 if (r != 0) 230 panic("blockdriver_mt: could not set stack size (%d)", r); 231 232 r = mthread_create(&wp->mthread, &attr, worker_thread, (void *) wp); 233 if (r != 0) 234 panic("blockdriver_mt: could not start thread %d (%d)", worker_id, r); 235 236 mthread_attr_destroy(&attr); 237 } 238 239 /*===========================================================================* 240 * master_destroy_worker * 241 *===========================================================================*/ 242 static void master_destroy_worker(worker_t *wp) 243 { 244 /* Clean up resources used by an exited worker thread. 245 */ 246 247 assert(wp != NULL); 248 assert(wp->state == STATE_EXITED); 249 250 /* Join the thread. */ 251 if (mthread_join(wp->mthread, NULL)) 252 panic("blockdriver_mt: could not join thread %d", wp->worker_id); 253 254 /* Destroy resources. */ 255 mthread_event_destroy(&wp->sleep_event); 256 257 wp->state = STATE_DEAD; 258 } 259 260 /*===========================================================================* 261 * master_handle_exits * 262 *===========================================================================*/ 263 static void master_handle_exits(void) 264 { 265 /* Destroy the remains of all exited threads. 266 */ 267 int i; 268 269 for (i = 0; i < num_exited; i++) 270 master_destroy_worker(exited[i]); 271 272 num_exited = 0; 273 } 274 275 /*===========================================================================* 276 * master_yield * 277 *===========================================================================*/ 278 static void master_yield(void) 279 { 280 /* Let worker threads run, and clean up any exited threads. 281 */ 282 283 mthread_yield_all(); 284 285 if (num_exited > 0) 286 master_handle_exits(); 287 } 288 289 /*===========================================================================* 290 * master_handle_message * 291 *===========================================================================*/ 292 static void master_handle_message(message *m_ptr, int ipc_status) 293 { 294 /* For real request messages, query the device ID, start a thread if none is 295 * free and the maximum number of threads for that device has not yet been 296 * reached, and enqueue the message in the devices's message queue. All other 297 * messages are handled immediately from the main thread. 298 */ 299 device_id_t id; 300 worker_t *wp; 301 device_t *dp; 302 unsigned int wid; 303 int r; 304 305 /* If this is not a block driver request, we cannot get the minor device 306 * associated with it, and thus we can not tell which thread should process 307 * it either. In that case, the master thread has to handle it instead. 308 */ 309 if (is_ipc_notify(ipc_status) || !IS_BDEV_RQ(m_ptr->m_type)) { 310 /* Process as 'other' message. */ 311 blockdriver_process_on_thread(bdtab, m_ptr, ipc_status, MAIN_THREAD); 312 313 return; 314 } 315 316 /* Query the device ID. Upon failure, send the error code to the caller. */ 317 r = (*bdtab->bdr_device)(m_ptr->m_lbdev_lblockdriver_msg.minor, &id); 318 if (r != OK) { 319 blockdriver_reply(m_ptr, ipc_status, r); 320 321 return; 322 } 323 324 /* Look up the device control block. */ 325 assert(id >= 0 && id < MAX_DEVICES); 326 dp = &device[id]; 327 328 /* Find the first non-busy worker thread. */ 329 for (wid = 0; wid < dp->workers; wid++) 330 if (dp->worker[wid].state != STATE_BUSY) 331 break; 332 333 /* If the worker thread is dead, start a thread now, unless we have already 334 * reached the maximum number of threads. 335 */ 336 if (wid < dp->workers) { 337 wp = &dp->worker[wid]; 338 339 assert(wp->state != STATE_EXITED); 340 341 /* If the non-busy thread has not yet been created, create one now. */ 342 if (wp->state == STATE_DEAD) 343 master_create_worker(wp, wid, dp->id); 344 } 345 346 /* Enqueue the message at the device queue. */ 347 enqueue(dp, m_ptr, ipc_status); 348 } 349 350 /*===========================================================================* 351 * master_init * 352 *===========================================================================*/ 353 static void master_init(struct blockdriver *bdp) 354 { 355 /* Initialize the state of the master thread. 356 */ 357 int i, j; 358 359 assert(bdp != NULL); 360 assert(bdp->bdr_device != NULL); 361 362 bdtab = bdp; 363 364 /* Initialize device-specific data structures. */ 365 for (i = 0; i < MAX_DEVICES; i++) { 366 device[i].id = i; 367 device[i].workers = 1; 368 mthread_event_init(&device[i].queue_event); 369 mthread_rwlock_init(&device[i].barrier); 370 371 for (j = 0; j < MAX_WORKERS; j++) 372 device[i].worker[j].state = STATE_DEAD; 373 } 374 375 /* Initialize a per-thread key, where each worker thread stores its own 376 * reference to the worker structure. 377 */ 378 if (mthread_key_create(&worker_key, NULL)) 379 panic("blockdriver_mt: error initializing worker key"); 380 } 381 382 /*===========================================================================* 383 * blockdriver_mt_get_tid * 384 *===========================================================================*/ 385 thread_id_t blockdriver_mt_get_tid(void) 386 { 387 /* Return back the ID of this thread. 388 */ 389 worker_t *wp; 390 391 wp = (worker_t *) mthread_getspecific(worker_key); 392 393 if (wp == NULL) 394 panic("blockdriver_mt: master thread cannot query thread ID\n"); 395 396 return MAKE_TID(wp->device_id, wp->worker_id); 397 } 398 399 /*===========================================================================* 400 * blockdriver_mt_receive * 401 *===========================================================================*/ 402 static void blockdriver_mt_receive(message *m_ptr, int *ipc_status) 403 { 404 /* Receive a message. 405 */ 406 int r; 407 408 r = sef_receive_status(ANY, m_ptr, ipc_status); 409 410 if (r != OK) 411 panic("blockdriver_mt: sef_receive_status() returned %d", r); 412 } 413 414 /*===========================================================================* 415 * blockdriver_mt_task * 416 *===========================================================================*/ 417 void blockdriver_mt_task(struct blockdriver *driver_tab) 418 { 419 /* The multithreaded driver task. 420 */ 421 int ipc_status, i; 422 message mess; 423 424 /* Initialize first if necessary. */ 425 if (!running) { 426 master_init(driver_tab); 427 428 running = TRUE; 429 } 430 431 /* The main message loop. */ 432 while (running) { 433 /* Receive a message. */ 434 blockdriver_mt_receive(&mess, &ipc_status); 435 436 /* Dispatch the message. */ 437 master_handle_message(&mess, ipc_status); 438 439 /* Let worker threads run. */ 440 master_yield(); 441 } 442 443 /* Free up resources. */ 444 for (i = 0; i < MAX_DEVICES; i++) 445 mthread_event_destroy(&device[i].queue_event); 446 } 447 448 /*===========================================================================* 449 * blockdriver_mt_terminate * 450 *===========================================================================*/ 451 void blockdriver_mt_terminate(void) 452 { 453 /* Instruct libblockdriver to shut down. 454 */ 455 456 running = FALSE; 457 } 458 459 /*===========================================================================* 460 * blockdriver_mt_sleep * 461 *===========================================================================*/ 462 void blockdriver_mt_sleep(void) 463 { 464 /* Let the current thread sleep until it gets woken up by the master thread. 465 */ 466 worker_t *wp; 467 468 wp = (worker_t *) mthread_getspecific(worker_key); 469 470 if (wp == NULL) 471 panic("blockdriver_mt: master thread cannot sleep"); 472 473 mthread_event_wait(&wp->sleep_event); 474 } 475 476 /*===========================================================================* 477 * blockdriver_mt_wakeup * 478 *===========================================================================*/ 479 void blockdriver_mt_wakeup(thread_id_t id) 480 { 481 /* Wake up a sleeping worker thread from the master thread. 482 */ 483 worker_t *wp; 484 device_id_t device_id; 485 worker_id_t worker_id; 486 487 device_id = TID_DEVICE(id); 488 worker_id = TID_WORKER(id); 489 490 assert(device_id >= 0 && device_id < MAX_DEVICES); 491 assert(worker_id < MAX_WORKERS); 492 493 wp = &device[device_id].worker[worker_id]; 494 495 assert(wp->state == STATE_RUNNING || wp->state == STATE_BUSY); 496 497 mthread_event_fire(&wp->sleep_event); 498 } 499 500 /*===========================================================================* 501 * blockdriver_mt_set_workers * 502 *===========================================================================*/ 503 void blockdriver_mt_set_workers(device_id_t id, unsigned int workers) 504 { 505 /* Set the number of worker threads for the given device. 506 */ 507 device_t *dp; 508 509 assert(id >= 0 && id < MAX_DEVICES); 510 511 if (workers > MAX_WORKERS) 512 workers = MAX_WORKERS; 513 514 dp = &device[id]; 515 516 /* If we are cleaning up, wake up all threads waiting on a queue event. */ 517 if (workers == 1 && dp->workers > workers) 518 mthread_event_fire_all(&dp->queue_event); 519 520 dp->workers = workers; 521 } 522 523 /*===========================================================================* 524 * blockdriver_mt_is_idle * 525 *===========================================================================*/ 526 int blockdriver_mt_is_idle(void) 527 { 528 /* Return whether the block driver is idle. This means that it has no enqueued 529 * requests and no busy worker threads. Used for live update functionality. 530 */ 531 unsigned int did, wid; 532 533 for (did = 0; did < MAX_DEVICES; did++) { 534 if (!mq_isempty(did)) 535 return FALSE; 536 537 for (wid = 0; wid < device[did].workers; wid++) 538 if (device[did].worker[wid].state == STATE_BUSY) 539 return FALSE; 540 } 541 542 return TRUE; 543 } 544 545 /*===========================================================================* 546 * blockdriver_mt_suspend * 547 *===========================================================================*/ 548 void blockdriver_mt_suspend(void) 549 { 550 /* Suspend the driver operation in order to facilitate a live update. 551 * Suspension involves shutting down all worker threads, because transferring 552 * thread stacks is currently not supported by the state transfer framework. 553 */ 554 unsigned int did; 555 556 assert(running); 557 assert(blockdriver_mt_is_idle()); 558 559 /* We terminate the worker threads by simulating a driver shutdown. */ 560 running = FALSE; 561 562 for (did = 0; did < MAX_DEVICES; did++) 563 mthread_event_fire_all(&device[did].queue_event); 564 565 master_yield(); 566 } 567 568 /*===========================================================================* 569 * blockdriver_mt_resume * 570 *===========================================================================*/ 571 void blockdriver_mt_resume(void) 572 { 573 /* Resume regular operation after a (successful or failed) live update. We do 574 * not recreate worker threads; instead, they are recreated lazily as new 575 * requests come in. 576 */ 577 578 assert(!running); 579 580 running = TRUE; 581 } 582