1 #include "cado.h" // IWYU pragma: keep
2 #include <stdint.h>
3 #include <inttypes.h>
4 #include <stdio.h>
5 #include <string.h>
6 #include <stdlib.h>
7 #include <errno.h>
8 #include <unistd.h>               // for access, unlink, ssize_t, R_OK
9 #include <sys/stat.h>             // for stat, mkdir
10 #include <pthread.h>              // for pthread_mutex_lock, pthread_mutex_u...
11 #include <gmp.h>
12 #include "async.h"                // for timing_next_timer, timing_data (ptr...
13 #include "balancing_workhorse.h"
14 #include "cheating_vec_init.h"
15 #include "intersections.h"
16 #include "bwc_config.h" // IWYU pragma: keep
17 #include "macros.h"     // ASSERT_ALWAYS // IWYU pragma: keep
18 #include "matmul.h"
19 #include "matmul_top.h"
20 #include "mf_bal.h"
21 #include "misc.h"
22 #include "params.h"
23 #include "portability.h" // asprintf // IWYU pragma: keep
24 #include "random_matrix.h"
25 #include "raw_matrix_u32.h"       // for matrix_u32
26 #include "select_mpi.h"
27 #include "timing.h"     // wct_seconds
28 #include "verbose.h"    // CADO_VERBOSE_PRINT_BWC_CACHE_BUILD
29 
30 /* Our innermost communication routines are essentially all-gather and
31  * reduce-scatter, following the MPI terminology. We provide several
 * choices for doing this. The compile-time definitions here select
 * which one is used. The "best" should in theory always be the one
34  * provided by the MPI implementation. Unfortunately, it is not always
35  * that clear.
36  *
37  * Note that because we have several threads wanting to do all these
38  * operations one after the other, we also have some interest in using
 * non-blocking collectives, or even some sort of
40  * communication-computation overlap.
41  */
42 
43 /* choices for reduce-scatter */
44 #define RS_CHOICE_STOCK_RS           1
45 #define RS_CHOICE_STOCK_RSBLOCK      (MPI_VERSION_ATLEAST(2,2) ? 2 : -2)
46 #define RS_CHOICE_STOCK_IRS          (MPI_VERSION_ATLEAST(3,0) ? 3 : -3)
47 #define RS_CHOICE_STOCK_IRSBLOCK     (MPI_VERSION_ATLEAST(3,0) ? 4 : -4)
48 #define RS_CHOICE_MINE               5
49 #define RS_CHOICE_MINE_DROP_IN       (MPI_VERSION_ATLEAST(3,0) ? 6 : -6)
50 #define RS_CHOICE_MINE_PARALLEL      7
51 #define RS_CHOICE_MINE_OVERLAPPING   8  /* TODO */
52 /* choices for all-gather */
53 #define AG_CHOICE_STOCK_AG           1
54 #define AG_CHOICE_STOCK_IAG          (MPI_VERSION_ATLEAST(3,0) ? 2 : -2)
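
/* For example, with an MPI 2.x library MPI_VERSION_ATLEAST(3,0) is false,
 * so RS_CHOICE_STOCK_IRSBLOCK evaluates to -4; selecting it then trips the
 * "#if RS_CHOICE < 0" check below and fails the build instead of silently
 * using an unavailable collective. */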
55 
56 /* The actual performance is affected by the communicator size, the chunk
57  * size, and the operation. When we're doing bxor, we have the following
58  * measurements for RS.
59  *
60  * n=2, chunk=174 MB, openmpi-1.8.3, IB FDR.
61  * RS_CHOICE_MINE_PARALLEL .25
62  * RS_CHOICE_MINE .45
63  * RS_CHOICE_MINE_DROP_IN  .48
64  * RS_CHOICE_STOCK_RS      .77
65  * RS_CHOICE_STOCK_RSBLOCK 1.63
66  * RS_CHOICE_STOCK_IRS        => not measured
67  * RS_CHOICE_STOCK_IRSBLOCK        => not measured
68  * RS_CHOICE_MINE_OVERLAPPING      => to be implemented
69  */
70 
71 /* _this_ part can be configured */
72 // #define RS_CHOICE RS_CHOICE_STOCK_IRSBLOCK
73 // #define AG_CHOICE AG_CHOICE_STOCK_IAG
74 #define RS_CHOICE RS_CHOICE_MINE_PARALLEL
75 #define AG_CHOICE (MPI_VERSION_ATLEAST(3,0) ? AG_CHOICE_STOCK_IAG : AG_CHOICE_STOCK_AG)
76 
77 /* some sanity checking */
78 #if RS_CHOICE < 0
79 #error "Choice for reduce-scatter strategy is invalid or not supported"
80 #endif
81 #if AG_CHOICE < 0
#error "Choice for all-gather strategy is invalid or not supported"
83 #endif
84 
85 /* we no longer use this.
86 #ifdef  HAVE_MPI3_API
87 #define MPI_LIBRARY_NONBLOCKING_COLLECTIVES
88 #endif
89 */
90 
91 ///////////////////////////////////////////////////////////////////
92 /* Start with stuff that does not depend on abase at all -- this
93  * provides a half-baked interface */
94 
95 /* At some point we had this. Not sure it's still useful. */
96 #define ABASE_UNIVERSAL_READAHEAD_ITEMS 8
97 
98 static permutation_data_ptr permutation_data_alloc();
99 static void permutation_data_free(permutation_data_ptr a);
100 static void permutation_data_push(permutation_data_ptr a, unsigned int u[2]);
101 #ifdef  MVAPICH2_NUMVERSION
102 static void permutation_data_ensure(permutation_data_ptr a, size_t n);
103 #endif
104 
105 
void matmul_top_decl_usage(param_list_ptr pl)
107 {
108     param_list_decl_usage(pl, "matrix",
109             "the matrix file (binary)");
110     param_list_decl_usage(pl, "balancing",
111             "the matrix balancing file, as computed by mf_bal");
112     param_list_decl_usage(pl, "random_matrix",
113             "characteristics of a random matrix to be used for staged runs.");
114     param_list_decl_usage(pl, "static_random_matrix",
115             "(unset or set to something arbitrary): indicate that the matrix is fake, and that there is no need to bother with the generation of vectors");
116 
117     param_list_decl_usage(pl, "rebuild_cache",
118             "force rebuilding matrix caches");
119     param_list_decl_usage(pl, "export_cachelist",
120             "print the per-node needed cache files and exit");
121     param_list_decl_usage(pl, "save_submatrices",
122             "after dispatching, save a copy of the local uncompressed matrix before creating the cache file");
123     param_list_decl_usage(pl, "sequential_cache_build",
124             "build the cache files sequentially on each node");
125     param_list_decl_usage(pl, "sequential_cache_read",
126             "read the cache files sequentially on each node");
127     param_list_decl_usage(pl, "balancing_options",
128             "options to pass to the balancing subprogram (see mf_bal_adjust_from_option_string)");
129     balancing_decl_usage(pl);
130     matmul_decl_usage(pl);
131 }
132 
void matmul_top_lookup_parameters(param_list_ptr pl)
134 {
135     param_list_lookup_string(pl, "matrix");
136     param_list_lookup_string(pl, "balancing");
137     param_list_lookup_string(pl, "random_matrix");
138     param_list_lookup_string(pl, "static_random_matrix");
139     param_list_lookup_string(pl, "rebuild_cache");
140     param_list_lookup_string(pl, "export_cachelist");
141     param_list_lookup_string(pl, "save_submatrices");
142     param_list_lookup_string(pl, "sequential_cache_build");
143     param_list_lookup_string(pl, "sequential_cache_read");
144     param_list_lookup_string(pl, "balancing_options");
145     balancing_lookup_parameters(pl);
146     matmul_lookup_parameters(pl);
147 }
148 
static int is_char2(mpfq_vbase_ptr abase)
150 {
151     mpz_t p;
152     mpz_init(p);
153     abase->field_characteristic(abase, p);
154     int char2 = mpz_cmp_ui(p, 2) == 0;
155     mpz_clear(p);
156     return char2;
157 }
158 
159 
160 /* Some info about distributed vectors.
161  *
162  * See linalg/bwc/GUIDED_TOUR_OF_SOURCES for documentation about what's
163  * in the mmt_vec type.
164  *
165  * Here are some pre- and post- conditions about consistency for the
166  * common routines:
167  *
168  * matmul_top_mul_cpu:
169  *      input: fully consistent
170  *      output: inconsistent
171  * reduce (all variants)
172  *      input: inconsistent
173  *      output: partially consistent
174  * allreduce
175  *      input: inconsistent
176  *      output: fully consistent
177  * broadcast
178  *      input: partially consistent
179  *      output: fully consistent
180  * matmul_top_mul_comm:
181  *      input: inconsistent
182  *      output: fully consistent
183  *
 * The following are non-critical, so for simplicity we require and
 * provide full consistency.
186  *
187  * apply_identity:
188  *      input: fully consistent
189  *      output: fully consistent
190  *
191  */
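
/* A minimal sketch (not compiled) of how these consistency levels chain
 * together over one iteration; the vector names and the matrix index 0 are
 * hypothetical, and we assume the usual setup where y has direction d and
 * my has direction !d. */
#if 0
static void consistency_example(matmul_top_data_ptr mmt, mmt_vec_ptr y, mmt_vec_ptr my)
{
    ASSERT_ALWAYS(y->consistency == 2);         /* fully consistent input */
    matmul_top_mul_cpu(mmt, 0, y->d, my, y);    /* output my is inconsistent (0) */
    mmt_vec_reduce(y, my);                      /* y becomes partially consistent (1) */
    mmt_vec_broadcast(y);                       /* y is fully consistent again (2) */
}
#endif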
192 
193 
194 /* {{{ vector init/clear */
195 
196 /* this is for a vector which will be of interest to a group of threads
197  * and jobs in direction d */
void mmt_vec_init(matmul_top_data_ptr mmt, mpfq_vbase_ptr abase, pi_datatype_ptr pitype, mmt_vec_ptr v, int d, int flags, unsigned int n)
199 {
200     ASSERT_ALWAYS(v != NULL);
201     if (abase == NULL) abase = mmt->abase;
202     if (pitype == NULL) pitype = mmt->pitype;
203     memset(v, 0, sizeof(mmt_vec));
204     v->pi = mmt->pi;
205     v->d = d;
206     v->abase = abase;
207     v->pitype = pitype;
208     v->n = n;
209 
210     ASSERT_ALWAYS(n % mmt->pi->m->totalsize == 0);
211 
212     pi_comm_ptr wr = mmt->pi->wr[d];
213     pi_comm_ptr xwr = mmt->pi->wr[!d];
214 
    /* now compute the size we are going to allocate locally */
216     n /= xwr->totalsize;
217     v->i0 = n * (xwr->jrank * xwr->ncores + xwr->trank);
218     v->i1 = v->i0 + n;
219 
220     /* Look for readahead settings for all submatrices */
221     n += ABASE_UNIVERSAL_READAHEAD_ITEMS;
222     for(int i = 0 ; i < mmt->nmatrices ; i++) {
223         matmul_aux(mmt->matrices[i]->mm, MATMUL_AUX_GET_READAHEAD, &n);
224     }
225 
226     if (flags & THREAD_SHARED_VECTOR) {
227         if (wr->trank == 0) {
228             cheating_vec_init(abase, &v->v, n);
229             abase->vec_set_zero(abase, v->v, n);
230         }
231         pi_thread_bcast(&v->v, sizeof(void*), BWC_PI_BYTE, 0, wr);
232         v->siblings = NULL;
233     } else {
234         cheating_vec_init(abase, &v->v, n);
235         abase->vec_set_zero(abase, v->v, n);
236         v->siblings = shared_malloc(wr, wr->ncores * sizeof(void *));
237         v->siblings[wr->trank] = v;
238     }
239     /* Vectors begin initialized to zero, so we have full consistency */
240     v->consistency = 2;
241     serialize_threads(v->pi->m);
242 
243     // pi_log_op(v->pi->m, "Hello, world");
244     /* fill wrpals and mpals */
245     v->wrpals[0] = shared_malloc(v->pi->wr[0], v->pi->wr[0]->ncores * sizeof(void *));
246     v->wrpals[0][v->pi->wr[0]->trank] = v;
247     serialize_threads(v->pi->m);
248     v->wrpals[1] = shared_malloc(v->pi->wr[1], v->pi->wr[1]->ncores * sizeof(void *));
249     v->wrpals[1][v->pi->wr[1]->trank] = v;
250     serialize_threads(v->pi->m);
251     v->mpals = shared_malloc(v->pi->m, v->pi->m->ncores * sizeof(void *));
252     v->mpals[v->pi->m->trank] = v;
253     serialize_threads(v->pi->m);
254 
255 }
256 
void mmt_vec_clear(matmul_top_data_ptr mmt, mmt_vec_ptr v)
258 {
259     ASSERT_ALWAYS(v != NULL);
260     pi_comm_ptr wr = mmt->pi->wr[v->d];
261     serialize_threads(wr);
262     if (v->rsbuf[0]) free(v->rsbuf[0]);
263     if (v->rsbuf[1]) free(v->rsbuf[1]);
264     unsigned int n = v->i1 - v->i0;
265     /* see above */
266     n += ABASE_UNIVERSAL_READAHEAD_ITEMS;
267     for(int i = 0 ; i < mmt->nmatrices ; i++) {
268         matmul_aux(mmt->matrices[i]->mm, MATMUL_AUX_GET_READAHEAD, &n);
269     }
270     if (v->siblings) {
271         cheating_vec_clear(v->abase, &v->v, n);
272         shared_free(wr, v->siblings);
273     } else {
274         if (wr->trank == 0)
275             cheating_vec_clear(v->abase, &v->v, n);
276     }
277     shared_free(v->pi->wr[0], v->wrpals[0]);
278     shared_free(v->pi->wr[1], v->wrpals[1]);
279     shared_free(v->pi->m, v->mpals);
280     memset(v, 0, sizeof(mmt_vec));
281 }
282 /* }}} */
283 
284 /* my "own" offset is the added offset within my locally stored data area
285  * which represents the data range I am the owner of. This data range
 * corresponds to the index range v->i0 + offset to v->i0 + offset + size.
287  */
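/* Worked example (hypothetical sizes): if the local window has
 * v->i1 - v->i0 == 1200 items and wr->totalsize == 6 (say 2 jobs of 3
 * cores each), then eblock == 200; the thread with jrank == 1 and
 * trank == 2 sits at position 1*3 + 2 == 5 and owns local items
 * [1000, 1200), i.e. global indices [v->i0 + 1000, v->i0 + 1200). */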
size_t mmt_my_own_offset_in_items(mmt_vec_ptr v)
289 {
290     pi_comm_ptr wr = v->pi->wr[v->d];
291     size_t eblock = (v->i1 - v->i0) /  wr->totalsize;
292     int pos = wr->jrank * wr->ncores + wr->trank;
293     return pos * eblock;
294 }
295 
size_t mmt_my_own_offset_in_bytes(mmt_vec_ptr v)
297 {
298     return v->abase->vec_elt_stride(v->abase, mmt_my_own_offset_in_items(v));
299 }
300 
void * mmt_my_own_subvec(mmt_vec_ptr v)
302 {
303     return v->abase->vec_subvec(v->abase, v->v, mmt_my_own_offset_in_items(v));
304 }
305 
size_t mmt_my_own_size_in_items(mmt_vec_ptr v)
307 {
308     pi_comm_ptr wr = v->pi->wr[v->d];
309     size_t eblock = (v->i1 - v->i0) /  wr->totalsize;
310     return eblock;
311 }
312 
size_t mmt_my_own_size_in_bytes(mmt_vec_ptr v)
314 {
315     return v->abase->vec_elt_stride(v->abase, mmt_my_own_size_in_items(v));
316 }
317 
318 /* This copies **ONLY** the data we are supposed to own from v to w.
 * mmt_own_vec_set2 is slightly special: we might be picking data from
 * vector areas owned by other blocks (or writing there). Therefore we
 * convey the information about which vector piece we care about with the
 * first argument z.
323  *
324  */
void mmt_own_vec_set2(mmt_vec_ptr z, mmt_vec_ptr w, mmt_vec_ptr v)
326 {
327     ASSERT_ALWAYS(z != NULL);
328     ASSERT_ALWAYS(v != NULL);
329     ASSERT_ALWAYS(w != NULL);
330     if (v == w) return;
331     ASSERT_ALWAYS(z->d == v->d);
332     ASSERT_ALWAYS(z->d == w->d);
333     size_t off = mmt_my_own_offset_in_items(z);
334     size_t sz = mmt_my_own_size_in_items(z);
335     ASSERT_ALWAYS(sz == mmt_my_own_size_in_items(v));
336     ASSERT_ALWAYS(sz == mmt_my_own_size_in_items(w));
337     v->abase->vec_set(v->abase,
338             w->abase->vec_subvec(w->abase, w->v, off),
339             v->abase->vec_subvec(v->abase, v->v, off),
340             sz);
341 }
void mmt_own_vec_set(mmt_vec_ptr w, mmt_vec_ptr v)
343 {
344     ASSERT_ALWAYS(v->abase == w->abase);
345     mmt_own_vec_set2(v, w, v);
346     w->consistency = 1;
347 }
void mmt_vec_swap(mmt_vec_ptr w, mmt_vec_ptr v)
349 {
350     mmt_vec foo;
351     memcpy(foo,v,sizeof(mmt_vec));
352     memcpy(v,w,sizeof(mmt_vec));
353     memcpy(w,foo,sizeof(mmt_vec));
354 }
355 
void mmt_full_vec_set(mmt_vec_ptr w, mmt_vec_ptr v)
357 {
358     ASSERT_ALWAYS(v != NULL);
359     ASSERT_ALWAYS(w != NULL);
360     /* DO **NOT** early-quit when v==w, because we might be calling this
361      * with v and w being siblings, maybe equal for one of the threads.
362      */
363     // same remark as above
364     // ASSERT_ALWAYS(v->abase == w->abase);
365     ASSERT_ALWAYS(v->d == w->d);
366     ASSERT_ALWAYS(mmt_my_own_size_in_items(v) == mmt_my_own_size_in_items(w));
367     if (w->siblings) {
368         if (w->v != v->v) {
369             v->abase->vec_set(v->abase, w->v, v->v, v->i1 - v->i0);
370         }
371     } else {
372         ASSERT_ALWAYS(v->siblings == NULL);
373         if (w->v != v->v) {
374             if (w->pi->wr[w->d]->trank == 0) {
375                 v->abase->vec_set(v->abase, w->v, v->v, v->i1 - v->i0);
376             }
377         }
378         serialize_threads(w->pi->wr[w->d]);
379     }
380     if (v != w)
381         w->consistency = v->consistency;
382 }
383 
void mmt_full_vec_set_zero(mmt_vec_ptr v)
385 {
386     ASSERT_ALWAYS(v != NULL);
387     if (v->siblings) {
388         v->abase->vec_set_zero(v->abase, v->v, v->i1 - v->i0);
389     } else {
390         serialize_threads(v->pi->wr[v->d]);
391         if (v->pi->wr[v->d]->trank == 0)
392             v->abase->vec_set_zero(v->abase, v->v, v->i1 - v->i0);
393     }
394     v->consistency = 2;
395     serialize_threads(v->pi->wr[v->d]);
396 }
397 
void mmt_vec_set_basis_vector_at(mmt_vec_ptr v, int k, unsigned int j)
399 {
400     mmt_full_vec_set_zero(v);
401     mmt_vec_add_basis_vector_at(v,k,j);
402 }
403 
void mmt_vec_add_basis_vector_at(mmt_vec_ptr v, int k, unsigned int j)
405 {
406     if (v->i0 <= j && j < v->i1) {
407         if (v->siblings) {
408             v->abase->simd_set_ui_at(v->abase, v->abase->vec_coeff_ptr(v->abase, v->v, j - v->i0), k, 1);
409         } else {
410             serialize_threads(v->pi->wr[v->d]);
411             if (v->pi->wr[v->d]->trank == 0)
412                 v->abase->simd_set_ui_at(v->abase, v->abase->vec_coeff_ptr(v->abase, v->v, j - v->i0), k, 1);
413             serialize_threads(v->pi->wr[v->d]);
414         }
415     }
416 }
417 
void mmt_vec_add_basis_vector(mmt_vec_ptr v, unsigned int j)
419 {
420     mmt_vec_add_basis_vector_at(v, 0, j);
421 }
422 
void mmt_vec_set_basis_vector(mmt_vec_ptr v, unsigned int j)
424 {
425     mmt_vec_set_basis_vector_at(v, 0, j);
426 }
427 
void mmt_vec_downgrade_consistency(mmt_vec_ptr v)
429 {
430     ASSERT_ALWAYS(v->consistency == 2);
431     size_t erase[2][2];
432     size_t off = mmt_my_own_offset_in_items(v);
433     size_t sz = mmt_my_own_size_in_items(v);
    serialize_threads(v->pi->wr[v->d]);
435     if (v->siblings) {
436         erase[0][0] = 0;
437         erase[0][1] = off;
438         erase[1][0] = off + sz;
439         erase[1][1] = v->i1 - v->i0;
440     } else {
441         /* There are no siblings, which means that this vector is shared
442          * across all threads in this direction. Let only one thread do
443          * the job.
444          */
445         if (v->pi->wr[v->d]->trank == 0) {
446             erase[0][0] = 0;
447             /* because we are rank 0, this is the minimal offset for this set
448              * of threads */
449             erase[0][1] = off;
450             erase[1][0] = off + sz * v->pi->wr[v->d]->ncores;
451             erase[1][1] = v->i1 - v->i0;
452         } else {
453             erase[0][0] = 0;
454             erase[0][1] = 0;
455             erase[1][0] = 0;
456             erase[1][1] = 0;
457         }
458     }
459     for(int i = 0 ; i < 2 ; i++) {
460         if (erase[i][1] != erase[i][0]) {
461             v->abase->vec_set_zero(v->abase,
462                     v->abase->vec_subvec(v->abase, v->v, erase[i][0]),
463                     erase[i][1] - erase[i][0]);
464         }
465     }
466     v->consistency = 1;
467     serialize_threads(v->pi->wr[v->d]);
468 }
469 
470 
471 #if 0
472 // looks utterly bogus, so...
473 /* On a shared vector which is assumed to be commonly known by all
474  * nodes/threads, select only one portion to remain, and zero out the
475  * rest (for later use by e.g.  matmul_top_mul_comm or other integrated
476  * function).
477  */
478 static void mmt_own_vec_clear_complement(matmul_top_data_ptr mmt, int d)
479 {
480     mmt_comm_ptr mdst = mmt->wr[d];
481     pi_comm_ptr pidst = mmt->pi->wr[d];
482     if (mdst->v->flags & THREAD_SHARED_VECTOR)
483         serialize_threads(pidst);
484     if (pidst->trank == 0 || !(mdst->v->flags & THREAD_SHARED_VECTOR)) {
485         if (pidst->jrank == 0 && pidst->trank == 0) {
486             /* ok, we keep the data */
487         } else {
488             mdst->v->abase->vec_set_zero(mdst->v->abase, mdst->v->v, mdst->i1 - mdst->i0);
489         }
490     }
491 }
492 #endif
void mmt_vec_clear_padding(mmt_vec_ptr v, size_t unpadded, size_t padded)
494 {
495     /* This can be applied no matter what the consistency argument says
     */
497     serialize(v->pi->m);
498     if (unpadded >= padded) return;
499 
500     size_t s0 = unpadded >= v->i0 ? (unpadded - v->i0) : 0;
501     size_t s1 = padded >= v->i0 ? (padded - v->i0) : 0;
502     s0 = MIN(s0, v->i1 - v->i0);
503     s1 = MIN(s1, v->i1 - v->i0);
504 
505     if (s1 - s0)
506         v->abase->vec_set_zero(v->abase,
507                 v->abase->vec_subvec(v->abase, v->v, s0), s1-s0);
508 
509     serialize(v->pi->m);
510 }
511 
mmt_vec_ptr mmt_vec_sibling(mmt_vec_ptr v, unsigned int i)
513 {
514     if (v->siblings) {
515         return v->siblings[i];
516     } else {
517         return v;
518     }
519 }
520 
521 /* {{{ mmt_vec_broadcast (generic interface) */
522 /* mmt_vec_broadcast reads data in mmt->wr[d]->v, and broadcasts it across the
523  * communicator mmt->pi->wr[d] ; eventually everybody on the communicator
524  * mmt->pi->wr[d] has the data.
525  *
526  * Note that the combination of mmt_vec_reduce + mmt_vec_broadcast is not the
527  * identity (because of the shuffled_product).
528  */
529 
530 /* XXX
531  * ``across'' (horizontally) and ``down'' (vertically) are just here for
532  * exposition. The binding of this operation to job/thread arrangement
533  * is flexible, through argument d.
534  * XXX
535  */
536 void
mmt_vec_broadcast(mmt_vec_ptr v)
538 {
539     ASSERT_ALWAYS(v != NULL);
540     ASSERT_ALWAYS(v->consistency == 1);
541 
542     // broadcasting down columns is for common agreement on a right
543     // source vector, once a left output has been computed.
544     //
545     // broadcasting down a column is when d == 1.
546     int err;
547 
548     /* communicator wr is in the direction we are broadcasting */
549     pi_comm_ptr wr = v->pi->wr[v->d];
550 
551     /* communicator xwr is in the other direction */
552     pi_comm_ptr xwr = v->pi->wr[!v->d];
553     mmt_vec_ptr * xwrpals = v->wrpals[!v->d];
554 
555     pi_log_op(v->pi->m, "[%s:%d] enter first loop", __func__, __LINE__);
556     /* Make sure that no thread on the column is wandering in other
557      * places -- when we're leaving reduce, this is important. */
558     serialize_threads(wr);
559 
560     /* This loop suffers from two-dimensional serializing, so the
561      * simplistic macros SEVERAL_THREADS_PLAY_MPI_BEGIN and END do not
562      * help here.
563      */
564 
565     size_t eblock = mmt_my_own_size_in_items(v);
566 
567 #ifndef MPI_LIBRARY_MT_CAPABLE
568     if (v->siblings) {
569         /* not shared: begin by collecting everything on thread 0 */
570         mmt_own_vec_set2(v, v->siblings[0], v);
571     }
572     serialize_threads(v->pi->m);
573     if (wr->trank == 0 && xwr->trank == 0) {
574 #if AG_CHOICE == AG_CHOICE_STOCK_IAG
575         MPI_Request * req = malloc(xwr->ncores * sizeof(MPI_Request));
576 #endif  /* AG_CHOICE == AG_CHOICE_STOCK_IAG */
577 
578         for(unsigned int t = 0 ; t < xwr->ncores ; t++) {
579             // although the openmpi man page looks funny, I'm assuming that
580             // MPI_Allgather wants MPI_IN_PLACE as a sendbuf argument.
581             pi_log_op(wr, "[%s:%d] MPI_Allgather (round %u)", __func__, __LINE__, t);
582 #if AG_CHOICE == AG_CHOICE_STOCK_IAG
583             err = MPI_Iallgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
584                     xwrpals[t]->v, v->abase->vec_elt_stride(v->abase, 1) * eblock * wr->ncores, MPI_BYTE, wr->pals, &req[t]);
585 #elif AG_CHOICE == AG_CHOICE_STOCK_AG
586             err = MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, xwrpals[t]->v, v->abase->vec_elt_stride(v->abase, 1) * eblock * wr->ncores, MPI_BYTE, wr->pals);
587 #else   /* AG_CHOICE */
588 #error "Bad AG_CHOICE setting"
589 #endif  /* AG_CHOICE */
590             pi_log_op(wr, "[%s:%d] MPI_Allgather (round %u) done", __func__, __LINE__, t);
591             ASSERT_ALWAYS(!err);
592         }
593 
594 #if AG_CHOICE == AG_CHOICE_STOCK_IAG
595         for(unsigned int t = 0 ; t < xwr->ncores ; t++) {
596             MPI_Wait(&req[t], MPI_STATUS_IGNORE);
597         }
598         free(req);
599 #endif  /* AG_CHOICE == AG_CHOICE_STOCK_IAG */
600     }
601     v->consistency = 2;
602     serialize_threads(v->pi->m);
603     if (v->siblings) {
604         mmt_full_vec_set(v, v->siblings[0]);
605         serialize_threads(wr);
606     }
607 #else   /* MPI_LIBRARY_MT_CAPABLE */
608     /* Code deleted 20110119, as I've never been able to have enough
609      * trust in an MPI implementation to check this */
610     ASSERT_ALWAYS(0);
611 #endif  /* MPI_LIBRARY_MT_CAPABLE */
612 
613     pi_log_op(v->pi->m, "[%s:%d] trailer", __func__, __LINE__);
614 }
615 /* }}} */
616 
617 /* {{{ generic interfaces for load/save */
618 /* {{{ load */
int mmt_vec_load(mmt_vec_ptr v, const char * filename_pattern, unsigned int itemsondisk, unsigned int block_position)
620 {
621     ASSERT_ALWAYS(v != NULL);
622     serialize(v->pi->m);
623     int tcan_print = v->pi->m->trank == 0 && v->pi->m->jrank == 0;
624 
625     ASSERT_ALWAYS(strstr(filename_pattern, "%u-%u") != NULL);
626 
627     mpz_t p;
628     mpz_init(p);
629     v->abase->field_characteristic(v->abase, p);
630     int char2 = mpz_cmp_ui(p, 2) == 0;
631     mpz_clear(p);
632     int splitwidth = char2 ? 64 : 1;
633     unsigned int Adisk_width = splitwidth;
634     unsigned int Adisk_multiplex = v->abase->simd_groupsize(v->abase) / Adisk_width;
635 
636     size_t sizeondisk = v->abase->vec_elt_stride(v->abase, itemsondisk);
637     void * mychunk = mmt_my_own_subvec(v);
638     size_t mysize = mmt_my_own_size_in_bytes(v);
639     size_t bigstride = v->abase->vec_elt_stride(v->abase, 1);
640     size_t smallstride = bigstride / Adisk_multiplex;
641 
642     int global_ok = 1;
643 
644     for(unsigned int b = 0 ; global_ok && b < Adisk_multiplex ; b++) {
645         unsigned int b0 = block_position + b * Adisk_width;
646         char * filename;
647         int rc;
648         rc = asprintf(&filename, filename_pattern, b0, b0 + splitwidth);
649         ASSERT_ALWAYS(rc >= 0);
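        /* Hypothetical illustration: with filename_pattern == "V%u-%u.0",
         * block_position == 0 and splitwidth == 64 (the GF(2) case), the
         * pass b == 0 opens "V0-64.0", b == 1 opens "V64-128.0", and so
         * on, one file per 64-column-wide slice. */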
650         double tt = -wct_seconds();
651         if (tcan_print) {
652             printf("Loading %s ...", filename);
653             fflush(stdout);
654         }
655         pi_file_handle f;
656         int ok = pi_file_open(f, v->pi, v->d, filename, "rb");
657         /* "ok" is globally consistent after pi_file_open */
658         if (!ok) {
659             if (v->pi->m->trank == 0 && v->pi->m->jrank == 0) {
660                 fprintf(stderr, "ERROR: failed to load %s: %s\n", filename, strerror(errno));
661             }
662         } else {
663             ASSERT_ALWAYS(v != NULL);
664             serialize(v->pi->m);
665             ssize_t s = pi_file_read_chunk(f, mychunk, mysize, sizeondisk,
666                     bigstride, b * smallstride, (b+1) * smallstride);
            ok = s >= 0 && (size_t) s == sizeondisk / Adisk_multiplex;
668             /* "ok" is globally consistent after pi_file_read_chunk */
669             if (!ok) {
670                 if (v->pi->m->trank == 0 && v->pi->m->jrank == 0) {
671                     fprintf(stderr, "ERROR: failed to load %s: short read, %s\n", filename, errno ? strerror(errno) : "no error reported via errno");
672                 }
673             }
674             v->consistency = ok;
675             /* not clear it's useful, but well. */
676             if (ok) mmt_vec_broadcast(v);
677             serialize_threads(v->pi->m);
678             pi_file_close(f);
679         }
680         free(filename);
681         tt += wct_seconds();
682         if (ok && tcan_print) {
683             char buf[20], buf2[20];
684             printf(" done [%s in %.2fs, %s/s]\n",
685                     size_disp(sizeondisk / Adisk_multiplex, buf),
686                     tt,
687                     size_disp(sizeondisk / Adisk_multiplex / tt, buf2));
688         }
689         global_ok = global_ok && ok;
690     }
691 
692     serialize_threads(v->pi->m);
693     return global_ok;
694 }
695 /* }}} */
696 /* {{{ save */
int mmt_vec_save(mmt_vec_ptr v, const char * filename_pattern, unsigned int itemsondisk, unsigned int block_position)
698 {
699     serialize_threads(v->pi->m);
700     int tcan_print = v->pi->m->trank == 0 && v->pi->m->jrank == 0;
701 
702     ASSERT_ALWAYS(strstr(filename_pattern, "%u-%u") != NULL);
703 
704     mpz_t p;
705     mpz_init(p);
706     v->abase->field_characteristic(v->abase, p);
707     int char2 = mpz_cmp_ui(p, 2) == 0;
708     mpz_clear(p);
709     int splitwidth = char2 ? 64 : 1;
710     unsigned int Adisk_width = splitwidth;
711     unsigned int Adisk_multiplex = v->abase->simd_groupsize(v->abase) / Adisk_width;
712 
713     size_t sizeondisk = v->abase->vec_elt_stride(v->abase, itemsondisk);
714     void * mychunk = mmt_my_own_subvec(v);
715     size_t mysize = mmt_my_own_size_in_bytes(v);
716     size_t bigstride = v->abase->vec_elt_stride(v->abase, 1);
717     size_t smallstride = bigstride / Adisk_multiplex;
718 
719     int global_ok = 1;
720 
721     for(unsigned int b = 0 ; b < Adisk_multiplex ; b++) {
722         unsigned int b0 = block_position + b * Adisk_width;
723         char * filename;
724         int rc = asprintf(&filename, filename_pattern, b0, b0 + splitwidth);
725         ASSERT_ALWAYS(rc >= 0);
726         char * tmpfilename;
727         rc = asprintf(&tmpfilename, "%s.tmp", filename);
728         ASSERT_ALWAYS(rc >= 0);
729         double tt = -wct_seconds();
730         if (tcan_print) {
731             printf("Saving %s ...", filename);
732             fflush(stdout);
733         }
734         pi_file_handle f;
735         int ok = pi_file_open(f, v->pi, v->d, tmpfilename, "wb");
736         /* "ok" is globally consistent after pi_file_open */
737         if (!ok) {
738             if (v->pi->m->trank == 0 && v->pi->m->jrank == 0) {
739                 fprintf(stderr, "WARNING: failed to save %s: %s\n", filename, strerror(errno));
740                 unlink(tmpfilename);    // just in case
741             }
742         } else {
743             ASSERT_ALWAYS(v != NULL);
744             ASSERT_ALWAYS(v->consistency == 2);
745             serialize_threads(v->pi->m);
746             ssize_t s = pi_file_write_chunk(f, mychunk, mysize, sizeondisk,
747                     bigstride, b * smallstride, (b+1) * smallstride);
748             serialize_threads(v->pi->m);
749             ok = s >= 0 && (size_t) s == sizeondisk / Adisk_multiplex;
750             /* "ok" is globally consistent after pi_file_write_chunk */
751             if (!ok && v->pi->m->trank == 0 && v->pi->m->jrank == 0) {
752                 fprintf(stderr, "ERROR: failed to save %s: short write, %s\n", filename, errno ? strerror(errno) : "no error reported via errno");
753             }
754             ok = pi_file_close(f);
755             if (!ok && v->pi->m->trank == 0 && v->pi->m->jrank == 0) {
756                 fprintf(stderr, "ERROR: failed to save %s: failed fclose, %s\n", filename, errno ? strerror(errno) : "no error reported via errno");
757             }
758             if (v->pi->m->trank == 0 && v->pi->m->jrank == 0) {
759                 ok = rename(tmpfilename, filename) == 0;
760                 if (!ok) {
761                     fprintf(stderr, "ERROR: failed to save %s: failed rename, %s\n", filename, errno ? strerror(errno) : "no error reported via errno");
762                 }
763             }
764             pi_bcast(&ok, 1, BWC_PI_INT, 0, 0, v->pi->m);
765         }
766         free(filename);
767         free(tmpfilename);
768         tt += wct_seconds();
769         if (tcan_print) {
770             char buf[20], buf2[20];
771             printf(" done [%s in %.2fs, %s/s]\n",
772                     size_disp(sizeondisk / Adisk_multiplex, buf),
773                     tt,
774                     size_disp(sizeondisk / Adisk_multiplex / tt, buf2));
775         }
776         global_ok = global_ok && ok;
777     }
778 
779     serialize_threads(v->pi->m);
780     return global_ok;
781 }
782 /* }}} */
783 /* }}} */
784 
void mmt_vec_reduce_mod_p(mmt_vec_ptr v)
786 {
787     /* if it so happens that there's no reduce function defined, it may
788      * correspond to a case where we have nothing to do, like over GF(2)
789      * -- where unreduced elements are just the same, period */
790     if (!v->abase->reduce)
791         return;
792     void * ptr = mmt_my_own_subvec(v);
793     void * tmp;
794     v->abase->vec_ur_init(v->abase, &tmp, 1);
795     for(size_t i = 0 ; i < mmt_my_own_size_in_items(v) ; i++) {
796         v->abase->elt_ur_set_elt(v->abase,
797                 tmp,
798                 v->abase->vec_coeff_ptr_const(v->abase, ptr, i));
799         v->abase->reduce(v->abase,
800                 v->abase->vec_coeff_ptr(v->abase, ptr, i),
801                 tmp);
802     }
803     v->abase->vec_ur_clear(v->abase, &tmp, 1);
804 }
805 
806 
807 
808 //////////////////////////////////////////////////////////////////////////
809 
void matmul_top_mul(matmul_top_data_ptr mmt, mmt_vec * v, struct timing_data * tt)/*{{{*/
811 {
812     /* Do all matrices in turn.
813      *
     * We represent M as the product M0*M1*..*M_{n-1}.
815      *
816      * The input vector is v[0], and the result is put in v[0] again.
817      *
818      * The direction in which we apply the product is given by v[0]->d.
819      * For v[0]->d == 0, we do v[0]*M. For v[0]->d==1, we do M*v[0]
820      *
821      * We use temporaries as follows.
822      * For v[0]->d == 0:
823      *  v[1] <- v[0] * M0 ; v[1]->d == 1
824      *  v[2] <- v[1] * M1 ; v[2]->d == 0
825      *  if n is odd:
826      *  v[n] <- v[n-1] * M_{n-1} ; v[n]->d == 1
827      *  if n is even:
828      *  v[0] <- v[n-1] * M_{n-1} ; v[0]->d == 0
829      *
830      * For v[0]->d == 1:
831      *  v[1] <- M0 * v[0] ; v[1]->d == 0
832      *  v[2] <- M1 * v[1] ; v[2]->d == 1
833      *  if n is odd:
834      *  v[n] <- M_{n-1} * v[n-1] ; v[n]->d == 0
835      *  if n is even:
836      *  v[0] <- M_{n-1} * v[n-1] ; v[0]->d == 1
837      *
838      * This has the consequence that v must hold exactly n+(n&1) vectors.
839      *
840      * Appropriate communication operations are run after each step.
841      *
842      * If tt is not NULL, it should be a timing_data structure holding
843      * exactly 4*n timers (or only 4, conceivably). Timers are switched
844      * exactly that many times.
845      *
846      * If the mmt->pi->interleaving setting is on, we interleave
847      * computations and communications. We do 2n flips. Communications
848      * are forbidden both before and after calls to this function (in the
849      * directly adjacent code fragments before the closest flip() call,
850      * that is).
851      */
852 
853     int d = v[0]->d;
854     int nmats_odd = mmt->nmatrices & 1;
855     int midx = (d ? (mmt->nmatrices - 1) : 0);
856     for(int l = 0 ; l < mmt->nmatrices ; l++) {
857         mmt_vec_ptr src = v[l];
858         int last = l == (mmt->nmatrices - 1);
859         int lnext = last && !nmats_odd ? 0 : (l+1);
860         mmt_vec_ptr dst = v[lnext];
861 
862         ASSERT_ALWAYS(src->consistency == 2);
863         matmul_top_mul_cpu(mmt, midx, d, dst, src);
864         ASSERT_ALWAYS(dst->consistency == 0);
865 
866         timing_next_timer(tt);
867         /* now measuring jitter */
868         pi_interleaving_flip(mmt->pi);
869         serialize(mmt->pi->m);
870         timing_next_timer(tt);
871 
872         /* Now we can resume MPI communications. */
873         if (last && nmats_odd) {
874             ASSERT_ALWAYS(lnext == mmt->nmatrices);
875             matmul_top_mul_comm(v[0], dst);
876         } else {
877             mmt_vec_allreduce(dst);
878         }
879         timing_next_timer(tt);
880         /* now measuring jitter */
881         pi_interleaving_flip(mmt->pi);
882         serialize(mmt->pi->m);
883 
884         timing_next_timer(tt);
885         midx += d ? -1 : 1;
886     }
887     ASSERT_ALWAYS(v[0]->consistency == 2);
888 }
889 /*}}}*/
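
/* A minimal usage sketch (not compiled): per the comment above, for a
 * product of mmt->nmatrices == n matrices the caller provides n + (n & 1)
 * vectors, with v[0] holding the fully consistent input and receiving the
 * result. The names and the NULL timing argument are illustrative only. */
#if 0
static void mul_loop_example(matmul_top_data_ptr mmt, mmt_vec * v, int niter)
{
    for(int k = 0 ; k < niter ; k++) {
        ASSERT_ALWAYS(v[0]->consistency == 2);
        matmul_top_mul(mmt, v, NULL);   /* result lands back in v[0] */
    }
}
#endif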
890 
891 #if 0/*{{{*/
892 /* no longer used -- was only used by prep.
893  * It's not buggy, but making this work in a context where we have
894  * multiple threads is tricky.
895  */
896 void matmul_top_fill_random_source_generic(matmul_top_data_ptr mmt, size_t stride, mmt_vec_ptr v, int d)
897 {
898     if (v == NULL) v = mmt->wr[d]->v;
899 
900     // In conjugation mode, it is possible to fill exactly the data chunk
901     // that will eventually be relevant. However, it's easy enough to
902     // fill our output vector with garbage, and do mmt_vec_broadcast
903     // afterwards...
904     if ((v->flags & THREAD_SHARED_VECTOR) == 0 || mmt->pi->wr[d]->trank == 0)
905         mpfq_generic_random(stride, v->v, mmt->wr[d]->i1 - mmt->wr[d]->i0);
906 
907     // reconcile all cells which correspond to the same vertical block.
908     mmt_vec_broadcast(mmt, v, d);
909 }
910 #endif/*}}}*/
911 
912 /* {{{ mmt_vec_reduce */
913 /* {{{ various reduce_scatter implementations */
914 /* {{{ alternative_reduce_scatter */
915 #if 1 || RS_CHOICE == RS_CHOICE_MINE
void alternative_reduce_scatter(mmt_vec_ptr v)
917 {
918     pi_comm_ptr wr = v->pi->wr[v->d];
919     int njobs = wr->njobs;
920     int rank = wr->jrank;
921     MPI_Datatype t = v->pitype->datatype;
922 
923     size_t eitems = mmt_my_own_size_in_items(v) * wr->ncores;
924     size_t needed = v->abase->vec_elt_stride(v->abase, eitems);
925 
926     if (v->rsbuf_size < needed) {
927         ASSERT_ALWAYS(v->rsbuf_size == 0);
928         v->rsbuf[0] = realloc(v->rsbuf[0], needed);
929         v->rsbuf[1] = realloc(v->rsbuf[1], needed);
930         v->rsbuf_size = needed;
931     }
932 
933     void *b[2];
934     b[0] = v->rsbuf[0];
935     b[1] = v->rsbuf[1];
936     v->abase->vec_set_zero(v->abase, b[0], eitems);
937 
938     int l = (rank + 1) % njobs;
939     int srank = (rank + 1) % njobs;
940     int drank = (rank + njobs - 1) % njobs;
941 
942     for (int i = 0; i < njobs; i++) {
943         int j0, j1;
944         j0 = l * eitems;
945         j1 = j0 +  eitems;
946         v->abase->vec_add(v->abase, b[0], b[0],
947                 v->abase->vec_subvec(v->abase, mmt_vec_sibling(v, 0)->v, j0),
948                 j1-j0);
949         if (i == njobs - 1)
950             break;
951         MPI_Sendrecv(b[0], eitems, t, drank, (i<<16) + rank,
952                      b[1], eitems, t, srank, (i<<16) + srank,
953                      wr->pals, MPI_STATUS_IGNORE);
954         void * tb = b[0];   b[0] = b[1];  b[1] = tb;
955         l = (l + 1) % njobs;
956     }
957     /* This is going at the beginning of the memory area, which is weird.
958      * But it's the convention used by the MPI call...
959      */
960     v->abase->vec_set(v->abase, mmt_vec_sibling(v, 0)->v, b[0], eitems);
961 }
962 #endif  /* RS_CHOICE == RS_CHOICE_MINE */
963 /* }}} */
964 /* {{{ alternative_reduce_scatter_parallel */
965 #if 1 || RS_CHOICE == RS_CHOICE_MINE_PARALLEL
966 /* Example data for a factoring matrix (rsa100) of size 135820*135692,
967  * split over 2x3 mpi jobs, and 7x5 threads.
968  *
 * 2 == mmt->pi->wr[1]->njobs (number of jobs encountered on a vertical axis).
 * 7 == mmt->pi->wr[1]->ncores (number of threads per job on a vertical axis).
 * 3 == mmt->pi->wr[0]->njobs (number of jobs encountered on a horizontal axis).
 * 5 == mmt->pi->wr[0]->ncores (number of threads per job on a horizontal axis).
973  *
 * The matrix is padded to a multiple of 210 = 2*3*5*7, which gives N = 135870 = 210*647.
975  *
976  * we'll call 647 the "small chunk" size.
977  *
978  * for all jobs/threads, the following relations hold:
979  *      mmt->wr[0]->i1 - mmt->wr[0]->i0 == N/14 == 9705 == 15 * 647
980  *      mmt->wr[1]->i1 - mmt->wr[1]->i0 == N/15 == 9058 == 14 * 647
981  *
982  * a mmt_vec_reduce operation, in the context of factoring, is with d==1
983  * below. Hence, in fact, we're doing a reduction down a column.
984  *
985  * the value eitems fed to this function is mmt->pi->wr[d]->ncores (here,
986  * 7) times the small chunk size. Here 7*647 == 4529.
987  */
988 /* all threads in mmt->wr[!d], one after another a priori, are going to
989  * do alternative_reduce_scatter on their vector v[i]
990  */
void alternative_reduce_scatter_parallel(pi_comm_ptr xr, mmt_vec_ptr * vs)
992 {
993     /* we write all data counts below with comments indicating the typical
994      * size in our toy example above */
995 
    /* We have xr->ncores vectors. The vs[] array accesses data from the
997      * other peers. Note that the pi structures belong separately to each
998      * peer, and so do the embedded communicators. Therefore the proper
999      * way to see "our" xr is none other that the given parameter. And for
     * way to see "our" xr is none other than the given parameter. And for
1001      */
1002     mmt_vec_ptr v = vs[xr->trank];
1003     mpfq_vbase_ptr ab = v->abase;
1004     pi_comm_ptr wr = v->pi->wr[v->d];  /* 2 jobs, 7 cores */
1005     /* what we're going to do will happen completely in parallel over
1006      * xr->njobs==3 communicators. We're simulating here the split into 5
1007      * communicators, even though those collide into a unique
1008      * communicator as far as MPI is concerned.
1009      */
1010     int njobs = wr->njobs;      /* 2 */
1011     /* note that we no longer care at this point about what happened at
1012      * the thread level in our dimension. This is already done, period.
1013      */
1014     int rank = wr->jrank;
1015     MPI_Datatype t = v->pitype->datatype;
1016 
1017     /* If the rsbuf[] buffers have not yet been allocated, it is time to
1018      * do so now. We also take the opportunity to possibly re-allocate
1019      * them if because of a larger abase, the corresponding storage has
1020      * to be expanded.
1021      */
1022     size_t eitems = mmt_my_own_size_in_items(v) * wr->ncores;
1023     size_t needed = v->abase->vec_elt_stride(v->abase, eitems);
1024 
1025     /* notice that we are allocating a temp buffer only for one vector.
1026      * Of course, since this is a multithreaded routine, each thread in
1027      * xr is doing so at the same time */
1028     if (v->rsbuf_size < needed) {
1029         v->rsbuf[0] = realloc(v->rsbuf[0], needed);
1030         v->rsbuf[1] = realloc(v->rsbuf[1], needed);
1031         v->rsbuf_size = needed;
1032     }
1033 
1034     ab->vec_set_zero(ab, v->rsbuf[0], eitems);
1035     serialize_threads(xr);
1036 
1037     int srank = (rank + 1) % njobs;
1038     int drank = (rank + njobs - 1) % njobs;
1039 
1040     /* We describe the algorithm for one of the xr->ncores==5 threads.
     * The local vector area (9058 items) splits into wr->njobs == 2
     * sub-areas of size eitems (== 4529).
1043      *
1044      * on each job, each thread has 9058 == 2*4529 items
1045      *          more generally: njobs * eitems items.
1046      *
1047      * each has a place where it wants to receive data, let's call that
1048      * "his" zone, even though everyone has data at all places.
1049      *
1050      */
    /* scheme for 4 jobs (input on the left, output on the right; all
     * chunks have size eitems; AA is the sum a3+a2+a1+a0):
1053      *
1054      * j0: a0 b0 c0 d0      j0: AA .. .. ..
1055      * j1: a1 b1 c1 d1      j1: .. BB .. ..
1056      * j2: a2 b2 c2 d2      j2: .. .. CC ..
1057      * j3: a3 b3 c3 d3      j3: .. .. .. DD
1058      *
1059      * loop 0: j0: sends       b0 to j3, receives       c1 from j1
1060      * loop 1: j0: sends    c1+c0 to j3, receives    d2+d1 from j1
1061      * loop 2: j0: sends d2+d1+d0 to j3, receives a3+a2+a1 from j1
1062      * loop 3: we only have to adjust.
1063      */
1064 
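    /* More generally, at loop iteration i the rank-r job accumulates chunk
     * number (r + 1 + i) mod njobs into rsbuf[s], then ships that partial
     * sum to rank r-1 while receiving the next one from rank r+1; after
     * njobs-1 such exchanges each job holds the complete sum for its own
     * chunk (chunk number r). */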
1065     for (int i = 0, s = 0; i < njobs; i++, s=!s) {
1066         int l = (rank + 1 + i) % njobs;
1067         int j0 = l * eitems;
1068         int j1 = j0 +  eitems;
1069         ab->vec_add(ab, v->rsbuf[s], v->rsbuf[s],
1070                 ab->vec_subvec(ab, mmt_vec_sibling(v, 0)->v, j0),
1071                 j1-j0);
1072         serialize_threads(xr);
1073 
1074         if (i == njobs - 1)
1075             break;
1076 
1077         if (xr->trank == 0) {
1078             MPI_Request * r = NULL;
1079             r = malloc(2 * xr->ncores * sizeof(MPI_Request));
1080             for(unsigned int w = 0 ; w < xr->ncores ; w++) {
1081                 MPI_Request * rs = r + 2*w;
1082                 MPI_Request * rr = r + 2*w + 1;
1083                 MPI_Isend(vs[w]->rsbuf[s],  eitems, t, drank, 0xb00+w, wr->pals, rs);
1084                 MPI_Irecv(vs[w]->rsbuf[!s], eitems, t, srank, 0xb00+w, wr->pals, rr);
1085                 /*
1086                 MPI_Sendrecv(vs[w]->rsbuf[s], eitems, t, drank, 0xbeef,
1087                         vs[w]->rsbuf[!s], eitems, t, srank, 0xbeef,
1088                         wr->pals, MPI_STATUS_IGNORE);
1089                         */
1090                 // MPI_Waitall(2, r + 2*w, MPI_STATUSES_IGNORE);
1091             }
1092             MPI_Waitall(2 * xr->ncores, r, MPI_STATUSES_IGNORE);
1093             free(r);
1094         }
1095         serialize_threads(xr);
1096     }
1097 
1098     ab->vec_set(ab, mmt_vec_sibling(v, 0)->v, v->rsbuf[(njobs-1)&1], eitems);
1099 
1100     pi_log_op(wr, "[%s:%d] MPI_Reduce_scatter done", __func__, __LINE__);
1101 }
1102 #endif  /* RS_CHOICE == RS_CHOICE_MINE_PARALLEL */
1103 /* }}} */
1104 /* {{{ my_MPI_Reduce_scatter_block */
1105 #if 1 || RS_CHOICE == RS_CHOICE_MINE_DROP_IN
int my_MPI_Reduce_scatter_block(void *sendbuf, void *recvbuf, int recvcount,
1107                 MPI_Datatype datatype, MPI_Op op, MPI_Comm wr)
1108 {
1109     ASSERT_ALWAYS(sendbuf == MPI_IN_PLACE);
1110     int njobs;
1111     int rank;
1112     MPI_Comm_size(wr, &njobs);
1113     MPI_Comm_rank(wr, &rank);
1114 
1115     int tsize;
1116     MPI_Type_size(datatype, &tsize);
1117 
1118     void * v = recvbuf;
1119 
    /* This is a deliberate leak. Note that we expect to be serialized
1121      * here, so there is no concurrency issue with the static data. */
1122     static size_t rsbuf_size = 0;
1123     static unsigned long * rsbuf[2];
1124 
1125     size_t needed = recvcount * tsize;
1126 
1127     if (rsbuf_size < needed) {
1128         rsbuf[0] = realloc(rsbuf[0], needed);
1129         rsbuf[1] = realloc(rsbuf[1], needed);
1130         rsbuf_size = needed;
1131     }
1132 
1133     memset(rsbuf[0], 0, recvcount * tsize);
1134 
1135     int srank = (rank + 1) % njobs;
1136     int drank = (rank + njobs - 1) % njobs;
1137 
1138     for (int i = 0, w = 0; i < njobs; i++, w^=1) {
1139         int j0 = ((rank + i + 1) % njobs) * recvcount;
1140         const void * share = pointer_arith_const(v, j0 * tsize);
1141 #if MPI_VERSION_ATLEAST(2,2)
1142         MPI_Reduce_local(share, rsbuf[w], recvcount, datatype, op);
1143 #else
1144         {
1145             ASSERT_ALWAYS(datatype == MPI_UNSIGNED_LONG);
1146             ASSERT_ALWAYS(op == MPI_BXOR);
1147             const unsigned long * a = share;
1148             unsigned long * b = rsbuf[w];
1149             for(int k = 0 ; k < recvcount ; k++) {
1150                 b[k] ^= a[k];
1151             }
1152         }
1153 #endif
1154         if (i == njobs - 1) {
1155             memcpy(v, rsbuf[w], recvcount * tsize);
1156             break;
1157         }
1158         MPI_Sendrecv(rsbuf[w],  recvcount, datatype, drank, (i<<16) + rank,
1159                      rsbuf[!w], recvcount, datatype, srank, (i<<16) + srank,
1160                      wr, MPI_STATUS_IGNORE);
1161     }
1162     return 0;
1163 }
1164 #endif
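
/* A hedged illustration (not compiled): the function above is meant as an
 * in-place substitute for the MPI 2.2 call of the same name, so the two
 * calls below are intended to be interchangeable, each leaving every rank
 * with the bxor of its own block. The buffer and communicator names are
 * hypothetical. */
#if 0
static void rs_block_drop_in_example(unsigned long * buf, int n, MPI_Comm comm)
{
    /* either the stock MPI-2.2 call ... */
    MPI_Reduce_scatter_block(MPI_IN_PLACE, buf, n, MPI_UNSIGNED_LONG, MPI_BXOR, comm);
    /* ... or the ring-pass replacement implemented above */
    my_MPI_Reduce_scatter_block(MPI_IN_PLACE, buf, n, MPI_UNSIGNED_LONG, MPI_BXOR, comm);
}
#endif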
1165 /* }}} */
1166 /* }}} */
1167 
1168 /* mmt_vec_reduce_inner reads data in v (vector for side d), sums it up
1169  * across the communicator mmt->pi->wr[d], and collects the results in
1170  * vector v again, except that it's put in thread0's buffer (counting
1171  * threads in direction d, of course), *AT THE BEGINNING* of the data
1172  * area (which is surprising).
1173  *
1174  * mmt_vec_reduce completes the work by saving the resulting data in vector w
1175  * (vector in dimension !d).
1176  *
1177  * Note that the combination of mmt_vec_reduce + mmt_vec_broadcast is not the
1178  * identity (because of the shuffled_product).
1179  */
1180 void
mmt_vec_reduce_inner(mmt_vec_ptr v)
1182 {
1183     ASSERT_ALWAYS(v != NULL);
1184     ASSERT_ALWAYS(v->consistency != 2);
1185 
1186     /* reducing across a row is when d == 0 */
1187     pi_comm_ptr wr = v->pi->wr[v->d];
1188     pi_comm_ptr xr = v->pi->wr[!v->d];
1189 
1190     pi_log_op(v->pi->m, "[%s:%d] enter first loop", __func__, __LINE__);
1191 
1192     // I don't think that the case of shared vectors has been tested
1193     // correctly for reduction. Well, to start with, I doubt it really
1194     // makes a lot of sense anyhow.
1195     // ASSERT_ALWAYS((v->flags & THREAD_SHARED_VECTOR) == 0);
1196 
1197     if (wr->ncores > 1 && v->siblings) {
1198         /* row threads have to sum up their data. Of course it's
1199          * irrelevant when there is only one such thread...
1200          *
1201          * Concerning locking, we have to make sure that everybody on the
1202          * row has finished its computation task, but besides that,
1203          * there's no locking until we start mpi stuff.
1204          */
1205         pi_log_op(wr, "[%s:%d] serialize_threads", __func__, __LINE__);
1206         serialize_threads(wr);
1207         pi_log_op(wr, "[%s:%d] serialize_threads done", __func__, __LINE__);
1208 
1209         /* Our [i0,i1[ range is split into wr->ncores parts. This range
1210          * represent coordinates which are common to all threads on
1211          * wr. Corresponding data has to be summed. Amongst the
1212          * wr->ncores threads, thread k takes the responsibility of
1213          * summing data in the k-th block (that is, indices
1214          * i0+k*(i1-i0)/wr->ncores and further). As a convention,
1215          * the thread which eventually owns the final data is thread 0.
1216          *
1217          * Note that one should consider that data in threads other than
1218          * the destination thread may be clobbered by the operation
1219          * (although in the present implementation it is not).
1220          */
1221         size_t thread_chunk = wr->njobs * mmt_my_own_size_in_items(v);
1222         void * dptr = v->abase->vec_subvec(v->abase,
1223                 mmt_vec_sibling(v, 0)->v,
1224                 wr->trank * thread_chunk);
1225         for(unsigned int w = 1 ; w < wr->ncores ; w++) {
1226             const void * sptr = v->abase->vec_subvec(v->abase,
1227                     mmt_vec_sibling(v, w)->v,
1228                     wr->trank * thread_chunk);
1229             v->abase->vec_add(v->abase, dptr, dptr, sptr, thread_chunk);
1230         }
1231         pi_log_op(wr, "[%s:%d] thread reduction done", __func__, __LINE__);
1232     }
1233 
1234     /* Good. Now on each node, thread 0 has the reduced data for the
1235      * range [i0..i1[ (well, actually, this corresponds to indices
1236      * distributed in a funny manner in the original matrix, but as far
     * as we care here, it's really the range v->i0..v->i1).
1238      *
1239      * These data areas must now be reduced or reduce-scattered across
1240      * nodes. Note that in the case where the MPI library does not
1241      * support MT operation, we are performing the reduction in place,
1242      * and copy to the buffer in the other direction is done in a
1243      * secondary step -- which will be among the reasons which imply a
1244      * thread serialization before anything interesting can be done after
1245      * this function.
1246      */
1247 
1248     /* XXX We have to serialize threads here. At least row-wise, so that
1249      * the computations above are finished. Easily seen, that's just a
1250      * couple of lines above.
1251      *
1252      * Unfortunately, that's not the whole story. We must also serialize
1253      * column-wise. Because we're writing on the right vector buffers,
1254      * which are used as input for the matrix multiplication code --
1255      * which may be still running at this point because there has been no
1256      * column serialization in the current procedure yet.
1257      */
1258 
1259     pi_log_op(v->pi->m, "[%s:%d] secondary loop", __func__, __LINE__);
1260 
1261     pi_log_op(v->pi->m, "[%s:%d] serialize_threads", __func__, __LINE__);
1262     serialize_threads(wr);
1263     pi_log_op(v->pi->m, "[%s:%d] serialize_threads done", __func__, __LINE__);
1264 
1265 #ifndef MPI_LIBRARY_MT_CAPABLE
1266     if (wr->trank == 0) {
1267         /* openmpi-1.8.2 does not seem to have a working non-blocking
1268          * reduce_scatter, at least not a very efficient one. All
1269          * I've been able to do is to run MPI_Ireduce_scatter with
1270          * MPI_UNSIGNED_LONG and MPI_BXOR. With custom types it seems
1271          * to crash. And anyway, it's very inefficient.
1272          */
1273 
1274         ASSERT((v->i1 - v->i0) % wr->totalsize == 0);
1275 #if RS_CHOICE == RS_CHOICE_STOCK_RS
1276         void * dptr = mmt_vec_sibling(v, 0)->v;
1277         SEVERAL_THREADS_PLAY_MPI_BEGIN(xr) {
1278             // all recvcounts are equal
1279             int * rc = malloc(wr->njobs * sizeof(int));
1280             for(unsigned int k = 0 ; k < wr->njobs ; k++)
1281                 rc[k] = (v->i1 - v->i0) / wr->njobs;
1282             int err = MPI_Reduce_scatter(dptr, dptr, rc,
1283                     v->pitype->datatype,
1284                     BWC_PI_SUM->custom,
1285                     wr->pals);
1286             free(rc);
1287             ASSERT_ALWAYS(!err);
1288         }
1289         SEVERAL_THREADS_PLAY_MPI_END();
1290 #elif RS_CHOICE == RS_CHOICE_STOCK_RSBLOCK
1291         void * dptr = mmt_vec_sibling(v, 0)->v;
1292         SEVERAL_THREADS_PLAY_MPI_BEGIN(xr) {
1293             int err = MPI_Reduce_scatter_block(dptr, dptr,
1294                     (v->i1 - v->i0) / wr->njobs,
1295                     v->pitype->datatype,
1296                     BWC_PI_SUM->custom,
1297                     wr->pals);
1298             ASSERT_ALWAYS(!err);
1299         }
1300         SEVERAL_THREADS_PLAY_MPI_END();
1301 #elif RS_CHOICE == RS_CHOICE_MINE_DROP_IN
1302         void * dptr = mmt_vec_sibling(v, 0)->v;
1303         SEVERAL_THREADS_PLAY_MPI_BEGIN(xr) {
1304             int err = my_MPI_Reduce_scatter_block(MPI_IN_PLACE, dptr,
1305                     (v->i1 - v->i0) / wr->njobs,
1306                     v->pitype->datatype,
1307                     BWC_PI_SUM->custom,
1308                     wr->pals);
1309             ASSERT_ALWAYS(!err);
1310         }
1311         SEVERAL_THREADS_PLAY_MPI_END();
1312 #elif RS_CHOICE == RS_CHOICE_STOCK_IRSBLOCK
1313         void * dptr = mmt_vec_sibling(v, 0)->v;
1314         MPI_Request * req = shared_malloc(xr, xr->ncores * sizeof(MPI_Request));
1315             SEVERAL_THREADS_PLAY_MPI_BEGIN(xr) {
1316                 int err = MPI_Ireduce_scatter_block(dptr, dptr,
1317                         (v->i1 - v->i0) / wr->njobs,
1318                         v->pitype->datatype,
1319                         BWC_PI_SUM->custom,
1320                         wr->pals, &req[t__]);
1321             ASSERT_ALWAYS(!err);
1322             pi_log_op(wr, "[%s:%d] MPI_Reduce_scatter done", __func__, __LINE__);
1323         }
1324         SEVERAL_THREADS_PLAY_MPI_END();
1325         serialize_threads(xr);
1326         if (xr->trank == 0) {
1327             for(unsigned int t = 0 ; t < xr->ncores ; t++) {
1328                 MPI_Wait(&req[t], MPI_STATUS_IGNORE);
1329             }
1330         }
1331         shared_free(xr, req);
1332 #elif RS_CHOICE == RS_CHOICE_STOCK_IRS
1333         void * dptr = mmt_vec_sibling(v, 0)->v;
1334         MPI_Request * req = shared_malloc(xr, xr->ncores * sizeof(MPI_Request));
1335         int * rc = malloc(wr->njobs * sizeof(int));
1336         for(unsigned int k = 0 ; k < wr->njobs ; k++)
1337             rc[k] = (v->i1 - v->i0) / wr->njobs;
1338         SEVERAL_THREADS_PLAY_MPI_BEGIN(xr) {
1339             int err = MPI_Ireduce_scatter(dptr, dptr,
1340                     rc,
1341                     v->pitype->datatype,
1342                     BWC_PI_SUM->custom,
1343                     wr->pals, &req[t__]);
1344             free(rc);
1345             ASSERT_ALWAYS(!err);
1346             pi_log_op(wr, "[%s:%d] MPI_Reduce_scatter done", __func__, __LINE__);
1347         }
1348         SEVERAL_THREADS_PLAY_MPI_END();
1349         serialize_threads(xr);
1350         if (xr->trank == 0) {
1351             for(unsigned int t = 0 ; t < xr->ncores ; t++) {
1352                 MPI_Wait(&req[t], MPI_STATUS_IGNORE);
1353             }
1354         }
1355         shared_free(xr, req);
1356 #elif RS_CHOICE == RS_CHOICE_MINE
1357         /* This strategy exposes code which is really similar to
1358          * RS_CHOICE_MINE_DROP_IN, with the only exception that we
1359          * have a slightly different interface. There's no reason for
1360          * both to stay in the long run.
1361          */
1362         SEVERAL_THREADS_PLAY_MPI_BEGIN(xr) {
1363             alternative_reduce_scatter(v);
1364         }
1365         SEVERAL_THREADS_PLAY_MPI_END();
1366 #elif RS_CHOICE == RS_CHOICE_MINE_PARALLEL
1367         mmt_vec_ptr * vs = shared_malloc(xr, xr->ncores * sizeof(mmt_vec_ptr));
1368         vs[xr->trank] = v;
1369         serialize_threads(xr);
1370         alternative_reduce_scatter_parallel(xr, vs);
1371         shared_free(xr, vs);
1372 #elif RS_CHOICE == RS_CHOICE_MINE_OVERLAPPING
1373 #error "not implemented, but planned"
1374 #endif
1375     }
1376     serialize_threads(wr);
1377 }
1378 void
1379 mmt_vec_reduce(mmt_vec_ptr w, mmt_vec_ptr v)
1380 {
1381     ASSERT_ALWAYS(v != NULL);
1382     ASSERT_ALWAYS(w != NULL);
1383     ASSERT_ALWAYS(v->abase == w->abase);
1384     ASSERT_ALWAYS(v->d != w->d);
1385     mmt_vec_reduce_inner(v);
1386     pi_comm_ptr wr = v->pi->wr[v->d];
1387     // row threads pick what they're interested in from thread0's reduced
1388     // buffer. Writes are non-overlapping in the mcol buffer here.
1389     // Different row threads always have different mcol buffers, and
1390     // sibling col threads write to different locations in their
1391     // (generally shared) mcol buffer, depending on which row they
1392     // intersect.
1393 
1394     // Notice that results are being written inside v->all_v[0],
1395     // just packed at the beginning.
1396 
1397     // row job rj, row thread rt has a col buffer containing
1398     // picol->totalsize blocks of size eblock.  col job cj, col
1399     // thread ct has in its row buffer pirow->totalsize blocks, but
1400     // only virtually. Because of the reduce_scatter operation, only
1401     // pirow->ncores blocks are here.
1402     //
1403     // Thus the k-th block in the row buffer is rather understood as
1404     // the one of index rj * pirow->ncores + k in the data the row
1405     // threads have collectively computed.
1406     //
1407     // This block is of interest to the row thread of index k of
1408     // course, thus we restrict to rt==k
1409     //
1410     // Now among the picol->totalsize blocks of the col buffer, this
1411     // will go to position cj * picol->ncores + ct
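    //
    // Illustrative numbers (an example only, not taken from the code):
    // with pirow->ncores = 2 and picol->ncores = 2, each row buffer holds
    // pirow->ncores = 2 blocks after the reduce_scatter. On row job
    // rj = 1, its block k = 1 is block rj * pirow->ncores + k = 3 of the
    // data the row threads have collectively computed; it interests row
    // thread rt = k = 1, and for a thread sitting at col job cj = 0, col
    // thread ct = 1, it lands at position cj * picol->ncores + ct = 1
    // among the picol->totalsize blocks of the col buffer.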
1412 
1413     size_t eblock = mmt_my_own_size_in_items(v);
1414     ASSERT_ALWAYS(mmt_my_own_size_in_items(w) == eblock);
1415 
1416     v->abase->vec_set(v->abase,
1417             mmt_my_own_subvec(w),
1418             /* Note: reduce-scatter packs everything at the beginning in
1419              * the leader block, which is why we don't have our usual offset
1420              * here. */
1421             v->abase->vec_subvec(v->abase,
1422                 mmt_vec_sibling(v, 0)->v, wr->trank * eblock),
1423             eblock);
1424 #else   /* MPI_LIBRARY_MT_CAPABLE */
1425     /* Code deleted 20110119, as I've never been able to have enough
1426      * trust in an MPI implementation to check this */
1427     ASSERT_ALWAYS(0);
1428 #endif
1429 
1430     // as usual, we do not serialize on exit. Up to the next routine to
1431     // do so if needed.
1432     //
1433     // what _is_ guaranteed is that in each column, the _leader_ thread
1434     // has all the necessary data to begin the column broadcast.
1435     //
1436     // In most cases, threads must be prevented from starting computation
1437     // before the leader thread has finished importing data. This means
1438     // that a column thread serialization is probably needed in most
1439     // circumstances after this step.
1440     w->consistency = 1;
1441 }
1442 
1443 /* This small variant is used only for the transposition routines. We do
1444  * not want, in that case, to rely on the data moving back and forth
1445  * between the left and right vectors, because in full generality, we
1446  * have no guarantee that they are the same size.
1447  *
1448  * Therefore we're merely building upon mmt_vec_reduce_inner, only with
1449  * the weirdo "pack-at-the-beginning" behaviour removed.
1450  */
1451 void
1452 mmt_vec_reduce_sameside(mmt_vec_ptr v)
1453 {
1454     pi_comm_ptr wr = v->pi->wr[v->d];
1455     mmt_vec_reduce_inner(v);
1456     size_t eblock = mmt_my_own_size_in_items(v);
1457     v->abase->vec_set(v->abase,
1458             mmt_my_own_subvec(v),
1459             v->abase->vec_subvec(v->abase,
1460                                 mmt_vec_sibling(v, 0)->v, wr->trank * eblock),
1461             eblock);
1462     /* This ensures that we've effectively *moved* the data, not copied
1463      * it */
1464     if (wr->trank) {
1465         v->abase->vec_set_zero(v->abase,
1466                 v->abase->vec_subvec(v->abase,
1467                     mmt_vec_sibling(v, 0)->v, wr->trank * eblock),
1468                 eblock);
1469     }
1470     serialize_threads(wr);
1471     v->consistency = 1;
1472 }
1473 /* }}} */
1474 
1475 /* {{{ mmt_vec_allreduce */
1476 /* This is only for convenience now. Eventually this will be relevant for
1477  * block Lanczos.  Note that allreduce is conceptually much simpler.
1478  * There is no funny permutation to be considered.
1479  */
1480 void
1481 mmt_vec_allreduce(mmt_vec_ptr v)
1482 {
1483     ASSERT_ALWAYS(v != NULL);
1484     ASSERT_ALWAYS(v->consistency != 2);
1485     /* reducing across a row is when d == 0 */
1486     pi_comm_ptr wr = v->pi->wr[v->d];
1487 
1488     pi_log_op(v->pi->m, "[%s:%d] enter first loop", __func__, __LINE__);
1489 
1490     serialize_threads(v->pi->m);
1491     /* sum up row threads, so that only one thread on each row is used
1492      * for communication */
1493     size_t thread_chunk = wr->njobs * mmt_my_own_size_in_items(v);
1494     if (v->siblings) {
1495         void * dv = v->abase->vec_subvec(v->abase, v->v,
1496                 wr->trank * thread_chunk);
1497         for(unsigned int k = 1 ; k < wr->ncores ; k++) {
1498             void * sv = v->abase->vec_subvec(v->abase,
1499                     mmt_vec_sibling(v, (wr->trank+k) % wr->ncores)->v,
1500                     wr->trank * thread_chunk);
1501             v->abase->vec_add(v->abase, dv, dv, sv, thread_chunk);
1502         }
1503     }
1504     /* Compared to the SEVERAL_THREADS_PLAY_MPI_BEGIN() approach, this
1505      * one has thread 0 do the work for all other threads, while other
1506      * threads are waiting.
1507      */
1508     SEVERAL_THREADS_PLAY_MPI_BEGIN2(v->pi->m, peer) {
1509         void * dv = v->abase->vec_subvec(v->abase, v->mpals[peer]->v,
1510                 v->mpals[peer]->pi->wr[v->d]->trank * thread_chunk);
1511         MPI_Allreduce(MPI_IN_PLACE,
1512                 dv,
1513                 thread_chunk,
1514                 v->pitype->datatype,
1515                 BWC_PI_SUM->custom,
1516                 wr->pals);
1517     }
1518     SEVERAL_THREADS_PLAY_MPI_END2(v->pi->m);
1519     if (v->siblings) {
1520         void * sv = v->abase->vec_subvec(v->abase, v->v,
1521                 wr->trank * thread_chunk);
1522         for(unsigned int k = 1 ; k < wr->ncores ; k++) {
1523             void * dv = v->abase->vec_subvec(v->abase,
1524                     mmt_vec_sibling(v, (wr->trank+k) % wr->ncores)->v,
1525                     wr->trank * thread_chunk);
1526             v->abase->vec_set(v->abase, dv, sv, thread_chunk);
1527         }
1528     }
1529     v->consistency = 2;
1530 }
1531 /* }}} */
1532 
1533 /**********************************************************************/
1534 /* bench code */
1535 
1536 void matmul_top_comm_bench_helper(int * pk, double * pt,
1537                                   void (*f) (mmt_vec_ptr),
1538 				  mmt_vec_ptr v)
1539 {
1540     int k;
1541     double t0, t1;
1542     int cont;
1543     t0 = wct_seconds();
1544     for (k = 0;; k++) {
1545 	t1 = wct_seconds();
1546 	cont = t1 < t0 + 0.25;
1547 	cont = cont && (t1 < t0 + 1 || k < 100);
1548         pi_allreduce(NULL, &cont, 1, BWC_PI_INT, BWC_PI_MIN, v->pi->m);
1549 	if (!cont)
1550 	    break;
1551         /* It's difficult to be faithful to the requirements on
1552          * consistency here. But apparently 1 pleases both operations
1553          * tested. */
1554         v->consistency = 1;
1555 	(*f) (v);
1556     }
1557     int target = 10 * k / (t1 - t0);
1558     ASSERT_ALWAYS(target >= 0);
1559     if (target > 100)
1560 	target = 100;
1561     if (target == 0)
1562         target = 1;
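    /* Worked example (illustrative numbers only): if the warm-up loop
     * above managed k = 40 calls in t1 - t0 = 0.8 s, then
     * target = 10 * 40 / 0.8 = 500, which the clamping above brings down
     * to 100 timed iterations. */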
1563     pi_bcast(&target, 1, BWC_PI_INT, 0, 0, v->pi->m);
1564     t0 = wct_seconds();
1565     for (k = 0; k < target; k++) {
1566         pi_log_op(v->pi->m, "[%s] iter%d/%d", __func__, k, target);
1567         v->consistency = 1;     /* see above */
1568         (*f) (v);
1569     }
1570     serialize(v->pi->m);
1571     t1 = wct_seconds();
1572     *pk = k;
1573     *pt = t1 - t0;
1574 }
1575 
1576 
1577 void matmul_top_comm_bench(matmul_top_data_ptr mmt, int d)
1578 {
1579     /* like matmul_top_mul_comm, we'll call mmt_vec_reduce with !d, and
1580      * mmt_vec_broadcast with d */
1581     int k;
1582     double dt;
1583 
1584     void (*funcs[2])(mmt_vec_ptr) = {
1585         mmt_vec_broadcast,
1586         mmt_vec_reduce_sameside,
1587     };
1588     const char * text[2] = { "bd", "ra" };
1589 
1590     mpfq_vbase_ptr abase = mmt->abase;
1591 
1592     mmt_vec test_vectors[2];
1593     int is_shared[2] = {0,0};
1594     mmt_vec_init(mmt, NULL, NULL, test_vectors[0], 0, is_shared[0], mmt->n[0]);
1595     mmt_vec_init(mmt, NULL, NULL, test_vectors[1], 1, is_shared[1], mmt->n[1]);
1596 
1597     size_t datasize[2];
1598 
1599     {
1600         pi_comm_ptr pirow = mmt->pi->wr[!d];
1601         pi_comm_ptr picol = mmt->pi->wr[d];
1602         /* within each row, all jobs are concerned with the same range
1603          * vrow->i0 to vrow->i1. This is split into m=pirow->njobs
1604          * chunks, and reduce_scatter has m-1 communication rounds where
1605          * all of the m nodes output and receive one such chunk. All this
1606          * happens for all column threads, so we multiply by
1607          * picol->ncores, too.
1608          *
1609          * Note that vrow->i1 - vrow->i0 is #rows / picol->totalsize
1610          *
1611          * Note also that picol->ncores * #rows / picol->totalsize =
1612          * #rows / picol->njobs, so that the final thing we compute is
1613          * really:
1614          *      #rows / mmt->pi->m->njobs * (pirow->njobs - 1)
1615          */
1616         size_t data_out_ra = abase->vec_elt_stride(abase,
1617                 picol->ncores * (mmt->n[!d] / picol->totalsize) /
1618                 pirow->njobs * (pirow->njobs - 1));
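        /* Worked example (illustrative numbers only): with #rows = 10^6,
         * pirow->njobs = 4, picol->njobs = 2 (hence mmt->pi->m->njobs = 8)
         * and picol->ncores = 2 (hence picol->totalsize = 4), we have
         * vrow->i1 - vrow->i0 = 250000, and data_out_ra counts
         *      2 * 250000 / 4 * 3 = 375000
         * items (before conversion to bytes by vec_elt_stride), which
         * indeed equals #rows / mmt->pi->m->njobs * (pirow->njobs - 1)
         * = 10^6 / 8 * 3. */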
1619 
1620         /* one way to do all-gather is to mimic this, except that each
1621          * node will output the same chunk at each round. Beyond that,
1622          * the calculation is similar, and we'll use it as a guide. Note
1623          * of course that if hardware-level multicast is used, our
1624          * throughput estimation is way off.
1625          *
1626          * as above, this is really:
1627          *      #cols / mmt->pi->m->njobs * (picol->njobs - 1)
1628          *
1629          */
1630         size_t data_out_ag = abase->vec_elt_stride(abase,
1631                 pirow->ncores * (mmt->n[d] / pirow->totalsize) /
1632                 picol->njobs * (picol->njobs - 1));
1633 
1634         datasize[0] = data_out_ag;
1635         datasize[1] = data_out_ra;
1636     }
1637 
1638     for(int s = 0 ; s < 2 ; s++) {
1639         /* we have our axis, and the other axis */
1640         pi_comm_ptr wr = mmt->pi->wr[d ^ s];          /* our axis */
1641         pi_comm_ptr xr = mmt->pi->wr[d ^ s ^ 1];      /* other axis */
1642         /* our operation has operated on the axis wr ; hence, we must
1643          * display data relative to the different indices within the
1644          * communicator xr.
1645          */
1646         matmul_top_comm_bench_helper(&k, &dt, funcs[s], test_vectors[d^s]);
1647         if (verbose_enabled(CADO_VERBOSE_PRINT_BWC_TIMING_GRIDS)) {
1648             for(unsigned int z = 0 ; z < xr->njobs ; z++) {
1649                 if (xr->jrank == z && wr->jrank == 0) {
1650                     for(unsigned int w = 0 ; w < xr->ncores ; w++) {
1651                         if (xr->trank == w && wr->trank == 0) {
1652                             char buf[16];
1653                             printf("%s %2d/%d, %s: %2d in %.1fs ; one: %.2fs (xput: %s/s)\n",
1654                                     wr->th->desc,
1655                                     xr->jrank * xr->ncores + xr->trank,
1656                                     xr->totalsize,
1657                                     text[s],
1658                                     k, dt, dt/k,
1659                                     size_disp(datasize[d^s] * k/dt, buf));
1660                         }
1661                     }
1662                 }
1663                 serialize(mmt->pi->m);
1664             }
1665         }
1666         serialize(mmt->pi->m);
1667     }
1668     mmt_vec_clear(mmt, test_vectors[0]);
1669     mmt_vec_clear(mmt, test_vectors[1]);
1670 }
1671 
1672 
1673 /**********************************************************************/
1674 /* Utility stuff for applying standard permutations to vectors. */
1675 
1676 /* We have four permutations defined in the context of bwc.
1677  *
1678  * Sr -- row permutation
1679  * Sc -- column permutation
1680  * P  -- shuffled product permutation
1681  * T  -- decorrelating permutation
1682  *
1683  * As a convention, in the code as well as in the documentation below, we
1684  * freely handle the following equivalent representations of a
1685  * permutation:
1686  *  - a function f,
1687  *  - a list of images [f(i), 0<=i<n],
1688  *  - a list of pairs [<i,f(i)>, 0<=i<n],
1689  *  - a matrix F of size n*n, with only entries at row i and column f(i) being 1
1690  *
1691  * We let Mpad be the "padded" matrix, where #rows and #columns are both
1692  * multiples of nh*nv
1693  *
1694  * T is a constant permutation on the columns of the matrix M. It is
1695  * mostly working as a preconditioner, meant to eliminate some nasty
1696  * correlation effects between row and column weights. In every respect,
1697  * the matrix we work with consistently is the matrix Mpad*T. T is computed
1698  * by balancing_pre_shuffle and balancing_pre_unshuffle.
1699  *
1700  * row i of T has its non-zero coefficient in column
1701  * balancing_pre_shuffle(i)
1702  *
1703  * Sr and Sc are defined in the balancing file (as list of images). They
1704  * are such that the matrix Mtwisted = Sr*Mpad*T*Sc^-1 is well balanced
1705  * across the different jobs and threads. For square matrices, depending
1706  * on how the balancing was computed, one may be implicitly defined from
1707  * the other (but not equal, for the reason below).
1708  *
1709  * P is defined only for square matrices. It reflects the fact that
1710  * although the matrix is "geographically" stored as Mtwisted =
1711  * Sr*Mpad*T*Sc^-1 in the jobs and threads, the matmul_top_mul code
1712  * multiplies by a matrix which is:
1713  *      - for matrix times vector: Sc*Mpad*T*Sc^-1
1714  *              (that is, v<-v*Transpose(Sc*Mpad*T*Sc^-1) )
1715  *      - for vector times matrix: P*Sc*Mpad*T*Sc^-1*P^-1
1716  *
1717  * Implicit definition of Sr for square matrices
1718  * =============================================
1719  *
1720  * Sc, read as the colperm[] array in the balancing file, is such that column
1721  * i of the twisted matrix is in fact column Sc(i) in the original matrix,
1722  * which means that we build Mtwisted as [...]*M*Sc^-1
1723  *
1724  * for square matrices, with FLAG_REPLICATE, the Sr permutation is chosen
1725  * implicitly.
1726  *
1727  * here's how we compute the (forward) row permutation in the code (here, xr
1728  * is the colperm array).
1729  *              ix = (i * nv + j) * elem;
1730  *              iy = (j * nh + i) * elem;
1731  *              for(unsigned int k = 0 ; k < elem ; k++) {
1732  *                  m->fw_rowperm[xr[iy+k]] = ix+k;
1733  *
1734  * We denote by Sr the permutation, implicitly computed here, such that row i
1735  * of the twisted matrix is row Sr(i) in the original matrix -- so that
1736  * Mtwisted is in fact Sr*M*Sc^-1. Here, fw_rowperm is the inverse: row i in
1737  * the original matrix goes to row fw_rowperm[i] in the twisted matrix. So
1738  * that fw_rowperm == Sr^-1.
1739  *
1740  * Our code does fw_rowperm(Sc(P(x)))=x, for P the permutation which sends
1741  * sub-block nv*i+j to sub-block nh*j+i. Writing as operations on row vectors
1742  * (as magma does), this gives:
1743  *      P * Sc * (Sr^-1) = id
1744  *      Sr = P * Sc
1745  *
1746  * So in this case the twisted matrix is in fact Sr*Mpad*Sc^-1, and Sr = P*Sc;
1747  *
1748  * Implicit definition of Sc for square matrices
1749  * =============================================
1750  *
1751  * There is no code doing this at the moment, but one could imagine
1752  * defining this as well. In such a situation, we write down what Sc
1753  * would be.
1754  *
1755  * Matrix is geographically stored as Sr*Mpad*T*Sc^-1 in the jobs and
1756  * threads, and the matmul_top_mul code multiplies by a matrix which is:
1757  *      - for matrix times vector: P^-1*Sr*Mpad*T*Sc^-1
1758  *      - for vector times matrix: Sr*Mpad*T*Sc^-1*P^-1
1759  * Therefore we want Sc^-1*P^-1 == Sr^-1, whence Sc == P^-1 * Sr
1760  *
1761  * Action of P with matmul_top_mul_comm
1762  * ====================================
1763  *
1764  * After v*Mtwisted, reduce_sameside of the result (in direction 1) followed
1765  * by broadcast (in direction 0) produces a twisted vector. Since v*Mtwisted
1766  * is in direction 1, then blocks of the vector are to be read in column-major
1767  * order. If we reduce then broadcast, then sub-block nh*j+i will go to
1768  * sub-block nv*i+j. This means that v*Mtwisted will be transformed into
1769  * v*Mtwisted*P^-1
1770  *
1771  * After v*Transpose(Mtwisted), reduce_sameside on the result which is in
1772  * direction 0 transforms it to v*transpose(Mtwisted)*P (here P is
1773  * transpose of P^-1: we transpose indices blocks in the other
1774  * direction), which is thus v*transpose(P^-1*Mtwisted).
1775  *
1776  * Conclusion: with Mtwisted = Sr*Mpad*Sc^-1, and Sr = P*Sc, when we do
1777  * matmul_top_mul_cpu followed by matmul_top_mul_comm, we multiply by:
1778  *      - for matrix times vector: Sc*Mpad*Sc^-1
1779  *              (that is, v<-v*Transpose(Sc*Mpad*T*Sc^-1) )
1780  *      - for vector times matrix: P*Sc*Mpad*Sc^-1*P^-1
1781  *
1782  * Applying P to vectors
1783  * =====================
1784  *
1785  * We can emulate the multiplications by P and P^-1 with appropriate
1786  * combinations of apply_identity and matmul_top_mul_comm. So we give a few
1787  * details.
1788  *
1789  * Let v be a distributed vector. With mmt_apply_identity, the output is
1790  * distributed in the other direction, but not consistent. If we do
1791  * mmt_vec_reduce_sameside (or mmt_vec_allreduce, which does more), the
1792  * resulting vector (looking at locally-owned pieces only) is the same as
1793  * v, in the other direction.
1794  *
1795  * And then, matmul_top_mul_comm on this resulting vector does the P
1796  * action as above. Therefore:
1797  *
1798  * For v->d == 0, applying in sequence the functions mmt_apply_identity
1799  * and matmul_top_mul_comm, we get v*P^-1
1800  *
1801  * For v->d == 1, the same sequence produces v*Transpose(P^-1)==v*P
1802  *
1803  * Doing the converse is feasible. For v->d==0, if we do instead
1804  * matmul_top_mul_comm then mmt_apply_identity, we get v*P
1805  *
1806  * For v->d==1, if we do matmul_top_mul_comm then mmt_apply_identity, we
1807  * get v*P^-1
1808  *
1809  */
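
/* Illustrative sketch, not part of the original code: the index map
 * realizing the statement above that P sends sub-block nv*i+j to
 * sub-block nh*j+i. Here z stands for the common sub-block length (the
 * padded dimension divided by nh*nv); the function name and arguments
 * are hypothetical and deliberately kept free of the mmt data types. */
#if 0
static unsigned int example_apply_P(unsigned int x,
        unsigned int nh, unsigned int nv, unsigned int z)
{
    unsigned int q = x / z;         /* sub-block index, 0 <= q < nh*nv */
    unsigned int r = x % z;         /* offset within the sub-block */
    unsigned int i = q / nv;        /* q == nv*i + j, with 0 <= i < nh */
    unsigned int j = q % nv;        /*                 and 0 <= j < nv */
    return (nh * j + i) * z + r;    /* sub-block nv*i+j goes to nh*j+i */
}
#endif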
1810 
1811 void mmt_apply_identity(mmt_vec_ptr w, mmt_vec_ptr v)
1812 {
1813     /* input: fully consistent */
1814     /* output: inconsistent !
1815      * Need mmt_vec_allreduce or mmt_vec_reduce_sameside, or
1816      * matmul_top_mul_comm, depending on what we want to do. */
1817     ASSERT_ALWAYS(v->consistency == 2);
1818     ASSERT_ALWAYS(w->abase == v->abase);
1819     ASSERT_ALWAYS(v->d != w->d);
1820     ASSERT_ALWAYS(v->n == w->n);
1821 
1822     mpfq_vbase_ptr A = v->abase;
1823 
1824     serialize_threads(w->pi->m);
1825     mmt_full_vec_set_zero(w);
1826     serialize_threads(w->pi->m);
1827 
1828     unsigned int v_off, w_off;
1829     unsigned int how_many = intersect_two_intervals(&v_off, &w_off,
1830             v->i0, v->i1, w->i0, w->i1);
1831 
1832     A->vec_set(A,
1833             A->vec_coeff_ptr(A,  w->v, w_off),
1834             A->vec_coeff_ptr(A,  v->v, v_off),
1835             how_many);
1836     w->consistency = 1;
1837 }
1838 
1839 void mmt_vec_apply_or_unapply_P_inner(matmul_top_data_ptr mmt, mmt_vec_ptr y, int apply)
1840 {
1841     ASSERT_ALWAYS(y->consistency == 2);
1842     mmt_vec yt;
1843     mmt_vec_init(mmt, y->abase, y->pitype, yt, !y->d, 0, y->n);
1844     if ((apply ^ y->d) == 0) {
1845         // y->d == 0: get v*P^-1
1846         // y->d == 1: get v*P
1847         mmt_apply_identity(yt, y);
1848         matmul_top_mul_comm(y, yt);
1849     } else {
1850         // y->d == 0: get v*P
1851         // y->d == 1: get v*P^-1
1852         mmt_vec_downgrade_consistency(y);
1853         matmul_top_mul_comm(yt, y);
1854         mmt_apply_identity(y, yt);
1855         mmt_vec_allreduce(y);
1856     }
1857     mmt_vec_clear(mmt, yt);
1858     ASSERT_ALWAYS(y->consistency == 2);
1859 }
1860 
1861 void mmt_vec_unapply_P(matmul_top_data_ptr mmt, mmt_vec_ptr y)
1862 {
1863     mmt_vec_apply_or_unapply_P_inner(mmt, y, 0);
1864 }
1865 
1866 void mmt_vec_apply_P(matmul_top_data_ptr mmt, mmt_vec_ptr y)
1867 {
1868     mmt_vec_apply_or_unapply_P_inner(mmt, y, 1);
1869 }
1870 
1871 /* apply == 1 for apply, apply == 0 for unapply *
1872  * apply == 1 d == 0 Sr defined:   v <- v * Sr
1873  * apply == 1 d == 0 Sr implicit:  v <- v * Sc
1874  * apply == 1 d == 1 Sc defined:   v <- v * Sc
1875  * apply == 1 d == 1 Sc implicit:  v <- v * Sr
1876  * apply == 0 d == 0 Sr defined:   v <- v * Sr^-1
1877  * apply == 0 d == 0 Sr implicit:  v <- v * Sc^-1
1878  * apply == 0 d == 1 Sc defined:   v <- v * Sc^-1
1879  * apply == 0 d == 1 Sc implicit:  v <- v * Sr^-1
1880  *
1881  * See that when, say, Sr is implicitly defined (to P*Sc), this function
1882  * only applies Sc, not P !
1883  */
1884 void mmt_vec_apply_or_unapply_S_inner(matmul_top_data_ptr mmt, int midx, mmt_vec_ptr y, int apply)
1885 {
1886     ASSERT_ALWAYS(y->consistency == 2);
1887     /* input: fully consistent */
1888     /* output: fully consistent */
1889     int d = y->d;
1890     mpfq_vbase_ptr A = y->abase;
1891 
1892     serialize_threads(y->pi->m);
1893 
1894     /* We'll have two vectors of size n[d], one named y in direction d,
1895      * and one named yt in direction !d.
1896      *
1897      *
1898      * In the permutation mmt->perm[d], the pairs (i,j) are such that,
1899      * for the two vectors y (direction d) and yt (direction !d)
1900      * introduced above, we have:
1901      *  i in [y->i0..y->i1[
1902      *  j in [yt->i0..yt->i1[
1903      */
1904     matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
1905     permutation_data_ptr s = Mloc->perm[d];
1906 
1907     /* For square matrices, we'll use the other permutation transparently
1908      * with this piece of code. Note though that when we do so, applying
1909      * the permutation actually goes in the opposite direction. */
1910     int xd = d;
1911     if (!s && (Mloc->bal->h->flags & FLAG_REPLICATE)) {
1912         ASSERT_ALWAYS(Mloc->n[0] == Mloc->n[1]);
1913         s = Mloc->perm[!d];
1914         xd = !d;
1915     }
1916     if (!s) {
1917         /* could well be that we have nothing to do */
1918         return;
1919     }
1920 
1921     if ((apply^d^xd) == 0) {
1922         /*
1923          * apply == 0 d == 0 Sr defined:   v <- v * Sr^-1
1924          * apply == 0 d == 1 Sc defined:   v <- v * Sc^-1
1925          * apply == 1 d == 0 Sr implicit:  v <- v * Sc
1926          * apply == 1 d == 1 Sc implicit:  v <- v * Sr
1927          */
1928         mmt_vec yt;
1929         mmt_vec_init(mmt, A, y->pitype, yt, !d, 0, y->n);
1930 
1931         mmt_apply_identity(yt, y);
1932         mmt_vec_allreduce(yt);
1933         mmt_full_vec_set_zero(y);
1934         serialize_threads(y->pi->m);
1935         for(unsigned int k = 0 ; k < s->n ; k++) {
1936             if (s->x[k][d^xd] < y->i0 || s->x[k][d^xd] >= y->i1)
1937                 continue;
1938             if (s->x[k][d^xd^1] < yt->i0 || s->x[k][d^xd^1] >= yt->i1)
1939                 continue;
1940             A->vec_set(A,
1941                     A->vec_coeff_ptr(A,  y->v, s->x[k][d^xd]  - y->i0),
1942                     A->vec_coeff_ptr(A, yt->v, s->x[k][d^xd^1] - yt->i0),
1943                     1);
1944         }
1945         y->consistency = 1;
1946         serialize_threads(y->pi->m);
1947         mmt_vec_allreduce(y);
1948         mmt_vec_clear(mmt, yt);
1949     } else {
1950         /*
1951          * apply == 1 d == 0 Sr defined:   v <- v * Sr
1952          * apply == 1 d == 1 Sc defined:   v <- v * Sc
1953          * apply == 0 d == 0 Sr implicit:  v <- v * Sc^-1
1954          * apply == 0 d == 1 Sc implicit:  v <- v * Sr^-1
1955          */
1956         mmt_vec yt;
1957         mmt_vec_init(mmt, A, y->pitype, yt, !d, 0, y->n);
1958         for(unsigned int k = 0 ; k < s->n ; k++) {
1959             if (s->x[k][d^xd] < y->i0 || s->x[k][d^xd] >= y->i1)
1960                 continue;
1961             if (s->x[k][d^xd^1] < yt->i0 || s->x[k][d^xd^1] >= yt->i1)
1962                 continue;
1963             A->vec_set(A,
1964                     A->vec_coeff_ptr(A, yt->v, s->x[k][d^xd^1] - yt->i0),
1965                     A->vec_coeff_ptr(A,  y->v, s->x[k][d^xd] -  y->i0),
1966                     1);
1967         }
1968         yt->consistency = 1;
1969         mmt_vec_allreduce(yt);
1970         mmt_apply_identity(y, yt);
1971         mmt_vec_allreduce(y);
1972         mmt_vec_clear(mmt, yt);
1973     }
1974     serialize_threads(y->pi->m);
1975     ASSERT_ALWAYS(y->consistency == 2);
1976 }
1977 
1978 /* multiply v by Sr^-1 if v->d == 0, by Sc^-1 if v->d == 1 */
1979 void mmt_vec_unapply_S(matmul_top_data_ptr mmt, int midx, mmt_vec_ptr y)
1980 {
1981     matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
1982     mmt_vec_apply_or_unapply_S_inner(mmt, midx, y, 0);
1983     if ((Mloc->bal->h->flags & FLAG_REPLICATE) && !Mloc->perm[y->d]) {
1984         if (y->d == 0) {
1985             /* implicit Sr^-1 is Sc^-1*P^-1 */
1986             mmt_vec_unapply_P(mmt, y);
1987         } else {
1988             /* implicit Sc^-1 is Sr^-1*P */
1989             mmt_vec_apply_P(mmt, y);
1990         }
1991     }
1992 }
1993 
1994 /* multiply v by Sr if v->d == 0, by Sc if v->d == 1 */
1995 void mmt_vec_apply_S(matmul_top_data_ptr mmt, int midx, mmt_vec_ptr y)
1996 {
1997     matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
1998     if ((Mloc->bal->h->flags & FLAG_REPLICATE) && !Mloc->perm[y->d]) {
1999 
2000         if (y->d == 0) {
2001             /* implicit Sr is P * Sc */
2002             mmt_vec_apply_P(mmt, y);
2003         } else {
2004             /* implicit Sc is P^-1 * Sr */
2005             mmt_vec_unapply_P(mmt, y);
2006         }
2007     }
2008     mmt_vec_apply_or_unapply_S_inner(mmt, midx, y, 1);
2009 }
2010 
2011 /* for square matrix products, the inner loops do
2012  *   v <- v * (Sr=P*Sc)*Mpad*Sc^-1*P^-1              (for vector times matrix)
2013  *   v <- v * Transpose(P^-1*(Sr=P*Sc)*Mpad*Sc^-1)   (for matrix times vector)
2014  * while for non-square (no FLAG_REPLICATE), it's simply by
2015  *      Sr*Mpad*Sc^-1
2016  *
2017  * therefore the rules for twisting and untwisting are as follows.
2018  *
2019  * twisting v->d == 0
2020  *      we assume we want here to change v to become a good *input* for a
2021  *      vector times matrix operation. Therefore we do:
2022  *              v <- v * Sr^-1
2023  * twisting v->d == 1
2024  *      v <- v * Sc^-1
2025  *
2026  */
2027 
2028 void mmt_vec_twist(matmul_top_data_ptr mmt, mmt_vec_ptr y)
2029 {
2030     mmt_vec_unapply_S(mmt, y->d == 0 ? 0 : (mmt->nmatrices-1), y);
2031 }
2032 
2033 void mmt_vec_untwist(matmul_top_data_ptr mmt, mmt_vec_ptr y)
2034 {
2035     mmt_vec_apply_S(mmt, y->d == 0 ? 0 : (mmt->nmatrices-1), y);
2036 }
2037 
2038 /* {{{ mmt_vec_{un,}apply_T -- this applies the fixed column
2039  * permutation which we use unconditionally in bwc to avoid correlation
2040  * of row and column weights.
2041  */
2042     // pshuf indicates two integers a,b such that the COLUMN i of the input
2043     // matrix is in fact mapped to column a*i+b mod n in the matrix we work
2044     // with. pshuf_inv indicates the inverse permutation. a and b do
2045 void mmt_vec_apply_or_unapply_T_inner(matmul_top_data_ptr mmt, mmt_vec_ptr y, int apply)
2046 {
2047     /* apply: coefficient i of the vector goes to coefficient
2048      * balancing_pre_shuffle[i]
2049      *
2050      * For row vectors this means: apply == (v <- v * T)
2051      * For column vectors this means: apply == (v <- T^-1 * v)
2052      *
2053      */
2054     if (y->d == 0) return;
2055     matmul_top_matrix_ptr Mloc = mmt->matrices[mmt->nmatrices - 1];
2056     ASSERT_ALWAYS(y->consistency == 2);
2057     serialize_threads(y->pi->m);
2058     mmt_vec yt;
2059     mmt_vec_init(mmt, y->abase, y->pitype, yt, !y->d, 0, y->n);
2060     for(unsigned int i = y->i0 ; i < y->i1 ; i++) {
2061         unsigned int j;
2062         if (apply) {
2063             j = balancing_pre_shuffle(Mloc->bal, i);
2064         } else {
2065             j = balancing_pre_unshuffle(Mloc->bal, i);
2066         }
2067         if (j >= yt->i0 && j < yt->i1) {
2068             y->abase->vec_set(y->abase,
2069                     y->abase->vec_coeff_ptr(y->abase, yt->v, j - yt->i0),
2070                     y->abase->vec_coeff_ptr_const(y->abase, y->v, i - y->i0),
2071                     1);
2072         }
2073     }
2074     yt->consistency = 0;
2075     mmt_vec_allreduce(yt);
2076     mmt_apply_identity(y, yt);
2077     mmt_vec_allreduce(y);
2078 }
2079 void mmt_vec_unapply_T(matmul_top_data_ptr mmt, mmt_vec_ptr v)
2080 {
2081     mmt_vec_apply_or_unapply_T_inner(mmt, v, 0);
2082 }
2083 
2084 void mmt_vec_apply_T(matmul_top_data_ptr mmt, mmt_vec_ptr v)
2085 {
2086     mmt_vec_apply_or_unapply_T_inner(mmt, v, 1);
2087 }
2088 /* }}} */
2089 
2090 /* {{{ Application of permutations to indices */
2091 
2092 /* Given indices which relate to a vector in direction d, modify them in
2093  * place so that they correspond to coefficients of the matching twisted vector.
2094  *
2095  * So we want: v_i == twist(v)_{twist(i)}
2096  *
2097  * for d == 0, twist(v) = v * Sr^-1, so that index i in v goes to
2098  * position S(i) in the twisted vector. For implicit Sr, we have to
2099  * take into account the fact that Sr=P*Sc, hence S(i) is in fact
2100  * Sc[P[i]].
2101  *
2102  * The algorithm is simple: only one thread knows about each single
2103  * (i,Sr(i)) pair. So this one changes the relevant xs[] values, leaving
2104  * the other to zero. And then we do a global allreduce() on the xs[]
2105  * values.
2106  *
2107  * (here, "relevant" may mean something which includes the P
2108  * permutation).
2109  */
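
/* For instance (illustrative numbers only): suppose xs[0] = 5, and that
 * the pair relating index 5 to its twisted counterpart, say 12, is held
 * by exactly one (job,thread). That owner overwrites xs[0] with 12,
 * every other participant overwrites it with 0, and the final
 * pi_allreduce with BWC_PI_BXOR leaves xs[0] = 12 everywhere. */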
2110 
2111 /* returns the two intervals such that for all pairs (i,j) in
2112  * mmt->perm->x, we have ii[0] <= i < ii[1], and jj[0] <= j < jj[1]
2113  */
2114 static void get_local_permutations_ranges(matmul_top_data_ptr mmt, int d, unsigned int ii[2], unsigned int jj[2])
2115 {
2116     int pos[2];
2117 
2118     for(int dir = 0 ; dir < 2 ; dir++)  {
2119         pi_comm_ptr piwr = mmt->pi->wr[dir];
2120         pos[dir] = piwr->jrank * piwr->ncores + piwr->trank;
2121     }
2122 
2123     size_t e = mmt->n[d] / mmt->pi->m->totalsize;
2124     ii[0] = e *  pos[!d]    * mmt->pi->wr[d]->totalsize;
2125     ii[1] = e * (pos[!d]+1) * mmt->pi->wr[d]->totalsize;
2126     jj[0] = e *  pos[d]     * mmt->pi->wr[!d]->totalsize;
2127     jj[1] = e * (pos[d]+1)  * mmt->pi->wr[!d]->totalsize;
2128 }
2129 
2130 void indices_twist(matmul_top_data_ptr mmt, uint32_t * xs, unsigned int n, int d)
2131 {
2132     int midx = d == 0 ? 0 : (mmt->nmatrices - 1);
2133     matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
2134     /* d == 1: twist(v) = v*Sc^-1
2135      * d == 0: twist(v) = v*Sr^-1
2136      */
2137     unsigned int ii[2], jj[2];
2138 
2139     if (Mloc->perm[d]) {
2140         /* explicit S */
2141         /* coordinate S[i] in the original vector becomes i in the
2142          * twisted vector.
2143          */
2144         get_local_permutations_ranges(mmt, d, ii, jj);
2145         unsigned int * r = malloc((jj[1] - jj[0]) * sizeof(unsigned int));
2146         memset(r, 0, (jj[1] - jj[0]) * sizeof(unsigned int));
2147         for(size_t i = 0 ; i < Mloc->perm[d]->n ; i++) {
2148             ASSERT_ALWAYS(Mloc->perm[d]->x[i][0] >= ii[0]);
2149             ASSERT_ALWAYS(Mloc->perm[d]->x[i][0] <  ii[1]);
2150             ASSERT_ALWAYS(Mloc->perm[d]->x[i][1] >= jj[0]);
2151             ASSERT_ALWAYS(Mloc->perm[d]->x[i][1] <  jj[1]);
2152             // r[Mloc->perm[d]->x[i][0] - ii[0]] = Mloc->perm[d]->x[i][1];
2153             r[Mloc->perm[d]->x[i][1] - jj[0]] = Mloc->perm[d]->x[i][0];
2154         }
2155         for(unsigned int k = 0 ; k < n ; k++) {
2156             unsigned int j = xs[k];
2157             if (j >= jj[0] && j < jj[1])
2158                 xs[k] = r[j - jj[0]];
2159             else
2160                 xs[k] = 0;
2161         }
2162         free(r);
2163         pi_allreduce(NULL, xs, n * sizeof(uint32_t), BWC_PI_BYTE, BWC_PI_BXOR, mmt->pi->m);
2164     } else if (Mloc->perm[!d] && (Mloc->bal->h->flags & FLAG_REPLICATE)) {
2165         ASSERT_ALWAYS(Mloc->n[0] == Mloc->n[1]);
2166         /* implicit S -- first we get the bits about the S in the other
2167          * direction, because the pieces we have are for the other
2168          * ranges, which is a bit disturbing...
2169          */
2170         get_local_permutations_ranges(mmt, !d, ii, jj);
2171         unsigned int * r = malloc((jj[1] - jj[0]) * sizeof(unsigned int));
2172         memset(r, 0, (jj[1] - jj[0]) * sizeof(unsigned int));
2173         for(size_t i = 0 ; i < Mloc->perm[!d]->n ; i++) {
2174             ASSERT_ALWAYS(Mloc->perm[!d]->x[i][0] >= ii[0]);
2175             ASSERT_ALWAYS(Mloc->perm[!d]->x[i][0] <  ii[1]);
2176             ASSERT_ALWAYS(Mloc->perm[!d]->x[i][1] >= jj[0]);
2177             ASSERT_ALWAYS(Mloc->perm[!d]->x[i][1] <  jj[1]);
2178             r[Mloc->perm[!d]->x[i][1] - jj[0]] = Mloc->perm[!d]->x[i][0];
2179         }
2180         /* nh and nv are the same for all submatrices, really */
2181         unsigned int nn[2] = { Mloc->bal->h->nh, Mloc->bal->h->nv };
2182         /* for d == 0, we have implicit Sr = P * Sc.
2183          * for d == 1, we have implicit Sc = P^-1 * Sr
2184          */
2185         size_t z = Mloc->bal->trows / (nn[0]*nn[1]);
2186         for(unsigned int k = 0 ; k < n ; k++) {
2187             unsigned int j = xs[k];
2188             /* for d == 0, index i goes to P^-1[Sc^-1[i]] */
2189             if (j >= jj[0] && j < jj[1]) {
2190                 unsigned int i = r[j - jj[0]];
2191                 unsigned int qz = i / z;
2192                 unsigned int rz = i % z;
2193                 /* P    sends sub-block nv*i+j to sub-block nh*j+i */
2194                 /* P^-1 sends sub-block nh*j+i to sub-block nv*i+j */
2195                 unsigned int qh = qz / nn[d];
2196                 unsigned int rh = qz % nn[d];
2197                 ASSERT_ALWAYS(qz == qh * nn[d] + rh);
2198                 ASSERT_ALWAYS(i == (qh * nn[d] + rh)*z + rz);
2199                 xs[k] = (rh * nn[!d] + qh)*z + rz;
2200             } else {
2201                 xs[k] = 0;
2202             }
2203         }
2204         free(r);
2205         pi_allreduce(NULL, xs, n * sizeof(uint32_t), BWC_PI_BYTE, BWC_PI_BXOR, mmt->pi->m);
2206     }
2207 }
2208 /* }}} */
2209 
2210 /**********************************************************************/
2211 #if 0
2212 /* no longer used -- was only used by prep.
2213  * It's not buggy, but making this work in a context where we have
2214  * multiple threads is tricky.
2215  */
2216 void matmul_top_fill_random_source(matmul_top_data_ptr mmt, int d)
2217 {
2218     matmul_top_fill_random_source_generic(mmt, mmt->vr->stride, NULL, d);
2219 }
2220 #endif
2221 
2222 /* For d == 0: do w = v * M
2223  * For d == 1: do w = M * v
2224  *
2225  * We do not necessarily have v->d == d, although this will admittedly be
2226  * the case most often:
2227  * - in block Wiedemann, we have a vector split in some direction (say
2228  *   d==1 when we wanna solve Mw=0), we compute w=Mv, and then there's
2229  *   the matmul_top_mul_comm step which moves stuff to v again.
2230  * - in block Lanczos, say we start from v->d == 1 again. We do w=M*v,
2231  *   so that w->d==0. But then we want to compute M^T * w, which is w^T *
2232  *   M. So again, w->d == 0 is appropriate with the second product being
2233  *   in the direction foo*M.
2234  * - the only case where this does not necessarily happen so is when we
2235  *   have several matrices.
2236  */
2237 void matmul_top_mul_cpu(matmul_top_data_ptr mmt, int midx, int d, mmt_vec_ptr w, mmt_vec_ptr v)
2238 {
2239     matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
2240     ASSERT_ALWAYS(v->consistency == 2);
2241     ASSERT_ALWAYS(w->abase == v->abase);
2242     unsigned int di_in  = v->i1 - v->i0;
2243     unsigned int di_out = w->i1 - w->i0;
2244     ASSERT_ALWAYS(Mloc->mm->dim[!d] == di_out);
2245     ASSERT_ALWAYS(Mloc->mm->dim[d] == di_in);
2246 
2247     ASSERT_ALWAYS(w->siblings); /* w must not be shared */
2248 
2249     pi_log_op(mmt->pi->m, "[%s:%d] enter matmul_mul", __func__, __LINE__);
2250 
2251     /* Note that matmul_init copies the calling abase argument to the
2252      * lower-level mm structure. It can quite probably be qualified as a
2253      * flaw.
2254      */
2255     matmul_mul(Mloc->mm, w->v, v->v, d);
2256     w->consistency = 0;
2257 }
2258 
2259 /* This takes partial results in w, and puts the
2260  * collected and re-broadcasted results in the areas mmt->wd[d]->v
2261  *
2262  * Note that for the shuffled product, this is not equivalent to a trivial
2263  * operation.
2264  */
2265 void matmul_top_mul_comm(mmt_vec_ptr v, mmt_vec_ptr w)
2266 {
2267     /* this takes inconsistent input.
2268      * XXX if we have fully consistent input, then a reduce() is
2269      * highly undesirable!
2270      */
2271     ASSERT_ALWAYS(w->consistency != 2);
2272     pi_log_op(v->pi->m, "[%s:%d] enter mmt_vec_reduce", __func__, __LINE__);
2273     mmt_vec_reduce(v, w);
2274     ASSERT_ALWAYS(v->consistency == 1);
2275     pi_log_op(v->pi->m, "[%s:%d] enter mmt_vec_broadcast", __func__, __LINE__);
2276     mmt_vec_broadcast(v);
2277     ASSERT_ALWAYS(v->consistency == 2);
2278 
2279     /* If we have shared input data for the column threads, then we'd
2280      * better make sure it has arrived completely, because while all
2281      * threads will need the data, only one is actually importing it.
2282      */
2283     if (!v->siblings) {
2284         pi_log_op(v->pi->wr[v->d], "[%s:%d] serialize threads", __func__, __LINE__);
2285         serialize_threads(v->pi->wr[v->d]);
2286     }
2287 }
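
/* Illustrative sketch, not part of the original code: a hypothetical
 * driver showing how the routines above compose in the single-matrix,
 * square case (midx == 0). Per the comments on twisting, the pair
 * matmul_top_mul_cpu + matmul_top_mul_comm multiplies by a twisted
 * matrix, so twisting v once before the loop and untwisting it
 * afterwards amounts to iterating the underlying (padded,
 * T-preconditioned) matrix. The scratch vector w lives in the direction
 * opposite to v and is non-shared; both are assumed to have been set up
 * with mmt_vec_init beforehand. */
#if 0
static void example_iterate(matmul_top_data_ptr mmt,
        mmt_vec_ptr v, mmt_vec_ptr w, int niter)
{
    mmt_vec_twist(mmt, v);                      /* v <- v * S^-1 */
    for(int k = 0 ; k < niter ; k++) {
        matmul_top_mul_cpu(mmt, 0, v->d, w, v); /* local products; w inconsistent */
        matmul_top_mul_comm(v, w);              /* reduce + broadcast back into v */
    }
    mmt_vec_untwist(mmt, v);                    /* v <- v * S */
}
#endif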
2288 
2289 
2290 /* Vector I/O is done by only one job, one thread. It incurs a
2291  * significant amount of memory allocation, but this is done relatively
2292  * rarely.
2293  *
2294  * Vectors, as handled within the core routines, are permuted. We have
2295  * coordinates (v_{\sigma(0)}...v_{\sigma(n-1)}), where \sigma is the
2296  * permutation given by the *_perm files. For block Wiedemann, we assume
2297  * that we have conjugated permutations on the two sides. This means that
2298  * no data manipulation is required within the critical loops (this would
2299  * imply a fuzzy communication pattern, boiling down to essentially using
2300  * allreduce instead of reduce).
2301  */
2302 
2303 /* As always, comments are written with one given point of view in mind.
2304  * The vector which gets saved is always the source vector. Here, our
2305  * arbitrary description choice is that we iterate M^i times vector,
2306  * hence the source vector is on the right: d == 1.
2307  */
2308 
2309 // comments, variable names and so on are names which for simplicity
2310 // reflect the situation d == 1 (save right vector). The stable state we
2311 // start with is after a matrix-times-vector product (or just before
2312 // one): The left vector has been computed, and the data for indices
2313 // [i0..i1[ is on the nodes seeing those indices vertically as well. Data
2314 // is therefore found in the right vector area, as after the reduce step.
2315 //
2316 // Doing a mmt_vec_broadcast columns will ensure that each row contains
2317 // the complete data set for our vector.
2318 
2319 void mmt_vec_set_random_through_file(mmt_vec_ptr v, const char * filename_pattern, unsigned int itemsondisk, gmp_randstate_t rstate, unsigned int block_position)
2320 {
2321     /* FIXME: this generates the complete vector on rank 0, saves it, and
2322      * loads it again. But I'm a bit puzzled by the choice of saving a
2323      * number of items which is n0[d]. Seems to me that this is in fact
2324      * incorrect, we want n0[!d] here.
2325      */
2326     mpfq_vbase_ptr A = v->abase;
2327     parallelizing_info_ptr pi = v->pi;
2328     int tcan_print = v->pi->m->trank == 0 && v->pi->m->jrank == 0;
2329 
2330     mpz_t p;
2331     mpz_init(p);
2332     v->abase->field_characteristic(v->abase, p);
2333     int char2 = mpz_cmp_ui(p, 2) == 0;
2334     mpz_clear(p);
2335     int splitwidth = char2 ? 64 : 1;
2336     unsigned int Adisk_width = splitwidth;
2337     unsigned int Adisk_multiplex = v->abase->simd_groupsize(v->abase) / Adisk_width;
2338 
2339     ASSERT_ALWAYS(itemsondisk % Adisk_multiplex == 0);
2340     unsigned int loc_itemsondisk = itemsondisk / Adisk_multiplex;
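    /* For instance (illustrative numbers only): over GF(2) with a
     * simd_groupsize of 128, Adisk_width = splitwidth = 64 and
     * Adisk_multiplex = 2, so for block_position = 0 the loop below
     * creates two files, covering column ranges [0,64[ and [64,128[. */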
2341 
2342     if (pi->m->trank == 0 && pi->m->jrank == 0) {
2343         for(unsigned int b = 0 ; b < Adisk_multiplex ; b++) {
2344             unsigned int b0 = block_position + b * Adisk_width;
2345             char * filename;
2346             int rc;
2347             rc = asprintf(&filename, filename_pattern, b0, b0 + splitwidth);
2348             ASSERT_ALWAYS(rc >= 0);
2349 
2350             /* we want to create v->n / Adisk_multiplex entries --
2351              * but we can't do that with access to just A. So we
2352              * generate slightly more, and rely on itemsondisk to do
2353              * the job of properly cutting the overflowing data.
2354              */
2355 
2356             size_t nitems = iceildiv(v->n, Adisk_multiplex);
2357             void * y;
2358             cheating_vec_init(A, &y, nitems);
2359             A->vec_set_zero(A, y, nitems);
2360             A->vec_random(A, y, nitems, rstate);
2361             double tt = -wct_seconds();
2362             if (tcan_print) {
2363                 printf("Creating random vector %s...", filename);
2364                 fflush(stdout);
2365             }
2366             FILE * f = fopen(filename, "wb");
2367             ASSERT_ALWAYS(f);
2368             rc = fwrite(y, A->vec_elt_stride(A,1), loc_itemsondisk, f);
2369             ASSERT_ALWAYS(rc == (int) loc_itemsondisk);
2370             fclose(f);
2371             tt += wct_seconds();
2372             if (tcan_print) {
2373                 size_t sizeondisk = A->vec_elt_stride(A, loc_itemsondisk);
2374                 char buf[20], buf2[20];
2375                 printf(" done [%s in %.2fs, %s/s]\n",
2376                         size_disp(sizeondisk / Adisk_multiplex, buf),
2377                         tt,
2378                         size_disp(sizeondisk / Adisk_multiplex / tt, buf2));
2379             }
2380             cheating_vec_clear(A, &y, v->n);
2381 
2382             free(filename);
2383         }
2384     }
2385     int ok = mmt_vec_load(v, filename_pattern, itemsondisk, block_position);
2386     ASSERT_ALWAYS(ok);
2387 }
2388 
2389 unsigned long mmt_vec_hamming_weight(mmt_vec_ptr y) {
2390     ASSERT_ALWAYS(y->consistency == 2);
2391     unsigned long w = y->abase->vec_hamming_weight(y->abase, y->v, y->i1 - y->i0);
2392     /* all threads / cores in wiring wr[y->d] share the same data and
2393      * thus deduce the same count */
2394     pi_allreduce(NULL, &w, 1, BWC_PI_UNSIGNED_LONG, BWC_PI_SUM, y->pi->wr[!y->d]);
2395     return w;
2396 }
2397 
2398 /* this is inconsistent in the sense that it's balancing-dependent */
2399 void mmt_vec_set_random_inconsistent(mmt_vec_ptr v, gmp_randstate_t rstate)
2400 {
2401     ASSERT_ALWAYS(v != NULL);
2402     mmt_full_vec_set_zero(v);
2403     v->abase->vec_random(v->abase, mmt_my_own_subvec(v), mmt_my_own_size_in_items(v), rstate);
2404     v->consistency=1;
2405     mmt_vec_allreduce(v);
2406 }
2407 
2408 /* The _above and _below functions here do not use mmt, but we keep the
2409  * same interface nevertheless, for consistency with mmt_vec_truncate */
2410 void mmt_vec_truncate_above_index(matmul_top_data_ptr mmt MAYBE_UNUSED, mmt_vec_ptr v, unsigned int idx)
2411 {
2412     ASSERT_ALWAYS(v != NULL);
2413     if (idx <= v->i0) idx = v->i0;
2414     if (v->i0 <= idx && idx < v->i1) {
2415         if (v->siblings) {
2416             v->abase->vec_set_zero(v->abase,
2417                     v->abase->vec_subvec(v->abase, v->v, idx - v->i0),
2418                     v->i1 - idx);
2419         } else {
2420             serialize_threads(v->pi->wr[v->d]);
2421             if (v->pi->wr[v->d]->trank == 0)
2422                 v->abase->vec_set_zero(v->abase,
2423                         v->abase->vec_subvec(v->abase, v->v, idx - v->i0),
2424                         v->i1 - idx);
2425             serialize_threads(v->pi->wr[v->d]);
2426         }
2427     }
2428 }
2429 
2430 void mmt_vec_truncate_below_index(matmul_top_data_ptr mmt MAYBE_UNUSED, mmt_vec_ptr v, unsigned int idx)
2431 {
2432     ASSERT_ALWAYS(v != NULL);
2433     if (idx >= v->i1) idx = v->i1;
2434     if (v->i0 <= idx && idx < v->i1) {
2435         if (v->siblings) {
2436             v->abase->vec_set_zero(v->abase, v->v, idx - v->i0);
2437         } else {
2438             serialize_threads(v->pi->wr[v->d]);
2439             if (v->pi->wr[v->d]->trank == 0)
2440                 v->abase->vec_set_zero(v->abase, v->v, idx - v->i0);
2441             serialize_threads(v->pi->wr[v->d]);
2442         }
2443     }
2444 }
2445 
2446 void mmt_vec_truncate(matmul_top_data_ptr mmt, mmt_vec_ptr v)
2447 {
2448     mmt_vec_truncate_above_index(mmt, v, mmt->n0[v->d]);
2449 }
2450 
2451 void mmt_vec_set_x_indices(mmt_vec_ptr y, uint32_t * gxvecs, int m, unsigned int nx)
2452 {
2453     int shared = !y->siblings;
2454     mpfq_vbase_ptr A = y->abase;
2455     mmt_full_vec_set_zero(y);
2456     if (!shared || y->pi->wr[y->d]->trank == 0) {
2457         for(int j = 0 ; j < m ; j++) {
2458             for(unsigned int k = 0 ; k < nx ; k++) {
2459                 uint32_t i = gxvecs[j*nx+k];
2460                 // set bit j of entry i to 1.
2461                 if (i < y->i0 || i >= y->i1)
2462                     continue;
2463                 {
2464                     void * x = A->vec_coeff_ptr(A, y->v, i - y->i0);
2465                     A->simd_add_ui_at(A, x, x, j, 1);
2466                 }
2467             }
2468         }
2469     }
2470     y->consistency=2;
2471     if (shared)
2472         serialize_threads(y->pi->wr[y->d]);
2473 }
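
/* For instance (illustrative numbers only): calling mmt_vec_set_x_indices
 * with m = 2, nx = 1 and gxvecs = {5, 17} produces the vector whose entry
 * 5 has bit 0 set and whose entry 17 has bit 1 set, each coefficient being
 * written by whichever (job,thread) owns the corresponding index range. */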
2474 
2475 /* Set to the zero vector, except for the first n entries that are taken
2476  * from the vector v
2477  */
2478 void mmt_vec_set_expanded_copy_of_local_data(mmt_vec_ptr y, const void * v, unsigned int n)
2479 {
2480     int shared = !y->siblings;
2481     mpfq_vbase_ptr A = y->abase;
2482     mmt_full_vec_set_zero(y);
2483     if (!shared || y->pi->wr[y->d]->trank == 0) {
2484         for(unsigned int i = y->i0 ; i < y->i1 && i < n ; i++) {
2485             A->set(A,
2486                     A->vec_coeff_ptr(A, y->v, i - y->i0),
2487                     A->vec_coeff_ptr_const(A, v, i));
2488         }
2489     }
2490     y->consistency=2;
2491     if (shared)
2492         serialize_threads(y->pi->wr[y->d]);
2493 }
2494 
2495 
2496 
2497 /**********************************************************************/
2498 static void matmul_top_read_submatrix(matmul_top_data_ptr mmt, int midx, param_list_ptr pl, int optimized_direction);
2499 
2500 /* returns an allocated string holding the name of the midx-th submatrix */
2501 static char * matrix_list_get_item(param_list_ptr pl, const char * key, int midx)
2502 {
2503     char * res = NULL;
2504     char ** mnames;
2505     int nmatrices;
2506     int rc = param_list_parse_string_list_alloc(pl, key, &mnames, &nmatrices, ",");
2507     if (rc == 0)
2508         return NULL;
2509     ASSERT_ALWAYS(midx < nmatrices);
2510     for(int i = 0 ; i < nmatrices ; i++) {
2511         if (i == midx) {
2512             res = mnames[i];
2513         } else {
2514             free(mnames[i]);
2515         }
2516     }
2517     free(mnames);
2518     return res;
2519 }
2520 
2521 static char* matrix_get_derived_cache_subdir(const char * matrixname, parallelizing_info_ptr pi)
2522 {
2523     /* input is NULL in the case of random matrices */
2524     if (!matrixname) return NULL;
2525     unsigned int nh = pi->wr[1]->totalsize;
2526     unsigned int nv = pi->wr[0]->totalsize;
2527     char * copy = strdup(matrixname);
2528     char * t;
2529     if (strlen(copy) > 4 && strcmp((t = copy + strlen(copy) - 4), ".bin") == 0) {
2530         *t = '\0';
2531     }
2532     if ((t = strrchr(copy, '/')) == NULL) { /* basename */
2533         t = copy;
2534     } else {
2535         t++;
2536     }
2537     int rc = asprintf(&t, "%s.%ux%u", t, nh, nv);
2538     ASSERT_ALWAYS(rc >=0);
2539     free(copy);
2540     return t;
2541 }
2542 
2543 static void matrix_create_derived_cache_subdir(const char * matrixname, parallelizing_info_ptr pi)
2544 {
2545     char * d = matrix_get_derived_cache_subdir(matrixname, pi);
2546     struct stat sbuf[1];
2547     int rc = stat(d, sbuf);
2548     if (rc < 0 && errno == ENOENT) {
2549         rc = mkdir(d, 0777);
2550         if (rc < 0 && errno != EEXIST) {
2551             fprintf(stderr, "mkdir(%s): %s\n", d, strerror(errno));
2552         }
2553     }
2554     free(d);
2555 }
2556 
2557 /* return an allocated string with the name of a balancing file for this
2558  * matrix and this mpi/thr split.
2559  */
2560 static char* matrix_get_derived_balancing_filename(const char * matrixname, parallelizing_info_ptr pi)
2561 {
2562     /* input is NULL in the case of random matrices */
2563     if (!matrixname) return NULL;
2564     char * dn = matrix_get_derived_cache_subdir(matrixname, pi);
2565     char * t;
2566     int rc = asprintf(&t, "%s/%s.bin", dn, dn);
2567     free(dn);
2568     ASSERT_ALWAYS(rc >=0);
2569     return t;
2570 }
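
/* For instance: with matrixname = "foo/bar.bin" and an nh x nv = 2 x 4
 * split (illustrative values; nh and nv come from pi->wr[1] and pi->wr[0]
 * as above), matrix_get_derived_cache_subdir returns "bar.2x4" and the
 * derived balancing file is "bar.2x4/bar.2x4.bin". */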
2571 
2572 static char* matrix_get_derived_cache_filename_stem(const char * matrixname, parallelizing_info_ptr pi, uint32_t checksum)
2573 {
2574     /* input is NULL in the case of random matrices */
2575     if (!matrixname) return NULL;
2576     char * copy = strdup(matrixname);
2577     char * t;
2578     if (strlen(copy) > 4 && strcmp((t = copy + strlen(copy) - 4), ".bin") == 0) {
2579         *t = '\0';
2580     }
2581     if ((t = strrchr(copy, '/')) == NULL) { /* basename */
2582         t = copy;
2583     } else {
2584         t++;
2585     }
2586     int pos[2];
2587     for(int d = 0 ; d < 2 ; d++)  {
2588         pi_comm_ptr wr = pi->wr[d];
2589         pos[d] = wr->jrank * wr->ncores + wr->trank;
2590     }
2591     char * dn = matrix_get_derived_cache_subdir(matrixname, pi);
2592     int rc = asprintf(&t, "%s/%s.%08" PRIx32 ".h%d.v%d", dn, dn, checksum, pos[1], pos[0]);
2593     free(dn);
2594     ASSERT_ALWAYS(rc >=0);
2595     free(copy);
2596     return t;
2597 }
2598 
2599 static char* matrix_get_derived_submatrix_filename(const char * matrixname, parallelizing_info_ptr pi)
2600 {
2601     /* input is NULL in the case of random matrices */
2602     if (!matrixname) return NULL;
2603     char * copy = strdup(matrixname);
2604     char * t;
2605     if (strlen(copy) > 4 && strcmp((t = copy + strlen(copy) - 4), ".bin") == 0) {
2606         *t = '\0';
2607     }
2608     if ((t = strrchr(copy, '/')) == NULL) { /* basename */
2609         t = copy;
2610     } else {
2611         t++;
2612     }
2613     int pos[2];
2614     for(int d = 0 ; d < 2 ; d++)  {
2615         pi_comm_ptr wr = pi->wr[d];
2616         pos[d] = wr->jrank * wr->ncores + wr->trank;
2617     }
2618     char * dn = matrix_get_derived_cache_subdir(matrixname, pi);
2619     int rc = asprintf(&t, "%s/%s.h%d.v%d.bin", dn, dn, pos[1], pos[0]);
2620     free(dn);
2621     ASSERT_ALWAYS(rc >=0);
2622     free(copy);
2623     return t;
2624 }
2625 
2626 static void matmul_top_init_fill_balancing_header(matmul_top_data_ptr mmt, int i, param_list_ptr pl)
2627 {
2628     parallelizing_info_ptr pi = mmt->pi;
2629     matmul_top_matrix_ptr Mloc = mmt->matrices[i];
2630 
2631     if (pi->m->jrank == 0 && pi->m->trank == 0) {
2632         if (!Mloc->mname) {
2633             random_matrix_fill_fake_balancing_header(Mloc->bal, pi, param_list_lookup_string(pl, "random_matrix"));
2634         } else {
2635             if (access(Mloc->bname, R_OK) != 0) {
2636                 if (errno == ENOENT) {
2637                     printf("Creating balancing file %s\n", Mloc->bname);
2638                     struct mf_bal_args mba = {
2639                         .mfile = Mloc->mname,
2640                         .bfile = Mloc->bname,
2641                         .nh = pi->wr[1]->totalsize,
2642                         .nv = pi->wr[0]->totalsize,
2643                         .do_perm = { MF_BAL_PERM_AUTO, MF_BAL_PERM_AUTO },
2644                     };
2645                     mf_bal_adjust_from_option_string(&mba, param_list_lookup_string(pl, "balancing_options"));
2646                     /* withcoeffs being a switch for param_list, it is
2647                      * clobbered by the configure_switch mechanism */
2648                     mba.withcoeffs = !is_char2(mmt->abase);
2649                     matrix_create_derived_cache_subdir(Mloc->mname, mmt->pi);
2650 
2651                     mf_bal(&mba);
2652                 } else {
2653                     fprintf(stderr, "Cannot access balancing file %s: %s\n", Mloc->bname, strerror(errno));
2654                     exit(EXIT_FAILURE);
2655                 }
2656             }
2657             balancing_read_header(Mloc->bal, Mloc->bname);
2658         }
2659     }
2660     pi_bcast(Mloc->bal, sizeof(balancing), BWC_PI_BYTE, 0, 0, mmt->pi->m);
2661 
2662     /* check that balancing dimensions are compatible with our run */
2663     int ok = 1;
2664     ok = ok && mmt->pi->wr[0]->totalsize == Mloc->bal->h->nv;
2665     ok = ok && mmt->pi->wr[1]->totalsize == Mloc->bal->h->nh;
2666     if (ok) return;
2667 
2668     if (pi->m->jrank == 0 && pi->m->trank == 0) {
2669         fprintf(stderr, "Matrix %d, %s: balancing file %s"
2670                 " has dimensions %ux%u,"
2671                 " this conflicts with the current run,"
2672                 " which expects dimensions (%ux%u)x(%ux%u).\n",
2673                 i, Mloc->mname, Mloc->bname,
2674                 Mloc->bal->h->nh, Mloc->bal->h->nv,
2675                 mmt->pi->wr[1]->njobs,
2676                 mmt->pi->wr[1]->ncores,
2677                 mmt->pi->wr[0]->njobs,
2678                 mmt->pi->wr[0]->ncores);
2679     }
2680     serialize(mmt->pi->m);
2681     exit(1);
2682 }
2683 
2684 
matmul_top_init_prepare_local_permutations(matmul_top_data_ptr mmt,int i)2685 static void matmul_top_init_prepare_local_permutations(matmul_top_data_ptr mmt, int i)
2686 {
2687     matmul_top_matrix_ptr Mloc = mmt->matrices[i];
    /* Here, we get a copy of the rowperm and colperm.
     *
     * For each (job,thread), two pairs of intervals are defined.
     *
     * for row indices: [i0[0], i1[0][ = [mrow->i0, mrow->i1[ and [Xi0[0], Xi1[0][
     * for col indices: [i0[1], i1[1][ = [mcol->i0, mcol->i1[ and [Xi0[1], Xi1[1][
     *
     * The parts we get are:
     *      [i, rowperm[i]] for    i in [mrow->i0, mrow->i1[
     *                         and rowperm[i] in [X0, X1[
     *      [j, colperm[j]] for    j in [mcol->i0, mcol->i1[
     *                         and colperm[j] in [Y0, Y1[
     * where X0 is what mcol->i0 would be if the matrix had as many
     * columns as rows, and ditto for X1, Y0, Y1.
     */
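
    /* Toy example (hypothetical numbers, for illustration only): if this
     * (job,thread) owns row indices [100, 200[ and the transposed window
     * is [X0, X1[ = [300, 400[, then the loop further down keeps exactly
     * the pairs (i, rowperm[i]) with 100 <= i < 200 and
     * 300 <= rowperm[i] < 400, i.e. the part of the permutation that can
     * be applied purely locally.
     */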

    unsigned int rowperm_items=0;
    unsigned int colperm_items=0;

    /* Define a complete structure for the balancing which is shared
     * among threads, so that we'll be able to access it from all threads
     * simultaneously. We will put things in bal_tmp->rowperm and
     * bal_tmp->colperm, but beyond that, the header part will be wrong
     * at non-root nodes.
     */
    balancing_ptr bal_tmp = shared_malloc_set_zero(mmt->pi->m, sizeof(balancing));

    if (mmt->pi->m->jrank == 0 && mmt->pi->m->trank == 0) {
        if (Mloc->bname)
            balancing_read(bal_tmp, Mloc->bname);
        /* It's fine if we have nothing. This just means that we'll have
         * no balancing to deal with (this occurs only for matrices
         * which are generated at random on the fly). */
        rowperm_items = bal_tmp->rowperm != NULL ? bal_tmp->trows : 0;
        colperm_items = bal_tmp->colperm != NULL ? bal_tmp->tcols : 0;
    }
    pi_bcast(&rowperm_items, 1, BWC_PI_UNSIGNED, 0, 0, mmt->pi->m);
    pi_bcast(&colperm_items, 1, BWC_PI_UNSIGNED, 0, 0, mmt->pi->m);

    if (mmt->pi->m->trank == 0) {
        if (rowperm_items) {
            ASSERT_ALWAYS(rowperm_items == Mloc->bal->trows);
            if (mmt->pi->m->jrank != 0)
                bal_tmp->rowperm = malloc(Mloc->bal->trows * sizeof(uint32_t));
            MPI_Bcast(bal_tmp->rowperm, Mloc->bal->trows * sizeof(uint32_t), MPI_BYTE, 0, mmt->pi->m->pals);
        }
        if (colperm_items) {
            ASSERT_ALWAYS(colperm_items == Mloc->bal->tcols);
            if (mmt->pi->m->jrank != 0)
                bal_tmp->colperm = malloc(Mloc->bal->tcols * sizeof(uint32_t));
            MPI_Bcast(bal_tmp->colperm, Mloc->bal->tcols * sizeof(uint32_t), MPI_BYTE, 0, mmt->pi->m->pals);
        }
    }
    serialize_threads(mmt->pi->m);      /* important ! */

    uint32_t * balperm[2] = { bal_tmp->rowperm, bal_tmp->colperm };
    for(int d = 0 ; d < 2 ; d++)  {
        unsigned int ii[2];
        unsigned int jj[2];
        get_local_permutations_ranges(mmt, d, ii, jj);

        if (!balperm[d]) continue;

        Mloc->perm[d] = permutation_data_alloc();
#ifdef  MVAPICH2_NUMVERSION
        /* apparently mvapich2 frowns on realloc() */
        permutation_data_ensure(Mloc->perm[d],ii[1] - ii[0]);
#endif

        /* now create the really local permutation */
        for(unsigned int i = ii[0] ; i < ii[1] ; i++) {
            unsigned int j = balperm[d][i];
            if (j >= jj[0] && j < jj[1]) {
                unsigned int ij[2] = { i, j };
                permutation_data_push(Mloc->perm[d], ij);
            }
        }
#if 0
        const char * text[2] = { "left", "right", };
        printf("[%s] J%uT%u does %zu/%u permutation pairs for %s vectors\n",
                mmt->pi->nodenumber_s,
                mmt->pi->m->jrank, mmt->pi->m->trank,
                Mloc->perm[d]->n, d ? Mloc->bal->tcols : Mloc->bal->trows,
                text[d]);
#endif
    }

    serialize_threads(mmt->pi->m);      /* important ! */

    if (mmt->pi->m->trank == 0) {
        if (bal_tmp->colperm) free(bal_tmp->colperm);
        if (bal_tmp->rowperm) free(bal_tmp->rowperm);
    }

    shared_free(mmt->pi->m, bal_tmp);
}

void matmul_top_init(matmul_top_data_ptr mmt,
        mpfq_vbase_ptr abase,
        /* matmul_ptr mm, */
        parallelizing_info_ptr pi,
        param_list_ptr pl,
        int optimized_direction)
{
    memset(mmt, 0, sizeof(*mmt));

    mmt->abase = abase;
    mmt->pitype = pi_alloc_mpfq_datatype(pi, abase);
    mmt->pi = pi;
    mmt->matrices = NULL;

    int nbals = param_list_get_list_count(pl, "balancing");
    mmt->nmatrices = param_list_get_list_count(pl, "matrix");
    const char * random_description = param_list_lookup_string(pl, "random_matrix");
    const char * static_random_matrix = param_list_lookup_string(pl, "static_random_matrix");


    if (random_description || static_random_matrix) {
        if (nbals || mmt->nmatrices) {
            fprintf(stderr, "random_matrix is incompatible with balancing= and matrix=\n");
            exit(EXIT_FAILURE);
        }
    } else if (nbals && !mmt->nmatrices) {
        fprintf(stderr, "missing parameter matrix=\n");
        exit(EXIT_FAILURE);
    } else if (!nbals && mmt->nmatrices) {
        /* nbals == 0 is a hint towards taking the default balancing file
         * names, that's it */
    } else if (nbals != mmt->nmatrices) {
        fprintf(stderr, "balancing= and matrix= have inconsistent number of items\n");
        exit(EXIT_FAILURE);
    }

    if (random_description)
        mmt->nmatrices = 1;
    if (static_random_matrix)
        mmt->nmatrices = 1;

    mmt->matrices = malloc(mmt->nmatrices * sizeof(matmul_top_matrix));
    memset(mmt->matrices, 0, mmt->nmatrices * sizeof(matmul_top_matrix));

    serialize_threads(mmt->pi->m);

    /* The initialization goes through several passes */
    for(int i = 0 ; i < mmt->nmatrices ; i++) {
        matmul_top_matrix_ptr Mloc = mmt->matrices[i];
        Mloc->mname = matrix_list_get_item(pl, "matrix", i);
        Mloc->bname = matrix_list_get_item(pl, "balancing", i);
        if (static_random_matrix) {
            ASSERT_ALWAYS(i == 0);
            Mloc->mname = strdup(static_random_matrix);
        }
        if (!Mloc->bname) {
            /* returns NULL if mname is NULL */
            Mloc->bname = matrix_get_derived_balancing_filename(Mloc->mname, mmt->pi);
        }
        ASSERT_ALWAYS((Mloc->bname != NULL) == !random_description);

        matmul_top_init_fill_balancing_header(mmt, i, pl);

        Mloc->n[0] = Mloc->bal->trows;
        Mloc->n[1] = Mloc->bal->tcols;
        Mloc->n0[0] = Mloc->bal->h->nrows;
        Mloc->n0[1] = Mloc->bal->h->ncols;
        Mloc->locfile = matrix_get_derived_cache_filename_stem(Mloc->mname, mmt->pi, Mloc->bal->h->checksum);

    }

    mmt->n[0] = mmt->matrices[0]->n[0];
    mmt->n0[0] = mmt->matrices[0]->n0[0];
    mmt->n[1] = mmt->matrices[mmt->nmatrices-1]->n[1];
    mmt->n0[1] = mmt->matrices[mmt->nmatrices-1]->n0[1];

    /* in the second loop below, get_local_permutations_ranges uses
     * mmt->n[], so we do it in a second pass.
     * Now, given that double matrices only barely work at the moment,
     * I'm not absolutely sure that it's really needed.
     */
    for(int i = 0 ; i < mmt->nmatrices ; i++) {
        matmul_top_matrix_ptr Mloc = mmt->matrices[i];

        matmul_top_init_prepare_local_permutations(mmt, i);

        if (!mmt->pi->interleaved) {
            matmul_top_read_submatrix(mmt, i, pl, optimized_direction );
        } else {
            /* Interleaved threads will share their matrix data. The first
             * thread to arrive will do the initialization, and the second
             * one will just grab the pointer. The trick is to be able to
             * pick the pointer in the right location ! We add a generic
             * pointer dictionary feature in the parallelizing_info
             * interface for this purpose.
             */

#define MMT_MM_MAGIC_KEY        0xaa000000UL

            if (mmt->pi->interleaved->idx == 0) {
                matmul_top_read_submatrix(mmt, i, pl, optimized_direction);
                pi_store_generic(mmt->pi, MMT_MM_MAGIC_KEY + i, mmt->pi->m->trank, Mloc->mm);
            } else {
                Mloc->mm = pi_load_generic(mmt->pi, MMT_MM_MAGIC_KEY + i, mmt->pi->m->trank);
            }
        }
    }
}

unsigned int matmul_top_rank_upper_bound(matmul_top_data_ptr mmt)
{
    unsigned int r = MAX(mmt->n0[0], mmt->n0[1]);
    for(int i = 0 ; i < mmt->nmatrices ; i++) {
        matmul_top_matrix_ptr Mloc = mmt->matrices[i];
        r = MAX(r, Mloc->bal->h->nrows - Mloc->bal->h->nzrows);
        r = MAX(r, Mloc->bal->h->ncols - Mloc->bal->h->nzcols);
    }
    return r;
}


static int export_cache_list_if_requested(matmul_top_matrix_ptr Mloc, parallelizing_info_ptr pi, param_list_ptr pl)
{
    const char * cachelist = param_list_lookup_string(pl, "export_cachelist");
    if (!cachelist) return 0;

    char * myline = NULL;
    int rc;
    rc = asprintf(&myline, "%s %s", pi->nodename, Mloc->mm->cachefile_name);
    ASSERT_ALWAYS(rc >= 0);
    ASSERT_ALWAYS(myline != NULL);
    char ** tlines = NULL;
    tlines = shared_malloc_set_zero(pi->m, pi->m->ncores * sizeof(const char *));
    tlines[pi->m->trank] = myline;
    serialize_threads(pi->m);

    /* Also, just out of curiosity, try to see what we have currently */
    struct stat st[1];
    int * has_cache = shared_malloc_set_zero(pi->m, pi->m->totalsize * sizeof(int));
    rc = stat(Mloc->mm->cachefile_name, st);
    unsigned int mynode = pi->m->ncores * pi->m->jrank;
    has_cache[mynode + pi->m->trank] = rc == 0;
    serialize_threads(pi->m);

    int len = 0;
    if (pi->m->trank == 0) {
        MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                has_cache, pi->m->ncores, MPI_INT, pi->m->pals);

        for(unsigned int j = 0 ; j < pi->m->ncores ; j++) {
            int s = strlen(tlines[j]);
            if (s >= len) len = s;
        }
        MPI_Allreduce(MPI_IN_PLACE, &len, 1, MPI_INT, MPI_MAX, pi->m->pals);
        char * info = malloc(pi->m->totalsize * (len + 1));
        char * mybuf = malloc(pi->m->ncores * (len+1));
        memset(mybuf, 0, pi->m->ncores * (len+1));
        for(unsigned int j = 0 ; j < pi->m->ncores ; j++) {
            memcpy(mybuf + j * (len+1), tlines[j], strlen(tlines[j])+1);
        }
        MPI_Allgather(mybuf, pi->m->ncores * (len+1), MPI_BYTE,
                info, pi->m->ncores * (len+1), MPI_BYTE,
                pi->m->pals);
        if (pi->m->jrank == 0) {
            FILE * f = fopen(cachelist, "wb");
            DIE_ERRNO_DIAG(f == NULL, "fopen(%s)", cachelist);
            for(unsigned int j = 0 ; j < pi->m->njobs ; j++) {
                unsigned int j0 = j * pi->m->ncores;
                fprintf(f, "get-cache ");
                for(unsigned int k = 0 ; k < pi->m->ncores ; k++) {
                    char * t = info + (j0 + k) * (len + 1);
                    char * q = strchr(t, ' ');
                    ASSERT_ALWAYS(q);
                    if (!k) {
                        *q='\0';
                        fprintf(f, "%s", t);
                        *q=' ';
                    }
                    fprintf(f, "%s", q);
                }
                fprintf(f, "\n");
            }
            for(unsigned int j = 0 ; j < pi->m->njobs ; j++) {
                unsigned int j0 = j * pi->m->ncores;
                fprintf(f, "has-cache ");
                for(unsigned int k = 0 ; k < pi->m->ncores ; k++) {
                    char * t = info + (j0 + k) * (len + 1);
                    char * q = strchr(t, ' ');
                    ASSERT_ALWAYS(q);
                    if (!k) {
                        *q='\0';
                        fprintf(f, "%s", t);
                        *q=' ';
                    }
                    if (!has_cache[pi->m->ncores * j + k]) continue;
                    fprintf(f, "%s", q);
                }
                fprintf(f, "\n");
            }
            fclose(f);
        }
        free(info);
        free(mybuf);
    }
    serialize_threads(pi->m);
    shared_free(pi->m, tlines);
    shared_free(pi->m, has_cache);
    serialize_threads(pi->m);
    free(myline);
    serialize(pi->m);

    return 1;
}

static unsigned int local_fraction(unsigned int padded, unsigned int normal, pi_comm_ptr wr)
{
    ASSERT_ALWAYS(padded % wr->totalsize == 0);
    unsigned int i = wr->jrank * wr->ncores + wr->trank;
    unsigned int quo = padded / wr->totalsize;
    return MIN(normal - i * quo, quo);
}
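
/* Worked example (illustrative numbers only): with padded = 1000,
 * wr->totalsize = 4 and normal = 998, quo = 250; positions 0, 1 and 2
 * each get 250 rows (or columns), and position 3 gets the remaining
 * 998 - 3*250 = 248.
 */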


static void matmul_top_read_submatrix(matmul_top_data_ptr mmt, int midx, param_list_ptr pl, int optimized_direction)
{
    int rebuild = 0;
    param_list_parse_int(pl, "rebuild_cache", &rebuild);
    int can_print = (mmt->pi->m->jrank == 0 && mmt->pi->m->trank == 0);

    matmul_top_matrix_ptr Mloc = mmt->matrices[midx];

    Mloc->mm = matmul_init(mmt->abase,
            Mloc->n[0] / mmt->pi->wr[1]->totalsize,
            Mloc->n[1] / mmt->pi->wr[0]->totalsize,
            Mloc->locfile, NULL  /* means: choose mm_impl from pl */,
            pl, optimized_direction);

    // *IF* we need to do a collective read of the matrix, we need to
    // provide the pointer *now*.
    unsigned int sqread = 0;
    param_list_parse_uint(pl, "sequential_cache_read", &sqread);

    int cache_loaded = 0;

    if (export_cache_list_if_requested(Mloc, mmt->pi, pl)) {
        /* If we are being called from dispatch, once all submatrices
         * have had their list of required files printed, the program
         * will exit. */
        return;
    }

    if (!rebuild) {
        if (can_print) {
            printf("Now trying to load matrix cache files\n");
        }
        if (sqread) {
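            /* Note (illustration): threads are grouped in batches of
             * sqread consecutive trank values, and batches read one
             * after the other. E.g. with ncores = 8 and sqread = 2
             * (hypothetical values), the rounds are threads {0,1}, then
             * {2,3}, then {4,5}, then {6,7}.
             */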
            for(unsigned int j = 0 ; j < mmt->pi->m->ncores ; j += sqread) {
                serialize_threads(mmt->pi->m);
                double t_read = -wct_seconds();
                if (j / sqread == mmt->pi->m->trank / sqread)
                    cache_loaded = matmul_reload_cache(Mloc->mm);
                serialize(mmt->pi->m);
                t_read += wct_seconds();
                if (mmt->pi->m->jrank == 0 && mmt->pi->m->trank == j && cache_loaded) {
                    printf("[%s] J%uT%u-%u: read cache %s (and others) in %.2fs (round %u/%u)\n",
                    mmt->pi->nodenumber_s,
                    mmt->pi->m->jrank,
                    mmt->pi->m->trank,
                    MIN(mmt->pi->m->ncores, mmt->pi->m->trank + sqread) - 1,
                    Mloc->mm->cachefile_name,
                    t_read,
                    j / sqread, iceildiv(mmt->pi->m->ncores, sqread)
                    );
                }
            }
        } else {
            double t_read = -wct_seconds();
            cache_loaded = matmul_reload_cache(Mloc->mm);
            serialize(mmt->pi->m);
            t_read += wct_seconds();
            if (mmt->pi->m->jrank == 0 && mmt->pi->m->trank == 0 && cache_loaded) {
                printf("[%s] J%u: read cache %s (and others) in %.2fs\n",
                        mmt->pi->nodenumber_s,
                        mmt->pi->m->jrank,
                        Mloc->mm->cachefile_name,
                        t_read
                      );
            }
        }
        if (!mmt->pi->m->trank) {
            printf("J%u %s done reading (result=%d)\n", mmt->pi->m->jrank, mmt->pi->nodename, cache_loaded);
        }
    }

    if (!pi_data_eq(&cache_loaded, 1, BWC_PI_INT, mmt->pi->m)) {
        if (can_print) {
            fprintf(stderr, "Fatal error: cache files not present at expected locations\n");
        }
        SEVERAL_THREADS_PLAY_MPI_BEGIN(mmt->pi->m) {
            fprintf(stderr, "[%s] J%uT%u: cache %s: %s\n",
                    mmt->pi->nodenumber_s,
                    mmt->pi->m->jrank,
                    mmt->pi->m->trank,
                    Mloc->mm->cachefile_name,
                    cache_loaded ? "ok" : "not ok");
        }
        SEVERAL_THREADS_PLAY_MPI_END();
        serialize(mmt->pi->m);
        abort();
    }

    unsigned int sqb = 0;
    param_list_parse_uint(pl, "sequential_cache_build", &sqb);

    matrix_u32 m;
    memset(m, 0, sizeof(matrix_u32));
    /* see remark in raw_matrix_u32.h about data ownership for type
     * matrix_u32 */

    if (!cache_loaded) {
        // the mm layer is informed of the higher priority computations
        // that will take place. Depending on the implementation, this
        // may cause the direct or transposed ordering to be preferred.
        // Thus we have to read this back from the mm structure.
        m->bfile = Mloc->bname;
        m->mfile = Mloc->mname;
        m->transpose = Mloc->mm->store_transposed;
        m->withcoeffs = !is_char2(mmt->abase);
        if (!(Mloc->mname)) {
            if (can_print) {
                printf("Begin creation of fake matrix data in parallel\n");
            }
            /* Mloc->mm->dim[0,1] contains the dimensions of the padded
             * matrix. This is absolutely fine in the normal case. But in
             * the case of staged matrices, it's a bit different. We must
             * make sure that we generate matrices which have zeroes in
             * the padding area.
             */
            random_matrix_get_u32(mmt->pi, pl, m,
                    local_fraction(Mloc->n[0], Mloc->n0[0], mmt->pi->wr[1]),
                    local_fraction(Mloc->n[1], Mloc->n0[1], mmt->pi->wr[0]));
        } else {
            if (can_print) {
                printf("Matrix dispatching starts\n");
            }

            /* It might be that the leader and the other nodes do _not_
             * share a common filesystem, in which case we must do this
             * also here.
             */
            if (mmt->pi->m->trank == 0)
                matrix_create_derived_cache_subdir(Mloc->mname, mmt->pi);
            serialize_threads(mmt->pi->m);

            balancing_get_matrix_u32(mmt->pi, pl, m);

            int ssm = 0;
            param_list_parse_int(pl, "save_submatrices", &ssm);
            if (ssm) {
                char * submat = matrix_get_derived_submatrix_filename(Mloc->mname, mmt->pi);
                fprintf(stderr, "DEBUG: creating %s\n", submat);
                FILE * f = fopen(submat, "wb");
                fwrite(m->p, sizeof(uint32_t), m->size, f);
                fclose(f);
                free(submat);
            }
        }
    }

    if (can_print && Mloc->bname) {
        balancing bal;
        balancing_init(bal);
        balancing_read_header(bal, Mloc->bname);
        balancing_set_row_col_count(bal);
        printf("Matrix: total %" PRIu32 " rows %" PRIu32 " cols "
                "%" PRIu64 " coeffs\n",
                bal->h->nrows, bal->h->ncols, bal->h->ncoeffs);
        balancing_clear(bal);
    }

    if (!sqb) {
        if (!cache_loaded) {
            // everybody does it in parallel
            if (verbose_enabled(CADO_VERBOSE_PRINT_BWC_CACHE_MAJOR_INFO))
                printf("[%s] J%uT%u building cache for %s\n",
                        mmt->pi->nodenumber_s,
                        mmt->pi->m->jrank,
                        mmt->pi->m->trank,
                        Mloc->locfile);
            matmul_build_cache(Mloc->mm, m);
            matmul_save_cache(Mloc->mm);
        }
    } else {
        if (can_print)
            printf("Building local caches %u at a time\n", sqb);
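        /* Note (illustration): batches of sqb consecutive threads take
         * turns. In round r, the threads with trank / sqb == r build
         * their cache; in round r+1 they write it out, while the next
         * batch builds. E.g. with ncores = 4 and sqb = 2 (hypothetical
         * values), threads {0,1} build in round 0 and save in round 1,
         * and threads {2,3} build in round 1 and save in round 2.
         */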
        for(unsigned int j = 0 ; j < mmt->pi->m->ncores + sqb ; j += sqb) {
            serialize_threads(mmt->pi->m);
            if (cache_loaded) continue;
            if (j / sqb == mmt->pi->m->trank / sqb) {
                if (verbose_enabled(CADO_VERBOSE_PRINT_BWC_CACHE_MAJOR_INFO))
                    printf("[%s] J%uT%u building cache for %s\n",
                            mmt->pi->nodenumber_s,
                            mmt->pi->m->jrank,
                            mmt->pi->m->trank,
                            Mloc->locfile);
                matmul_build_cache(Mloc->mm, m);
            } else if (j / sqb == mmt->pi->m->trank / sqb + 1) {
                matmul_save_cache(Mloc->mm);
            }
        }
    }

    /* see remark in raw_matrix_u32.h about data ownership for type
     * matrix_u32 */

    if (Mloc->mm->cachefile_name && verbose_enabled(CADO_VERBOSE_PRINT_BWC_CACHE_MAJOR_INFO)) {
        my_pthread_mutex_lock(mmt->pi->m->th->m);
        printf("[%s] J%uT%u uses cache file %s\n",
                mmt->pi->nodenumber_s,
                mmt->pi->m->jrank, mmt->pi->m->trank,
                /* cache for mmt->locfile, */
                Mloc->mm->cachefile_name);
        my_pthread_mutex_unlock(mmt->pi->m->th->m);
    }
}

void matmul_top_report(matmul_top_data_ptr mmt, double scale, int full)
{
    for(int midx = 0 ; midx < mmt->nmatrices ; midx++) {
        matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
        matmul_report(Mloc->mm, scale);
        size_t max_report_size = 0;
        pi_allreduce(&Mloc->mm->report_string_size, &max_report_size, 1, BWC_PI_SIZE_T, BWC_PI_MAX, mmt->pi->m);
        char * all_reports = malloc(mmt->pi->m->totalsize * max_report_size);
        memset(all_reports, 0, mmt->pi->m->totalsize * max_report_size);
        memcpy(all_reports + max_report_size * (mmt->pi->m->jrank * mmt->pi->m->ncores + mmt->pi->m->trank), Mloc->mm->report_string, Mloc->mm->report_string_size);
        pi_allgather(NULL, 0, 0,
                all_reports, max_report_size, BWC_PI_BYTE, mmt->pi->m);

        if (max_report_size > 1 && mmt->pi->m->jrank == 0 && mmt->pi->m->trank == 0) {
            for(unsigned int j = 0 ; j < mmt->pi->m->njobs ; j++) {
                for(unsigned int t = 0 ; t < mmt->pi->m->ncores ; t++) {
                    char * locreport = all_reports + max_report_size * (j * mmt->pi->m->ncores + t);
                    if (full || (j == 0 && t == 0))
                        printf("##### J%uT%u timing report:\n%s", j, t, locreport);
                }
            }
        }
        serialize(mmt->pi->m);
        free(all_reports);
    }
}

void matmul_top_clear(matmul_top_data_ptr mmt)
{
    pi_free_mpfq_datatype(mmt->pi, mmt->pitype);
    serialize_threads(mmt->pi->m);
    serialize(mmt->pi->m);
    serialize_threads(mmt->pi->m);

    for(int midx = 0 ; midx < mmt->nmatrices ; midx++) {
        matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
        for(int d = 0 ; d < 2 ; d++)  {
            permutation_data_free(Mloc->perm[d]);
            // both are expected to hold storage for:
            // (mmt->n[d] / mmt->pi->m->totalsize * mmt->pi->wr[d]->ncores))
            // elements, corresponding to the largest abase encountered.
        }
        serialize(mmt->pi->m);
        if (!mmt->pi->interleaved) {
            matmul_clear(Mloc->mm);
        } else if (mmt->pi->interleaved->idx == 1) {
            matmul_clear(Mloc->mm);
            /* group 0 is the first to leave, thus it doesn't do the
             * freeing. */
        }
        free(Mloc->locfile);
        free(Mloc->mname);
        free(Mloc->bname);
    }
    serialize(mmt->pi->m);
    free(mmt->matrices);
}

static permutation_data_ptr permutation_data_alloc()
{
    permutation_data_ptr a = malloc(sizeof(permutation_data));
    a->n = 0;
    a->alloc = 0;
    a->x = NULL;
    return a;
}

static void permutation_data_free(permutation_data_ptr a)
{
    if (!a) return;
    a->n = 0;
    a->alloc = 0;
    if (a->x) free(a->x);
    free(a);
}

static pthread_mutex_t pp = PTHREAD_MUTEX_INITIALIZER;

static void permutation_data_push(permutation_data_ptr a, unsigned int u[2])
{
    if (a->n >= a->alloc) {
        pthread_mutex_lock(&pp);
        a->alloc = a->n + 16 + a->alloc / 4;
        a->x = realloc(a->x, a->alloc * sizeof(unsigned int[2]));
        pthread_mutex_unlock(&pp);
        ASSERT_ALWAYS(a->x != NULL);
    }
    a->x[a->n][0] = u[0];
    a->x[a->n][1] = u[1];
    a->n++;
}
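
/* Note (illustration only): starting from an empty structure, the growth
 * rule a->alloc = a->n + 16 + a->alloc / 4 yields successive capacities
 * 16, 36, 61, ... so reallocations stay reasonably infrequent as the
 * array grows.
 */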

#ifdef  MVAPICH2_NUMVERSION
static void permutation_data_ensure(permutation_data_ptr a, size_t n)
{
    if (n >= a->alloc) {
        pthread_mutex_lock(&pp);
        a->alloc = n;
        a->x = realloc(a->x, n * sizeof(unsigned int[2]));
        pthread_mutex_unlock(&pp);
        ASSERT_ALWAYS(a->x != NULL);
    }
}
#endif