1 /*
2 ** cs_new_dispatch.c
3 **
4 ** Copyright (C) Martin Brain (mjb@cs.bath.ac.uk) 04/08/12
5 ** Realisation in code for Csound John ffitch Feb 2013
6 **
7 This file is part of Csound.
8
9 The Csound Library is free software; you can redistribute it
10 and/or modify it under the terms of the GNU Lesser General Public
11 License as published by the Free Software Foundation; either
12 version 2.1 of the License, or (at your option) any later version.
13
14 Csound is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU Lesser General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public
20 License along with Csound; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22 02110-1301 USA
23
24
25 ** Fast system for managing task dependencies and dispatching to threads.
26 **
27 ** Has a DAG of tasks and has to assign them to worker threads while respecting
28 ** dependency order.
29 **
30 ** OPT marks code relevant to particular optimisations (listed below the code).
31 ** INV marks invariants
32 ** NOTE marks notes
33 */
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <limits.h>
38 #include "csoundCore.h"
39 #include "cs_par_base.h"
40 #include "cs_par_orc_semantics.h"
41 #include <stdbool.h>
42
43 #if defined(_MSC_VER)
44 /* For InterlockedCompareExchange */
45 #include <windows.h>
46 #endif
47
48 /* Used as an error value */
49 //typedef int taskID;
50 #define INVALID (-1)
51 #define WAIT (-2)
52
53 /* Each task has a status */
54 //enum state { WAITING = 3, /* Dependencies have not been finished */
55 // AVAILABLE = 2, /* Dependencies met, ready to be run */
56 // INPROGRESS = 1, /* Has been started */
57 // DONE = 0 }; /* Has been completed */
58
/* Sets of prerequisite tasks for each task */
60 //typedef struct _watchList {
61 // taskID id;
62 // struct _watchList *next;
63 //} watchList;
64
65 /* Array of states of each task -- need to move to CSOUND structure */
66 //static enum state *task_status = NULL; /* OPT : Structure lay out */
67 //static watchList **task_watch = NULL;
68 //static INSDS **task_map = NULL;
69
70 /* INV : Read by multiple threads, updated by only one */
71 /* Thus use atomic read and write */
72
73 //static char ** task_dep; /* OPT : Structure lay out */
74 //static watchList * wlmm;
75
76 #define INIT_SIZE (100)
77 //static int task_max_size;
78
/* Debugging aid: print every active task with its instrument number and
 * status.  For DONE, INPROGRESS and AVAILABLE tasks the ids on the task's
 * watch list are listed; for WAITING tasks the earlier tasks flagged in
 * its dependency vector are listed instead. */
static void dag_print_state(CSOUND *csound)
{
    const int ntasks = csound->dag_num_active;
    int task;

    printf("*** %d tasks\n", ntasks);
    for (task = 0; task < ntasks; task++) {
      int show_watchers = 1;     /* cleared for states with their own output */

      printf("%d(%d): ", task, csound->dag_task_map[task]->insno);
      switch (csound->dag_task_status[task].s) {
      case DONE:
        printf("status=DONE (watchList ");
        break;
      case INPROGRESS:
        printf("status=INPROGRESS (watchList ");
        break;
      case AVAILABLE:
        printf("status=AVAILABLE (watchList ");
        break;
      case WAITING:
        {
          const char *deps = csound->dag_task_dep[task];
          int earlier = 0;
          printf("status=WAITING for tasks [");
          /* only tasks earlier in the chain can be dependencies */
          while (earlier < task) {
            if (deps[earlier]) printf("%d ", earlier);
            earlier++;
          }
          printf("]\n");
          show_watchers = 0;
        }
        break;
      default:
        printf("status=???\n");
        show_watchers = 0;
        break;
      }
      if (show_watchers) {
        const watchList *w;
        for (w = csound->dag_task_watch[task]; w != NULL; w = w->next)
          printf("%d ", w->id);
        printf(")\n");
      }
    }
}
119
120 /* For now allocate a fixed maximum number of tasks; FIXME */
create_dag(CSOUND * csound)121 static void create_dag(CSOUND *csound)
122 {
123 /* Allocate the main task status and watchlists */
124 int max = csound->dag_task_max_size;
125 csound->dag_task_status = csound->Calloc(csound, sizeof(stateWithPadding)*max);
126 csound->dag_task_watch = csound->Calloc(csound, sizeof(watchList*)*max);
127 csound->dag_task_map = csound->Calloc(csound, sizeof(INSDS*)*max);
128 csound->dag_task_dep = (char **)csound->Calloc(csound, sizeof(char*)*max);
129 csound->dag_wlmm = (watchList *)csound->Calloc(csound, sizeof(watchList)*max);
130 }
131
/* Resize the per-task arrays to csound->dag_task_max_size entries.
 *
 * The caller must have updated dag_task_max_size before calling.
 * NOTE: unlike create_dag() this goes through ReAlloc, so any entries
 * added beyond the previous size are presumably left uninitialised
 * (realloc-like semantics -- confirm against csound->ReAlloc); the caller
 * is responsible for initialising them before use.
 *
 * The pointers are passed to ReAlloc without casts: the previous casts
 * named the wrong types (a `struct watchList *` for an array of
 * `watchList *`, an `INSDS *` for an array of `INSDS *`).
 */
static void recreate_dag(CSOUND *csound)
{
    int max = csound->dag_task_max_size;

    csound->dag_task_status =
      csound->ReAlloc(csound, csound->dag_task_status,
                      sizeof(stateWithPadding)*max);
    csound->dag_task_watch =
      csound->ReAlloc(csound, csound->dag_task_watch,
                      sizeof(watchList*)*max);
    csound->dag_task_map =
      csound->ReAlloc(csound, csound->dag_task_map,
                      sizeof(INSDS*)*max);
    csound->dag_task_dep =
      csound->ReAlloc(csound, csound->dag_task_dep,
                      sizeof(char*)*max);
    csound->dag_wlmm =
      csound->ReAlloc(csound, csound->dag_wlmm,
                      sizeof(watchList)*max);
}
149
dag_get_info(CSOUND * csound,int insno)150 static INSTR_SEMANTICS *dag_get_info(CSOUND* csound, int insno)
151 {
152 INSTR_SEMANTICS *current_instr =
153 csp_orc_sa_instr_get_by_num(csound, insno);
154 if (current_instr == NULL) {
155 current_instr =
156 csp_orc_sa_instr_get_by_name(csound,
157 csound->engineState.instrtxtp[insno]->insname);
158 if (UNLIKELY(current_instr == NULL))
159 csound->Die(csound,
160 Str("Failed to find semantic information"
161 " for instrument '%i'"),
162 insno);
163 }
164 return current_instr;
165 }
166
/* Return the number of elements common to the sets `current` and `later`
 * (zero when they are disjoint).
 *
 * The temporary intersection set built by csp_set_intersection() is
 * released element by element before returning.  `cnt` is unused and is
 * kept only so existing callers need not change.
 *
 * The result used to start from ans->count and then be incremented again
 * for every element freed, i.e. it was double the real size; it is now
 * simply the number of elements found in the intersection.
 */
static int dag_intersect(CSOUND *csound, struct set_t *current,
                         struct set_t *later, int cnt)
{
    IGN(cnt);
    struct set_t *ans;
    struct set_element_t *ele;
    int res = 0;

    ans = csp_set_intersection(csound, current, later);
    if (ans == NULL)            /* defensive: treat "no set" as disjoint */
      return 0;
    ele = ans->head;
    while (ele != NULL) {
      struct set_element_t *next = ele->next;
      csound->Free(csound, ele);
      ele = next;
      res++;
    }
    csound->Free(csound, ans);
    return res;
}
185
dag_build(CSOUND * csound,INSDS * chain)186 void dag_build(CSOUND *csound, INSDS *chain)
187 {
188 INSDS *save = chain;
189 INSDS **task_map;
190 int i;
191
192 //printf("DAG BUILD***************************************\n");
193 csound->dag_num_active = 0;
194 while (chain != NULL) {
195 csound->dag_num_active++;
196 chain = chain->nxtact;
197 }
198 if (csound->dag_num_active>csound->dag_task_max_size) {
199 //printf("**************need to extend task vector\n");
200 csound->dag_task_max_size = csound->dag_num_active+INIT_SIZE;
201 recreate_dag(csound);
202 }
203 if (csound->dag_task_status == NULL)
204 create_dag(csound); /* Should move elsewhere */
205 else {
206 memset((void*)csound->dag_task_watch, '\0',
207 sizeof(watchList*)*csound->dag_task_max_size);
208 for (i=0; i<csound->dag_task_max_size; i++) {
209 if (csound->dag_task_dep[i]) {
210 csound->dag_task_dep[i]= NULL;
211 }
212 csound->dag_wlmm[i].id = INVALID;
213 }
214 }
215 task_map = csound->dag_task_map;
216 for (i=0; i<csound->dag_num_active; i++) {
217 csound->dag_task_status[i].s = AVAILABLE;
218 csound->dag_wlmm[i].id=i;
219 }
220 csound->dag_changed = 0;
221 if (UNLIKELY(csound->oparms->odebug))
222 printf("dag_num_active = %d\n", csound->dag_num_active);
223 i = 0; chain = save;
224 while (chain != NULL) { /* for each instance check against later */
225 int j = i+1; /* count of instance */
226 if (UNLIKELY(csound->oparms->odebug))
227 printf("\nWho depends on %d (instr %d)?\n", i, chain->insno);
228 INSDS *next = chain->nxtact;
229 INSTR_SEMANTICS *current_instr = dag_get_info(csound, chain->insno);
230 //csp_set_print(csound, current_instr->read);
231 //csp_set_print(csound, current_instr->write);
232 while (next) {
233 INSTR_SEMANTICS *later_instr = dag_get_info(csound, next->insno);
234 int cnt = 0;
235 if (UNLIKELY(csound->oparms->odebug)) printf("%d ", j);
236 //csp_set_print(csound, later_instr->read);
237 //csp_set_print(csound, later_instr->write);
238 //csp_set_print(csound, later_instr->read_write);
239 if (dag_intersect(csound, current_instr->write,
240 later_instr->read, cnt++) ||
241 dag_intersect(csound, current_instr->read_write,
242 later_instr->read, cnt++) ||
243 dag_intersect(csound, current_instr->read,
244 later_instr->write, cnt++) ||
245 dag_intersect(csound, current_instr->write,
246 later_instr->write, cnt++) ||
247 dag_intersect(csound, current_instr->read_write,
248 later_instr->write, cnt++) ||
249 dag_intersect(csound, current_instr->read,
250 later_instr->read_write, cnt++) ||
251 dag_intersect(csound, current_instr->write,
252 later_instr->read_write, cnt++)) {
253 char *tt = csound->dag_task_dep[j];
254 if (tt==NULL) {
255 /* get dep vector if missing and set watch first time */
256 tt = csound->dag_task_dep[j] =
257 (char*)csound->Calloc(csound, sizeof(char)*(j+1));
258 csound->dag_task_status[j].s = WAITING;
259 csound->dag_wlmm[j].next = csound->dag_task_watch[i];
260 csound->dag_wlmm[j].id = j;
261 csound->dag_task_watch[i] = &(csound->dag_wlmm[j]);
262 //printf("set watch %d to %d\n", j, i);
263 }
264 tt[i] = 1;
265 //printf("-yes ");
266 }
267 j++; next = next->nxtact;
268 }
269 task_map[i] = chain;
270 i++; chain = chain->nxtact;
271 }
272 if (UNLIKELY(csound->oparms->odebug)) dag_print_state(csound);
273 }
274
/* Reset the run-time state of an already built DAG so that it can be
 * dispatched again without recomputing the dependency vectors.
 *
 * Slots beyond the active tasks are marked DONE.  Slot 0 is always made
 * AVAILABLE with an empty watch list.  Every other active task becomes
 * AVAILABLE unless its dependency vector flags an earlier task, in which
 * case it is marked WAITING and its watch node is pushed on the watch list
 * of the first (lowest numbered) task it depends on.
 */
void dag_reinit(CSOUND *csound)
{
    volatile stateWithPadding *status = csound->dag_task_status;
    watchList * volatile *watch = csound->dag_task_watch;
    watchList *nodes = csound->dag_wlmm;
    const int capacity = csound->dag_task_max_size;
    int task;

    if (UNLIKELY(csound->oparms->odebug))
      printf("DAG REINIT************************\n");

    /* unused slots can never be picked up */
    for (task = csound->dag_num_active; task < capacity; task++)
      status[task].s = DONE;

    /* the first task has nothing before it to depend on */
    status[0].s = AVAILABLE;
    watch[0] = NULL;

    for (task = 1; task < csound->dag_num_active; task++) {
      const char *deps = csound->dag_task_dep[task];
      int first = 0;

      status[task].s = AVAILABLE;
      watch[task] = NULL;
      if (deps == NULL) continue;       /* no recorded dependencies */

      /* find the first earlier task this one depends on, if any */
      while (first < task && !deps[first])
        first++;
      if (first < task) {
        status[task].s = WAITING;
        nodes[task].id = task;
        nodes[task].next = watch[first];
        watch[first] = &nodes[task];
      }
    }
}
304
/* Access helpers for state shared between worker threads.
 *
 * ATOMIC_READ / ATOMIC_WRITE are plain accesses; both expand to a single
 * parenthesised expression so they can be used wherever an expression or
 * an expression statement is allowed (ATOMIC_WRITE no longer carries its
 * own trailing semicolon).
 *
 * ATOMIC_CAS / ATOMIC_CAS_PTR store `new` in *x if *x equals `current`
 * and evaluate to non-zero on success.  `current` must be an lvalue: the
 * GCC form takes its address and, on failure, overwrites it with the value
 * found in *x.  The GCC form now requests a strong compare-exchange: a
 * weak one may fail spuriously, which is only acceptable inside a retry
 * loop, and dag_get_task() uses ATOMIC_CAS outside of one.
 */
#define ATOMIC_READ(x)    (x)
#define ATOMIC_WRITE(x,v) ((x) = (v))
#if defined(_MSC_VER)
#define ATOMIC_CAS(x,current,new) \
  ((current) == InterlockedCompareExchange((x), (new), (current)))
#define ATOMIC_CAS_PTR(x,current,new) \
  ((current) == InterlockedCompareExchangePointer((x), (new), (current)))
#else
#define ATOMIC_CAS(x,current,new) \
  __atomic_compare_exchange_n((x), &(current), (new), false, \
                              __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
#define ATOMIC_CAS_PTR(x,current,new) \
  __atomic_compare_exchange_n((x), &(current), (new), false, \
                              __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
#endif
326
/* Hand the calling worker thread its next task.
 *
 * csound     - engine instance owning the DAG
 * index      - index of the calling worker; with numThreads it selects
 *              where this worker starts scanning the task array
 * numThreads - number of workers sharing the DAG
 * next_task  - task forwarded by dag_end_task(), or INVALID
 *
 * Returns
 *   - next_task itself (after marking it INPROGRESS) if one was forwarded;
 *   - otherwise the index of the first AVAILABLE task that could be
 *     claimed, scanning circularly from the worker's start position;
 *   - WAIT    if nothing could be claimed but some task is still WAITING
 *             (or was seen AVAILABLE and could not be claimed);
 *   - INVALID if there is nothing left to run.
 *
 * Changes from the previous version: an empty DAG returns INVALID instead
 * of scanning out of bounds, a non-positive numThreads or an out-of-range
 * start position falls back to slot 0, and a failed claim on a task that
 * is still AVAILABLE makes the result WAIT rather than INVALID.
 */
taskID dag_get_task(CSOUND *csound, int index, int numThreads, taskID next_task)
{
    int i, start;
    int count_waiting = 0;
    int active = csound->dag_num_active;
    volatile stateWithPadding *task_status = csound->dag_task_status;
    enum state current_task_status;

    if (next_task != INVALID) {
      /* Have forwarded one task from the previous one */
      ATOMIC_WRITE(task_status[next_task].s,INPROGRESS);
      return next_task;
    }

    /* Nothing to scan; the wrap-around below assumes active >= 1 */
    if (active <= 0) return (taskID)INVALID;

    start = (numThreads > 0) ? (index * active) / numThreads : 0;
    if (start < 0 || start >= active) start = 0;

    i = start;
    do {
      current_task_status = ATOMIC_READ(task_status[i].s);

      switch (current_task_status) {
      case AVAILABLE :
        /* Need to CAS as the value may have changed */
        if (ATOMIC_CAS(&(task_status[i].s), current_task_status, INPROGRESS)) {
          return (taskID)i;
        }
        /* The claim failed.  If the task is nevertheless still AVAILABLE
           there is work left, so make sure we do not report INVALID. */
        if (ATOMIC_READ(task_status[i].s) == AVAILABLE) {
          ++count_waiting;
        }
        break;

      case WAITING :
        ++count_waiting;
        break;

      case INPROGRESS :
      case DONE :
        break;

      default :
        /* Enum corrupted! */
        break;
      }

      /* Increment modulo active */
      i = (i+1 == active) ? 0 : i + 1;

    } while (i != start);

    if (count_waiting == 0) return (taskID)INVALID;
    return (taskID)WAIT;
}
384
385 /* This static is OK as not written */
386 static const watchList DoNotRead = { INVALID, NULL};
387
moveWatch(CSOUND * csound,watchList * volatile * w,watchList * t)388 inline static int moveWatch(CSOUND *csound, watchList * volatile *w,
389 watchList *t)
390 {
391 IGN(csound);
392 watchList *local=*w;
393 t->next = NULL;
394 //printf("moveWatch\n");
395 do {
396 //dag_print_state(csound);
397 local = ATOMIC_READ(*w);
398 if (local==&DoNotRead) {
399 //printf("local is DoNotRead\n");
400 return 0;//was no & earlier
401 }
402 else t->next = local;
403 } while (!ATOMIC_CAS_PTR(w,local,t));
404 //dag_print_state(csound);
405 //printf("moveWatch done\n");
406 return 1;
407 }
408
/* Record that task `i` has finished and release the tasks watching it.
 *
 * Task i is marked DONE and its watch list is atomically replaced by the
 * DoNotRead sentinel, so that moveWatch() refuses any later addition.
 * Each watcher j taken from the list is then examined: if some task k < j
 * flagged in dag_task_dep[j] is not finished, j's watch node is moved to
 * k's watch list (WAITING tasks are tried first, then any task that is not
 * DONE).  If no such task accepts the node, j is ready to run.
 *
 * Returns the first ready watcher, to be passed straight back to
 * dag_get_task() as `next_task`, or INVALID if there is none.  Any further
 * ready watchers are marked AVAILABLE for other threads to pick up.
 */
taskID dag_end_task(CSOUND *csound, taskID i)
{
    watchList *to_notify, *next;
    int canQueue;
    int j, k;
    watchList * volatile *task_watch = csound->dag_task_watch;
    enum state current_task_status;
    int wait_on_current_tasks;
    taskID next_task = INVALID;
    ATOMIC_WRITE(csound->dag_task_status[i].s, DONE); /* as DONE is zero */
    // A write barrier /might/ be useful here to avoid the case
    // of the list being DoNotRead but the status being something
    // other than done.  At the time of writing this wouldn't give
    // a correctness issue, plus the semantics of GCC's CAS apparently
    // imply a write barrier, so it should be OK.
    {                                      /* ATOMIC_SWAP */
      /* Take the whole watch list and leave DoNotRead in its place */
      do {
        to_notify = ATOMIC_READ(task_watch[i]);
      } while (!ATOMIC_CAS_PTR(&task_watch[i],to_notify,(watchList *) &DoNotRead));
    } //to_notify = ATOMIC_SWAP(task_watch[i], &DoNotRead);
    next = to_notify;
    while (to_notify) {         /* walk the list of watchers */
      /* Save the successor first: moveWatch() rewrites to_notify->next */
      next = to_notify->next;
      j = to_notify->id;
      canQueue = 1;             /* until a new task to watch is found */
      wait_on_current_tasks = 0;

      /* First pass: prefer to watch a dependency that is itself WAITING */
      for (k=0; k<j; k++) {     /* seek next watch */
        if (csound->dag_task_dep[j][k]==0) continue;
        current_task_status = ATOMIC_READ(csound->dag_task_status[k].s);

        if (current_task_status == WAITING) { // Prefer watching blocked tasks
          if (moveWatch(csound, &task_watch[k], to_notify)) {
            /* task j now watches task k */
            canQueue = 0;
            wait_on_current_tasks = 0;
            break;
          }
          else {
            /* moveWatch() found DoNotRead on k's list, i.e. k has ended
               since its status was read; keep looking */
          }

        }
        else if (current_task_status == AVAILABLE ||
                 current_task_status == INPROGRESS) {
          /* an unfinished dependency exists: needs the second pass
             unless a WAITING task accepts the watch */
          wait_on_current_tasks = 1;
        }
      }

      // Try the same thing again but this time waiting on active or available task
      if (wait_on_current_tasks == 1) {
        for (k=0; k<j; k++) {   /* seek next watch */
          if (csound->dag_task_dep[j][k]==0) continue;
          current_task_status = ATOMIC_READ(csound->dag_task_status[k].s);

          if (current_task_status != DONE) { // Any unfinished dependency will do
            if (moveWatch(csound, &task_watch[k], to_notify)) {
              /* task j now watches task k */
              canQueue = 0;
              break;
            }
            else {
              /* moveWatch() found DoNotRead on k's list, i.e. k has ended
                 since its status was read; keep looking */
            }

          }
        }
      }

      if (canQueue) {           /* could use monitor here */
        if (next_task == INVALID) {
          next_task = j; // Forward directly to the thread to save re-dispatch
        } else {
          /* only one task can be forwarded; publish the others */
          ATOMIC_WRITE(csound->dag_task_status[j].s, AVAILABLE);
        }
      }
      to_notify = next;
    }
    return next_task;
}
504
505
506 /* INV : Acyclic */
507 /* INV : Each entry is read by a single thread,
508 * no writes (but see OPT : Watch ordering) */
509 /* Thus no protection needed */
510
511 /* INV : Watches for different tasks are disjoint */
512 /* INV : Multiple threads can add to a watch list but only one will remove
513 * These are the only interactions */
514 /* Thus the use of CAS / atomic operations */
515
516 /* Used to mark lists that should not be added to, see NOTE : Race condition */
517 #if 0
518 watchList nullList;
519 watchList *doNotAdd = &nullList;
520 watchList endwatch = { NULL, NULL };
521
522 /* Lists of tasks that depend on the given task */
523 watchList ** watch; /* OPT : Structure lay out */
524 watchListMemoryManagement *wlmm; /* OPT : Structure lay out */
525
526 /* INV : wlmm[X].s.id == X; */ /* OPT : Data structure redundancy */
527 /* INV : status[X] == WAITING => wlmm[X].used */
528 /* INV : wlmm[X].s is in watch[Y] => wlmm[X].used */
529
530
531 /* Watch list helper functions */
532
533 void initialiseWatch (watchList **w, taskID id) {
534 wlmm[id].used = TRUE;
535 wlmm[id].s.id = id;
536 wlmm[id].s.tail = *w;
537 *w = &(wlmm[id].s);
538 }
539
540 inline watchList * getWatches(taskID id) {
541
542 return __atomic_test_and_set (&(watch[id]), doNotAdd);
543 }
544
545 int moveWatch (watchList **w, watchList *t) {
546 watchList *local;
547
548 t->tail = NULL;
549
550 do {
551 local = atomicRead(*w);
552
553 if (local == doNotAdd) {
554 return 0;
555 } else {
556 t->tail = local;
557 }
558 } while (!atomicCAS(*w,local,t)); /* OPT : Delay loop */
559
560 return 1;
561 }
562
563 void appendToWL (taskID id, watchList *l) {
564 watchList *w;
565
566 do {
567 w = watch[id];
568 l->tail = w;
569 w = __atomic_compare_exchange_n(&(watch[id]),w,l, false, \
570 __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
571 } while (!(w == l));
572
573 }
574
575 inline void deleteWatch (watchList *t) {
576 wlmm[t->id].used = FALSE;
577 }
578
579
580
581
582 typedef struct monitor {
583 pthread_mutex_t l = PTHREAD_MUTEX_INITIALIZER;
584 unsigned int threadsWaiting = 0; /* Shadows the length of
585 workAvailable wait queue */
586 queue<taskID> q; /* OPT : Dispatch order */
587 pthread_cond_t workAvailable = PTHREAD_COND_INITIALIZER;
588 pthread_cond_t done = PTHREAD_COND_INITIALIZER;
589 } monitor; /* OPT : Lock-free */
590
591 /* INV : q.size() + dispatched <= ID */
592 /* INV : foreach(id,q.contents()) { status[id] = AVAILABLE; } */
593 /* INV : threadsWaiting <= THREADS */
594
595 monitor dispatch;
596
597
598 void addWork(monitor *dispatch, taskID id) {
599 pthread_mutex_lock(&dispatch->l);
600
601 status[id] = AVAILABLE;
602 dispatch->q.push(id);
603 if (threadsWaiting >= 1) {
604 pthread_cond_signal(&dispatch->c);
605 }
606
607 pthread_mutex_unlock(&dispatch->l);
608 return;
609 }
610
611 taskID getWork(monitor *dispatch) {
612 taskID returnValue;
613
614 pthread_mutex_lock(&dispatch->l);
615
616 while (q.empty()) {
617 ++dispatch->threadsWaiting;
618
619 if (dispatch->threadsWaiting == THREADS) {
620 /* Will the last person out please turn off the lights! */
621 pthread_cond_signal(&dispatch->done);
622 }
623
624 pthread_cond_wait(&dispatch->l,&dispatch->workAvailable);
625 --dispatch->threadsWaiting;
626
627 /* NOTE : A while loop is needed as waking from this requires
628 * reacquiring the mutex and in the mean time someone
629 * might have got it first and removed the work. */
630 }
631
632 returnValue = q.pop();
633
634 pthread_mutex_unlock(&dispatch->l);
635 return returnValue;
636
637 }
638
639 void waitForWorkToBeCompleted (monitor *dispatch) {
640 /* Requires
641 * INV : threadsWaiting == THREADS <=> \forall id \in ID . status[id] == DONE
642 */
643
644 pthread_mutex_lock(&dispatch->l);
645
646 if (dispatch->threadsWaiting < THREADS) {
647 pthread_cond_wait(&dispatch->l,&dispatch->done);
648 }
649
650 /* This assertion is more difficult to prove than it might first appear */
651 assert(dispatch->threadsWaiting == THREADS);
652
653 pthread_mutex_unlock(&dispatch->l);
654 return;
655 }
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670 void mainThread (State *s) {
671
672 /* Set up the DAG */
673 if (s->firstRun || s->updateNeeded) {
674 dep = buildDAG(s); /* OPT : Dispatch order */
675 /* Other : Update anything that is indexed by task
676 * (i.e. all arrays given length ID) */
677 }
678
679 /* Reset the data structure */
680 foreach (id in ID) {
681 watch[id] = NULL;
682 }
683
684 /* Initialise the dispatch queue */
685 foreach (id in ID) { /* OPT : Dispatch order */
686 if (dep[id] == EMPTYSET) {
687 atomicWrite(status[id] = AVAILABLE);
688 addWork(*dispatch,id);
689
690 } else {
691 atomicWrite(status[id] = WAITING);
692 initialiseWatch(&watch[choose(dep[id])], id); /* OPT : Watch ordering */
693
694 }
695 }
696
697 /* INV : Data structure access invariants start here */
698 /* INV : Status only decrease from now */
699 /* INV : Watch list for id contains a subset of the things that depend on id */
700 /* INV : Each id appears in at most one watch list */
701 /* INV : doNotAdd only appears at the head of a watch list */
702 /* INV : if (watch[id] == doNotAdd) then { status[id] == DONE; } */
703
704 waitForWorkToBeCompleted(*dispatch);
705
706 return;
707 }
708
709 void workerThread (State *s) {
710 taskID work;
711 watchList *tasksToNotify, next;
712 bool canQueue;
713
714 do {
715
716 task = getWork(dispatch);
717
718 /* Do stuff */
719 atomicWrite(status[work] = INPROGRESS);
720 doStuff(work);
721 atomicWrite(status[work] = DONE); /* NOTE : Race condition */
722
723
724 tasksToNotify = getWatches(work);
725
726 while (tasksToNotify != NULL) {
727 next = tasksToNotify->tail;
728
729 canQueue = TRUE;
730 foreach (dep in dep[tasksToNotify->id]) { /* OPT : Watch ordering */
731 if (atomicRead(status[dep]) != DONE) {
732 /* NOTE : Race condition */
733 if (moveWatch(watch[dep],tasksToNotify)) {
734 canQueue = FALSE;
735 break;
736 } else {
737 /* Have hit the race condition, try the next option */
738 assert(atomicRead(status[dep]) == DONE);
739 }
740 }
741 }
742
743 if (canQueue) { /* OPT : Save one work item */
744 addWork(*dispatch,tasksToNotify->id);
745 deleteWatch(tasksToNotify);
746 }
747
748 tasksToNotify = next;
749 }
750
751 } while (1); /* NOTE : some kind of control for thread exit needed */
752
753 return;
754 }
755
756
757
758
759 /* OPT : Structure lay out
760 *
761 * All data structures that are 1. modified by one or more thread and
762 * 2. accessed by multiple threads, should be aligned to cache lines and
763 * padded so that there is only one instance per cache line. This will reduce
764 * false memory contention between objects that just happen to share a cache
765 * line. Blocking to 64 bytes will probably be sufficient and if people really
766 * care about performance that much they can tune to their particular
767 * architecture.
768 */
769
770 /* OPT : Watch ordering
771 *
772 * Moving a watch is relatively cheap (in the uncontended case) but
773 * it would be best to avoid moving watches where possible. The ideal
774 * situation would be for every task to watch the last pre-requisite.
775 * There are two places in the code that affect the watch ordering;
776 * the initial assignment and the movement when a watch is triggered.
 * Preferring WAITING tasks (in the latter) and lower priority tasks
778 * (if combined with the dispatch order optimisation below) are probably
779 * good choices. One mechanism would be to reorder the set (or array) of
780 * dependencies to store this information. When looking for a (new) watch,
781 * tasks are sorted with increasing status first and then the first one picked.
782 * Keeping the list sorted (or at least split between WAITING and others) with
783 * each update should (if the dispatch order is fixed / slowly adapting) result
784 * in the best things to watch moving to the front and thus adaptively give
 * the best possible tasks to watch. The interaction with a dispatch order
786 * heuristic would need to be considered. Note that only one thread will
787 * look at any given element of dep[] so they can be re-ordered without
788 * needing locking.
789 */
790
791 /* OPT : Structure lay out
792 *
793 * Some of the fields are not strictly needed and are just there to make
794 * the algorithm cleaner and more intelligible. The id fields of the watch
795 * lists are not really needed as there is one per task and their position
 * within the watchListMemoryManager array allows the task to be inferred.
797 * Likewise the used flag in the memory manager is primarily for book-keeping
798 * and checking / assertions and could be omitted.
799 */
800
801 /* OPT : Delay loop
802 *
803 * In theory it is probably polite to put a slowly increasing delay in
804 * after a failed compare and swap to reduce pressure on the memory
805 * subsystem in the highly contended case. As multiple threads adding
806 * to a task's watch list simultaneously is probably a rare event, the
807 * delay loop is probably unnecessary.
808 */
809
810 /* OPT : Dispatch order
811 *
812 * The order in which tasks are dispatched affects the amount of
813 * parallelisation possible. Picking the exact scheduling order, even
814 * if the duration of the tasks is known is probably NP-Hard (see
 * bin-packing*) so heuristics are probably the way to go. The proportion
816 * of tasks which depend on a given task is probably a reasonable primary
817 * score, with tie-breaks going to longer tasks. This can either be
818 * applied to just the initial tasks (either in ordering the nodes in the DAG)
819 * or in the order in which they are traversed. Alternatively by
820 * sorting the queue / using a heap / other priority queuing structure
821 * it might be possible to do this dynamically. The best solution would
822 * probably be adaptive with a task having its priority incremented
823 * each time another worker thread blocks on a shortage of work, with these
824 * increments also propagated 'upwards' in the DAG.
825 *
826 * *. Which means that a solver could be used to give the best possible
827 * schedule / the maximum parallelisation. This could be useful for
828 * optimisation.
829 */
830
831 /* OPT : Lock-free
832 *
833 * A lock free dispatch mechanism is probably possible as threads can
834 * scan the status array for things listed as AVAILABLE and then atomicCAS
835 * to INPROGRESS to claim them. But this starts to involve busy-waits or
836 * direct access to futexes and is probably not worth it.
837 */
838
839 /* OPT : Save one work item
840 *
841 * Rather than adding all watching tasks who have their dependencies met to
842 * the dispatch queue, saving one (perhaps the best, see OPT : Dispatch order)
843 * means the thread does not have to wait. In the case of a purely linear DAG
844 * this should be roughly as fast as the single threaded version.
845 */
846
847
848 /* NOTE : Race condition
849 *
850 * There is a subtle race condition:
851 *
852 * Thread 1 Thread 2
853 * -------- --------
854 * atomicRead(status[dep]) != DONE
855 * atomicWrite(status[work] = DONE);
856 * tasksToNotify = getWatches(work);
857 * moveWatch(watch[dep],tasksToNotify);
858 *
859 * The key cause is that the status and the watch list cannot be updated
860 * simultaneously. However as getWatches removes all watches and moves or
861 * removes them, it is sufficient to have a doNotAdd watchList node to detect
862 * this race condition and resolve it by having moveWatch() fail.
863 */
864
865 void newdag_alloc(CSOUND *csound, int numtasks)
866 {
867 doNotAdd = &endwatch;
868 ??
869 watch = (watchList **)csound->Calloc(csound, sizeof(watchList *)*numtasks);
870 wlmm = (watchListMemoryManagement *)
871 csound->Calloc(csound, sizeof(watchListMemoryManagement)*numtasks);
872
873 }
874
875 #endif
876