/*
**  cs_new_dispatch.c
**
**    Copyright (C)  Martin Brain (mjb@cs.bath.ac.uk) 04/08/12
**    Realisation in code for Csound by John ffitch, Feb 2013
**
    This file is part of Csound.

    The Csound Library is free software; you can redistribute it
    and/or modify it under the terms of the GNU Lesser General Public
    License as published by the Free Software Foundation; either
    version 2.1 of the License, or (at your option) any later version.

    Csound is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with Csound; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
    02110-1301 USA


** Fast system for managing task dependencies and dispatching to threads.
**
** Maintains a DAG of tasks and assigns them to worker threads while
** respecting dependency order.
**
** OPT marks code relevant to particular optimisations (listed below the code).
** INV marks invariants.
** NOTE marks notes.
*/

#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include "csoundCore.h"
#include "cs_par_base.h"
#include "cs_par_orc_semantics.h"
#include <stdbool.h>

#if defined(_MSC_VER)
/* For InterlockedCompareExchange */
#include <windows.h>
#endif

/* Sentinel values returned in place of a valid taskID */
//typedef int taskID;
#define INVALID (-1)
#define WAIT    (-2)

/* Each task has a status */
//enum state { WAITING = 3,          /* Dependencies have not been finished */
//           AVAILABLE = 2,        /* Dependencies met, ready to be run */
//           INPROGRESS = 1,       /* Has been started */
//           DONE = 0 };           /* Has been completed */
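/* Lifecycle as implemented below: dag_build/dag_reinit mark a task AVAILABLE,
   or WAITING if it still has unfinished prerequisites; dag_get_task claims an
   AVAILABLE task by CASing it to INPROGRESS; dag_end_task marks it DONE and
   either queues or directly forwards any watcher whose prerequisites are now
   all DONE. */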

/* Sets of prerequisite tasks for each task */
//typedef struct _watchList {
//  taskID id;
//  struct _watchList *next;
//} watchList;

/* Array of states of each task -- need to move to CSOUND structure */
//static enum state *task_status = NULL;          /* OPT : Structure lay out */
//static watchList **task_watch = NULL;
//static INSDS **task_map = NULL;

/* INV : Read by multiple threads, updated by only one */
/* Thus use atomic read and write */

//static char ** task_dep;                        /* OPT : Structure lay out */
//static watchList * wlmm;

#define INIT_SIZE (100)
//static int task_max_size;

static void dag_print_state(CSOUND *csound)
{
    int i;
    watchList *w;
    printf("*** %d tasks\n", csound->dag_num_active);
    for (i=0; i<csound->dag_num_active; i++) {
      printf("%d(%d): ", i, csound->dag_task_map[i]->insno);
      switch (csound->dag_task_status[i].s) {
      case DONE:
        printf("status=DONE (watchList ");
        w = csound->dag_task_watch[i];
        while (w) { printf("%d ", w->id); w=w->next; }
        printf(")\n");
        break;
      case INPROGRESS:
        printf("status=INPROGRESS (watchList ");
        w = csound->dag_task_watch[i];
        while (w) { printf("%d ", w->id); w=w->next; }
        printf(")\n");
        break;
      case AVAILABLE:
        printf("status=AVAILABLE (watchList ");
        w = csound->dag_task_watch[i];
        while (w) { printf("%d ", w->id); w=w->next; }
        printf(")\n");
        break;
      case WAITING:
        {
          char *tt = csound->dag_task_dep[i];
          int j;
          printf("status=WAITING for tasks [");
          for (j=0; j<i; j++) if (tt[j]) printf("%d ", j);
          printf("]\n");
        }
        break;
      default:
        printf("status=???\n"); break;
      }
    }
}

/* For now allocate a fixed maximum number of tasks; FIXME */
static void create_dag(CSOUND *csound)
{
    /* Allocate the main task status and watchlists */
    int max = csound->dag_task_max_size;
    csound->dag_task_status = csound->Calloc(csound, sizeof(stateWithPadding)*max);
    csound->dag_task_watch  = csound->Calloc(csound, sizeof(watchList*)*max);
    csound->dag_task_map    = csound->Calloc(csound, sizeof(INSDS*)*max);
    csound->dag_task_dep    = (char **)csound->Calloc(csound, sizeof(char*)*max);
    csound->dag_wlmm = (watchList *)csound->Calloc(csound, sizeof(watchList)*max);
}

static void recreate_dag(CSOUND *csound)
{
    /* Reallocate the main task status and watchlists */
    int max = csound->dag_task_max_size;
    csound->dag_task_status =
      csound->ReAlloc(csound, (stateWithPadding *)csound->dag_task_status,
               sizeof(stateWithPadding)*max);
    csound->dag_task_watch  =
      csound->ReAlloc(csound, (struct watchList *)csound->dag_task_watch,
               sizeof(watchList*)*max);
    csound->dag_task_map    =
      csound->ReAlloc(csound, (INSDS *)csound->dag_task_map, sizeof(INSDS*)*max);
    csound->dag_task_dep    =
      (char **)csound->ReAlloc(csound, csound->dag_task_dep, sizeof(char*)*max);
    csound->dag_wlmm        =
      (watchList *)csound->ReAlloc(csound, csound->dag_wlmm, sizeof(watchList)*max);
}

static INSTR_SEMANTICS *dag_get_info(CSOUND* csound, int insno)
{
    INSTR_SEMANTICS *current_instr =
      csp_orc_sa_instr_get_by_num(csound, insno);
    if (current_instr == NULL) {
      current_instr =
        csp_orc_sa_instr_get_by_name(csound,
           csound->engineState.instrtxtp[insno]->insname);
      if (UNLIKELY(current_instr == NULL))
        csound->Die(csound,
                    Str("Failed to find semantic information"
                        " for instrument '%i'"),
                    insno);
    }
    return current_instr;
}

static int dag_intersect(CSOUND *csound, struct set_t *current,
                         struct set_t *later, int cnt)
{
    IGN(cnt);
    struct set_t *ans;
    int res = 0;
    struct set_element_t *ele;
    ans = csp_set_intersection(csound, current, later);
    res = ans->count;
    ele = ans->head;
    while (ele != NULL) {
      struct set_element_t *next = ele->next;
      csound->Free(csound, ele);
      ele = next; res++;
    }
    csound->Free(csound, ans);
    return res;
}

void dag_build(CSOUND *csound, INSDS *chain)
{
    INSDS *save = chain;
    INSDS **task_map;
    int i;

    //printf("DAG BUILD***************************************\n");
    csound->dag_num_active = 0;
    while (chain != NULL) {
      csound->dag_num_active++;
      chain = chain->nxtact;
    }
    if (csound->dag_num_active>csound->dag_task_max_size) {
      //printf("**************need to extend task vector\n");
      csound->dag_task_max_size = csound->dag_num_active+INIT_SIZE;
      recreate_dag(csound);
    }
    if (csound->dag_task_status == NULL)
      create_dag(csound); /* Should move elsewhere */
    else {
      memset((void*)csound->dag_task_watch, '\0',
             sizeof(watchList*)*csound->dag_task_max_size);
      for (i=0; i<csound->dag_task_max_size; i++) {
        if (csound->dag_task_dep[i]) {
          csound->dag_task_dep[i]= NULL;
        }
        csound->dag_wlmm[i].id = INVALID;
      }
    }
    task_map = csound->dag_task_map;
    for (i=0; i<csound->dag_num_active; i++) {
      csound->dag_task_status[i].s = AVAILABLE;
      csound->dag_wlmm[i].id=i;
    }
    csound->dag_changed = 0;
    if (UNLIKELY(csound->oparms->odebug))
      printf("dag_num_active = %d\n", csound->dag_num_active);
    i = 0; chain = save;
    while (chain != NULL) {     /* for each instance check against later */
      int j = i+1;              /* count of instance */
      if (UNLIKELY(csound->oparms->odebug))
        printf("\nWho depends on %d (instr %d)?\n", i, chain->insno);
      INSDS *next = chain->nxtact;
      INSTR_SEMANTICS *current_instr = dag_get_info(csound, chain->insno);
      //csp_set_print(csound, current_instr->read);
      //csp_set_print(csound, current_instr->write);
      while (next) {
        INSTR_SEMANTICS *later_instr = dag_get_info(csound, next->insno);
        int cnt = 0;
        if (UNLIKELY(csound->oparms->odebug)) printf("%d ", j);
        //csp_set_print(csound, later_instr->read);
        //csp_set_print(csound, later_instr->write);
        //csp_set_print(csound, later_instr->read_write);
        if (dag_intersect(csound, current_instr->write,
                          later_instr->read, cnt++)       ||
            dag_intersect(csound, current_instr->read_write,
                          later_instr->read, cnt++)       ||
            dag_intersect(csound, current_instr->read,
                          later_instr->write, cnt++)      ||
            dag_intersect(csound, current_instr->write,
                          later_instr->write, cnt++)      ||
            dag_intersect(csound, current_instr->read_write,
                          later_instr->write, cnt++)      ||
            dag_intersect(csound, current_instr->read,
                          later_instr->read_write, cnt++) ||
            dag_intersect(csound, current_instr->write,
                          later_instr->read_write, cnt++)) {
          char *tt = csound->dag_task_dep[j];
          if (tt==NULL) {
            /* get dep vector if missing and set watch first time */
            tt = csound->dag_task_dep[j] =
              (char*)csound->Calloc(csound, sizeof(char)*(j+1));
            csound->dag_task_status[j].s = WAITING;
            csound->dag_wlmm[j].next = csound->dag_task_watch[i];
            csound->dag_wlmm[j].id = j;
            csound->dag_task_watch[i] = &(csound->dag_wlmm[j]);
            //printf("set watch %d to %d\n", j, i);
          }
          tt[i] = 1;
          //printf("-yes ");
        }
        j++; next = next->nxtact;
      }
      task_map[i] = chain;
      i++; chain = chain->nxtact;
    }
    if (UNLIKELY(csound->oparms->odebug)) dag_print_state(csound);
}

void dag_reinit(CSOUND *csound)
{
    int i;
    int max = csound->dag_task_max_size;
    volatile stateWithPadding *task_status = csound->dag_task_status;
    watchList * volatile *task_watch = csound->dag_task_watch;
    watchList *wlmm = csound->dag_wlmm;
    if (UNLIKELY(csound->oparms->odebug))
      printf("DAG REINIT************************\n");
    for (i=csound->dag_num_active; i<max; i++)
      task_status[i].s = DONE;
    task_status[0].s = AVAILABLE;
    task_watch[0] = NULL;
    for (i=1; i<csound->dag_num_active; i++) {
      int j;
      task_status[i].s = AVAILABLE;
      task_watch[i] = NULL;
      if (csound->dag_task_dep[i]==NULL) continue;
      for (j=0; j<i; j++)
        if (csound->dag_task_dep[i][j]) {
          task_status[i].s = WAITING;
          wlmm[i].id = i;
          wlmm[i].next = task_watch[j];
          task_watch[j] = &wlmm[i];
          break;
        }
    }
    //dag_print_state(csound);
}

//#define ATOMIC_READ(x) __atomic_load_n(&(x), __ATOMIC_SEQ_CST)
//#define ATOMIC_WRITE(x,v) __atomic_store_n(&(x), v, __ATOMIC_SEQ_CST)
#define ATOMIC_READ(x) x
#define ATOMIC_WRITE(x,v) x = v;
#if defined(_MSC_VER)
#define ATOMIC_CAS(x,current,new) \
  (current == InterlockedCompareExchange(x, new, current))
#else
#define ATOMIC_CAS(x,current,new)  \
  __atomic_compare_exchange_n(x,&(current),new, true, __ATOMIC_SEQ_CST, \
                              __ATOMIC_SEQ_CST)
#endif

#if defined(_MSC_VER)
#define ATOMIC_CAS_PTR(x,current,new) \
  (current == InterlockedCompareExchangePointer(x, new, current))
#else
#define ATOMIC_CAS_PTR(x,current,new)  \
  __atomic_compare_exchange_n(x,&(current),new, true, __ATOMIC_SEQ_CST,\
                              __ATOMIC_SEQ_CST)
#endif
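/* A minimal sketch (kept out of the build, helper name illustrative only) of
   the retry idiom these macros support: read the current value, then CAS it
   to the new one, retrying if another thread changed it first.  The same
   pattern appears below in dag_get_task, moveWatch and the ATOMIC_SWAP block
   of dag_end_task. */
#if 0
static void example_claim_task(volatile stateWithPadding *task_status, int i)
{
    enum state seen;
    do {
      seen = ATOMIC_READ(task_status[i].s);
      if (seen != AVAILABLE) return;      /* someone else got there first */
    } while (!ATOMIC_CAS(&(task_status[i].s), seen, INPROGRESS));
    /* task i is now owned by this thread */
}
#endif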

taskID dag_get_task(CSOUND *csound, int index, int numThreads, taskID next_task)
{
    int i;
    int count_waiting = 0;
    int active = csound->dag_num_active;
    int start = (index * active) / numThreads;
    volatile stateWithPadding *task_status = csound->dag_task_status;
    enum state current_task_status;

    if (next_task != INVALID) {
      // One task was forwarded directly by the previous dag_end_task call
      // assert(ATOMIC_READ(task_status[next_task].s) == WAITING);
      ATOMIC_WRITE(task_status[next_task].s,INPROGRESS);
      return next_task;
    }

    //printf("**GetTask from %d\n", csound->dag_num_active);
    i = start;
    do {
      current_task_status = ATOMIC_READ(task_status[i].s);

      switch (current_task_status) {
      case AVAILABLE :
        // Need to CAS as the value may have changed
        if (ATOMIC_CAS(&(task_status[i].s), current_task_status, INPROGRESS)) {
          return (taskID)i;
        }
        break;

      case WAITING :
        //  printf("**%d waiting\n", i);
        ++count_waiting;
        break;

      case INPROGRESS :
        //  printf("**%d active\n", i);
        break;

      case DONE :
        //printf("**%d done\n", i);
        break;

      default :
        // Enum corrupted!
        //assert(0);
        break;
      }

      // Increment modulo active
      i = (i+1 == active) ? 0 : i + 1;

    } while (i != start);
    //dag_print_state(csound);
    if (count_waiting == 0) return (taskID)INVALID;
    //printf("taskstodo=%d)\n", morework);
    return (taskID)WAIT;
}
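/* A minimal sketch (not the actual Csound worker loop, which lives elsewhere)
   of how dag_get_task and dag_end_task are intended to pair up: WAIT means
   "tasks are still pending, poll again", INVALID means "nothing left for this
   thread in this cycle", and the taskID returned by dag_end_task is fed back
   in as next_task so a newly released task runs without a re-dispatch. */
#if 0
static void example_worker_cycle(CSOUND *csound, int index, int numThreads)
{
    taskID next_task = INVALID;
    for (;;) {
      taskID work = dag_get_task(csound, index, numThreads, next_task);
      next_task = INVALID;
      if (work == (taskID)INVALID) break;    /* no more work this cycle */
      if (work == (taskID)WAIT) continue;    /* dependencies still running */
      /* ... perform the instrument instance csound->dag_task_map[work] ... */
      next_task = dag_end_task(csound, work);
    }
}
#endif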

/* This static is OK as not written */
static const watchList DoNotRead = { INVALID, NULL};

inline static int moveWatch(CSOUND *csound, watchList * volatile *w,
                            watchList *t)
{
    IGN(csound);
    watchList *local=*w;
    t->next = NULL;
    //printf("moveWatch\n");
    do {
      //dag_print_state(csound);
      local = ATOMIC_READ(*w);
      if (local==&DoNotRead) {
        //printf("local is DoNotRead\n");
        return 0;//was no & earlier
      }
      else t->next = local;
    } while (!ATOMIC_CAS_PTR(w,local,t));
    //dag_print_state(csound);
    //printf("moveWatch done\n");
    return 1;
}

taskID dag_end_task(CSOUND *csound, taskID i)
{
    watchList *to_notify, *next;
    int canQueue;
    int j, k;
    watchList * volatile *task_watch = csound->dag_task_watch;
    enum state current_task_status;
    int wait_on_current_tasks;
    taskID next_task = INVALID;
    ATOMIC_WRITE(csound->dag_task_status[i].s, DONE); /* as DONE is zero */
    // A write barrier /might/ be useful here to avoid the case
    // of the list being DoNotRead but the status being something
    // other than DONE.  At the time of writing this wouldn't give
    // a correctness issue, plus the semantics of GCC's CAS apparently
    // imply a write barrier, so it should be OK.
    {                                      /* ATOMIC_SWAP */
      do {
        to_notify = ATOMIC_READ(task_watch[i]);
      } while (!ATOMIC_CAS_PTR(&task_watch[i],to_notify,(watchList *) &DoNotRead));
    } //to_notify = ATOMIC_SWAP(task_watch[i], &DoNotRead);
    //printf("Ending task %d\n", i);
    next = to_notify;
    while (to_notify) {         /* walk the list of watchers */
      next = to_notify->next;
      j = to_notify->id;
      //printf("%d notifying task %d it finished\n", i, j);
      canQueue = 1;
      wait_on_current_tasks = 0;

      for (k=0; k<j; k++) {     /* seek next watch */
        if (csound->dag_task_dep[j][k]==0) continue;
        current_task_status = ATOMIC_READ(csound->dag_task_status[k].s);
        //printf("investigating task %d (%d)\n", k, current_task_status);

        if (current_task_status == WAITING) {   // Prefer watching blocked tasks
          //printf("found task %d to watch %d status %d\n",
          //       k, j, csound->dag_task_status[k].s);
          if (moveWatch(csound, &task_watch[k], to_notify)) {
            //printf("task %d now watches %d\n", j, k);
            canQueue = 0;
            wait_on_current_tasks = 0;
            break;
          }
          else {
            /* assert csound->dag_task_status[k].s == DONE and we are in a race */
            //printf("Racing status %d %d %d %d\n",
            //       csound->dag_task_status[j].s, i, j, k);
          }

        }
        else if (current_task_status == AVAILABLE ||
                 current_task_status == INPROGRESS) {
          wait_on_current_tasks = 1;
        }
        //else { printf("not %d\n", k); }
      }

      // Try again, this time allowing a watch on an AVAILABLE or
      // INPROGRESS task as well
      if (wait_on_current_tasks == 1) {
        for (k=0; k<j; k++) {     /* seek next watch */
          if (csound->dag_task_dep[j][k]==0) continue;
          current_task_status = ATOMIC_READ(csound->dag_task_status[k].s);
          //printf("investigating task %d (%d)\n", k, current_task_status);

          if (current_task_status != DONE) {   // Watch any unfinished task
            //printf("found task %d to watch %d status %d\n",
            //       k, j, csound->dag_task_status[k].s);
            if (moveWatch(csound, &task_watch[k], to_notify)) {
              //printf("task %d now watches %d\n", j, k);
              canQueue = 0;
              break;
            }
            else {
              /* assert csound->dag_task_status[k].s == DONE and we are in a race */
              //printf("Racing status %d %d %d %d\n",
              //       csound->dag_task_status[j].s, i, j, k);
            }

          }
          //else { printf("not %d\n", k); }
        }
      }

      if (canQueue) {           /*  could use monitor here */
        if (next_task == INVALID) {
          next_task = j; // Forward directly to the thread to save a re-dispatch
        } else {
          ATOMIC_WRITE(csound->dag_task_status[j].s, AVAILABLE);
        }
      }
      to_notify = next;
    }
    //dag_print_state(csound);
    return next_task;
}


/* INV : Acyclic */
/* INV : Each entry is read by a single thread,
 *       no writes (but see OPT : Watch ordering) */
/* Thus no protection needed */

/* INV : Watches for different tasks are disjoint */
/* INV : Multiple threads can add to a watch list but only one will remove
 *       These are the only interactions */
/* Thus the use of CAS / atomic operations */

/* Used to mark lists that should not be added to, see NOTE : Race condition */
#if 0
watchList nullList;
watchList *doNotAdd = &nullList;
watchList endwatch = { NULL, NULL };

/* Lists of tasks that depend on the given task */
watchList ** watch;         /* OPT : Structure lay out */
watchListMemoryManagement *wlmm; /* OPT : Structure lay out */

/* INV : wlmm[X].s.id == X; */  /* OPT : Data structure redundancy */
/* INV : status[X] == WAITING => wlmm[X].used */
/* INV : wlmm[X].s is in watch[Y] => wlmm[X].used */


/* Watch list helper functions */

void initialiseWatch (watchList **w, taskID id) {
  wlmm[id].used = TRUE;
  wlmm[id].s.id = id;
  wlmm[id].s.tail = *w;
  *w = &(wlmm[id].s);
}

inline watchList * getWatches(taskID id) {
  /* Atomically take the whole watch list, leaving the doNotAdd marker */
  return __atomic_exchange_n(&(watch[id]), doNotAdd, __ATOMIC_SEQ_CST);
}

int moveWatch (watchList **w, watchList *t) {
  watchList *local;

  t->tail = NULL;

  do {
    local = atomicRead(*w);

    if (local == doNotAdd) {
      return 0;
    } else {
      t->tail = local;
    }
  } while (!atomicCAS(*w,local,t));   /* OPT : Delay loop */

  return 1;
}

void appendToWL (taskID id, watchList *l) {
  watchList *w;

  do {
    w = watch[id];
    l->tail = w;
    /* Retry if another thread changed the head of the list under us */
  } while (!__atomic_compare_exchange_n(&(watch[id]), &w, l, false,
                                        __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));

}

inline void deleteWatch (watchList *t) {
  wlmm[t->id].used = FALSE;
}



typedef struct monitor {
  pthread_mutex_t l = PTHREAD_MUTEX_INITIALIZER;
  unsigned int threadsWaiting = 0;    /* Shadows the length of
                                         workAvailable wait queue */
  queue<taskID> q;                    /* OPT : Dispatch order */
  pthread_cond_t workAvailable = PTHREAD_COND_INITIALIZER;
  pthread_cond_t done = PTHREAD_COND_INITIALIZER;
} monitor;                                    /* OPT : Lock-free */

/* INV : q.size() + dispatched <= ID */
/* INV : foreach(id,q.contents()) { status[id] = AVAILABLE; } */
/* INV : threadsWaiting <= THREADS */

monitor dispatch;


void addWork(monitor *dispatch, taskID id) {
  pthread_mutex_lock(&dispatch->l);

  status[id] = AVAILABLE;
  dispatch->q.push(id);
  if (dispatch->threadsWaiting >= 1) {
    pthread_cond_signal(&dispatch->workAvailable);
  }

  pthread_mutex_unlock(&dispatch->l);
  return;
}

taskID getWork(monitor *dispatch) {
  taskID returnValue;

  pthread_mutex_lock(&dispatch->l);

  while (dispatch->q.empty()) {
    ++dispatch->threadsWaiting;

    if (dispatch->threadsWaiting == THREADS) {
      /* Will the last person out please turn off the lights! */
      pthread_cond_signal(&dispatch->done);
    }

    pthread_cond_wait(&dispatch->workAvailable,&dispatch->l);
    --dispatch->threadsWaiting;

    /* NOTE : A while loop is needed as waking from this requires
     * reacquiring the mutex and in the meantime someone
     * might have got it first and removed the work. */
  }

  returnValue = dispatch->q.pop();

  pthread_mutex_unlock(&dispatch->l);
  return returnValue;

}

void waitForWorkToBeCompleted (monitor *dispatch) {
  /* Requires
   * INV : threadsWaiting == THREADS <=> \forall id \in ID . status[id] == DONE
   */

  pthread_mutex_lock(&dispatch->l);

  if (dispatch->threadsWaiting < THREADS) {
    pthread_cond_wait(&dispatch->done,&dispatch->l);
  }

  /* This assertion is more difficult to prove than it might first appear */
  assert(dispatch->threadsWaiting == THREADS);

  pthread_mutex_unlock(&dispatch->l);
  return;
}



void mainThread (State *s) {

  /* Set up the DAG */
  if (s->firstRun || s->updateNeeded) {
    dep = buildDAG(s);        /* OPT : Dispatch order */
    /* Other : Update anything that is indexed by task
     * (i.e. all arrays given length ID) */
  }

  /* Reset the data structure */
  foreach (id in ID) {
    watch[id] = NULL;
  }

  /* Initialise the dispatch queue */
  foreach (id in ID) {       /* OPT : Dispatch order */
    if (dep[id] == EMPTYSET) {
      atomicWrite(status[id] = AVAILABLE);
      addWork(&dispatch,id);

    } else {
      atomicWrite(status[id] = WAITING);
      initialiseWatch(&watch[choose(dep[id])], id);  /* OPT : Watch ordering */

    }
  }

/* INV : Data structure access invariants start here */
/* INV : Statuses only decrease from now on */
/* INV : Watch list for id contains a subset of the things that depend on id */
/* INV : Each id appears in at most one watch list */
/* INV : doNotAdd only appears at the head of a watch list */
/* INV : if (watch[id] == doNotAdd) then { status[id] == DONE; } */

  waitForWorkToBeCompleted(&dispatch);

  return;
}

void workerThread (State *s) {
  taskID work;
  watchList *tasksToNotify, *next;
  bool canQueue;

  do {

    work = getWork(&dispatch);

    /* Do stuff */
    atomicWrite(status[work] = INPROGRESS);
    doStuff(work);
    atomicWrite(status[work] = DONE);    /* NOTE : Race condition */


    tasksToNotify = getWatches(work);

    while (tasksToNotify != NULL) {
      next = tasksToNotify->tail;

      canQueue = TRUE;
      foreach (dep in dep[tasksToNotify->id]) {  /* OPT : Watch ordering */
        if (atomicRead(status[dep]) != DONE) {
          /* NOTE : Race condition */
          if (moveWatch(&watch[dep],tasksToNotify)) {
            canQueue = FALSE;
            break;
          } else {
            /* Have hit the race condition, try the next option */
            assert(atomicRead(status[dep]) == DONE);
          }
        }
      }

      if (canQueue) {                    /* OPT : Save one work item */
        addWork(&dispatch,tasksToNotify->id);
        deleteWatch(tasksToNotify);
      }

      tasksToNotify = next;
    }

  } while (1);  /* NOTE : some kind of control for thread exit needed */

  return;
}



/* OPT : Structure lay out
 *
 * All data structures that are 1. modified by one or more threads and
 * 2. accessed by multiple threads, should be aligned to cache lines and
 * padded so that there is only one instance per cache line.  This will reduce
 * false memory contention between objects that just happen to share a cache
 * line.  Blocking to 64 bytes will probably be sufficient and if people really
 * care about performance that much they can tune to their particular
 * architecture.
 */
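/* A minimal sketch of the padding idea described above.  The real
 * stateWithPadding type used by the dispatcher is defined in csoundCore.h and
 * may differ; the point is simply that each status gets its own 64-byte cache
 * line so threads polling different tasks do not falsely contend. */
typedef struct paddedStateSketch {
  enum state s;                           /* the actual status               */
  char pad[64 - sizeof(enum state)];      /* fill the rest of the cache line */
} paddedStateSketch;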

/* OPT : Watch ordering
 *
 * Moving a watch is relatively cheap (in the uncontended case) but
 * it would be best to avoid moving watches where possible.  The ideal
 * situation would be for every task to watch the last pre-requisite.
 * There are two places in the code that affect the watch ordering;
 * the initial assignment and the movement when a watch is triggered.
 * Preferring WAITING tasks (in the latter) and lower priority tasks
 * (if combined with the dispatch order optimisation below) are probably
 * good choices.  One mechanism would be to reorder the set (or array) of
 * dependencies to store this information.  When looking for a (new) watch,
 * tasks are sorted with increasing status first and then the first one picked.
 * Keeping the list sorted (or at least split between WAITING and others) with
 * each update should (if the dispatch order is fixed / slowly adapting) result
 * in the best things to watch moving to the front and thus adaptively give
 * the best possible tasks to watch.  The interaction with a dispatch order
 * heuristic would need to be considered.  Note that only one thread will
 * look at any given element of dep[] so they can be re-ordered without
 * needing locking.
 */

/* OPT : Data structure redundancy
 *
 * Some of the fields are not strictly needed and are just there to make
 * the algorithm cleaner and more intelligible.  The id fields of the watch
 * lists are not really needed as there is one per task and their position
 * within the watchListMemoryManagement array allows the task to be inferred.
 * Likewise the used flag in the memory manager is primarily for book-keeping
 * and checking / assertions and could be omitted.
 */

/* OPT : Delay loop
 *
 * In theory it is probably polite to put a slowly increasing delay in
 * after a failed compare and swap to reduce pressure on the memory
 * subsystem in the highly contended case.  As multiple threads adding
 * to a task's watch list simultaneously is probably a rare event, the
 * delay loop is probably unnecessary.
 */
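/* A sketch of the kind of delay loop meant above (not used by the live code):
 * a variant of moveWatch that sleeps for an exponentially growing interval
 * after each failed CAS.  It reuses the sketch's atomicRead/atomicCAS helpers
 * and assumes POSIX nanosleep; the constants are illustrative only. */
int moveWatchWithBackoff (watchList **w, watchList *t) {
  struct timespec delay = { 0, 100 };      /* start with a 100ns pause */
  watchList *local;

  t->tail = NULL;

  do {
    local = atomicRead(*w);
    if (local == doNotAdd) return 0;       /* list is closed; caller re-checks */
    t->tail = local;
    if (atomicCAS(*w,local,t)) return 1;
    nanosleep(&delay, NULL);               /* polite pause before retrying */
    if (delay.tv_nsec < 100000) delay.tv_nsec *= 2;
  } while (1);
}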

/* OPT : Dispatch order
 *
 * The order in which tasks are dispatched affects the amount of
 * parallelisation possible.  Picking the exact scheduling order, even
 * if the duration of the tasks is known, is probably NP-hard (see
 * bin-packing*) so heuristics are probably the way to go.  The proportion
 * of tasks which depend on a given task is probably a reasonable primary
 * score, with tie-breaks going to longer tasks.  This can be applied either
 * to just the initial tasks (in ordering the nodes in the DAG) or to the
 * order in which they are traversed.  Alternatively by
 * sorting the queue / using a heap / other priority queuing structure
 * it might be possible to do this dynamically.  The best solution would
 * probably be adaptive, with a task having its priority incremented
 * each time another worker thread blocks on a shortage of work, with these
 * increments also propagated 'upwards' in the DAG.
 *
 * *. Which means that a solver could be used to give the best possible
 *    schedule / the maximum parallelisation.  This could be useful for
 *    optimisation.
 */

/* OPT : Lock-free
 *
 * A lock-free dispatch mechanism is probably possible as threads can
 * scan the status array for things listed as AVAILABLE and then atomicCAS
 * to INPROGRESS to claim them.  But this starts to involve busy-waits or
 * direct access to futexes and is probably not worth it.
 */

/* OPT : Save one work item
 *
 * Rather than adding all watching tasks that have their dependencies met to
 * the dispatch queue, saving one (perhaps the best, see OPT : Dispatch order)
 * means the thread does not have to wait.  In the case of a purely linear DAG
 * this should be roughly as fast as the single-threaded version.
 */


/* NOTE : Race condition
 *
 * There is a subtle race condition:
 *
 *   Thread 1                             Thread 2
 *   --------                             --------
 *                                        atomicRead(status[dep]) != DONE
 *   atomicWrite(status[work] = DONE);
 *   tasksToNotify = getWatches(work);
 *                                        moveWatch(watch[dep],tasksToNotify);
 *
 * The key cause is that the status and the watch list cannot be updated
 * simultaneously.  However, as getWatches removes all of the watches at once
 * and each is then either moved or dispatched, it is sufficient to have a
 * doNotAdd watchList node to detect this race condition and resolve it by
 * having moveWatch() fail.
 */

void newdag_alloc(CSOUND *csound, int numtasks)
{
    doNotAdd = &endwatch;
??
    watch = (watchList **)csound->Calloc(csound, sizeof(watchList *)*numtasks);
    wlmm = (watchListMemoryManagement *)
      csound->Calloc(csound, sizeof(watchListMemoryManagement)*numtasks);

}

#endif