1 #include "cado.h" // IWYU pragma: keep
2 #include <pthread.h>      // for pthread_t
3 #include <stddef.h>       // for ptrdiff_t
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <string.h>
7 
8 #include <sys/types.h>
9 #include <errno.h>
10 #include <stdarg.h>
11 #include <stdint.h>
12 #include <inttypes.h>
13 #include <ctype.h>
14 #include "select_mpi.h"
15 #include "parallelizing_info.h"
16 #include "macros.h"
17 #include "misc.h"
18 #include "verbose.h"
19 #include "portability.h"
20 
21 
22 #include <sys/time.h>   // gettimeofday
23 #ifdef  HAVE_UTSNAME_H
24 #include <sys/utsname.h>
25 #include <limits.h>
26 #endif
27 
28 #if defined(HAVE_HWLOC) && defined(HAVE_CXX11)
29 #include "cpubinding.h"
30 #include "params.h"
31 #endif  /* defined(HAVE_HWLOC) && defined(HAVE_CXX11) */
32 
/* Allocate and initialize the per-communicator shared pthread data for
 * w (custom barrier bh, pthread barrier b, mutex m, and a copy of the
 * description string desc), sized for w->ncores threads. The result is
 * stored in w->th; release it with pi_comm_destroy_pthread_things(). */
static inline void pi_comm_init_pthread_things(pi_comm_ptr w, const char * desc)
{
    struct pthread_things * res;

    res = malloc(sizeof(struct pthread_things));
    /* fail hard on OOM instead of dereferencing NULL below */
    if (res == NULL)
        abort();

    barrier_init(res->bh, w->ncores);
    my_pthread_barrier_init(res->b, NULL, w->ncores);
    my_pthread_mutex_init(res->m, NULL);
    res->desc = strdup(desc);

    w->th = res;
}
46 
/* Tear down the shared pthread data attached to w by
 * pi_comm_init_pthread_things(): both barriers, the mutex, the
 * description string, and the holder struct itself. w->th is reset to
 * NULL afterwards. NOTE(review): several pi_comm structures may share
 * the same ->th (see the replication loops in pi_grid_init), hence the
 * warning below — call this exactly once per allocated ->th. */
static inline void pi_comm_destroy_pthread_things(pi_comm_ptr w)
{
    barrier_destroy(w->th->bh);
    my_pthread_barrier_destroy(w->th->b);

    my_pthread_mutex_destroy(w->th->m);
    /* Beware ! Freeing mustn't happen more than once ! */
    free(w->th->desc);
    free(w->th);
    w->th = NULL;
}
58 
/* Print one horizontal separator line for the grid display: n1 groups of
 * n2 cells, each cell w characters of 'a', cells separated by 'a' within
 * a group and by 'b' at group boundaries, the whole line starting with
 * 'b' and ending with a newline. */
static void print_several(unsigned int n1, unsigned int n2, char a, char b, unsigned int w)
{
    char * toto;
    toto = malloc(w+2);
    /* fail hard on OOM instead of passing NULL to memset */
    if (toto == NULL)
        abort();
    printf("%c",b);
    memset(toto, a, w+1);
    toto[w+1]='\0';
    for(unsigned int i = 0 ; i < n1 ; i++) {
        toto[w]=a;
        /* "j + 1 < n2" instead of "j < n2 - 1": avoids unsigned
         * wrap-around (a near-infinite loop) if n2 == 0 */
        for(unsigned int j = 0 ; j + 1 < n2 ; j++) {
            printf("%s", toto);
        }
        toto[w]=b;
        printf("%s", toto);
    }
    printf("\n");
    free(toto);
}
77 
/* Argument bundle handed to each worker thread started by
 * pi_go_mt_now(); unpacked by pi_go_helper_func(). */
struct pi_go_helper_s {
    void *(*fcn)(parallelizing_info_ptr, param_list pl, void*);  /* user entry point */
    parallelizing_info_ptr p;   /* this thread's parallelizing_info */
    param_list_ptr pl;          /* parameter list, shared by all threads */
    void * arg;                 /* opaque user argument forwarded to fcn */
};
84 
/* Thread entry point (cast to pthread_callee_t by pi_go_mt_now).
 * Enters the interleaving rendez-vous, pins the thread when hwloc-based
 * cpubinding is compiled in, optionally sets up the pi log machinery
 * (debug-parallel-bwc), runs the user function, and performs the
 * symmetric teardown. Returns the user function's return value. */
void * pi_go_helper_func(struct pi_go_helper_s * s)
{
    pi_interleaving_enter(s->p);
#if defined(HAVE_HWLOC) && defined(HAVE_CXX11)
    /* wr[1]->trank, wr[0]->trank identify this thread's cell in the grid */
    cpubinding_do_pinning(s->p->cpubinding_info, s->p->wr[1]->trank, s->p->wr[0]->trank);
#endif /* defined(HAVE_HWLOC) && defined(HAVE_CXX11) */
    int debug = 0;
    param_list_parse_int(s->pl, "debug-parallel-bwc", &debug);

    if (debug) {
        /* per-communicator logging for heavy debugging */
        pi_log_init(s->p->m);
        pi_log_init(s->p->wr[0]);
        pi_log_init(s->p->wr[1]);
    }

    void * ret = (s->fcn)(s->p, s->pl, s->arg);

    if (debug) {
        pi_log_clear(s->p->m);
        pi_log_clear(s->p->wr[0]);
        pi_log_clear(s->p->wr[1]);
    }

    /* flip before leaving, so the other interleaved set takes over */
    pi_interleaving_flip(s->p);
    pi_interleaving_leave(s->p);
    return ret;
}
112 
113 typedef void * (*pthread_callee_t)(void*);
114 
115 /*
116    void pi_errhandler(MPI_Comm * comm, int * err, ...)
117    {
118    int size;
119    int rank;
120    MPI_Comm_size(*comm, &size);
121    MPI_Comm_rank(*comm, &rank);
122    fprintf(stderr, "Fatal MPI error ;\n");
123    fprintf(stderr, " job %d in a comm. of size %d ;\n", rank, size);
124    if (err == NULL) {
125    fprintf(stderr, " no error code.\n");
126    } else {
127    char buf[MPI_MAX_ERROR_STRING];
128    int len = sizeof(buf);
129    MPI_Error_string(*err, buf, &len);
130    fprintf(stderr, " %s\n", buf);
131    }
132    abort();
133    }
134    */
135 
/* Collectively gather the per-(job,thread) string buf (local capacity
 * siz) from every member of pi->m and, where print is nonzero, display
 * them as a 2D table matching the process/thread grid. Every member of
 * pi->m must call this (it performs allreduces and serializations). */
void grid_print(parallelizing_info_ptr pi, char * buf, size_t siz, int print)
{
    pi_comm_ptr wr = pi->m;
    unsigned long maxsize = siz;
    /* agree on a common (maximal) cell width across the communicator */
    pi_allreduce(NULL, &maxsize, 1, BWC_PI_UNSIGNED_LONG, BWC_PI_MAX, wr);

    char * strings = malloc(wr->totalsize * maxsize);
    memset(strings, 0, wr->totalsize * maxsize);

    /* flat index of this (job, thread) within pi->m */
    int me = wr->jrank * wr->ncores + wr->trank;

    /* instead of doing memcpy, we align the stuff. */
    char * fmt;
    int rc = asprintf(&fmt, "%%-%lus", maxsize-1);
    ASSERT_ALWAYS(rc >= 0);
    snprintf(strings + me * maxsize, maxsize, fmt, buf);
    free(fmt);
    /* each rank filled only its own slot of a zero-initialized buffer,
     * so a bitwise-xor reduction effectively concatenates all slots */
    pi_allreduce(NULL, strings, wr->totalsize * maxsize, BWC_PI_BYTE, BWC_PI_BXOR, wr);

    char * ptr = strings;

    serialize(pi->m);

    if (print) {
        /* There's also the null byte counted in siz. So that makes one
         * less.
         */
        unsigned int nj0 = pi->wr[0]->njobs;
        unsigned int nj1 = pi->wr[1]->njobs;
        unsigned int nt0 = pi->wr[0]->ncores;
        unsigned int nt1 = pi->wr[1]->ncores;
        print_several(nj0, nt0, '-', '+', maxsize-1);
        for(unsigned int j1 = 0 ; j1 < nj1 ; j1++) {
            for(unsigned int t1 = 0 ; t1 < nt1 ; t1++) {
                printf("|");
                for(unsigned int j0 = 0 ; j0 < nj0 ; j0++) {
                    for(unsigned int t0 = 0 ; t0 < nt0 ; t0++) {
                        /* close the cell group at the last thread column */
                        int fence = t0 == nt0 - 1;
                        printf("%s%c", ptr, fence ? '|' : ' ');
                        ptr += maxsize;
                    }
                }
                printf("\n");
            }
            print_several(nj0, nt0, '-', '+', maxsize-1);
        }
    }
    serialize(wr);
    free(strings);
}
186 
/* Collectively (over pi->m->pals) compute the longest common prefix of
 * all node names, stopping at the first digit or the first character
 * that differs between nodes. The prefix goes to pi->nodeprefix (with a
 * trailing '-' stripped) and the right-aligned remainder — typically
 * the node number — goes to pi->nodenumber_s. */
static void get_node_number_and_prefix(parallelizing_info_ptr pi)
{
    int len = strlen(pi->nodename);
    int minlen = len;
    int maxlen = len;
    MPI_Allreduce(MPI_IN_PLACE, &minlen, 1, MPI_INT, MPI_MIN, pi->m->pals);
    MPI_Allreduce(MPI_IN_PLACE, &maxlen, 1, MPI_INT, MPI_MAX, pi->m->pals);
    int j;
    for(j = 0 ; j < minlen ; j++) {
        int c = pi->nodename[j];
        /* a digit ends the common prefix: it belongs to the number part */
        if (isdigit(c)) {
            c = -1;
        }
        int cmin = c;
        int cmax = c;
        /* one allreduce pair per character position: the prefix stops as
         * soon as ranks disagree or someone hit a digit */
        MPI_Allreduce(MPI_IN_PLACE, &cmin, 1, MPI_INT, MPI_MIN, pi->m->pals);
        MPI_Allreduce(MPI_IN_PLACE, &cmax, 1, MPI_INT, MPI_MAX, pi->m->pals);
        if (cmin != cmax || cmin < 0)
            break;
    }
    memcpy(pi->nodeprefix, pi->nodename, j);
    pi->nodeprefix[j]='\0';
    /* drop a trailing dash, as in "clusterX-NN" naming schemes */
    if (j && pi->nodeprefix[j-1] == '-')
        pi->nodeprefix[j-1] = '\0';
    /* right-align the node-specific suffix within maxlen-j characters */
    memset(pi->nodenumber_s, ' ', maxlen - j);
    pi->nodenumber_s[maxlen-j]='\0';
    memcpy(pi->nodenumber_s + maxlen-len, pi->nodename + j, len-j);
    if (j == maxlen) {
        // all nodes have the same name. Most probably because we're only
        // on one node. Arrange for the display to look vaguely right.
        memcpy(pi->nodeprefix, pi->nodename, PI_NAMELEN);
        memcpy(pi->nodenumber_s, pi->nodename, PI_NAMELEN);
    }
}
221 
/* Print, from job 0, a map of the process grid showing which node hosts
 * each (job, thread) cell. Collective over pi->m->pals (allgathers plus
 * a final barrier). In only_mpi mode (pi->thr_orig set), additionally
 * verifies that the MPI rank placement matches the expected node
 * mapping, and exits with an error message when it does not. */
static void display_process_grid(parallelizing_info_ptr pi)
{
    char * all_node_ids = malloc(PI_NAMELEN * pi->m->njobs);
    char * all_node_ids2 = malloc(PI_NAMELEN * pi->m->njobs);
    typedef int cpair[2];
    cpair my_coords = { pi->wr[0]->jrank, pi->wr[1]->jrank, };
    cpair * all_cpairs = malloc(sizeof(cpair) * pi->m->njobs);

    MPI_Allgather(pi->nodenumber_s, PI_NAMELEN, MPI_BYTE,
            all_node_ids, PI_NAMELEN, MPI_BYTE, pi->m->pals);
    MPI_Allgather(my_coords, 2, MPI_INT, all_cpairs, 2, MPI_INT, pi->m->pals);

    /* reorder the gathered node ids from rank order into row-major grid
     * order (index y * njobs_x + x) */
    for(unsigned int i = 0 ; i < pi->m->njobs ; i++) {
        int x = all_cpairs[i][0];
        int y = all_cpairs[i][1];
        unsigned int j = y * pi->wr[0]->njobs + x;
        if (j >= pi->m->njobs) {
            fprintf(stderr, "uh ?\n");
            pi_abort(EXIT_FAILURE, pi->m);
        }
        memcpy(all_node_ids2 + j * PI_NAMELEN,
                all_node_ids + i * PI_NAMELEN,
                PI_NAMELEN);
    }

    if (pi->m->jrank == 0) {
        if (strlen(pi->nodeprefix)) {
            printf("%d nodes on %s\n", pi->m->njobs, pi->nodeprefix);
        } else {
            printf("%d nodes, no common name prefix\n", pi->m->njobs);
        }
        /* pad: blank cell of the same width as a node id, with a tick
         * mark in the middle (different glyph in only_mpi mode) */
        char * pad = malloc(PI_NAMELEN);
        memset(pad, ' ', PI_NAMELEN);
        int node_id_len = strlen(pi->nodenumber_s);
        pad[node_id_len]='\0';
        if (node_id_len) {
            if (pi->thr_orig[0]) {
                pad[node_id_len/2]='\'';
            } else {
                pad[node_id_len/2]='.';
            }
        }
        int mapping_error = 0;
        for(unsigned int i = 0 ; i < pi->wr[1]->njobs ; i++) {       // i == y
        for(unsigned int it = 0 ; it < pi->wr[1]->ncores ; it++) {
            for(unsigned int j = 0 ; j < pi->wr[0]->njobs ; j++) {       // j == x
            for(unsigned int jt = 0 ; jt < pi->wr[0]->ncores ; jt++) {       // jt == x
                /* only the leading (it==jt==0) cell shows the node id */
                char * what = pad;
                if (!it && !jt) {
                    what = all_node_ids2 + PI_NAMELEN * (i * pi->wr[0]->njobs + j);
                }
                if (pi->thr_orig[0]) {
                    /* then we have it==jt==0, but the real leader is not
                     * this one. Check at least that we have the proper
                     * identity (same leader)
                     */
                    ASSERT_ALWAYS(!it && !jt);
                    unsigned int i0 = i - (i % pi->thr_orig[0]);
                    unsigned int j0 = j - (j % pi->thr_orig[1]);
                    const char * ref = all_node_ids2 + PI_NAMELEN * (i0 * pi->wr[0]->njobs + j0);
                    if (strcmp(ref, what) != 0) {
                        mapping_error=1;
                    }
                    if (i != i0 || j != j0)
                        what = pad;
                }
                printf(" %s", what);
            }
            }
            printf("\n");
        }
        }
        if (mapping_error) {
            fprintf(stderr, "Error: we have not obtained the expected node mapping; the only_mpi=1 setting is VERY sensible to the process mapping chosen by the MPI implementation. Chances are you got it wrong. A working setup with openmpi-1.8.2 is a uniqueified host file, with --mca rmaps_base_mapping_policy slot (which is the default setting).\n");
            exit(1);
        }
        free(pad);
    }
    free(all_node_ids2);
    free(all_node_ids);
    free(all_cpairs);
    MPI_Barrier(pi->m->pals);
}
305 
/* Set up the MPI-level part of pi from the "mpi", "thr" and "only_mpi"
 * parameters: node name, the global communicator pi->m, and the row and
 * column sub-communicators pi->wr[0] / pi->wr[1]. Thread ranks are left
 * at 0; the per-thread grid is built later by pi_grid_init(). Also
 * prepares the cpubinding info when hwloc+C++11 support is compiled in.
 * Collective over MPI_COMM_WORLD. */
static void pi_init_mpilevel(parallelizing_info_ptr pi, param_list pl)
{
    int err;

    memset(pi, 0, sizeof(parallelizing_info));

#ifdef HAVE_UTSNAME_H
    {
        struct utsname u[1];
        uname(u);
        memcpy(pi->nodename, u->nodename, MIN(PI_NAMELEN, sizeof(u->nodename)));
        pi->nodename[PI_NAMELEN-1]='\0';
        /* keep only the host part of a fully-qualified name */
        char * p = strchr(pi->nodename, '.');
        if (p) *p='\0';
    }
#else
    strncpy(pi->nodename, "unknown", sizeof(pi->nodename));
#endif

#ifndef NDEBUG
    /* Must make sure that we have proper ASSERTS after MPI calls then. */
    MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
#endif

    int mpi[2] = {1,1};
    int thr[2] = {1,1};
    int only_mpi = 0;

    param_list_parse_intxint(pl, "mpi", mpi);
    param_list_parse_intxint(pl, "thr", thr);
    param_list_parse_int(pl, "only_mpi", &only_mpi);

    /* h = horizontal/row count, v = vertical/column count; j = jobs,
     * c = cores */
    unsigned int nhj = mpi[0];
    unsigned int nvj = mpi[1];
    unsigned int nhc = thr[0];
    unsigned int nvc = thr[1];

    pi->m->njobs = nhj * nvj;
    pi->m->ncores = nhc * nvc;
    pi->m->totalsize = pi->m->njobs * pi->m->ncores;
    // MPI_Errhandler my_eh;
    // MPI_Comm_create_errhandler(&pi_errhandler, &my_eh);
    // MPI_Comm_set_errhandler(MPI_COMM_WORLD, my_eh);

    if (only_mpi) {
        /* Replace each would-be thread by a distinct single-threaded MPI
         * job, renumbering the ranks so that the flattened grid position
         * matches what the threaded layout would have been (assuming the
         * launcher filled nodes slot by slot — see the error message in
         * display_process_grid). */
        int grank;
        MPI_Comm_rank(MPI_COMM_WORLD, & grank);
        if (grank == 0)
            printf("Making %d independent single-threaded MPI jobs, instead of %d %d-thread jobs\n",
                pi->m->totalsize, pi->m->njobs, pi->m->ncores);
        int tt = pi->m->ncores;

        int rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        int nnum = rank / tt;       /* node-level index */
        int nidx = rank % tt;       /* thread-level index within node */
        int ni = nnum / nvj;
        int nj = nnum % nvj;
        int ti = nidx / nvc;
        int tj = nidx % nvc;
        int i = ni * nhc + ti;      /* global row */
        int j = nj * nvc + tj;      /* global column */
        int nrank = i * (nvj * nvc) + j;

        MPI_Comm_split(MPI_COMM_WORLD, 0, nrank, & pi->m->pals);
        pi->m->ncores = 1;
        pi->m->njobs = pi->m->totalsize;
        /* remember the original thr[] split for display/mapping checks */
        memcpy(pi->thr_orig, thr, 2*sizeof(int));
        mpi[0] *= thr[0]; thr[0]=1;
        mpi[1] *= thr[1]; thr[1]=1;
        nhj = mpi[0];
        nvj = mpi[1];
        nhc = thr[0];
        nvc = thr[1];
    } else {
        pi->m->pals = MPI_COMM_WORLD;
    }

    MPI_Comm_rank(pi->m->pals, (int*) & pi->m->jrank);

    int size;
    MPI_Comm_size(pi->m->pals, & size);
    if ((unsigned int) size != pi->m->njobs) {
        if (pi->m->jrank == 0) {
            fprintf(stderr, "Inconsistency -- exactly %u == %u * %u"
                    " MPI jobs are needed -- got %u\n",
                    pi->m->njobs, nhj, nvj, size);
        }
        exit(1);
    }

    pi->m->trank = 0;

    /* Init to the simple case */
    pi->wr[0]->njobs = nvj;
    pi->wr[0]->ncores = nvc;
    pi->wr[1]->njobs = nhj;
    pi->wr[1]->ncores = nhc;
    pi->wr[0]->totalsize = pi->wr[0]->njobs * pi->wr[0]->ncores;
    pi->wr[1]->totalsize = pi->wr[1]->njobs * pi->wr[1]->ncores;

    // Here we want the _common row number_ ; not to be confused with the
    // rank !
    unsigned int jcommon[2];
    jcommon[0] = pi->m->jrank / nvj;
    jcommon[1] = pi->m->jrank % nvj;

    pi->wr[0]->trank = 0;
    pi->wr[1]->trank = 0;

    for(int d = 0 ; d < 2 ; d++) {
        // A subgroup contains all process having the same jcommon value.
        // Therefore, we have as many horizontal MPI barriers set up as
        // one finds MPI job numbers across a column.
        err = MPI_Comm_split(pi->m->pals, jcommon[d],
                pi->m->jrank, &pi->wr[d]->pals);
        ASSERT_ALWAYS(!err);
        MPI_Comm_rank(pi->wr[d]->pals, (int*) &pi->wr[d]->jrank);
    }

    get_node_number_and_prefix(pi);
    display_process_grid(pi);

#if defined(HAVE_HWLOC) && defined(HAVE_CXX11)
    /* prepare the cpu binding messages, and print the unique messages we
     * receive */
    if (!verbose_enabled(CADO_VERBOSE_PRINT_BWC_CPUBINDING)) {
        pi->cpubinding_info = cpubinding_get_info(NULL, pl, thr);
    } else {
        char * cpubinding_messages;
        pi->cpubinding_info = cpubinding_get_info(&cpubinding_messages, pl, thr);
        int msgsize = 0;
        if (cpubinding_messages)
            msgsize = strlen(cpubinding_messages);
        MPI_Allreduce(MPI_IN_PLACE, &msgsize, 1, MPI_INT, MPI_MAX, pi->m->pals);
        msgsize++;
        /* per-rank chunk: node name followed by that rank's message */
        int chunksize = PI_NAMELEN + msgsize;
        char * big_pool = malloc(pi->m->njobs * chunksize);
        memset(big_pool, 0, pi->m->njobs * chunksize);
        if (cpubinding_messages) {
            int rc;
            rc = strlcpy(big_pool + pi->m->jrank * chunksize, pi->nodename, PI_NAMELEN);
            ASSERT_ALWAYS(rc == (int) strlen(pi->nodename));
            rc = strlcpy(big_pool + pi->m->jrank * chunksize + PI_NAMELEN, cpubinding_messages, msgsize);
            ASSERT_ALWAYS(rc == (int) strlen(cpubinding_messages));
            free(cpubinding_messages);
        }

        MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                big_pool, chunksize, MPI_BYTE, pi->m->pals);

        if (pi->m->jrank == 0) {
            // const char * refnode = NULL;
            /* print each distinct message only once, skipping runs of
             * identical consecutive messages */
            const char * ref = NULL;
            for(unsigned int i = 0 ; i < pi->m->njobs ; i++) {
                const char * node = big_pool + i * chunksize;
                const char * msg = node + PI_NAMELEN;
                if (!ref || strcmp(msg, ref) != 0) {
                    printf("cpubinding messages on %s:\n%s", node, msg);
                    ref = msg;
                    /*
                       refnode = node;
                       } else {
                       printf("cpubinding messages on %s: same as %s\n", node, refnode);
                       */
                }
            }
        }
        free(big_pool);
    }
#else
    if (param_list_lookup_string(pl, "cpubinding")) {
        if (pi->m->jrank == 0) {
#ifdef HAVE_HWLOC
                    printf("cpubinding: parameter ignored (no C++11)\n");
#elif defined(HAVE_CXX11)
                    printf("cpubinding: parameter ignored (no hwloc)\n");
#else
                    printf("cpubinding: parameter ignored (no hwloc, no C++11)\n");
#endif
        }
    }
#endif /* defined(HAVE_HWLOC) && defined(HAVE_CXX11) */
}
490 
491 /* How do we build a rank in pi->m from two rank in pi->wr[inner] and
492  * pi->wr->outer ?
493  */
/* Combine an inner rank ji (along direction d) and an outer rank jo
 * (along the other direction) into the corresponding rank within pi->m.
 * For d == 0, ji is the rank when reading horizontally and jo the rank
 * when reading vertically. */
static int mrank_from_tworanks(parallelizing_info_ptr pi, int d, int ji, int jo)
{
    /* place each rank on its axis: slot d gets ji, the other gets jo */
    int coord[2];
    coord[d] = ji;
    coord[1 - d] = jo;
    /* row-major flattening: x + y * row_width */
    return coord[0] + coord[1] * pi->wr[0]->njobs;
}
505 
506 
/* Build the per-thread array of parallelizing_info structures, one per
 * local thread (nhc * nvc entries), each a specialized copy of pi with
 * its own trank values. Also creates the shared pthread barriers for
 * the global communicator and for each local row and column, and names
 * the MPI communicators. The returned array is released with
 * pi_grid_clear(). */
static parallelizing_info *
pi_grid_init(parallelizing_info_ptr pi)
{
    // unsigned int nvj = pi->wr[0]->njobs;
    // unsigned int nhj = pi->wr[1]->njobs;
    unsigned int nvc = pi->wr[0]->ncores;
    unsigned int nhc = pi->wr[1]->ncores;

    /* used in several places for doing snprintf */
    char buf[20];

    // OK. So far we have something reasonable except for per-thread
    // setup. We have to first duplicate our structure so as to
    // specialize the pi things, and then set up agreed barriers.

    // the global barrier is not too much work.
    pi_comm_init_pthread_things(pi->m, "main");

    // column and row barriers are more tricky.
    parallelizing_info * grid;
    grid = (parallelizing_info *) malloc(nhc * nvc * sizeof(parallelizing_info));
    // setting to zero is important, since several fields are ensured as
    // null.
    memset(grid, 0, nhc * nvc * sizeof(parallelizing_info));

    char commname[64];
#ifndef MPI_LIBRARY_MT_CAPABLE
    MPI_Comm_set_name(pi->m->pals, "main");
    snprintf(commname, sizeof(commname), "rows%u-%u",
            pi->wr[1]->jrank * pi->wr[1]->ncores,
            (pi->wr[1]->jrank + 1) * pi->wr[1]->ncores - 1);
    MPI_Comm_set_name(pi->wr[0]->pals, commname);
    snprintf(commname, sizeof(commname), "cols%u-%u",
            pi->wr[0]->jrank * pi->wr[0]->ncores,
            (pi->wr[0]->jrank + 1) * pi->wr[0]->ncores - 1);
    MPI_Comm_set_name(pi->wr[1]->pals, commname);
#endif

    /* one specialized copy of pi per local thread; k = k1 * nvc + k0 */
    for(unsigned int k = 0 ; k < nhc * nvc ; k++) {
        parallelizing_info_ptr e = grid[k];
        memcpy(e, pi, sizeof(parallelizing_info));
        unsigned int k0 = k % nvc;
        unsigned int k1 = k / nvc;
        e->m->trank = k;
        e->wr[0]->trank = k0;
        e->wr[1]->trank = k1;
#ifdef  MPI_LIBRARY_MT_CAPABLE
        /* each thread gets private communicator duplicates */
        MPI_Comm_dup(pi->m->pals, &(e->m->pals));
        MPI_Comm_dup(pi->wr[0]->pals, &(e->wr[0]->pals));
        MPI_Comm_dup(pi->wr[1]->pals, &(e->wr[1]->pals));

        /* give a name */
        snprintf(commname, sizeof(commname), "main.%u", k);
        MPI_Comm_set_name(e->m->pals, commname);
        snprintf(commname, sizeof(commname), "row%u.%u",
                e->wr[1]->jrank * e->wr[1]->ncores + e->wr[1]->trank,
                e->wr[0]->trank);
        MPI_Comm_set_name(e->wr[0]->pals, commname);
        snprintf(commname, sizeof(commname), "col%u.%u",
                e->wr[0]->jrank * e->wr[0]->ncores + e->wr[0]->trank,
                e->wr[1]->trank);
        MPI_Comm_set_name(e->wr[1]->pals, commname);
#endif  /* MPI_LIBRARY_MT_CAPABLE */
    }

    // row barriers.
    // we've got pi->wr[1]->ncores cores working on separate rows (that's
    // the number of cores we encounter when walking down one column). So
    // each row leader is at index c * pi->wr[0]->ncores, because
    // pi->wr[0]->ncores is the number of cores working on separate
    // columns.
    //
    for(unsigned int c = 0 ; c < pi->wr[1]->ncores ; c++) {
        snprintf(buf, sizeof(buf), "r%u",
                pi->wr[1]->jrank * pi->wr[1]->ncores + c);
        unsigned int Nc = c * pi->wr[0]->ncores;
        pi_comm_ptr leader = grid[Nc]->wr[0];
        pi_comm_init_pthread_things(leader, buf);
        // replicate.
        /* all threads of the row share the leader's pthread things */
        for(unsigned int k = 0 ; k < pi->wr[0]->ncores ; k++) {
            grid[k + Nc]->wr[0]->th = leader->th;
        }
    }

    // column barriers.
    for(unsigned int c = 0 ; c < pi->wr[0]->ncores ; c++) {
        snprintf(buf, sizeof(buf), "c%u",
                pi->wr[0]->jrank * pi->wr[0]->ncores + c);
        pi_comm_ptr leader = grid[c]->wr[1];
        pi_comm_init_pthread_things(leader, buf);
        // replicate.
        for(unsigned int k = 0 ; k < pi->wr[1]->ncores ; k++) {
            unsigned int Nk = k * pi->wr[0]->ncores;
            grid[c + Nk]->wr[1]->th = leader->th;
        }
    }

    return grid;
}
606 
607 #if 0
608 /* TODO: rewrite this using grid_print instead */
609 static void
610 pi_grid_print_sketch(parallelizing_info_ptr pi, parallelizing_info * grid)
611 {
612     char buf[20];
613     for(unsigned int ij = 0 ; ij < pi->wr[1]->njobs ; ij++) {
614         // note that at this point, we're not yet MT.
615         int err = MPI_Barrier(pi->m->pals);
616         ASSERT_ALWAYS(!err);
617 
618         if (pi->wr[1]->jrank != ij) continue;
619 
620         if (pi->m->jrank == 0)
621             print_several(pi->wr[0]->njobs,pi->wr[0]->ncores, '-', '+', 12);
622 
623         for(unsigned int it = 0 ; it < pi->wr[1]->ncores ; it++) {
624             if (pi->wr[0]->jrank == 0) printf("|");
625             for(unsigned int jj = 0 ; jj < pi->wr[0]->njobs ; jj++) {
626                 err = MPI_Barrier(pi->wr[0]->pals);
627                 ASSERT_ALWAYS(!err);
628                 for(unsigned int jt = 0 ; jt < pi->wr[0]->ncores ; jt++) {
629                     parallelizing_info_srcptr tpi;
630                     tpi = grid[it*pi->wr[0]->ncores+jt];
631                     snprintf(buf, sizeof(buf), "J%uT%u%s%s",
632                             tpi->m->jrank,
633                             tpi->m->trank,
634                             tpi->wr[0]->th->desc,
635                             tpi->wr[1]->th->desc);
636                     err = MPI_Bcast(buf, sizeof(buf),
637                             MPI_BYTE, jj, pi->wr[0]->pals);
638                     ASSERT_ALWAYS(!err);
639                     int fence = jt == pi->wr[0]->ncores - 1;
640                     if (pi->wr[0]->jrank == 0)
641                         printf("%-12s%c", buf, fence ? '|' : ' ');
642                 }
643             }
644             if (pi->wr[0]->jrank == 0) printf("\n");
645         }
646         if (pi->wr[1]->jrank == ij && pi->wr[0]->jrank == 0)
647             print_several(pi->wr[0]->njobs,pi->wr[0]->ncores, '-', '+', 12);
648     }
649 }
650 #endif
651 
652 /* given a grid of processes which has beed set up with pi_grid_init,
653  * start the processes.
654  *
655  * The ngrids argument is here to accomodate the interleaved case.
656  */
/* Start n threads per grid over ngrids grids (ngrids == 2 in the
 * interleaved case), each running fcn through pi_go_helper_func with
 * its own parallelizing_info entry, then join them all. The calling
 * thread only spawns and joins; it does no work of its own. */
static void pi_go_mt_now(
        parallelizing_info ** grids,
        unsigned int ngrids,
        unsigned int n,
        void *(*fcn)(parallelizing_info_ptr, param_list pl, void *),
        param_list pl,
        void * arg)
{
    my_pthread_t * tgrid;
    struct pi_go_helper_s * helper_grid;

    tgrid = (my_pthread_t *) malloc(n * ngrids * sizeof(my_pthread_t));
    helper_grid = malloc(n * ngrids * sizeof(struct pi_go_helper_s));
    /* fail hard on OOM instead of dereferencing NULL in the loop */
    if (tgrid == NULL || helper_grid == NULL)
        abort();

    for(unsigned int k = 0 ; k < n * ngrids ; k++) {
        helper_grid[k].fcn = fcn;
        helper_grid[k].pl = pl;
        helper_grid[k].arg = arg;
        /* thread k belongs to grid k/n, entry k%n within it */
        helper_grid[k].p = (parallelizing_info_ptr) grids[k/n] + (k%n);
        my_pthread_create(tgrid+k, NULL,
                (pthread_callee_t) pi_go_helper_func, helper_grid+k);
    }

    // nothing done here. We're the main job.
    for(unsigned int k = 0 ; k < n * ngrids ; k++) {
        my_pthread_join(tgrid[k], NULL);
    }
    free(helper_grid);
    free(tgrid);
}
687 
/* Release everything pi_grid_init() allocated: the row and column
 * pthread things (freed once, through each leader entry only, since the
 * other entries merely share the leader's pointer), the per-thread
 * communicator duplicates in the MT-capable case, and the grid array
 * itself. */
static void pi_grid_clear(parallelizing_info_ptr pi, parallelizing_info * grid)
{
    // destroy row barriers.
    for(unsigned int c = 0 ; c < pi->wr[1]->ncores ; c++) {
        unsigned int Nc = c * pi->wr[0]->ncores;
        pi_comm_destroy_pthread_things(grid[Nc]->wr[0]);
    }
    // destroy column barriers
    for(unsigned int c = 0 ; c < pi->wr[0]->ncores ; c++) {
        pi_comm_destroy_pthread_things(grid[c]->wr[1]);
    }
#ifdef  MPI_LIBRARY_MT_CAPABLE
    for(unsigned int k = 0 ; k < pi->m->ncores ; k++) {
        MPI_Comm_free(&grid[k]->m->pals);
        MPI_Comm_free(&grid[k]->wr[0]->pals);
        MPI_Comm_free(&grid[k]->wr[1]->pals);
    }
#endif  /* MPI_LIBRARY_MT_CAPABLE */

    /* Don't do pi_log_clear, just like we haven't done pi_log_init. The
     * sub-threads may define it, in which case it's their responsibility
     * to clear the thing.
     */

    free(grid);
}
714 
/* Undo pi_init_mpilevel(): release the cpubinding info (when compiled
 * in), the global pthread things, the row/column communicators, and the
 * global communicator if it is a private split (only_mpi mode) rather
 * than MPI_COMM_WORLD. */
static void pi_clear_mpilevel(parallelizing_info_ptr pi)
{
#if defined(HAVE_HWLOC) && defined(HAVE_CXX11)
    int thr[2] = { pi->wr[0]->ncores, pi->wr[1]->ncores };
    cpubinding_free_info(pi->cpubinding_info, thr);
#endif /* defined(HAVE_HWLOC) && defined(HAVE_CXX11) */

    pi_comm_destroy_pthread_things(pi->m);

    for(int d = 0 ; d < 2 ; d++) {
        MPI_Comm_free(&pi->wr[d]->pals);
    }
    /* MPI_COMM_WORLD must not be freed; only a private split may be */
    if (pi->m->pals != MPI_COMM_WORLD)
        MPI_Comm_free(&pi->m->pals);

    // MPI_Errhandler_free(&my_eh);
}
732 
733 #if 0
734 static void shout_going_mt(pi_comm_ptr m)
735 {
736     int err = MPI_Barrier(m->pals);
737     ASSERT_ALWAYS(!err);
738     if (m->jrank == m->njobs - 1)
739         printf("going multithread now\n");
740     err = MPI_Barrier(m->pals);
741     ASSERT_ALWAYS(!err);
742 }
743 #endif
744 
/* Run fcn on a single, non-interleaved set of threads: set up the MPI
 * level and the per-thread grid, start the threads, then tear both
 * down in reverse order. */
static void pi_go_inner_not_interleaved(
        void *(*fcn)(parallelizing_info_ptr, param_list pl, void *),
        param_list pl,
        void * arg)
{
    parallelizing_info pi;
    pi_init_mpilevel(pi, pl);

    parallelizing_info * grid = pi_grid_init(pi);

    /* one single grid, pi->m->ncores threads per job */
    pi_go_mt_now(&grid, 1, pi->m->ncores, fcn, pl, arg);

    pi_grid_clear(pi, grid);
    pi_clear_mpilevel(pi);
}
760 
/* Run fcn on two interleaved sets of threads. pi[1] is a shallow copy
 * of pi[0]; both sets share one pthread barrier (sized for twice the
 * core count) and one dictionary, and each carries its own interleaved
 * index so pi_interleaving_* can alternate the two sets. Only pi[0]
 * owns the MPI-level resources. */
static void pi_go_inner_interleaved(
        void *(*fcn)(parallelizing_info_ptr, param_list pl, void *),
        param_list pl,
        void * arg)
{
    parallelizing_info pi[2];
    parallelizing_info * grids[2];

    pi_init_mpilevel(pi[0], pl);
    memcpy(pi[1], pi[0], sizeof(parallelizing_info));

    pi[0]->interleaved = malloc(sizeof(pi_interleaving));
    pi[0]->interleaved->idx = 0;

    pi[1]->interleaved = malloc(sizeof(pi_interleaving));
    pi[1]->interleaved->idx = 1;

    /* Now the whole point is that it's the _same_ barrier ! */
    my_pthread_barrier_t * b = malloc(sizeof(my_pthread_barrier_t));
    my_pthread_barrier_init(b, NULL, 2 * pi[0]->m->ncores);
    pi[0]->interleaved->b = b;
    pi[1]->interleaved->b = b;

    /* both sides also share a single dictionary */
    pi_dictionary d;
    pi_dictionary_init(d);
    pi[0]->dict = d;
    pi[1]->dict = d;

    grids[0] = pi_grid_init(pi[0]);
    grids[1] = pi_grid_init(pi[1]);

    // shout_going_mt(pi[0]->m);
    // pi_grid_print_sketch(pi, grids[0]);

    pi_go_mt_now(grids, 2, pi[0]->m->ncores, fcn, pl, arg);

    pi_grid_clear(pi[0], grids[0]);
    pi_grid_clear(pi[1], grids[1]);

    my_pthread_barrier_destroy(b);
    /* fix: b was malloc'd above but previously never freed (leak) */
    free(b);
    pi_dictionary_clear(d);

    free(pi[0]->interleaved);
    free(pi[1]->interleaved);

    pi_clear_mpilevel(pi[0]);
    /* pi[1] is a simple copy, it doesn't have to be cleared. */
}
809 
/* Initialize an empty dictionary: no entries, fresh rwlock. */
void pi_dictionary_init(pi_dictionary_ptr d)
{
    d->e = NULL;
    my_pthread_rwlock_init(d->m, NULL);
}
815 
/* Free every entry of the dictionary's linked list and destroy the
 * rwlock. The lock is taken one last time while walking the list —
 * although ideally no other thread should still be using d at this
 * point. */
void pi_dictionary_clear(pi_dictionary_ptr d)
{
    my_pthread_rwlock_wrlock(d->m);
    pi_dictionary_entry_ptr cur = d->e;
    while (cur != NULL) {
        pi_dictionary_entry_ptr next = cur->next;
        free(cur);
        cur = next;
    }
    d->e = NULL;
    my_pthread_rwlock_unlock(d->m);
    my_pthread_rwlock_destroy(d->m);
}
831 
/* Record (key, who) -> value in pi's shared dictionary. The new entry
 * is prepended to the list; only the head update happens under the
 * write lock. */
void pi_store_generic(parallelizing_info_ptr pi, unsigned long key, unsigned long who, void * value)
{
    pi_dictionary_ptr d = pi->dict;
    ASSERT_ALWAYS(d != NULL);

    pi_dictionary_entry_ptr entry = malloc(sizeof(pi_dictionary_entry));
    entry->key = key;
    entry->who = who;
    entry->value = value;

    my_pthread_rwlock_wrlock(d->m);
    entry->next = d->e;
    d->e = entry;
    my_pthread_rwlock_unlock(d->m);
}
845 
/* Look up the value stored under (key, who), or NULL if absent. The
 * list is scanned under the read lock; the newest binding for a given
 * (key, who) pair wins, since insertion prepends. */
void * pi_load_generic(parallelizing_info_ptr pi, unsigned long key, unsigned long who)
{
    pi_dictionary_ptr d = pi->dict;
    ASSERT_ALWAYS(d != NULL);
    void * found = NULL;
    my_pthread_rwlock_rdlock(d->m);
    pi_dictionary_entry_ptr cur = d->e;
    while (cur != NULL) {
        if (cur->key == key && cur->who == who) {
            found = cur->value;
            break;
        }
        cur = cur->next;
    }
    my_pthread_rwlock_unlock(d->m);
    return found;
}
862 
/* Declare, for --help purposes, all command-line parameters consumed by
 * this file. */
void parallelizing_info_decl_usage(param_list pl)/*{{{*/
{
    param_list_decl_usage(pl, "mpi", "number of MPI nodes across which the execution will span, with mesh dimensions");
    param_list_decl_usage(pl, "thr", "number of threads (on each node) for the program, with mesh dimensions");
    param_list_decl_usage(pl, "interleaving", "whether we should start two interleaved sets of threads. Only supported by some programs, has no effect on others.");
    param_list_decl_usage(pl, "only_mpi", "replace threads by distinct MPI jobs");
    param_list_decl_usage(pl, "debug-parallel-bwc", "enable heavy debugging messages for desperate cases");

#if defined(HAVE_HWLOC) && defined(HAVE_CXX11)
    cpubinding_decl_usage(pl);
#else
    /* typo fixed in the help string: "sotftware" -> "software" */
    param_list_decl_usage(pl, "cpubinding", "(ignored, no sufficient software support)");
#endif
}
/*}}}*/
878 
/* Pre-register the parameters this file consumes, so the param_list
 * layer does not flag them as unused. The values themselves are parsed
 * later (within this file, though). */
void parallelizing_info_lookup_parameters(param_list_ptr pl)/*{{{*/
{
    param_list_lookup_string(pl, "mpi");
    param_list_lookup_string(pl, "thr");
    param_list_lookup_string(pl, "interleaving");
    param_list_lookup_string(pl, "only_mpi");
    param_list_lookup_string(pl, "debug-parallel-bwc");

#if defined(HAVE_HWLOC) && defined(HAVE_CXX11)
    cpubinding_lookup_parameters(pl);
#else
    param_list_lookup_string(pl, "cpubinding");
#endif
}
/*}}}*/
896 
897 
/* Main entry point: run fcn on the parallel machine described by the
 * mpi/thr parameters, dispatching on whether interleaving is wanted. */
void pi_go(
        void *(*fcn)(parallelizing_info_ptr, param_list pl, void *),
        param_list pl,
        void * arg)
{
    /* absent from the command line means: no interleaving */
    int interleaving = 0;
    param_list_parse_int(pl, "interleaving", &interleaving);
    if (!interleaving) {
        pi_go_inner_not_interleaved(fcn, pl, arg);
    } else {
        pi_go_inner_interleaved(fcn, pl, arg);
    }
}
911 
912 
/* Attach a zero-initialized log book to communicator wr, and announce
 * it once from the leader job/thread. */
void pi_log_init(pi_comm_ptr wr)
{
    /* calloc replaces the former malloc+memset pair, and gives
     * next == 0 and hsize == 0 directly; also check the allocation,
     * which the previous code dereferenced unchecked. */
    struct pi_log_book * lb = calloc(1, sizeof(struct pi_log_book));
    ASSERT_ALWAYS(lb != NULL);
    wr->log_book = lb;

    char commname[MPI_MAX_OBJECT_NAME];
    int namelen = sizeof(commname);
    MPI_Comm_get_name(wr->pals, commname, &namelen);

    if (wr->jrank == 0 && wr->trank == 0) {
        printf("Enabled logging for %s:%s\n", commname, wr->th->desc);
    }
}
927 
/* Release the log book of wr. The pointer is poisoned afterwards so a
 * late pi_log_op() crashes loudly instead of writing to freed memory. */
void pi_log_clear(pi_comm_ptr wr)
{
    /* free(NULL) is a no-op, so the former NULL guard was redundant */
    free(wr->log_book);
    wr->log_book = (void*) 0xdeadbeef;
}
934 
/* Append a printf-formatted entry to wr's log book, a fixed-size
 * circular buffer of PI_LOG_BOOK_ENTRIES entries. Silently does nothing
 * if pi_log_init() was never called on this communicator. */
void pi_log_op(pi_comm_ptr wr, const char * fmt, ...)
{
    va_list ap;
    struct pi_log_book * lb = wr->log_book;
    if (!lb)
        return;

    va_start(ap, fmt);

    ASSERT_ALWAYS(lb->next < PI_LOG_BOOK_ENTRIES);

    struct pi_log_entry * e = &(lb->t[lb->next]);

    /* timestamp the entry at the moment of logging */
    gettimeofday(e->tv, NULL);

    vsnprintf(e->what, sizeof(e->what), fmt, ap);
    /* in the multithreaded case, also echo the entry right away */
    if (wr->ncores > 1)
        fprintf(stderr, "%s:%d:%d %s\n", wr->th->desc, wr->jrank, wr->trank, e->what);
    va_end(ap);

    /* advance the circular write cursor; hsize counts entries ever
     * written, and is clamped to the buffer size by readers */
    lb->next++;
    lb->next %= PI_LOG_BOOK_ENTRIES;
    lb->hsize++;
}
959 
/* Format the (up to PI_LOG_BOOK_ENTRIES) most recent log entries of wr
 * into strings[*n ...], oldest first, advancing *n as we go. Each line
 * starts with the timestamp, so lexicographic sorting of the collected
 * strings yields chronological order (see p_strcmp). `myname' is an
 * extra caller-chosen tag inserted in each line. */
static void pi_log_print_backend(pi_comm_ptr wr, const char * myname, char ** strings, int * n, int alloc)
{
    struct pi_log_book * lb = wr->log_book;
    if (!lb) return;

    ASSERT_ALWAYS(lb->next < PI_LOG_BOOK_ENTRIES);

    /* h = number of valid entries; hsize may exceed the buffer size
     * once the circular buffer has wrapped */
    int h = lb->hsize;
    if (h >= PI_LOG_BOOK_ENTRIES) {
        h = PI_LOG_BOOK_ENTRIES;
    }
    /* i0 = index of the oldest valid entry, modulo the buffer size */
    int i0 = lb->next - h;
    if (i0 < 0)
        i0 += PI_LOG_BOOK_ENTRIES;
    char commname[MPI_MAX_OBJECT_NAME];
    int namelen = sizeof(commname);
    MPI_Comm_get_name(wr->pals, commname, &namelen);

    int rc;

    for(int i = i0 ; h-- ; ) {
        struct pi_log_entry * e = &(lb->t[i]);

        ASSERT_ALWAYS(*n < alloc);
        /* asprintf allocates each line; the caller frees them */
        rc = asprintf(&(strings[*n]), "%" PRIu64 ".%06u (%s) %s %s",
                (uint64_t) e->tv->tv_sec,
                (unsigned int) e->tv->tv_usec,
                myname,
                e->what,
                commname);
        ASSERT_ALWAYS(rc != -1);
        ++*n;

        /* wrap around the circular buffer */
        i++;
        if (i == PI_LOG_BOOK_ENTRIES)
            i = 0;
    }
}
998 
/* Print wr's log book to stdout, oldest entry first. */
void pi_log_print(pi_comm_ptr wr)
{
    /* FIXME stdout or stderr ? */
    int alloc = PI_LOG_BOOK_ENTRIES;
    int n = 0;
    char ** strings = malloc(alloc * sizeof(char*));
    pi_log_print_backend(wr, "", strings, &n, alloc);
    int i = 0;
    while (i < n) {
        puts(strings[i]);
        free(strings[i]);
        i++;
    }
    fflush(stdout);
    free(strings);
}
1013 
1014 typedef int (*sortfunc_t)(const void *, const void *);
1015 
/* qsort comparator for an array of strings. The log lines start with a
 * numeric timestamp formatted so that lexicographic order is also
 * chronological order. */
int p_strcmp(char const * const * a, char const * const * b)
{
    const char * lhs = *a;
    const char * rhs = *b;
    return strcmp(lhs, rhs);
}
1021 
/* Merge and print the log books of the global communicator and of the
 * two directional communicators, sorted chronologically. */
void pi_log_print_all(parallelizing_info_ptr pi)
{
    int alloc = 3 * PI_LOG_BOOK_ENTRIES;
    int n = 0;
    char ** strings = malloc(alloc * sizeof(char*));

    /* tag each line with this caller's position in both directions */
    char * myname;
    int rc = asprintf(&myname, "%s%s", pi->wr[0]->th->desc, pi->wr[1]->th->desc);
    ASSERT_ALWAYS(rc != -1);

    pi_log_print_backend(pi->m, myname, strings, &n, alloc);
    pi_log_print_backend(pi->wr[0], myname, strings, &n, alloc);
    pi_log_print_backend(pi->wr[1], myname, strings, &n, alloc);

    /* lines start with the timestamp, so a string sort interleaves the
     * three sources chronologically */
    qsort(strings, n, sizeof(char*), (sortfunc_t) &p_strcmp);

    for(int i = 0 ; i < n ; i++) {
        puts(strings[i]);
        free(strings[i]);
    }
    free(myname);
    free(strings);
}
1044 
1045 /* {{{ predefined types and operations */
/* Table of predefined pi datatypes. abase == NULL marks a stock MPI
 * type (as opposed to an mpfq-backed custom type); the last field is
 * the per-item size in bytes. Order matters: the BWC_PI_* aliases
 * below index into this table. */
static struct pi_datatype_s pi_predefined_types[] = {
    { MPI_INT,	                NULL, sizeof(int) },
    { MPI_DOUBLE,	        NULL, sizeof(double) },
    { MPI_BYTE,	                NULL, 1 },
    { MPI_UNSIGNED,	        NULL, sizeof(unsigned) },
    { MPI_UNSIGNED_LONG,	NULL, sizeof(unsigned long) },
    { MPI_UNSIGNED_LONG_LONG,	NULL, sizeof(unsigned long long) },
    { MPI_LONG,	                NULL, sizeof(long) },
};
1055 
1056 /* TODO: mpi-3.0 introduced (at last) MPI_UINT64_T and friends. There are
1057  * several places where this can come in handy. So once we set our mind
1058  * on requiring mpi-3.0, we can do some simplifications here and there.
1059  */
/* Public aliases into the predefined-type table above; the offsets
 * must match the table order. */
pi_datatype_ptr BWC_PI_INT              = pi_predefined_types + 0;
pi_datatype_ptr BWC_PI_DOUBLE           = pi_predefined_types + 1;
pi_datatype_ptr BWC_PI_BYTE             = pi_predefined_types + 2;
pi_datatype_ptr BWC_PI_UNSIGNED         = pi_predefined_types + 3;
pi_datatype_ptr BWC_PI_UNSIGNED_LONG    = pi_predefined_types + 4;
pi_datatype_ptr BWC_PI_UNSIGNED_LONG_LONG    = pi_predefined_types + 5;
pi_datatype_ptr BWC_PI_LONG             = pi_predefined_types + 6;

/* compile-time check (negative array size on failure): size_t must
 * match one of the two unsigned long flavours, since BWC_PI_SIZE_T
 * aliases one of them just below */
char statically_assert_that_size_t_is_either_ulong_or_ulonglong[((sizeof(size_t) == sizeof(unsigned long)) || (sizeof(size_t) == sizeof(unsigned long long))) ? 1 : -1];
pi_datatype_ptr BWC_PI_SIZE_T = pi_predefined_types + 4 + (sizeof(size_t) == sizeof(unsigned long long));
1070 
1071 
1072 
/* Predefined reduction operations. Only SUM carries dispatch hooks for
 * custom (mpfq) datatypes; the others are valid on stock MPI types
 * only. */
struct pi_op_s BWC_PI_MIN[1]	= { { .stock = MPI_MIN, } };
struct pi_op_s BWC_PI_MAX[1]	= { { .stock = MPI_MAX, } };

static void pi_dispatch_op_add_stock(void *, void *, int *, MPI_Datatype *);
static void pi_dispatch_op_add_custom(void *, void *, size_t, pi_datatype_ptr);

struct pi_op_s BWC_PI_SUM[1]	= { {
        .stock = MPI_SUM,
        .f_stock = pi_dispatch_op_add_stock,
        .f_custom = pi_dispatch_op_add_custom,
    } };
struct pi_op_s BWC_PI_BXOR[1]	= { { .stock = MPI_BXOR, } };
struct pi_op_s BWC_PI_BAND[1]	= { { .stock = MPI_BAND, } };
struct pi_op_s BWC_PI_BOR[1]	= { { .stock = MPI_BOR, } };
1087 
/* Associates a (datatype, op) pair with the thread-level routine that
 * implements the corresponding element-wise reduction. */
struct reduction_function {
    MPI_Datatype datatype;
    MPI_Op op;
    thread_reducer_t f;
};
1093 
1094 /* {{{ predefined reduction functions */
1095 
1096 /* always: a == my value, b == the other value */
/* Element-wise thread-level reducers, one per supported (type, op)
 * pair. Each reduces s elements of b into a, in place; per the
 * convention above, a is the caller's value, b is the other value. */
static void reducer_byte_min(const unsigned char * b, unsigned char * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
static void reducer_byte_max(const unsigned char * b, unsigned char * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
static void reducer_byte_sum(const unsigned char * b, unsigned char * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
static void reducer_byte_bxor(const unsigned char * b, unsigned char * a, size_t s) { for( ; s-- ; a++, b++) *a ^= *b; }
static void reducer_int_min(const int * b, int * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
static void reducer_int_max(const int * b, int * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
static void reducer_int_sum(const int * b, int * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
static void reducer_int_band(const int * b, int * a, size_t s) { for( ; s-- ; a++, b++) *a &= *b; }
static void reducer_int_bor(const int * b, int * a, size_t s) { for( ; s-- ; a++, b++) *a |= *b; }
static void reducer_uint_min(const unsigned int * b, unsigned int * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
static void reducer_uint_max(const unsigned int * b, unsigned int * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
static void reducer_uint_sum(const unsigned int * b, unsigned int * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
static void reducer_uint_band(const unsigned int * b, unsigned int * a, size_t s) { for( ; s-- ; a++, b++) *a &= *b; }
static void reducer_uint_bor(const unsigned int * b, unsigned int * a, size_t s) { for( ; s-- ; a++, b++) *a |= *b; }
static void reducer_double_min(const double * b, double * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
static void reducer_double_max(const double * b, double * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
static void reducer_double_sum(const double * b, double * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
static void reducer_long_min(const long * b, long * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
static void reducer_long_max(const long * b, long * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
static void reducer_long_sum(const long * b, long * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
static void reducer_longlong_min(const long long * b, long long * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
static void reducer_longlong_max(const long long * b, long long * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
static void reducer_longlong_sum(const long long * b, long long * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
static void reducer_ulong_min(const unsigned long * b, unsigned long * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
static void reducer_ulong_max(const unsigned long * b, unsigned long * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
static void reducer_ulong_sum(const unsigned long * b, unsigned long * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
static void reducer_ulonglong_min(const unsigned long long * b, unsigned long long * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
static void reducer_ulonglong_max(const unsigned long long * b, unsigned long long * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
static void reducer_ulonglong_sum(const unsigned long long * b, unsigned long long * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
1126 
/* Lookup table mapping (MPI datatype, MPI op) to the thread-level
 * reducer implementing it; scanned linearly by
 * lookup_reduction_function(). Terminated by an all-zero sentinel
 * (f == NULL). */
struct reduction_function predefined_functions[] = {
    { MPI_BYTE,          MPI_MIN,  (thread_reducer_t) reducer_byte_min, },
    { MPI_BYTE,          MPI_MAX,  (thread_reducer_t) reducer_byte_max, },
    { MPI_BYTE,          MPI_SUM,  (thread_reducer_t) reducer_byte_sum, },
    { MPI_BYTE,          MPI_BXOR, (thread_reducer_t) reducer_byte_bxor, },
    { MPI_INT,           MPI_MIN,  (thread_reducer_t) reducer_int_min, },
    { MPI_INT,           MPI_MAX,  (thread_reducer_t) reducer_int_max, },
    { MPI_INT,           MPI_SUM,  (thread_reducer_t) reducer_int_sum, },
    { MPI_INT,           MPI_BAND, (thread_reducer_t) reducer_int_band, },
    { MPI_INT,           MPI_BOR,  (thread_reducer_t) reducer_int_bor, },
    { MPI_UNSIGNED,      MPI_MIN,  (thread_reducer_t) reducer_uint_min, },
    { MPI_UNSIGNED,      MPI_MAX,  (thread_reducer_t) reducer_uint_max, },
    { MPI_UNSIGNED,      MPI_SUM,  (thread_reducer_t) reducer_uint_sum, },
    { MPI_UNSIGNED,      MPI_BAND, (thread_reducer_t) reducer_uint_band, },
    { MPI_UNSIGNED,      MPI_BOR,  (thread_reducer_t) reducer_uint_bor, },
    { MPI_DOUBLE,        MPI_MIN,  (thread_reducer_t) reducer_double_min, },
    { MPI_DOUBLE,        MPI_MAX,  (thread_reducer_t) reducer_double_max, },
    { MPI_DOUBLE,        MPI_SUM,  (thread_reducer_t) reducer_double_sum, },
    { MPI_LONG,          MPI_MIN,  (thread_reducer_t) reducer_long_min, },
    { MPI_LONG,          MPI_MAX,  (thread_reducer_t) reducer_long_max, },
    { MPI_LONG,          MPI_SUM,  (thread_reducer_t) reducer_long_sum, },
    { MPI_UNSIGNED_LONG, MPI_MIN,  (thread_reducer_t) reducer_ulong_min, },
    { MPI_UNSIGNED_LONG, MPI_MAX,  (thread_reducer_t) reducer_ulong_max, },
    { MPI_UNSIGNED_LONG, MPI_SUM,  (thread_reducer_t) reducer_ulong_sum, },
    { MPI_LONG_LONG,          MPI_MIN,  (thread_reducer_t) reducer_longlong_min, },
    { MPI_LONG_LONG,          MPI_MAX,  (thread_reducer_t) reducer_longlong_max, },
    { MPI_LONG_LONG,          MPI_SUM,  (thread_reducer_t) reducer_longlong_sum, },
    { MPI_UNSIGNED_LONG_LONG, MPI_MIN,  (thread_reducer_t) reducer_ulonglong_min, },
    { MPI_UNSIGNED_LONG_LONG, MPI_MAX,  (thread_reducer_t) reducer_ulonglong_max, },
    { MPI_UNSIGNED_LONG_LONG, MPI_SUM,  (thread_reducer_t) reducer_ulonglong_sum, },
    { 0, 0, NULL, },    /* sentinel */
};
1159 
lookup_reduction_function(MPI_Datatype datatype,MPI_Op op)1160 static thread_reducer_t lookup_reduction_function(MPI_Datatype datatype, MPI_Op op)
1161 {
1162     struct reduction_function * x = predefined_functions;
1163     for( ; x->f && ! (x->datatype == datatype && x->op == op) ; x++) ;
1164     if (!x->f) {
1165         fprintf(stderr, "bad reduction function\n");
1166         abort();
1167     }
1168     return x->f;
1169 }
1170 
1171 /* }}} */
1172 
1173 /* }}} */
1174 
1175 /* {{{ user-defined (mpfq) types and operations */
1176 
1177 static int pi_mpi_attribute_key;
1178 
1179 /* we need to present the mpi implementation with proper user functions
1180  * for operating on the mpfq types. Do so only for addition, since it's
1181  * the only one we use eventually.
1182  *
1183  * Of course we could conceivably do so for more operations.
1184  */
/* MPI user-op callback (signature imposed by MPI_Op_create): reduce a
 * custom mpfq datatype by vector addition, inoutvec += invec. The mpfq
 * vbase is recovered from the attribute attached to the datatype in
 * pi_alloc_mpfq_datatype(). */
static void pi_dispatch_op_add_stock(void *invec, void *inoutvec, int *len, MPI_Datatype *datatype)
{
    /* get the mpfq_vbase_ptr associated to the datatype */
    int got_it;
    mpfq_vbase_ptr abase;
    MPI_Type_get_attr(*datatype, pi_mpi_attribute_key, (void*) &abase, &got_it);
    ASSERT(got_it);
    abase->vec_add(abase, inoutvec, inoutvec, invec, *len);
}
/* Thread-level counterpart of the above: inoutvec += invec for `len'
 * items of an mpfq-backed pi datatype. */
static void pi_dispatch_op_add_custom(void *invec, void *inoutvec, size_t len, pi_datatype_ptr datatype)
{
    /* FIXME: mpfq's vec_add should really take size_t arguments */
    ASSERT_ALWAYS(len < (size_t) UINT_MAX);
    datatype->abase->vec_add(datatype->abase, inoutvec, inoutvec, invec, len);
}
1200 
1201 /* XXX must be called in single threaded context  */
/* Create the MPI keyval used to attach an mpfq vbase to custom
 * datatypes, and register the custom SUM operation with MPI.
 * XXX must be called in single threaded context */
static void pi_init_attribute_things()
{
    MPI_Type_create_keyval(MPI_TYPE_DUP_FN, MPI_TYPE_NULL_DELETE_FN, &pi_mpi_attribute_key, NULL);
    /* commute flag = 1: addition is commutative */
    MPI_Op_create(BWC_PI_SUM->f_stock, 1, &BWC_PI_SUM->custom);
}
1207 
1208 /* XXX must be called in single threaded context  */
/* Undo pi_init_attribute_things().
 * XXX must be called in single threaded context */
static void pi_clear_attribute_things()
{
    MPI_Op_free(&BWC_PI_SUM->custom);
    MPI_Type_free_keyval(&pi_mpi_attribute_key);
}
1214 
/* Collectively allocate a pi datatype wrapping the mpfq field abase.
 * The struct is shared among all threads of pi->m; only the leader
 * thread creates the underlying MPI datatype and hangs the vbase on it
 * as an attribute (for the MPI-side reduction callback). All threads
 * must call this; all get the same pointer back. */
pi_datatype_ptr pi_alloc_mpfq_datatype(parallelizing_info_ptr pi, mpfq_vbase_ptr abase)
{
    pi_datatype_ptr ptr = shared_malloc_set_zero(pi->m, sizeof(struct pi_datatype_s));
    if (pi->m->trank == 0) {
        ptr->abase = abase;
        ptr->item_size = abase->vec_elt_stride(abase, 1);
        MPI_Type_contiguous(ptr->item_size, MPI_BYTE, &ptr->datatype);
        MPI_Type_commit(&ptr->datatype);
        MPI_Type_set_attr(ptr->datatype, pi_mpi_attribute_key, abase);
    }
    /* make sure the leader's writes are visible before anyone uses ptr */
    serialize_threads(pi->m);
    return ptr;
}
1228 
/* Collectively release a datatype obtained from pi_alloc_mpfq_datatype.
 * The barrier ensures no thread is still using it when the leader tears
 * down the MPI datatype. */
void pi_free_mpfq_datatype(parallelizing_info_ptr pi, pi_datatype_ptr ptr)
{
    serialize_threads(pi->m);
    if (pi->m->trank == 0) {
        MPI_Type_delete_attr(ptr->datatype, pi_mpi_attribute_key);
        MPI_Type_free(&ptr->datatype);
    }
    shared_free(pi->m, ptr);
}
1238 
1239 /* This may *not* use MPI_Reduce_local, because we might not have an MPI
1240  * implementation in the first place, just a set of placeholers. So it is
1241  * important that we find the operation to perform by ourselves
1242  */
/* Local (single-caller) reduction: inoutbuf op= inbuf, for `count'
 * items of `datatype'. Dispatches to the op's custom hook for mpfq
 * types, or to the predefined reducer table for stock types; aborts if
 * the combination is unsupported. */
void pi_reduce_local(void *inbuf, void *inoutbuf, size_t count,
                    pi_datatype_ptr datatype, pi_op_ptr op)
{
    if (datatype->abase) {
        /* mpfq-backed type: only ops providing f_custom are usable */
        if (op->f_custom) {
            op->f_custom(inbuf, inoutbuf, count, datatype);
        } else {
            fprintf(stderr, "undefined operation on mpfq type\n");
            abort();
        }
    } else {
        thread_reducer_t f = lookup_reduction_function(datatype->datatype, op->stock);
        (*f)(inbuf, inoutbuf, count);
    }
}
1258 
1259 /* }}} */
1260 
1261 /* {{{ collective operations with pi_comm_ptrs
1262  *
1263  * we provide a calling interface which is similar to mpi.
1264  *
1265  * we inherit the same crappy notion of datatypes, and use the same
1266  * MPI_Datatype data for it. When not building with MPI, our "fake MPI"
1267  * headers provides magic constants which are sufficient to our needs.
1268  *
1269  * We define the following functions:
1270  *
1271  *      {pi,pi_thread}_{bcast,allreduce,data_eq}
1272  *
1273  * where data_eq is a collective operation returning true or false
1274  * depending on whether the pointed data area is consistent across all
1275  * callers.
1276  *
1277  * Being thread-level collective functions, these routines assume that
1278  * the provided pointers are different on the calling threads (otherwise
1279  * there's no point).
1280  *
1281  * Notice also that as long as the underlying mpi implementation cannot
1282  * be considered thread-safe in the MPI_THREAD_MULTIPLE meaning, it is
1283  * not possible to call any of the pi collective functions simultaneously
1284  * from two distinct parallel communicators. That is, either calls are with
1285  * pi->m, or with pi->wr[d] provided that pi->wr[!d]->trank == 0. The
1286  * SEVERAL_THREADS_PLAY_MPI_BEGIN construct may be useful, as follows:
1287  *      SEVERAL_THREADS_PLAY_MPI_BEGIN(pi->wr[!d]) {
1288  *              pi-collective operation on pi->wr[d]
1289  *      }
1290  *      SEVERAL_THREADS_PLAY_MPI_END();
1291  */
1292 
/* Shared argument passed through the barrier callbacks implementing
 * the thread-level collectives below. */
struct pi_collective_arg {
    pi_comm_ptr wr;
    /* this pointer is written from all threads. Its contents are
     * meaningful only for the root thread */
    void * ptr;
    unsigned int root;          /* bcast only: trank of the source */
    pi_datatype_ptr datatype;
    size_t count;               /* number of items of `datatype' */
    /* last two only for reduction */
    pi_op_ptr op;
    void * dptr;                /* allreduce destination buffer */
};
1305 
1306 /* {{{ broadcast
1307  *
1308  * thread-level strategy:
1309  * upon entering the barrier, the "root" threads fills in
1310  * a->wr->th->utility_ptr its own pointer. Upon leaving the barrier, all
1311  * threads copy data from there to their own buffer.
1312  *
1313  * mpi-level strategy:
1314  * broadcast on all other nodes from the root first, then on all threads
1315  * separately on each node.
1316  */
/* Barrier "in" hook for the thread broadcast: the root thread publishes
 * its buffer pointer in the communicator's shared utility_ptr slot. */
static void pi_thread_bcast_in(int s MAYBE_UNUSED, struct pi_collective_arg * a)
{
    /* here we have a mutex locked */
    if (a->wr->trank == a->root) {
        a->wr->th->utility_ptr = a->ptr;
    }
}
1324 
/* Barrier "out" hook for the thread broadcast: every thread copies the
 * root's data from the shared utility_ptr into its own buffer. */
static void pi_thread_bcast_out(int s MAYBE_UNUSED, struct pi_collective_arg * a)
{
    /* here we know that exactly one thread has just filled the
     * a->wr->th->utility_ptr, so that a memcpy is possible, as long as
     * the owner of the data area has not touched it again */
    if (a->ptr != a->wr->th->utility_ptr) {       /* placate valgrind */
        memcpy(a->ptr, a->wr->th->utility_ptr, a->count * a->datatype->item_size);
    }
}
1334 
1335 /* broadcast the data area pointed to by [ptr, ptr+size[ to all threads.
1336  * Pointers at calling threads must differ. */
/* broadcast the data area pointed to by [ptr, ptr+size[ to all threads.
 * Pointers at calling threads must differ. Collective over wr's
 * threads; `root' is a thread rank. */
void pi_thread_bcast(void * ptr, size_t count, pi_datatype_ptr datatype, unsigned int root, pi_comm_ptr wr)
{
    struct pi_collective_arg a[1];
    a->wr = wr;
    a->ptr = ptr;
    a->root = root;
    a->datatype = datatype;
    a->count = count;
    /* in-hook publishes the root pointer, out-hook copies from it */
    barrier_wait(wr->th->bh,
            (void(*)(int,void*)) &pi_thread_bcast_in,
            (void(*)(int,void*)) &pi_thread_bcast_out,
            a);
    /* make sure no thread re-enters a collective before everyone has
     * finished copying */
    serialize_threads(wr);
}
1351 
/* Full (job + thread) broadcast over wr from (jroot, troot): first an
 * MPI broadcast among the jobs, performed by the troot thread of each
 * job, then a thread-level broadcast within each job. */
void pi_bcast(void * ptr, size_t count, pi_datatype_ptr datatype, unsigned int jroot, unsigned int troot, pi_comm_ptr wr)
{
    int err;

    ASSERT(jroot < wr->njobs);
    ASSERT(troot < wr->ncores);
    if (wr->trank == troot) {
        err = MPI_Bcast(ptr, count, datatype->datatype, jroot, wr->pals);
        ASSERT_ALWAYS(!err);
    }
    pi_thread_bcast(ptr, count, datatype, troot, wr);
}
1364 
/* Abort the whole computation with exit code err, from thread 0 only. */
void pi_abort(int err, pi_comm_ptr wr)
{
    /* This only exists because an MPI_Abort that is initiated by all
     * threads is likely to clutter the output quite a lot */
    if (wr->trank == 0)
        MPI_Abort(wr->pals, err);
}
1372 /* }}} */
1373 
1374 /* {{{ allreduce
1375  *
1376  * thread-level strategy (dumb): use the provided sendbuf's, one after another,
1377  * as "last worked on" areas. The last being reduced in is then used to
1378  * communicate the data to all other threads.
1379  *
1380  * mpi-level strategy: thread-level allreduce, then in-place mpi-level allreduce
1381  * from the leader threads, the thread-level broadcast of the result.
1382  */
1383 /* this gets called with s = (count-1), (count-2), ..., 0 */
/* Barrier "in" hook for the thread allreduce; called serially under the
 * barrier mutex with s = (ncores-1), (ncores-2), ..., 0, once per
 * thread. */
static void pi_thread_allreduce_in(int s, struct pi_collective_arg * a)
{
    /* utility_ptr points to the "last worked on" area.
     * first call: set the "last worked on" area to our pointer.
     * further calls: reduce our data with the "lwo" data, and set the
     * "lwo" pointer to our modified data in our pointer.
     *
     * last call: reduction done, copy to dptr.
     */
    if (s < (int) a->wr->ncores - 1) {
        /* not the first caller: fold the running result into our data */
        pi_reduce_local(a->wr->th->utility_ptr, a->ptr, a->count, a->datatype, a->op);
    }
    a->wr->th->utility_ptr = a->ptr;
}
1398 
/* Barrier "out" hook for the thread allreduce: every thread copies the
 * fully-reduced data (held by the last reducer) into its own dptr. */
static void pi_thread_allreduce_out(int s MAYBE_UNUSED, struct pi_collective_arg * a)
{
    /* almost the same as pi_thread_bcast_out, except that we work on
     * dptr...
     */
    if (a->dptr != a->wr->th->utility_ptr) {       /* placate valgrind */
        memcpy(a->dptr, a->wr->th->utility_ptr, a->count * a->datatype->item_size);
    }
}
1408 
1409 /* ptr == dptr is supported */
/* Thread-level allreduce: dptr receives op-reduction of all threads'
 * data. ptr == dptr is supported; ptr == NULL means in-place on dptr. */
void pi_thread_allreduce(void *ptr, void *dptr, size_t count,
                pi_datatype_ptr datatype, pi_op_ptr op, pi_comm_ptr wr)
{
    /* this code is buggy in the not-in-place case. let's fall back
     * to the in-place situation unconditionally.
     *
     * nature of the bug: if we don't do the fallback below, then clearly
     * a->ptr is being written to. We should clean this up, and use const
     * when possible.
     */
    if (ptr && ptr != dptr) {
        memcpy(dptr, ptr, count * datatype->item_size);
        ptr = NULL;
    }
    struct pi_collective_arg a[1];
    a->wr = wr;
    a->ptr = ptr ? ptr : dptr;  /* handle in-place */
    a->root = 0;        /* meaningless for allreduce */
    a->datatype = datatype;
    a->count = count;
    a->op = op;
    a->dptr = dptr;
    /* in-hook chains the per-thread reductions, out-hook broadcasts
     * the final result into every thread's dptr */
    barrier_wait(wr->th->bh,
            (void(*)(int,void*)) &pi_thread_allreduce_in,
            (void(*)(int,void*)) &pi_thread_allreduce_out,
            a);
    serialize_threads(wr);
}
1438 
1439 
1440 /* This intentionally has the same prototype as MPI_Allreduce */
1441 /* Only a few operations and datatypes are supported.
1442  *
1443  * NULL at sendbuf means in-place.
1444  */
/* This intentionally has the same prototype as MPI_Allreduce */
/* Only a few operations and datatypes are supported.
 *
 * NULL at sendbuf means in-place.
 *
 * Strategy: thread-level allreduce within each job first, then an
 * in-place MPI allreduce among the leader threads, then a thread-level
 * broadcast of the result.
 */
void pi_allreduce(void *sendbuf, void *recvbuf, size_t count,
        pi_datatype_ptr datatype, pi_op_ptr op, pi_comm_ptr wr)
{
    /* MPI takes an int count */
    ASSERT_ALWAYS(count <= (size_t) INT_MAX);
    pi_thread_allreduce(sendbuf, recvbuf, count, datatype, op, wr);
    if (wr->trank == 0) {
        if (datatype->abase) {
            /* Then it's a type for which we supposedly have written an
             * overloaded operation. */
            MPI_Allreduce(MPI_IN_PLACE, recvbuf, count, datatype->datatype, op->custom, wr->pals);
        } else {
            MPI_Allreduce(MPI_IN_PLACE, recvbuf, count, datatype->datatype, op->stock, wr->pals);
        }
    }
    /* now it's just a matter of broadcasting to all threads */
    pi_thread_bcast(recvbuf, count, datatype, 0, wr);
}
1462 
/* Allgather over wr: on return, each thread's recvbuf holds the
 * concatenation of all (job, thread) contributions, each of size
 * recvcount * recvtype->item_size, indexed by jrank*ncores+trank.
 * Only the MPI_Allgather-like in-place convention is supported:
 * sendbuf/sendtype/sendcount must be NULL/NULL/0, and each caller's
 * own slot of recvbuf holds its contribution on entry. */
extern void pi_allgather(void * sendbuf,
        size_t sendcount, pi_datatype_ptr sendtype,
        void *recvbuf,
        size_t recvcount, pi_datatype_ptr recvtype,
        pi_comm_ptr wr)
{
    ASSERT_ALWAYS(sendbuf == NULL);
    ASSERT_ALWAYS(sendtype == NULL);
    ASSERT_ALWAYS(sendcount == 0);
    size_t per_thread = recvcount * recvtype->item_size;
    /* byte offset of this (job, thread)'s slot in the global array */
    size_t off = (wr->jrank * wr->ncores + wr->trank) * per_thread;
    /* share the address of the leader's buffer with everyone */
    void * recvbuf_leader = recvbuf;
    pi_thread_bcast(&recvbuf_leader, sizeof(void*), BWC_PI_BYTE, 0, wr);
    /* Now everyone writes its data there. Cast through char*: pointer
     * arithmetic on void* is a GNU extension, not ISO C. */
    memcpy((char *) recvbuf_leader + off, (char *) recvbuf + off, per_thread);
    serialize_threads(wr);
    if (wr->trank == 0) {
        MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                recvbuf, wr->ncores * per_thread, MPI_BYTE,
                wr->pals);
    }
    serialize_threads(wr);
    /* and copy back from the leader's (now complete) buffer */
    memcpy((char *) recvbuf + off, (char *) recvbuf_leader + off, per_thread);
}
1488 
1489 /* }}} */
1490 
1491 /* {{{ data_eq */
1492 
int pi_thread_data_eq(void *buffer, size_t count, pi_datatype_ptr datatype, pi_comm_ptr wr)
{
    /* Return 1 iff the [count]-item buffers of all threads of
     * communicator wr have identical contents (thread level only — no
     * MPI here). Result is consistent across all calling threads.
     * Implemented as a binary-tree reduction over shared arrays of
     * per-thread buffer pointers and per-thread "still equal" flags. */
    void ** bufs = shared_malloc(wr, wr->ncores * sizeof(void*));
    int * oks = shared_malloc(wr, wr->ncores * sizeof(int));

    /* each thread publishes its buffer address and starts out "equal" */
    bufs[wr->trank] = buffer;
    oks[wr->trank] = 1;

    /* given 5 threads, here is for example what is done at each step.
     *
     * stride=1
     *  ok0 = (x0==x1)
     *  ok2 = (x2==x3)
     * stride=2
     *  ok0 = x0==x1==x2==x3
     * stride 4
     *  ok0 = x0==x1==x2==x3==x4
     */
    for(unsigned int stride = 1 ; stride < wr->ncores ; stride <<= 1) {
        unsigned int i = wr->trank;
        serialize_threads(wr);       /* because of this, we don't
                                           leave the loop with break */
        if (i + stride >= wr->ncores) continue;
        /* many threads are doing nothing here */
        if (i & stride) continue;
        /* fold partner's verdict and a byte-wise comparison into ours */
        oks[i] = oks[i] && oks[i + stride] && memcmp(bufs[i], bufs[i + stride], datatype->item_size * count) == 0;
    }
    serialize_threads(wr);
    /* after the reduction, slot 0 holds the collective verdict */
    int ok = oks[0];
    shared_free(wr, oks);
    shared_free(wr, bufs);
    return ok;
}
1526 
int pi_data_eq(void *buffer, size_t count, pi_datatype_ptr datatype, pi_comm_ptr wr)
{
    /* Global (jobs + threads) version of pi_thread_data_eq(): returns a
     * consistent boolean telling whether all participants hold the same
     * [count]-item buffer contents. */
    ASSERT_ALWAYS(count <= (size_t) INT_MAX);

    /* step 1: agreement among the threads of this job */
    int ok = pi_thread_data_eq(buffer, count, datatype, wr);

    /* step 2: leader threads AND the per-node verdicts across MPI */
    if (wr->trank == 0) {
        MPI_Allreduce(MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_BAND, wr->pals);
    }

    /* step 3: propagate the global verdict to all local threads */
    pi_thread_bcast(&ok, 1, BWC_PI_INT, 0, wr);
    return ok;
}
1538 
1539 /* }}} */
1540 
1541 /* }}} */
1542 
1543 /*{{{ collective malloc and free (thread-level of course) */
void * shared_malloc(pi_comm_ptr wr, size_t size)
{
    /* Collective allocation at the thread level: only the leader thread
     * calls malloc, and the resulting address is handed out to every
     * thread of wr. All callers return the same pointer; release it
     * collectively with shared_free(). */
    void * ptr = NULL;
    if (wr->trank == 0)
        ptr = malloc(size);
    pi_thread_bcast(&ptr, sizeof(void*), BWC_PI_BYTE, 0, wr);
    return ptr;
}
1551 
void * shared_malloc_set_zero(pi_comm_ptr wr, size_t size)
{
    /* Same as shared_malloc(), but the returned area is
     * zero-initialized. Only the leader thread allocates; the address
     * is broadcast to all threads of wr.
     *
     * Use calloc instead of malloc+memset: besides being idiomatic, it
     * avoids undefined behavior (memset on NULL) when the allocation
     * fails. */
    void * ptr = NULL;
    if (wr->trank == 0)
        ptr = calloc(1, size);
    pi_thread_bcast(&ptr, sizeof(void*), BWC_PI_BYTE, 0, wr);
    return ptr;
}
1562 
void shared_free(pi_comm_ptr wr, void * ptr)
{
    /* Collective release of an area obtained with shared_malloc(): wait
     * until every thread has stopped using it, then let the leader
     * thread free it. */
    serialize_threads(wr);
    if (wr->trank == 0)
        free(ptr);
}
1568 /*}}}*/
1569 
int serialize__(pi_comm_ptr w, const char * s MAYBE_UNUSED, unsigned int l MAYBE_UNUSED)
{
    /* Full barrier over communicator w: first a pthread barrier across
     * the local threads, then (leader thread only) an MPI barrier
     * across the jobs. (s, l) are the caller's file and line, used only
     * by the CONCURRENCY_DEBUG trace. Returns non-zero exactly on the
     * global leader (jrank == 0 && trank == 0). */
    int err;
#ifdef  CONCURRENCY_DEBUG
    // note how w->th_count is normally a thread-private thing !
    w->th_count++;
    printf("(%s:%u) barrier #%d, %u/%u on %s [%p]\n", s, l, w->th_count,
            w->trank, w->ncores,
            w->th->desc, w->th->b);
#endif
    my_pthread_barrier_wait(w->th->b);
    if (w->trank == 0) {
        /* only one thread per job takes part in the MPI barrier */
        err = MPI_Barrier(w->pals);
        ASSERT_ALWAYS(!err);
    }
    // struct timeval tv[1];
    // gettimeofday(tv, NULL);
    // printf("%.2f\n", tv->tv_sec + (double) tv->tv_usec / 1.0e6);
    // sleep(1);
    return w->jrank == 0 && w->trank == 0;
}
1591 
int serialize_threads__(pi_comm_ptr w, const char * s MAYBE_UNUSED, unsigned int l MAYBE_UNUSED)
{
    /* Thread-level barrier only: synchronize the threads of this job on
     * communicator w, with no MPI traffic. (s, l) are the caller's file
     * and line for the CONCURRENCY_DEBUG trace. Returns non-zero on the
     * leader thread (trank == 0) of each job. */
#ifdef  CONCURRENCY_DEBUG
    // note how w->th_count is normally a thread-private thing !
    w->th_count++;
    printf("(%s:%u) tbarrier #%d, %u/%u on %s [%p]\n", s, l, w->th_count,
            w->trank, w->ncores,
            w->th->desc, w->th->b);
#endif
    my_pthread_barrier_wait(w->th->b);
    // struct timeval tv[1];
    // gettimeofday(tv, NULL);
    // printf("%.2f\n", tv->tv_sec + (double) tv->tv_usec / 1.0e6);
    // sleep(1);
    return w->trank == 0;
}
1608 
1609 // XXX Many serializing calls in the two routines below have no
1610 // functional use, apart from the fact that they exert some pressure on
1611 // the mpi+pthreads way of doing things.
1612 
static void say_hello(pi_comm_ptr w, parallelizing_info_ptr pi MAYBE_UNUSED)
{
    /* Each job of communicator w takes its turn, in jrank order, to
     * (optionally) print a greeting; the serialize() calls keep the
     * turns ordered across jobs. With CONCURRENCY_DEBUG unset this
     * degenerates into a pure synchronization exercise, which is the
     * point (see the XXX note above). */
    serialize(w);
    for(unsigned int j = 0 ; j < w->njobs ; j++) {
        serialize(w);
        if (w->jrank != j)
            continue;
        /* the mutex keeps this job's threads from interleaving output */
        my_pthread_mutex_lock(w->th->m);
#ifdef CONCURRENCY_DEBUG
        /* Make it less verbose -- if it ever hangs in there, then
         * we can re-enable it */
        printf("(%s) J%uT%u ; %s%s ; (%s:j%ut%u) (%s:j%ut%u)\n",
                w->th->desc,
                pi->m->jrank,
                pi->m->trank,
                pi->wr[0]->th->desc,
                pi->wr[1]->th->desc,
                pi->wr[0]->th->desc, pi->wr[0]->jrank, pi->wr[0]->trank,
                pi->wr[1]->th->desc, pi->wr[1]->jrank, pi->wr[1]->trank
              );
#endif
        my_pthread_mutex_unlock(w->th->m);
    }
}
1637 
void pi_hello(parallelizing_info_ptr pi)
{
    /* Smoke-test of the mpi+pthreads machinery: run say_hello() over
     * the global communicator, then over each row/column communicator,
     * one (job,thread) of the transverse direction at a time. Has no
     * functional output unless CONCURRENCY_DEBUG is defined; its value
     * is the stress it puts on the synchronization primitives. */
    if (serialize(pi->m)) {
#ifdef  CONCURRENCY_DEBUG
        printf("Doing hello world loop\n");
#endif
    }
    say_hello(pi->m, pi);

    // before changing the grain of the barriers, we have to serialize at
    // the thread level (i.e. wait for the mpi serialization calls to
    // complete). Otherwise, inter-job communication occuring within
    // say_hello might clash with the inter-job calls done within
    // serialize().
    // Rule is: never let mpi-level synchronisations on a communicator
    // wander outside an execution period for which all threads sharing
    // this communicator are doing the same.
    for(unsigned int i = 0 ; i < pi->wr[0]->totalsize ; i++) {
        serialize(pi->m);
        serialize_threads(pi->m);
        /* exactly one (job,thread) of wr[0] is selected at each turn */
        if (i == pi->wr[0]->jrank * pi->wr[0]->ncores + pi->wr[0]->trank) {
            say_hello(pi->wr[1], pi);
        }
    }

    for(unsigned int i = 0 ; i < pi->wr[1]->totalsize ; i++) {
        serialize(pi->m);
        serialize_threads(pi->m);
        if (i == pi->wr[1]->jrank * pi->wr[1]->ncores + pi->wr[1]->trank) {
            say_hello(pi->wr[0], pi);
        }
    }

    if (serialize(pi->m)) {
#ifdef CONCURRENCY_DEBUG
        printf("OK: Finished hello world loop\n");
#endif
    }
    serialize(pi->m);
}
1678 /*{{{ new i/o layer */
1679 /* Open a file handle on the leader node. The two pi_comm_ptr's
1680  * represent the inner and outer levels; they're mostly unused for this
1681  * function call, but requested for consistency with the pi_file_read and
1682  * pi_file_write calls.
1683  *
1684  * Return a globally consistent boolean indicating whether the operation
1685  * was successful (1) or not (0). In the latter case, set errno
1686  * (globally) to the value returned by the fopen function at the leader
1687  * node.
1688  *
1689  * In multithreaded context, f must represent a different data area on
1690  * all threads.
1691  */
int pi_file_open(pi_file_handle_ptr f, parallelizing_info_ptr pi, int inner, const char * name, const char * mode)
{
    /* Open [name] with [mode] at the global leader (jrank 0, trank 0)
     * only; all other participants keep f->f == NULL but record the
     * name/mode and the inner/outer orientation. Returns a globally
     * consistent boolean (1 = success); on failure, errno is set
     * globally to the leader's fopen errno (EIO if fopen left errno
     * unset). See the comment above for the full contract. */
    memset(f, 0, sizeof(pi_file_handle));
    f->pi = pi;
    f->inner = inner;
    f->outer = !inner;
    f->f = NULL;
    f->name = strdup(name);
    f->mode = strdup(mode);
    int failed = 0;
    if (f->pi->m->jrank == 0 && f->pi->m->trank == 0) {
        errno = 0;
        /* POSIX fopen is required to set errno
         * http://pubs.opengroup.org/onlinepubs/9699919799/functions/fopen.html
         */
        f->f = fopen(name, mode);
        if (f->f == NULL)
            failed = errno ? errno : EIO;   /* fixed stray ';;' typo */
    }
    /* make the outcome (and errno) consistent on all jobs and threads */
    pi_bcast(&failed, 1, BWC_PI_INT, 0, 0, f->pi->m);
    if (failed) errno = failed;
    return !failed;
}
1715 
1716 /* close the file handle. Return a globally consistent boolean indicating
1717  * whether the operation was successful (1) or not (0). In the latter
1718  * case, set errno (globally) to the value returned by the fclose
1719  * function at the leader node. */
int pi_file_close(pi_file_handle_ptr f)
{
    /* Close a handle opened with pi_file_open(). Returns a globally
     * consistent boolean (1 = success); on failure, errno is set
     * globally to the leader's fclose errno (EIO if fclose left errno
     * unset). The handle is wiped afterwards. */
    free(f->name);
    free(f->mode);
    int failed = 0;
    if (f->pi->m->jrank == 0 && f->pi->m->trank == 0) {
        errno = 0;
        /* Guard against a handle whose fopen failed: fclose(NULL) is
         * undefined behavior. A never-opened handle closes "cleanly". */
        if (f->f != NULL && fclose(f->f) != 0)
            failed = errno ? errno : EIO;   /* fixed stray ';;' typo */
    }
    pi_bcast(&failed, 1, BWC_PI_INT, 0, 0, f->pi->m);
    memset(f, 0, sizeof(pi_file_handle));
    if (failed) errno = failed;
    return !failed;
}
1735 
int area_is_zero(const void * src, ptrdiff_t offset0, ptrdiff_t offset1)
{
    /* Return 1 iff every byte of src in the half-open byte range
     * [offset0, offset1) is zero. */
    const char * p = pointer_arith_const(src, offset0);
    const char * end = pointer_arith_const(src, offset1);
    while (p < end) {
        if (*p++)
            return 0;
    }
    return 1;
}
1745 
ssize_t pi_file_write_leader(pi_file_handle_ptr f, void * buf, size_t size)
{
    /* Write [size] bytes from buf at the global leader only, then
     * broadcast the byte count so every participant returns the same
     * value. Initialize ret so that non-leader participants never hand
     * an indeterminate value to pi_bcast (the original left it
     * uninitialized on those ranks). */
    unsigned long ret = 0;
    if (f->pi->m->jrank == 0 && f->pi->m->trank == 0)
        ret = fwrite(buf, 1, size, f->f);
    pi_bcast(&ret, 1, BWC_PI_UNSIGNED_LONG, 0, 0, f->pi->m);
    return (ssize_t) ret;
}
1754 
ssize_t pi_file_read_leader(pi_file_handle_ptr f, void * buf, size_t size)
{
    /* Read [size] bytes into buf at the global leader only, then
     * broadcast the byte count so every participant returns the same
     * value. Initialize ret so that non-leader participants never hand
     * an indeterminate value to pi_bcast (the original left it
     * uninitialized on those ranks). Note: only the leader's buf is
     * filled; callers wanting the data everywhere must broadcast it. */
    unsigned long ret = 0;
    if (f->pi->m->jrank == 0 && f->pi->m->trank == 0)
        ret = fread(buf, 1, size, f->f);
    pi_bcast(&ret, 1, BWC_PI_UNSIGNED_LONG, 0, 0, f->pi->m);
    return (ssize_t) ret;
}
1763 
1764 
1765 /* size is the chunk size on each core */
ssize_t pi_file_write(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize)
{
    /* Plain collective write: a degenerate chunked write with one-byte
     * chunks and the full [0,1) range of each chunk. [size] is the
     * per-core contribution. */
    ssize_t nwritten = pi_file_write_chunk(f, buf, size, totalsize, 1, 0, 1);
    return nwritten;
}
1770 
1771 /* Here, we write a fragment of a "virtual" file (of size totalsize) that
1772  * is made of several independent files. Think of entries in that virtual
1773  * file as being [chunksize] bytes long, and then we write specifically
1774  * bytes spos to epos of each of these chunks.
1775  *
1776  * Return a globally consistent count of written items. A full read is when
1777  * exactly (totalsize / chunksize) * (epos - spos) items are written.  When
1778  * a short read occurs, set errno (globally) to the value returned by the
1779  * fwrite function at the leader node.
1780  */
ssize_t pi_file_write_chunk(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize, size_t chunksize, size_t spos, size_t epos)
{
    /* Collectively write the [spos,epos) byte range of every
     * chunksize-byte chunk of a virtual file of totalsize bytes; each
     * (job,thread) contributes [size] bytes' worth of chunks from buf.
     * All data funnels through the global leader, which does the actual
     * fwrite calls. Returns the globally consistent number of bytes
     * written; on a short write, errno is set globally (see the comment
     * above the function). */
    // coverity[result_independent_of_operands]
    ASSERT_ALWAYS(size <= ULONG_MAX);
    ASSERT_ALWAYS(spos <= chunksize);
    ASSERT_ALWAYS(spos <= epos);
    ASSERT_ALWAYS(epos <= chunksize);
    ASSERT_ALWAYS(totalsize % chunksize == 0);
    ASSERT_ALWAYS(size % chunksize == 0);
    /* "loc_" quantities count only the [spos,epos) bytes of each chunk */
    size_t loc_totalsize = (totalsize / chunksize) * (epos - spos);
    size_t loc_size = (size / chunksize) * (epos - spos);
    size_t chunk_per_rank = size / chunksize;

    /* Some sanity checking. This is not technically required: of
     * course we would be able to cope with uneven data sets.
     * However, having even sizes greatly simplifies the pictures, as
     * it alleviates the need to collect the sizes for all threads */
    unsigned long size_ul = size;
    ASSERT_ALWAYS(pi_data_eq(&size_ul, 1, BWC_PI_UNSIGNED_LONG, f->pi->m));

    /* i = inner direction, o = outer direction, m = global (MPI) */
    unsigned int nci = f->pi->wr[f->inner]->ncores;
    unsigned int nco = f->pi->wr[f->outer]->ncores;
    unsigned int nji = f->pi->wr[f->inner]->njobs;
    unsigned int njo = f->pi->wr[f->outer]->njobs;
    unsigned int ci  = f->pi->wr[f->inner]->trank;
    unsigned int co  = f->pi->wr[f->outer]->trank;
    unsigned int ji  = f->pi->wr[f->inner]->jrank;
    unsigned int jo  = f->pi->wr[f->outer]->jrank;
    unsigned int jm  = f->pi->m->jrank;

    /* the inner threads will do writes and MPI sends in one go. For
     * outer threads, it is not possible to concatenate similarly,
     * because we have writes relative to the mpi-level interleaved.
     */
    void * sbuf = shared_malloc_set_zero(f->pi->m, loc_size * nci);

    long res = 0;       /* bytes written so far (leader's view) */
    int failed = 0;     /* first fwrite errno seen, 0 when all is well */

    /* iterate in file order: outer job, outer core, inner job; the nci
     * inner cores of each turn are packed together into sbuf */
    for(unsigned int xjo = 0 ; xjo < njo ; xjo++) {
        for(unsigned int xco = 0 ; xco < nco ; xco++) {
            for(unsigned int xji = 0 ; xji < nji ; xji++) {
                unsigned int xj = mrank_from_tworanks(f->pi, f->inner, xji, xjo);
                if (jm == xj) {
                    ASSERT_ALWAYS (ji == xji && jo == xjo);
                    /* fill the buffer with the contents relevant to the
                     * inner communicator (nci threads at work here) */
                    serialize_threads(f->pi->m);
                    if (xco == co) {
                        void * wptr = sbuf + ci * loc_size;
                        const void * rptr = buf + spos;
                        if (epos - spos == chunksize) {
                            /* full chunks: one contiguous copy */
                            memcpy(wptr, rptr, loc_size);
                        } else {
                            /* partial chunks: gather the [spos,epos)
                             * slice of each chunk */
                            for(size_t i = 0 ; i < chunk_per_rank ; ++i) {
                                memcpy(wptr, rptr, epos - spos);
                                wptr += epos - spos;
                                rptr += chunksize;
                            }
                        }
                    }
                    serialize_threads(f->pi->m);
                }
                if (f->pi->m->trank == 0) {
                    /* only one thread per node does something. */
                    if (!jm) {
                        /* leader node to receive data */
                        if (xj) {       /* except its own... */
                            MPI_Recv(sbuf, loc_size * nci, MPI_BYTE,
                                    xj, xj, f->pi->m->pals,
                                    MPI_STATUS_IGNORE);
                        }
                        if (!failed) {
                            /* clamp to the virtual file size; any
                             * excess in sbuf must be zero padding */
                            size_t wanna_write = MIN(nci * loc_size, loc_totalsize - res);
                            if (wanna_write < nci * loc_size)
                                ASSERT_ALWAYS(area_is_zero(sbuf, wanna_write, nci * loc_size));
                            /* POSIX fwrite is required to set errno
                             * http://pubs.opengroup.org/onlinepubs/9699919799/functions/fwrite.html
                             */
                            errno = 0;
                            ssize_t x = fwrite(sbuf, 1, wanna_write, f->f);
                            res += x;
                            if (x < (ssize_t) wanna_write)
                                failed = errno ? errno : EIO;
                        }
                    } else if (jm == xj) {
                        /* my turn to send data */
                        MPI_Send(sbuf, loc_size * nci, MPI_BYTE,
                                0, xj, f->pi->m->pals);
                    }
                }
            }
        }
    }
    shared_free(f->pi->m, sbuf);
    /* make the byte count and failure status globally consistent */
    pi_bcast(&res, 1, BWC_PI_LONG, 0, 0, f->pi->m);
    pi_bcast(&failed, 1, BWC_PI_INT, 0, 0, f->pi->m);
    if (failed) errno = failed;
    return res;
}
1881 
ssize_t pi_file_read(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize)
{
    /* Plain collective read: a degenerate chunked read with one-byte
     * chunks and the full [0,1) range of each chunk. [size] is the
     * per-core share. */
    ssize_t nread = pi_file_read_chunk(f, buf, size, totalsize, 1, 0, 1);
    return nread;
}
1886 
1887 /*
1888  * Return a globally consistent count of read items. A full read is when
1889  * exactly (totalsize / chunksize) * (epos - spos) items are read.  When
1890  * a short read occurs, set errno (globally) to the value returned by the
1891  * fread function at the leader node.
1892  */
ssize_t pi_file_read_chunk(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize, size_t chunksize, size_t spos, size_t epos)
{
    /* Collectively read the [spos,epos) byte range of every
     * chunksize-byte chunk of a virtual file of totalsize bytes; each
     * (job,thread) receives [size] bytes' worth of chunks into buf.
     * This is the exact mirror of pi_file_write_chunk: the global
     * leader does the fread calls and fans the data out. Returns the
     * globally consistent number of bytes read; on a short read, errno
     * is set globally (see the comment above the function). */
    // coverity[result_independent_of_operands]
    ASSERT_ALWAYS(size <= ULONG_MAX);
    ASSERT_ALWAYS(spos <= chunksize);
    ASSERT_ALWAYS(spos <= epos);
    ASSERT_ALWAYS(epos <= chunksize);
    ASSERT_ALWAYS(totalsize % chunksize == 0);
    ASSERT_ALWAYS(size % chunksize == 0);
    /* "loc_" quantities count only the [spos,epos) bytes of each chunk */
    size_t loc_totalsize = (totalsize / chunksize) * (epos - spos);
    size_t loc_size = (size / chunksize) * (epos - spos);
    size_t chunk_per_rank = size / chunksize;

    /* Some sanity checking. This is not technically required: of
     * course we would be able to cope with uneven data sets.
     * However, having even sizes greatly simplifies the pictures, as
     * it alleviates the need to collect the sizes for all threads */
    unsigned long size_ul = size;
    ASSERT_ALWAYS(pi_data_eq(&size_ul, 1, BWC_PI_UNSIGNED_LONG, f->pi->m));

    /* i = inner direction, o = outer direction, m = global (MPI) */
    unsigned int nci = f->pi->wr[f->inner]->ncores;
    unsigned int nco = f->pi->wr[f->outer]->ncores;
    unsigned int nji = f->pi->wr[f->inner]->njobs;
    unsigned int njo = f->pi->wr[f->outer]->njobs;
    unsigned int ci  = f->pi->wr[f->inner]->trank;
    unsigned int co  = f->pi->wr[f->outer]->trank;
    unsigned int ji  = f->pi->wr[f->inner]->jrank;
    unsigned int jo  = f->pi->wr[f->outer]->jrank;
    unsigned int jm  = f->pi->m->jrank;

    /* staging buffer shared by the threads of this job; holds the nci
     * inner cores' data for one turn of the loop */
    void * sbuf = shared_malloc_set_zero(f->pi->m, loc_size * nci);

    long res = 0;       /* bytes read so far (leader's view) */
    int failed = 0;     /* first fread errno seen, 0 when all is well */

    for(unsigned int xjo = 0 ; xjo < njo ; xjo++) {
        for(unsigned int xco = 0 ; xco < nco ; xco++) {
            for(unsigned int xji = 0 ; xji < nji ; xji++) {
                unsigned int xj = mrank_from_tworanks(f->pi, f->inner, xji, xjo);
                /* for job (ji,jo), nci*nco threads enter here. Unless
                 * (ji == xji && jo == xjo), this loop becomes trivial.
                 */
                if (f->pi->m->trank == 0) {
                    /* only one thread per node does something. */
                    if (!jm) {
                        /* leader node to read then send data */
                        memset(sbuf, 0, nci * loc_size);
                        if (!failed) {
                            /* clamp to the virtual file size */
                            size_t wanna_read = MIN(nci * loc_size, loc_totalsize - res);
                            /* POSIX fread is required to set errno
                             * http://pubs.opengroup.org/onlinepubs/9699919799/functions/fread.html
                             */
                            errno = 0;
                            ssize_t x = fread(sbuf, 1, wanna_read, f->f);
                            res += x;
                            if (x < (ssize_t) wanna_read)
                                failed = errno ? errno : EIO;
                        }
                        if (xj) {       /* except to itself... */
                            MPI_Send(sbuf, loc_size * nci, MPI_BYTE,
                                    xj, xj, f->pi->m->pals);
                        }
                    } else if (jm == xj) {
                        /* my turn to receive data */
                        MPI_Recv(sbuf, loc_size * nci, MPI_BYTE,
                                0, xj, f->pi->m->pals,
                                MPI_STATUS_IGNORE);
                    }
                }
                if (jm == xj) {
                    ASSERT_ALWAYS (ji == xji && jo == xjo);
                    serialize_threads(f->pi->m);
                    /* fill the buffer with the contents relevant to the
                     * inner communicator (nci threads at work here) */
                    if (xco == co) {
                        void * wptr = buf + spos;
                        const void * rptr = sbuf + ci * loc_size;
                        if (epos - spos == chunksize) {
                            /* full chunks: one contiguous copy */
                            memcpy(wptr, rptr, loc_size);
                        } else {
                            /* partial chunks: scatter the [spos,epos)
                             * slice into each chunk of buf */
                            for(size_t i = 0 ; i < chunk_per_rank ; ++i) {
                                memcpy(wptr, rptr, epos - spos);
                                wptr += chunksize;
                                rptr += epos - spos;
                            }
                        }
                    }
                    serialize_threads(f->pi->m);
                }
            }
        }
    }
    shared_free(f->pi->m, sbuf);
    /* make the byte count and failure status globally consistent */
    pi_bcast(&res, 1, BWC_PI_LONG, 0, 0, f->pi->m);
    pi_bcast(&failed, 1, BWC_PI_INT, 0, 0, f->pi->m);
    if (failed) errno = failed;
    return res;
}
1991 /*}}}*/
1992 
void pi_interleaving_flip(parallelizing_info_ptr pi)
{
    /* Hand execution over to the other interleaved half, if
     * interleaving is active; otherwise a no-op. */
    if (pi->interleaved)
        my_pthread_barrier_wait(pi->interleaved->b);
}
1999 
void pi_interleaving_enter(parallelizing_info_ptr pi)
{
    /* On entry to an interleaved section, the second half (idx != 0)
     * waits at the barrier; the first half, or a non-interleaved run,
     * proceeds immediately. */
    if (pi->interleaved && pi->interleaved->idx != 0)
        my_pthread_barrier_wait(pi->interleaved->b);
}
2007 
void pi_interleaving_leave(parallelizing_info_ptr pi)
{
    /* On exit from an interleaved section, the first half (idx != 1)
     * waits at the barrier; the second half, or a non-interleaved run,
     * proceeds immediately. */
    if (pi->interleaved && pi->interleaved->idx != 1)
        my_pthread_barrier_wait(pi->interleaved->b);
}
2015 
2016 #if 0
2017 /* XXX the name is not very informative, notably about the important
2018  * point that pointer must be identical.
2019  */
2020 /* Considering a data area with a pointer identical on all calling
2021  * threads, tell whether the contents agree on all MPI jobs+threads
2022  * (returns 1 if this is the case)
2023  *
2024  * More exactly, this function only cares about the pointer on thread 0.
2025  */
2026 int mpi_data_eq(parallelizing_info_ptr pi, void *buffer, size_t sz)
2027 {
2028     /* TODO: think about allocating less */
2029     int cmp = 0;
2030     if (pi->m->trank == 0) {
2031         void * b_and = malloc(sz);
2032         void * b_or = malloc(sz);
2033         MPI_Allreduce(buffer, b_and, sz, MPI_BYTE, MPI_BAND, pi->m->pals);
2034         MPI_Allreduce(buffer, b_or, sz, MPI_BYTE, MPI_BOR, pi->m->pals);
2035         cmp = memcmp(b_and, b_or, sz);
2036         free(b_and);
2037         free(b_or);
2038     }
2039     thread_bcast(pi->m, &cmp, sizeof(int), 0);
2040     return cmp == 0;
2041 }
2042 #endif
2043 
2044 /* XXX must be called in single threaded context  */
/* Global one-time setup for the parallelizing_info layer (currently
 * just the pi attribute machinery). */
void parallelizing_info_init()
{
    pi_init_attribute_things();
}
2049 /* XXX must be called in single threaded context  */
/* Global teardown matching parallelizing_info_init(). */
void parallelizing_info_finish()
{
    pi_clear_attribute_things();
}
2054 
2055 
2056 
2057