/*	$NetBSD: rf_dagffwr.c,v 1.7 2001/11/13 07:11:13 lukem Exp $	*/
/*
 * Copyright (c) 1995 Carnegie-Mellon University.
 * All rights reserved.
 *
 * Author: Mark Holland, Daniel Stodolsky, William V. Courtright II
 *
 * Permission to use, copy, modify and distribute this software and
 * its documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie the
 * rights to redistribute these changes.
 */

/*
 * rf_dagffwr.c
 *
 * code for creating fault-free write DAGs
 *
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: rf_dagffwr.c,v 1.7 2001/11/13 07:11:13 lukem Exp $");

#include <dev/raidframe/raidframevar.h>

#include "rf_raid.h"
#include "rf_dag.h"
#include "rf_dagutils.h"
#include "rf_dagfuncs.h"
#include "rf_debugMem.h"
#include "rf_dagffrd.h"
#include "rf_memchunk.h"
#include "rf_general.h"
#include "rf_dagffwr.h"

/******************************************************************************
 *
 * General comments on DAG creation:
 *
 * All DAGs in this file use roll-away error recovery.  Each DAG has a single
 * commit node, usually called "Cmt."
If an error occurs before the Cmt node 57 * is reached, the execution engine will halt forward execution and work 58 * backward through the graph, executing the undo functions. Assuming that 59 * each node in the graph prior to the Cmt node are undoable and atomic - or - 60 * does not make changes to permanent state, the graph will fail atomically. 61 * If an error occurs after the Cmt node executes, the engine will roll-forward 62 * through the graph, blindly executing nodes until it reaches the end. 63 * If a graph reaches the end, it is assumed to have completed successfully. 64 * 65 * A graph has only 1 Cmt node. 66 * 67 */ 68 69 70 /****************************************************************************** 71 * 72 * The following wrappers map the standard DAG creation interface to the 73 * DAG creation routines. Additionally, these wrappers enable experimentation 74 * with new DAG structures by providing an extra level of indirection, allowing 75 * the DAG creation routines to be replaced at this single point. 
76 */ 77 78 79 void 80 rf_CreateNonRedundantWriteDAG( 81 RF_Raid_t * raidPtr, 82 RF_AccessStripeMap_t * asmap, 83 RF_DagHeader_t * dag_h, 84 void *bp, 85 RF_RaidAccessFlags_t flags, 86 RF_AllocListElem_t * allocList, 87 RF_IoType_t type) 88 { 89 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 90 RF_IO_TYPE_WRITE); 91 } 92 93 void 94 rf_CreateRAID0WriteDAG( 95 RF_Raid_t * raidPtr, 96 RF_AccessStripeMap_t * asmap, 97 RF_DagHeader_t * dag_h, 98 void *bp, 99 RF_RaidAccessFlags_t flags, 100 RF_AllocListElem_t * allocList, 101 RF_IoType_t type) 102 { 103 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 104 RF_IO_TYPE_WRITE); 105 } 106 107 void 108 rf_CreateSmallWriteDAG( 109 RF_Raid_t * raidPtr, 110 RF_AccessStripeMap_t * asmap, 111 RF_DagHeader_t * dag_h, 112 void *bp, 113 RF_RaidAccessFlags_t flags, 114 RF_AllocListElem_t * allocList) 115 { 116 /* "normal" rollaway */ 117 rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 118 &rf_xorFuncs, NULL); 119 } 120 121 void 122 rf_CreateLargeWriteDAG( 123 RF_Raid_t * raidPtr, 124 RF_AccessStripeMap_t * asmap, 125 RF_DagHeader_t * dag_h, 126 void *bp, 127 RF_RaidAccessFlags_t flags, 128 RF_AllocListElem_t * allocList) 129 { 130 /* "normal" rollaway */ 131 rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags, allocList, 132 1, rf_RegularXorFunc, RF_TRUE); 133 } 134 135 136 /****************************************************************************** 137 * 138 * DAG creation code begins here 139 */ 140 141 142 /****************************************************************************** 143 * 144 * creates a DAG to perform a large-write operation: 145 * 146 * / Rod \ / Wnd \ 147 * H -- block- Rod - Xor - Cmt - Wnd --- T 148 * \ Rod / \ Wnp / 149 * \[Wnq]/ 150 * 151 * The XOR node also does the Q calculation in the P+Q architecture. 
152 * All nodes are before the commit node (Cmt) are assumed to be atomic and 153 * undoable - or - they make no changes to permanent state. 154 * 155 * Rod = read old data 156 * Cmt = commit node 157 * Wnp = write new parity 158 * Wnd = write new data 159 * Wnq = write new "q" 160 * [] denotes optional segments in the graph 161 * 162 * Parameters: raidPtr - description of the physical array 163 * asmap - logical & physical addresses for this access 164 * bp - buffer ptr (holds write data) 165 * flags - general flags (e.g. disk locking) 166 * allocList - list of memory allocated in DAG creation 167 * nfaults - number of faults array can tolerate 168 * (equal to # redundancy units in stripe) 169 * redfuncs - list of redundancy generating functions 170 * 171 *****************************************************************************/ 172 173 void 174 rf_CommonCreateLargeWriteDAG( 175 RF_Raid_t * raidPtr, 176 RF_AccessStripeMap_t * asmap, 177 RF_DagHeader_t * dag_h, 178 void *bp, 179 RF_RaidAccessFlags_t flags, 180 RF_AllocListElem_t * allocList, 181 int nfaults, 182 int (*redFunc) (RF_DagNode_t *), 183 int allowBufferRecycle) 184 { 185 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode; 186 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode; 187 int nWndNodes, nRodNodes, i, nodeNum, asmNum; 188 RF_AccessStripeMapHeader_t *new_asm_h[2]; 189 RF_StripeNum_t parityStripeID; 190 char *sosBuffer, *eosBuffer; 191 RF_ReconUnitNum_t which_ru; 192 RF_RaidLayout_t *layoutPtr; 193 RF_PhysDiskAddr_t *pda; 194 195 layoutPtr = &(raidPtr->Layout); 196 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr, asmap->raidAddress, 197 &which_ru); 198 199 if (rf_dagDebug) { 200 printf("[Creating large-write DAG]\n"); 201 } 202 dag_h->creator = "LargeWriteDAG"; 203 204 dag_h->numCommitNodes = 1; 205 dag_h->numCommits = 0; 206 dag_h->numSuccedents = 1; 207 208 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */ 209 nWndNodes = asmap->numStripeUnitsAccessed; 
210 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t), 211 (RF_DagNode_t *), allocList); 212 i = 0; 213 wndNodes = &nodes[i]; 214 i += nWndNodes; 215 xorNode = &nodes[i]; 216 i += 1; 217 wnpNode = &nodes[i]; 218 i += 1; 219 blockNode = &nodes[i]; 220 i += 1; 221 commitNode = &nodes[i]; 222 i += 1; 223 termNode = &nodes[i]; 224 i += 1; 225 if (nfaults == 2) { 226 wnqNode = &nodes[i]; 227 i += 1; 228 } else { 229 wnqNode = NULL; 230 } 231 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, 232 &nRodNodes, &sosBuffer, &eosBuffer, allocList); 233 if (nRodNodes > 0) { 234 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), 235 (RF_DagNode_t *), allocList); 236 } else { 237 rodNodes = NULL; 238 } 239 240 /* begin node initialization */ 241 if (nRodNodes > 0) { 242 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 243 NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList); 244 } else { 245 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 246 NULL, 1, 0, 0, 0, dag_h, "Nil", allocList); 247 } 248 249 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 250 nWndNodes + nfaults, 1, 0, 0, dag_h, "Cmt", allocList); 251 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 252 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList); 253 254 /* initialize the Rod nodes */ 255 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) { 256 if (new_asm_h[asmNum]) { 257 pda = new_asm_h[asmNum]->stripeMap->physInfo; 258 while (pda) { 259 rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, 260 rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 261 "Rod", allocList); 262 rodNodes[nodeNum].params[0].p = pda; 263 rodNodes[nodeNum].params[1].p = pda->bufPtr; 264 rodNodes[nodeNum].params[2].v = parityStripeID; 265 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 266 0, 0, 
which_ru); 267 nodeNum++; 268 pda = pda->next; 269 } 270 } 271 } 272 RF_ASSERT(nodeNum == nRodNodes); 273 274 /* initialize the wnd nodes */ 275 pda = asmap->physInfo; 276 for (i = 0; i < nWndNodes; i++) { 277 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 278 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList); 279 RF_ASSERT(pda != NULL); 280 wndNodes[i].params[0].p = pda; 281 wndNodes[i].params[1].p = pda->bufPtr; 282 wndNodes[i].params[2].v = parityStripeID; 283 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 284 pda = pda->next; 285 } 286 287 /* initialize the redundancy node */ 288 if (nRodNodes > 0) { 289 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1, 290 nRodNodes, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, 291 "Xr ", allocList); 292 } else { 293 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1, 294 1, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList); 295 } 296 xorNode->flags |= RF_DAGNODE_FLAG_YIELD; 297 for (i = 0; i < nWndNodes; i++) { 298 xorNode->params[2 * i + 0] = wndNodes[i].params[0]; /* pda */ 299 xorNode->params[2 * i + 1] = wndNodes[i].params[1]; /* buf ptr */ 300 } 301 for (i = 0; i < nRodNodes; i++) { 302 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; /* pda */ 303 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; /* buf ptr */ 304 } 305 /* xor node needs to get at RAID information */ 306 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr; 307 308 /* 309 * Look for an Rod node that reads a complete SU. If none, alloc a buffer 310 * to receive the parity info. Note that we can't use a new data buffer 311 * because it will not have gotten written when the xor occurs. 
312 */ 313 if (allowBufferRecycle) { 314 for (i = 0; i < nRodNodes; i++) { 315 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit) 316 break; 317 } 318 } 319 if ((!allowBufferRecycle) || (i == nRodNodes)) { 320 RF_CallocAndAdd(xorNode->results[0], 1, 321 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), 322 (void *), allocList); 323 } else { 324 xorNode->results[0] = rodNodes[i].params[1].p; 325 } 326 327 /* initialize the Wnp node */ 328 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 329 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList); 330 wnpNode->params[0].p = asmap->parityInfo; 331 wnpNode->params[1].p = xorNode->results[0]; 332 wnpNode->params[2].v = parityStripeID; 333 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 334 /* parityInfo must describe entire parity unit */ 335 RF_ASSERT(asmap->parityInfo->next == NULL); 336 337 if (nfaults == 2) { 338 /* 339 * We never try to recycle a buffer for the Q calcuation 340 * in addition to the parity. This would cause two buffers 341 * to get smashed during the P and Q calculation, guaranteeing 342 * one would be wrong. 343 */ 344 RF_CallocAndAdd(xorNode->results[1], 1, 345 rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), 346 (void *), allocList); 347 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 348 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList); 349 wnqNode->params[0].p = asmap->qInfo; 350 wnqNode->params[1].p = xorNode->results[1]; 351 wnqNode->params[2].v = parityStripeID; 352 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 353 /* parityInfo must describe entire parity unit */ 354 RF_ASSERT(asmap->parityInfo->next == NULL); 355 } 356 /* 357 * Connect nodes to form graph. 
358 */ 359 360 /* connect dag header to block node */ 361 RF_ASSERT(blockNode->numAntecedents == 0); 362 dag_h->succedents[0] = blockNode; 363 364 if (nRodNodes > 0) { 365 /* connect the block node to the Rod nodes */ 366 RF_ASSERT(blockNode->numSuccedents == nRodNodes); 367 RF_ASSERT(xorNode->numAntecedents == nRodNodes); 368 for (i = 0; i < nRodNodes; i++) { 369 RF_ASSERT(rodNodes[i].numAntecedents == 1); 370 blockNode->succedents[i] = &rodNodes[i]; 371 rodNodes[i].antecedents[0] = blockNode; 372 rodNodes[i].antType[0] = rf_control; 373 374 /* connect the Rod nodes to the Xor node */ 375 RF_ASSERT(rodNodes[i].numSuccedents == 1); 376 rodNodes[i].succedents[0] = xorNode; 377 xorNode->antecedents[i] = &rodNodes[i]; 378 xorNode->antType[i] = rf_trueData; 379 } 380 } else { 381 /* connect the block node to the Xor node */ 382 RF_ASSERT(blockNode->numSuccedents == 1); 383 RF_ASSERT(xorNode->numAntecedents == 1); 384 blockNode->succedents[0] = xorNode; 385 xorNode->antecedents[0] = blockNode; 386 xorNode->antType[0] = rf_control; 387 } 388 389 /* connect the xor node to the commit node */ 390 RF_ASSERT(xorNode->numSuccedents == 1); 391 RF_ASSERT(commitNode->numAntecedents == 1); 392 xorNode->succedents[0] = commitNode; 393 commitNode->antecedents[0] = xorNode; 394 commitNode->antType[0] = rf_control; 395 396 /* connect the commit node to the write nodes */ 397 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults); 398 for (i = 0; i < nWndNodes; i++) { 399 RF_ASSERT(wndNodes->numAntecedents == 1); 400 commitNode->succedents[i] = &wndNodes[i]; 401 wndNodes[i].antecedents[0] = commitNode; 402 wndNodes[i].antType[0] = rf_control; 403 } 404 RF_ASSERT(wnpNode->numAntecedents == 1); 405 commitNode->succedents[nWndNodes] = wnpNode; 406 wnpNode->antecedents[0] = commitNode; 407 wnpNode->antType[0] = rf_trueData; 408 if (nfaults == 2) { 409 RF_ASSERT(wnqNode->numAntecedents == 1); 410 commitNode->succedents[nWndNodes + 1] = wnqNode; 411 wnqNode->antecedents[0] = 
commitNode; 412 wnqNode->antType[0] = rf_trueData; 413 } 414 /* connect the write nodes to the term node */ 415 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults); 416 RF_ASSERT(termNode->numSuccedents == 0); 417 for (i = 0; i < nWndNodes; i++) { 418 RF_ASSERT(wndNodes->numSuccedents == 1); 419 wndNodes[i].succedents[0] = termNode; 420 termNode->antecedents[i] = &wndNodes[i]; 421 termNode->antType[i] = rf_control; 422 } 423 RF_ASSERT(wnpNode->numSuccedents == 1); 424 wnpNode->succedents[0] = termNode; 425 termNode->antecedents[nWndNodes] = wnpNode; 426 termNode->antType[nWndNodes] = rf_control; 427 if (nfaults == 2) { 428 RF_ASSERT(wnqNode->numSuccedents == 1); 429 wnqNode->succedents[0] = termNode; 430 termNode->antecedents[nWndNodes + 1] = wnqNode; 431 termNode->antType[nWndNodes + 1] = rf_control; 432 } 433 } 434 /****************************************************************************** 435 * 436 * creates a DAG to perform a small-write operation (either raid 5 or pq), 437 * which is as follows: 438 * 439 * Hdr -> Nil -> Rop -> Xor -> Cmt ----> Wnp [Unp] --> Trm 440 * \- Rod X / \----> Wnd [Und]-/ 441 * [\- Rod X / \---> Wnd [Und]-/] 442 * [\- Roq -> Q / \--> Wnq [Unq]-/] 443 * 444 * Rop = read old parity 445 * Rod = read old data 446 * Roq = read old "q" 447 * Cmt = commit node 448 * Und = unlock data disk 449 * Unp = unlock parity disk 450 * Unq = unlock q disk 451 * Wnp = write new parity 452 * Wnd = write new data 453 * Wnq = write new "q" 454 * [ ] denotes optional segments in the graph 455 * 456 * Parameters: raidPtr - description of the physical array 457 * asmap - logical & physical addresses for this access 458 * bp - buffer ptr (holds write data) 459 * flags - general flags (e.g. 
disk locking) 460 * allocList - list of memory allocated in DAG creation 461 * pfuncs - list of parity generating functions 462 * qfuncs - list of q generating functions 463 * 464 * A null qfuncs indicates single fault tolerant 465 *****************************************************************************/ 466 467 void 468 rf_CommonCreateSmallWriteDAG( 469 RF_Raid_t * raidPtr, 470 RF_AccessStripeMap_t * asmap, 471 RF_DagHeader_t * dag_h, 472 void *bp, 473 RF_RaidAccessFlags_t flags, 474 RF_AllocListElem_t * allocList, 475 RF_RedFuncs_t * pfuncs, 476 RF_RedFuncs_t * qfuncs) 477 { 478 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode; 479 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes; 480 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes; 481 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes; 482 int i, j, nNodes, totalNumNodes, lu_flag; 483 RF_ReconUnitNum_t which_ru; 484 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *); 485 int (*qfunc) (RF_DagNode_t *); 486 int numDataNodes, numParityNodes; 487 RF_StripeNum_t parityStripeID; 488 RF_PhysDiskAddr_t *pda; 489 char *name, *qname; 490 long nfaults; 491 492 nfaults = qfuncs ? 2 : 1; 493 lu_flag = (rf_enableAtomicRMW) ? 1 : 0; /* lock/unlock flag */ 494 495 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), 496 asmap->raidAddress, &which_ru); 497 pda = asmap->physInfo; 498 numDataNodes = asmap->numStripeUnitsAccessed; 499 numParityNodes = (asmap->parityInfo->next) ? 2 : 1; 500 501 if (rf_dagDebug) { 502 printf("[Creating small-write DAG]\n"); 503 } 504 RF_ASSERT(numDataNodes > 0); 505 dag_h->creator = "SmallWriteDAG"; 506 507 dag_h->numCommitNodes = 1; 508 dag_h->numCommits = 0; 509 dag_h->numSuccedents = 1; 510 511 /* 512 * DAG creation occurs in four steps: 513 * 1. count the number of nodes in the DAG 514 * 2. create the nodes 515 * 3. initialize the nodes 516 * 4. connect the nodes 517 */ 518 519 /* 520 * Step 1. 
compute number of nodes in the graph 521 */ 522 523 /* number of nodes: a read and write for each data unit a redundancy 524 * computation node for each parity node (nfaults * nparity) a read 525 * and write for each parity unit a block and commit node (2) a 526 * terminate node if atomic RMW an unlock node for each data unit, 527 * redundancy unit */ 528 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) 529 + (nfaults * 2 * numParityNodes) + 3; 530 if (lu_flag) { 531 totalNumNodes += (numDataNodes + (nfaults * numParityNodes)); 532 } 533 /* 534 * Step 2. create the nodes 535 */ 536 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), 537 (RF_DagNode_t *), allocList); 538 i = 0; 539 blockNode = &nodes[i]; 540 i += 1; 541 commitNode = &nodes[i]; 542 i += 1; 543 readDataNodes = &nodes[i]; 544 i += numDataNodes; 545 readParityNodes = &nodes[i]; 546 i += numParityNodes; 547 writeDataNodes = &nodes[i]; 548 i += numDataNodes; 549 writeParityNodes = &nodes[i]; 550 i += numParityNodes; 551 xorNodes = &nodes[i]; 552 i += numParityNodes; 553 termNode = &nodes[i]; 554 i += 1; 555 if (lu_flag) { 556 unlockDataNodes = &nodes[i]; 557 i += numDataNodes; 558 unlockParityNodes = &nodes[i]; 559 i += numParityNodes; 560 } else { 561 unlockDataNodes = unlockParityNodes = NULL; 562 } 563 if (nfaults == 2) { 564 readQNodes = &nodes[i]; 565 i += numParityNodes; 566 writeQNodes = &nodes[i]; 567 i += numParityNodes; 568 qNodes = &nodes[i]; 569 i += numParityNodes; 570 if (lu_flag) { 571 unlockQNodes = &nodes[i]; 572 i += numParityNodes; 573 } else { 574 unlockQNodes = NULL; 575 } 576 } else { 577 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL; 578 } 579 RF_ASSERT(i == totalNumNodes); 580 581 /* 582 * Step 3. 
initialize the nodes 583 */ 584 /* initialize block node (Nil) */ 585 nNodes = numDataNodes + (nfaults * numParityNodes); 586 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 587 NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList); 588 589 /* initialize commit node (Cmt) */ 590 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 591 NULL, nNodes, (nfaults * numParityNodes), 0, 0, dag_h, "Cmt", allocList); 592 593 /* initialize terminate node (Trm) */ 594 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, 595 NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList); 596 597 /* initialize nodes which read old data (Rod) */ 598 for (i = 0; i < numDataNodes; i++) { 599 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, 600 rf_GenericWakeupFunc, (nfaults * numParityNodes), 1, 4, 0, dag_h, 601 "Rod", allocList); 602 RF_ASSERT(pda != NULL); 603 /* physical disk addr desc */ 604 readDataNodes[i].params[0].p = pda; 605 /* buffer to hold old data */ 606 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, 607 dag_h, pda, allocList); 608 readDataNodes[i].params[2].v = parityStripeID; 609 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 610 lu_flag, 0, which_ru); 611 pda = pda->next; 612 for (j = 0; j < readDataNodes[i].numSuccedents; j++) { 613 readDataNodes[i].propList[j] = NULL; 614 } 615 } 616 617 /* initialize nodes which read old parity (Rop) */ 618 pda = asmap->parityInfo; 619 i = 0; 620 for (i = 0; i < numParityNodes; i++) { 621 RF_ASSERT(pda != NULL); 622 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, 623 rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 624 0, dag_h, "Rop", allocList); 625 readParityNodes[i].params[0].p = pda; 626 /* buffer to hold old parity */ 627 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, 628 dag_h, pda, allocList); 629 readParityNodes[i].params[2].v = 
parityStripeID; 630 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 631 lu_flag, 0, which_ru); 632 pda = pda->next; 633 for (j = 0; j < readParityNodes[i].numSuccedents; j++) { 634 readParityNodes[i].propList[0] = NULL; 635 } 636 } 637 638 /* initialize nodes which read old Q (Roq) */ 639 if (nfaults == 2) { 640 pda = asmap->qInfo; 641 for (i = 0; i < numParityNodes; i++) { 642 RF_ASSERT(pda != NULL); 643 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, 644 rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList); 645 readQNodes[i].params[0].p = pda; 646 /* buffer to hold old Q */ 647 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, 648 allocList); 649 readQNodes[i].params[2].v = parityStripeID; 650 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 651 lu_flag, 0, which_ru); 652 pda = pda->next; 653 for (j = 0; j < readQNodes[i].numSuccedents; j++) { 654 readQNodes[i].propList[0] = NULL; 655 } 656 } 657 } 658 /* initialize nodes which write new data (Wnd) */ 659 pda = asmap->physInfo; 660 for (i = 0; i < numDataNodes; i++) { 661 RF_ASSERT(pda != NULL); 662 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, 663 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 664 "Wnd", allocList); 665 /* physical disk addr desc */ 666 writeDataNodes[i].params[0].p = pda; 667 /* buffer holding new data to be written */ 668 writeDataNodes[i].params[1].p = pda->bufPtr; 669 writeDataNodes[i].params[2].v = parityStripeID; 670 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 671 0, 0, which_ru); 672 if (lu_flag) { 673 /* initialize node to unlock the disk queue */ 674 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, 675 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, 676 "Und", allocList); 677 /* physical disk addr desc */ 678 unlockDataNodes[i].params[0].p = pda; 679 
unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 680 0, lu_flag, which_ru); 681 } 682 pda = pda->next; 683 } 684 685 /* 686 * Initialize nodes which compute new parity and Q. 687 */ 688 /* 689 * We use the simple XOR func in the double-XOR case, and when 690 * we're accessing only a portion of one stripe unit. The distinction 691 * between the two is that the regular XOR func assumes that the targbuf 692 * is a full SU in size, and examines the pda associated with the buffer 693 * to decide where within the buffer to XOR the data, whereas 694 * the simple XOR func just XORs the data into the start of the buffer. 695 */ 696 if ((numParityNodes == 2) || ((numDataNodes == 1) 697 && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) { 698 func = pfuncs->simple; 699 undoFunc = rf_NullNodeUndoFunc; 700 name = pfuncs->SimpleName; 701 if (qfuncs) { 702 qfunc = qfuncs->simple; 703 qname = qfuncs->SimpleName; 704 } else { 705 qfunc = NULL; 706 qname = NULL; 707 } 708 } else { 709 func = pfuncs->regular; 710 undoFunc = rf_NullNodeUndoFunc; 711 name = pfuncs->RegularName; 712 if (qfuncs) { 713 qfunc = qfuncs->regular; 714 qname = qfuncs->RegularName; 715 } else { 716 qfunc = NULL; 717 qname = NULL; 718 } 719 } 720 /* 721 * Initialize the xor nodes: params are {pda,buf} 722 * from {Rod,Wnd,Rop} nodes, and raidPtr 723 */ 724 if (numParityNodes == 2) { 725 /* double-xor case */ 726 for (i = 0; i < numParityNodes; i++) { 727 /* note: no wakeup func for xor */ 728 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL, 729 1, (numDataNodes + numParityNodes), 7, 1, dag_h, name, allocList); 730 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD; 731 xorNodes[i].params[0] = readDataNodes[i].params[0]; 732 xorNodes[i].params[1] = readDataNodes[i].params[1]; 733 xorNodes[i].params[2] = readParityNodes[i].params[0]; 734 xorNodes[i].params[3] = readParityNodes[i].params[1]; 735 xorNodes[i].params[4] = writeDataNodes[i].params[0]; 736 
xorNodes[i].params[5] = writeDataNodes[i].params[1]; 737 xorNodes[i].params[6].p = raidPtr; 738 /* use old parity buf as target buf */ 739 xorNodes[i].results[0] = readParityNodes[i].params[1].p; 740 if (nfaults == 2) { 741 /* note: no wakeup func for qor */ 742 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, 1, 743 (numDataNodes + numParityNodes), 7, 1, dag_h, qname, allocList); 744 qNodes[i].params[0] = readDataNodes[i].params[0]; 745 qNodes[i].params[1] = readDataNodes[i].params[1]; 746 qNodes[i].params[2] = readQNodes[i].params[0]; 747 qNodes[i].params[3] = readQNodes[i].params[1]; 748 qNodes[i].params[4] = writeDataNodes[i].params[0]; 749 qNodes[i].params[5] = writeDataNodes[i].params[1]; 750 qNodes[i].params[6].p = raidPtr; 751 /* use old Q buf as target buf */ 752 qNodes[i].results[0] = readQNodes[i].params[1].p; 753 } 754 } 755 } else { 756 /* there is only one xor node in this case */ 757 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, 1, 758 (numDataNodes + numParityNodes), 759 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList); 760 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD; 761 for (i = 0; i < numDataNodes + 1; i++) { 762 /* set up params related to Rod and Rop nodes */ 763 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */ 764 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */ 765 } 766 for (i = 0; i < numDataNodes; i++) { 767 /* set up params related to Wnd and Wnp nodes */ 768 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */ 769 writeDataNodes[i].params[0]; 770 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */ 771 writeDataNodes[i].params[1]; 772 } 773 /* xor node needs to get at RAID information */ 774 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; 775 xorNodes[0].results[0] = readParityNodes[0].params[1].p; 776 if (nfaults == 2) { 777 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, 
undoFunc, NULL, 1, 778 (numDataNodes + numParityNodes), 779 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, 780 qname, allocList); 781 for (i = 0; i < numDataNodes; i++) { 782 /* set up params related to Rod */ 783 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */ 784 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer ptr */ 785 } 786 /* and read old q */ 787 qNodes[0].params[2 * numDataNodes + 0] = /* pda */ 788 readQNodes[0].params[0]; 789 qNodes[0].params[2 * numDataNodes + 1] = /* buffer ptr */ 790 readQNodes[0].params[1]; 791 for (i = 0; i < numDataNodes; i++) { 792 /* set up params related to Wnd nodes */ 793 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = /* pda */ 794 writeDataNodes[i].params[0]; 795 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = /* buffer ptr */ 796 writeDataNodes[i].params[1]; 797 } 798 /* xor node needs to get at RAID information */ 799 qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; 800 qNodes[0].results[0] = readQNodes[0].params[1].p; 801 } 802 } 803 804 /* initialize nodes which write new parity (Wnp) */ 805 pda = asmap->parityInfo; 806 for (i = 0; i < numParityNodes; i++) { 807 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, 808 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 809 "Wnp", allocList); 810 RF_ASSERT(pda != NULL); 811 writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr) 812 * filled in by xor node */ 813 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for 814 * parity write 815 * operation */ 816 writeParityNodes[i].params[2].v = parityStripeID; 817 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 818 0, 0, which_ru); 819 if (lu_flag) { 820 /* initialize node to unlock the disk queue */ 821 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, 822 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, 823 "Unp", allocList); 824 
unlockParityNodes[i].params[0].p = pda; /* physical disk addr 825 * desc */ 826 unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 827 0, lu_flag, which_ru); 828 } 829 pda = pda->next; 830 } 831 832 /* initialize nodes which write new Q (Wnq) */ 833 if (nfaults == 2) { 834 pda = asmap->qInfo; 835 for (i = 0; i < numParityNodes; i++) { 836 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, 837 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, 838 "Wnq", allocList); 839 RF_ASSERT(pda != NULL); 840 writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr) 841 * filled in by xor node */ 842 writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for 843 * parity write 844 * operation */ 845 writeQNodes[i].params[2].v = parityStripeID; 846 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 847 0, 0, which_ru); 848 if (lu_flag) { 849 /* initialize node to unlock the disk queue */ 850 rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, 851 rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, 852 "Unq", allocList); 853 unlockQNodes[i].params[0].p = pda; /* physical disk addr 854 * desc */ 855 unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 856 0, lu_flag, which_ru); 857 } 858 pda = pda->next; 859 } 860 } 861 /* 862 * Step 4. connect the nodes. 
863 */ 864 865 /* connect header to block node */ 866 dag_h->succedents[0] = blockNode; 867 868 /* connect block node to read old data nodes */ 869 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults))); 870 for (i = 0; i < numDataNodes; i++) { 871 blockNode->succedents[i] = &readDataNodes[i]; 872 RF_ASSERT(readDataNodes[i].numAntecedents == 1); 873 readDataNodes[i].antecedents[0] = blockNode; 874 readDataNodes[i].antType[0] = rf_control; 875 } 876 877 /* connect block node to read old parity nodes */ 878 for (i = 0; i < numParityNodes; i++) { 879 blockNode->succedents[numDataNodes + i] = &readParityNodes[i]; 880 RF_ASSERT(readParityNodes[i].numAntecedents == 1); 881 readParityNodes[i].antecedents[0] = blockNode; 882 readParityNodes[i].antType[0] = rf_control; 883 } 884 885 /* connect block node to read old Q nodes */ 886 if (nfaults == 2) { 887 for (i = 0; i < numParityNodes; i++) { 888 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i]; 889 RF_ASSERT(readQNodes[i].numAntecedents == 1); 890 readQNodes[i].antecedents[0] = blockNode; 891 readQNodes[i].antType[0] = rf_control; 892 } 893 } 894 /* connect read old data nodes to xor nodes */ 895 for (i = 0; i < numDataNodes; i++) { 896 RF_ASSERT(readDataNodes[i].numSuccedents == (nfaults * numParityNodes)); 897 for (j = 0; j < numParityNodes; j++) { 898 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes); 899 readDataNodes[i].succedents[j] = &xorNodes[j]; 900 xorNodes[j].antecedents[i] = &readDataNodes[i]; 901 xorNodes[j].antType[i] = rf_trueData; 902 } 903 } 904 905 /* connect read old data nodes to q nodes */ 906 if (nfaults == 2) { 907 for (i = 0; i < numDataNodes; i++) { 908 for (j = 0; j < numParityNodes; j++) { 909 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes); 910 readDataNodes[i].succedents[numParityNodes + j] = &qNodes[j]; 911 qNodes[j].antecedents[i] = &readDataNodes[i]; 912 qNodes[j].antType[i] = rf_trueData; 913 } 
914 } 915 } 916 /* connect read old parity nodes to xor nodes */ 917 for (i = 0; i < numParityNodes; i++) { 918 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes); 919 for (j = 0; j < numParityNodes; j++) { 920 readParityNodes[i].succedents[j] = &xorNodes[j]; 921 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i]; 922 xorNodes[j].antType[numDataNodes + i] = rf_trueData; 923 } 924 } 925 926 /* connect read old q nodes to q nodes */ 927 if (nfaults == 2) { 928 for (i = 0; i < numParityNodes; i++) { 929 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes); 930 for (j = 0; j < numParityNodes; j++) { 931 readQNodes[i].succedents[j] = &qNodes[j]; 932 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i]; 933 qNodes[j].antType[numDataNodes + i] = rf_trueData; 934 } 935 } 936 } 937 /* connect xor nodes to commit node */ 938 RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes)); 939 for (i = 0; i < numParityNodes; i++) { 940 RF_ASSERT(xorNodes[i].numSuccedents == 1); 941 xorNodes[i].succedents[0] = commitNode; 942 commitNode->antecedents[i] = &xorNodes[i]; 943 commitNode->antType[i] = rf_control; 944 } 945 946 /* connect q nodes to commit node */ 947 if (nfaults == 2) { 948 for (i = 0; i < numParityNodes; i++) { 949 RF_ASSERT(qNodes[i].numSuccedents == 1); 950 qNodes[i].succedents[0] = commitNode; 951 commitNode->antecedents[i + numParityNodes] = &qNodes[i]; 952 commitNode->antType[i + numParityNodes] = rf_control; 953 } 954 } 955 /* connect commit node to write nodes */ 956 RF_ASSERT(commitNode->numSuccedents == (numDataNodes + (nfaults * numParityNodes))); 957 for (i = 0; i < numDataNodes; i++) { 958 RF_ASSERT(writeDataNodes[i].numAntecedents == 1); 959 commitNode->succedents[i] = &writeDataNodes[i]; 960 writeDataNodes[i].antecedents[0] = commitNode; 961 writeDataNodes[i].antType[0] = rf_trueData; 962 } 963 for (i = 0; i < numParityNodes; i++) { 964 RF_ASSERT(writeParityNodes[i].numAntecedents == 1); 965 
commitNode->succedents[i + numDataNodes] = &writeParityNodes[i]; 966 writeParityNodes[i].antecedents[0] = commitNode; 967 writeParityNodes[i].antType[0] = rf_trueData; 968 } 969 if (nfaults == 2) { 970 for (i = 0; i < numParityNodes; i++) { 971 RF_ASSERT(writeQNodes[i].numAntecedents == 1); 972 commitNode->succedents[i + numDataNodes + numParityNodes] = &writeQNodes[i]; 973 writeQNodes[i].antecedents[0] = commitNode; 974 writeQNodes[i].antType[0] = rf_trueData; 975 } 976 } 977 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 978 RF_ASSERT(termNode->numSuccedents == 0); 979 for (i = 0; i < numDataNodes; i++) { 980 if (lu_flag) { 981 /* connect write new data nodes to unlock nodes */ 982 RF_ASSERT(writeDataNodes[i].numSuccedents == 1); 983 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1); 984 writeDataNodes[i].succedents[0] = &unlockDataNodes[i]; 985 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i]; 986 unlockDataNodes[i].antType[0] = rf_control; 987 988 /* connect unlock nodes to term node */ 989 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1); 990 unlockDataNodes[i].succedents[0] = termNode; 991 termNode->antecedents[i] = &unlockDataNodes[i]; 992 termNode->antType[i] = rf_control; 993 } else { 994 /* connect write new data nodes to term node */ 995 RF_ASSERT(writeDataNodes[i].numSuccedents == 1); 996 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 997 writeDataNodes[i].succedents[0] = termNode; 998 termNode->antecedents[i] = &writeDataNodes[i]; 999 termNode->antType[i] = rf_control; 1000 } 1001 } 1002 1003 for (i = 0; i < numParityNodes; i++) { 1004 if (lu_flag) { 1005 /* connect write new parity nodes to unlock nodes */ 1006 RF_ASSERT(writeParityNodes[i].numSuccedents == 1); 1007 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1); 1008 writeParityNodes[i].succedents[0] = &unlockParityNodes[i]; 1009 unlockParityNodes[i].antecedents[0] = &writeParityNodes[i]; 1010 
unlockParityNodes[i].antType[0] = rf_control; 1011 1012 /* connect unlock nodes to term node */ 1013 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1); 1014 unlockParityNodes[i].succedents[0] = termNode; 1015 termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i]; 1016 termNode->antType[numDataNodes + i] = rf_control; 1017 } else { 1018 RF_ASSERT(writeParityNodes[i].numSuccedents == 1); 1019 writeParityNodes[i].succedents[0] = termNode; 1020 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i]; 1021 termNode->antType[numDataNodes + i] = rf_control; 1022 } 1023 } 1024 1025 if (nfaults == 2) { 1026 for (i = 0; i < numParityNodes; i++) { 1027 if (lu_flag) { 1028 /* connect write new Q nodes to unlock nodes */ 1029 RF_ASSERT(writeQNodes[i].numSuccedents == 1); 1030 RF_ASSERT(unlockQNodes[i].numAntecedents == 1); 1031 writeQNodes[i].succedents[0] = &unlockQNodes[i]; 1032 unlockQNodes[i].antecedents[0] = &writeQNodes[i]; 1033 unlockQNodes[i].antType[0] = rf_control; 1034 1035 /* connect unlock nodes to unblock node */ 1036 RF_ASSERT(unlockQNodes[i].numSuccedents == 1); 1037 unlockQNodes[i].succedents[0] = termNode; 1038 termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i]; 1039 termNode->antType[numDataNodes + numParityNodes + i] = rf_control; 1040 } else { 1041 RF_ASSERT(writeQNodes[i].numSuccedents == 1); 1042 writeQNodes[i].succedents[0] = termNode; 1043 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i]; 1044 termNode->antType[numDataNodes + numParityNodes + i] = rf_control; 1045 } 1046 } 1047 } 1048 } 1049 1050 1051 /****************************************************************************** 1052 * create a write graph (fault-free or degraded) for RAID level 1 1053 * 1054 * Hdr -> Commit -> Wpd -> Nil -> Trm 1055 * -> Wsd -> 1056 * 1057 * The "Wpd" node writes data to the primary copy in the mirror pair 1058 * The "Wsd" node writes data to the secondary copy in the mirror pair 1059 * 1060 * 
Parameters: raidPtr - description of the physical array 1061 * asmap - logical & physical addresses for this access 1062 * bp - buffer ptr (holds write data) 1063 * flags - general flags (e.g. disk locking) 1064 * allocList - list of memory allocated in DAG creation 1065 *****************************************************************************/ 1066 1067 void 1068 rf_CreateRaidOneWriteDAG( 1069 RF_Raid_t * raidPtr, 1070 RF_AccessStripeMap_t * asmap, 1071 RF_DagHeader_t * dag_h, 1072 void *bp, 1073 RF_RaidAccessFlags_t flags, 1074 RF_AllocListElem_t * allocList) 1075 { 1076 RF_DagNode_t *unblockNode, *termNode, *commitNode; 1077 RF_DagNode_t *nodes, *wndNode, *wmirNode; 1078 int nWndNodes, nWmirNodes, i; 1079 RF_ReconUnitNum_t which_ru; 1080 RF_PhysDiskAddr_t *pda, *pdaP; 1081 RF_StripeNum_t parityStripeID; 1082 1083 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), 1084 asmap->raidAddress, &which_ru); 1085 if (rf_dagDebug) { 1086 printf("[Creating RAID level 1 write DAG]\n"); 1087 } 1088 dag_h->creator = "RaidOneWriteDAG"; 1089 1090 /* 2 implies access not SU aligned */ 1091 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1; 1092 nWndNodes = (asmap->physInfo->next) ? 
2 : 1; 1093 1094 /* alloc the Wnd nodes and the Wmir node */ 1095 if (asmap->numDataFailed == 1) 1096 nWndNodes--; 1097 if (asmap->numParityFailed == 1) 1098 nWmirNodes--; 1099 1100 /* total number of nodes = nWndNodes + nWmirNodes + (commit + unblock 1101 * + terminator) */ 1102 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t), 1103 (RF_DagNode_t *), allocList); 1104 i = 0; 1105 wndNode = &nodes[i]; 1106 i += nWndNodes; 1107 wmirNode = &nodes[i]; 1108 i += nWmirNodes; 1109 commitNode = &nodes[i]; 1110 i += 1; 1111 unblockNode = &nodes[i]; 1112 i += 1; 1113 termNode = &nodes[i]; 1114 i += 1; 1115 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3)); 1116 1117 /* this dag can commit immediately */ 1118 dag_h->numCommitNodes = 1; 1119 dag_h->numCommits = 0; 1120 dag_h->numSuccedents = 1; 1121 1122 /* initialize the commit, unblock, and term nodes */ 1123 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 1124 NULL, (nWndNodes + nWmirNodes), 0, 0, 0, dag_h, "Cmt", allocList); 1125 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, 1126 NULL, 1, (nWndNodes + nWmirNodes), 0, 0, dag_h, "Nil", allocList); 1127 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, 1128 NULL, 0, 1, 0, 0, dag_h, "Trm", allocList); 1129 1130 /* initialize the wnd nodes */ 1131 if (nWndNodes > 0) { 1132 pda = asmap->physInfo; 1133 for (i = 0; i < nWndNodes; i++) { 1134 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 1135 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList); 1136 RF_ASSERT(pda != NULL); 1137 wndNode[i].params[0].p = pda; 1138 wndNode[i].params[1].p = pda->bufPtr; 1139 wndNode[i].params[2].v = parityStripeID; 1140 wndNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1141 pda = pda->next; 1142 } 1143 RF_ASSERT(pda == NULL); 1144 } 1145 /* initialize the mirror nodes */ 1146 if (nWmirNodes > 0) { 1147 pda = 
asmap->physInfo; 1148 pdaP = asmap->parityInfo; 1149 for (i = 0; i < nWmirNodes; i++) { 1150 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, 1151 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList); 1152 RF_ASSERT(pda != NULL); 1153 wmirNode[i].params[0].p = pdaP; 1154 wmirNode[i].params[1].p = pda->bufPtr; 1155 wmirNode[i].params[2].v = parityStripeID; 1156 wmirNode[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1157 pda = pda->next; 1158 pdaP = pdaP->next; 1159 } 1160 RF_ASSERT(pda == NULL); 1161 RF_ASSERT(pdaP == NULL); 1162 } 1163 /* link the header node to the commit node */ 1164 RF_ASSERT(dag_h->numSuccedents == 1); 1165 RF_ASSERT(commitNode->numAntecedents == 0); 1166 dag_h->succedents[0] = commitNode; 1167 1168 /* link the commit node to the write nodes */ 1169 RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes)); 1170 for (i = 0; i < nWndNodes; i++) { 1171 RF_ASSERT(wndNode[i].numAntecedents == 1); 1172 commitNode->succedents[i] = &wndNode[i]; 1173 wndNode[i].antecedents[0] = commitNode; 1174 wndNode[i].antType[0] = rf_control; 1175 } 1176 for (i = 0; i < nWmirNodes; i++) { 1177 RF_ASSERT(wmirNode[i].numAntecedents == 1); 1178 commitNode->succedents[i + nWndNodes] = &wmirNode[i]; 1179 wmirNode[i].antecedents[0] = commitNode; 1180 wmirNode[i].antType[0] = rf_control; 1181 } 1182 1183 /* link the write nodes to the unblock node */ 1184 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes)); 1185 for (i = 0; i < nWndNodes; i++) { 1186 RF_ASSERT(wndNode[i].numSuccedents == 1); 1187 wndNode[i].succedents[0] = unblockNode; 1188 unblockNode->antecedents[i] = &wndNode[i]; 1189 unblockNode->antType[i] = rf_control; 1190 } 1191 for (i = 0; i < nWmirNodes; i++) { 1192 RF_ASSERT(wmirNode[i].numSuccedents == 1); 1193 wmirNode[i].succedents[0] = unblockNode; 1194 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i]; 1195 unblockNode->antType[i + nWndNodes] = rf_control; 1196 
} 1197 1198 /* link the unblock node to the term node */ 1199 RF_ASSERT(unblockNode->numSuccedents == 1); 1200 RF_ASSERT(termNode->numAntecedents == 1); 1201 RF_ASSERT(termNode->numSuccedents == 0); 1202 unblockNode->succedents[0] = termNode; 1203 termNode->antecedents[0] = unblockNode; 1204 termNode->antType[0] = rf_control; 1205 } 1206 1207 1208 1209 /* DAGs which have no commit points. 1210 * 1211 * The following DAGs are used in forward and backward error recovery experiments. 1212 * They are identical to the DAGs above this comment with the exception that the 1213 * the commit points have been removed. 1214 */ 1215 1216 1217 1218 void 1219 rf_CommonCreateLargeWriteDAGFwd( 1220 RF_Raid_t * raidPtr, 1221 RF_AccessStripeMap_t * asmap, 1222 RF_DagHeader_t * dag_h, 1223 void *bp, 1224 RF_RaidAccessFlags_t flags, 1225 RF_AllocListElem_t * allocList, 1226 int nfaults, 1227 int (*redFunc) (RF_DagNode_t *), 1228 int allowBufferRecycle) 1229 { 1230 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode; 1231 RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode; 1232 int nWndNodes, nRodNodes, i, nodeNum, asmNum; 1233 RF_AccessStripeMapHeader_t *new_asm_h[2]; 1234 RF_StripeNum_t parityStripeID; 1235 char *sosBuffer, *eosBuffer; 1236 RF_ReconUnitNum_t which_ru; 1237 RF_RaidLayout_t *layoutPtr; 1238 RF_PhysDiskAddr_t *pda; 1239 1240 layoutPtr = &(raidPtr->Layout); 1241 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru); 1242 1243 if (rf_dagDebug) 1244 printf("[Creating large-write DAG]\n"); 1245 dag_h->creator = "LargeWriteDAGFwd"; 1246 1247 dag_h->numCommitNodes = 0; 1248 dag_h->numCommits = 0; 1249 dag_h->numSuccedents = 1; 1250 1251 /* alloc the nodes: Wnd, xor, commit, block, term, and Wnp */ 1252 nWndNodes = asmap->numStripeUnitsAccessed; 1253 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList); 1254 i = 0; 1255 wndNodes = &nodes[i]; 1256 i += nWndNodes; 1257 
xorNode = &nodes[i]; 1258 i += 1; 1259 wnpNode = &nodes[i]; 1260 i += 1; 1261 blockNode = &nodes[i]; 1262 i += 1; 1263 syncNode = &nodes[i]; 1264 i += 1; 1265 termNode = &nodes[i]; 1266 i += 1; 1267 if (nfaults == 2) { 1268 wnqNode = &nodes[i]; 1269 i += 1; 1270 } else { 1271 wnqNode = NULL; 1272 } 1273 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h, new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList); 1274 if (nRodNodes > 0) { 1275 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList); 1276 } else { 1277 rodNodes = NULL; 1278 } 1279 1280 /* begin node initialization */ 1281 if (nRodNodes > 0) { 1282 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h, "Nil", allocList); 1283 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0, dag_h, "Nil", allocList); 1284 } else { 1285 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil", allocList); 1286 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h, "Nil", allocList); 1287 } 1288 1289 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0, dag_h, "Trm", allocList); 1290 1291 /* initialize the Rod nodes */ 1292 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) { 1293 if (new_asm_h[asmNum]) { 1294 pda = new_asm_h[asmNum]->stripeMap->physInfo; 1295 while (pda) { 1296 rf_InitNode(&rodNodes[nodeNum], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Rod", allocList); 1297 rodNodes[nodeNum].params[0].p = pda; 1298 rodNodes[nodeNum].params[1].p = pda->bufPtr; 1299 rodNodes[nodeNum].params[2].v = parityStripeID; 1300 rodNodes[nodeNum].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1301 nodeNum++; 1302 
pda = pda->next; 1303 } 1304 } 1305 } 1306 RF_ASSERT(nodeNum == nRodNodes); 1307 1308 /* initialize the wnd nodes */ 1309 pda = asmap->physInfo; 1310 for (i = 0; i < nWndNodes; i++) { 1311 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList); 1312 RF_ASSERT(pda != NULL); 1313 wndNodes[i].params[0].p = pda; 1314 wndNodes[i].params[1].p = pda->bufPtr; 1315 wndNodes[i].params[2].v = parityStripeID; 1316 wndNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1317 pda = pda->next; 1318 } 1319 1320 /* initialize the redundancy node */ 1321 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc, NULL, 1, nfaults, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h, "Xr ", allocList); 1322 xorNode->flags |= RF_DAGNODE_FLAG_YIELD; 1323 for (i = 0; i < nWndNodes; i++) { 1324 xorNode->params[2 * i + 0] = wndNodes[i].params[0]; /* pda */ 1325 xorNode->params[2 * i + 1] = wndNodes[i].params[1]; /* buf ptr */ 1326 } 1327 for (i = 0; i < nRodNodes; i++) { 1328 xorNode->params[2 * (nWndNodes + i) + 0] = rodNodes[i].params[0]; /* pda */ 1329 xorNode->params[2 * (nWndNodes + i) + 1] = rodNodes[i].params[1]; /* buf ptr */ 1330 } 1331 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr; /* xor node needs to get 1332 * at RAID information */ 1333 1334 /* look for an Rod node that reads a complete SU. If none, alloc a 1335 * buffer to receive the parity info. Note that we can't use a new 1336 * data buffer because it will not have gotten written when the xor 1337 * occurs. 
*/ 1338 if (allowBufferRecycle) { 1339 for (i = 0; i < nRodNodes; i++) 1340 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)->numSector == raidPtr->Layout.sectorsPerStripeUnit) 1341 break; 1342 } 1343 if ((!allowBufferRecycle) || (i == nRodNodes)) { 1344 RF_CallocAndAdd(xorNode->results[0], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList); 1345 } else 1346 xorNode->results[0] = rodNodes[i].params[1].p; 1347 1348 /* initialize the Wnp node */ 1349 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnp", allocList); 1350 wnpNode->params[0].p = asmap->parityInfo; 1351 wnpNode->params[1].p = xorNode->results[0]; 1352 wnpNode->params[2].v = parityStripeID; 1353 wnpNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1354 RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must 1355 * describe entire 1356 * parity unit */ 1357 1358 if (nfaults == 2) { 1359 /* we never try to recycle a buffer for the Q calcuation in 1360 * addition to the parity. This would cause two buffers to get 1361 * smashed during the P and Q calculation, guaranteeing one 1362 * would be wrong. 
*/ 1363 RF_CallocAndAdd(xorNode->results[1], 1, rf_RaidAddressToByte(raidPtr, raidPtr->Layout.sectorsPerStripeUnit), (void *), allocList); 1364 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnq", allocList); 1365 wnqNode->params[0].p = asmap->qInfo; 1366 wnqNode->params[1].p = xorNode->results[1]; 1367 wnqNode->params[2].v = parityStripeID; 1368 wnqNode->params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1369 RF_ASSERT(asmap->parityInfo->next == NULL); /* parityInfo must 1370 * describe entire 1371 * parity unit */ 1372 } 1373 /* connect nodes to form graph */ 1374 1375 /* connect dag header to block node */ 1376 RF_ASSERT(blockNode->numAntecedents == 0); 1377 dag_h->succedents[0] = blockNode; 1378 1379 if (nRodNodes > 0) { 1380 /* connect the block node to the Rod nodes */ 1381 RF_ASSERT(blockNode->numSuccedents == nRodNodes); 1382 RF_ASSERT(syncNode->numAntecedents == nRodNodes); 1383 for (i = 0; i < nRodNodes; i++) { 1384 RF_ASSERT(rodNodes[i].numAntecedents == 1); 1385 blockNode->succedents[i] = &rodNodes[i]; 1386 rodNodes[i].antecedents[0] = blockNode; 1387 rodNodes[i].antType[0] = rf_control; 1388 1389 /* connect the Rod nodes to the Nil node */ 1390 RF_ASSERT(rodNodes[i].numSuccedents == 1); 1391 rodNodes[i].succedents[0] = syncNode; 1392 syncNode->antecedents[i] = &rodNodes[i]; 1393 syncNode->antType[i] = rf_trueData; 1394 } 1395 } else { 1396 /* connect the block node to the Nil node */ 1397 RF_ASSERT(blockNode->numSuccedents == 1); 1398 RF_ASSERT(syncNode->numAntecedents == 1); 1399 blockNode->succedents[0] = syncNode; 1400 syncNode->antecedents[0] = blockNode; 1401 syncNode->antType[0] = rf_control; 1402 } 1403 1404 /* connect the sync node to the Wnd nodes */ 1405 RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes)); 1406 for (i = 0; i < nWndNodes; i++) { 1407 RF_ASSERT(wndNodes->numAntecedents == 1); 1408 syncNode->succedents[i] = &wndNodes[i]; 1409 
wndNodes[i].antecedents[0] = syncNode; 1410 wndNodes[i].antType[0] = rf_control; 1411 } 1412 1413 /* connect the sync node to the Xor node */ 1414 RF_ASSERT(xorNode->numAntecedents == 1); 1415 syncNode->succedents[nWndNodes] = xorNode; 1416 xorNode->antecedents[0] = syncNode; 1417 xorNode->antType[0] = rf_control; 1418 1419 /* connect the xor node to the write parity node */ 1420 RF_ASSERT(xorNode->numSuccedents == nfaults); 1421 RF_ASSERT(wnpNode->numAntecedents == 1); 1422 xorNode->succedents[0] = wnpNode; 1423 wnpNode->antecedents[0] = xorNode; 1424 wnpNode->antType[0] = rf_trueData; 1425 if (nfaults == 2) { 1426 RF_ASSERT(wnqNode->numAntecedents == 1); 1427 xorNode->succedents[1] = wnqNode; 1428 wnqNode->antecedents[0] = xorNode; 1429 wnqNode->antType[0] = rf_trueData; 1430 } 1431 /* connect the write nodes to the term node */ 1432 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults); 1433 RF_ASSERT(termNode->numSuccedents == 0); 1434 for (i = 0; i < nWndNodes; i++) { 1435 RF_ASSERT(wndNodes->numSuccedents == 1); 1436 wndNodes[i].succedents[0] = termNode; 1437 termNode->antecedents[i] = &wndNodes[i]; 1438 termNode->antType[i] = rf_control; 1439 } 1440 RF_ASSERT(wnpNode->numSuccedents == 1); 1441 wnpNode->succedents[0] = termNode; 1442 termNode->antecedents[nWndNodes] = wnpNode; 1443 termNode->antType[nWndNodes] = rf_control; 1444 if (nfaults == 2) { 1445 RF_ASSERT(wnqNode->numSuccedents == 1); 1446 wnqNode->succedents[0] = termNode; 1447 termNode->antecedents[nWndNodes + 1] = wnqNode; 1448 termNode->antType[nWndNodes + 1] = rf_control; 1449 } 1450 } 1451 1452 1453 /****************************************************************************** 1454 * 1455 * creates a DAG to perform a small-write operation (either raid 5 or pq), 1456 * which is as follows: 1457 * 1458 * Hdr -> Nil -> Rop - Xor - Wnp [Unp] -- Trm 1459 * \- Rod X- Wnd [Und] -------/ 1460 * [\- Rod X- Wnd [Und] ------/] 1461 * [\- Roq - Q --> Wnq [Unq]-/] 1462 * 1463 * Rop = read old parity 
1464 * Rod = read old data 1465 * Roq = read old "q" 1466 * Cmt = commit node 1467 * Und = unlock data disk 1468 * Unp = unlock parity disk 1469 * Unq = unlock q disk 1470 * Wnp = write new parity 1471 * Wnd = write new data 1472 * Wnq = write new "q" 1473 * [ ] denotes optional segments in the graph 1474 * 1475 * Parameters: raidPtr - description of the physical array 1476 * asmap - logical & physical addresses for this access 1477 * bp - buffer ptr (holds write data) 1478 * flags - general flags (e.g. disk locking) 1479 * allocList - list of memory allocated in DAG creation 1480 * pfuncs - list of parity generating functions 1481 * qfuncs - list of q generating functions 1482 * 1483 * A null qfuncs indicates single fault tolerant 1484 *****************************************************************************/ 1485 1486 void 1487 rf_CommonCreateSmallWriteDAGFwd( 1488 RF_Raid_t * raidPtr, 1489 RF_AccessStripeMap_t * asmap, 1490 RF_DagHeader_t * dag_h, 1491 void *bp, 1492 RF_RaidAccessFlags_t flags, 1493 RF_AllocListElem_t * allocList, 1494 RF_RedFuncs_t * pfuncs, 1495 RF_RedFuncs_t * qfuncs) 1496 { 1497 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode; 1498 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes; 1499 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes; 1500 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes; 1501 int i, j, nNodes, totalNumNodes, lu_flag; 1502 RF_ReconUnitNum_t which_ru; 1503 int (*func) (RF_DagNode_t *), (*undoFunc) (RF_DagNode_t *); 1504 int (*qfunc) (RF_DagNode_t *); 1505 int numDataNodes, numParityNodes; 1506 RF_StripeNum_t parityStripeID; 1507 RF_PhysDiskAddr_t *pda; 1508 char *name, *qname; 1509 long nfaults; 1510 1511 nfaults = qfuncs ? 2 : 1; 1512 lu_flag = (rf_enableAtomicRMW) ? 
1 : 0; /* lock/unlock flag */ 1513 1514 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout), asmap->raidAddress, &which_ru); 1515 pda = asmap->physInfo; 1516 numDataNodes = asmap->numStripeUnitsAccessed; 1517 numParityNodes = (asmap->parityInfo->next) ? 2 : 1; 1518 1519 if (rf_dagDebug) 1520 printf("[Creating small-write DAG]\n"); 1521 RF_ASSERT(numDataNodes > 0); 1522 dag_h->creator = "SmallWriteDAGFwd"; 1523 1524 dag_h->numCommitNodes = 0; 1525 dag_h->numCommits = 0; 1526 dag_h->numSuccedents = 1; 1527 1528 qfunc = NULL; 1529 qname = NULL; 1530 1531 /* DAG creation occurs in four steps: 1. count the number of nodes in 1532 * the DAG 2. create the nodes 3. initialize the nodes 4. connect the 1533 * nodes */ 1534 1535 /* Step 1. compute number of nodes in the graph */ 1536 1537 /* number of nodes: a read and write for each data unit a redundancy 1538 * computation node for each parity node (nfaults * nparity) a read 1539 * and write for each parity unit a block node a terminate node if 1540 * atomic RMW an unlock node for each data unit, redundancy unit */ 1541 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes) + (nfaults * 2 * numParityNodes) + 2; 1542 if (lu_flag) 1543 totalNumNodes += (numDataNodes + (nfaults * numParityNodes)); 1544 1545 1546 /* Step 2. 
create the nodes */ 1547 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList); 1548 i = 0; 1549 blockNode = &nodes[i]; 1550 i += 1; 1551 readDataNodes = &nodes[i]; 1552 i += numDataNodes; 1553 readParityNodes = &nodes[i]; 1554 i += numParityNodes; 1555 writeDataNodes = &nodes[i]; 1556 i += numDataNodes; 1557 writeParityNodes = &nodes[i]; 1558 i += numParityNodes; 1559 xorNodes = &nodes[i]; 1560 i += numParityNodes; 1561 termNode = &nodes[i]; 1562 i += 1; 1563 if (lu_flag) { 1564 unlockDataNodes = &nodes[i]; 1565 i += numDataNodes; 1566 unlockParityNodes = &nodes[i]; 1567 i += numParityNodes; 1568 } else { 1569 unlockDataNodes = unlockParityNodes = NULL; 1570 } 1571 if (nfaults == 2) { 1572 readQNodes = &nodes[i]; 1573 i += numParityNodes; 1574 writeQNodes = &nodes[i]; 1575 i += numParityNodes; 1576 qNodes = &nodes[i]; 1577 i += numParityNodes; 1578 if (lu_flag) { 1579 unlockQNodes = &nodes[i]; 1580 i += numParityNodes; 1581 } else { 1582 unlockQNodes = NULL; 1583 } 1584 } else { 1585 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL; 1586 } 1587 RF_ASSERT(i == totalNumNodes); 1588 1589 /* Step 3. 
initialize the nodes */ 1590 /* initialize block node (Nil) */ 1591 nNodes = numDataNodes + (nfaults * numParityNodes); 1592 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h, "Nil", allocList); 1593 1594 /* initialize terminate node (Trm) */ 1595 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h, "Trm", allocList); 1596 1597 /* initialize nodes which read old data (Rod) */ 1598 for (i = 0; i < numDataNodes; i++) { 1599 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h, "Rod", allocList); 1600 RF_ASSERT(pda != NULL); 1601 readDataNodes[i].params[0].p = pda; /* physical disk addr 1602 * desc */ 1603 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old 1604 * data */ 1605 readDataNodes[i].params[2].v = parityStripeID; 1606 readDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru); 1607 pda = pda->next; 1608 for (j = 0; j < readDataNodes[i].numSuccedents; j++) 1609 readDataNodes[i].propList[j] = NULL; 1610 } 1611 1612 /* initialize nodes which read old parity (Rop) */ 1613 pda = asmap->parityInfo; 1614 i = 0; 1615 for (i = 0; i < numParityNodes; i++) { 1616 RF_ASSERT(pda != NULL); 1617 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Rop", allocList); 1618 readParityNodes[i].params[0].p = pda; 1619 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old 1620 * parity */ 1621 readParityNodes[i].params[2].v = parityStripeID; 1622 readParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru); 1623 for (j = 0; j < readParityNodes[i].numSuccedents; j++) 1624 readParityNodes[i].propList[0] = NULL; 
1625 pda = pda->next; 1626 } 1627 1628 /* initialize nodes which read old Q (Roq) */ 1629 if (nfaults == 2) { 1630 pda = asmap->qInfo; 1631 for (i = 0; i < numParityNodes; i++) { 1632 RF_ASSERT(pda != NULL); 1633 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE, rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc, numParityNodes, 1, 4, 0, dag_h, "Roq", allocList); 1634 readQNodes[i].params[0].p = pda; 1635 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h, pda, allocList); /* buffer to hold old Q */ 1636 readQNodes[i].params[2].v = parityStripeID; 1637 readQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, lu_flag, 0, which_ru); 1638 for (j = 0; j < readQNodes[i].numSuccedents; j++) 1639 readQNodes[i].propList[0] = NULL; 1640 pda = pda->next; 1641 } 1642 } 1643 /* initialize nodes which write new data (Wnd) */ 1644 pda = asmap->physInfo; 1645 for (i = 0; i < numDataNodes; i++) { 1646 RF_ASSERT(pda != NULL); 1647 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wnd", allocList); 1648 writeDataNodes[i].params[0].p = pda; /* physical disk addr 1649 * desc */ 1650 writeDataNodes[i].params[1].p = pda->bufPtr; /* buffer holding new 1651 * data to be written */ 1652 writeDataNodes[i].params[2].v = parityStripeID; 1653 writeDataNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1654 1655 if (lu_flag) { 1656 /* initialize node to unlock the disk queue */ 1657 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Und", allocList); 1658 unlockDataNodes[i].params[0].p = pda; /* physical disk addr 1659 * desc */ 1660 unlockDataNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru); 1661 } 1662 pda = pda->next; 1663 } 1664 1665 1666 /* initialize nodes which compute new parity and Q */ 1667 /* we use the simple XOR func in the 
double-XOR case, and when we're 1668 * accessing only a portion of one stripe unit. the distinction 1669 * between the two is that the regular XOR func assumes that the 1670 * targbuf is a full SU in size, and examines the pda associated with 1671 * the buffer to decide where within the buffer to XOR the data, 1672 * whereas the simple XOR func just XORs the data into the start of 1673 * the buffer. */ 1674 if ((numParityNodes == 2) || ((numDataNodes == 1) && (asmap->totalSectorsAccessed < raidPtr->Layout.sectorsPerStripeUnit))) { 1675 func = pfuncs->simple; 1676 undoFunc = rf_NullNodeUndoFunc; 1677 name = pfuncs->SimpleName; 1678 if (qfuncs) { 1679 qfunc = qfuncs->simple; 1680 qname = qfuncs->SimpleName; 1681 } 1682 } else { 1683 func = pfuncs->regular; 1684 undoFunc = rf_NullNodeUndoFunc; 1685 name = pfuncs->RegularName; 1686 if (qfuncs) { 1687 qfunc = qfuncs->regular; 1688 qname = qfuncs->RegularName; 1689 } 1690 } 1691 /* initialize the xor nodes: params are {pda,buf} from {Rod,Wnd,Rop} 1692 * nodes, and raidPtr */ 1693 if (numParityNodes == 2) { /* double-xor case */ 1694 for (i = 0; i < numParityNodes; i++) { 1695 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, 7, 1, dag_h, name, allocList); /* no wakeup func for 1696 * xor */ 1697 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD; 1698 xorNodes[i].params[0] = readDataNodes[i].params[0]; 1699 xorNodes[i].params[1] = readDataNodes[i].params[1]; 1700 xorNodes[i].params[2] = readParityNodes[i].params[0]; 1701 xorNodes[i].params[3] = readParityNodes[i].params[1]; 1702 xorNodes[i].params[4] = writeDataNodes[i].params[0]; 1703 xorNodes[i].params[5] = writeDataNodes[i].params[1]; 1704 xorNodes[i].params[6].p = raidPtr; 1705 xorNodes[i].results[0] = readParityNodes[i].params[1].p; /* use old parity buf as 1706 * target buf */ 1707 if (nfaults == 2) { 1708 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + 
numDataNodes, 7, 1, dag_h, qname, allocList); /* no wakeup func for 1709 * xor */ 1710 qNodes[i].params[0] = readDataNodes[i].params[0]; 1711 qNodes[i].params[1] = readDataNodes[i].params[1]; 1712 qNodes[i].params[2] = readQNodes[i].params[0]; 1713 qNodes[i].params[3] = readQNodes[i].params[1]; 1714 qNodes[i].params[4] = writeDataNodes[i].params[0]; 1715 qNodes[i].params[5] = writeDataNodes[i].params[1]; 1716 qNodes[i].params[6].p = raidPtr; 1717 qNodes[i].results[0] = readQNodes[i].params[1].p; /* use old Q buf as 1718 * target buf */ 1719 } 1720 } 1721 } else { 1722 /* there is only one xor node in this case */ 1723 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, name, allocList); 1724 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD; 1725 for (i = 0; i < numDataNodes + 1; i++) { 1726 /* set up params related to Rod and Rop nodes */ 1727 xorNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda */ 1728 xorNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer pointer */ 1729 } 1730 for (i = 0; i < numDataNodes; i++) { 1731 /* set up params related to Wnd and Wnp nodes */ 1732 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0]; /* pda */ 1733 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1]; /* buffer pointer */ 1734 } 1735 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; /* xor node needs to get 1736 * at RAID information */ 1737 xorNodes[0].results[0] = readParityNodes[0].params[1].p; 1738 if (nfaults == 2) { 1739 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc, undoFunc, NULL, numParityNodes, numParityNodes + numDataNodes, (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h, qname, allocList); 1740 for (i = 0; i < numDataNodes; i++) { 1741 /* set up params related to Rod */ 1742 qNodes[0].params[2 * i + 0] = readDataNodes[i].params[0]; /* pda 
*/ 1743 qNodes[0].params[2 * i + 1] = readDataNodes[i].params[1]; /* buffer pointer */ 1744 } 1745 /* and read old q */ 1746 qNodes[0].params[2 * numDataNodes + 0] = readQNodes[0].params[0]; /* pda */ 1747 qNodes[0].params[2 * numDataNodes + 1] = readQNodes[0].params[1]; /* buffer pointer */ 1748 for (i = 0; i < numDataNodes; i++) { 1749 /* set up params related to Wnd nodes */ 1750 qNodes[0].params[2 * (numDataNodes + 1 + i) + 0] = writeDataNodes[i].params[0]; /* pda */ 1751 qNodes[0].params[2 * (numDataNodes + 1 + i) + 1] = writeDataNodes[i].params[1]; /* buffer pointer */ 1752 } 1753 qNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p = raidPtr; /* xor node needs to get 1754 * at RAID information */ 1755 qNodes[0].results[0] = readQNodes[0].params[1].p; 1756 } 1757 } 1758 1759 /* initialize nodes which write new parity (Wnp) */ 1760 pda = asmap->parityInfo; 1761 for (i = 0; i < numParityNodes; i++) { 1762 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnp", allocList); 1763 RF_ASSERT(pda != NULL); 1764 writeParityNodes[i].params[0].p = pda; /* param 1 (bufPtr) 1765 * filled in by xor node */ 1766 writeParityNodes[i].params[1].p = xorNodes[i].results[0]; /* buffer pointer for 1767 * parity write 1768 * operation */ 1769 writeParityNodes[i].params[2].v = parityStripeID; 1770 writeParityNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1771 1772 if (lu_flag) { 1773 /* initialize node to unlock the disk queue */ 1774 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unp", allocList); 1775 unlockParityNodes[i].params[0].p = pda; /* physical disk addr 1776 * desc */ 1777 unlockParityNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru); 1778 } 1779 pda = pda->next; 1780 } 1781 1782 /* initialize nodes which write new 
Q (Wnq) */ 1783 if (nfaults == 2) { 1784 pda = asmap->qInfo; 1785 for (i = 0; i < numParityNodes; i++) { 1786 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, numParityNodes, 4, 0, dag_h, "Wnq", allocList); 1787 RF_ASSERT(pda != NULL); 1788 writeQNodes[i].params[0].p = pda; /* param 1 (bufPtr) 1789 * filled in by xor node */ 1790 writeQNodes[i].params[1].p = qNodes[i].results[0]; /* buffer pointer for 1791 * parity write 1792 * operation */ 1793 writeQNodes[i].params[2].v = parityStripeID; 1794 writeQNodes[i].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru); 1795 1796 if (lu_flag) { 1797 /* initialize node to unlock the disk queue */ 1798 rf_InitNode(&unlockQNodes[i], rf_wait, RF_FALSE, rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc, rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h, "Unq", allocList); 1799 unlockQNodes[i].params[0].p = pda; /* physical disk addr 1800 * desc */ 1801 unlockQNodes[i].params[1].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, lu_flag, which_ru); 1802 } 1803 pda = pda->next; 1804 } 1805 } 1806 /* Step 4. 
connect the nodes */ 1807 1808 /* connect header to block node */ 1809 dag_h->succedents[0] = blockNode; 1810 1811 /* connect block node to read old data nodes */ 1812 RF_ASSERT(blockNode->numSuccedents == (numDataNodes + (numParityNodes * nfaults))); 1813 for (i = 0; i < numDataNodes; i++) { 1814 blockNode->succedents[i] = &readDataNodes[i]; 1815 RF_ASSERT(readDataNodes[i].numAntecedents == 1); 1816 readDataNodes[i].antecedents[0] = blockNode; 1817 readDataNodes[i].antType[0] = rf_control; 1818 } 1819 1820 /* connect block node to read old parity nodes */ 1821 for (i = 0; i < numParityNodes; i++) { 1822 blockNode->succedents[numDataNodes + i] = &readParityNodes[i]; 1823 RF_ASSERT(readParityNodes[i].numAntecedents == 1); 1824 readParityNodes[i].antecedents[0] = blockNode; 1825 readParityNodes[i].antType[0] = rf_control; 1826 } 1827 1828 /* connect block node to read old Q nodes */ 1829 if (nfaults == 2) 1830 for (i = 0; i < numParityNodes; i++) { 1831 blockNode->succedents[numDataNodes + numParityNodes + i] = &readQNodes[i]; 1832 RF_ASSERT(readQNodes[i].numAntecedents == 1); 1833 readQNodes[i].antecedents[0] = blockNode; 1834 readQNodes[i].antType[0] = rf_control; 1835 } 1836 1837 /* connect read old data nodes to write new data nodes */ 1838 for (i = 0; i < numDataNodes; i++) { 1839 RF_ASSERT(readDataNodes[i].numSuccedents == ((nfaults * numParityNodes) + 1)); 1840 RF_ASSERT(writeDataNodes[i].numAntecedents == 1); 1841 readDataNodes[i].succedents[0] = &writeDataNodes[i]; 1842 writeDataNodes[i].antecedents[0] = &readDataNodes[i]; 1843 writeDataNodes[i].antType[0] = rf_antiData; 1844 } 1845 1846 /* connect read old data nodes to xor nodes */ 1847 for (i = 0; i < numDataNodes; i++) { 1848 for (j = 0; j < numParityNodes; j++) { 1849 RF_ASSERT(xorNodes[j].numAntecedents == numDataNodes + numParityNodes); 1850 readDataNodes[i].succedents[1 + j] = &xorNodes[j]; 1851 xorNodes[j].antecedents[i] = &readDataNodes[i]; 1852 xorNodes[j].antType[i] = rf_trueData; 1853 } 1854 } 
1855 1856 /* connect read old data nodes to q nodes */ 1857 if (nfaults == 2) 1858 for (i = 0; i < numDataNodes; i++) 1859 for (j = 0; j < numParityNodes; j++) { 1860 RF_ASSERT(qNodes[j].numAntecedents == numDataNodes + numParityNodes); 1861 readDataNodes[i].succedents[1 + numParityNodes + j] = &qNodes[j]; 1862 qNodes[j].antecedents[i] = &readDataNodes[i]; 1863 qNodes[j].antType[i] = rf_trueData; 1864 } 1865 1866 /* connect read old parity nodes to xor nodes */ 1867 for (i = 0; i < numParityNodes; i++) { 1868 for (j = 0; j < numParityNodes; j++) { 1869 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes); 1870 readParityNodes[i].succedents[j] = &xorNodes[j]; 1871 xorNodes[j].antecedents[numDataNodes + i] = &readParityNodes[i]; 1872 xorNodes[j].antType[numDataNodes + i] = rf_trueData; 1873 } 1874 } 1875 1876 /* connect read old q nodes to q nodes */ 1877 if (nfaults == 2) 1878 for (i = 0; i < numParityNodes; i++) { 1879 for (j = 0; j < numParityNodes; j++) { 1880 RF_ASSERT(readQNodes[i].numSuccedents == numParityNodes); 1881 readQNodes[i].succedents[j] = &qNodes[j]; 1882 qNodes[j].antecedents[numDataNodes + i] = &readQNodes[i]; 1883 qNodes[j].antType[numDataNodes + i] = rf_trueData; 1884 } 1885 } 1886 1887 /* connect xor nodes to the write new parity nodes */ 1888 for (i = 0; i < numParityNodes; i++) { 1889 RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes); 1890 for (j = 0; j < numParityNodes; j++) { 1891 RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes); 1892 xorNodes[i].succedents[j] = &writeParityNodes[j]; 1893 writeParityNodes[j].antecedents[i] = &xorNodes[i]; 1894 writeParityNodes[j].antType[i] = rf_trueData; 1895 } 1896 } 1897 1898 /* connect q nodes to the write new q nodes */ 1899 if (nfaults == 2) 1900 for (i = 0; i < numParityNodes; i++) { 1901 RF_ASSERT(writeQNodes[i].numAntecedents == numParityNodes); 1902 for (j = 0; j < numParityNodes; j++) { 1903 RF_ASSERT(qNodes[j].numSuccedents == 1); 1904 qNodes[i].succedents[j] = 
&writeQNodes[j]; 1905 writeQNodes[j].antecedents[i] = &qNodes[i]; 1906 writeQNodes[j].antType[i] = rf_trueData; 1907 } 1908 } 1909 1910 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 1911 RF_ASSERT(termNode->numSuccedents == 0); 1912 for (i = 0; i < numDataNodes; i++) { 1913 if (lu_flag) { 1914 /* connect write new data nodes to unlock nodes */ 1915 RF_ASSERT(writeDataNodes[i].numSuccedents == 1); 1916 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1); 1917 writeDataNodes[i].succedents[0] = &unlockDataNodes[i]; 1918 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i]; 1919 unlockDataNodes[i].antType[0] = rf_control; 1920 1921 /* connect unlock nodes to term node */ 1922 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1); 1923 unlockDataNodes[i].succedents[0] = termNode; 1924 termNode->antecedents[i] = &unlockDataNodes[i]; 1925 termNode->antType[i] = rf_control; 1926 } else { 1927 /* connect write new data nodes to term node */ 1928 RF_ASSERT(writeDataNodes[i].numSuccedents == 1); 1929 RF_ASSERT(termNode->numAntecedents == (numDataNodes + (nfaults * numParityNodes))); 1930 writeDataNodes[i].succedents[0] = termNode; 1931 termNode->antecedents[i] = &writeDataNodes[i]; 1932 termNode->antType[i] = rf_control; 1933 } 1934 } 1935 1936 for (i = 0; i < numParityNodes; i++) { 1937 if (lu_flag) { 1938 /* connect write new parity nodes to unlock nodes */ 1939 RF_ASSERT(writeParityNodes[i].numSuccedents == 1); 1940 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1); 1941 writeParityNodes[i].succedents[0] = &unlockParityNodes[i]; 1942 unlockParityNodes[i].antecedents[0] = &writeParityNodes[i]; 1943 unlockParityNodes[i].antType[0] = rf_control; 1944 1945 /* connect unlock nodes to term node */ 1946 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1); 1947 unlockParityNodes[i].succedents[0] = termNode; 1948 termNode->antecedents[numDataNodes + i] = &unlockParityNodes[i]; 1949 termNode->antType[numDataNodes + i] = rf_control; 1950 } else { 
1951 RF_ASSERT(writeParityNodes[i].numSuccedents == 1); 1952 writeParityNodes[i].succedents[0] = termNode; 1953 termNode->antecedents[numDataNodes + i] = &writeParityNodes[i]; 1954 termNode->antType[numDataNodes + i] = rf_control; 1955 } 1956 } 1957 1958 if (nfaults == 2) 1959 for (i = 0; i < numParityNodes; i++) { 1960 if (lu_flag) { 1961 /* connect write new Q nodes to unlock nodes */ 1962 RF_ASSERT(writeQNodes[i].numSuccedents == 1); 1963 RF_ASSERT(unlockQNodes[i].numAntecedents == 1); 1964 writeQNodes[i].succedents[0] = &unlockQNodes[i]; 1965 unlockQNodes[i].antecedents[0] = &writeQNodes[i]; 1966 unlockQNodes[i].antType[0] = rf_control; 1967 1968 /* connect unlock nodes to unblock node */ 1969 RF_ASSERT(unlockQNodes[i].numSuccedents == 1); 1970 unlockQNodes[i].succedents[0] = termNode; 1971 termNode->antecedents[numDataNodes + numParityNodes + i] = &unlockQNodes[i]; 1972 termNode->antType[numDataNodes + numParityNodes + i] = rf_control; 1973 } else { 1974 RF_ASSERT(writeQNodes[i].numSuccedents == 1); 1975 writeQNodes[i].succedents[0] = termNode; 1976 termNode->antecedents[numDataNodes + numParityNodes + i] = &writeQNodes[i]; 1977 termNode->antType[numDataNodes + numParityNodes + i] = rf_control; 1978 } 1979 } 1980 } 1981 1982 1983 1984 /****************************************************************************** 1985 * create a write graph (fault-free or degraded) for RAID level 1 1986 * 1987 * Hdr Nil -> Wpd -> Nil -> Trm 1988 * Nil -> Wsd -> 1989 * 1990 * The "Wpd" node writes data to the primary copy in the mirror pair 1991 * The "Wsd" node writes data to the secondary copy in the mirror pair 1992 * 1993 * Parameters: raidPtr - description of the physical array 1994 * asmap - logical & physical addresses for this access 1995 * bp - buffer ptr (holds write data) 1996 * flags - general flags (e.g. 
disk locking)
 *              allocList - list of memory allocated in DAG creation
 *****************************************************************************/

void
rf_CreateRaidOneWriteDAGFwd(
    RF_Raid_t * raidPtr,
    RF_AccessStripeMap_t * asmap,
    RF_DagHeader_t * dag_h,
    void *bp,
    RF_RaidAccessFlags_t flags,
    RF_AllocListElem_t * allocList)
{
	RF_DagNode_t *blk, *unblk, *term;
	RF_DagNode_t *nodeMem, *wnd, *wmir;
	int     numWnd, numWmir, n;
	RF_ReconUnitNum_t which_ru;
	RF_PhysDiskAddr_t *d_pda, *p_pda;
	RF_StripeNum_t psID;

	psID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
	    asmap->raidAddress, &which_ru);
	if (rf_dagDebug) {
		printf("[Creating RAID level 1 write DAG]\n");
	}
	/* two write nodes per copy are needed when the access is not
	 * stripe-unit aligned (the pda chain then has a second element) */
	numWmir = (asmap->parityInfo->next) ? 2 : 1;
	numWnd = (asmap->physInfo->next) ? 2 : 1;

	/* a failed copy simply loses its write node (degraded mode) */
	if (asmap->numDataFailed == 1)
		numWnd--;
	if (asmap->numParityFailed == 1)
		numWmir--;

	/* one allocation covers every node: the primary (Wpd) writes, the
	 * mirror (Wsd) writes, plus block, unblock and terminator */
	RF_CallocAndAdd(nodeMem, numWnd + numWmir + 3, sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
	wnd = nodeMem;
	wmir = wnd + numWnd;
	blk = wmir + numWmir;
	unblk = blk + 1;
	term = unblk + 1;
	RF_ASSERT((term + 1) - nodeMem == (numWnd + numWmir + 3));

	/* this dag can commit immediately */
	dag_h->numCommitNodes = 0;
	dag_h->numCommits = 0;
	dag_h->numSuccedents = 1;

	/* initialize the block, unblock and terminator nodes */
	rf_InitNode(blk, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, (numWnd + numWmir), 0, 0, 0, dag_h, "Nil", allocList);
	rf_InitNode(unblk, rf_wait, RF_FALSE, rf_NullNodeFunc, rf_NullNodeUndoFunc, NULL, 1, (numWnd + numWmir), 0, 0, dag_h, "Nil", allocList);
	rf_InitNode(term, rf_wait, RF_FALSE, rf_TerminateFunc, rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);

	/* initialize the primary-copy (Wpd) write nodes */
	if (numWnd > 0) {
		d_pda = asmap->physInfo;
		for (n = 0; n < numWnd; n++) {
			rf_InitNode(&wnd[n], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wpd", allocList);
			RF_ASSERT(d_pda != NULL);
			wnd[n].params[0].p = d_pda;	/* physical disk addr */
			wnd[n].params[1].p = d_pda->bufPtr;	/* new data to write */
			wnd[n].params[2].v = psID;
			wnd[n].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
			d_pda = d_pda->next;
		}
		RF_ASSERT(d_pda == NULL);
	}
	/* initialize the mirror-copy (Wsd) write nodes; the buffer comes
	 * from the data pda while the target address comes from the
	 * parity (mirror) pda */
	if (numWmir > 0) {
		d_pda = asmap->physInfo;
		p_pda = asmap->parityInfo;
		for (n = 0; n < numWmir; n++) {
			rf_InitNode(&wmir[n], rf_wait, RF_FALSE, rf_DiskWriteFunc, rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h, "Wsd", allocList);
			RF_ASSERT(d_pda != NULL);
			wmir[n].params[0].p = p_pda;	/* mirror disk addr */
			wmir[n].params[1].p = d_pda->bufPtr;	/* same new data */
			wmir[n].params[2].v = psID;
			wmir[n].params[3].v = RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
			d_pda = d_pda->next;
			p_pda = p_pda->next;
		}
		RF_ASSERT(d_pda == NULL);
		RF_ASSERT(p_pda == NULL);
	}
	/* header -> block node */
	RF_ASSERT(dag_h->numSuccedents == 1);
	RF_ASSERT(blk->numAntecedents == 0);
	dag_h->succedents[0] = blk;

	/* block node -> every write node */
	RF_ASSERT(blk->numSuccedents == (numWnd + numWmir));
	for (n = 0; n < numWnd; n++) {
		RF_ASSERT(wnd[n].numAntecedents == 1);
		blk->succedents[n] = &wnd[n];
		wnd[n].antecedents[0] = blk;
		wnd[n].antType[0] = rf_control;
	}
	for (n = 0; n < numWmir; n++) {
		RF_ASSERT(wmir[n].numAntecedents == 1);
		blk->succedents[n + numWnd] = &wmir[n];
		wmir[n].antecedents[0] = blk;
		wmir[n].antType[0] = rf_control;
	}

	/* every write node -> unblock node */
	RF_ASSERT(unblk->numAntecedents == (numWnd + numWmir));
	for (n = 0; n < numWnd; n++) {
		RF_ASSERT(wnd[n].numSuccedents == 1);
		wnd[n].succedents[0] = unblk;
		unblk->antecedents[n] = &wnd[n];
		unblk->antType[n] = rf_control;
	}
	for (n = 0; n < numWmir; n++) {
		RF_ASSERT(wmir[n].numSuccedents == 1);
		wmir[n].succedents[0] = unblk;
		unblk->antecedents[n + numWnd] = &wmir[n];
		unblk->antType[n + numWnd] = rf_control;
	}

	/* unblock node -> terminator */
	RF_ASSERT(unblk->numSuccedents == 1);
	RF_ASSERT(term->numAntecedents == 1);
	RF_ASSERT(term->numSuccedents == 0);
	unblk->succedents[0] = term;
	term->antecedents[0] = unblk;
	term->antType[0] = rf_control;

	return;
}