1 /* $NetBSD: rf_engine.c,v 1.25 2002/10/04 22:56:54 oster Exp $ */ 2 /* 3 * Copyright (c) 1995 Carnegie-Mellon University. 4 * All rights reserved. 5 * 6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef 7 * 8 * Permission to use, copy, modify and distribute this software and 9 * its documentation is hereby granted, provided that both the copyright 10 * notice and this permission notice appear in all copies of the 11 * software, derivative works or modified versions, and any portions 12 * thereof, and that both notices appear in supporting documentation. 13 * 14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" 15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND 16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. 17 * 18 * Carnegie Mellon requests users of this software to return to 19 * 20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU 21 * School of Computer Science 22 * Carnegie Mellon University 23 * Pittsburgh PA 15213-3890 24 * 25 * any improvements or extensions that they make and grant Carnegie the 26 * rights to redistribute these changes. 27 */ 28 29 /**************************************************************************** 30 * * 31 * engine.c -- code for DAG execution engine * 32 * * 33 * Modified to work as follows (holland): * 34 * A user-thread calls into DispatchDAG, which fires off the nodes that * 35 * are direct successors to the header node. DispatchDAG then returns, * 36 * and the rest of the I/O continues asynchronously. As each node * 37 * completes, the node execution function calls FinishNode(). FinishNode * 38 * scans the list of successors to the node and increments the antecedent * 39 * counts. Each node that becomes enabled is placed on a central node * 40 * queue. A dedicated dag-execution thread grabs nodes off of this * 41 * queue and fires them. * 42 * * 43 * NULL nodes are never fired. 
* 44 * * 45 * Terminator nodes are never fired, but rather cause the callback * 46 * associated with the DAG to be invoked. * 47 * * 48 * If a node fails, the dag either rolls forward to the completion or * 49 * rolls back, undoing previously-completed nodes and fails atomically. * 50 * The direction of recovery is determined by the location of the failed * 51 * node in the graph. If the failure occurred before the commit node in * 52 * the graph, backward recovery is used. Otherwise, forward recovery is * 53 * used. * 54 * * 55 ****************************************************************************/ 56 57 #include <sys/cdefs.h> 58 __KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.25 2002/10/04 22:56:54 oster Exp $"); 59 60 #include <sys/errno.h> 61 62 #include "rf_threadstuff.h" 63 #include "rf_dag.h" 64 #include "rf_engine.h" 65 #include "rf_etimer.h" 66 #include "rf_general.h" 67 #include "rf_dagutils.h" 68 #include "rf_shutdown.h" 69 #include "rf_raid.h" 70 71 static void rf_ShutdownEngine(void *); 72 static void DAGExecutionThread(RF_ThreadArg_t arg); 73 static void rf_RaidIOThread(RF_ThreadArg_t arg); 74 75 #define DO_INIT(_l_,_r_) { \ 76 int _rc; \ 77 _rc = rf_create_managed_mutex(_l_,&(_r_)->node_queue_mutex); \ 78 if (_rc) { \ 79 return(_rc); \ 80 } \ 81 _rc = rf_create_managed_cond(_l_,&(_r_)->node_queue_cond); \ 82 if (_rc) { \ 83 return(_rc); \ 84 } \ 85 } 86 87 /* synchronization primitives for this file. DO_WAIT should be enclosed in a while loop. */ 88 89 #define DO_LOCK(_r_) \ 90 do { \ 91 ks = splbio(); \ 92 RF_LOCK_MUTEX((_r_)->node_queue_mutex); \ 93 } while (0) 94 95 #define DO_UNLOCK(_r_) \ 96 do { \ 97 RF_UNLOCK_MUTEX((_r_)->node_queue_mutex); \ 98 splx(ks); \ 99 } while (0) 100 101 #define DO_WAIT(_r_) \ 102 RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex) 103 104 #define DO_SIGNAL(_r_) \ 105 RF_BROADCAST_COND((_r_)->node_queue) /* XXX RF_SIGNAL_COND? 
*/ 106 107 static void 108 rf_ShutdownEngine(arg) 109 void *arg; 110 { 111 RF_Raid_t *raidPtr; 112 int ks; 113 114 raidPtr = (RF_Raid_t *) arg; 115 116 /* Tell the rf_RaidIOThread to shutdown */ 117 simple_lock(&(raidPtr->iodone_lock)); 118 119 raidPtr->shutdown_raidio = 1; 120 wakeup(&(raidPtr->iodone)); 121 122 /* ...and wait for it to tell us it has finished */ 123 while (raidPtr->shutdown_raidio) 124 ltsleep(&(raidPtr->shutdown_raidio), PRIBIO, "raidshutdown", 0, 125 &(raidPtr->iodone_lock)); 126 127 simple_unlock(&(raidPtr->iodone_lock)); 128 129 /* Now shut down the DAG execution engine. */ 130 DO_LOCK(raidPtr); 131 raidPtr->shutdown_engine = 1; 132 DO_SIGNAL(raidPtr); 133 DO_UNLOCK(raidPtr); 134 135 } 136 137 int 138 rf_ConfigureEngine( 139 RF_ShutdownList_t ** listp, 140 RF_Raid_t * raidPtr, 141 RF_Config_t * cfgPtr) 142 { 143 int rc; 144 145 DO_INIT(listp, raidPtr); 146 147 raidPtr->node_queue = NULL; 148 raidPtr->dags_in_flight = 0; 149 150 rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg); 151 if (rc) 152 return (rc); 153 154 /* we create the execution thread only once per system boot. no need 155 * to check return code b/c the kernel panics if it can't create the 156 * thread. */ 157 if (rf_engineDebug) { 158 printf("raid%d: Creating engine thread\n", raidPtr->raidid); 159 } 160 if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread, 161 DAGExecutionThread, raidPtr, 162 "raid%d", raidPtr->raidid)) { 163 printf("raid%d: Unable to create engine thread\n", 164 raidPtr->raidid); 165 return (ENOMEM); 166 } 167 if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread, 168 rf_RaidIOThread, raidPtr, 169 "raidio%d", raidPtr->raidid)) { 170 printf("raid%d: Unable to create raidio thread\n", 171 raidPtr->raidid); 172 return (ENOMEM); 173 } 174 if (rf_engineDebug) { 175 printf("raid%d: Created engine thread\n", raidPtr->raidid); 176 } 177 RF_THREADGROUP_STARTED(&raidPtr->engine_tg); 178 /* XXX something is missing here... 
*/ 179 #ifdef debug 180 printf("Skipping the WAIT_START!!\n"); 181 #endif 182 #if 0 183 RF_THREADGROUP_WAIT_START(&raidPtr->engine_tg); 184 #endif 185 /* engine thread is now running and waiting for work */ 186 if (rf_engineDebug) { 187 printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid); 188 } 189 rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr); 190 if (rc) { 191 rf_print_unable_to_add_shutdown(__FILE__, __LINE__, rc); 192 rf_ShutdownEngine(NULL); 193 } 194 return (rc); 195 } 196 197 static int 198 BranchDone(RF_DagNode_t * node) 199 { 200 int i; 201 202 /* return true if forward execution is completed for a node and it's 203 * succedents */ 204 switch (node->status) { 205 case rf_wait: 206 /* should never be called in this state */ 207 RF_PANIC(); 208 break; 209 case rf_fired: 210 /* node is currently executing, so we're not done */ 211 return (RF_FALSE); 212 case rf_good: 213 /* for each succedent recursively check branch */ 214 for (i = 0; i < node->numSuccedents; i++) 215 if (!BranchDone(node->succedents[i])) 216 return RF_FALSE; 217 return RF_TRUE; /* node and all succedent branches aren't in 218 * fired state */ 219 case rf_bad: 220 /* succedents can't fire */ 221 return (RF_TRUE); 222 case rf_recover: 223 /* should never be called in this state */ 224 RF_PANIC(); 225 break; 226 case rf_undone: 227 case rf_panic: 228 /* XXX need to fix this case */ 229 /* for now, assume that we're done */ 230 return (RF_TRUE); 231 default: 232 /* illegal node status */ 233 RF_PANIC(); 234 break; 235 } 236 } 237 238 static int 239 NodeReady(RF_DagNode_t * node) 240 { 241 int ready; 242 243 switch (node->dagHdr->status) { 244 case rf_enable: 245 case rf_rollForward: 246 if ((node->status == rf_wait) && 247 (node->numAntecedents == node->numAntDone)) 248 ready = RF_TRUE; 249 else 250 ready = RF_FALSE; 251 break; 252 case rf_rollBackward: 253 RF_ASSERT(node->numSuccDone <= node->numSuccedents); 254 RF_ASSERT(node->numSuccFired <= 
node->numSuccedents); 255 RF_ASSERT(node->numSuccFired <= node->numSuccDone); 256 if ((node->status == rf_good) && 257 (node->numSuccDone == node->numSuccedents)) 258 ready = RF_TRUE; 259 else 260 ready = RF_FALSE; 261 break; 262 default: 263 printf("Execution engine found illegal DAG status in NodeReady\n"); 264 RF_PANIC(); 265 break; 266 } 267 268 return (ready); 269 } 270 271 272 273 /* user context and dag-exec-thread context: Fire a node. The node's 274 * status field determines which function, do or undo, to be fired. 275 * This routine assumes that the node's status field has alread been 276 * set to "fired" or "recover" to indicate the direction of execution. 277 */ 278 static void 279 FireNode(RF_DagNode_t * node) 280 { 281 switch (node->status) { 282 case rf_fired: 283 /* fire the do function of a node */ 284 if (rf_engineDebug) { 285 printf("raid%d: Firing node 0x%lx (%s)\n", 286 node->dagHdr->raidPtr->raidid, 287 (unsigned long) node, node->name); 288 } 289 if (node->flags & RF_DAGNODE_FLAG_YIELD) { 290 #if defined(__NetBSD__) && defined(_KERNEL) 291 /* thread_block(); */ 292 /* printf("Need to block the thread here...\n"); */ 293 /* XXX thread_block is actually mentioned in 294 * /usr/include/vm/vm_extern.h */ 295 #else 296 thread_block(); 297 #endif 298 } 299 (*(node->doFunc)) (node); 300 break; 301 case rf_recover: 302 /* fire the undo function of a node */ 303 if (rf_engineDebug) { 304 printf("raid%d: Firing (undo) node 0x%lx (%s)\n", 305 node->dagHdr->raidPtr->raidid, 306 (unsigned long) node, node->name); 307 } 308 if (node->flags & RF_DAGNODE_FLAG_YIELD) 309 #if defined(__NetBSD__) && defined(_KERNEL) 310 /* thread_block(); */ 311 /* printf("Need to block the thread here...\n"); */ 312 /* XXX thread_block is actually mentioned in 313 * /usr/include/vm/vm_extern.h */ 314 #else 315 thread_block(); 316 #endif 317 (*(node->undoFunc)) (node); 318 break; 319 default: 320 RF_PANIC(); 321 break; 322 } 323 } 324 325 326 327 /* user context: 328 * Attempt 
to fire each node in a linear array.
 * The entire list is fired atomically.
 */
static void
FireNodeArray(
    int numNodes,
    RF_DagNode_t ** nodeList)
{
	RF_DagStatus_t dagStatus;
	RF_DagNode_t *nd;
	int     idx, ant;

	/* Pass 1: decide, for every node, whether it is ready and in which
	 * direction it will run.  Nothing is executed yet. */
	for (idx = 0; idx < numNodes; idx++) {
		nd = nodeList[idx];
		dagStatus = nd->dagHdr->status;
		RF_ASSERT((nd->status == rf_wait) ||
		    (nd->status == rf_good));

		if (!NodeReady(nd))
			continue;

		if ((dagStatus != rf_enable) &&
		    (dagStatus != rf_rollForward)) {
			/* rolling backward: schedule the undo function */
			RF_ASSERT(dagStatus == rf_rollBackward);
			RF_ASSERT(nd->status == rf_good);
			/* only one commit node per graph */
			RF_ASSERT(nd->commitNode == RF_FALSE);
			nd->status = rf_recover;
			continue;
		}

		/* forward execution: schedule the do function and tell each
		 * antecedent that one more of its successors has fired */
		RF_ASSERT(nd->status == rf_wait);
		if (nd->commitNode)
			nd->dagHdr->numCommits++;
		nd->status = rf_fired;
		for (ant = 0; ant < nd->numAntecedents; ant++)
			nd->antecedents[ant]->numSuccFired++;
	}

	/* Pass 2: actually fire everything marked above */
	for (idx = 0; idx < numNodes; idx++) {
		nd = nodeList[idx];
		if ((nd->status == rf_fired) ||
		    (nd->status == rf_recover))
			FireNode(nd);
	}
}


/* user context:
 * Attempt to fire each node in a linked list.
 * The entire list is fired atomically.
376 */ 377 static void 378 FireNodeList(RF_DagNode_t * nodeList) 379 { 380 RF_DagNode_t *node, *next; 381 RF_DagStatus_t dstat; 382 int j; 383 384 if (nodeList) { 385 /* first, mark all nodes which are ready to be fired */ 386 for (node = nodeList; node; node = next) { 387 next = node->next; 388 dstat = node->dagHdr->status; 389 RF_ASSERT((node->status == rf_wait) || 390 (node->status == rf_good)); 391 if (NodeReady(node)) { 392 if ((dstat == rf_enable) || 393 (dstat == rf_rollForward)) { 394 RF_ASSERT(node->status == rf_wait); 395 if (node->commitNode) 396 node->dagHdr->numCommits++; 397 node->status = rf_fired; 398 for (j = 0; j < node->numAntecedents; j++) 399 node->antecedents[j]->numSuccFired++; 400 } else { 401 RF_ASSERT(dstat == rf_rollBackward); 402 RF_ASSERT(node->status == rf_good); 403 /* only one commit node per graph */ 404 RF_ASSERT(node->commitNode == RF_FALSE); 405 node->status = rf_recover; 406 } 407 } 408 } 409 /* now, fire the nodes */ 410 for (node = nodeList; node; node = next) { 411 next = node->next; 412 if ((node->status == rf_fired) || 413 (node->status == rf_recover)) 414 FireNode(node); 415 } 416 } 417 } 418 /* interrupt context: 419 * for each succedent 420 * propagate required results from node to succedent 421 * increment succedent's numAntDone 422 * place newly-enable nodes on node queue for firing 423 * 424 * To save context switches, we don't place NIL nodes on the node queue, 425 * but rather just process them as if they had fired. Note that NIL nodes 426 * that are the direct successors of the header will actually get fired by 427 * DispatchDAG, which is fine because no context switches are involved. 428 * 429 * Important: when running at user level, this can be called by any 430 * disk thread, and so the increment and check of the antecedent count 431 * must be locked. I used the node queue mutex and locked down the 432 * entire function, but this is certainly overkill. 
433 */ 434 static void 435 PropagateResults( 436 RF_DagNode_t * node, 437 int context) 438 { 439 RF_DagNode_t *s, *a; 440 RF_Raid_t *raidPtr; 441 int i, ks; 442 RF_DagNode_t *finishlist = NULL; /* a list of NIL nodes to be 443 * finished */ 444 RF_DagNode_t *skiplist = NULL; /* list of nodes with failed truedata 445 * antecedents */ 446 RF_DagNode_t *firelist = NULL; /* a list of nodes to be fired */ 447 RF_DagNode_t *q = NULL, *qh = NULL, *next; 448 int j, skipNode; 449 450 raidPtr = node->dagHdr->raidPtr; 451 452 DO_LOCK(raidPtr); 453 454 /* debug - validate fire counts */ 455 for (i = 0; i < node->numAntecedents; i++) { 456 a = *(node->antecedents + i); 457 RF_ASSERT(a->numSuccFired >= a->numSuccDone); 458 RF_ASSERT(a->numSuccFired <= a->numSuccedents); 459 a->numSuccDone++; 460 } 461 462 switch (node->dagHdr->status) { 463 case rf_enable: 464 case rf_rollForward: 465 for (i = 0; i < node->numSuccedents; i++) { 466 s = *(node->succedents + i); 467 RF_ASSERT(s->status == rf_wait); 468 (s->numAntDone)++; 469 if (s->numAntDone == s->numAntecedents) { 470 /* look for NIL nodes */ 471 if (s->doFunc == rf_NullNodeFunc) { 472 /* don't fire NIL nodes, just process 473 * them */ 474 s->next = finishlist; 475 finishlist = s; 476 } else { 477 /* look to see if the node is to be 478 * skipped */ 479 skipNode = RF_FALSE; 480 for (j = 0; j < s->numAntecedents; j++) 481 if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad)) 482 skipNode = RF_TRUE; 483 if (skipNode) { 484 /* this node has one or more 485 * failed true data 486 * dependencies, so skip it */ 487 s->next = skiplist; 488 skiplist = s; 489 } else 490 /* add s to list of nodes (q) 491 * to execute */ 492 if (context != RF_INTR_CONTEXT) { 493 /* we only have to 494 * enqueue if we're at 495 * intr context */ 496 /* put node on 497 a list to 498 be fired 499 after we 500 unlock */ 501 s->next = firelist; 502 firelist = s; 503 } else { 504 /* enqueue the 505 node for 506 the dag 507 exec thread 508 
to fire */ 509 RF_ASSERT(NodeReady(s)); 510 if (q) { 511 q->next = s; 512 q = s; 513 } else { 514 qh = q = s; 515 qh->next = NULL; 516 } 517 } 518 } 519 } 520 } 521 522 if (q) { 523 /* xfer our local list of nodes to the node queue */ 524 q->next = raidPtr->node_queue; 525 raidPtr->node_queue = qh; 526 DO_SIGNAL(raidPtr); 527 } 528 DO_UNLOCK(raidPtr); 529 530 for (; skiplist; skiplist = next) { 531 next = skiplist->next; 532 skiplist->status = rf_skipped; 533 for (i = 0; i < skiplist->numAntecedents; i++) { 534 skiplist->antecedents[i]->numSuccFired++; 535 } 536 if (skiplist->commitNode) { 537 skiplist->dagHdr->numCommits++; 538 } 539 rf_FinishNode(skiplist, context); 540 } 541 for (; finishlist; finishlist = next) { 542 /* NIL nodes: no need to fire them */ 543 next = finishlist->next; 544 finishlist->status = rf_good; 545 for (i = 0; i < finishlist->numAntecedents; i++) { 546 finishlist->antecedents[i]->numSuccFired++; 547 } 548 if (finishlist->commitNode) 549 finishlist->dagHdr->numCommits++; 550 /* 551 * Okay, here we're calling rf_FinishNode() on 552 * nodes that have the null function as their 553 * work proc. Such a node could be the 554 * terminal node in a DAG. If so, it will 555 * cause the DAG to complete, which will in 556 * turn free memory used by the DAG, which 557 * includes the node in question. Thus, we 558 * must avoid referencing the node at all 559 * after calling rf_FinishNode() on it. 
*/ 560 rf_FinishNode(finishlist, context); /* recursive call */ 561 } 562 /* fire all nodes in firelist */ 563 FireNodeList(firelist); 564 break; 565 566 case rf_rollBackward: 567 for (i = 0; i < node->numAntecedents; i++) { 568 a = *(node->antecedents + i); 569 RF_ASSERT(a->status == rf_good); 570 RF_ASSERT(a->numSuccDone <= a->numSuccedents); 571 RF_ASSERT(a->numSuccDone <= a->numSuccFired); 572 573 if (a->numSuccDone == a->numSuccFired) { 574 if (a->undoFunc == rf_NullNodeFunc) { 575 /* don't fire NIL nodes, just process 576 * them */ 577 a->next = finishlist; 578 finishlist = a; 579 } else { 580 if (context != RF_INTR_CONTEXT) { 581 /* we only have to enqueue if 582 * we're at intr context */ 583 /* put node on a list to be 584 fired after we unlock */ 585 a->next = firelist; 586 587 firelist = a; 588 } else { 589 /* enqueue the node for the 590 dag exec thread to fire */ 591 RF_ASSERT(NodeReady(a)); 592 if (q) { 593 q->next = a; 594 q = a; 595 } else { 596 qh = q = a; 597 qh->next = NULL; 598 } 599 } 600 } 601 } 602 } 603 if (q) { 604 /* xfer our local list of nodes to the node queue */ 605 q->next = raidPtr->node_queue; 606 raidPtr->node_queue = qh; 607 DO_SIGNAL(raidPtr); 608 } 609 DO_UNLOCK(raidPtr); 610 for (; finishlist; finishlist = next) { 611 /* NIL nodes: no need to fire them */ 612 next = finishlist->next; 613 finishlist->status = rf_good; 614 /* 615 * Okay, here we're calling rf_FinishNode() on 616 * nodes that have the null function as their 617 * work proc. Such a node could be the first 618 * node in a DAG. If so, it will cause the DAG 619 * to complete, which will in turn free memory 620 * used by the DAG, which includes the node in 621 * question. Thus, we must avoid referencing 622 * the node at all after calling 623 * rf_FinishNode() on it. 
*/ 624 rf_FinishNode(finishlist, context); /* recursive call */ 625 } 626 /* fire all nodes in firelist */ 627 FireNodeList(firelist); 628 629 break; 630 default: 631 printf("Engine found illegal DAG status in PropagateResults()\n"); 632 RF_PANIC(); 633 break; 634 } 635 } 636 637 638 639 /* 640 * Process a fired node which has completed 641 */ 642 static void 643 ProcessNode( 644 RF_DagNode_t * node, 645 int context) 646 { 647 RF_Raid_t *raidPtr; 648 649 raidPtr = node->dagHdr->raidPtr; 650 651 switch (node->status) { 652 case rf_good: 653 /* normal case, don't need to do anything */ 654 break; 655 case rf_bad: 656 if ((node->dagHdr->numCommits > 0) || 657 (node->dagHdr->numCommitNodes == 0)) { 658 /* crossed commit barrier */ 659 node->dagHdr->status = rf_rollForward; 660 if (rf_engineDebug || 1) { 661 printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name); 662 } 663 } else { 664 /* never reached commit barrier */ 665 node->dagHdr->status = rf_rollBackward; 666 if (rf_engineDebug || 1) { 667 printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name); 668 } 669 } 670 break; 671 case rf_undone: 672 /* normal rollBackward case, don't need to do anything */ 673 break; 674 case rf_panic: 675 /* an undo node failed!!! */ 676 printf("UNDO of a node failed!!!/n"); 677 break; 678 default: 679 printf("node finished execution with an illegal status!!!\n"); 680 RF_PANIC(); 681 break; 682 } 683 684 /* enqueue node's succedents (antecedents if rollBackward) for 685 * execution */ 686 PropagateResults(node, context); 687 } 688 689 690 691 /* user context or dag-exec-thread context: 692 * This is the first step in post-processing a newly-completed node. 693 * This routine is called by each node execution function to mark the node 694 * as complete and fire off any successors that have been enabled. 
695 */ 696 int 697 rf_FinishNode( 698 RF_DagNode_t * node, 699 int context) 700 { 701 int retcode = RF_FALSE; 702 node->dagHdr->numNodesCompleted++; 703 ProcessNode(node, context); 704 705 return (retcode); 706 } 707 708 709 /* user context: submit dag for execution, return non-zero if we have 710 * to wait for completion. if and only if we return non-zero, we'll 711 * cause cbFunc to get invoked with cbArg when the DAG has completed. 712 * 713 * for now we always return 1. If the DAG does not cause any I/O, 714 * then the callback may get invoked before DispatchDAG returns. 715 * There's code in state 5 of ContinueRaidAccess to handle this. 716 * 717 * All we do here is fire the direct successors of the header node. 718 * The DAG execution thread does the rest of the dag processing. */ 719 int 720 rf_DispatchDAG( 721 RF_DagHeader_t * dag, 722 void (*cbFunc) (void *), 723 void *cbArg) 724 { 725 RF_Raid_t *raidPtr; 726 727 raidPtr = dag->raidPtr; 728 if (dag->tracerec) { 729 RF_ETIMER_START(dag->tracerec->timer); 730 } 731 #if DEBUG 732 #if RF_DEBUG_VALIDATE_DAG 733 if (rf_engineDebug || rf_validateDAGDebug) { 734 if (rf_ValidateDAG(dag)) 735 RF_PANIC(); 736 } 737 #endif 738 #endif 739 if (rf_engineDebug) { 740 printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid); 741 } 742 raidPtr->dags_in_flight++; /* debug only: blow off proper 743 * locking */ 744 dag->cbFunc = cbFunc; 745 dag->cbArg = cbArg; 746 dag->numNodesCompleted = 0; 747 dag->status = rf_enable; 748 FireNodeArray(dag->numSuccedents, dag->succedents); 749 return (1); 750 } 751 /* dedicated kernel thread: the thread that handles all DAG node 752 * firing. To minimize locking and unlocking, we grab a copy of the 753 * entire node queue and then set the node queue to NULL before doing 754 * any firing of nodes. This way we only have to release the lock 755 * once. Of course, it's probably rare that there's more than one 756 * node in the queue at any one time, but it sometimes happens. 
757 */ 758 759 static void 760 DAGExecutionThread(RF_ThreadArg_t arg) 761 { 762 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq; 763 RF_Raid_t *raidPtr; 764 int ks; 765 int s; 766 767 raidPtr = (RF_Raid_t *) arg; 768 769 if (rf_engineDebug) { 770 printf("raid%d: Engine thread is running\n", raidPtr->raidid); 771 } 772 773 s = splbio(); 774 775 RF_THREADGROUP_RUNNING(&raidPtr->engine_tg); 776 777 DO_LOCK(raidPtr); 778 while (!raidPtr->shutdown_engine) { 779 780 while (raidPtr->node_queue != NULL) { 781 local_nq = raidPtr->node_queue; 782 fire_nq = NULL; 783 term_nq = NULL; 784 raidPtr->node_queue = NULL; 785 DO_UNLOCK(raidPtr); 786 787 /* first, strip out the terminal nodes */ 788 while (local_nq) { 789 nd = local_nq; 790 local_nq = local_nq->next; 791 switch (nd->dagHdr->status) { 792 case rf_enable: 793 case rf_rollForward: 794 if (nd->numSuccedents == 0) { 795 /* end of the dag, add to 796 * callback list */ 797 nd->next = term_nq; 798 term_nq = nd; 799 } else { 800 /* not the end, add to the 801 * fire queue */ 802 nd->next = fire_nq; 803 fire_nq = nd; 804 } 805 break; 806 case rf_rollBackward: 807 if (nd->numAntecedents == 0) { 808 /* end of the dag, add to the 809 * callback list */ 810 nd->next = term_nq; 811 term_nq = nd; 812 } else { 813 /* not the end, add to the 814 * fire queue */ 815 nd->next = fire_nq; 816 fire_nq = nd; 817 } 818 break; 819 default: 820 RF_PANIC(); 821 break; 822 } 823 } 824 825 /* execute callback of dags which have reached the 826 * terminal node */ 827 while (term_nq) { 828 nd = term_nq; 829 term_nq = term_nq->next; 830 nd->next = NULL; 831 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg); 832 raidPtr->dags_in_flight--; /* debug only */ 833 } 834 835 /* fire remaining nodes */ 836 FireNodeList(fire_nq); 837 838 DO_LOCK(raidPtr); 839 } 840 while (!raidPtr->shutdown_engine && 841 raidPtr->node_queue == NULL) { 842 DO_WAIT(raidPtr); 843 } 844 } 845 DO_UNLOCK(raidPtr); 846 847 RF_THREADGROUP_DONE(&raidPtr->engine_tg); 848 849 splx(s); 850 
kthread_exit(0); 851 } 852 853 /* 854 * rf_RaidIOThread() -- When I/O to a component completes, 855 * KernelWakeupFunc() puts the completed request onto raidPtr->iodone 856 * TAILQ. This function looks after requests on that queue by calling 857 * rf_DiskIOComplete() for the request, and by calling any required 858 * CompleteFunc for the request. 859 */ 860 861 static void 862 rf_RaidIOThread(RF_ThreadArg_t arg) 863 { 864 RF_Raid_t *raidPtr; 865 RF_DiskQueueData_t *req; 866 int s; 867 868 raidPtr = (RF_Raid_t *) arg; 869 870 s = splbio(); 871 simple_lock(&(raidPtr->iodone_lock)); 872 873 while (!raidPtr->shutdown_raidio) { 874 /* if there is nothing to do, then snooze. */ 875 if (TAILQ_EMPTY(&(raidPtr->iodone))) { 876 ltsleep(&(raidPtr->iodone), PRIBIO, "raidiow", 0, 877 &(raidPtr->iodone_lock)); 878 } 879 880 /* See what I/Os, if any, have arrived */ 881 while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) { 882 TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries); 883 simple_unlock(&(raidPtr->iodone_lock)); 884 rf_DiskIOComplete(req->queue, req, req->error); 885 (req->CompleteFunc) (req->argument, req->error); 886 simple_lock(&(raidPtr->iodone_lock)); 887 } 888 } 889 890 /* Let rf_ShutdownEngine know that we're done... */ 891 raidPtr->shutdown_raidio = 0; 892 wakeup(&(raidPtr->shutdown_raidio)); 893 894 simple_unlock(&(raidPtr->iodone_lock)); 895 splx(s); 896 897 kthread_exit(0); 898 } 899