1
2
3 /*----------------------------------------------------------*/
4 /* */
5 /* LPLIB V3.02 */
6 /* */
7 /*----------------------------------------------------------*/
8 /* */
9 /* Description: Handles threads, scheduling */
10 /* & dependencies */
11 /* Author: Loic MARECHAL */
12 /* Creation date: feb 25 2008 */
13 /* Last modification: feb 15 2011 */
14 /* */
15 /*----------------------------------------------------------*/
16
17
18 #ifndef SERIAL
19
20 /*----------------------------------------------------------*/
21 /* Includes */
22 /*----------------------------------------------------------*/
23
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <stdbool.h>
#include <assert.h>
#include <errno.h>
#include <unistd.h>
#ifdef __FreeBSD__
#	include <sys/types.h>
#	include <pmc.h>
#endif
#include "lplib3.h"
37
38
39 /*----------------------------------------------------------*/
40 /* Defines */
41 /*----------------------------------------------------------*/
42
43 #define MaxLibPar 10
44 #define MaxPth 128
45 #define MaxTyp 100
46 #define DefNmbSmlBlk 64
47 #define DefNmbDepBlk 256
48 #define MaxTotPip 65536
49 #define MaxPipDep 100
50 #define MaxHsh 10
51
52 enum ParCmd {RunBigWrk, RunSmlWrk, ClrMem, EndPth};
53
54
55 /*----------------------------------------------------------*/
56 /* Structures' prototypes */
57 /*----------------------------------------------------------*/
58
59 typedef struct WrkSct
60 {
61 int BegIdx, EndIdx, NmbDep, *DepWrdTab;
62 int *DepIdxTab;
63 struct WrkSct *pre, *nex;
64 }WrkSct;
65
66 typedef struct
67 {
68 int NmbLin, NmbSmlWrk, NmbBigWrk, SmlWrkSiz, BigWrkSiz, DepWrkSiz, NmbDepWrd, *DepWrdMat, *DepIdxMat;
69 char *RunDepTab;
70 WrkSct *SmlWrkTab, *BigWrkTab;
71 }TypSct;
72
73 typedef struct
74 {
75 int idx;
76 char *ClrAdr;
77 WrkSct *wrk;
78 pthread_mutex_t mtx;
79 pthread_cond_t cnd;
80 pthread_t pth;
81 struct ParSct *par;
82 }PthSct;
83
84 typedef struct PipSct
85 {
86 int idx, NmbDep, DepTab[ MaxPipDep ];
87 void *prc, *arg;
88 pthread_t pth;
89 struct ParSct *par;
90 }PipSct;
91
92 typedef struct BucSct
93 {
94 int idx[3];
95 long long int dat;
96 struct BucSct *nex;
97 }BucSct;
98
99 typedef struct
100 {
101 int mul[3], NmbOvf[ MaxPth ];
102 TypSct *typ1, *typ2;
103 BucSct *buc, *ovf[ MaxPth ];
104 }HshSct;
105
106 typedef struct ParSct
107 {
108 int NmbCpu, WrkCpt, NmbPip, PenPip, RunPip, NmbTyp, BufMax, BufCpt, req, cmd, ClrLinSiz, *PipWrd;
109 double sta[2];
110 void (*prc)(int, int, int, void *), *arg;
111 pthread_cond_t ParCnd, PipCnd;
112 pthread_mutex_t ParMtx, PipMtx;
113 pthread_t PipPth;
114 PthSct *PthTab;
115 TypSct *TypTab, *CurTyp, *DepTyp, *typ1, *typ2;
116 WrkSct *NexWrk, *BufWrk[ MaxPth / 4 ];
117 HshSct HshTab[ MaxHsh ];
118 }ParSct;
119
120 typedef struct
121 {
122 unsigned long long (*idx)[2];
123 double box[6], (*crd)[3], (*crd2)[2];
124 }ArgSct;
125
126
127 /*----------------------------------------------------------*/
128 /* Private procedures' prototypes */
129 /*----------------------------------------------------------*/
130
131 static int SetBit(int *, int);
132 static int GetBit(int *, int);
133 static int AndWrd(WrkSct *, char *);
134 static void SetWrd(WrkSct *, char *);
135 static void ClrWrd(WrkSct *, char *);
136 int CmpWrk(const void *, const void *);
137 static void *PipHdl(void *);
138 static void *PthHdl(void *);
139 static WrkSct *NexWrk(ParSct *, int);
140
141
142 /*----------------------------------------------------------*/
143 /* Global variables */
144 /*----------------------------------------------------------*/
145
146 ParSct *ParTab[ MaxLibPar+1 ];
147 int IniLibPar = 0;
148
149
150 /*----------------------------------------------------------*/
151 /* Init structures, scheduler and launch threads */
152 /*----------------------------------------------------------*/
153
InitParallel(int NmbCpu)154 int InitParallel(int NmbCpu)
155 {
156 int i, ParIdx;
157 ParSct *par = NULL;
158 PthSct *pth;
159
160 /* Check the number of requested cpu and clear the main par table at first call */
161
162 if(NmbCpu > MaxPth)
163 return(0);
164
165 if(!IniLibPar)
166 {
167 IniLibPar = 1;
168
169 for(i=1;i<=MaxLibPar;i++)
170 ParTab[i] = NULL;
171 }
172
173 /* Allocate and build main parallel structure */
174
175 for(ParIdx=1; ParIdx<=MaxLibPar; ParIdx++)
176 if(!ParTab[ ParIdx ])
177 {
178 par = ParTab[ ParIdx ] = calloc(1, sizeof(ParSct));
179 break;
180 }
181
182 if(!par)
183 return(0);
184
185 if(!(par->PthTab = calloc(NmbCpu, sizeof(PthSct))))
186 return(0);
187
188 if(!(par->TypTab = calloc((MaxTyp + 1), sizeof(TypSct))))
189 return(0);
190
191 if(!(par->PipWrd = calloc(MaxTotPip/32, sizeof(int))))
192 return(0);
193
194 par->NmbCpu = NmbCpu;
195 par->WrkCpt = par->NmbPip = par->PenPip = par->RunPip = 0;
196
197 /* Set the size of WP buffer */
198
199 if(NmbCpu >= 4)
200 par->BufMax = NmbCpu / 4;
201 else
202 par->BufMax = 1;
203
204 pthread_mutex_init(&par->ParMtx, NULL);
205 pthread_mutex_init(&par->PipMtx, NULL);
206 pthread_cond_init(&par->ParCnd, NULL);
207 pthread_cond_init(&par->PipCnd, NULL);
208
209 /* Launch pthreads */
210
211 for(i=0;i<par->NmbCpu;i++)
212 {
213 pth = &par->PthTab[i];
214 pth->idx = i;
215 pth->par = par;
216 pthread_mutex_init(&pth->mtx, NULL);
217 pthread_cond_init(&pth->cnd, NULL);
218 pthread_create(&pth->pth, NULL, PthHdl, (void *)pth);
219 }
220
221 /* Wait for all threads to be up and wainting */
222
223 pthread_mutex_lock(&par->ParMtx);
224
225 while(par->WrkCpt < par->NmbCpu)
226 pthread_cond_wait(&par->ParCnd, &par->ParMtx);
227
228 pthread_mutex_unlock(&par->ParMtx);
229
230 return(ParIdx);
231 }
232
233
234 /*----------------------------------------------------------*/
235 /* Stop all threads and free memories */
236 /*----------------------------------------------------------*/
237
/* Shut an instance down: ask every worker to exit, join them, wait for all
   pipelined procedures, then free every resource and release the ParTab slot.
   Invalid or unused ParIdx values are silently ignored. */
void StopParallel(int ParIdx)
{
    int i;
    PthSct *pth;
    ParSct *par;

    /* Get and check lib parallel instance */

    if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
        return;

    /* Send stop to all threads: cmd is read by each worker after wake-up */

    pthread_mutex_lock(&par->ParMtx);
    par->cmd = EndPth;
    pthread_mutex_unlock(&par->ParMtx);

    /* Wake each thread so it sees EndPth, then join it.
       NOTE(review): assumes all workers are idle (parked on pth->cnd) when
       this is called — confirm callers never stop mid-loop */

    for(i=0;i<par->NmbCpu;i++)
    {
        pth = &par->PthTab[i];
        pthread_mutex_lock(&pth->mtx);
        pthread_cond_signal(&pth->cnd);
        pthread_mutex_unlock(&pth->mtx);
        pthread_join(pth->pth, NULL);
    }

    pthread_mutex_destroy(&par->ParMtx);
    pthread_cond_destroy(&par->ParCnd);

    /* Let every pending pipelined procedure finish before tearing down */

    WaitPipeline(ParIdx);

    pthread_mutex_destroy(&par->PipMtx);
    pthread_cond_destroy(&par->PipCnd);

    /* Free memories: every still-registered type, then the instance itself */

    for(i=1;i<=MaxTyp;i++)
        if(par->TypTab[i].NmbLin)
            FreeType(ParIdx, i);

    free(par->PthTab);
    free(par->TypTab);
    free(par->PipWrd);
    free(par);

    ParTab[ ParIdx ] = NULL;
}
287
288
289 /*----------------------------------------------------------*/
290 /* Launch the loop prc on typ1 element depending on typ2 */
291 /*----------------------------------------------------------*/
292
/* Run the user loop prc over every line of type TypIdx1.
   If TypIdx2 != 0, small work-packages are scheduled under the dependency
   constraints linking typ1 to typ2 (set up via Begin/AddDependency) and the
   mean concurrency (speedup estimate) is returned.
   If TypIdx2 == 0, one big work-package per CPU is dispatched and NmbCpu is
   returned. Returns -1. on any invalid argument. */
float LaunchParallel(int ParIdx, int TypIdx1, int TypIdx2, void *prc, void *PtrArg)
{
    int i;
    PthSct *pth;
    ParSct *par;
    TypSct *typ1, *typ2 = NULL;

    /* Get and check lib parallel instance */

    if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
        return(-1.);

    /* Check bounds */

    if( (TypIdx1 < 1) || (TypIdx1 > MaxTyp) || (TypIdx2 < 0) || (TypIdx2 > MaxTyp) || (TypIdx1 == TypIdx2) )
        return(-1.);

    typ1 = &par->TypTab[ TypIdx1 ];

    if(TypIdx2)
    {
        /* Dependency-constrained mode: lock access to global parameters */

        pthread_mutex_lock(&par->ParMtx);

        par->cmd = RunSmlWrk;
        par->prc = (void (*)(int, int, int, void *))prc;
        par->arg = PtrArg;
        par->typ1 = typ1;
        par->typ2 = typ2 = &par->TypTab[ TypIdx2 ];
        par->NexWrk = typ1->SmlWrkTab;
        par->BufCpt = 0;
        par->WrkCpt = 0;
        par->sta[0] = par->sta[1] = 0.;
        par->req = 0;

        /* Clear running wp */

        for(i=0;i<par->NmbCpu;i++)
            par->PthTab[i].wrk = NULL;

        /* NOTE(review): assumes BeginDependency was called for typ1, otherwise
           RunDepTab is NULL here — confirm callers respect that ordering */

        memset(typ1->RunDepTab, 0, typ1->NmbDepWrd * 32 * sizeof(char));

        /* Build a doubly-linked todo list over the (dependency-sorted) WP table */

        for(i=0;i<par->typ1->NmbSmlWrk;i++)
        {
            typ1->SmlWrkTab[i].pre = &typ1->SmlWrkTab[ i-1 ];
            typ1->SmlWrkTab[i].nex = &typ1->SmlWrkTab[ i+1 ];
        }

        typ1->SmlWrkTab[0].pre = typ1->SmlWrkTab[ typ1->NmbSmlWrk - 1 ].nex = NULL;

        /* Start main loop : wake up threads and wait for completion or blocked threads */

        do
        {
            /* Search for some idle threads */

            par->req = 0;

            for(i=0;i<par->NmbCpu;i++)
            {
                pth = &par->PthTab[i];

                if(pth->wrk)
                    continue;

                /* No compatible WP available right now: remember that a thread
                   starved so workers signal us when they free dependencies */

                if(!(pth->wrk = NexWrk(par, i)))
                {
                    par->req = 1;
                    break;
                }

                /* Wake up the thread and provide it with a WP list */

                pthread_mutex_lock(&pth->mtx);
                pthread_cond_signal(&pth->cnd);
                pthread_mutex_unlock(&pth->mtx);
            }

            /* If every WP is done : exit the parallel loop */

            if(par->WrkCpt == typ1->NmbSmlWrk)
                break;

            /* Otherwise, wait for a blocked thread (releases ParMtx while waiting) */

            pthread_cond_wait(&par->ParCnd, &par->ParMtx);
        }while(1);

        pthread_mutex_unlock(&par->ParMtx);

        /* Return the average number of busy threads per scheduling step */

        return(par->sta[1] / par->sta[0]);
    }
    else
    {
        /* Unconstrained mode: lock access to global parameters */

        pthread_mutex_lock(&par->ParMtx);

        par->cmd = RunBigWrk;
        par->prc = (void (*)(int, int, int, void *))prc;
        par->arg = PtrArg;
        par->typ1 = typ1;
        par->typ2 = NULL;
        par->WrkCpt = 0;

        /* Hand one big WP to each thread (NmbBigWrk == NmbCpu by construction) */

        for(i=0;i<typ1->NmbBigWrk;i++)
        {
            pth = &par->PthTab[i];
            pth->wrk = &typ1->BigWrkTab[i];
        }

        for(i=0;i<typ1->NmbBigWrk;i++)
        {
            pth = &par->PthTab[i];
            pthread_mutex_lock(&pth->mtx);
            pthread_cond_signal(&pth->cnd);
            pthread_mutex_unlock(&pth->mtx);
        }

        /* Wait for the last worker to signal completion */

        pthread_cond_wait(&par->ParCnd, &par->ParMtx);

        pthread_mutex_unlock(&par->ParMtx);

        /* Ideal speedup: every CPU was busy the whole time */

        return(par->NmbCpu);
    }
}
426
427
428 /*----------------------------------------------------------*/
429 /* Pthread handler, waits for job, does it, then signal end */
430 /*----------------------------------------------------------*/
431
/* Worker-thread entry point: report ready, then loop forever waiting on the
   thread's private condition variable and executing whatever par->cmd says
   (big WP, small WP chain, memory clear) until EndPth is received. */
static void *PthHdl(void *ptr)
{
    PthSct *pth = (PthSct *)ptr;
    ParSct *par = pth->par;

    /* Tell the scheduler this thread is up. The private mutex is taken BEFORE
       releasing ParMtx so the first cond_signal from the scheduler cannot be
       lost between startup and the first cond_wait below. */

    pthread_mutex_lock(&par->ParMtx);
    par->WrkCpt++;
    pthread_cond_signal(&par->ParCnd);
    pthread_mutex_lock(&pth->mtx);
    pthread_mutex_unlock(&par->ParMtx);

    /* Enter main loop until StopParallel is sent */

    do
    {
        /* Wait for a wake-up signal from the main loop */

        pthread_cond_wait(&pth->cnd, &pth->mtx);

        switch(par->cmd)
        {
            case RunBigWrk :
            {
                /* Launch a single big wp and signal completion to the scheduler */

                par->prc(pth->wrk->BegIdx, pth->wrk->EndIdx, pth->idx, par->arg);

                pthread_mutex_lock(&par->ParMtx);
                par->WrkCpt++;

                /* Only the last finishing thread wakes the scheduler */

                if(par->WrkCpt >= par->typ1->NmbBigWrk)
                    pthread_cond_signal(&par->ParCnd);

                pthread_mutex_unlock(&par->ParMtx);
            }break;

            case RunSmlWrk :
            {
                /* Chain small WPs: run one, then pull the next compatible one
                   ourselves instead of going back to the scheduler each time */

                do
                {
                    /* Run the WP */

                    par->prc(pth->wrk->BegIdx, pth->wrk->EndIdx, pth->idx, par->arg);

                    /* Locked access to global parameters : update WP count, tag WP done and signal the main loop */

                    pthread_mutex_lock(&par->ParMtx);

                    par->WrkCpt++;

                    /* Nothing compatible left: ask the scheduler to take over
                       and go back to sleep on our condition variable */

                    if(!(pth->wrk = NexWrk(par, pth->idx)))
                    {
                        par->req = 1;
                        pthread_cond_signal(&par->ParCnd);
                        pthread_mutex_unlock(&par->ParMtx);
                        break;
                    }

                    /* A thread starved earlier: our freed dependencies may
                       unblock it, so nudge the scheduler */

                    if(par->req)
                        pthread_cond_signal(&par->ParCnd);

                    pthread_mutex_unlock(&par->ParMtx);
                }while(1);
            }break;

            case ClrMem :
            {
                /* Clear this thread's slice and signal completion to the scheduler */

                memset(pth->ClrAdr, 0, par->ClrLinSiz);

                pthread_mutex_lock(&par->ParMtx);
                par->WrkCpt++;
                pthread_cond_signal(&par->ParCnd);
                pthread_mutex_unlock(&par->ParMtx);
            }break;

            case EndPth :
            {
                /* Destroy the thread mutex and condition and call for join */

                pthread_mutex_unlock(&pth->mtx);
                pthread_mutex_destroy(&pth->mtx);
                pthread_cond_destroy(&pth->cnd);
                return(NULL);
            }break;
        }
    }while(1);

    return(NULL);
}
525
526
527 /*----------------------------------------------------------*/
528 /* Get the next WP to be computed */
529 /*----------------------------------------------------------*/
530
/* Pick the next runnable work-package for thread PthIdx, honouring the
   dependency tags, and release the tags of the WP it just finished.
   Must be called with par->ParMtx held. Returns NULL when no compatible WP
   is currently available (some may still be blocked by running ones). */
static WrkSct *NexWrk(ParSct *par, int PthIdx)
{
    int i;
    PthSct *pth = &par->PthTab[ PthIdx ];
    WrkSct *wrk;

    /* Update stats: one scheduling step, plus one sample per busy thread */

    par->sta[0]++;

    for(i=0;i<par->NmbCpu;i++)
        if(par->PthTab[i].wrk)
            par->sta[1]++;

    /* Remove previous work's tags so WPs depending on them become runnable */

    if(pth->wrk)
        ClrWrd(pth->wrk, par->typ1->RunDepTab);

    /* If the wp's buffer is empty search for some new compatible wp to fill in */

    if(!par->BufCpt)
    {
        wrk = par->NexWrk;

        while(wrk)
        {
            /* Check for dependencies: skip WPs sharing a tagged dep block */

            if(!AndWrd(wrk, par->typ1->RunDepTab))
            {
                par->BufWrk[ par->BufCpt++ ] = wrk;

                /* Unlink wp from the todo list */

                if(wrk->pre)
                    wrk->pre->nex = wrk->nex;
                else
                    par->NexWrk = wrk->nex;

                if(wrk->nex)
                    wrk->nex->pre = wrk->pre;

                /* Add new work's tags immediately so the next buffered WP
                   cannot conflict with this one */

                SetWrd(wrk, par->typ1->RunDepTab);

                if(par->BufCpt == par->BufMax)
                    break;
            }

            wrk = wrk->nex;
        }
    }

    /* Return the next available wp in buffer and unlink it from the todo list */

    if(par->BufCpt)
        return(par->BufWrk[ --par->BufCpt ]);
    else
        return(NULL);
}
593
594
595 /*----------------------------------------------------------*/
596 /* Allocate a new kind of elements and set work-packages */
597 /*----------------------------------------------------------*/
598
NewType(int ParIdx,int NmbLin)599 int NewType(int ParIdx, int NmbLin)
600 {
601 int i, TypIdx=0, idx;
602 TypSct *typ;
603 ParSct *par;
604
605 /* Get and check lib parallel instance */
606
607 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
608 return(0);
609
610 if(NmbLin < par->NmbCpu)
611 return(0);
612
613 /* Search for a free type structure */
614
615 for(i=1;i<=MaxTyp;i++)
616 if(!par->TypTab[i].NmbLin)
617 {
618 TypIdx = i;
619 break;
620 }
621
622 if(!TypIdx)
623 return(0);
624
625 typ = &par->TypTab[ TypIdx ];
626 typ->NmbLin = NmbLin;
627
628 /* Compute the size of small work-packages */
629
630 if(NmbLin >= DefNmbSmlBlk * par->NmbCpu)
631 typ->SmlWrkSiz = NmbLin / (DefNmbSmlBlk * par->NmbCpu);
632 else
633 typ->SmlWrkSiz = NmbLin / par->NmbCpu;
634
635 typ->NmbSmlWrk = NmbLin / typ->SmlWrkSiz;
636
637 if(NmbLin != typ->NmbSmlWrk * typ->SmlWrkSiz)
638 typ->NmbSmlWrk++;
639
640 if(!(typ->SmlWrkTab = calloc(typ->NmbSmlWrk , sizeof(WrkSct))))
641 return(0);
642
643 /* Set small work-packages */
644
645 idx = 0;
646
647 for(i=0;i<typ->NmbSmlWrk;i++)
648 {
649 typ->SmlWrkTab[i].BegIdx = idx + 1;
650 typ->SmlWrkTab[i].EndIdx = idx + typ->SmlWrkSiz;
651 idx += typ->SmlWrkSiz;
652 }
653
654 typ->SmlWrkTab[ typ->NmbSmlWrk - 1 ].EndIdx = NmbLin;
655
656 /* Compute the size of big work-packages */
657
658 typ->BigWrkSiz = NmbLin / par->NmbCpu;
659 typ->NmbBigWrk = par->NmbCpu;
660
661 if(!(typ->BigWrkTab = calloc(typ->NmbBigWrk , sizeof(WrkSct))))
662 return(0);
663
664 /* Set big work-packages */
665
666 idx = 0;
667
668 for(i=0;i<typ->NmbBigWrk;i++)
669 {
670 typ->BigWrkTab[i].BegIdx = idx + 1;
671 typ->BigWrkTab[i].EndIdx = idx + typ->BigWrkSiz;
672 idx += typ->BigWrkSiz;
673 }
674
675 typ->BigWrkTab[ typ->NmbBigWrk - 1 ].EndIdx = NmbLin;
676
677 return(TypIdx);
678 }
679
680
681 /*----------------------------------------------------------*/
682 /* Add this kind of element to the free-list */
683 /*----------------------------------------------------------*/
684
FreeType(int ParIdx,int TypIdx)685 void FreeType(int ParIdx, int TypIdx)
686 {
687 TypSct *typ;
688 ParSct *par;
689
690 /* Get and check lib parallel instance */
691
692 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
693 return;
694
695 /* Check bounds and free mem */
696
697 if( (TypIdx < 1) || (TypIdx > MaxTyp) )
698 return;
699
700 typ = &par->TypTab[ TypIdx ];
701
702 if(typ->SmlWrkTab)
703 free(typ->SmlWrkTab);
704
705 if(typ->BigWrkTab)
706 free(typ->BigWrkTab);
707
708 if(typ->DepIdxMat)
709 free(typ->DepIdxMat);
710
711 if(typ->RunDepTab)
712 free(typ->RunDepTab);
713
714 if(typ->DepWrdMat)
715 free(typ->DepWrdMat);
716
717 memset(typ, 0, sizeof(TypSct));
718 }
719
720
721 /*----------------------------------------------------------*/
722 /* Allocate a dependency matrix linking both types */
723 /*----------------------------------------------------------*/
724
BeginDependency(int ParIdx,int TypIdx1,int TypIdx2)725 int BeginDependency(int ParIdx, int TypIdx1, int TypIdx2)
726 {
727 int i;
728 ParSct *par;
729 TypSct *typ1, *typ2;
730
731 /* Get and check lib parallel instance */
732
733 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
734 return(0);
735
736 /* Check bounds */
737
738 par->CurTyp = typ1 = &par->TypTab[ TypIdx1 ];
739 par->DepTyp = typ2 = &par->TypTab[ TypIdx2 ];
740
741 if( (TypIdx1 < 1) || (TypIdx1 > MaxTyp) || (TypIdx2 < 1) || (TypIdx2 > MaxTyp) || (typ1 == typ2) \
742 || !typ1->NmbLin || !typ2->NmbLin)
743 {
744 return(0);
745 }
746
747 /* Compute dependency table's size */
748
749 if(typ2->NmbLin >= DefNmbDepBlk * par->NmbCpu)
750 typ1->DepWrkSiz = typ2->NmbLin / (DefNmbDepBlk * par->NmbCpu);
751 else
752 typ1->DepWrkSiz = typ2->NmbLin / par->NmbCpu;
753
754 typ1->NmbDepWrd = typ2->NmbLin / (typ1->DepWrkSiz * 32);
755
756 if(typ2->NmbLin != typ1->NmbDepWrd * typ1->DepWrkSiz * 32)
757 typ1->NmbDepWrd++;
758
759 /* Allocate a global dependency table */
760
761 if(!(typ1->DepWrdMat = calloc(typ1->NmbSmlWrk * typ1->NmbDepWrd, sizeof(int))))
762 return(0);
763
764 /* Then spread sub-tables among WP */
765
766 for(i=0;i<typ1->NmbSmlWrk;i++)
767 {
768 typ1->SmlWrkTab[i].NmbDep = 0;
769 typ1->SmlWrkTab[i].DepWrdTab = &typ1->DepWrdMat[ i * typ1->NmbDepWrd ];
770 }
771
772 /* Allocate a running tags table */
773
774 if(!(typ1->RunDepTab = calloc(typ1->NmbDepWrd * 32, sizeof(char))))
775 return(0);
776
777 return(1);
778 }
779
780
781 /*----------------------------------------------------------*/
782 /* Type1 element idx1 depends on type2 element idx2 */
783 /*----------------------------------------------------------*/
784
AddDependency(int ParIdx,int idx1,int idx2)785 void AddDependency(int ParIdx, int idx1, int idx2)
786 {
787 WrkSct *wrk;
788 ParSct *par;
789
790 /* Get and check lib parallel instance */
791
792 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
793 return;
794
795 /* Set and count dependency bit */
796
797 wrk = &par->CurTyp->SmlWrkTab[ (idx1-1) / par->CurTyp->SmlWrkSiz ];
798
799 if(!SetBit(wrk->DepWrdTab, (idx2-1) / par->CurTyp->DepWrkSiz ))
800 wrk->NmbDep++;
801 }
802
803
804 /*----------------------------------------------------------*/
805 /* Sort wp depending on their number of dependencies */
806 /*----------------------------------------------------------*/
807
/* Close a dependency session: turn each WP's bit table into a compact index
   list, compute collision statistics (DepSta[0] = mean %, DepSta[1] = max %),
   and sort the WPs by decreasing dependency count so the most constrained
   ones are scheduled first. */
void EndDependency(int ParIdx, float DepSta[2])
{
    int i, j, idx=0, NmbDep, NmbDepBit, TotNmbDep = 0;
    ParSct *par;
    TypSct *typ1, *typ2;
    WrkSct *wrk;

    /* Get and check lib parallel instance */

    if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
        return;

    /* Compute total and maximum number of dependencies.
       NOTE(review): assumes BeginDependency succeeded (CurTyp/DepTyp set) —
       confirm callers never call this after a failed Begin */

    DepSta[1] = 0.;
    typ1 = par->CurTyp;
    typ2 = par->DepTyp;

    for(i=0;i<typ1->NmbSmlWrk;i++)
    {
        TotNmbDep += typ1->SmlWrkTab[i].NmbDep;

        if(typ1->SmlWrkTab[i].NmbDep > DepSta[1])
            DepSta[1] = typ1->SmlWrkTab[i].NmbDep;
    }

    DepSta[0] = TotNmbDep;

    /* Allocate a global dependency index table */

    if(!(typ1->DepIdxMat = calloc(TotNmbDep, sizeof(int))))
        return;

    /* Then spread and fill the sub-tables among WP: each WP gets the list of
       bit positions that are set in its dependency words */

    NmbDep = typ1->NmbDepWrd * 32;

    for(i=0;i<typ1->NmbSmlWrk;i++)
    {
        wrk = &typ1->SmlWrkTab[i];
        wrk->DepIdxTab = &typ1->DepIdxMat[ idx ];
        idx += wrk->NmbDep;
        wrk->NmbDep = 0;

        /* Recount while collecting — ends at the same value it started with */

        for(j=0;j<NmbDep;j++)
            if(GetBit(wrk->DepWrdTab, j))
                wrk->DepIdxTab[ wrk->NmbDep++ ] = j;
    }

    /* Compute stats as percentages of the number of dependency blocks */

    NmbDepBit = typ2->NmbLin / typ1->DepWrkSiz;

    if(typ2->NmbLin - NmbDepBit * typ1->DepWrkSiz)
        NmbDepBit++;

    DepSta[0] = 100 * DepSta[0] / (typ1->NmbSmlWrk * NmbDepBit);
    DepSta[1] = 100 * DepSta[1] / NmbDepBit;

    /* Sort WP from highest collision number to the lowest */

    qsort(typ1->SmlWrkTab, typ1->NmbSmlWrk, sizeof(WrkSct), CmpWrk);

    /* Close the session: AddDependency is invalid from now on */

    par->CurTyp = NULL;
}
873
874
875 /*----------------------------------------------------------*/
876 /* Test and set a bit in a multibyte number */
877 /*----------------------------------------------------------*/
878
/* Set bit idx in the word table and return its previous (raw, non-normalized)
   value — nonzero when the bit was already set. */
static int SetBit(int *tab, int idx)
{
    int *wrd = &tab[ idx >> 5 ];
    int msk = 1 << (idx & 31);
    int old = *wrd & msk;

    *wrd |= msk;

    return(old);
}
885
886
887 /*----------------------------------------------------------*/
888 /* Test a bit in a multibyte number */
889 /*----------------------------------------------------------*/
890
/* Return the raw masked value of bit idx in the word table:
   zero when clear, the (nonzero) mask value when set. */
static int GetBit(int *tab, int idx)
{
    int msk = 1 << (idx & 31);

    return(tab[ idx >> 5 ] & msk);
}
895
896
897 /*----------------------------------------------------------*/
898 /* Check wether two WP share common resources -> locked */
899 /*----------------------------------------------------------*/
900
/* Return 1 as soon as one of the WP's dependency blocks is flagged as in use
   in the running-tags table wrd, 0 when none is (WP is runnable). */
static int AndWrd(WrkSct *wrk, char *wrd)
{
    int i = 0;

    while(i < wrk->NmbDep)
    {
        if(wrd[ wrk->DepIdxTab[i] ])
            return(1);

        i++;
    }

    return(0);
}
911
/* Flag every dependency block of this WP as in use in the running-tags table. */
static void SetWrd(WrkSct *wrk, char *wrd)
{
    int i = wrk->NmbDep;

    while(i--)
        wrd[ wrk->DepIdxTab[i] ] = 1;
}
919
/* Clear every dependency-block flag of this WP in the running-tags table. */
static void ClrWrd(WrkSct *wrk, char *wrd)
{
    int i = wrk->NmbDep;

    while(i--)
        wrd[ wrk->DepIdxTab[i] ] = 0;
}
927
928
929 /*----------------------------------------------------------*/
930 /* Compare two workpackages number of bits */
931 /*----------------------------------------------------------*/
932
CmpWrk(const void * ptr1,const void * ptr2)933 int CmpWrk(const void *ptr1, const void *ptr2)
934 {
935 WrkSct *w1, *w2;
936
937 w1 = (WrkSct *)ptr1;
938 w2 = (WrkSct *)ptr2;
939
940 if(w1->NmbDep > w2->NmbDep)
941 return(-1);
942 else if(w1->NmbDep < w2->NmbDep)
943 return(1);
944 else
945 return(0);
946 }
947
948
949 /*----------------------------------------------------------*/
950 /* Launch the loop prc on typ1 element depending on typ2 */
951 /*----------------------------------------------------------*/
952
ParallelMemClear(int ParIdx,void * PtrArg,long siz)953 int ParallelMemClear(int ParIdx, void *PtrArg, long siz)
954 {
955 char *tab = (char *)PtrArg;
956 int i;
957 PthSct *pth;
958 ParSct *par;
959
960 /* Get and check lib parallel instance, adresse and size */
961
962 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) || !tab || (siz < par->NmbCpu) )
963 return(0);
964
965 /* Lock acces to global parameters */
966
967 pthread_mutex_lock(&par->ParMtx);
968
969 par->cmd = ClrMem;
970 par->ClrLinSiz = siz / par->NmbCpu;
971 par->WrkCpt = 0;
972
973 /* Spread the buffer among each thread and wake then up */
974
975 for(i=0;i<par->NmbCpu;i++)
976 {
977 pth = &par->PthTab[i];
978 pth->ClrAdr = &tab[ i * par->ClrLinSiz ];
979
980 pthread_mutex_lock(&pth->mtx);
981 pthread_cond_signal(&pth->cnd);
982 pthread_mutex_unlock(&pth->mtx);
983 }
984
985 /* Wait for each thread to complete */
986
987 while(par->WrkCpt < par->NmbCpu)
988 pthread_cond_wait(&par->ParCnd, &par->ParMtx);
989
990 pthread_mutex_unlock(&par->ParMtx);
991
992 return(1);
993 }
994
995
996 /*----------------------------------------------------------*/
997 /* Wait for a condition, launch and detach a user procedure */
998 /*----------------------------------------------------------*/
999
LaunchPipeline(int ParIdx,void * prc,void * PtrArg,int NmbDep,int * DepTab)1000 int LaunchPipeline(int ParIdx, void *prc, void *PtrArg, int NmbDep, int *DepTab)
1001 {
1002 int i;
1003 PipSct *NewPip=NULL;
1004 ParSct *par;
1005
1006 /* Get and check lib parallel instance and the number of pipes and dependencies */
1007
1008 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) \
1009 || (NmbDep > MaxPipDep) || (par->NmbPip >= MaxTotPip) )
1010 {
1011 return(0);
1012 }
1013
1014 /* Allocate and setup a new pipe */
1015
1016 if(!(NewPip = malloc(sizeof(PipSct))))
1017 return(0);
1018
1019 NewPip->prc = prc;
1020 NewPip->arg = PtrArg;
1021 NewPip->par = par;
1022 NewPip->NmbDep = NmbDep;
1023
1024 for(i=0;i<NmbDep;i++)
1025 NewPip->DepTab[i] = DepTab[i];
1026
1027 /* Lock pipe mutex, increment pipe counter and launch the pipe regardless dependencies */
1028
1029 pthread_mutex_lock(&par->PipMtx);
1030 NewPip->idx = ++par->NmbPip;
1031 par->PenPip++;
1032 pthread_create(&NewPip->pth, NULL, PipHdl, (void *)NewPip);
1033 pthread_mutex_unlock(&par->PipMtx);
1034
1035 return(NewPip->idx);
1036 }
1037
1038
1039 /*----------------------------------------------------------*/
1040 /* Thread handler launching and waitint for user's procedure*/
1041 /*----------------------------------------------------------*/
1042
PipHdl(void * ptr)1043 static void *PipHdl(void *ptr)
1044 {
1045 int RunFlg, i;
1046 PipSct *pip = (PipSct *)ptr;
1047 ParSct *par = pip->par;
1048 void (*prc)(void *);
1049
1050 /* Wait for conditions to be met */
1051
1052 do
1053 {
1054 pthread_mutex_lock(&par->PipMtx);
1055
1056 if(par->RunPip < par->NmbCpu)
1057 {
1058 RunFlg = 1;
1059
1060 for(i=0;i<pip->NmbDep;i++)
1061 if(!GetBit(par->PipWrd, pip->DepTab[i]))
1062 {
1063 RunFlg = 0;
1064 break;
1065 }
1066 }
1067
1068 if(!RunFlg)
1069 {
1070 pthread_mutex_unlock(&par->PipMtx);
1071 usleep(1000);
1072 }
1073 }while(!RunFlg);
1074
1075 /* Execute the user's procedure and set the flag to 2 (done) */
1076
1077 prc = (void (*)(void *))pip->prc;
1078 par->RunPip++;
1079
1080 pthread_mutex_unlock(&par->PipMtx);
1081
1082 prc(pip->arg);
1083
1084 pthread_mutex_lock(&par->PipMtx);
1085 SetBit(par->PipWrd, pip->idx);
1086 par->PenPip--;
1087 par->RunPip--;
1088 free(pip);
1089 pthread_mutex_unlock(&par->PipMtx);
1090
1091 return(NULL);
1092 }
1093
1094
1095 /*----------------------------------------------------------*/
1096 /* Wait for all pipelined procedures to complete */
1097 /*----------------------------------------------------------*/
1098
WaitPipeline(int ParIdx)1099 void WaitPipeline(int ParIdx)
1100 {
1101 int PenPip;
1102 ParSct *par;
1103
1104 /* Get and check lib parallel instance */
1105
1106 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
1107 return;
1108
1109 do
1110 {
1111 pthread_mutex_lock(&par->PipMtx);
1112 PenPip = par->PenPip;
1113 pthread_mutex_unlock(&par->PipMtx);
1114 usleep(1000);
1115 }while(PenPip);
1116 }
1117
1118
1119 /*-
1120 * Copyright (c) 1992, 1993
1121 * The Regents of the University of California. All rights reserved.
1122 * Multithread implementation Copyright (c) 2006, 2007 Diomidis Spinellis.
1123 *
1124 * Redistribution and use in source and binary forms, with or without
1125 * modification, are permitted provided that the following conditions
1126 * are met:
1127 * 1. Redistributions of source code must retain the above copyright
1128 * notice, this list of conditions and the following disclaimer.
1129 * 2. Redistributions in binary form must reproduce the above copyright
1130 * notice, this list of conditions and the following disclaimer in the
1131 * documentation and/or other materials provided with the distribution.
1132 * 4. Neither the name of the University nor the names of its contributors
1133 * may be used to endorse or promote products derived from this software
1134 * without specific prior written permission.
1135 *
1136 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
1137 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
1138 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
1139 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
1140 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
1141 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
1142 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
1143 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
1144 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
1145 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
1146 * SUCH DAMAGE.
1147 */
1148
1149 #define verify(x) (x)
1150 #define DLOG(...)
1151 #define min(a, b) (a) < (b) ? a : b
1152
1153 typedef int cmp_t(const void *, const void *);
1154 static inline char *med3(char *, char *, char *, cmp_t *, void *);
1155 static inline void swapfunc(char *, char *, int, int);
1156
1157 /*
1158 * Qsort routine from Bentley & McIlroy's "Engineering a Sort Function".
1159 */
1160 #define swapcode(TYPE, parmi, parmj, n) { \
1161 long i = (n) / sizeof (TYPE); \
1162 TYPE *pi = (TYPE *) (parmi); \
1163 TYPE *pj = (TYPE *) (parmj); \
1164 do { \
1165 TYPE t = *pi; \
1166 *pi++ = *pj; \
1167 *pj++ = t; \
1168 } while (--i > 0); \
1169 }
1170
1171
/* Swap n bytes between a and b. swaptype <= 1 means both pointers and the
   element size are long-aligned, so long-sized moves are safe; otherwise
   fall back to byte-by-byte copying (see the swapcode macro above). */
static inline void swapfunc(char *a, char *b, int n, int swaptype)
{
    if(swaptype <= 1)
        swapcode(long, a, b, n)
    else
        swapcode(char, a, b, n)
}
1179
1180 #define swap(a, b) \
1181 if (swaptype == 0) { \
1182 long t = *(long *)(a); \
1183 *(long *)(a) = *(long *)(b); \
1184 *(long *)(b) = t; \
1185 } else \
1186 swapfunc(a, b, es, swaptype)
1187
1188 #define vecswap(a, b, n) if ((n) > 0) swapfunc(a, b, n, swaptype)
1189
1190 #define CMP(t, x, y) (cmp((x), (y)))
1191
/* Return the median of the three elements a, b, c under cmp — the classic
   Bentley–McIlroy pivot selection. thunk is unused here (the CMP macro
   ignores it) but kept for qsort_r-style signatures. */
static inline char *med3(char *a, char *b, char *c, cmp_t *cmp, void *thunk)
{
    return CMP(thunk, a, b) < 0 ?
        (CMP(thunk, b, c) < 0 ? b : (CMP(thunk, a, c) < 0 ? c : a ))
        :(CMP(thunk, b, c) > 0 ? b : (CMP(thunk, a, c) < 0 ? a : c ));
}
1198
1199 /*
1200 * We use some elaborate condition variables and signalling
1201 * to ensure a bound of the number of active threads at
1202 * 2 * maxthreads and the size of the thread data structure
1203 * to maxthreads.
1204 */
1205
/* Life-cycle states of a pool thread; a slot's state is protected by its
   own mtx_st and changes are announced through cond_st. */
enum thread_state {
	ts_idle,		/* Idle, waiting for instructions. */
	ts_work,		/* Has work to do. */
	ts_term			/* Asked to terminate. */
};

/* Variant part passed to qsort invocations: one slot per pool thread. */
struct qsort {
	enum thread_state st;	/* For coordinating work. */
	struct common *common;	/* Common shared elements. */
	void *a;		/* Array base of the partition to sort. */
	size_t n;		/* Number of elements. */
	pthread_t id;		/* Thread id. */
	pthread_mutex_t mtx_st;	/* For signalling state change. */
	pthread_cond_t cond_st;	/* For signalling state change. */
};

/* Invariant common part, shared across invocations of one qsort_mt call. */
struct common {
	int swaptype;		/* Code to use for swapping (0/1 long-wise, 2 bytewise). */
	size_t es;		/* Element size. */
	void *thunk;		/* Thunk for qsort_r (unused: CMP ignores it). */
	cmp_t *cmp;		/* Comparison function. */
	int nthreads;		/* Total number of pool threads. */
	int idlethreads;	/* Number of idle threads in pool. */
	int forkelem;		/* Minimum number of elements for a new thread. */
	struct qsort *pool;	/* Fixed pool of threads. */
	pthread_mutex_t mtx_al;	/* For allocating threads in the pool. */
};
1236
1237 static void *qsort_thread(void *p);
1238
/*
 * The multithreaded qsort public interface.
 * Sorts the n elements of size es at a with comparator cmp, using up to
 * maxthreads pool threads; partitions smaller than forkelem are never
 * farmed out to another thread. Any setup failure (mutex, memory or
 * thread creation) takes the bailout path, which tears down whatever was
 * created and falls back to the serial qsort(3).
 */
void qsort_mt(void *a, size_t n, size_t es, cmp_t *cmp, int maxthreads, int forkelem)
{
	struct qsort *qs;
	struct common c;
	int i, islot;
	bool bailout = true;	/* Cleared only once the whole pool is up. */

	/* Not worth threading below the fork threshold.
	   NOTE(review): size_t n vs int forkelem is a signed/unsigned
	   comparison; forkelem is assumed positive — confirm call sites. */
	if (n < forkelem)
		goto f1;
	errno = 0;
#ifdef __FreeBSD__
	int ncpu;
	if (maxthreads == 0) {
		/*
		 * Other candidates:
		 * NPROC environment variable (BSD/OS, CrayOS)
		 * sysctl hw.ncpu or kern.smp.cpus
		 */
		if (pmc_init() == 0 && (ncpu = pmc_ncpu()) != -1)
			maxthreads = ncpu;
		else
			maxthreads = 2;
	}
#endif
	/* XXX temporarily disabled for stress and performance testing.
	if (maxthreads == 1)
		goto f1;
	*/
	/* Try to initialize the resources we need. */
	if (pthread_mutex_init(&c.mtx_al, NULL) != 0)
		goto f1;
	if ((c.pool = (struct qsort *)calloc(maxthreads, sizeof(struct qsort))) ==NULL)
		goto f2;
	/* Spin up the pool; each thread starts idle, parked on its condvar. */
	for (islot = 0; islot < maxthreads; islot++) {
		qs = &c.pool[islot];
		if (pthread_mutex_init(&qs->mtx_st, NULL) != 0)
			goto f3;
		if (pthread_cond_init(&qs->cond_st, NULL) != 0) {
			verify(pthread_mutex_destroy(&qs->mtx_st));
			goto f3;
		}
		qs->st = ts_idle;
		qs->common = &c;
		if (pthread_create(&qs->id, NULL, qsort_thread, qs) != 0) {
			verify(pthread_mutex_destroy(&qs->mtx_st));
			verify(pthread_cond_destroy(&qs->cond_st));
			goto f3;
		}
	}

	/* All systems go. */
	bailout = false;

	/* Initialize common elements.
	   swaptype: 2 = bytewise (misaligned base or es not a multiple of
	   long), 0 = exactly one long, 1 = several longs. Threads only read
	   these after receiving work, so initializing them here is safe. */
	c.swaptype = ((char *)a - (char *)0) % sizeof(long) || \
		es % sizeof(long) ? 2 : es == sizeof(long)? 0 : 1;
	c.es = es;
	c.cmp = cmp;
	c.forkelem = forkelem;
	c.idlethreads = c.nthreads = maxthreads;

	/* Hand out the first work batch. */
	qs = &c.pool[0];
	verify(pthread_mutex_lock(&qs->mtx_st));
	qs->a = a;
	qs->n = n;
	qs->st = ts_work;
	c.idlethreads--;
	verify(pthread_cond_signal(&qs->cond_st));
	verify(pthread_mutex_unlock(&qs->mtx_st));

	/*
	 * Wait for all threads to finish, and
	 * free acquired resources.
	 * On the bailout path only slots [0, islot) were created; they are
	 * told to terminate before being joined.
	 */
f3:	for (i = 0; i < islot; i++) {
		qs = &c.pool[i];
		if (bailout) {
			verify(pthread_mutex_lock(&qs->mtx_st));
			qs->st = ts_term;
			verify(pthread_cond_signal(&qs->cond_st));
			verify(pthread_mutex_unlock(&qs->mtx_st));
		}
		verify(pthread_join(qs->id, NULL));
		verify(pthread_mutex_destroy(&qs->mtx_st));
		verify(pthread_cond_destroy(&qs->cond_st));
	}
	free(c.pool);
f2:	verify(pthread_mutex_destroy(&c.mtx_al));
	if (bailout) {
		DLOG("Resource initialization failed; bailing out.\n");
		/* XXX should include a syslog call here */
		fprintf(stderr, "Resource initialization failed; bailing out.\n");
		/* f1 deliberately jumps into this if-body: serial fallback. */
f1:		qsort(a, n, es, cmp);
	}
}
1336
/* qsort_algo below writes CMP(thunk, ...) like the qsort_r code it derives
   from; this build has no thunk, so make the identifier expand to NULL. */
#define thunk NULL
1338
1339 /*
1340 * Allocate an idle thread from the pool, lock its
1341 * mutex, change its state to work, decrease the number
1342 * of idle threads, and return a
1343 * pointer to its data area.
1344 * Return NULL, if no thread is available.
1345 */
allocate_thread(struct common * c)1346 static struct qsort *allocate_thread(struct common *c)
1347 {
1348 int i;
1349
1350 verify(pthread_mutex_lock(&c->mtx_al));
1351 for (i = 0; i < c->nthreads; i++)
1352 if (c->pool[i].st == ts_idle) {
1353 c->idlethreads--;
1354 c->pool[i].st = ts_work;
1355 verify(pthread_mutex_lock(&c->pool[i].mtx_st));
1356 verify(pthread_mutex_unlock(&c->mtx_al));
1357 return (&c->pool[i]);
1358 }
1359 verify(pthread_mutex_unlock(&c->mtx_al));
1360 return (NULL);
1361 }
1362
/*
 * Thread-callable quicksort (Bentley–McIlroy layout) over qs->a / qs->n.
 * Pivot is the middle element, the median of three, or the pseudo-median
 * of nine for n > 40; equal keys are swept to both ends during the split
 * then swapped back around the pivot (three-way partition). When both
 * halves exceed c->forkelem and an idle pool thread is available, the
 * left half is farmed out; the right half is always handled here by
 * looping back to "top" (manual tail-call elimination).
 */
static void qsort_algo(struct qsort *qs)
{
	char *pa, *pb, *pc, *pd, *pl, *pm, *pn;
	int d, r, swaptype, swap_cnt;
	void *a;		/* Array of elements. */
	size_t n, es;		/* Number of elements; size. */
	cmp_t *cmp;
	int nl, nr;		/* Left / right subpartition sizes. */
	struct common *c;
	struct qsort *qs2;
	pthread_t id;

	/* Initialize qsort arguments. */
	id = qs->id;
	c = qs->common;
	es = c->es;
	cmp = c->cmp;
	swaptype = c->swaptype;
	a = qs->a;
	n = qs->n;
top:
	/* NOTE(review): DLOG formats a pthread_t with %x and a size_t with
	   %d — fine on the original target, not portable; confirm. */
	DLOG("%10x n=%-10d Sort starting.\n", id, n);

	/* From here on qsort(3) business as usual. */
	swap_cnt = 0;
	/* Tiny partitions: straight insertion sort. */
	if (n < 7) {
		for (pm = (char *)a + es; pm < (char *)a + n * es; pm += es)
			for (pl = pm;
			    pl > (char *)a && CMP(thunk, pl - es, pl) > 0;
			    pl -= es)
				swap(pl, pl - es);
		return;
	}
	/* Pivot selection: middle element / median of 3 / ninther. */
	pm = (char *)a + (n / 2) * es;
	if (n > 7) {
		pl = a;
		pn = (char *)a + (n - 1) * es;
		if (n > 40) {
			d = (n / 8) * es;
			pl = med3(pl, pl + d, pl + 2 * d, cmp, thunk);
			pm = med3(pm - d, pm, pm + d, cmp, thunk);
			pn = med3(pn - 2 * d, pn - d, pn, cmp, thunk);
		}
		pm = med3(pl, pm, pn, cmp, thunk);
	}
	/* Park the pivot at a[0]. */
	swap(a, pm);
	pa = pb = (char *)a + es;

	pc = pd = (char *)a + (n - 1) * es;
	/* Partition invariant: [a..pa) == pivot, [pa..pb) < pivot,
	   (pc..pd] > pivot, (pd..end) == pivot; pb/pc scan inwards. */
	for (;;) {
		while (pb <= pc && (r = CMP(thunk, pb, a)) <= 0) {
			if (r == 0) {
				swap_cnt = 1;
				swap(pa, pb);
				pa += es;
			}
			pb += es;
		}
		while (pb <= pc && (r = CMP(thunk, pc, a)) >= 0) {
			if (r == 0) {
				swap_cnt = 1;
				swap(pc, pd);
				pd -= es;
			}
			pc -= es;
		}
		if (pb > pc)
			break;
		swap(pb, pc);
		swap_cnt = 1;
		pb += es;
		pc -= es;
	}
	/* No swap happened: the partition was already ordered. */
	if (swap_cnt == 0) {	/* Switch to insertion sort */
		for (pm = (char *)a + es; pm < (char *)a + n * es; pm += es)
			for (pl = pm;
			    pl > (char *)a && CMP(thunk, pl - es, pl) > 0;
			    pl -= es)
				swap(pl, pl - es);
		return;
	}

	/* Move the pivot-equal runs from both ends back to the middle. */
	pn = (char *)a + n * es;
	r = min(pa - (char *)a, pb - pa);
	vecswap(a, pb - r, r);
	r = min(pd - pc, pn - pd - es);
	vecswap(pb, pn - r, r);

	nl = (pb - pa) / es;
	nr = (pd - pc) / es;
	DLOG("%10x n=%-10d Partitioning finished ln=%d rn=%d.\n", id, n, nl, nr);

	/* Now try to launch subthreads. */
	if (nl > c->forkelem && nr > c->forkelem &&
	    (qs2 = allocate_thread(c)) != NULL) {
		/* allocate_thread returned qs2 with its mtx_st held. */
		DLOG("%10x n=%-10d Left farmed out to %x.\n", id, n, qs2->id);
		qs2->a = a;
		qs2->n = nl;
		verify(pthread_cond_signal(&qs2->cond_st));
		verify(pthread_mutex_unlock(&qs2->mtx_st));
	} else if (nl > 0) {
		DLOG("%10x n=%-10d Left will be done in-house.\n", id, n);
		qs->a = a;
		qs->n = nl;
		qsort_algo(qs);
	}
	/* Iterate on the right subpartition instead of recursing. */
	if (nr > 0) {
		DLOG("%10x n=%-10d Right will be done in-house.\n", id, n);
		a = pn - nr * es;
		n = nr;
		goto top;
	}
}
1477
/*
 * Pool thread main loop. Blocks on its slot's condition variable until
 * qsort_mt or qsort_algo hands it work (ts_work) or asks it to exit
 * (ts_term); runs qsort_algo on the assigned partition, then rejoins the
 * idle pool. The last thread to go idle signals termination to every
 * other thread, which is what lets qsort_mt's join loop complete.
 */
static void *qsort_thread(void *p)
{
	struct qsort *qs, *qs2;
	int i;
	struct common *c;
	pthread_t id;

	qs = p;
	id = qs->id;
	c = qs->common;
again:
	/* Wait for work to be allocated. */
	DLOG("%10x n=%-10d Thread waiting for work.\n", id, 0);
	verify(pthread_mutex_lock(&qs->mtx_st));
	while (qs->st == ts_idle)
		verify(pthread_cond_wait(&qs->cond_st, &qs->mtx_st));
	verify(pthread_mutex_unlock(&qs->mtx_st));
	/* NOTE(review): st is re-read after unlocking; this relies on no
	   one changing a slot once it has left ts_idle — confirm. */
	if (qs->st == ts_term) {
		DLOG("%10x n=%-10d Thread signalled to exit.\n", id, 0);
		return(NULL);
	}
	assert(qs->st == ts_work);

	qsort_algo(qs);

	/* Batch done: return to the idle pool under the allocation lock. */
	verify(pthread_mutex_lock(&c->mtx_al));
	qs->st = ts_idle;
	c->idlethreads++;
	DLOG("%10x n=%-10d Finished idlethreads=%d.\n", id, 0, c->idlethreads);
	/* Last one back: the whole sort is finished, shut the pool down. */
	if (c->idlethreads == c->nthreads) {
		DLOG("%10x n=%-10d All threads idle, signalling shutdown.\n", id, 0);
		for (i = 0; i < c->nthreads; i++) {
			qs2 = &c->pool[i];
			if (qs2 == qs)
				continue;
			verify(pthread_mutex_lock(&qs2->mtx_st));
			qs2->st = ts_term;
			verify(pthread_cond_signal(&qs2->cond_st));
			verify(pthread_mutex_unlock(&qs2->mtx_st));
		}
		DLOG("%10x n=%-10d Shutdown signalling complete.\n", id, 0);
		verify(pthread_mutex_unlock(&c->mtx_al));
		return(NULL);
	}
	verify(pthread_mutex_unlock(&c->mtx_al));
	goto again;
}
1526
1527
1528 /*----------------------------------------------------------*/
1529 /* Lplib qsort_mt encapsulation */
1530 /*----------------------------------------------------------*/
1531
ParallelQsort(int ParIdx,void * base,size_t nel,size_t width,int (* compar)(const void *,const void *))1532 void ParallelQsort(int ParIdx, void *base, size_t nel, size_t width, int (*compar)(const void *, const void *))
1533 {
1534 ParSct *par;
1535
1536 /* Get and check lib parallel instance */
1537
1538 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
1539 return;
1540
1541 qsort_mt(base, nel, width, compar, par->NmbCpu, 10000);
1542 }
1543
1544
1545 /*----------------------------------------------------------*/
1546 /* Compute the hilbert code from 3d coordinates */
1547 /*----------------------------------------------------------*/
1548
/*
 * Compute, for lines BegIdx..EndIdx (inclusive, 1-based), the Hilbert
 * space-filling-curve code of each 3D point in arg->crd and store
 * { code, original line index } into arg->idx. arg->box[0..2] holds the
 * bounding-box origin and arg->box[3..5] the per-axis scale factors set
 * up by HilbertRenumbering(). PthIdx is unused here.
 * Fix: the bit mask was built as 1LL<<63, which left-shifts into the sign
 * bit of a signed 64-bit constant — undefined behavior in C. Use the
 * unsigned constant 1ULL<<63; the resulting mask value is unchanged.
 */
static void RenPrc(int BegIdx, int EndIdx, int PthIdx, ArgSct *arg)
{
   unsigned long long IntCrd[3], m=1ULL<<63, cod;
   int i, j, b, GeoWrd, NewWrd, BitTab[3] = {1,2,4};
   double dbl;
   int rot[8], GeoCod[8]={0,3,7,4,1,2,6,5}; /* Z curve = {5,4,7,6,1,0,3,2} */
   int HilCod[8][8] = {{0,7,6,1,2,5,4,3}, {0,3,4,7,6,5,2,1}, {0,3,4,7,6,5,2,1}, {2,3,0,1,6,7,4,5},
                       {2,3,0,1,6,7,4,5}, {6,5,2,1,0,3,4,7}, {6,5,2,1,0,3,4,7}, {4,3,2,5,6,1,0,7}};

   for(i=BegIdx; i<=EndIdx; i++)
   {
      /* Convert double precision coordinates to integers.
         NOTE(review): assumes crd[i] lies inside the bounding box so dbl
         fits in an unsigned 64-bit — confirm upstream clamping. */

      for(j=0;j<3;j++)
      {
         dbl = (arg->crd[i][j] - arg->box[j]) * arg->box[j+3];
         IntCrd[j] = dbl;
      }

      /* Binary hilbert renumbering loop: consume one octant (3 bits)
         per refinement level, 21 levels = 63 bits of code. */

      cod = 0;

      for(j=0;j<8;j++)
         rot[j] = GeoCod[j];

      for(b=0;b<21;b++)
      {
         GeoWrd = 0;

         for(j=0;j<3;j++)
         {
            if(IntCrd[j] & m)
               GeoWrd |= BitTab[j];

            IntCrd[j] = IntCrd[j]<<1;
         }

         NewWrd = rot[ GeoWrd ];
         cod = cod<<3 | NewWrd;

         /* Compose the curve orientation for the next level. */

         for(j=0;j<8;j++)
            rot[j] = HilCod[ NewWrd ][ rot[j] ];
      }

      arg->idx[i][0] = cod;
      arg->idx[i][1] = i;
   }
}
1598
1599
1600 /*----------------------------------------------------------*/
1601 /* Comparison of two items for the qsort */
1602 /*----------------------------------------------------------*/
1603
/*
 * qsort(3) comparator on the leading unsigned 64-bit key of each item.
 * Fix: the original never returned 0, so two equal keys compared as both
 * "a < b" and "b < a" — an inconsistent ordering, which violates the
 * qsort comparator contract (must impose a total order) and can make
 * qsort implementations misbehave. Equal keys now compare equal.
 */
int CmpPrc(const void *a, const void *b)
{
   unsigned long long *pa = (unsigned long long *)a, *pb = (unsigned long long *)b;

   if(*pa > *pb)
      return(1);

   if(*pa < *pb)
      return(-1);

   return(0);
}
1613
1614
1615 /*----------------------------------------------------------*/
1616 /* Renumber a set of coordinates through a Hilbert SFC */
1617 /*----------------------------------------------------------*/
1618
HilbertRenumbering(int ParIdx,int NmbLin,double box[6],double (* crd)[3],unsigned long long (* idx)[2])1619 void HilbertRenumbering(int ParIdx, int NmbLin, double box[6], double (*crd)[3], unsigned long long (*idx)[2])
1620 {
1621 int i, NewTyp;
1622 double len = pow(2,64);
1623 ParSct *par;
1624 ArgSct arg;
1625
1626 /* Get and check lib parallel instance */
1627
1628 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
1629 return;
1630
1631 NewTyp = NewType(ParIdx, NmbLin);
1632 arg.crd = crd;
1633 arg.idx = idx;
1634 arg.box[0] = box[0];
1635 arg.box[1] = box[1];
1636 arg.box[2] = box[2];
1637 arg.box[3] = len / (box[3] - box[0]);
1638 arg.box[4] = len / (box[4] - box[1]);
1639 arg.box[5] = len / (box[5] - box[2]);
1640
1641 LaunchParallel(ParIdx, NewTyp, 0, (void *)RenPrc, (void *)&arg);
1642 ParallelQsort(ParIdx, &idx[1][0], NmbLin, 2 * sizeof(long long), CmpPrc);
1643
1644 for(i=1;i<=NmbLin;i++)
1645 idx[ idx[i][1] ][0] = i;
1646 }
1647
1648
1649 /*----------------------------------------------------------*/
1650 /* Compute the hilbert code from 2d coordinates */
1651 /*----------------------------------------------------------*/
1652
/* Compute, for lines BegIdx..EndIdx (inclusive, 1-based), the Hilbert
   curve code of each 2D point in arg->crd2 and store { code, original
   line index } into arg->idx. PthIdx is unused here. */
static void RenPrc2D(int BegIdx, int EndIdx, int PthIdx, ArgSct *arg)
{
   unsigned long long IntCrd[2], msk=1LL<<62, cod;
   int lin, axe, stp, GeoWrd, NewWrd, BitTab[2] = {1,2};
   double dbl;
   int rot[4], GeoCod[4]={1,2,0,3};
   int HilCod[4][4] = {{0,3,2,1}, {0,1,2,3}, {0,1,2,3}, {2,1,0,3}};

   for(lin=BegIdx; lin<=EndIdx; lin++)
   {
      /* Map both double coordinates onto 62-bit integers using the
         origin (box[0..1]) and scales (box[2..3]) set by the caller. */

      for(axe=0; axe<2; axe++)
      {
         dbl = (arg->crd2[lin][axe] - arg->box[axe]) * arg->box[axe+2];
         IntCrd[axe] = dbl;
      }

      /* Binary Hilbert loop: one quadrant (2 bits) per refinement
         level, 31 levels = 62 bits of code. */

      cod = 0;

      for(axe=0; axe<4; axe++)
         rot[axe] = GeoCod[axe];

      for(stp=0; stp<31; stp++)
      {
         GeoWrd = 0;

         for(axe=0; axe<2; axe++)
         {
            if(IntCrd[axe] & msk)
               GeoWrd |= BitTab[axe];

            IntCrd[axe] <<= 1;
         }

         NewWrd = rot[ GeoWrd ];
         cod = cod<<2 | NewWrd;

         /* Compose the curve orientation for the next level. */

         for(axe=0; axe<4; axe++)
            rot[axe] = HilCod[ NewWrd ][ rot[axe] ];
      }

      arg->idx[lin][0] = cod;
      arg->idx[lin][1] = lin;
   }
}
1701
1702
1703 /*----------------------------------------------------------*/
1704 /* Renumber a set of 2D coordinates through a Hilbert SFC */
1705 /*----------------------------------------------------------*/
1706
HilbertRenumbering2D(int ParIdx,int NmbLin,double box[4],double (* crd)[2],unsigned long long (* idx)[2])1707 void HilbertRenumbering2D(int ParIdx, int NmbLin, double box[4], double (*crd)[2], unsigned long long (*idx)[2])
1708 {
1709 int i, NewTyp;
1710 double len = pow(2,62);
1711 ParSct *par;
1712 ArgSct arg;
1713
1714 /* Get and check lib parallel instance */
1715
1716 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
1717 return;
1718
1719 NewTyp = NewType(ParIdx, NmbLin);
1720 arg.crd2 = crd;
1721 arg.idx = idx;
1722 arg.box[0] = box[0];
1723 arg.box[1] = box[1];
1724 arg.box[2] = len / (box[2] - box[0]);
1725 arg.box[3] = len / (box[3] - box[1]);
1726
1727 LaunchParallel(ParIdx, NewTyp, 0, (void *)RenPrc2D, (void *)&arg);
1728 ParallelQsort(ParIdx, &idx[1][0], NmbLin, 2 * sizeof(long long), CmpPrc);
1729
1730 for(i=1;i<=NmbLin;i++)
1731 idx[ idx[i][1] ][0] = i;
1732 }
1733
1734
1735 /*----------------------------------------------------------*/
1736 /* Setup a parallel hash table and return its index */
1737 /*----------------------------------------------------------*/
1738
AllocHash(int ParIdx,int BasTyp,int DepTyp)1739 int AllocHash(int ParIdx, int BasTyp, int DepTyp)
1740 {
1741 int HshIdx, mul, i;
1742 ParSct *par;
1743 HshSct *hsh;
1744 TypSct *typ1, *typ2;
1745
1746 /* Get and check lib parallel instance */
1747
1748 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) || !BasTyp || !DepTyp)
1749 return(0);
1750
1751 typ1 = &par->TypTab[ BasTyp ];
1752 typ2 = &par->TypTab[ DepTyp ];
1753
1754 for(HshIdx=1; HshIdx< MaxHsh; HshIdx++)
1755 if(!par->HshTab[ HshIdx ].typ1)
1756 break;
1757
1758 if( (HshIdx == MaxHsh) || !typ1->NmbLin || !typ2->NmbLin )
1759 return(0);
1760
1761 hsh = &par->HshTab[ HshIdx ];
1762 hsh->typ1 = typ1;
1763 hsh->typ2 = typ2;
1764 mul = pow(typ1->NmbLin, 1./3.) - 1;
1765 hsh->mul[0] = mul;
1766 hsh->mul[1] = mul * mul;
1767 hsh->mul[2] = mul * mul * mul;
1768 hsh->buc = calloc(typ1->NmbLin, sizeof(BucSct));
1769
1770 printf("hash mul = %d %d %d\n",hsh->mul[2],hsh->mul[1],hsh->mul[0]);
1771 printf("hash size = %d, adr = %p\n",typ1->NmbLin,hsh->buc);
1772
1773 for(i=0;i<par->NmbCpu;i++)
1774 {
1775 hsh->ovf[i] = calloc(typ1->NmbLin / par->NmbCpu, sizeof(BucSct));
1776 hsh->NmbOvf[i] = 0;
1777 }
1778
1779 return(HshIdx);
1780 }
1781
1782
1783 /*----------------------------------------------------------*/
1784 /* Release a hash table */
1785 /*----------------------------------------------------------*/
1786
FreeHash(int ParIdx,int HshIdx)1787 void FreeHash(int ParIdx, int HshIdx)
1788 {
1789 int i;
1790 ParSct *par;
1791
1792 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]))
1793 return;
1794
1795 if( (HshIdx < 1) || (HshIdx > MaxHsh) )
1796 return;
1797
1798 if(par->HshTab[ HshIdx ].buc)
1799 free(par->HshTab[ HshIdx ].buc);
1800
1801 for(i=0;i<par->NmbCpu;i++)
1802 if(par->HshTab[ HshIdx ].ovf[i])
1803 free(par->HshTab[ HshIdx ].ovf[i]);
1804
1805 memset(&par->HshTab[ HshIdx ], 0, sizeof(HshSct));
1806 }
1807
1808
1809 /*----------------------------------------------------------*/
/* Multithread compatible hash insertion */
1811 /*----------------------------------------------------------*/
1812
/*
 * Multithread-compatible insertion of the key triple (a,b,c) into hash
 * table HshIdx. The three indices are first sorted so every permutation
 * of the same triple maps to the same bucket. Returns the stored datum
 * when the triple is already present, 0 when it was inserted (or on bad
 * arguments). Collisions are chained into the calling thread's private
 * overflow arena (ovf[PthIdx]), so different threads never allocate from
 * the same pool.
 */
long long int AddHash(int ParIdx, int PthIdx, int HshIdx, int a, int b, int c, long long int dat)
{
   int i;
   long long int key, idx[3];
   ParSct *par;
   HshSct *hsh;
   BucSct *buc, *ovf;

   if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]))
      return(0);

   if( (HshIdx < 1) || (HshIdx > MaxHsh) )
      return(0);

   hsh = &par->HshTab[ HshIdx ];

   /* Three-way sort so that idx[0] <= idx[1] <= idx[2]. */

   if(a < b)
   {
      if(b < c)
      {
         idx[0] = a;
         idx[1] = b;
         idx[2] = c;
      }
      else if(a < c)
      {
         idx[0] = a;
         idx[1] = c;
         idx[2] = b;
      }
      else
      {
         idx[0] = c;
         idx[1] = a;
         idx[2] = b;
      }
   }
   else
   {
      if(a < c)
      {
         idx[0] = b;
         idx[1] = a;
         idx[2] = c;
      }
      else if(b < c)
      {
         idx[0] = b;
         idx[1] = c;
         idx[2] = a;
      }
      else
      {
         idx[0] = c;
         idx[1] = b;
         idx[2] = a;
      }
   }

   /* Spatial hash key, scaled down by the dependent type's size.
      NOTE(review): the key range depends on mul[] vs typ2->NmbLin
      sizing — confirm it can never reach typ1->NmbLin. */

   key = (hsh->mul[2] * idx[2] + hsh->mul[1] * idx[1] + hsh->mul[0] * idx[0]) / hsh->typ2->NmbLin;
   buc = &hsh->buc[ key ];

/* printf("pth %d, hash %d / %p, tab = %p, ovf = %p, indices = %lld %lld %lld, key %lld, buc = %p\n",\
   PthIdx, HshIdx, hsh, hsh->buc, hsh->ovf[ PthIdx ], idx[2], idx[1], idx[0], key, buc);
*/
   /* Head bucket empty (idx[2] == 0 marks an unused bucket): store the
      triple in place. */

   if(!buc->idx[2])
   {
      for(i=0;i<3;i++)
         buc->idx[i] = idx[i];

      buc->dat = dat;
      return(0);
   }
   else
      do
      {
         if( (buc->idx[0] == idx[0]) && (buc->idx[1] == idx[1]) && (buc->idx[2] == idx[2]) )
            return(buc->dat);
         else if(!buc->nex)
         {
            /* Arena full: start a fresh one.
               NOTE(review): the old arena pointer is overwritten while
               its buckets remain linked into chains, so earlier arenas
               leak — FreeHash only frees the last one per thread. */

            if(hsh->NmbOvf[ PthIdx ] >= hsh->typ1->NmbLin / par->NmbCpu)
            {
               hsh->ovf[ PthIdx ] = calloc(hsh->typ1->NmbLin / par->NmbCpu, sizeof(BucSct));
               hsh->NmbOvf[ PthIdx ] = 0;
               puts("realloc");
            }

            /* Chain a new overflow bucket right after the head. */

            ovf = &hsh->ovf[ PthIdx ][ hsh->NmbOvf[ PthIdx ]++ ];
            ovf->nex = hsh->buc[ key ].nex;
            hsh->buc[ key ].nex = ovf;
            ovf->dat = dat;

            for(i=0;i<3;i++)
               ovf->idx[i] = idx[i];

            return(0);
         }
      }while(buc = buc->nex); /* assignment in condition is intentional */

   return(0);
}
1914
1915
1916 #else
1917
1918
1919 #include <stdio.h>
1920 #include <stdlib.h>
1921 #include <string.h>
1922 #include "lplib3.h"
1923
1924 #define MaxLibPar 10
1925 #define MaxPth 128
1926 #define MaxTyp 100
1927
/* Serial stand-in for a data type: only the number of lines is tracked. */
typedef struct
{
   int NmbLin;
}TypSct;

/* Serial stand-in for a library instance. */
typedef struct ParSct
{
   int NmbTyp;                                /* apparently unused in the serial code path */
   float sta[2];                              /* dependency stats slot — not used by the visible serial code */
   void (*prc)(int, int, int, void *), *arg;  /* callback and user-argument slots */
   TypSct TypTab[ MaxTyp+1 ];                 /* type table, entries 1..MaxTyp */
}ParSct;

/* Global instance table (entries 1..MaxLibPar) and its one-time init flag. */
ParSct *ParTab[ MaxLibPar+1 ];
int IniLibPar = 0;
1943
1944
InitParallel(int NmbCpu)1945 int InitParallel(int NmbCpu)
1946 {
1947 int i, ParIdx;
1948 ParSct *par = NULL;
1949
1950 if(NmbCpu > MaxPth)
1951 return(0);
1952
1953 if(!IniLibPar)
1954 {
1955 IniLibPar = 1;
1956
1957 for(i=1;i<=MaxLibPar;i++)
1958 ParTab[i] = NULL;
1959 }
1960
1961 /* Allocate and build main parallel structure */
1962
1963 for(ParIdx=1; ParIdx<=MaxLibPar; ParIdx++)
1964 if(!ParTab[ ParIdx ])
1965 {
1966 par = ParTab[ ParIdx ] = calloc(1, sizeof(ParSct));
1967 break;
1968 }
1969
1970 if(!par)
1971 return(0);
1972
1973 return(ParIdx);
1974 }
1975
1976
StopParallel(int ParIdx)1977 void StopParallel(int ParIdx)
1978 {
1979 int i;
1980 ParSct *par;
1981
1982 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
1983 return;
1984
1985 for(i=1;i<=MaxTyp;i++)
1986 FreeType(ParIdx, i);
1987
1988 free(par);
1989 ParTab[ ParIdx ] = NULL;
1990 }
1991
1992
NewType(int ParIdx,int NmbLin)1993 int NewType(int ParIdx, int NmbLin)
1994 {
1995 int i, TypIdx=0;
1996 TypSct *typ;
1997 ParSct *par;
1998
1999 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
2000 return(0);
2001
2002 if(NmbLin <= 0)
2003 return(0);
2004
2005 for(i=1;i<=MaxTyp;i++)
2006 if(!par->TypTab[i].NmbLin)
2007 {
2008 TypIdx = i;
2009 break;
2010 }
2011
2012 if(!TypIdx)
2013 return(0);
2014
2015 typ = &par->TypTab[ TypIdx ];
2016 typ->NmbLin = NmbLin;
2017
2018 return(TypIdx);
2019 }
2020
2021
/* Serial build: a type owns no allocated memory, so nothing to release.
   NOTE(review): the slot's NmbLin is not reset here, so NewType cannot
   reuse freed slots — confirm whether this is intentional. */
void FreeType(int ParIdx, int TypIdx)
{
}
2025
2026
/* Serial build: dependencies are meaningless with a single thread;
   always reports 0. */
int BeginDependency(int ParIdx, int TypIdx1, int TypIdx2)
{
   return(0);
}
2031
2032
/* Serial build: no-op, dependency pairs are not recorded. */
void AddDependency(int ParIdx, int idx1, int idx2)
{
}
2036
2037
/* Serial build: no-op; DepSta is left untouched by this implementation. */
void EndDependency(int ParIdx, float DepSta[2])
{
}
2041
2042
LaunchParallel(int ParIdx,int typ1,int typ2,void * prc,void * PtrArg)2043 float LaunchParallel(int ParIdx, int typ1, int typ2, void *prc, void *PtrArg)
2044 {
2045 ParSct *par;
2046 void (*UsrPrc)(int, int, int, void *) = (void (*)(int, int, int, void *))prc;
2047
2048 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
2049 return(-1.);
2050
2051 if( (typ1 < 1) || (typ1 > MaxTyp) || (typ2 < 0) || (typ2 > MaxTyp) || (typ1 == typ2) )
2052 return(-1.);
2053
2054 UsrPrc(1, par->TypTab[ typ1 ].NmbLin, 0, PtrArg);
2055
2056 return(1.);
2057 }
2058
2059
/* Serial fallback: zero siz bytes at PtrArg in the calling thread.
   The instance handle is not needed here. Always returns 1. */
int ParallelMemClear(int ParIdx, void *PtrArg, long siz)
{
   (void)ParIdx;   /* unused in the serial build */

   memset(PtrArg, 0, siz);

   return(1);
}
2065
ParallelQsort(int ParIdx,void * base,size_t nel,size_t width,int (* compar)(const void *,const void *))2066 void ParallelQsort(int ParIdx, void *base, size_t nel, size_t width, int (*compar)(const void *, const void *))
2067 {
2068 ParSct *par;
2069
2070 /* Get and check lib parallel instance */
2071
2072 if( (ParIdx < 1) || (ParIdx > MaxLibPar) || !(par = ParTab[ ParIdx ]) )
2073 return;
2074
2075 qsort(base, nel, width, compar);
2076 }
2077
2078 #endif
2079