1 /* $NetBSD: rf_engine.c,v 1.53 2019/10/10 03:43:59 christos Exp $ */
2 /*
3 * Copyright (c) 1995 Carnegie-Mellon University.
4 * All rights reserved.
5 *
6 * Author: William V. Courtright II, Mark Holland, Rachad Youssef
7 *
8 * Permission to use, copy, modify and distribute this software and
9 * its documentation is hereby granted, provided that both the copyright
10 * notice and this permission notice appear in all copies of the
11 * software, derivative works or modified versions, and any portions
12 * thereof, and that both notices appear in supporting documentation.
13 *
14 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
15 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
16 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
17 *
18 * Carnegie Mellon requests users of this software to return to
19 *
20 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
21 * School of Computer Science
22 * Carnegie Mellon University
23 * Pittsburgh PA 15213-3890
24 *
25 * any improvements or extensions that they make and grant Carnegie the
26 * rights to redistribute these changes.
27 */
28
29 /****************************************************************************
30 * *
31 * engine.c -- code for DAG execution engine *
32 * *
33 * Modified to work as follows (holland): *
34 * A user-thread calls into DispatchDAG, which fires off the nodes that *
35 * are direct successors to the header node. DispatchDAG then returns, *
36 * and the rest of the I/O continues asynchronously. As each node *
37 * completes, the node execution function calls FinishNode(). FinishNode *
38 * scans the list of successors to the node and increments the antecedent *
39 * counts. Each node that becomes enabled is placed on a central node *
40 * queue. A dedicated dag-execution thread grabs nodes off of this *
41 * queue and fires them. *
42 * *
43 * NULL nodes are never fired. *
44 * *
45 * Terminator nodes are never fired, but rather cause the callback *
46 * associated with the DAG to be invoked. *
47 * *
48 * If a node fails, the dag either rolls forward to the completion or *
49 * rolls back, undoing previously-completed nodes and fails atomically. *
50 * The direction of recovery is determined by the location of the failed *
51 * node in the graph. If the failure occurred before the commit node in *
52 * the graph, backward recovery is used. Otherwise, forward recovery is *
53 * used. *
54 * *
55 ****************************************************************************/
56
57 #include <sys/cdefs.h>
58 __KERNEL_RCSID(0, "$NetBSD: rf_engine.c,v 1.53 2019/10/10 03:43:59 christos Exp $");
59
60 #include <sys/errno.h>
61
62 #include "rf_threadstuff.h"
63 #include "rf_dag.h"
64 #include "rf_engine.h"
65 #include "rf_etimer.h"
66 #include "rf_general.h"
67 #include "rf_dagutils.h"
68 #include "rf_shutdown.h"
69 #include "rf_raid.h"
70 #include "rf_kintf.h"
71 #include "rf_paritymap.h"
72
73 static void rf_ShutdownEngine(void *);
74 static void DAGExecutionThread(RF_ThreadArg_t arg);
75 static void rf_RaidIOThread(RF_ThreadArg_t arg);
76
77 /* synchronization primitives for this file. DO_WAIT should be enclosed in a while loop. */
78
79 #define DO_LOCK(_r_) \
80 rf_lock_mutex2((_r_)->node_queue_mutex)
81
82 #define DO_UNLOCK(_r_) \
83 rf_unlock_mutex2((_r_)->node_queue_mutex)
84
85 #define DO_WAIT(_r_) \
86 rf_wait_cond2((_r_)->node_queue_cv, (_r_)->node_queue_mutex)
87
88 #define DO_SIGNAL(_r_) \
89 rf_broadcast_cond2((_r_)->node_queue_cv) /* XXX rf_signal_cond2? */
90
/*
 * Shutdown hook (registered via rf_ShutdownCreate in rf_ConfigureEngine).
 * Performs a two-phase handshake with each worker thread: set the shutdown
 * flag, wake the thread, then wait for the thread to clear the flag as its
 * acknowledgement before this function proceeds.  The I/O thread is stopped
 * first, then the DAG execution engine; the synchronization primitives are
 * only destroyed after both threads have acknowledged, so neither can still
 * be touching them.
 */
static void
rf_ShutdownEngine(void *arg)
{
	RF_Raid_t *raidPtr;

	raidPtr = (RF_Raid_t *) arg;

	/* Tell the rf_RaidIOThread to shutdown */
	rf_lock_mutex2(raidPtr->iodone_lock);

	raidPtr->shutdown_raidio = 1;
	rf_signal_cond2(raidPtr->iodone_cv);

	/* ...and wait for it to tell us it has finished
	 * (rf_RaidIOThread clears shutdown_raidio just before exiting) */
	while (raidPtr->shutdown_raidio)
		rf_wait_cond2(raidPtr->iodone_cv, raidPtr->iodone_lock);

	rf_unlock_mutex2(raidPtr->iodone_lock);

	/* Now shut down the DAG execution engine. */
	DO_LOCK(raidPtr);
	raidPtr->shutdown_engine = 1;
	DO_SIGNAL(raidPtr);

	/* ...and wait for it to tell us it has finished
	 * (DAGExecutionThread clears shutdown_engine just before exiting) */
	while (raidPtr->shutdown_engine)
		DO_WAIT(raidPtr);

	DO_UNLOCK(raidPtr);

	/* Both threads are gone; safe to tear down their synchronization. */
	rf_destroy_mutex2(raidPtr->node_queue_mutex);
	rf_destroy_cond2(raidPtr->node_queue_cv);

	rf_destroy_mutex2(raidPtr->iodone_lock);
	rf_destroy_cond2(raidPtr->iodone_cv);
}
127
/*
 * Configure the DAG execution engine for one RAID set: initialize the
 * iodone queue and node queue with their mutexes/condvars, spawn the
 * DAG execution thread and the raidio helper thread, and register
 * rf_ShutdownEngine on the shutdown list.
 *
 * Returns 0 on success, ENOMEM if either kernel thread cannot be created.
 *
 * NOTE(review): on the ENOMEM paths the mutexes/condvars initialized above
 * are not destroyed, and on the second failure the already-created engine
 * thread is left running — confirm callers treat this as fatal (the comment
 * below suggests thread creation failure panics the kernel anyway).
 */
int
rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
    RF_Config_t *cfgPtr)
{

	/*
	 * Initialise iodone for the IO thread.
	 */
	TAILQ_INIT(&(raidPtr->iodone));
	rf_init_mutex2(raidPtr->iodone_lock, IPL_VM);
	rf_init_cond2(raidPtr->iodone_cv, "raidiow");

	rf_init_mutex2(raidPtr->node_queue_mutex, IPL_VM);
	rf_init_cond2(raidPtr->node_queue_cv, "rfnodeq");
	raidPtr->node_queue = NULL;
	raidPtr->dags_in_flight = 0;

	/* we create the execution thread only once per system boot. no need
	 * to check return code b/c the kernel panics if it can't create the
	 * thread. */
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Creating engine thread\n", raidPtr->raidid);
	}
#endif
	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_thread,
				    DAGExecutionThread, raidPtr,
				    "raid%d", raidPtr->raidid)) {
		printf("raid%d: Unable to create engine thread\n",
		       raidPtr->raidid);
		return (ENOMEM);
	}
	if (RF_CREATE_ENGINE_THREAD(raidPtr->engine_helper_thread,
				    rf_RaidIOThread, raidPtr,
				    "raidio%d", raidPtr->raidid)) {
		printf("raid%d: Unable to create raidio thread\n",
		       raidPtr->raidid);
		return (ENOMEM);
	}
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Created engine thread\n", raidPtr->raidid);
	}
#endif

	/* engine thread is now running and waiting for work */
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Engine thread running and waiting for events\n", raidPtr->raidid);
	}
#endif
	rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);

	return (0);
}
183
#if 0
/*
 * Compiled out (dead code, kept for reference).
 * Recursively reports whether forward execution has completed for a node
 * and all of its succedent branches: RF_TRUE once no node in the subtree
 * is still in the rf_fired state.  Panics on states in which it should
 * never be consulted (rf_wait, rf_recover) and on illegal status values.
 */
static int
BranchDone(RF_DagNode_t *node)
{
	int i;

	/* return true if forward execution is completed for a node and its
	 * succedents */
	switch (node->status) {
	case rf_wait:
		/* should never be called in this state */
		RF_PANIC();
		break;
	case rf_fired:
		/* node is currently executing, so we're not done */
		return (RF_FALSE);
	case rf_good:
		/* for each succedent recursively check branch */
		for (i = 0; i < node->numSuccedents; i++)
			if (!BranchDone(node->succedents[i]))
				return RF_FALSE;
		return RF_TRUE;	/* node and all succedent branches aren't in
				 * fired state */
	case rf_bad:
		/* succedents can't fire */
		return (RF_TRUE);
	case rf_recover:
		/* should never be called in this state */
		RF_PANIC();
		break;
	case rf_undone:
	case rf_panic:
		/* XXX need to fix this case */
		/* for now, assume that we're done */
		return (RF_TRUE);
	default:
		/* illegal node status */
		RF_PANIC();
		break;
	}
	/* NOTREACHED: every case either returns or panics */
}
#endif
226
227 static int
NodeReady(RF_DagNode_t * node)228 NodeReady(RF_DagNode_t *node)
229 {
230 int ready;
231
232 ready = RF_FALSE;
233
234 switch (node->dagHdr->status) {
235 case rf_enable:
236 case rf_rollForward:
237 if ((node->status == rf_wait) &&
238 (node->numAntecedents == node->numAntDone))
239 ready = RF_TRUE;
240 break;
241 case rf_rollBackward:
242 RF_ASSERT(node->numSuccDone <= node->numSuccedents);
243 RF_ASSERT(node->numSuccFired <= node->numSuccedents);
244 RF_ASSERT(node->numSuccFired <= node->numSuccDone);
245 if ((node->status == rf_good) &&
246 (node->numSuccDone == node->numSuccedents))
247 ready = RF_TRUE;
248 break;
249 default:
250 printf("Execution engine found illegal DAG status in NodeReady\n");
251 RF_PANIC();
252 break;
253 }
254
255 return (ready);
256 }
257
258
259
260 /* user context and dag-exec-thread context: Fire a node. The node's
261 * status field determines which function, do or undo, to be fired.
262 * This routine assumes that the node's status field has alread been
263 * set to "fired" or "recover" to indicate the direction of execution.
264 */
265 static void
FireNode(RF_DagNode_t * node)266 FireNode(RF_DagNode_t *node)
267 {
268 switch (node->status) {
269 case rf_fired:
270 /* fire the do function of a node */
271 #if RF_DEBUG_ENGINE
272 if (rf_engineDebug) {
273 printf("raid%d: Firing node 0x%lx (%s)\n",
274 node->dagHdr->raidPtr->raidid,
275 (unsigned long) node, node->name);
276 }
277 #endif
278 if (node->flags & RF_DAGNODE_FLAG_YIELD) {
279 #if defined(__NetBSD__) && defined(_KERNEL)
280 /* thread_block(); */
281 /* printf("Need to block the thread here...\n"); */
282 /* XXX thread_block is actually mentioned in
283 * /usr/include/vm/vm_extern.h */
284 #else
285 thread_block();
286 #endif
287 }
288 (*(node->doFunc)) (node);
289 break;
290 case rf_recover:
291 /* fire the undo function of a node */
292 #if RF_DEBUG_ENGINE
293 if (rf_engineDebug) {
294 printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
295 node->dagHdr->raidPtr->raidid,
296 (unsigned long) node, node->name);
297 }
298 #endif
299 if (node->flags & RF_DAGNODE_FLAG_YIELD)
300 #if defined(__NetBSD__) && defined(_KERNEL)
301 /* thread_block(); */
302 /* printf("Need to block the thread here...\n"); */
303 /* XXX thread_block is actually mentioned in
304 * /usr/include/vm/vm_extern.h */
305 #else
306 thread_block();
307 #endif
308 (*(node->undoFunc)) (node);
309 break;
310 default:
311 RF_PANIC();
312 break;
313 }
314 }
315
316
317
318 /* user context:
319 * Attempt to fire each node in a linear array.
320 * The entire list is fired atomically.
321 */
322 static void
FireNodeArray(int numNodes,RF_DagNode_t ** nodeList)323 FireNodeArray(int numNodes, RF_DagNode_t **nodeList)
324 {
325 RF_DagStatus_t dstat;
326 RF_DagNode_t *node;
327 int i, j;
328
329 /* first, mark all nodes which are ready to be fired */
330 for (i = 0; i < numNodes; i++) {
331 node = nodeList[i];
332 dstat = node->dagHdr->status;
333 RF_ASSERT((node->status == rf_wait) ||
334 (node->status == rf_good));
335 if (NodeReady(node)) {
336 if ((dstat == rf_enable) ||
337 (dstat == rf_rollForward)) {
338 RF_ASSERT(node->status == rf_wait);
339 if (node->commitNode)
340 node->dagHdr->numCommits++;
341 node->status = rf_fired;
342 for (j = 0; j < node->numAntecedents; j++)
343 node->antecedents[j]->numSuccFired++;
344 } else {
345 RF_ASSERT(dstat == rf_rollBackward);
346 RF_ASSERT(node->status == rf_good);
347 /* only one commit node per graph */
348 RF_ASSERT(node->commitNode == RF_FALSE);
349 node->status = rf_recover;
350 }
351 }
352 }
353 /* now, fire the nodes */
354 for (i = 0; i < numNodes; i++) {
355 if ((nodeList[i]->status == rf_fired) ||
356 (nodeList[i]->status == rf_recover))
357 FireNode(nodeList[i]);
358 }
359 }
360
361
362 /* user context:
363 * Attempt to fire each node in a linked list.
364 * The entire list is fired atomically.
365 */
366 static void
FireNodeList(RF_DagNode_t * nodeList)367 FireNodeList(RF_DagNode_t *nodeList)
368 {
369 RF_DagNode_t *node, *next;
370 RF_DagStatus_t dstat;
371 int j;
372
373 if (nodeList) {
374 /* first, mark all nodes which are ready to be fired */
375 for (node = nodeList; node; node = next) {
376 next = node->next;
377 dstat = node->dagHdr->status;
378 RF_ASSERT((node->status == rf_wait) ||
379 (node->status == rf_good));
380 if (NodeReady(node)) {
381 if ((dstat == rf_enable) ||
382 (dstat == rf_rollForward)) {
383 RF_ASSERT(node->status == rf_wait);
384 if (node->commitNode)
385 node->dagHdr->numCommits++;
386 node->status = rf_fired;
387 for (j = 0; j < node->numAntecedents; j++)
388 node->antecedents[j]->numSuccFired++;
389 } else {
390 RF_ASSERT(dstat == rf_rollBackward);
391 RF_ASSERT(node->status == rf_good);
392 /* only one commit node per graph */
393 RF_ASSERT(node->commitNode == RF_FALSE);
394 node->status = rf_recover;
395 }
396 }
397 }
398 /* now, fire the nodes */
399 for (node = nodeList; node; node = next) {
400 next = node->next;
401 if ((node->status == rf_fired) ||
402 (node->status == rf_recover))
403 FireNode(node);
404 }
405 }
406 }
/* interrupt context:
 * for each succedent
 *    propagate required results from node to succedent
 *    increment succedent's numAntDone
 *    place newly-enable nodes on node queue for firing
 *
 * To save context switches, we don't place NIL nodes on the node queue,
 * but rather just process them as if they had fired. Note that NIL nodes
 * that are the direct successors of the header will actually get fired by
 * DispatchDAG, which is fine because no context switches are involved.
 *
 * Important: when running at user level, this can be called by any
 * disk thread, and so the increment and check of the antecedent count
 * must be locked.  I used the node queue mutex and locked down the
 * entire function, but this is certainly overkill.
 */
static void
PropagateResults(RF_DagNode_t *node, int context)
{
	RF_DagNode_t *s, *a;
	RF_Raid_t *raidPtr;
	int i;
	RF_DagNode_t *finishlist = NULL;	/* a list of NIL nodes to be
						 * finished */
	RF_DagNode_t *skiplist = NULL;	/* list of nodes with failed truedata
					 * antecedents */
	RF_DagNode_t *firelist = NULL;	/* a list of nodes to be fired */
	RF_DagNode_t *q = NULL, *qh = NULL, *next;	/* q/qh: local tail/head
							 * batch destined for the
							 * engine's node queue */
	int j, skipNode;

	raidPtr = node->dagHdr->raidPtr;

	DO_LOCK(raidPtr);

	/* debug - validate fire counts; this node is now done, so bump
	 * numSuccDone on each of its antecedents */
	for (i = 0; i < node->numAntecedents; i++) {
		a = *(node->antecedents + i);
		RF_ASSERT(a->numSuccFired >= a->numSuccDone);
		RF_ASSERT(a->numSuccFired <= a->numSuccedents);
		a->numSuccDone++;
	}

	switch (node->dagHdr->status) {
	case rf_enable:
	case rf_rollForward:
		/* forward execution: classify each succedent that has just
		 * become enabled into finishlist (NIL), skiplist (failed
		 * truedata antecedent), firelist (fire after unlock), or the
		 * local q/qh batch for the engine thread */
		for (i = 0; i < node->numSuccedents; i++) {
			s = *(node->succedents + i);
			RF_ASSERT(s->status == rf_wait);
			(s->numAntDone)++;
			if (s->numAntDone == s->numAntecedents) {
				/* look for NIL nodes */
				if (s->doFunc == rf_NullNodeFunc) {
					/* don't fire NIL nodes, just process
					 * them */
					s->next = finishlist;
					finishlist = s;
				} else {
					/* look to see if the node is to be
					 * skipped */
					skipNode = RF_FALSE;
					for (j = 0; j < s->numAntecedents; j++)
						if ((s->antType[j] == rf_trueData) && (s->antecedents[j]->status == rf_bad))
							skipNode = RF_TRUE;
					if (skipNode) {
						/* this node has one or more
						 * failed true data
						 * dependencies, so skip it */
						s->next = skiplist;
						skiplist = s;
					} else
						/* add s to list of nodes (q)
						 * to execute */
						if (context != RF_INTR_CONTEXT) {
							/* not at interrupt
							 * context: we can fire
							 * it ourselves after
							 * dropping the lock */
							s->next = firelist;
							firelist = s;
						} else {
							/* at interrupt context:
							 * enqueue the node for
							 * the dag exec thread
							 * to fire */
							RF_ASSERT(NodeReady(s));
							if (q) {
								q->next = s;
								q = s;
							} else {
								qh = q = s;
								qh->next = NULL;
							}
						}
				}
			}
		}

		if (q) {
			/* xfer our local list of nodes to the node queue */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		/* lock must be dropped before the rf_FinishNode /
		 * FireNodeList calls below, which may recurse back here */
		DO_UNLOCK(raidPtr);

		for (; skiplist; skiplist = next) {
			next = skiplist->next;
			skiplist->status = rf_skipped;
			/* account for the succedents we will never fire */
			for (i = 0; i < skiplist->numAntecedents; i++) {
				skiplist->antecedents[i]->numSuccFired++;
			}
			if (skiplist->commitNode) {
				skiplist->dagHdr->numCommits++;
			}
			rf_FinishNode(skiplist, context);
		}
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them */
			next = finishlist->next;
			finishlist->status = rf_good;
			for (i = 0; i < finishlist->numAntecedents; i++) {
				finishlist->antecedents[i]->numSuccFired++;
			}
			if (finishlist->commitNode)
				finishlist->dagHdr->numCommits++;
			/*
			 * Okay, here we're calling rf_FinishNode() on
			 * nodes that have the null function as their
			 * work proc.  Such a node could be the
			 * terminal node in a DAG.  If so, it will
			 * cause the DAG to complete, which will in
			 * turn free memory used by the DAG, which
			 * includes the node in question.  Thus, we
			 * must avoid referencing the node at all
			 * after calling rf_FinishNode() on it. */
			rf_FinishNode(finishlist, context);	/* recursive call */
		}
		/* fire all nodes in firelist */
		FireNodeList(firelist);
		break;

	case rf_rollBackward:
		/* backward execution: walk the antecedents instead; an
		 * antecedent becomes ready to undo once all succedents it
		 * fired have completed */
		for (i = 0; i < node->numAntecedents; i++) {
			a = *(node->antecedents + i);
			RF_ASSERT(a->status == rf_good);
			RF_ASSERT(a->numSuccDone <= a->numSuccedents);
			RF_ASSERT(a->numSuccDone <= a->numSuccFired);

			if (a->numSuccDone == a->numSuccFired) {
				if (a->undoFunc == rf_NullNodeFunc) {
					/* don't fire NIL nodes, just process
					 * them */
					a->next = finishlist;
					finishlist = a;
				} else {
					if (context != RF_INTR_CONTEXT) {
						/* not at interrupt context:
						 * fire it ourselves after
						 * dropping the lock */
						a->next = firelist;
						firelist = a;
					} else {
						/* enqueue the node for the
						 * dag exec thread to fire */
						RF_ASSERT(NodeReady(a));
						if (q) {
							q->next = a;
							q = a;
						} else {
							qh = q = a;
							qh->next = NULL;
						}
					}
				}
			}
		}
		if (q) {
			/* xfer our local list of nodes to the node queue */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them */
			next = finishlist->next;
			finishlist->status = rf_good;
			/*
			 * Okay, here we're calling rf_FinishNode() on
			 * nodes that have the null function as their
			 * work proc.  Such a node could be the first
			 * node in a DAG.  If so, it will cause the DAG
			 * to complete, which will in turn free memory
			 * used by the DAG, which includes the node in
			 * question.  Thus, we must avoid referencing
			 * the node at all after calling
			 * rf_FinishNode() on it. */
			rf_FinishNode(finishlist, context);	/* recursive call */
		}
		/* fire all nodes in firelist */
		FireNodeList(firelist);

		break;
	default:
		printf("Engine found illegal DAG status in PropagateResults()\n");
		RF_PANIC();
		break;
	}
}
623
624
625
626 /*
627 * Process a fired node which has completed
628 */
629 static void
ProcessNode(RF_DagNode_t * node,int context)630 ProcessNode(RF_DagNode_t *node, int context)
631 {
632 #if RF_DEBUG_ENGINE
633 RF_Raid_t *raidPtr;
634
635 raidPtr = node->dagHdr->raidPtr;
636 #endif
637
638 switch (node->status) {
639 case rf_good:
640 /* normal case, don't need to do anything */
641 break;
642 case rf_bad:
643 if ((node->dagHdr->numCommits > 0) ||
644 (node->dagHdr->numCommitNodes == 0)) {
645 /* crossed commit barrier */
646 node->dagHdr->status = rf_rollForward;
647 #if RF_DEBUG_ENGINE
648 if (rf_engineDebug) {
649 printf("raid%d: node (%s) returned fail, rolling forward\n", raidPtr->raidid, node->name);
650 }
651 #endif
652 } else {
653 /* never reached commit barrier */
654 node->dagHdr->status = rf_rollBackward;
655 #if RF_DEBUG_ENGINE
656 if (rf_engineDebug) {
657 printf("raid%d: node (%s) returned fail, rolling backward\n", raidPtr->raidid, node->name);
658 }
659 #endif
660 }
661 break;
662 case rf_undone:
663 /* normal rollBackward case, don't need to do anything */
664 break;
665 case rf_panic:
666 /* an undo node failed!!! */
667 printf("UNDO of a node failed!!!\n");
668 break;
669 default:
670 printf("node finished execution with an illegal status!!!\n");
671 RF_PANIC();
672 break;
673 }
674
675 /* enqueue node's succedents (antecedents if rollBackward) for
676 * execution */
677 PropagateResults(node, context);
678 }
679
680
681
/* user context or dag-exec-thread context:
 * This is the first step in post-processing a newly-completed node.
 * This routine is called by each node execution function to mark the node
 * as complete and fire off any successors that have been enabled.
 */
void
rf_FinishNode(RF_DagNode_t *node, int context)
{
	/* bump the completion count before ProcessNode: propagation may
	 * complete (and free) the DAG, so the header must not be touched
	 * after ProcessNode returns */
	node->dagHdr->numNodesCompleted++;
	ProcessNode(node, context);
}
693
694
/* user context: submit dag for execution, return non-zero if we have
 * to wait for completion.  if and only if we return non-zero, we'll
 * cause cbFunc to get invoked with cbArg when the DAG has completed.
 *
 * for now we always return 1.  If the DAG does not cause any I/O,
 * then the callback may get invoked before DispatchDAG returns.
 * There's code in state 5 of ContinueRaidAccess to handle this.
 *
 * All we do here is fire the direct successors of the header node.
 * The DAG execution thread does the rest of the dag processing.  */
int
rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *),
	       void *cbArg)
{
	RF_Raid_t *raidPtr;

	raidPtr = dag->raidPtr;
#if RF_ACC_TRACE > 0
	/* start the access timer for tracing, if enabled */
	if (dag->tracerec) {
		RF_ETIMER_START(dag->tracerec->timer);
	}
#endif
#if DEBUG
#if RF_DEBUG_VALIDATE_DAG
	/* sanity-check the DAG structure before firing anything */
	if (rf_engineDebug || rf_validateDAGDebug) {
		if (rf_ValidateDAG(dag))
			RF_PANIC();
	}
#endif
#endif
#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
	}
#endif
	raidPtr->dags_in_flight++;	/* debug only: blow off proper
					 * locking */
	dag->cbFunc = cbFunc;
	dag->cbArg = cbArg;
	dag->numNodesCompleted = 0;
	dag->status = rf_enable;
	/* fire the header's direct successors; everything downstream is
	 * driven by node completions and the engine thread */
	FireNodeArray(dag->numSuccedents, dag->succedents);
	return (1);
}
/* dedicated kernel thread: the thread that handles all DAG node
 * firing.  To minimize locking and unlocking, we grab a copy of the
 * entire node queue and then set the node queue to NULL before doing
 * any firing of nodes.  This way we only have to release the lock
 * once.  Of course, it's probably rare that there's more than one
 * node in the queue at any one time, but it sometimes happens.
 */

static void
DAGExecutionThread(RF_ThreadArg_t arg)
{
	RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
	RF_Raid_t *raidPtr;

	raidPtr = (RF_Raid_t *) arg;

#if RF_DEBUG_ENGINE
	if (rf_engineDebug) {
		printf("raid%d: Engine thread is running\n", raidPtr->raidid);
	}
#endif

	/* invariant: node_queue_mutex is held at the top of both loops
	 * below, and dropped only while firing / running callbacks */
	DO_LOCK(raidPtr);
	while (!raidPtr->shutdown_engine) {

		while (raidPtr->node_queue != NULL) {
			/* grab the whole queue in one shot, then release
			 * the lock for the duration of the processing */
			local_nq = raidPtr->node_queue;
			fire_nq = NULL;
			term_nq = NULL;
			raidPtr->node_queue = NULL;
			DO_UNLOCK(raidPtr);

			/* first, strip out the terminal nodes */
			while (local_nq) {
				nd = local_nq;
				local_nq = local_nq->next;
				switch (nd->dagHdr->status) {
				case rf_enable:
				case rf_rollForward:
					if (nd->numSuccedents == 0) {
						/* end of the dag, add to
						 * callback list */
						nd->next = term_nq;
						term_nq = nd;
					} else {
						/* not the end, add to the
						 * fire queue */
						nd->next = fire_nq;
						fire_nq = nd;
					}
					break;
				case rf_rollBackward:
					/* rolling backward, so the FIRST
					 * node of the dag is the terminal */
					if (nd->numAntecedents == 0) {
						/* end of the dag, add to the
						 * callback list */
						nd->next = term_nq;
						term_nq = nd;
					} else {
						/* not the end, add to the
						 * fire queue */
						nd->next = fire_nq;
						fire_nq = nd;
					}
					break;
				default:
					RF_PANIC();
					break;
				}
			}

			/* execute callback of dags which have reached the
			 * terminal node; the callback may free the DAG, so
			 * nd must not be referenced afterwards */
			while (term_nq) {
				nd = term_nq;
				term_nq = term_nq->next;
				nd->next = NULL;
				(nd->dagHdr->cbFunc) (nd->dagHdr->cbArg);
				raidPtr->dags_in_flight--;	/* debug only */
			}

			/* fire remaining nodes */
			FireNodeList(fire_nq);

			DO_LOCK(raidPtr);
		}
		/* sleep until more work arrives or shutdown is requested;
		 * re-check both conditions after every wakeup */
		while (!raidPtr->shutdown_engine &&
		       raidPtr->node_queue == NULL) {
			DO_WAIT(raidPtr);
		}
	}

	/* Let rf_ShutdownEngine know that we're done... */
	raidPtr->shutdown_engine = 0;
	DO_SIGNAL(raidPtr);

	DO_UNLOCK(raidPtr);

	kthread_exit(0);
}
838
839 /*
840 * rf_RaidIOThread() -- When I/O to a component begins, raidstrategy()
841 * puts the I/O on a buffer queue, and then signals raidPtr->iodone. If
842 * necessary, this function calls raidstart() to initiate the I/O.
843 * When I/O to a component completes, KernelWakeupFunc() puts the
844 * completed request onto raidPtr->iodone TAILQ. This function looks
845 * after requests on that queue by calling rf_DiskIOComplete() for the
846 * request, and by calling any required CompleteFunc for the request.
847 */
848
849 static void
rf_RaidIOThread(RF_ThreadArg_t arg)850 rf_RaidIOThread(RF_ThreadArg_t arg)
851 {
852 RF_Raid_t *raidPtr;
853 RF_DiskQueueData_t *req;
854
855 raidPtr = (RF_Raid_t *) arg;
856
857 rf_lock_mutex2(raidPtr->iodone_lock);
858
859 while (!raidPtr->shutdown_raidio) {
860 /* if there is nothing to do, then snooze. */
861 if (TAILQ_EMPTY(&(raidPtr->iodone)) &&
862 rf_buf_queue_check(raidPtr)) {
863 rf_wait_cond2(raidPtr->iodone_cv, raidPtr->iodone_lock);
864 }
865
866 /* Check for deferred parity-map-related work. */
867 if (raidPtr->parity_map != NULL) {
868 rf_unlock_mutex2(raidPtr->iodone_lock);
869 rf_paritymap_checkwork(raidPtr->parity_map);
870 rf_lock_mutex2(raidPtr->iodone_lock);
871 }
872
873 /* See what I/Os, if any, have arrived */
874 while ((req = TAILQ_FIRST(&(raidPtr->iodone))) != NULL) {
875 TAILQ_REMOVE(&(raidPtr->iodone), req, iodone_entries);
876 rf_unlock_mutex2(raidPtr->iodone_lock);
877 rf_DiskIOComplete(req->queue, req, req->error);
878 (req->CompleteFunc) (req->argument, req->error);
879 rf_lock_mutex2(raidPtr->iodone_lock);
880 }
881
882 /* process any pending outgoing IO */
883 rf_unlock_mutex2(raidPtr->iodone_lock);
884 raidstart(raidPtr);
885 rf_lock_mutex2(raidPtr->iodone_lock);
886
887 }
888
889 /* Let rf_ShutdownEngine know that we're done... */
890 raidPtr->shutdown_raidio = 0;
891 rf_signal_cond2(raidPtr->iodone_cv);
892
893 rf_unlock_mutex2(raidPtr->iodone_lock);
894
895 kthread_exit(0);
896 }
897