1 #include "cado.h" // IWYU pragma: keep
2 #include <pthread.h> // for pthread_t
3 #include <stddef.h> // for ptrdiff_t
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <string.h>
7
8 #include <sys/types.h>
9 #include <errno.h>
10 #include <stdarg.h>
11 #include <stdint.h>
12 #include <inttypes.h>
13 #include <ctype.h>
14 #include "select_mpi.h"
15 #include "parallelizing_info.h"
16 #include "macros.h"
17 #include "misc.h"
18 #include "verbose.h"
19 #include "portability.h"
20
21
22 #include <sys/time.h> // gettimeofday
23 #ifdef HAVE_UTSNAME_H
24 #include <sys/utsname.h>
25 #include <limits.h>
26 #endif
27
28 #if defined(HAVE_HWLOC) && defined(HAVE_CXX11)
29 #include "cpubinding.h"
30 #include "params.h"
31 #endif /* defined(HAVE_HWLOC) && defined(HAVE_CXX11) */
32
pi_comm_init_pthread_things(pi_comm_ptr w,const char * desc)33 static inline void pi_comm_init_pthread_things(pi_comm_ptr w, const char * desc)
34 {
35 struct pthread_things * res;
36
37 res = malloc(sizeof(struct pthread_things));
38
39 barrier_init(res->bh, w->ncores);
40 my_pthread_barrier_init(res->b, NULL, w->ncores);
41 my_pthread_mutex_init(res->m, NULL);
42 res->desc = strdup(desc);
43
44 w->th = res;
45 }
46
pi_comm_destroy_pthread_things(pi_comm_ptr w)47 static inline void pi_comm_destroy_pthread_things(pi_comm_ptr w)
48 {
49 barrier_destroy(w->th->bh);
50 my_pthread_barrier_destroy(w->th->b);
51
52 my_pthread_mutex_destroy(w->th->m);
53 /* Beware ! Freeing mustn't happen more than once ! */
54 free(w->th->desc);
55 free(w->th);
56 w->th = NULL;
57 }
58
print_several(unsigned int n1,unsigned int n2,char a,char b,unsigned int w)59 static void print_several(unsigned int n1, unsigned int n2, char a, char b, unsigned int w)
60 {
61 char * toto;
62 toto = malloc(w+2);
63 printf("%c",b);
64 memset(toto, a, w+1);
65 toto[w+1]='\0';
66 for(unsigned int i = 0 ; i < n1 ; i++) {
67 toto[w]=a;
68 for(unsigned int j = 0 ; j < n2-1 ; j++) {
69 printf("%s", toto);
70 }
71 toto[w]=b;
72 printf("%s", toto);
73 }
74 printf("\n");
75 free(toto);
76 }
77
78 struct pi_go_helper_s {
79 void *(*fcn)(parallelizing_info_ptr, param_list pl, void*);
80 parallelizing_info_ptr p;
81 param_list_ptr pl;
82 void * arg;
83 };
84
pi_go_helper_func(struct pi_go_helper_s * s)85 void * pi_go_helper_func(struct pi_go_helper_s * s)
86 {
87 pi_interleaving_enter(s->p);
88 #if defined(HAVE_HWLOC) && defined(HAVE_CXX11)
89 cpubinding_do_pinning(s->p->cpubinding_info, s->p->wr[1]->trank, s->p->wr[0]->trank);
90 #endif /* defined(HAVE_HWLOC) && defined(HAVE_CXX11) */
91 int debug = 0;
92 param_list_parse_int(s->pl, "debug-parallel-bwc", &debug);
93
94 if (debug) {
95 pi_log_init(s->p->m);
96 pi_log_init(s->p->wr[0]);
97 pi_log_init(s->p->wr[1]);
98 }
99
100 void * ret = (s->fcn)(s->p, s->pl, s->arg);
101
102 if (debug) {
103 pi_log_clear(s->p->m);
104 pi_log_clear(s->p->wr[0]);
105 pi_log_clear(s->p->wr[1]);
106 }
107
108 pi_interleaving_flip(s->p);
109 pi_interleaving_leave(s->p);
110 return ret;
111 }
112
113 typedef void * (*pthread_callee_t)(void*);
114
115 /*
116 void pi_errhandler(MPI_Comm * comm, int * err, ...)
117 {
118 int size;
119 int rank;
120 MPI_Comm_size(*comm, &size);
121 MPI_Comm_rank(*comm, &rank);
122 fprintf(stderr, "Fatal MPI error ;\n");
123 fprintf(stderr, " job %d in a comm. of size %d ;\n", rank, size);
124 if (err == NULL) {
125 fprintf(stderr, " no error code.\n");
126 } else {
127 char buf[MPI_MAX_ERROR_STRING];
128 int len = sizeof(buf);
129 MPI_Error_string(*err, buf, &len);
130 fprintf(stderr, " %s\n", buf);
131 }
132 abort();
133 }
134 */
135
grid_print(parallelizing_info_ptr pi,char * buf,size_t siz,int print)136 void grid_print(parallelizing_info_ptr pi, char * buf, size_t siz, int print)
137 {
138 pi_comm_ptr wr = pi->m;
139 unsigned long maxsize = siz;
140 pi_allreduce(NULL, &maxsize, 1, BWC_PI_UNSIGNED_LONG, BWC_PI_MAX, wr);
141
142 char * strings = malloc(wr->totalsize * maxsize);
143 memset(strings, 0, wr->totalsize * maxsize);
144
145 int me = wr->jrank * wr->ncores + wr->trank;
146
147 /* instead of doing memcpy, we align the stuff. */
148 char * fmt;
149 int rc = asprintf(&fmt, "%%-%lus", maxsize-1);
150 ASSERT_ALWAYS(rc >= 0);
151 snprintf(strings + me * maxsize, maxsize, fmt, buf);
152 free(fmt);
153 pi_allreduce(NULL, strings, wr->totalsize * maxsize, BWC_PI_BYTE, BWC_PI_BXOR, wr);
154
155 char * ptr = strings;
156
157 serialize(pi->m);
158
159 if (print) {
160 /* There's also the null byte counted in siz. So that makes one
161 * less.
162 */
163 unsigned int nj0 = pi->wr[0]->njobs;
164 unsigned int nj1 = pi->wr[1]->njobs;
165 unsigned int nt0 = pi->wr[0]->ncores;
166 unsigned int nt1 = pi->wr[1]->ncores;
167 print_several(nj0, nt0, '-', '+', maxsize-1);
168 for(unsigned int j1 = 0 ; j1 < nj1 ; j1++) {
169 for(unsigned int t1 = 0 ; t1 < nt1 ; t1++) {
170 printf("|");
171 for(unsigned int j0 = 0 ; j0 < nj0 ; j0++) {
172 for(unsigned int t0 = 0 ; t0 < nt0 ; t0++) {
173 int fence = t0 == nt0 - 1;
174 printf("%s%c", ptr, fence ? '|' : ' ');
175 ptr += maxsize;
176 }
177 }
178 printf("\n");
179 }
180 print_several(nj0, nt0, '-', '+', maxsize-1);
181 }
182 }
183 serialize(wr);
184 free(strings);
185 }
186
get_node_number_and_prefix(parallelizing_info_ptr pi)187 static void get_node_number_and_prefix(parallelizing_info_ptr pi)
188 {
189 int len = strlen(pi->nodename);
190 int minlen = len;
191 int maxlen = len;
192 MPI_Allreduce(MPI_IN_PLACE, &minlen, 1, MPI_INT, MPI_MIN, pi->m->pals);
193 MPI_Allreduce(MPI_IN_PLACE, &maxlen, 1, MPI_INT, MPI_MAX, pi->m->pals);
194 int j;
195 for(j = 0 ; j < minlen ; j++) {
196 int c = pi->nodename[j];
197 if (isdigit(c)) {
198 c = -1;
199 }
200 int cmin = c;
201 int cmax = c;
202 MPI_Allreduce(MPI_IN_PLACE, &cmin, 1, MPI_INT, MPI_MIN, pi->m->pals);
203 MPI_Allreduce(MPI_IN_PLACE, &cmax, 1, MPI_INT, MPI_MAX, pi->m->pals);
204 if (cmin != cmax || cmin < 0)
205 break;
206 }
207 memcpy(pi->nodeprefix, pi->nodename, j);
208 pi->nodeprefix[j]='\0';
209 if (j && pi->nodeprefix[j-1] == '-')
210 pi->nodeprefix[j-1] = '\0';
211 memset(pi->nodenumber_s, ' ', maxlen - j);
212 pi->nodenumber_s[maxlen-j]='\0';
213 memcpy(pi->nodenumber_s + maxlen-len, pi->nodename + j, len-j);
214 if (j == maxlen) {
215 // all nodes have the same name. Most probably because we're only
216 // on one node. Arrange for the display to look vaguely right.
217 memcpy(pi->nodeprefix, pi->nodename, PI_NAMELEN);
218 memcpy(pi->nodenumber_s, pi->nodename, PI_NAMELEN);
219 }
220 }
221
display_process_grid(parallelizing_info_ptr pi)222 static void display_process_grid(parallelizing_info_ptr pi)
223 {
224 char * all_node_ids = malloc(PI_NAMELEN * pi->m->njobs);
225 char * all_node_ids2 = malloc(PI_NAMELEN * pi->m->njobs);
226 typedef int cpair[2];
227 cpair my_coords = { pi->wr[0]->jrank, pi->wr[1]->jrank, };
228 cpair * all_cpairs = malloc(sizeof(cpair) * pi->m->njobs);
229
230 MPI_Allgather(pi->nodenumber_s, PI_NAMELEN, MPI_BYTE,
231 all_node_ids, PI_NAMELEN, MPI_BYTE, pi->m->pals);
232 MPI_Allgather(my_coords, 2, MPI_INT, all_cpairs, 2, MPI_INT, pi->m->pals);
233
234 for(unsigned int i = 0 ; i < pi->m->njobs ; i++) {
235 int x = all_cpairs[i][0];
236 int y = all_cpairs[i][1];
237 unsigned int j = y * pi->wr[0]->njobs + x;
238 if (j >= pi->m->njobs) {
239 fprintf(stderr, "uh ?\n");
240 pi_abort(EXIT_FAILURE, pi->m);
241 }
242 memcpy(all_node_ids2 + j * PI_NAMELEN,
243 all_node_ids + i * PI_NAMELEN,
244 PI_NAMELEN);
245 }
246
247 if (pi->m->jrank == 0) {
248 if (strlen(pi->nodeprefix)) {
249 printf("%d nodes on %s\n", pi->m->njobs, pi->nodeprefix);
250 } else {
251 printf("%d nodes, no common name prefix\n", pi->m->njobs);
252 }
253 char * pad = malloc(PI_NAMELEN);
254 memset(pad, ' ', PI_NAMELEN);
255 int node_id_len = strlen(pi->nodenumber_s);
256 pad[node_id_len]='\0';
257 if (node_id_len) {
258 if (pi->thr_orig[0]) {
259 pad[node_id_len/2]='\'';
260 } else {
261 pad[node_id_len/2]='.';
262 }
263 }
264 int mapping_error = 0;
265 for(unsigned int i = 0 ; i < pi->wr[1]->njobs ; i++) { // i == y
266 for(unsigned int it = 0 ; it < pi->wr[1]->ncores ; it++) {
267 for(unsigned int j = 0 ; j < pi->wr[0]->njobs ; j++) { // j == x
268 for(unsigned int jt = 0 ; jt < pi->wr[0]->ncores ; jt++) { // jt == x
269 char * what = pad;
270 if (!it && !jt) {
271 what = all_node_ids2 + PI_NAMELEN * (i * pi->wr[0]->njobs + j);
272 }
273 if (pi->thr_orig[0]) {
274 /* then we have it==jt==0, but the real leader is not
275 * this one. Check at least that we have the proper
276 * identity (same leader)
277 */
278 ASSERT_ALWAYS(!it && !jt);
279 unsigned int i0 = i - (i % pi->thr_orig[0]);
280 unsigned int j0 = j - (j % pi->thr_orig[1]);
281 const char * ref = all_node_ids2 + PI_NAMELEN * (i0 * pi->wr[0]->njobs + j0);
282 if (strcmp(ref, what) != 0) {
283 mapping_error=1;
284 }
285 if (i != i0 || j != j0)
286 what = pad;
287 }
288 printf(" %s", what);
289 }
290 }
291 printf("\n");
292 }
293 }
294 if (mapping_error) {
295 fprintf(stderr, "Error: we have not obtained the expected node mapping; the only_mpi=1 setting is VERY sensible to the process mapping chosen by the MPI implementation. Chances are you got it wrong. A working setup with openmpi-1.8.2 is a uniqueified host file, with --mca rmaps_base_mapping_policy slot (which is the default setting).\n");
296 exit(1);
297 }
298 free(pad);
299 }
300 free(all_node_ids2);
301 free(all_node_ids);
302 free(all_cpairs);
303 MPI_Barrier(pi->m->pals);
304 }
305
pi_init_mpilevel(parallelizing_info_ptr pi,param_list pl)306 static void pi_init_mpilevel(parallelizing_info_ptr pi, param_list pl)
307 {
308 int err;
309
310 memset(pi, 0, sizeof(parallelizing_info));
311
312 #ifdef HAVE_UTSNAME_H
313 {
314 struct utsname u[1];
315 uname(u);
316 memcpy(pi->nodename, u->nodename, MIN(PI_NAMELEN, sizeof(u->nodename)));
317 pi->nodename[PI_NAMELEN-1]='\0';
318 char * p = strchr(pi->nodename, '.');
319 if (p) *p='\0';
320 }
321 #else
322 strncpy(pi->nodename, "unknown", sizeof(pi->nodename));
323 #endif
324
325 #ifndef NDEBUG
326 /* Must make sure that we have proper ASSERTS after MPI calls then. */
327 MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
328 #endif
329
330 int mpi[2] = {1,1};
331 int thr[2] = {1,1};
332 int only_mpi = 0;
333
334 param_list_parse_intxint(pl, "mpi", mpi);
335 param_list_parse_intxint(pl, "thr", thr);
336 param_list_parse_int(pl, "only_mpi", &only_mpi);
337
338 unsigned int nhj = mpi[0];
339 unsigned int nvj = mpi[1];
340 unsigned int nhc = thr[0];
341 unsigned int nvc = thr[1];
342
343 pi->m->njobs = nhj * nvj;
344 pi->m->ncores = nhc * nvc;
345 pi->m->totalsize = pi->m->njobs * pi->m->ncores;
346 // MPI_Errhandler my_eh;
347 // MPI_Comm_create_errhandler(&pi_errhandler, &my_eh);
348 // MPI_Comm_set_errhandler(MPI_COMM_WORLD, my_eh);
349
350 if (only_mpi) {
351 int grank;
352 MPI_Comm_rank(MPI_COMM_WORLD, & grank);
353 if (grank == 0)
354 printf("Making %d independent single-threaded MPI jobs, instead of %d %d-thread jobs\n",
355 pi->m->totalsize, pi->m->njobs, pi->m->ncores);
356 int tt = pi->m->ncores;
357
358 int rank;
359 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
360 int nnum = rank / tt;
361 int nidx = rank % tt;
362 int ni = nnum / nvj;
363 int nj = nnum % nvj;
364 int ti = nidx / nvc;
365 int tj = nidx % nvc;
366 int i = ni * nhc + ti;
367 int j = nj * nvc + tj;
368 int nrank = i * (nvj * nvc) + j;
369
370 MPI_Comm_split(MPI_COMM_WORLD, 0, nrank, & pi->m->pals);
371 pi->m->ncores = 1;
372 pi->m->njobs = pi->m->totalsize;
373 memcpy(pi->thr_orig, thr, 2*sizeof(int));
374 mpi[0] *= thr[0]; thr[0]=1;
375 mpi[1] *= thr[1]; thr[1]=1;
376 nhj = mpi[0];
377 nvj = mpi[1];
378 nhc = thr[0];
379 nvc = thr[1];
380 } else {
381 pi->m->pals = MPI_COMM_WORLD;
382 }
383
384 MPI_Comm_rank(pi->m->pals, (int*) & pi->m->jrank);
385
386 int size;
387 MPI_Comm_size(pi->m->pals, & size);
388 if ((unsigned int) size != pi->m->njobs) {
389 if (pi->m->jrank == 0) {
390 fprintf(stderr, "Inconsistency -- exactly %u == %u * %u"
391 " MPI jobs are needed -- got %u\n",
392 pi->m->njobs, nhj, nvj, size);
393 }
394 exit(1);
395 }
396
397 pi->m->trank = 0;
398
399 /* Init to the simple case */
400 pi->wr[0]->njobs = nvj;
401 pi->wr[0]->ncores = nvc;
402 pi->wr[1]->njobs = nhj;
403 pi->wr[1]->ncores = nhc;
404 pi->wr[0]->totalsize = pi->wr[0]->njobs * pi->wr[0]->ncores;
405 pi->wr[1]->totalsize = pi->wr[1]->njobs * pi->wr[1]->ncores;
406
407 // Here we want the _common row number_ ; not to be confused with the
408 // rank !
409 unsigned int jcommon[2];
410 jcommon[0] = pi->m->jrank / nvj;
411 jcommon[1] = pi->m->jrank % nvj;
412
413 pi->wr[0]->trank = 0;
414 pi->wr[1]->trank = 0;
415
416 for(int d = 0 ; d < 2 ; d++) {
417 // A subgroup contains all process having the same jcommon value.
418 // Therefore, we have as many horizontal MPI barriers set up as
419 // one finds MPI job numbers across a column.
420 err = MPI_Comm_split(pi->m->pals, jcommon[d],
421 pi->m->jrank, &pi->wr[d]->pals);
422 ASSERT_ALWAYS(!err);
423 MPI_Comm_rank(pi->wr[d]->pals, (int*) &pi->wr[d]->jrank);
424 }
425
426 get_node_number_and_prefix(pi);
427 display_process_grid(pi);
428
429 #if defined(HAVE_HWLOC) && defined(HAVE_CXX11)
430 /* prepare the cpu binding messages, and print the unique messages we
431 * receive */
432 if (!verbose_enabled(CADO_VERBOSE_PRINT_BWC_CPUBINDING)) {
433 pi->cpubinding_info = cpubinding_get_info(NULL, pl, thr);
434 } else {
435 char * cpubinding_messages;
436 pi->cpubinding_info = cpubinding_get_info(&cpubinding_messages, pl, thr);
437 int msgsize = 0;
438 if (cpubinding_messages)
439 msgsize = strlen(cpubinding_messages);
440 MPI_Allreduce(MPI_IN_PLACE, &msgsize, 1, MPI_INT, MPI_MAX, pi->m->pals);
441 msgsize++;
442 int chunksize = PI_NAMELEN + msgsize;
443 char * big_pool = malloc(pi->m->njobs * chunksize);
444 memset(big_pool, 0, pi->m->njobs * chunksize);
445 if (cpubinding_messages) {
446 int rc;
447 rc = strlcpy(big_pool + pi->m->jrank * chunksize, pi->nodename, PI_NAMELEN);
448 ASSERT_ALWAYS(rc == (int) strlen(pi->nodename));
449 rc = strlcpy(big_pool + pi->m->jrank * chunksize + PI_NAMELEN, cpubinding_messages, msgsize);
450 ASSERT_ALWAYS(rc == (int) strlen(cpubinding_messages));
451 free(cpubinding_messages);
452 }
453
454 MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
455 big_pool, chunksize, MPI_BYTE, pi->m->pals);
456
457 if (pi->m->jrank == 0) {
458 // const char * refnode = NULL;
459 const char * ref = NULL;
460 for(unsigned int i = 0 ; i < pi->m->njobs ; i++) {
461 const char * node = big_pool + i * chunksize;
462 const char * msg = node + PI_NAMELEN;
463 if (!ref || strcmp(msg, ref) != 0) {
464 printf("cpubinding messages on %s:\n%s", node, msg);
465 ref = msg;
466 /*
467 refnode = node;
468 } else {
469 printf("cpubinding messages on %s: same as %s\n", node, refnode);
470 */
471 }
472 }
473 }
474 free(big_pool);
475 }
476 #else
477 if (param_list_lookup_string(pl, "cpubinding")) {
478 if (pi->m->jrank == 0) {
479 #ifdef HAVE_HWLOC
480 printf("cpubinding: parameter ignored (no C++11)\n");
481 #elif defined(HAVE_CXX11)
482 printf("cpubinding: parameter ignored (no hwloc)\n");
483 #else
484 printf("cpubinding: parameter ignored (no hwloc, no C++11)\n");
485 #endif
486 }
487 }
488 #endif /* defined(HAVE_HWLOC) && defined(HAVE_CXX11) */
489 }
490
491 /* How do we build a rank in pi->m from two rank in pi->wr[inner] and
492 * pi->wr->outer ?
493 */
mrank_from_tworanks(parallelizing_info_ptr pi,int d,int ji,int jo)494 static int mrank_from_tworanks(parallelizing_info_ptr pi, int d, int ji, int jo)
495 {
496 int jj[2];
497 jj[d] = ji;
498 jj[!d] = jo;
499 return jj[0] + jj[1] * pi->wr[0]->njobs;
500 /* d == 0:
501 * ji == rank when reading horizontally
502 * jo == rank when reading vertically
503 */
504 }
505
506
507 static parallelizing_info *
pi_grid_init(parallelizing_info_ptr pi)508 pi_grid_init(parallelizing_info_ptr pi)
509 {
510 // unsigned int nvj = pi->wr[0]->njobs;
511 // unsigned int nhj = pi->wr[1]->njobs;
512 unsigned int nvc = pi->wr[0]->ncores;
513 unsigned int nhc = pi->wr[1]->ncores;
514
515 /* used in several places for doing snprintf */
516 char buf[20];
517
518 // OK. So far we have something reasonable except for per-thread
519 // setup. We have to first duplicate our structure so as to
520 // specialize the pi things, and then set up agreed barriers.
521
522 // the global barrier is not too much work.
523 pi_comm_init_pthread_things(pi->m, "main");
524
525 // column and row barriers are more tricky.
526 parallelizing_info * grid;
527 grid = (parallelizing_info *) malloc(nhc * nvc * sizeof(parallelizing_info));
528 // setting to zero is important, since several fields are ensured as
529 // null.
530 memset(grid, 0, nhc * nvc * sizeof(parallelizing_info));
531
532 char commname[64];
533 #ifndef MPI_LIBRARY_MT_CAPABLE
534 MPI_Comm_set_name(pi->m->pals, "main");
535 snprintf(commname, sizeof(commname), "rows%u-%u",
536 pi->wr[1]->jrank * pi->wr[1]->ncores,
537 (pi->wr[1]->jrank + 1) * pi->wr[1]->ncores - 1);
538 MPI_Comm_set_name(pi->wr[0]->pals, commname);
539 snprintf(commname, sizeof(commname), "cols%u-%u",
540 pi->wr[0]->jrank * pi->wr[0]->ncores,
541 (pi->wr[0]->jrank + 1) * pi->wr[0]->ncores - 1);
542 MPI_Comm_set_name(pi->wr[1]->pals, commname);
543 #endif
544
545 for(unsigned int k = 0 ; k < nhc * nvc ; k++) {
546 parallelizing_info_ptr e = grid[k];
547 memcpy(e, pi, sizeof(parallelizing_info));
548 unsigned int k0 = k % nvc;
549 unsigned int k1 = k / nvc;
550 e->m->trank = k;
551 e->wr[0]->trank = k0;
552 e->wr[1]->trank = k1;
553 #ifdef MPI_LIBRARY_MT_CAPABLE
554 MPI_Comm_dup(pi->m->pals, &(e->m->pals));
555 MPI_Comm_dup(pi->wr[0]->pals, &(e->wr[0]->pals));
556 MPI_Comm_dup(pi->wr[1]->pals, &(e->wr[1]->pals));
557
558 /* give a name */
559 snprintf(commname, sizeof(commname), "main.%u", k);
560 MPI_Comm_set_name(e->m->pals, commname);
561 snprintf(commname, sizeof(commname), "row%u.%u",
562 e->wr[1]->jrank * e->wr[1]->ncores + e->wr[1]->trank,
563 e->wr[0]->trank);
564 MPI_Comm_set_name(e->wr[0]->pals, commname);
565 snprintf(commname, sizeof(commname), "col%u.%u",
566 e->wr[0]->jrank * e->wr[0]->ncores + e->wr[0]->trank,
567 e->wr[1]->trank);
568 MPI_Comm_set_name(e->wr[1]->pals, commname);
569 #endif /* MPI_LIBRARY_MT_CAPABLE */
570 }
571
572 // row barriers.
573 // we've got pi->wr[1]->ncores cores working on separate rows (that's
574 // the number of cores we encounter when walking down one column). So
575 // each row leader is at index c * pi->wr[0]->ncores, because
576 // pi->wr[0]->ncores is the number of cores working on separate
577 // columns.
578 //
579 for(unsigned int c = 0 ; c < pi->wr[1]->ncores ; c++) {
580 snprintf(buf, sizeof(buf), "r%u",
581 pi->wr[1]->jrank * pi->wr[1]->ncores + c);
582 unsigned int Nc = c * pi->wr[0]->ncores;
583 pi_comm_ptr leader = grid[Nc]->wr[0];
584 pi_comm_init_pthread_things(leader, buf);
585 // replicate.
586 for(unsigned int k = 0 ; k < pi->wr[0]->ncores ; k++) {
587 grid[k + Nc]->wr[0]->th = leader->th;
588 }
589 }
590
591 // column barriers.
592 for(unsigned int c = 0 ; c < pi->wr[0]->ncores ; c++) {
593 snprintf(buf, sizeof(buf), "c%u",
594 pi->wr[0]->jrank * pi->wr[0]->ncores + c);
595 pi_comm_ptr leader = grid[c]->wr[1];
596 pi_comm_init_pthread_things(leader, buf);
597 // replicate.
598 for(unsigned int k = 0 ; k < pi->wr[1]->ncores ; k++) {
599 unsigned int Nk = k * pi->wr[0]->ncores;
600 grid[c + Nk]->wr[1]->th = leader->th;
601 }
602 }
603
604 return grid;
605 }
606
607 #if 0
608 /* TODO: rewrite this using grid_print instead */
609 static void
610 pi_grid_print_sketch(parallelizing_info_ptr pi, parallelizing_info * grid)
611 {
612 char buf[20];
613 for(unsigned int ij = 0 ; ij < pi->wr[1]->njobs ; ij++) {
614 // note that at this point, we're not yet MT.
615 int err = MPI_Barrier(pi->m->pals);
616 ASSERT_ALWAYS(!err);
617
618 if (pi->wr[1]->jrank != ij) continue;
619
620 if (pi->m->jrank == 0)
621 print_several(pi->wr[0]->njobs,pi->wr[0]->ncores, '-', '+', 12);
622
623 for(unsigned int it = 0 ; it < pi->wr[1]->ncores ; it++) {
624 if (pi->wr[0]->jrank == 0) printf("|");
625 for(unsigned int jj = 0 ; jj < pi->wr[0]->njobs ; jj++) {
626 err = MPI_Barrier(pi->wr[0]->pals);
627 ASSERT_ALWAYS(!err);
628 for(unsigned int jt = 0 ; jt < pi->wr[0]->ncores ; jt++) {
629 parallelizing_info_srcptr tpi;
630 tpi = grid[it*pi->wr[0]->ncores+jt];
631 snprintf(buf, sizeof(buf), "J%uT%u%s%s",
632 tpi->m->jrank,
633 tpi->m->trank,
634 tpi->wr[0]->th->desc,
635 tpi->wr[1]->th->desc);
636 err = MPI_Bcast(buf, sizeof(buf),
637 MPI_BYTE, jj, pi->wr[0]->pals);
638 ASSERT_ALWAYS(!err);
639 int fence = jt == pi->wr[0]->ncores - 1;
640 if (pi->wr[0]->jrank == 0)
641 printf("%-12s%c", buf, fence ? '|' : ' ');
642 }
643 }
644 if (pi->wr[0]->jrank == 0) printf("\n");
645 }
646 if (pi->wr[1]->jrank == ij && pi->wr[0]->jrank == 0)
647 print_several(pi->wr[0]->njobs,pi->wr[0]->ncores, '-', '+', 12);
648 }
649 }
650 #endif
651
652 /* given a grid of processes which has beed set up with pi_grid_init,
653 * start the processes.
654 *
655 * The ngrids argument is here to accomodate the interleaved case.
656 */
pi_go_mt_now(parallelizing_info ** grids,unsigned int ngrids,unsigned int n,void * (* fcn)(parallelizing_info_ptr,param_list pl,void *),param_list pl,void * arg)657 static void pi_go_mt_now(
658 parallelizing_info ** grids,
659 unsigned int ngrids,
660 unsigned int n,
661 void *(*fcn)(parallelizing_info_ptr, param_list pl, void *),
662 param_list pl,
663 void * arg)
664 {
665 my_pthread_t * tgrid;
666 struct pi_go_helper_s * helper_grid;
667
668 tgrid = (my_pthread_t *) malloc(n * ngrids * sizeof(my_pthread_t));
669 helper_grid = malloc(n * ngrids * sizeof(struct pi_go_helper_s));
670
671 for(unsigned int k = 0 ; k < n * ngrids ; k++) {
672 helper_grid[k].fcn = fcn;
673 helper_grid[k].pl = pl;
674 helper_grid[k].arg = arg;
675 helper_grid[k].p = (parallelizing_info_ptr) grids[k/n] + (k%n);
676 my_pthread_create(tgrid+k, NULL,
677 (pthread_callee_t) pi_go_helper_func, helper_grid+k);
678 }
679
680 // nothing done here. We're the main job.
681 for(unsigned int k = 0 ; k < n * ngrids ; k++) {
682 my_pthread_join(tgrid[k], NULL);
683 }
684 free(helper_grid);
685 free(tgrid);
686 }
687
pi_grid_clear(parallelizing_info_ptr pi,parallelizing_info * grid)688 static void pi_grid_clear(parallelizing_info_ptr pi, parallelizing_info * grid)
689 {
690 // destroy row barriers.
691 for(unsigned int c = 0 ; c < pi->wr[1]->ncores ; c++) {
692 unsigned int Nc = c * pi->wr[0]->ncores;
693 pi_comm_destroy_pthread_things(grid[Nc]->wr[0]);
694 }
695 // destroy column barriers
696 for(unsigned int c = 0 ; c < pi->wr[0]->ncores ; c++) {
697 pi_comm_destroy_pthread_things(grid[c]->wr[1]);
698 }
699 #ifdef MPI_LIBRARY_MT_CAPABLE
700 for(unsigned int k = 0 ; k < pi->m->ncores ; k++) {
701 MPI_Comm_free(&grid[k]->m->pals);
702 MPI_Comm_free(&grid[k]->wr[0]->pals);
703 MPI_Comm_free(&grid[k]->wr[1]->pals);
704 }
705 #endif /* MPI_LIBRARY_MT_CAPABLE */
706
707 /* Don't do pi_log_clear, just like we haven't done pi_log_init. The
708 * sub-threads may define it, in which case it's their responsibility
709 * to clear the thing.
710 */
711
712 free(grid);
713 }
714
pi_clear_mpilevel(parallelizing_info_ptr pi)715 static void pi_clear_mpilevel(parallelizing_info_ptr pi)
716 {
717 #if defined(HAVE_HWLOC) && defined(HAVE_CXX11)
718 int thr[2] = { pi->wr[0]->ncores, pi->wr[1]->ncores };
719 cpubinding_free_info(pi->cpubinding_info, thr);
720 #endif /* defined(HAVE_HWLOC) && defined(HAVE_CXX11) */
721
722 pi_comm_destroy_pthread_things(pi->m);
723
724 for(int d = 0 ; d < 2 ; d++) {
725 MPI_Comm_free(&pi->wr[d]->pals);
726 }
727 if (pi->m->pals != MPI_COMM_WORLD)
728 MPI_Comm_free(&pi->m->pals);
729
730 // MPI_Errhandler_free(&my_eh);
731 }
732
733 #if 0
734 static void shout_going_mt(pi_comm_ptr m)
735 {
736 int err = MPI_Barrier(m->pals);
737 ASSERT_ALWAYS(!err);
738 if (m->jrank == m->njobs - 1)
739 printf("going multithread now\n");
740 err = MPI_Barrier(m->pals);
741 ASSERT_ALWAYS(!err);
742 }
743 #endif
744
pi_go_inner_not_interleaved(void * (* fcn)(parallelizing_info_ptr,param_list pl,void *),param_list pl,void * arg)745 static void pi_go_inner_not_interleaved(
746 void *(*fcn)(parallelizing_info_ptr, param_list pl, void *),
747 param_list pl,
748 void * arg)
749 {
750 parallelizing_info pi;
751 parallelizing_info * grid;
752 pi_init_mpilevel(pi, pl);
753 grid = pi_grid_init(pi);
754 // shout_going_mt(pi->m);
755 // pi_grid_print_sketch(pi, grids[0]);
756 pi_go_mt_now(&grid, 1, pi->m->ncores, fcn, pl, arg);
757 pi_grid_clear(pi, grid);
758 pi_clear_mpilevel(pi);
759 }
760
pi_go_inner_interleaved(void * (* fcn)(parallelizing_info_ptr,param_list pl,void *),param_list pl,void * arg)761 static void pi_go_inner_interleaved(
762 void *(*fcn)(parallelizing_info_ptr, param_list pl, void *),
763 param_list pl,
764 void * arg)
765 {
766 parallelizing_info pi[2];
767 parallelizing_info * grids[2];
768
769 pi_init_mpilevel(pi[0], pl);
770 memcpy(pi[1], pi[0], sizeof(parallelizing_info));
771
772 pi[0]->interleaved = malloc(sizeof(pi_interleaving));
773 pi[0]->interleaved->idx = 0;
774
775 pi[1]->interleaved = malloc(sizeof(pi_interleaving));
776 pi[1]->interleaved->idx = 1;
777
778 /* Now the whole point is that it's the _same_ barrier ! */
779 my_pthread_barrier_t * b = malloc(sizeof(my_pthread_barrier_t));
780 my_pthread_barrier_init(b, NULL, 2 * pi[0]->m->ncores);
781 pi[0]->interleaved->b = b;
782 pi[1]->interleaved->b = b;
783
784 pi_dictionary d;
785 pi_dictionary_init(d);
786 pi[0]->dict = d;
787 pi[1]->dict = d;
788
789 grids[0] = pi_grid_init(pi[0]);
790 grids[1] = pi_grid_init(pi[1]);
791
792 // shout_going_mt(pi[0]->m);
793 // pi_grid_print_sketch(pi, grids[0]);
794
795 pi_go_mt_now(grids, 2, pi[0]->m->ncores, fcn, pl, arg);
796
797 pi_grid_clear(pi[0], grids[0]);
798 pi_grid_clear(pi[1], grids[1]);
799
800 my_pthread_barrier_destroy(b);
801 pi_dictionary_clear(d);
802
803 free(pi[0]->interleaved);
804 free(pi[1]->interleaved);
805
806 pi_clear_mpilevel(pi[0]);
807 /* pi[1] is a simple copy, it doesn't have to be cleared. */
808 }
809
pi_dictionary_init(pi_dictionary_ptr d)810 void pi_dictionary_init(pi_dictionary_ptr d)
811 {
812 my_pthread_rwlock_init(d->m, NULL);
813 d->e = NULL;
814 }
815
pi_dictionary_clear(pi_dictionary_ptr d)816 void pi_dictionary_clear(pi_dictionary_ptr d)
817 {
818 /* last grab of the lock -- altough it would be preferred if we
819 * weren't mt of course ! */
820 my_pthread_rwlock_wrlock(d->m);
821 pi_dictionary_entry_ptr ne = d->e;
822 for( ; ne ; ) {
823 pi_dictionary_entry_ptr nne = ne->next;
824 free(ne);
825 ne = nne;
826 }
827 d->e = NULL;
828 my_pthread_rwlock_unlock(d->m);
829 my_pthread_rwlock_destroy(d->m);
830 }
831
pi_store_generic(parallelizing_info_ptr pi,unsigned long key,unsigned long who,void * value)832 void pi_store_generic(parallelizing_info_ptr pi, unsigned long key, unsigned long who, void * value)
833 {
834 pi_dictionary_ptr d = pi->dict;
835 ASSERT_ALWAYS(d != NULL);
836 pi_dictionary_entry_ptr n = malloc(sizeof(pi_dictionary_entry));
837 n->key = key;
838 n->who = who;
839 n->value = value;
840 my_pthread_rwlock_wrlock(d->m);
841 n->next = d->e;
842 d->e = n;
843 my_pthread_rwlock_unlock(d->m);
844 }
845
pi_load_generic(parallelizing_info_ptr pi,unsigned long key,unsigned long who)846 void * pi_load_generic(parallelizing_info_ptr pi, unsigned long key, unsigned long who)
847 {
848 pi_dictionary_ptr d = pi->dict;
849 ASSERT_ALWAYS(d != NULL);
850 void * r = NULL;
851 my_pthread_rwlock_rdlock(d->m);
852 pi_dictionary_entry_ptr e = d->e;
853 for( ; e ; e = e->next) {
854 if (e->key == key && e->who == who) {
855 r = e->value;
856 break;
857 }
858 }
859 my_pthread_rwlock_unlock(d->m);
860 return r;
861 }
862
parallelizing_info_decl_usage(param_list pl)863 void parallelizing_info_decl_usage(param_list pl)/*{{{*/
864 {
865 param_list_decl_usage(pl, "mpi", "number of MPI nodes across which the execution will span, with mesh dimensions");
866 param_list_decl_usage(pl, "thr", "number of threads (on each node) for the program, with mesh dimensions");
867 param_list_decl_usage(pl, "interleaving", "whether we should start two interleaved sets of threads. Only supported by some programs, has no effect on others.");
868 param_list_decl_usage(pl, "only_mpi", "replace threads by distinct MPI jobs");
869 param_list_decl_usage(pl, "debug-parallel-bwc", "enable heavy debugging messages for desperate cases");
870
871 #if defined(HAVE_HWLOC) && defined(HAVE_CXX11)
872 cpubinding_decl_usage(pl);
873 #else
874 param_list_decl_usage(pl, "cpubinding", "(ignored, no sufficient sotftware support)");
875 #endif
876 }
877 /*}}}*/
878
parallelizing_info_lookup_parameters(param_list_ptr pl)879 void parallelizing_info_lookup_parameters(param_list_ptr pl)/*{{{*/
880 {
881 /* These will all be looked later (within this file, though).
882 */
883 param_list_lookup_string(pl, "mpi");
884 param_list_lookup_string(pl, "thr");
885 param_list_lookup_string(pl, "interleaving");
886 param_list_lookup_string(pl, "only_mpi");
887 param_list_lookup_string(pl, "debug-parallel-bwc");
888
889 #if defined(HAVE_HWLOC) && defined(HAVE_CXX11)
890 cpubinding_lookup_parameters(pl);
891 #else
892 param_list_lookup_string(pl, "cpubinding");
893 #endif
894 }
895 /*}}}*/
896
897
pi_go(void * (* fcn)(parallelizing_info_ptr,param_list pl,void *),param_list pl,void * arg)898 void pi_go(
899 void *(*fcn)(parallelizing_info_ptr, param_list pl, void *),
900 param_list pl,
901 void * arg)
902 {
903 int interleaving = 0;
904 param_list_parse_int(pl, "interleaving", &interleaving);
905 if (interleaving) {
906 pi_go_inner_interleaved(fcn, pl, arg);
907 } else {
908 pi_go_inner_not_interleaved(fcn, pl, arg);
909 }
910 }
911
912
pi_log_init(pi_comm_ptr wr)913 void pi_log_init(pi_comm_ptr wr)
914 {
915 struct pi_log_book * lb = malloc(sizeof(struct pi_log_book));
916 memset(lb, 0, sizeof(struct pi_log_book));
917 wr->log_book = lb;
918
919 char commname[MPI_MAX_OBJECT_NAME];
920 int namelen = sizeof(commname);
921 MPI_Comm_get_name(wr->pals, commname, &namelen);
922
923 if (wr->jrank == 0 && wr->trank == 0) {
924 printf("Enabled logging for %s:%s\n", commname, wr->th->desc);
925 }
926 }
927
pi_log_clear(pi_comm_ptr wr)928 void pi_log_clear(pi_comm_ptr wr)
929 {
930 if (wr->log_book)
931 free(wr->log_book);
932 wr->log_book = (void*) 0xdeadbeef;
933 }
934
pi_log_op(pi_comm_ptr wr,const char * fmt,...)935 void pi_log_op(pi_comm_ptr wr, const char * fmt, ...)
936 {
937 va_list ap;
938 struct pi_log_book * lb = wr->log_book;
939 if (!lb)
940 return;
941
942 va_start(ap, fmt);
943
944 ASSERT_ALWAYS(lb->next < PI_LOG_BOOK_ENTRIES);
945
946 struct pi_log_entry * e = &(lb->t[lb->next]);
947
948 gettimeofday(e->tv, NULL);
949
950 vsnprintf(e->what, sizeof(e->what), fmt, ap);
951 if (wr->ncores > 1)
952 fprintf(stderr, "%s:%d:%d %s\n", wr->th->desc, wr->jrank, wr->trank, e->what);
953 va_end(ap);
954
955 lb->next++;
956 lb->next %= PI_LOG_BOOK_ENTRIES;
957 lb->hsize++;
958 }
959
pi_log_print_backend(pi_comm_ptr wr,const char * myname,char ** strings,int * n,int alloc)960 static void pi_log_print_backend(pi_comm_ptr wr, const char * myname, char ** strings, int * n, int alloc)
961 {
962 struct pi_log_book * lb = wr->log_book;
963 if (!lb) return;
964
965 ASSERT_ALWAYS(lb->next < PI_LOG_BOOK_ENTRIES);
966
967 int h = lb->hsize;
968 if (h >= PI_LOG_BOOK_ENTRIES) {
969 h = PI_LOG_BOOK_ENTRIES;
970 }
971 int i0 = lb->next - h;
972 if (i0 < 0)
973 i0 += PI_LOG_BOOK_ENTRIES;
974 char commname[MPI_MAX_OBJECT_NAME];
975 int namelen = sizeof(commname);
976 MPI_Comm_get_name(wr->pals, commname, &namelen);
977
978 int rc;
979
980 for(int i = i0 ; h-- ; ) {
981 struct pi_log_entry * e = &(lb->t[i]);
982
983 ASSERT_ALWAYS(*n < alloc);
984 rc = asprintf(&(strings[*n]), "%" PRIu64 ".%06u (%s) %s %s",
985 (uint64_t) e->tv->tv_sec,
986 (unsigned int) e->tv->tv_usec,
987 myname,
988 e->what,
989 commname);
990 ASSERT_ALWAYS(rc != -1);
991 ++*n;
992
993 i++;
994 if (i == PI_LOG_BOOK_ENTRIES)
995 i = 0;
996 }
997 }
998
pi_log_print(pi_comm_ptr wr)999 void pi_log_print(pi_comm_ptr wr)
1000 {
1001 /* FIXME stdout or stderr ? */
1002 int alloc = PI_LOG_BOOK_ENTRIES;
1003 char ** strings = malloc(alloc * sizeof(char*));
1004 int n = 0;
1005 pi_log_print_backend(wr, "", strings, &n, alloc);
1006 for(int i = 0 ; i < n ; i++) {
1007 puts(strings[i]);
1008 free(strings[i]);
1009 }
1010 fflush(stdout);
1011 free(strings);
1012 }
1013
1014 typedef int (*sortfunc_t)(const void *, const void *);
1015
p_strcmp(char const * const * a,char const * const * b)1016 int p_strcmp(char const * const * a, char const * const * b)
1017 {
1018 /* Input is formatted so that sorting makes sense. */
1019 return strcmp(*a, *b);
1020 }
1021
pi_log_print_all(parallelizing_info_ptr pi)1022 void pi_log_print_all(parallelizing_info_ptr pi)
1023 {
1024 int alloc = 3 * PI_LOG_BOOK_ENTRIES;
1025 char ** strings = malloc(alloc * sizeof(char*));
1026 int n = 0;
1027 char * myname;
1028 int rc;
1029 rc = asprintf(&myname, "%s%s", pi->wr[0]->th->desc, pi->wr[1]->th->desc);
1030 ASSERT_ALWAYS(rc != -1);
1031
1032 pi_log_print_backend(pi->m, myname, strings, &n, alloc);
1033 pi_log_print_backend(pi->wr[0], myname, strings, &n, alloc);
1034 pi_log_print_backend(pi->wr[1], myname, strings, &n, alloc);
1035 qsort(strings, n, sizeof(char*), (sortfunc_t) &p_strcmp);
1036
1037 for(int i = 0 ; i < n ; i++) {
1038 puts(strings[i]);
1039 free(strings[i]);
1040 }
1041 free(myname);
1042 free(strings);
1043 }
1044
1045 /* {{{ predefined types and operations */
1046 static struct pi_datatype_s pi_predefined_types[] = {
1047 { MPI_INT, NULL, sizeof(int) },
1048 { MPI_DOUBLE, NULL, sizeof(double) },
1049 { MPI_BYTE, NULL, 1 },
1050 { MPI_UNSIGNED, NULL, sizeof(unsigned) },
1051 { MPI_UNSIGNED_LONG, NULL, sizeof(unsigned long) },
1052 { MPI_UNSIGNED_LONG_LONG, NULL, sizeof(unsigned long long) },
1053 { MPI_LONG, NULL, sizeof(long) },
1054 };
1055
1056 /* TODO: mpi-3.0 introduced (at last) MPI_UINT64_T and friends. There are
1057 * several places where this can come in handy. So once we set our mind
1058 * on requiring mpi-3.0, we can do some simplifications here and there.
1059 */
1060 pi_datatype_ptr BWC_PI_INT = pi_predefined_types + 0;
1061 pi_datatype_ptr BWC_PI_DOUBLE = pi_predefined_types + 1;
1062 pi_datatype_ptr BWC_PI_BYTE = pi_predefined_types + 2;
1063 pi_datatype_ptr BWC_PI_UNSIGNED = pi_predefined_types + 3;
1064 pi_datatype_ptr BWC_PI_UNSIGNED_LONG = pi_predefined_types + 4;
1065 pi_datatype_ptr BWC_PI_UNSIGNED_LONG_LONG = pi_predefined_types + 5;
1066 pi_datatype_ptr BWC_PI_LONG = pi_predefined_types + 6;
1067
1068 char statically_assert_that_size_t_is_either_ulong_or_ulonglong[((sizeof(size_t) == sizeof(unsigned long)) || (sizeof(size_t) == sizeof(unsigned long long))) ? 1 : -1];
1069 pi_datatype_ptr BWC_PI_SIZE_T = pi_predefined_types + 4 + (sizeof(size_t) == sizeof(unsigned long long));
1070
1071
1072
1073 struct pi_op_s BWC_PI_MIN[1] = { { .stock = MPI_MIN, } };
1074 struct pi_op_s BWC_PI_MAX[1] = { { .stock = MPI_MAX, } };
1075
1076 static void pi_dispatch_op_add_stock(void *, void *, int *, MPI_Datatype *);
1077 static void pi_dispatch_op_add_custom(void *, void *, size_t, pi_datatype_ptr);
1078
1079 struct pi_op_s BWC_PI_SUM[1] = { {
1080 .stock = MPI_SUM,
1081 .f_stock = pi_dispatch_op_add_stock,
1082 .f_custom = pi_dispatch_op_add_custom,
1083 } };
1084 struct pi_op_s BWC_PI_BXOR[1] = { { .stock = MPI_BXOR, } };
1085 struct pi_op_s BWC_PI_BAND[1] = { { .stock = MPI_BAND, } };
1086 struct pi_op_s BWC_PI_BOR[1] = { { .stock = MPI_BOR, } };
1087
1088 struct reduction_function {
1089 MPI_Datatype datatype;
1090 MPI_Op op;
1091 thread_reducer_t f;
1092 };
1093
1094 /* {{{ predefined reduction functions */
1095
1096 /* always: a == my value, b == the other value */
reducer_byte_min(const unsigned char * b,unsigned char * a,size_t s)1097 static void reducer_byte_min(const unsigned char * b, unsigned char * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
reducer_byte_max(const unsigned char * b,unsigned char * a,size_t s)1098 static void reducer_byte_max(const unsigned char * b, unsigned char * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
reducer_byte_sum(const unsigned char * b,unsigned char * a,size_t s)1099 static void reducer_byte_sum(const unsigned char * b, unsigned char * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
reducer_byte_bxor(const unsigned char * b,unsigned char * a,size_t s)1100 static void reducer_byte_bxor(const unsigned char * b, unsigned char * a, size_t s) { for( ; s-- ; a++, b++) *a ^= *b; }
reducer_int_min(const int * b,int * a,size_t s)1101 static void reducer_int_min(const int * b, int * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
reducer_int_max(const int * b,int * a,size_t s)1102 static void reducer_int_max(const int * b, int * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
reducer_int_sum(const int * b,int * a,size_t s)1103 static void reducer_int_sum(const int * b, int * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
reducer_int_band(const int * b,int * a,size_t s)1104 static void reducer_int_band(const int * b, int * a, size_t s) { for( ; s-- ; a++, b++) *a &= *b; }
reducer_int_bor(const int * b,int * a,size_t s)1105 static void reducer_int_bor(const int * b, int * a, size_t s) { for( ; s-- ; a++, b++) *a |= *b; }
reducer_uint_min(const unsigned int * b,unsigned int * a,size_t s)1106 static void reducer_uint_min(const unsigned int * b, unsigned int * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
reducer_uint_max(const unsigned int * b,unsigned int * a,size_t s)1107 static void reducer_uint_max(const unsigned int * b, unsigned int * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
reducer_uint_sum(const unsigned int * b,unsigned int * a,size_t s)1108 static void reducer_uint_sum(const unsigned int * b, unsigned int * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
reducer_uint_band(const unsigned int * b,unsigned int * a,size_t s)1109 static void reducer_uint_band(const unsigned int * b, unsigned int * a, size_t s) { for( ; s-- ; a++, b++) *a &= *b; }
reducer_uint_bor(const unsigned int * b,unsigned int * a,size_t s)1110 static void reducer_uint_bor(const unsigned int * b, unsigned int * a, size_t s) { for( ; s-- ; a++, b++) *a |= *b; }
reducer_double_min(const double * b,double * a,size_t s)1111 static void reducer_double_min(const double * b, double * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
reducer_double_max(const double * b,double * a,size_t s)1112 static void reducer_double_max(const double * b, double * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
reducer_double_sum(const double * b,double * a,size_t s)1113 static void reducer_double_sum(const double * b, double * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
reducer_long_min(const long * b,long * a,size_t s)1114 static void reducer_long_min(const long * b, long * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
reducer_long_max(const long * b,long * a,size_t s)1115 static void reducer_long_max(const long * b, long * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
reducer_long_sum(const long * b,long * a,size_t s)1116 static void reducer_long_sum(const long * b, long * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
reducer_longlong_min(const long long * b,long long * a,size_t s)1117 static void reducer_longlong_min(const long long * b, long long * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
reducer_longlong_max(const long long * b,long long * a,size_t s)1118 static void reducer_longlong_max(const long long * b, long long * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
reducer_longlong_sum(const long long * b,long long * a,size_t s)1119 static void reducer_longlong_sum(const long long * b, long long * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
reducer_ulong_min(const unsigned long * b,unsigned long * a,size_t s)1120 static void reducer_ulong_min(const unsigned long * b, unsigned long * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
reducer_ulong_max(const unsigned long * b,unsigned long * a,size_t s)1121 static void reducer_ulong_max(const unsigned long * b, unsigned long * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
reducer_ulong_sum(const unsigned long * b,unsigned long * a,size_t s)1122 static void reducer_ulong_sum(const unsigned long * b, unsigned long * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
reducer_ulonglong_min(const unsigned long long * b,unsigned long long * a,size_t s)1123 static void reducer_ulonglong_min(const unsigned long long * b, unsigned long long * a, size_t s) { for( ; s-- ; a++, b++) if (*b < *a) *a = *b; }
reducer_ulonglong_max(const unsigned long long * b,unsigned long long * a,size_t s)1124 static void reducer_ulonglong_max(const unsigned long long * b, unsigned long long * a, size_t s) { for( ; s-- ; a++, b++) if (*b > *a) *a = *b; }
reducer_ulonglong_sum(const unsigned long long * b,unsigned long long * a,size_t s)1125 static void reducer_ulonglong_sum(const unsigned long long * b, unsigned long long * a, size_t s) { for( ; s-- ; a++, b++) *a += *b; }
1126
1127 struct reduction_function predefined_functions[] = {
1128 { MPI_BYTE, MPI_MIN, (thread_reducer_t) reducer_byte_min, },
1129 { MPI_BYTE, MPI_MAX, (thread_reducer_t) reducer_byte_max, },
1130 { MPI_BYTE, MPI_SUM, (thread_reducer_t) reducer_byte_sum, },
1131 { MPI_BYTE, MPI_BXOR, (thread_reducer_t) reducer_byte_bxor, },
1132 { MPI_INT, MPI_MIN, (thread_reducer_t) reducer_int_min, },
1133 { MPI_INT, MPI_MAX, (thread_reducer_t) reducer_int_max, },
1134 { MPI_INT, MPI_SUM, (thread_reducer_t) reducer_int_sum, },
1135 { MPI_INT, MPI_BAND, (thread_reducer_t) reducer_int_band, },
1136 { MPI_INT, MPI_BOR, (thread_reducer_t) reducer_int_bor, },
1137 { MPI_UNSIGNED, MPI_MIN, (thread_reducer_t) reducer_uint_min, },
1138 { MPI_UNSIGNED, MPI_MAX, (thread_reducer_t) reducer_uint_max, },
1139 { MPI_UNSIGNED, MPI_SUM, (thread_reducer_t) reducer_uint_sum, },
1140 { MPI_UNSIGNED, MPI_BAND, (thread_reducer_t) reducer_uint_band, },
1141 { MPI_UNSIGNED, MPI_BOR, (thread_reducer_t) reducer_uint_bor, },
1142 { MPI_DOUBLE, MPI_MIN, (thread_reducer_t) reducer_double_min, },
1143 { MPI_DOUBLE, MPI_MAX, (thread_reducer_t) reducer_double_max, },
1144 { MPI_DOUBLE, MPI_SUM, (thread_reducer_t) reducer_double_sum, },
1145 { MPI_LONG, MPI_MIN, (thread_reducer_t) reducer_long_min, },
1146 { MPI_LONG, MPI_MAX, (thread_reducer_t) reducer_long_max, },
1147 { MPI_LONG, MPI_SUM, (thread_reducer_t) reducer_long_sum, },
1148 { MPI_UNSIGNED_LONG, MPI_MIN, (thread_reducer_t) reducer_ulong_min, },
1149 { MPI_UNSIGNED_LONG, MPI_MAX, (thread_reducer_t) reducer_ulong_max, },
1150 { MPI_UNSIGNED_LONG, MPI_SUM, (thread_reducer_t) reducer_ulong_sum, },
1151 { MPI_LONG_LONG, MPI_MIN, (thread_reducer_t) reducer_longlong_min, },
1152 { MPI_LONG_LONG, MPI_MAX, (thread_reducer_t) reducer_longlong_max, },
1153 { MPI_LONG_LONG, MPI_SUM, (thread_reducer_t) reducer_longlong_sum, },
1154 { MPI_UNSIGNED_LONG_LONG, MPI_MIN, (thread_reducer_t) reducer_ulonglong_min, },
1155 { MPI_UNSIGNED_LONG_LONG, MPI_MAX, (thread_reducer_t) reducer_ulonglong_max, },
1156 { MPI_UNSIGNED_LONG_LONG, MPI_SUM, (thread_reducer_t) reducer_ulonglong_sum, },
1157 { 0, 0, NULL, },
1158 };
1159
lookup_reduction_function(MPI_Datatype datatype,MPI_Op op)1160 static thread_reducer_t lookup_reduction_function(MPI_Datatype datatype, MPI_Op op)
1161 {
1162 struct reduction_function * x = predefined_functions;
1163 for( ; x->f && ! (x->datatype == datatype && x->op == op) ; x++) ;
1164 if (!x->f) {
1165 fprintf(stderr, "bad reduction function\n");
1166 abort();
1167 }
1168 return x->f;
1169 }
1170
1171 /* }}} */
1172
1173 /* }}} */
1174
1175 /* {{{ user-defined (mpfq) types and operations */
1176
1177 static int pi_mpi_attribute_key;
1178
1179 /* we need to present the mpi implementation with proper user functions
1180 * for operating on the mpfq types. Do so only for addition, since it's
1181 * the only one we use eventually.
1182 *
1183 * Of course we could conceivably do so for more operations.
1184 */
pi_dispatch_op_add_stock(void * invec,void * inoutvec,int * len,MPI_Datatype * datatype)1185 static void pi_dispatch_op_add_stock(void *invec, void *inoutvec, int *len, MPI_Datatype *datatype)
1186 {
1187 /* get the mpfq_vbase_ptr associated to the datatype */
1188 int got_it;
1189 mpfq_vbase_ptr abase;
1190 MPI_Type_get_attr(*datatype, pi_mpi_attribute_key, (void*) &abase, &got_it);
1191 ASSERT(got_it);
1192 abase->vec_add(abase, inoutvec, inoutvec, invec, *len);
1193 }
pi_dispatch_op_add_custom(void * invec,void * inoutvec,size_t len,pi_datatype_ptr datatype)1194 static void pi_dispatch_op_add_custom(void *invec, void *inoutvec, size_t len, pi_datatype_ptr datatype)
1195 {
1196 /* FIXME: mpfq's vec_add should really take size_t arguments */
1197 ASSERT_ALWAYS(len < (size_t) UINT_MAX);
1198 datatype->abase->vec_add(datatype->abase, inoutvec, inoutvec, invec, len);
1199 }
1200
1201 /* XXX must be called in single threaded context */
pi_init_attribute_things()1202 static void pi_init_attribute_things()
1203 {
1204 MPI_Type_create_keyval(MPI_TYPE_DUP_FN, MPI_TYPE_NULL_DELETE_FN, &pi_mpi_attribute_key, NULL);
1205 MPI_Op_create(BWC_PI_SUM->f_stock, 1, &BWC_PI_SUM->custom);
1206 }
1207
1208 /* XXX must be called in single threaded context */
pi_clear_attribute_things()1209 static void pi_clear_attribute_things()
1210 {
1211 MPI_Op_free(&BWC_PI_SUM->custom);
1212 MPI_Type_free_keyval(&pi_mpi_attribute_key);
1213 }
1214
pi_alloc_mpfq_datatype(parallelizing_info_ptr pi,mpfq_vbase_ptr abase)1215 pi_datatype_ptr pi_alloc_mpfq_datatype(parallelizing_info_ptr pi, mpfq_vbase_ptr abase)
1216 {
1217 pi_datatype_ptr ptr = shared_malloc_set_zero(pi->m, sizeof(struct pi_datatype_s));
1218 if (pi->m->trank == 0) {
1219 ptr->abase = abase;
1220 ptr->item_size = abase->vec_elt_stride(abase, 1);
1221 MPI_Type_contiguous(ptr->item_size, MPI_BYTE, &ptr->datatype);
1222 MPI_Type_commit(&ptr->datatype);
1223 MPI_Type_set_attr(ptr->datatype, pi_mpi_attribute_key, abase);
1224 }
1225 serialize_threads(pi->m);
1226 return ptr;
1227 }
1228
pi_free_mpfq_datatype(parallelizing_info_ptr pi,pi_datatype_ptr ptr)1229 void pi_free_mpfq_datatype(parallelizing_info_ptr pi, pi_datatype_ptr ptr)
1230 {
1231 serialize_threads(pi->m);
1232 if (pi->m->trank == 0) {
1233 MPI_Type_delete_attr(ptr->datatype, pi_mpi_attribute_key);
1234 MPI_Type_free(&ptr->datatype);
1235 }
1236 shared_free(pi->m, ptr);
1237 }
1238
1239 /* This may *not* use MPI_Reduce_local, because we might not have an MPI
1240 * implementation in the first place, just a set of placeholers. So it is
1241 * important that we find the operation to perform by ourselves
1242 */
pi_reduce_local(void * inbuf,void * inoutbuf,size_t count,pi_datatype_ptr datatype,pi_op_ptr op)1243 void pi_reduce_local(void *inbuf, void *inoutbuf, size_t count,
1244 pi_datatype_ptr datatype, pi_op_ptr op)
1245 {
1246 if (datatype->abase) {
1247 if (op->f_custom) {
1248 op->f_custom(inbuf, inoutbuf, count, datatype);
1249 } else {
1250 fprintf(stderr, "undefined operation on mpfq type\n");
1251 abort();
1252 }
1253 } else {
1254 thread_reducer_t f = lookup_reduction_function(datatype->datatype, op->stock);
1255 (*f)(inbuf, inoutbuf, count);
1256 }
1257 }
1258
1259 /* }}} */
1260
1261 /* {{{ collective operations with pi_comm_ptrs
1262 *
1263 * we provide a calling interface which is similar to mpi.
1264 *
1265 * we inherit the same crappy notion of datatypes, and use the same
1266 * MPI_Datatype data for it. When not building with MPI, our "fake MPI"
1267 * headers provides magic constants which are sufficient to our needs.
1268 *
1269 * We define the following functions:
1270 *
1271 * {pi,pi_thread}_{bcast,allreduce,data_eq}
1272 *
1273 * where data_eq is a collective operation returning true or false
1274 * depending on whether the pointed data area is consistent across all
1275 * callers.
1276 *
1277 * Being thread-level collective functions, these routines assume that
1278 * the provided pointers are different on the calling threads (otherwise
1279 * there's no point).
1280 *
1281 * Notice also that as long as the underlying mpi implementation cannot
1282 * be considered thread-safe in the MPI_THREAD_MULTIPLE meaning, it is
1283 * not possible to call any of the pi collective functions simultaneously
1284 * from two distinct parallel communicators. That is, either calls are with
1285 * pi->m, or with pi->wr[d] provided that pi->wr[!d]->trank == 0. The
1286 * SEVERAL_THREADS_PLAY_MPI_BEGIN construct may be useful, as follows:
1287 * SEVERAL_THREADS_PLAY_MPI_BEGIN(pi->wr[!d]) {
1288 * pi-collective operation on pi->wr[d]
1289 * }
1290 * SEVERAL_THREADS_PLAY_MPI_END();
1291 */
1292
1293 struct pi_collective_arg {
1294 pi_comm_ptr wr;
1295 /* this pointer is written from all threads. Its contents are
1296 * meaningful only for the root thread */
1297 void * ptr;
1298 unsigned int root;
1299 pi_datatype_ptr datatype;
1300 size_t count;
1301 /* last two only for reduction */
1302 pi_op_ptr op;
1303 void * dptr;
1304 };
1305
1306 /* {{{ broadcast
1307 *
1308 * thread-level strategy:
1309 * upon entering the barrier, the "root" threads fills in
1310 * a->wr->th->utility_ptr its own pointer. Upon leaving the barrier, all
1311 * threads copy data from there to their own buffer.
1312 *
1313 * mpi-level strategy:
1314 * broadcast on all other nodes from the root first, then on all threads
1315 * separately on each node.
1316 */
pi_thread_bcast_in(int s MAYBE_UNUSED,struct pi_collective_arg * a)1317 static void pi_thread_bcast_in(int s MAYBE_UNUSED, struct pi_collective_arg * a)
1318 {
1319 /* here we have a mutex locked */
1320 if (a->wr->trank == a->root) {
1321 a->wr->th->utility_ptr = a->ptr;
1322 }
1323 }
1324
pi_thread_bcast_out(int s MAYBE_UNUSED,struct pi_collective_arg * a)1325 static void pi_thread_bcast_out(int s MAYBE_UNUSED, struct pi_collective_arg * a)
1326 {
1327 /* here we know that exactly one thread has just filled the
1328 * a->wr->th->utility_ptr, so that a memcpy is possible, as long as
1329 * the owner of the data area has not touched it again */
1330 if (a->ptr != a->wr->th->utility_ptr) { /* placate valgrind */
1331 memcpy(a->ptr, a->wr->th->utility_ptr, a->count * a->datatype->item_size);
1332 }
1333 }
1334
1335 /* broadcast the data area pointed to by [ptr, ptr+size[ to all threads.
1336 * Pointers at calling threads must differ. */
pi_thread_bcast(void * ptr,size_t count,pi_datatype_ptr datatype,unsigned int root,pi_comm_ptr wr)1337 void pi_thread_bcast(void * ptr, size_t count, pi_datatype_ptr datatype, unsigned int root, pi_comm_ptr wr)
1338 {
1339 struct pi_collective_arg a[1];
1340 a->wr = wr;
1341 a->ptr = ptr;
1342 a->root = root;
1343 a->datatype = datatype;
1344 a->count = count;
1345 barrier_wait(wr->th->bh,
1346 (void(*)(int,void*)) &pi_thread_bcast_in,
1347 (void(*)(int,void*)) &pi_thread_bcast_out,
1348 a);
1349 serialize_threads(wr);
1350 }
1351
pi_bcast(void * ptr,size_t count,pi_datatype_ptr datatype,unsigned int jroot,unsigned int troot,pi_comm_ptr wr)1352 void pi_bcast(void * ptr, size_t count, pi_datatype_ptr datatype, unsigned int jroot, unsigned int troot, pi_comm_ptr wr)
1353 {
1354 int err;
1355
1356 ASSERT(jroot < wr->njobs);
1357 ASSERT(troot < wr->ncores);
1358 if (wr->trank == troot) {
1359 err = MPI_Bcast(ptr, count, datatype->datatype, jroot, wr->pals);
1360 ASSERT_ALWAYS(!err);
1361 }
1362 pi_thread_bcast(ptr, count, datatype, troot, wr);
1363 }
1364
pi_abort(int err,pi_comm_ptr wr)1365 void pi_abort(int err, pi_comm_ptr wr)
1366 {
1367 /* This only exists because an MPI_Abort that is initiated by all
1368 * threads is likely to clutter the output quite a lot */
1369 if (wr->trank == 0)
1370 MPI_Abort(wr->pals, err);
1371 }
1372 /* }}} */
1373
1374 /* {{{ allreduce
1375 *
1376 * thread-level strategy (dumb): use the provided sendbuf's, one after another,
1377 * as "last worked on" areas. The last being reduced in is then used to
1378 * communicate the data to all other threads.
1379 *
1380 * mpi-level strategy: thread-level allreduce, then in-place mpi-level allreduce
1381 * from the leader threads, the thread-level broadcast of the result.
1382 */
1383 /* this gets called with s = (count-1), (count-2), ..., 0 */
pi_thread_allreduce_in(int s,struct pi_collective_arg * a)1384 static void pi_thread_allreduce_in(int s, struct pi_collective_arg * a)
1385 {
1386 /* utility_ptr points to the "last worked on" area.
1387 * first call: set the "last worked on" area to our pointer.
1388 * further calls: reduce our data with the "lwo" data, and set the
1389 * "lwo" pointer to our modified data in our pointer.
1390 *
1391 * last call: reduction done, copy to dptr.
1392 */
1393 if (s < (int) a->wr->ncores - 1) {
1394 pi_reduce_local(a->wr->th->utility_ptr, a->ptr, a->count, a->datatype, a->op);
1395 }
1396 a->wr->th->utility_ptr = a->ptr;
1397 }
1398
pi_thread_allreduce_out(int s MAYBE_UNUSED,struct pi_collective_arg * a)1399 static void pi_thread_allreduce_out(int s MAYBE_UNUSED, struct pi_collective_arg * a)
1400 {
1401 /* almost the same as pi_thread_bcast_out, except that we work on
1402 * dptr...
1403 */
1404 if (a->dptr != a->wr->th->utility_ptr) { /* placate valgrind */
1405 memcpy(a->dptr, a->wr->th->utility_ptr, a->count * a->datatype->item_size);
1406 }
1407 }
1408
1409 /* ptr == dptr is supported */
pi_thread_allreduce(void * ptr,void * dptr,size_t count,pi_datatype_ptr datatype,pi_op_ptr op,pi_comm_ptr wr)1410 void pi_thread_allreduce(void *ptr, void *dptr, size_t count,
1411 pi_datatype_ptr datatype, pi_op_ptr op, pi_comm_ptr wr)
1412 {
1413 /* this code is buggy in the not-in-place case. let's fall back
1414 * to the in-place situation unconditionally.
1415 *
1416 * nature of the bug: if we don't do the fallback below, then clearly
1417 * a->ptr is being written to. We should clean this up, and use const
1418 * when possible.
1419 */
1420 if (ptr && ptr != dptr) {
1421 memcpy(dptr, ptr, count * datatype->item_size);
1422 ptr = NULL;
1423 }
1424 struct pi_collective_arg a[1];
1425 a->wr = wr;
1426 a->ptr = ptr ? ptr : dptr; /* handle in-place */
1427 a->root = 0; /* meaningless for allreduce */
1428 a->datatype = datatype;
1429 a->count = count;
1430 a->op = op;
1431 a->dptr = dptr;
1432 barrier_wait(wr->th->bh,
1433 (void(*)(int,void*)) &pi_thread_allreduce_in,
1434 (void(*)(int,void*)) &pi_thread_allreduce_out,
1435 a);
1436 serialize_threads(wr);
1437 }
1438
1439
1440 /* This intentionally has the same prototype as MPI_Allreduce */
1441 /* Only a few operations and datatypes are supported.
1442 *
1443 * NULL at sendbuf means in-place.
1444 */
pi_allreduce(void * sendbuf,void * recvbuf,size_t count,pi_datatype_ptr datatype,pi_op_ptr op,pi_comm_ptr wr)1445 void pi_allreduce(void *sendbuf, void *recvbuf, size_t count,
1446 pi_datatype_ptr datatype, pi_op_ptr op, pi_comm_ptr wr)
1447 {
1448 ASSERT_ALWAYS(count <= (size_t) INT_MAX);
1449 pi_thread_allreduce(sendbuf, recvbuf, count, datatype, op, wr);
1450 if (wr->trank == 0) {
1451 if (datatype->abase) {
1452 /* Then it's a type for which we supposedly have written an
1453 * overloaded operation. */
1454 MPI_Allreduce(MPI_IN_PLACE, recvbuf, count, datatype->datatype, op->custom, wr->pals);
1455 } else {
1456 MPI_Allreduce(MPI_IN_PLACE, recvbuf, count, datatype->datatype, op->stock, wr->pals);
1457 }
1458 }
1459 /* now it's just a matter of broadcasting to all threads */
1460 pi_thread_bcast(recvbuf, count, datatype, 0, wr);
1461 }
1462
pi_allgather(void * sendbuf,size_t sendcount,pi_datatype_ptr sendtype,void * recvbuf,size_t recvcount,pi_datatype_ptr recvtype,pi_comm_ptr wr)1463 extern void pi_allgather(void * sendbuf,
1464 size_t sendcount, pi_datatype_ptr sendtype,
1465 void *recvbuf,
1466 size_t recvcount, pi_datatype_ptr recvtype,
1467 pi_comm_ptr wr)
1468 {
1469 ASSERT_ALWAYS(sendbuf == NULL);
1470 ASSERT_ALWAYS(sendtype == NULL);
1471 ASSERT_ALWAYS(sendcount == 0);
1472 size_t per_thread = recvcount * recvtype->item_size;
1473 /* share the adress of the leader's buffer with everyone */
1474 void * recvbuf_leader = recvbuf;
1475 pi_thread_bcast(&recvbuf_leader, sizeof(void*), BWC_PI_BYTE, 0, wr);
1476 /* Now everyone writes its data there. */
1477 memcpy(recvbuf_leader + (wr->jrank * wr->ncores + wr->trank) * per_thread, recvbuf + (wr->jrank * wr->ncores + wr->trank) * per_thread, per_thread);
1478 serialize_threads(wr);
1479 if (wr->trank == 0) {
1480 MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
1481 recvbuf, wr->ncores * per_thread, MPI_BYTE,
1482 wr->pals);
1483 }
1484 serialize_threads(wr);
1485 /* and copy back */
1486 memcpy(recvbuf + (wr->jrank * wr->ncores + wr->trank) * per_thread, recvbuf_leader + (wr->jrank * wr->ncores + wr->trank) * per_thread, per_thread);
1487 }
1488
1489 /* }}} */
1490
1491 /* {{{ data_eq */
1492
pi_thread_data_eq(void * buffer,size_t count,pi_datatype_ptr datatype,pi_comm_ptr wr)1493 int pi_thread_data_eq(void *buffer, size_t count, pi_datatype_ptr datatype, pi_comm_ptr wr)
1494 {
1495 void ** bufs = shared_malloc(wr, wr->ncores * sizeof(void*));
1496 int * oks = shared_malloc(wr, wr->ncores * sizeof(int));
1497
1498 bufs[wr->trank] = buffer;
1499 oks[wr->trank] = 1;
1500
1501 /* given 5 threads, here is for example what is done at each step.
1502 *
1503 * stride=1
1504 * ok0 = (x0==x1)
1505 * ok2 = (x2==x3)
1506 * stride=2
1507 * ok0 = x0==x1==x2==x3
1508 * stride 4
1509 * ok0 = x0==x1==x2==x3==x4
1510 */
1511 for(unsigned int stride = 1 ; stride < wr->ncores ; stride <<= 1) {
1512 unsigned int i = wr->trank;
1513 serialize_threads(wr); /* because of this, we don't
1514 leave the loop with break */
1515 if (i + stride >= wr->ncores) continue;
1516 /* many threads are doing nothing here */
1517 if (i & stride) continue;
1518 oks[i] = oks[i] && oks[i + stride] && memcmp(bufs[i], bufs[i + stride], datatype->item_size * count) == 0;
1519 }
1520 serialize_threads(wr);
1521 int ok = oks[0];
1522 shared_free(wr, oks);
1523 shared_free(wr, bufs);
1524 return ok;
1525 }
1526
pi_data_eq(void * buffer,size_t count,pi_datatype_ptr datatype,pi_comm_ptr wr)1527 int pi_data_eq(void *buffer, size_t count, pi_datatype_ptr datatype, pi_comm_ptr wr)
1528 {
1529 int ok = pi_thread_data_eq(buffer, count, datatype, wr);
1530
1531 ASSERT_ALWAYS(count <= (size_t) INT_MAX);
1532 if (wr->trank == 0) {
1533 MPI_Allreduce(MPI_IN_PLACE, &ok, 1, MPI_INT, MPI_BAND, wr->pals);
1534 }
1535 pi_thread_bcast(&ok, 1, BWC_PI_INT, 0, wr);
1536 return ok;
1537 }
1538
1539 /* }}} */
1540
1541 /* }}} */
1542
1543 /*{{{ collective malloc and free (thread-level of course) */
shared_malloc(pi_comm_ptr wr,size_t size)1544 void * shared_malloc(pi_comm_ptr wr, size_t size)
1545 {
1546 void * ptr = NULL;
1547 if (wr->trank == 0) ptr = malloc(size);
1548 pi_thread_bcast(&ptr, sizeof(void*), BWC_PI_BYTE, 0, wr);
1549 return ptr;
1550 }
1551
shared_malloc_set_zero(pi_comm_ptr wr,size_t size)1552 void * shared_malloc_set_zero(pi_comm_ptr wr, size_t size)
1553 {
1554 void * ptr = NULL;
1555 if (wr->trank == 0) {
1556 ptr = malloc(size);
1557 memset(ptr, 0, size);
1558 }
1559 pi_thread_bcast(&ptr, sizeof(void*), BWC_PI_BYTE, 0, wr);
1560 return ptr;
1561 }
1562
shared_free(pi_comm_ptr wr,void * ptr)1563 void shared_free(pi_comm_ptr wr, void * ptr)
1564 {
1565 serialize_threads(wr);
1566 if (wr->trank == 0) free(ptr);
1567 }
1568 /*}}}*/
1569
serialize__(pi_comm_ptr w,const char * s MAYBE_UNUSED,unsigned int l MAYBE_UNUSED)1570 int serialize__(pi_comm_ptr w, const char * s MAYBE_UNUSED, unsigned int l MAYBE_UNUSED)
1571 {
1572 int err;
1573 #ifdef CONCURRENCY_DEBUG
1574 // note how w->th_count is normally a thread-private thing !
1575 w->th_count++;
1576 printf("(%s:%u) barrier #%d, %u/%u on %s [%p]\n", s, l, w->th_count,
1577 w->trank, w->ncores,
1578 w->th->desc, w->th->b);
1579 #endif
1580 my_pthread_barrier_wait(w->th->b);
1581 if (w->trank == 0) {
1582 err = MPI_Barrier(w->pals);
1583 ASSERT_ALWAYS(!err);
1584 }
1585 // struct timeval tv[1];
1586 // gettimeofday(tv, NULL);
1587 // printf("%.2f\n", tv->tv_sec + (double) tv->tv_usec / 1.0e6);
1588 // sleep(1);
1589 return w->jrank == 0 && w->trank == 0;
1590 }
1591
serialize_threads__(pi_comm_ptr w,const char * s MAYBE_UNUSED,unsigned int l MAYBE_UNUSED)1592 int serialize_threads__(pi_comm_ptr w, const char * s MAYBE_UNUSED, unsigned int l MAYBE_UNUSED)
1593 {
1594 #ifdef CONCURRENCY_DEBUG
1595 // note how w->th_count is normally a thread-private thing !
1596 w->th_count++;
1597 printf("(%s:%u) tbarrier #%d, %u/%u on %s [%p]\n", s, l, w->th_count,
1598 w->trank, w->ncores,
1599 w->th->desc, w->th->b);
1600 #endif
1601 my_pthread_barrier_wait(w->th->b);
1602 // struct timeval tv[1];
1603 // gettimeofday(tv, NULL);
1604 // printf("%.2f\n", tv->tv_sec + (double) tv->tv_usec / 1.0e6);
1605 // sleep(1);
1606 return w->trank == 0;
1607 }
1608
1609 // XXX Many serializing calls in the two routines below have no
1610 // functional use, apart from the fact that they exert some pressure on
1611 // the mpi+pthreads way of doing things.
1612
say_hello(pi_comm_ptr w,parallelizing_info_ptr pi MAYBE_UNUSED)1613 static void say_hello(pi_comm_ptr w, parallelizing_info_ptr pi MAYBE_UNUSED)
1614 {
1615 serialize(w);
1616 for(unsigned int j = 0 ; j < w->njobs ; j++) {
1617 serialize(w);
1618 if (w->jrank != j)
1619 continue;
1620 my_pthread_mutex_lock(w->th->m);
1621 #ifdef CONCURRENCY_DEBUG
1622 /* Make it less verbose -- if it ever hangs in there, then
1623 * we can re-enable it */
1624 printf("(%s) J%uT%u ; %s%s ; (%s:j%ut%u) (%s:j%ut%u)\n",
1625 w->th->desc,
1626 pi->m->jrank,
1627 pi->m->trank,
1628 pi->wr[0]->th->desc,
1629 pi->wr[1]->th->desc,
1630 pi->wr[0]->th->desc, pi->wr[0]->jrank, pi->wr[0]->trank,
1631 pi->wr[1]->th->desc, pi->wr[1]->jrank, pi->wr[1]->trank
1632 );
1633 #endif
1634 my_pthread_mutex_unlock(w->th->m);
1635 }
1636 }
1637
pi_hello(parallelizing_info_ptr pi)1638 void pi_hello(parallelizing_info_ptr pi)
1639 {
1640 if (serialize(pi->m)) {
1641 #ifdef CONCURRENCY_DEBUG
1642 printf("Doing hello world loop\n");
1643 #endif
1644 }
1645 say_hello(pi->m, pi);
1646
1647 // before changing the grain of the barriers, we have to serialize at
1648 // the thread level (i.e. wait for the mpi serialization calls to
1649 // complete). Otherwise, inter-job communication occuring within
1650 // say_hello might clash with the inter-job calls done within
1651 // serialize().
1652 // Rule is: never let mpi-level synchronisations on a communicator
1653 // wander outside an execution period for which all threads sharing
1654 // this communicator are doing the same.
1655 for(unsigned int i = 0 ; i < pi->wr[0]->totalsize ; i++) {
1656 serialize(pi->m);
1657 serialize_threads(pi->m);
1658 if (i == pi->wr[0]->jrank * pi->wr[0]->ncores + pi->wr[0]->trank) {
1659 say_hello(pi->wr[1], pi);
1660 }
1661 }
1662
1663 for(unsigned int i = 0 ; i < pi->wr[1]->totalsize ; i++) {
1664 serialize(pi->m);
1665 serialize_threads(pi->m);
1666 if (i == pi->wr[1]->jrank * pi->wr[1]->ncores + pi->wr[1]->trank) {
1667 say_hello(pi->wr[0], pi);
1668 }
1669 }
1670
1671 if (serialize(pi->m)) {
1672 #ifdef CONCURRENCY_DEBUG
1673 printf("OK: Finished hello world loop\n");
1674 #endif
1675 }
1676 serialize(pi->m);
1677 }
1678 /*{{{ new i/o layer */
1679 /* Open a file handle on the leader node. The two pi_comm_ptr's
1680 * represent the inner and outer levels; they're mostly unused for this
1681 * function call, but requested for consistency with the pi_file_read and
1682 * pi_file_write calls.
1683 *
1684 * Return a globally consistent boolean indicating whether the operation
1685 * was successful (1) or not (0). In the latter case, set errno
1686 * (globally) to the value returned by the fopen function at the leader
1687 * node.
1688 *
1689 * In multithreaded context, f must represent a different data area on
1690 * all threads.
1691 */
pi_file_open(pi_file_handle_ptr f,parallelizing_info_ptr pi,int inner,const char * name,const char * mode)1692 int pi_file_open(pi_file_handle_ptr f, parallelizing_info_ptr pi, int inner, const char * name, const char * mode)
1693 {
1694 memset(f, 0, sizeof(pi_file_handle));
1695 f->pi = pi;
1696 f->inner = inner;
1697 f->outer = !inner;
1698 f->f = NULL;
1699 f->name = strdup(name);
1700 f->mode = strdup(mode);
1701 int failed = 0;
1702 if (f->pi->m->jrank == 0 && f->pi->m->trank == 0) {
1703 errno = 0;
1704 /* POSIX fopen is required to set errno
1705 * http://pubs.opengroup.org/onlinepubs/9699919799/functions/fopen.html
1706 */
1707 f->f = fopen(name, mode);
1708 if (f->f == NULL)
1709 failed = errno ? errno : EIO;;
1710 }
1711 pi_bcast(&failed, 1, BWC_PI_INT, 0, 0, f->pi->m);
1712 if (failed) errno = failed;
1713 return !failed;
1714 }
1715
1716 /* close the file handle. Return a globally consistent boolean indicating
1717 * whether the operation was successful (1) or not (0). In the latter
1718 * case, set errno (globally) to the value returned by the fclose
1719 * function at the leader node. */
pi_file_close(pi_file_handle_ptr f)1720 int pi_file_close(pi_file_handle_ptr f)
1721 {
1722 free(f->name);
1723 free(f->mode);
1724 int failed = 0;
1725 if (f->pi->m->jrank == 0 && f->pi->m->trank == 0) {
1726 errno = 0;
1727 if (fclose(f->f) != 0)
1728 failed = errno ? errno : EIO;;
1729 }
1730 pi_bcast(&failed, 1, BWC_PI_INT, 0, 0, f->pi->m);
1731 memset(f, 0, sizeof(pi_file_handle));
1732 if (failed) errno = failed;
1733 return !failed;
1734 }
1735
area_is_zero(const void * src,ptrdiff_t offset0,ptrdiff_t offset1)1736 int area_is_zero(const void * src, ptrdiff_t offset0, ptrdiff_t offset1)
1737 {
1738 const char * b0 = pointer_arith_const(src, offset0);
1739 const char * b1 = pointer_arith_const(src, offset1);
1740 for( ; b0 < b1 ; b0++) {
1741 if (*b0) return 0;
1742 }
1743 return 1;
1744 }
1745
pi_file_write_leader(pi_file_handle_ptr f,void * buf,size_t size)1746 ssize_t pi_file_write_leader(pi_file_handle_ptr f, void * buf, size_t size)
1747 {
1748 unsigned long ret;
1749 if (f->pi->m->jrank == 0 && f->pi->m->trank == 0)
1750 ret = fwrite(buf, 1, size, f->f);
1751 pi_bcast(&ret, 1, BWC_PI_UNSIGNED_LONG, 0, 0, f->pi->m);
1752 return (ssize_t) ret;
1753 }
1754
pi_file_read_leader(pi_file_handle_ptr f,void * buf,size_t size)1755 ssize_t pi_file_read_leader(pi_file_handle_ptr f, void * buf, size_t size)
1756 {
1757 unsigned long ret;
1758 if (f->pi->m->jrank == 0 && f->pi->m->trank == 0)
1759 ret = fread(buf, 1, size, f->f);
1760 pi_bcast(&ret, 1, BWC_PI_UNSIGNED_LONG, 0, 0, f->pi->m);
1761 return (ssize_t) ret;
1762 }
1763
1764
1765 /* size is the chunk size on each core */
pi_file_write(pi_file_handle_ptr f,void * buf,size_t size,size_t totalsize)1766 ssize_t pi_file_write(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize)
1767 {
1768 return pi_file_write_chunk(f, buf, size, totalsize, 1, 0, 1);
1769 }
1770
1771 /* Here, we write a fragment of a "virtual" file (of size totalsize) that
1772 * is made of several independent files. Think of entries in that virtual
1773 * file as being [chunksize] bytes long, and then we write specifically
1774 * bytes spos to epos of each of these chunks.
1775 *
1776 * Return a globally consistent count of written items. A full read is when
1777 * exactly (totalsize / chunksize) * (epos - spos) items are written. When
1778 * a short read occurs, set errno (globally) to the value returned by the
1779 * fwrite function at the leader node.
1780 */
pi_file_write_chunk(pi_file_handle_ptr f,void * buf,size_t size,size_t totalsize,size_t chunksize,size_t spos,size_t epos)1781 ssize_t pi_file_write_chunk(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize, size_t chunksize, size_t spos, size_t epos)
1782 {
1783 // coverity[result_independent_of_operands]
1784 ASSERT_ALWAYS(size <= ULONG_MAX);
1785 ASSERT_ALWAYS(spos <= chunksize);
1786 ASSERT_ALWAYS(spos <= epos);
1787 ASSERT_ALWAYS(epos <= chunksize);
1788 ASSERT_ALWAYS(totalsize % chunksize == 0);
1789 ASSERT_ALWAYS(size % chunksize == 0);
1790 size_t loc_totalsize = (totalsize / chunksize) * (epos - spos);
1791 size_t loc_size = (size / chunksize) * (epos - spos);
1792 size_t chunk_per_rank = size / chunksize;
1793
1794 /* Some sanity checking. This is not technically required: of
1795 * course we would be able to cope with uneven data sets.
1796 * However, having even sizes greatly simplifies the pictures, as
1797 * it alleviates the need to collect the sizes for all threads */
1798 unsigned long size_ul = size;
1799 ASSERT_ALWAYS(pi_data_eq(&size_ul, 1, BWC_PI_UNSIGNED_LONG, f->pi->m));
1800
1801 unsigned int nci = f->pi->wr[f->inner]->ncores;
1802 unsigned int nco = f->pi->wr[f->outer]->ncores;
1803 unsigned int nji = f->pi->wr[f->inner]->njobs;
1804 unsigned int njo = f->pi->wr[f->outer]->njobs;
1805 unsigned int ci = f->pi->wr[f->inner]->trank;
1806 unsigned int co = f->pi->wr[f->outer]->trank;
1807 unsigned int ji = f->pi->wr[f->inner]->jrank;
1808 unsigned int jo = f->pi->wr[f->outer]->jrank;
1809 unsigned int jm = f->pi->m->jrank;
1810
1811 /* the inner threads will do writes and MPI sends in one go. For
1812 * outer threads, it is not possible to concatenate similarly,
1813 * because we have writes relative to the mpi-level interleaved.
1814 */
1815 void * sbuf = shared_malloc_set_zero(f->pi->m, loc_size * nci);
1816
1817 long res = 0;
1818 int failed = 0;
1819
1820 for(unsigned int xjo = 0 ; xjo < njo ; xjo++) {
1821 for(unsigned int xco = 0 ; xco < nco ; xco++) {
1822 for(unsigned int xji = 0 ; xji < nji ; xji++) {
1823 unsigned int xj = mrank_from_tworanks(f->pi, f->inner, xji, xjo);
1824 if (jm == xj) {
1825 ASSERT_ALWAYS (ji == xji && jo == xjo);
1826 /* fill the buffer with the contents relevant to the
1827 * inner communicator (nci threads at work here) */
1828 serialize_threads(f->pi->m);
1829 if (xco == co) {
1830 void * wptr = sbuf + ci * loc_size;
1831 const void * rptr = buf + spos;
1832 if (epos - spos == chunksize) {
1833 memcpy(wptr, rptr, loc_size);
1834 } else {
1835 for(size_t i = 0 ; i < chunk_per_rank ; ++i) {
1836 memcpy(wptr, rptr, epos - spos);
1837 wptr += epos - spos;
1838 rptr += chunksize;
1839 }
1840 }
1841 }
1842 serialize_threads(f->pi->m);
1843 }
1844 if (f->pi->m->trank == 0) {
1845 /* only one thread per node does something. */
1846 if (!jm) {
1847 /* leader node to receive data */
1848 if (xj) { /* except its own... */
1849 MPI_Recv(sbuf, loc_size * nci, MPI_BYTE,
1850 xj, xj, f->pi->m->pals,
1851 MPI_STATUS_IGNORE);
1852 }
1853 if (!failed) {
1854 size_t wanna_write = MIN(nci * loc_size, loc_totalsize - res);
1855 if (wanna_write < nci * loc_size)
1856 ASSERT_ALWAYS(area_is_zero(sbuf, wanna_write, nci * loc_size));
1857 /* POSIX fwrite is required to set errno
1858 * http://pubs.opengroup.org/onlinepubs/9699919799/functions/fwrite.html
1859 */
1860 errno = 0;
1861 ssize_t x = fwrite(sbuf, 1, wanna_write, f->f);
1862 res += x;
1863 if (x < (ssize_t) wanna_write)
1864 failed = errno ? errno : EIO;
1865 }
1866 } else if (jm == xj) {
1867 /* my turn to send data */
1868 MPI_Send(sbuf, loc_size * nci, MPI_BYTE,
1869 0, xj, f->pi->m->pals);
1870 }
1871 }
1872 }
1873 }
1874 }
1875 shared_free(f->pi->m, sbuf);
1876 pi_bcast(&res, 1, BWC_PI_LONG, 0, 0, f->pi->m);
1877 pi_bcast(&failed, 1, BWC_PI_INT, 0, 0, f->pi->m);
1878 if (failed) errno = failed;
1879 return res;
1880 }
1881
pi_file_read(pi_file_handle_ptr f,void * buf,size_t size,size_t totalsize)1882 ssize_t pi_file_read(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize)
1883 {
1884 return pi_file_read_chunk(f, buf, size, totalsize, 1, 0, 1);
1885 }
1886
1887 /*
1888 * Return a globally consistent count of read items. A full read is when
1889 * exactly (totalsize / chunksize) * (epos - spos) items are read. When
1890 * a short read occurs, set errno (globally) to the value returned by the
1891 * fread function at the leader node.
1892 */
pi_file_read_chunk(pi_file_handle_ptr f,void * buf,size_t size,size_t totalsize,size_t chunksize,size_t spos,size_t epos)1893 ssize_t pi_file_read_chunk(pi_file_handle_ptr f, void * buf, size_t size, size_t totalsize, size_t chunksize, size_t spos, size_t epos)
1894 {
1895 // coverity[result_independent_of_operands]
1896 ASSERT_ALWAYS(size <= ULONG_MAX);
1897 ASSERT_ALWAYS(spos <= chunksize);
1898 ASSERT_ALWAYS(spos <= epos);
1899 ASSERT_ALWAYS(epos <= chunksize);
1900 ASSERT_ALWAYS(totalsize % chunksize == 0);
1901 ASSERT_ALWAYS(size % chunksize == 0);
1902 size_t loc_totalsize = (totalsize / chunksize) * (epos - spos);
1903 size_t loc_size = (size / chunksize) * (epos - spos);
1904 size_t chunk_per_rank = size / chunksize;
1905
1906 /* Some sanity checking. This is not technically required: of
1907 * course we would be able to cope with uneven data sets.
1908 * However, having even sizes greatly simplifies the pictures, as
1909 * it alleviates the need to collect the sizes for all threads */
1910 unsigned long size_ul = size;
1911 ASSERT_ALWAYS(pi_data_eq(&size_ul, 1, BWC_PI_UNSIGNED_LONG, f->pi->m));
1912
1913 unsigned int nci = f->pi->wr[f->inner]->ncores;
1914 unsigned int nco = f->pi->wr[f->outer]->ncores;
1915 unsigned int nji = f->pi->wr[f->inner]->njobs;
1916 unsigned int njo = f->pi->wr[f->outer]->njobs;
1917 unsigned int ci = f->pi->wr[f->inner]->trank;
1918 unsigned int co = f->pi->wr[f->outer]->trank;
1919 unsigned int ji = f->pi->wr[f->inner]->jrank;
1920 unsigned int jo = f->pi->wr[f->outer]->jrank;
1921 unsigned int jm = f->pi->m->jrank;
1922
1923 void * sbuf = shared_malloc_set_zero(f->pi->m, loc_size * nci);
1924
1925 long res = 0;
1926 int failed = 0;
1927
1928 for(unsigned int xjo = 0 ; xjo < njo ; xjo++) {
1929 for(unsigned int xco = 0 ; xco < nco ; xco++) {
1930 for(unsigned int xji = 0 ; xji < nji ; xji++) {
1931 unsigned int xj = mrank_from_tworanks(f->pi, f->inner, xji, xjo);
1932 /* for job (ji,jo), nci*nco threads enter here. Unless
1933 * (ji == xji && jo == xjo), this loop becomes trivial.
1934 */
1935 if (f->pi->m->trank == 0) {
1936 /* only one thread per node does something. */
1937 if (!jm) {
1938 /* leader node to read then send data */
1939 memset(sbuf, 0, nci * loc_size);
1940 if (!failed) {
1941 size_t wanna_read = MIN(nci * loc_size, loc_totalsize - res);
1942 /* POSIX fread is required to set errno
1943 * http://pubs.opengroup.org/onlinepubs/9699919799/functions/fread.html
1944 */
1945 errno = 0;
1946 ssize_t x = fread(sbuf, 1, wanna_read, f->f);
1947 res += x;
1948 if (x < (ssize_t) wanna_read)
1949 failed = errno ? errno : EIO;
1950 }
1951 if (xj) { /* except to itself... */
1952 MPI_Send(sbuf, loc_size * nci, MPI_BYTE,
1953 xj, xj, f->pi->m->pals);
1954 }
1955 } else if (jm == xj) {
1956 /* my turn to receive data */
1957 MPI_Recv(sbuf, loc_size * nci, MPI_BYTE,
1958 0, xj, f->pi->m->pals,
1959 MPI_STATUS_IGNORE);
1960 }
1961 }
1962 if (jm == xj) {
1963 ASSERT_ALWAYS (ji == xji && jo == xjo);
1964 serialize_threads(f->pi->m);
1965 /* fill the buffer with the contents relevant to the
1966 * inner communicator (nci threads at work here) */
1967 if (xco == co) {
1968 void * wptr = buf + spos;
1969 const void * rptr = sbuf + ci * loc_size;
1970 if (epos - spos == chunksize) {
1971 memcpy(wptr, rptr, loc_size);
1972 } else {
1973 for(size_t i = 0 ; i < chunk_per_rank ; ++i) {
1974 memcpy(wptr, rptr, epos - spos);
1975 wptr += chunksize;
1976 rptr += epos - spos;
1977 }
1978 }
1979 }
1980 serialize_threads(f->pi->m);
1981 }
1982 }
1983 }
1984 }
1985 shared_free(f->pi->m, sbuf);
1986 pi_bcast(&res, 1, BWC_PI_LONG, 0, 0, f->pi->m);
1987 pi_bcast(&failed, 1, BWC_PI_INT, 0, 0, f->pi->m);
1988 if (failed) errno = failed;
1989 return res;
1990 }
1991 /*}}}*/
1992
pi_interleaving_flip(parallelizing_info_ptr pi)1993 void pi_interleaving_flip(parallelizing_info_ptr pi)
1994 {
1995 if (!pi->interleaved)
1996 return;
1997 my_pthread_barrier_wait(pi->interleaved->b);
1998 }
1999
pi_interleaving_enter(parallelizing_info_ptr pi)2000 void pi_interleaving_enter(parallelizing_info_ptr pi)
2001 {
2002 if (!pi->interleaved || pi->interleaved->idx == 0)
2003 return;
2004
2005 my_pthread_barrier_wait(pi->interleaved->b);
2006 }
2007
pi_interleaving_leave(parallelizing_info_ptr pi)2008 void pi_interleaving_leave(parallelizing_info_ptr pi)
2009 {
2010 if (!pi->interleaved || pi->interleaved->idx == 1)
2011 return;
2012
2013 my_pthread_barrier_wait(pi->interleaved->b);
2014 }
2015
2016 #if 0
2017 /* XXX the name is not very informative, notably about the important
2018 * point that pointer must be identical.
2019 */
2020 /* Considering a data area with a pointer identical on all calling
2021 * threads, tell whether the contents agree on all MPI jobs+threads
2022 * (returns 1 if this is the case)
2023 *
2024 * More exactly, this function only cares about the pointer on thread 0.
2025 */
2026 int mpi_data_eq(parallelizing_info_ptr pi, void *buffer, size_t sz)
2027 {
2028 /* TODO: think about allocating less */
2029 int cmp = 0;
2030 if (pi->m->trank == 0) {
2031 void * b_and = malloc(sz);
2032 void * b_or = malloc(sz);
2033 MPI_Allreduce(buffer, b_and, sz, MPI_BYTE, MPI_BAND, pi->m->pals);
2034 MPI_Allreduce(buffer, b_or, sz, MPI_BYTE, MPI_BOR, pi->m->pals);
2035 cmp = memcmp(b_and, b_or, sz);
2036 free(b_and);
2037 free(b_or);
2038 }
2039 thread_bcast(pi->m, &cmp, sizeof(int), 0);
2040 return cmp == 0;
2041 }
2042 #endif
2043
2044 /* XXX must be called in single threaded context */
parallelizing_info_init()2045 void parallelizing_info_init()
2046 {
2047 pi_init_attribute_things();
2048 }
2049 /* XXX must be called in single threaded context */
parallelizing_info_finish()2050 void parallelizing_info_finish()
2051 {
2052 pi_clear_attribute_things();
2053 }
2054
2055
2056
2057