#include "cado.h" // IWYU pragma: keep
#include <stdint.h>
#include <inttypes.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>     // for access, unlink, ssize_t, R_OK
#include <sys/stat.h>   // for stat, mkdir
#include <pthread.h>    // for pthread_mutex_lock, pthread_mutex_u...
#include <gmp.h>
#include "async.h"      // for timing_next_timer, timing_data (ptr...
#include "balancing_workhorse.h"
#include "cheating_vec_init.h"
#include "intersections.h"
#include "bwc_config.h" // IWYU pragma: keep
#include "macros.h"     // ASSERT_ALWAYS // IWYU pragma: keep
#include "matmul.h"
#include "matmul_top.h"
#include "mf_bal.h"
#include "misc.h"
#include "params.h"
#include "portability.h" // asprintf // IWYU pragma: keep
#include "random_matrix.h"
#include "raw_matrix_u32.h" // for matrix_u32
#include "select_mpi.h"
#include "timing.h"     // wct_seconds
#include "verbose.h"    // CADO_VERBOSE_PRINT_BWC_CACHE_BUILD

/* Our innermost communication routines are essentially all-gather and
 * reduce-scatter, following the MPI terminology. We provide several
 * choices for doing this. The compile-time definitions here allow one
 * to change which is used. The "best" should in theory always be the
 * one provided by the MPI implementation. Unfortunately, it is not
 * always that clear.
 *
 * Note that because we have several threads wanting to do all these
 * operations one after the other, we also have some interest in using
 * non-blocking collectives, or even some sort of
 * communication-computation overlap.
 */

/* choices for reduce-scatter */
#define RS_CHOICE_STOCK_RS          1
#define RS_CHOICE_STOCK_RSBLOCK     (MPI_VERSION_ATLEAST(2,2) ? 2 : -2)
#define RS_CHOICE_STOCK_IRS         (MPI_VERSION_ATLEAST(3,0) ? 3 : -3)
#define RS_CHOICE_STOCK_IRSBLOCK    (MPI_VERSION_ATLEAST(3,0) ? 4 : -4)
#define RS_CHOICE_MINE              5
#define RS_CHOICE_MINE_DROP_IN      (MPI_VERSION_ATLEAST(3,0) ? 6 : -6)
#define RS_CHOICE_MINE_PARALLEL     7
#define RS_CHOICE_MINE_OVERLAPPING  8   /* TODO */
/* choices for all-gather */
#define AG_CHOICE_STOCK_AG          1
#define AG_CHOICE_STOCK_IAG         (MPI_VERSION_ATLEAST(3,0) ? 2 : -2)

/* The actual performance is affected by the communicator size, the chunk
 * size, and the operation. When we're doing bxor, we have the following
 * measurements for RS.
 *
 * n=2, chunk=174 MB, openmpi-1.8.3, IB FDR.
 * RS_CHOICE_MINE_PARALLEL      .25
 * RS_CHOICE_MINE               .45
 * RS_CHOICE_MINE_DROP_IN       .48
 * RS_CHOICE_STOCK_RS           .77
 * RS_CHOICE_STOCK_RSBLOCK     1.63
 * RS_CHOICE_STOCK_IRS         => not measured
 * RS_CHOICE_STOCK_IRSBLOCK    => not measured
 * RS_CHOICE_MINE_OVERLAPPING  => to be implemented
 */

/* _this_ part can be configured */
// #define RS_CHOICE RS_CHOICE_STOCK_IRSBLOCK
// #define AG_CHOICE AG_CHOICE_STOCK_IAG
#define RS_CHOICE RS_CHOICE_MINE_PARALLEL
#define AG_CHOICE (MPI_VERSION_ATLEAST(3,0) ? AG_CHOICE_STOCK_IAG : AG_CHOICE_STOCK_AG)
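
/* As an expository note: switching strategies amounts to overriding the
 * two macros above at build time. A sketch (kept under #if 0, with a
 * hypothetical -D flag name) could read:
 */
#if 0
#ifdef BWC_PREFER_STOCK_COLLECTIVES     /* hypothetical build flag */
#undef  RS_CHOICE
#undef  AG_CHOICE
#define RS_CHOICE RS_CHOICE_STOCK_RSBLOCK
#define AG_CHOICE AG_CHOICE_STOCK_AG
#endif
#endif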

/* some sanity checking */
#if RS_CHOICE < 0
#error "Choice for reduce-scatter strategy is invalid or not supported"
#endif
#if AG_CHOICE < 0
#error "Choice for all-gather strategy is invalid or not supported"
#endif

/* we no longer use this.
#ifdef HAVE_MPI3_API
#define MPI_LIBRARY_NONBLOCKING_COLLECTIVES
#endif
*/

///////////////////////////////////////////////////////////////////
/* Start with stuff that does not depend on abase at all -- this
 * provides a half-baked interface */

/* At some point we had this. Not sure it's still useful. */
#define ABASE_UNIVERSAL_READAHEAD_ITEMS 8

static permutation_data_ptr permutation_data_alloc(void);
static void permutation_data_free(permutation_data_ptr a);
static void permutation_data_push(permutation_data_ptr a, unsigned int u[2]);
#ifdef MVAPICH2_NUMVERSION
static void permutation_data_ensure(permutation_data_ptr a, size_t n);
#endif


void matmul_top_decl_usage(param_list_ptr pl)
{
    param_list_decl_usage(pl, "matrix",
            "the matrix file (binary)");
    param_list_decl_usage(pl, "balancing",
            "the matrix balancing file, as computed by mf_bal");
    param_list_decl_usage(pl, "random_matrix",
            "characteristics of a random matrix to be used for staged runs.");
    param_list_decl_usage(pl, "static_random_matrix",
            "(unset or set to something arbitrary): indicate that the matrix is fake, and that there is no need to bother with the generation of vectors");

    param_list_decl_usage(pl, "rebuild_cache",
            "force rebuilding matrix caches");
    param_list_decl_usage(pl, "export_cachelist",
            "print the per-node list of needed cache files and exit");
    param_list_decl_usage(pl, "save_submatrices",
            "after dispatching, save a copy of the local uncompressed matrix before creating the cache file");
    param_list_decl_usage(pl, "sequential_cache_build",
            "build the cache files sequentially on each node");
    param_list_decl_usage(pl, "sequential_cache_read",
            "read the cache files sequentially on each node");
    param_list_decl_usage(pl, "balancing_options",
            "options to pass to the balancing subprogram (see mf_bal_adjust_from_option_string)");
    balancing_decl_usage(pl);
    matmul_decl_usage(pl);
}

void matmul_top_lookup_parameters(param_list_ptr pl)
{
    param_list_lookup_string(pl, "matrix");
    param_list_lookup_string(pl, "balancing");
    param_list_lookup_string(pl, "random_matrix");
    param_list_lookup_string(pl, "static_random_matrix");
    param_list_lookup_string(pl, "rebuild_cache");
    param_list_lookup_string(pl, "export_cachelist");
    param_list_lookup_string(pl, "save_submatrices");
    param_list_lookup_string(pl, "sequential_cache_build");
    param_list_lookup_string(pl, "sequential_cache_read");
    param_list_lookup_string(pl, "balancing_options");
    balancing_lookup_parameters(pl);
    matmul_lookup_parameters(pl);
}

static int is_char2(mpfq_vbase_ptr abase)
{
    mpz_t p;
    mpz_init(p);
    abase->field_characteristic(abase, p);
    int char2 = mpz_cmp_ui(p, 2) == 0;
    mpz_clear(p);
    return char2;
}


/* Some info about distributed vectors.
 *
 * See linalg/bwc/GUIDED_TOUR_OF_SOURCES for documentation about what's
 * in the mmt_vec type.
 *
 * Here are some pre- and post-conditions about consistency for the
 * common routines:
 *
 * matmul_top_mul_cpu:
 *      input: fully consistent
 *      output: inconsistent
 * reduce (all variants)
 *      input: inconsistent
 *      output: partially consistent
 * allreduce
 *      input: inconsistent
 *      output: fully consistent
 * broadcast
 *      input: partially consistent
 *      output: fully consistent
 * matmul_top_mul_comm:
 *      input: inconsistent
 *      output: fully consistent
 *
 * The following are non-critical, so for simplicity we require and
 * provide full consistency.
 *
 * apply_identity:
 *      input: fully consistent
 *      output: fully consistent
 *
 */
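
/* As an illustration of the transitions above, here is a minimal sketch
 * of one multiplication step (kept under #if 0: this is expository
 * only; "mmt", "v" and "w" stand for objects initialized elsewhere, and
 * allreduce is just one of the possible communication choices).
 */
#if 0
{
    /* v is fully consistent (consistency == 2) on entry */
    matmul_top_mul_cpu(mmt, 0, v->d, w, v);     /* w is now inconsistent */
    mmt_vec_allreduce(w);                       /* w is fully consistent */
    /* alternatively, mmt_vec_reduce() followed by mmt_vec_broadcast()
     * goes through the partially consistent state (consistency == 1) */
}
#endif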


/* {{{ vector init/clear */

/* this is for a vector which will be of interest to a group of threads
 * and jobs in direction d */
void mmt_vec_init(matmul_top_data_ptr mmt, mpfq_vbase_ptr abase, pi_datatype_ptr pitype, mmt_vec_ptr v, int d, int flags, unsigned int n)
{
    ASSERT_ALWAYS(v != NULL);
    if (abase == NULL) abase = mmt->abase;
    if (pitype == NULL) pitype = mmt->pitype;
    memset(v, 0, sizeof(mmt_vec));
    v->pi = mmt->pi;
    v->d = d;
    v->abase = abase;
    v->pitype = pitype;
    v->n = n;

    ASSERT_ALWAYS(n % mmt->pi->m->totalsize == 0);

    pi_comm_ptr wr = mmt->pi->wr[d];
    pi_comm_ptr xwr = mmt->pi->wr[!d];

    /* now what is the size which we are going to allocate locally */
    n /= xwr->totalsize;
    v->i0 = n * (xwr->jrank * xwr->ncores + xwr->trank);
    v->i1 = v->i0 + n;

    /* Look for readahead settings for all submatrices */
    n += ABASE_UNIVERSAL_READAHEAD_ITEMS;
    for(int i = 0 ; i < mmt->nmatrices ; i++) {
        matmul_aux(mmt->matrices[i]->mm, MATMUL_AUX_GET_READAHEAD, &n);
    }

    if (flags & THREAD_SHARED_VECTOR) {
        if (wr->trank == 0) {
            cheating_vec_init(abase, &v->v, n);
            abase->vec_set_zero(abase, v->v, n);
        }
        pi_thread_bcast(&v->v, sizeof(void*), BWC_PI_BYTE, 0, wr);
        v->siblings = NULL;
    } else {
        cheating_vec_init(abase, &v->v, n);
        abase->vec_set_zero(abase, v->v, n);
        v->siblings = shared_malloc(wr, wr->ncores * sizeof(void *));
        v->siblings[wr->trank] = v;
    }
    /* Vectors start out initialized to zero, so we have full consistency */
    v->consistency = 2;
    serialize_threads(v->pi->m);

    // pi_log_op(v->pi->m, "Hello, world");
    /* fill wrpals and mpals */
    v->wrpals[0] = shared_malloc(v->pi->wr[0], v->pi->wr[0]->ncores * sizeof(void *));
    v->wrpals[0][v->pi->wr[0]->trank] = v;
    serialize_threads(v->pi->m);
    v->wrpals[1] = shared_malloc(v->pi->wr[1], v->pi->wr[1]->ncores * sizeof(void *));
    v->wrpals[1][v->pi->wr[1]->trank] = v;
    serialize_threads(v->pi->m);
    v->mpals = shared_malloc(v->pi->m, v->pi->m->ncores * sizeof(void *));
    v->mpals[v->pi->m->trank] = v;
    serialize_threads(v->pi->m);

}

void mmt_vec_clear(matmul_top_data_ptr mmt, mmt_vec_ptr v)
{
    ASSERT_ALWAYS(v != NULL);
    pi_comm_ptr wr = mmt->pi->wr[v->d];
    serialize_threads(wr);
    if (v->rsbuf[0]) free(v->rsbuf[0]);
    if (v->rsbuf[1]) free(v->rsbuf[1]);
    unsigned int n = v->i1 - v->i0;
    /* see above */
    n += ABASE_UNIVERSAL_READAHEAD_ITEMS;
    for(int i = 0 ; i < mmt->nmatrices ; i++) {
        matmul_aux(mmt->matrices[i]->mm, MATMUL_AUX_GET_READAHEAD, &n);
    }
    if (v->siblings) {
        cheating_vec_clear(v->abase, &v->v, n);
        shared_free(wr, v->siblings);
    } else {
        if (wr->trank == 0)
            cheating_vec_clear(v->abase, &v->v, n);
    }
    shared_free(v->pi->wr[0], v->wrpals[0]);
    shared_free(v->pi->wr[1], v->wrpals[1]);
    shared_free(v->pi->m, v->mpals);
    memset(v, 0, sizeof(mmt_vec));
}
/* }}} */

/* my "own" offset is the additional offset, within my locally stored
 * data area, of the data range I am the owner of. This data range
 * corresponds to the index range v->i0 + offset to v->i0 + offset + size.
 */
size_t mmt_my_own_offset_in_items(mmt_vec_ptr v)
{
    pi_comm_ptr wr = v->pi->wr[v->d];
    size_t eblock = (v->i1 - v->i0) / wr->totalsize;
    int pos = wr->jrank * wr->ncores + wr->trank;
    return pos * eblock;
}

size_t mmt_my_own_offset_in_bytes(mmt_vec_ptr v)
{
    return v->abase->vec_elt_stride(v->abase, mmt_my_own_offset_in_items(v));
}

void * mmt_my_own_subvec(mmt_vec_ptr v)
{
    return v->abase->vec_subvec(v->abase, v->v, mmt_my_own_offset_in_items(v));
}

size_t mmt_my_own_size_in_items(mmt_vec_ptr v)
{
    pi_comm_ptr wr = v->pi->wr[v->d];
    size_t eblock = (v->i1 - v->i0) / wr->totalsize;
    return eblock;
}

size_t mmt_my_own_size_in_bytes(mmt_vec_ptr v)
{
    return v->abase->vec_elt_stride(v->abase, mmt_my_own_size_in_items(v));
}

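/* Expository sketch (under #if 0) of how the accessors above relate to
 * one another; "v" stands for an initialized mmt_vec, nothing here is
 * live code.
 */
#if 0
{
    size_t off = mmt_my_own_offset_in_items(v);
    size_t sz = mmt_my_own_size_in_items(v);
    /* the owned index range is [v->i0 + off, v->i0 + off + sz), and it
     * lies within the local window [v->i0, v->i1) */
    ASSERT_ALWAYS(v->i0 + off + sz <= v->i1);
    /* mmt_my_own_subvec(v) points to the start of that range in v->v */
    void * me = mmt_my_own_subvec(v);
}
#endif
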
/* This copies **ONLY** the data we are supposed to own from v to w.
 * mmt_own_vec_set2 is slightly special, since in that case we might be
 * picking data from vector areas owned by other blocks (or writing
 * there). Therefore we convey the info about which vector piece we care
 * about with the first argument z.
 */
void mmt_own_vec_set2(mmt_vec_ptr z, mmt_vec_ptr w, mmt_vec_ptr v)
{
    ASSERT_ALWAYS(z != NULL);
    ASSERT_ALWAYS(v != NULL);
    ASSERT_ALWAYS(w != NULL);
    if (v == w) return;
    ASSERT_ALWAYS(z->d == v->d);
    ASSERT_ALWAYS(z->d == w->d);
    size_t off = mmt_my_own_offset_in_items(z);
    size_t sz = mmt_my_own_size_in_items(z);
    ASSERT_ALWAYS(sz == mmt_my_own_size_in_items(v));
    ASSERT_ALWAYS(sz == mmt_my_own_size_in_items(w));
    v->abase->vec_set(v->abase,
            w->abase->vec_subvec(w->abase, w->v, off),
            v->abase->vec_subvec(v->abase, v->v, off),
            sz);
}
void mmt_own_vec_set(mmt_vec_ptr w, mmt_vec_ptr v)
{
    ASSERT_ALWAYS(v->abase == w->abase);
    mmt_own_vec_set2(v, w, v);
    w->consistency = 1;
}
void mmt_vec_swap(mmt_vec_ptr w, mmt_vec_ptr v)
{
    mmt_vec foo;
    memcpy(foo, v, sizeof(mmt_vec));
    memcpy(v, w, sizeof(mmt_vec));
    memcpy(w, foo, sizeof(mmt_vec));
}

void mmt_full_vec_set(mmt_vec_ptr w, mmt_vec_ptr v)
{
    ASSERT_ALWAYS(v != NULL);
    ASSERT_ALWAYS(w != NULL);
    /* DO **NOT** early-quit when v==w, because we might be calling this
     * with v and w being siblings, maybe equal for one of the threads.
     */
    // same remark as above
    // ASSERT_ALWAYS(v->abase == w->abase);
    ASSERT_ALWAYS(v->d == w->d);
    ASSERT_ALWAYS(mmt_my_own_size_in_items(v) == mmt_my_own_size_in_items(w));
    if (w->siblings) {
        if (w->v != v->v) {
            v->abase->vec_set(v->abase, w->v, v->v, v->i1 - v->i0);
        }
    } else {
        ASSERT_ALWAYS(v->siblings == NULL);
        if (w->v != v->v) {
            if (w->pi->wr[w->d]->trank == 0) {
                v->abase->vec_set(v->abase, w->v, v->v, v->i1 - v->i0);
            }
        }
        serialize_threads(w->pi->wr[w->d]);
    }
    if (v != w)
        w->consistency = v->consistency;
}

void mmt_full_vec_set_zero(mmt_vec_ptr v)
{
    ASSERT_ALWAYS(v != NULL);
    if (v->siblings) {
        v->abase->vec_set_zero(v->abase, v->v, v->i1 - v->i0);
    } else {
        serialize_threads(v->pi->wr[v->d]);
        if (v->pi->wr[v->d]->trank == 0)
            v->abase->vec_set_zero(v->abase, v->v, v->i1 - v->i0);
    }
    v->consistency = 2;
    serialize_threads(v->pi->wr[v->d]);
}

void mmt_vec_set_basis_vector_at(mmt_vec_ptr v, int k, unsigned int j)
{
    mmt_full_vec_set_zero(v);
    mmt_vec_add_basis_vector_at(v, k, j);
}

void mmt_vec_add_basis_vector_at(mmt_vec_ptr v, int k, unsigned int j)
{
    if (v->i0 <= j && j < v->i1) {
        if (v->siblings) {
            v->abase->simd_set_ui_at(v->abase, v->abase->vec_coeff_ptr(v->abase, v->v, j - v->i0), k, 1);
        } else {
            serialize_threads(v->pi->wr[v->d]);
            if (v->pi->wr[v->d]->trank == 0)
                v->abase->simd_set_ui_at(v->abase, v->abase->vec_coeff_ptr(v->abase, v->v, j - v->i0), k, 1);
            serialize_threads(v->pi->wr[v->d]);
        }
    }
}

void mmt_vec_add_basis_vector(mmt_vec_ptr v, unsigned int j)
{
    mmt_vec_add_basis_vector_at(v, 0, j);
}

void mmt_vec_set_basis_vector(mmt_vec_ptr v, unsigned int j)
{
    mmt_vec_set_basis_vector_at(v, 0, j);
}

void mmt_vec_downgrade_consistency(mmt_vec_ptr v)
{
    ASSERT_ALWAYS(v->consistency == 2);
    size_t erase[2][2];
    size_t off = mmt_my_own_offset_in_items(v);
    size_t sz = mmt_my_own_size_in_items(v);
    serialize_threads(v->pi->wr[v->d]);
    if (v->siblings) {
        erase[0][0] = 0;
        erase[0][1] = off;
        erase[1][0] = off + sz;
        erase[1][1] = v->i1 - v->i0;
    } else {
        /* There are no siblings, which means that this vector is shared
         * across all threads in this direction. Let only one thread do
         * the job.
         */
        if (v->pi->wr[v->d]->trank == 0) {
            erase[0][0] = 0;
            /* because we are rank 0, this is the minimal offset for this
             * set of threads */
            erase[0][1] = off;
            erase[1][0] = off + sz * v->pi->wr[v->d]->ncores;
            erase[1][1] = v->i1 - v->i0;
        } else {
            erase[0][0] = 0;
            erase[0][1] = 0;
            erase[1][0] = 0;
            erase[1][1] = 0;
        }
    }
    for(int i = 0 ; i < 2 ; i++) {
        if (erase[i][1] != erase[i][0]) {
            v->abase->vec_set_zero(v->abase,
                    v->abase->vec_subvec(v->abase, v->v, erase[i][0]),
                    erase[i][1] - erase[i][0]);
        }
    }
    v->consistency = 1;
    serialize_threads(v->pi->wr[v->d]);
}


#if 0
// looks utterly bogus, so...
/* On a shared vector which is assumed to be commonly known by all
 * nodes/threads, select only one portion to remain, and zero out the
 * rest (for later use by e.g. matmul_top_mul_comm or other integrated
 * function).
 */
static void mmt_own_vec_clear_complement(matmul_top_data_ptr mmt, int d)
{
    mmt_comm_ptr mdst = mmt->wr[d];
    pi_comm_ptr pidst = mmt->pi->wr[d];
    if (mdst->v->flags & THREAD_SHARED_VECTOR)
        serialize_threads(pidst);
    if (pidst->trank == 0 || !(mdst->v->flags & THREAD_SHARED_VECTOR)) {
        if (pidst->jrank == 0 && pidst->trank == 0) {
            /* ok, we keep the data */
        } else {
            mdst->v->abase->vec_set_zero(mdst->v->abase, mdst->v->v, mdst->i1 - mdst->i0);
        }
    }
}
#endif
void mmt_vec_clear_padding(mmt_vec_ptr v, size_t unpadded, size_t padded)
{
    /* This can be applied no matter what the consistency argument says */
    serialize(v->pi->m);
    if (unpadded >= padded) return;

    size_t s0 = unpadded >= v->i0 ? (unpadded - v->i0) : 0;
    size_t s1 = padded >= v->i0 ? (padded - v->i0) : 0;
    s0 = MIN(s0, v->i1 - v->i0);
    s1 = MIN(s1, v->i1 - v->i0);

    if (s1 - s0)
        v->abase->vec_set_zero(v->abase,
                v->abase->vec_subvec(v->abase, v->v, s0), s1 - s0);

    serialize(v->pi->m);
}

mmt_vec_ptr mmt_vec_sibling(mmt_vec_ptr v, unsigned int i)
{
    if (v->siblings) {
        return v->siblings[i];
    } else {
        return v;
    }
}

/* {{{ mmt_vec_broadcast (generic interface) */
/* mmt_vec_broadcast reads data in mmt->wr[d]->v, and broadcasts it
 * across the communicator mmt->pi->wr[d]; eventually everybody on the
 * communicator mmt->pi->wr[d] has the data.
 *
 * Note that the combination of mmt_vec_reduce + mmt_vec_broadcast is
 * not the identity (because of the shuffled_product).
 */

/* XXX
 * ``across'' (horizontally) and ``down'' (vertically) are just here for
 * exposition. The binding of this operation to the job/thread
 * arrangement is flexible, through the argument d.
 * XXX
 */
void
mmt_vec_broadcast(mmt_vec_ptr v)
{
    ASSERT_ALWAYS(v != NULL);
    ASSERT_ALWAYS(v->consistency == 1);

    // broadcasting down columns is for common agreement on a right
    // source vector, once a left output has been computed.
    //
    // broadcasting down a column is when d == 1.
    int err;

    /* communicator wr is in the direction we are broadcasting */
    pi_comm_ptr wr = v->pi->wr[v->d];

    /* communicator xwr is in the other direction */
    pi_comm_ptr xwr = v->pi->wr[!v->d];
    mmt_vec_ptr * xwrpals = v->wrpals[!v->d];

    pi_log_op(v->pi->m, "[%s:%d] enter first loop", __func__, __LINE__);
    /* Make sure that no thread on the column is wandering in other
     * places -- when we're leaving reduce, this is important. */
    serialize_threads(wr);

    /* This loop suffers from two-dimensional serializing, so the
     * simplistic macros SEVERAL_THREADS_PLAY_MPI_BEGIN and END do not
     * help here.
     */

    size_t eblock = mmt_my_own_size_in_items(v);

#ifndef MPI_LIBRARY_MT_CAPABLE
    if (v->siblings) {
        /* not shared: begin by collecting everything on thread 0 */
        mmt_own_vec_set2(v, v->siblings[0], v);
    }
    serialize_threads(v->pi->m);
    if (wr->trank == 0 && xwr->trank == 0) {
#if AG_CHOICE == AG_CHOICE_STOCK_IAG
        MPI_Request * req = malloc(xwr->ncores * sizeof(MPI_Request));
#endif  /* AG_CHOICE == AG_CHOICE_STOCK_IAG */

        for(unsigned int t = 0 ; t < xwr->ncores ; t++) {
            // although the openmpi man page looks funny, I'm assuming that
            // MPI_Allgather wants MPI_IN_PLACE as a sendbuf argument.
            pi_log_op(wr, "[%s:%d] MPI_Allgather (round %u)", __func__, __LINE__, t);
#if AG_CHOICE == AG_CHOICE_STOCK_IAG
            err = MPI_Iallgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                    xwrpals[t]->v, v->abase->vec_elt_stride(v->abase, 1) * eblock * wr->ncores, MPI_BYTE, wr->pals, &req[t]);
#elif AG_CHOICE == AG_CHOICE_STOCK_AG
            err = MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, xwrpals[t]->v, v->abase->vec_elt_stride(v->abase, 1) * eblock * wr->ncores, MPI_BYTE, wr->pals);
#else   /* AG_CHOICE */
#error "Bad AG_CHOICE setting"
#endif  /* AG_CHOICE */
            pi_log_op(wr, "[%s:%d] MPI_Allgather (round %u) done", __func__, __LINE__, t);
            ASSERT_ALWAYS(!err);
        }

#if AG_CHOICE == AG_CHOICE_STOCK_IAG
        for(unsigned int t = 0 ; t < xwr->ncores ; t++) {
            MPI_Wait(&req[t], MPI_STATUS_IGNORE);
        }
        free(req);
#endif  /* AG_CHOICE == AG_CHOICE_STOCK_IAG */
    }
    v->consistency = 2;
    serialize_threads(v->pi->m);
    if (v->siblings) {
        mmt_full_vec_set(v, v->siblings[0]);
        serialize_threads(wr);
    }
#else   /* MPI_LIBRARY_MT_CAPABLE */
    /* Code deleted 20110119, as I've never been able to have enough
     * trust in an MPI implementation to check this */
    ASSERT_ALWAYS(0);
#endif  /* MPI_LIBRARY_MT_CAPABLE */

    pi_log_op(v->pi->m, "[%s:%d] trailer", __func__, __LINE__);
}
/* }}} */

/* {{{ generic interfaces for load/save */
/* {{{ load */
int mmt_vec_load(mmt_vec_ptr v, const char * filename_pattern, unsigned int itemsondisk, unsigned int block_position)
{
    ASSERT_ALWAYS(v != NULL);
    serialize(v->pi->m);
    int tcan_print = v->pi->m->trank == 0 && v->pi->m->jrank == 0;

    ASSERT_ALWAYS(strstr(filename_pattern, "%u-%u") != NULL);

    mpz_t p;
    mpz_init(p);
    v->abase->field_characteristic(v->abase, p);
    int char2 = mpz_cmp_ui(p, 2) == 0;
    mpz_clear(p);
    int splitwidth = char2 ? 64 : 1;
    unsigned int Adisk_width = splitwidth;
    unsigned int Adisk_multiplex = v->abase->simd_groupsize(v->abase) / Adisk_width;

    size_t sizeondisk = v->abase->vec_elt_stride(v->abase, itemsondisk);
    void * mychunk = mmt_my_own_subvec(v);
    size_t mysize = mmt_my_own_size_in_bytes(v);
    size_t bigstride = v->abase->vec_elt_stride(v->abase, 1);
    size_t smallstride = bigstride / Adisk_multiplex;

    int global_ok = 1;

    for(unsigned int b = 0 ; global_ok && b < Adisk_multiplex ; b++) {
        unsigned int b0 = block_position + b * Adisk_width;
        char * filename;
        int rc;
        rc = asprintf(&filename, filename_pattern, b0, b0 + splitwidth);
        ASSERT_ALWAYS(rc >= 0);
        double tt = -wct_seconds();
        if (tcan_print) {
            printf("Loading %s ...", filename);
            fflush(stdout);
        }
        pi_file_handle f;
        int ok = pi_file_open(f, v->pi, v->d, filename, "rb");
        /* "ok" is globally consistent after pi_file_open */
        if (!ok) {
            if (v->pi->m->trank == 0 && v->pi->m->jrank == 0) {
                fprintf(stderr, "ERROR: failed to load %s: %s\n", filename, strerror(errno));
            }
        } else {
            ASSERT_ALWAYS(v != NULL);
            serialize(v->pi->m);
            ssize_t s = pi_file_read_chunk(f, mychunk, mysize, sizeondisk,
                    bigstride, b * smallstride, (b+1) * smallstride);
            ok = s >= 0 && (size_t) s == sizeondisk / Adisk_multiplex;
            /* "ok" is globally consistent after pi_file_read_chunk */
            if (!ok) {
                if (v->pi->m->trank == 0 && v->pi->m->jrank == 0) {
                    fprintf(stderr, "ERROR: failed to load %s: short read, %s\n", filename, errno ? strerror(errno) : "no error reported via errno");
                }
            }
            v->consistency = ok;
            /* not clear it's useful, but well. */
            if (ok) mmt_vec_broadcast(v);
            serialize_threads(v->pi->m);
            pi_file_close(f);
        }
        free(filename);
        tt += wct_seconds();
        if (ok && tcan_print) {
            char buf[20], buf2[20];
            printf(" done [%s in %.2fs, %s/s]\n",
                    size_disp(sizeondisk / Adisk_multiplex, buf),
                    tt,
                    size_disp(sizeondisk / Adisk_multiplex / tt, buf2));
        }
        global_ok = global_ok && ok;
    }

    serialize_threads(v->pi->m);
    return global_ok;
}
/* }}} */
/* {{{ save */
int mmt_vec_save(mmt_vec_ptr v, const char * filename_pattern, unsigned int itemsondisk, unsigned int block_position)
{
    serialize_threads(v->pi->m);
    int tcan_print = v->pi->m->trank == 0 && v->pi->m->jrank == 0;

    ASSERT_ALWAYS(strstr(filename_pattern, "%u-%u") != NULL);

    mpz_t p;
    mpz_init(p);
    v->abase->field_characteristic(v->abase, p);
    int char2 = mpz_cmp_ui(p, 2) == 0;
    mpz_clear(p);
    int splitwidth = char2 ? 64 : 1;
    unsigned int Adisk_width = splitwidth;
    unsigned int Adisk_multiplex = v->abase->simd_groupsize(v->abase) / Adisk_width;

    size_t sizeondisk = v->abase->vec_elt_stride(v->abase, itemsondisk);
    void * mychunk = mmt_my_own_subvec(v);
    size_t mysize = mmt_my_own_size_in_bytes(v);
    size_t bigstride = v->abase->vec_elt_stride(v->abase, 1);
    size_t smallstride = bigstride / Adisk_multiplex;

    int global_ok = 1;

    for(unsigned int b = 0 ; b < Adisk_multiplex ; b++) {
        unsigned int b0 = block_position + b * Adisk_width;
        char * filename;
        int rc = asprintf(&filename, filename_pattern, b0, b0 + splitwidth);
        ASSERT_ALWAYS(rc >= 0);
        char * tmpfilename;
        rc = asprintf(&tmpfilename, "%s.tmp", filename);
        ASSERT_ALWAYS(rc >= 0);
        double tt = -wct_seconds();
        if (tcan_print) {
            printf("Saving %s ...", filename);
            fflush(stdout);
        }
        pi_file_handle f;
        int ok = pi_file_open(f, v->pi, v->d, tmpfilename, "wb");
        /* "ok" is globally consistent after pi_file_open */
        if (!ok) {
            if (v->pi->m->trank == 0 && v->pi->m->jrank == 0) {
                fprintf(stderr, "WARNING: failed to save %s: %s\n", filename, strerror(errno));
                unlink(tmpfilename);    // just in case
            }
        } else {
            ASSERT_ALWAYS(v != NULL);
            ASSERT_ALWAYS(v->consistency == 2);
            serialize_threads(v->pi->m);
            ssize_t s = pi_file_write_chunk(f, mychunk, mysize, sizeondisk,
                    bigstride, b * smallstride, (b+1) * smallstride);
            serialize_threads(v->pi->m);
            ok = s >= 0 && (size_t) s == sizeondisk / Adisk_multiplex;
            /* "ok" is globally consistent after pi_file_write_chunk */
            if (!ok && v->pi->m->trank == 0 && v->pi->m->jrank == 0) {
                fprintf(stderr, "ERROR: failed to save %s: short write, %s\n", filename, errno ? strerror(errno) : "no error reported via errno");
            }
            ok = pi_file_close(f);
            if (!ok && v->pi->m->trank == 0 && v->pi->m->jrank == 0) {
                fprintf(stderr, "ERROR: failed to save %s: failed fclose, %s\n", filename, errno ? strerror(errno) : "no error reported via errno");
            }
            if (v->pi->m->trank == 0 && v->pi->m->jrank == 0) {
                ok = rename(tmpfilename, filename) == 0;
                if (!ok) {
                    fprintf(stderr, "ERROR: failed to save %s: failed rename, %s\n", filename, errno ? strerror(errno) : "no error reported via errno");
                }
            }
            pi_bcast(&ok, 1, BWC_PI_INT, 0, 0, v->pi->m);
        }
        free(filename);
        free(tmpfilename);
        tt += wct_seconds();
        if (tcan_print) {
            char buf[20], buf2[20];
            printf(" done [%s in %.2fs, %s/s]\n",
                    size_disp(sizeondisk / Adisk_multiplex, buf),
                    tt,
                    size_disp(sizeondisk / Adisk_multiplex / tt, buf2));
        }
        global_ok = global_ok && ok;
    }

    serialize_threads(v->pi->m);
    return global_ok;
}
/* }}} */
/* }}} */

void mmt_vec_reduce_mod_p(mmt_vec_ptr v)
{
    /* if it so happens that there's no reduce function defined, it may
     * correspond to a case where we have nothing to do, like over GF(2)
     * -- where unreduced elements are just the same, period */
    if (!v->abase->reduce)
        return;
    void * ptr = mmt_my_own_subvec(v);
    void * tmp;
    v->abase->vec_ur_init(v->abase, &tmp, 1);
    for(size_t i = 0 ; i < mmt_my_own_size_in_items(v) ; i++) {
        v->abase->elt_ur_set_elt(v->abase,
                tmp,
                v->abase->vec_coeff_ptr_const(v->abase, ptr, i));
        v->abase->reduce(v->abase,
                v->abase->vec_coeff_ptr(v->abase, ptr, i),
                tmp);
    }
    v->abase->vec_ur_clear(v->abase, &tmp, 1);
}


//////////////////////////////////////////////////////////////////////////

void matmul_top_mul(matmul_top_data_ptr mmt, mmt_vec * v, struct timing_data * tt)/*{{{*/
{
    /* Do all matrices in turn.
     *
     * We represent M as M0*M1*...*M_{n-1}.
     *
     * The input vector is v[0], and the result is put in v[0] again.
     *
     * The direction in which we apply the product is given by v[0]->d.
     * For v[0]->d == 0, we do v[0]*M. For v[0]->d == 1, we do M*v[0].
     *
     * We use temporaries as follows.
     * For v[0]->d == 0:
     *      v[1] <- v[0] * M0 ; v[1]->d == 1
     *      v[2] <- v[1] * M1 ; v[2]->d == 0
     *      if n is odd:
     *      v[n] <- v[n-1] * M_{n-1} ; v[n]->d == 1
     *      if n is even:
     *      v[0] <- v[n-1] * M_{n-1} ; v[0]->d == 0
     *
     * For v[0]->d == 1:
     *      v[1] <- M0 * v[0] ; v[1]->d == 0
     *      v[2] <- M1 * v[1] ; v[2]->d == 1
     *      if n is odd:
     *      v[n] <- M_{n-1} * v[n-1] ; v[n]->d == 0
     *      if n is even:
     *      v[0] <- M_{n-1} * v[n-1] ; v[0]->d == 1
     *
     * This has the consequence that v must hold exactly n+(n&1)
     * vectors.
     *
     * Appropriate communication operations are run after each step.
     *
     * If tt is not NULL, it should be a timing_data structure holding
     * exactly 4*n timers (or only 4, conceivably). Timers are switched
     * exactly that many times.
     *
     * If the mmt->pi->interleaving setting is on, we interleave
     * computations and communications. We do 2n flips. Communications
     * are forbidden both before and after calls to this function (in
     * the directly adjacent code fragments before the closest flip()
     * call, that is).
     */

    int d = v[0]->d;
    int nmats_odd = mmt->nmatrices & 1;
    int midx = (d ? (mmt->nmatrices - 1) : 0);
    for(int l = 0 ; l < mmt->nmatrices ; l++) {
        mmt_vec_ptr src = v[l];
        int last = l == (mmt->nmatrices - 1);
        int lnext = last && !nmats_odd ? 0 : (l+1);
        mmt_vec_ptr dst = v[lnext];

        ASSERT_ALWAYS(src->consistency == 2);
        matmul_top_mul_cpu(mmt, midx, d, dst, src);
        ASSERT_ALWAYS(dst->consistency == 0);

        timing_next_timer(tt);
        /* now measuring jitter */
        pi_interleaving_flip(mmt->pi);
        serialize(mmt->pi->m);
        timing_next_timer(tt);

        /* Now we can resume MPI communications. */
        if (last && nmats_odd) {
            ASSERT_ALWAYS(lnext == mmt->nmatrices);
            matmul_top_mul_comm(v[0], dst);
        } else {
            mmt_vec_allreduce(dst);
        }
        timing_next_timer(tt);
        /* now measuring jitter */
        pi_interleaving_flip(mmt->pi);
        serialize(mmt->pi->m);

        timing_next_timer(tt);
        midx += d ? -1 : 1;
    }
    ASSERT_ALWAYS(v[0]->consistency == 2);
}
/*}}}*/

#if 0/*{{{*/
/* no longer used -- was only used by prep.
 * It's not buggy, but making this work in a context where we have
 * multiple threads is tricky.
 */
void matmul_top_fill_random_source_generic(matmul_top_data_ptr mmt, size_t stride, mmt_vec_ptr v, int d)
{
    if (v == NULL) v = mmt->wr[d]->v;

    // In conjugation mode, it is possible to fill exactly the data chunk
    // that will eventually be relevant. However, it's easy enough to
    // fill our output vector with garbage, and do mmt_vec_broadcast
    // afterwards...
    if ((v->flags & THREAD_SHARED_VECTOR) == 0 || mmt->pi->wr[d]->trank == 0)
        mpfq_generic_random(stride, v->v, mmt->wr[d]->i1 - mmt->wr[d]->i0);

    // reconcile all cells which correspond to the same vertical block.
    mmt_vec_broadcast(mmt, v, d);
}
#endif/*}}}*/

/* {{{ mmt_vec_reduce */
/* {{{ various reduce_scatter implementations */
/* {{{ alternative_reduce_scatter */
#if 1 || RS_CHOICE == RS_CHOICE_MINE
void alternative_reduce_scatter(mmt_vec_ptr v)
{
    pi_comm_ptr wr = v->pi->wr[v->d];
    int njobs = wr->njobs;
    int rank = wr->jrank;
    MPI_Datatype t = v->pitype->datatype;

    size_t eitems = mmt_my_own_size_in_items(v) * wr->ncores;
    size_t needed = v->abase->vec_elt_stride(v->abase, eitems);

    if (v->rsbuf_size < needed) {
        ASSERT_ALWAYS(v->rsbuf_size == 0);
        v->rsbuf[0] = realloc(v->rsbuf[0], needed);
        v->rsbuf[1] = realloc(v->rsbuf[1], needed);
        v->rsbuf_size = needed;
    }

    void *b[2];
    b[0] = v->rsbuf[0];
    b[1] = v->rsbuf[1];
    v->abase->vec_set_zero(v->abase, b[0], eitems);

    int l = (rank + 1) % njobs;
    int srank = (rank + 1) % njobs;
    int drank = (rank + njobs - 1) % njobs;

    for (int i = 0; i < njobs; i++) {
        int j0, j1;
        j0 = l * eitems;
        j1 = j0 + eitems;
        v->abase->vec_add(v->abase, b[0], b[0],
                v->abase->vec_subvec(v->abase, mmt_vec_sibling(v, 0)->v, j0),
                j1 - j0);
        if (i == njobs - 1)
            break;
        MPI_Sendrecv(b[0], eitems, t, drank, (i << 16) + rank,
                b[1], eitems, t, srank, (i << 16) + srank,
                wr->pals, MPI_STATUS_IGNORE);
        void * tb = b[0]; b[0] = b[1]; b[1] = tb;
        l = (l + 1) % njobs;
    }
    /* This is going at the beginning of the memory area, which is weird.
     * But it's the convention used by the MPI call...
     */
    v->abase->vec_set(v->abase, mmt_vec_sibling(v, 0)->v, b[0], eitems);
}
#endif  /* RS_CHOICE == RS_CHOICE_MINE */
/* }}} */
/* {{{ alternative_reduce_scatter_parallel */
#if 1 || RS_CHOICE == RS_CHOICE_MINE_PARALLEL
/* Example data for a factoring matrix (rsa100) of size 135820*135692,
 * split over 2x3 mpi jobs, and 7x5 threads.
 *
 * 2 == mmt->pi->wr[1]->njobs (number of jobs encountered on a vertical axis).
 * 7 == mmt->pi->wr[1]->ncores (number of cores per job on a vertical axis).
 * 3 == mmt->pi->wr[0]->njobs (number of jobs encountered on a horiz. axis).
 * 5 == mmt->pi->wr[0]->ncores (number of cores per job on a horiz. axis).
 *
 * the matrix is padded to a multiple of 210 = 2*3*5*7, which is
 * N = 135870 = 210*647
 *
 * we'll call 647 the "small chunk" size.
 *
 * for all jobs/threads, the following relations hold:
 *      mmt->wr[0]->i1 - mmt->wr[0]->i0 == N/14 == 9705 == 15 * 647
 *      mmt->wr[1]->i1 - mmt->wr[1]->i0 == N/15 == 9058 == 14 * 647
 *
 * a mmt_vec_reduce operation, in the context of factoring, is with d==1
 * below. Hence, in fact, we're doing a reduction down a column.
 *
 * the value eitems fed to this function is mmt->pi->wr[d]->ncores (here,
 * 7) times the small chunk size. Here 7*647 == 4529.
 */
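/* A quick numeric cross-check of the toy example above, as expository
 * code (under #if 0; the values are hard-coded from the comment, not
 * read from any live structure):
 */
#if 0
{
    unsigned int N = 135870;            /* padded size, 210 * 647 */
    unsigned int small_chunk = N / 210; /* == 647 */
    ASSERT_ALWAYS(small_chunk == 647);
    /* eitems, for d == 1: ncores (7) small chunks */
    ASSERT_ALWAYS(7 * small_chunk == 4529);
    /* local window in direction 1: njobs (2) such areas */
    ASSERT_ALWAYS(2 * 7 * small_chunk == 9058);
}
#endif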
/* all threads in mmt->wr[!d], one after another a priori, are going to
 * do alternative_reduce_scatter on their vector v[i]
 */
void alternative_reduce_scatter_parallel(pi_comm_ptr xr, mmt_vec_ptr * vs)
{
    /* we write all data counts below with comments indicating the typical
     * size in our toy example above */

    /* we have xr->ncores vectors. The vs[] array accesses data from the
     * other peers. Note that the pi structures belong separately to each
     * peer, and so do the embedded communicators. Therefore the proper
     * way to see "our" xr is none other than the given parameter. And for
     * "our" wr, then, we'll have to look for our own vector within vs.
     */
    mmt_vec_ptr v = vs[xr->trank];
    mpfq_vbase_ptr ab = v->abase;
    pi_comm_ptr wr = v->pi->wr[v->d];   /* 2 jobs, 7 cores */
    /* what we're going to do will happen completely in parallel over
     * xr->njobs==3 communicators. We're simulating here the split into 5
     * communicators, even though those collide into a unique
     * communicator as far as MPI is concerned.
     */
    int njobs = wr->njobs;      /* 2 */
    /* note that we no longer care at this point about what happened at
     * the thread level in our dimension. This is already done, period.
     */
    int rank = wr->jrank;
    MPI_Datatype t = v->pitype->datatype;

    /* If the rsbuf[] buffers have not yet been allocated, it is time to
     * do so now. We also take the opportunity to possibly re-allocate
     * them if, because of a larger abase, the corresponding storage has
     * to be expanded.
     */
    size_t eitems = mmt_my_own_size_in_items(v) * wr->ncores;
    size_t needed = v->abase->vec_elt_stride(v->abase, eitems);

    /* notice that we are allocating a temp buffer only for one vector.
     * Of course, since this is a multithreaded routine, each thread in
     * xr is doing so at the same time */
    if (v->rsbuf_size < needed) {
        v->rsbuf[0] = realloc(v->rsbuf[0], needed);
        v->rsbuf[1] = realloc(v->rsbuf[1], needed);
        v->rsbuf_size = needed;
    }

    ab->vec_set_zero(ab, v->rsbuf[0], eitems);
    serialize_threads(xr);

    int srank = (rank + 1) % njobs;
    int drank = (rank + njobs - 1) % njobs;

    /* We describe the algorithm for one of the xr->ncores==5 threads.
     * local vector areas [9058] split into [wr->njobs==2] sub-areas of
     * size [eitems (==4529)].
     *
     * on each job, each thread has 9058 == 2*4529 items
     * more generally: njobs * eitems items.
     *
     * each has a place where it wants to receive data, let's call that
     * "his" zone, even though everyone has data at all places.
     *
     */
    /* scheme for 4 jobs: (input, output. All chunks have size eitems. AA
     * is the sum a3+a2+a1+a0)
     *
     * j0: a0 b0 c0 d0      j0: AA .. .. ..
     * j1: a1 b1 c1 d1      j1: .. BB .. ..
     * j2: a2 b2 c2 d2      j2: .. .. CC ..
     * j3: a3 b3 c3 d3      j3: .. .. .. DD
     *
     * loop 0: j0: sends b0 to j3, receives c1 from j1
     * loop 1: j0: sends c1+c0 to j3, receives d2+d1 from j1
     * loop 2: j0: sends d2+d1+d0 to j3, receives a3+a2+a1 from j1
     * loop 3: we only have to adjust.
     */

    for (int i = 0, s = 0; i < njobs; i++, s = !s) {
        int l = (rank + 1 + i) % njobs;
        int j0 = l * eitems;
        int j1 = j0 + eitems;
        ab->vec_add(ab, v->rsbuf[s], v->rsbuf[s],
                ab->vec_subvec(ab, mmt_vec_sibling(v, 0)->v, j0),
                j1 - j0);
        serialize_threads(xr);

        if (i == njobs - 1)
            break;

        if (xr->trank == 0) {
            MPI_Request * r = NULL;
            r = malloc(2 * xr->ncores * sizeof(MPI_Request));
            for(unsigned int w = 0 ; w < xr->ncores ; w++) {
                MPI_Request * rs = r + 2*w;
                MPI_Request * rr = r + 2*w + 1;
                MPI_Isend(vs[w]->rsbuf[s], eitems, t, drank, 0xb00 + w, wr->pals, rs);
                MPI_Irecv(vs[w]->rsbuf[!s], eitems, t, srank, 0xb00 + w, wr->pals, rr);
                /*
                MPI_Sendrecv(vs[w]->rsbuf[s], eitems, t, drank, 0xbeef,
                        vs[w]->rsbuf[!s], eitems, t, srank, 0xbeef,
                        wr->pals, MPI_STATUS_IGNORE);
                */
                // MPI_Waitall(2, r + 2*w, MPI_STATUSES_IGNORE);
            }
            MPI_Waitall(2 * xr->ncores, r, MPI_STATUSES_IGNORE);
            free(r);
        }
        serialize_threads(xr);
    }

    ab->vec_set(ab, mmt_vec_sibling(v, 0)->v, v->rsbuf[(njobs - 1) & 1], eitems);

    pi_log_op(wr, "[%s:%d] MPI_Reduce_scatter done", __func__, __LINE__);
}
#endif  /* RS_CHOICE == RS_CHOICE_MINE_PARALLEL */
/* }}} */
/* {{{ my_MPI_Reduce_scatter_block */
#if 1 || RS_CHOICE == RS_CHOICE_MINE_DROP_IN
int my_MPI_Reduce_scatter_block(void *sendbuf, void *recvbuf, int recvcount,
        MPI_Datatype datatype, MPI_Op op, MPI_Comm wr)
{
    ASSERT_ALWAYS(sendbuf == MPI_IN_PLACE);
    int njobs;
    int rank;
    MPI_Comm_size(wr, &njobs);
    MPI_Comm_rank(wr, &rank);

    int tsize;
    MPI_Type_size(datatype, &tsize);

    void * v = recvbuf;

    /* This is a deliberate leak. Note that we expect to be serialized
     * here, so there is no concurrency issue with the static data. */
    static size_t rsbuf_size = 0;
    static unsigned long * rsbuf[2];

    size_t needed = recvcount * tsize;

    if (rsbuf_size < needed) {
        rsbuf[0] = realloc(rsbuf[0], needed);
        rsbuf[1] = realloc(rsbuf[1], needed);
        rsbuf_size = needed;
    }

    memset(rsbuf[0], 0, recvcount * tsize);

    int srank = (rank + 1) % njobs;
    int drank = (rank + njobs - 1) % njobs;

    for (int i = 0, w = 0; i < njobs; i++, w ^= 1) {
        int j0 = ((rank + i + 1) % njobs) * recvcount;
        const void * share = pointer_arith_const(v, j0 * tsize);
#if MPI_VERSION_ATLEAST(2,2)
        MPI_Reduce_local(share, rsbuf[w], recvcount, datatype, op);
#else
        {
            ASSERT_ALWAYS(datatype == MPI_UNSIGNED_LONG);
            ASSERT_ALWAYS(op == MPI_BXOR);
            const unsigned long * a = share;
            unsigned long * b = rsbuf[w];
            for(int k = 0 ; k < recvcount ; k++) {
                b[k] ^= a[k];
            }
        }
#endif
        if (i == njobs - 1) {
            memcpy(v, rsbuf[w], recvcount * tsize);
            break;
        }
        MPI_Sendrecv(rsbuf[w], recvcount, datatype, drank, (i << 16) + rank,
                rsbuf[!w], recvcount, datatype, srank, (i << 16) + srank,
                wr, MPI_STATUS_IGNORE);
    }
    return 0;
}
#endif
/* }}} */
/* }}} */

/* mmt_vec_reduce_inner reads data in v (vector for side d), sums it up
 * across the communicator mmt->pi->wr[d], and collects the results in
 * vector v again, except that it's put in thread0's buffer (counting
 * threads in direction d, of course), *AT THE BEGINNING* of the data
 * area (which is surprising).
 *
 * mmt_vec_reduce completes the work by saving the resulting data in
 * vector w (a vector in dimension !d).
 *
 * Note that the combination of mmt_vec_reduce + mmt_vec_broadcast is
 * not the identity (because of the shuffled_product).
 */
void
mmt_vec_reduce_inner(mmt_vec_ptr v)
{
    ASSERT_ALWAYS(v != NULL);
    ASSERT_ALWAYS(v->consistency != 2);

    /* reducing across a row is when d == 0 */
    pi_comm_ptr wr = v->pi->wr[v->d];
    pi_comm_ptr xr = v->pi->wr[!v->d];

    pi_log_op(v->pi->m, "[%s:%d] enter first loop", __func__, __LINE__);

    // I don't think that the case of shared vectors has been tested
    // correctly for reduction. Well, to start with, I doubt it really
    // makes a lot of sense anyhow.
    // ASSERT_ALWAYS((v->flags & THREAD_SHARED_VECTOR) == 0);

    if (wr->ncores > 1 && v->siblings) {
        /* row threads have to sum up their data. Of course it's
         * irrelevant when there is only one such thread...
         *
         * Concerning locking, we have to make sure that everybody on the
         * row has finished its computation task, but besides that,
         * there's no locking until we start mpi stuff.
         */
        pi_log_op(wr, "[%s:%d] serialize_threads", __func__, __LINE__);
        serialize_threads(wr);
        pi_log_op(wr, "[%s:%d] serialize_threads done", __func__, __LINE__);

        /* Our [i0,i1[ range is split into wr->ncores parts. This range
         * represents coordinates which are common to all threads on
         * wr. Corresponding data has to be summed. Amongst the
         * wr->ncores threads, thread k takes the responsibility of
         * summing data in the k-th block (that is, indices
         * i0+k*(i1-i0)/wr->ncores and further). As a convention,
         * the thread which eventually owns the final data is thread 0.
         *
         * Note that one should consider that data in threads other than
         * the destination thread may be clobbered by the operation
         * (although in the present implementation it is not).
         */
        size_t thread_chunk = wr->njobs * mmt_my_own_size_in_items(v);
        void * dptr = v->abase->vec_subvec(v->abase,
                mmt_vec_sibling(v, 0)->v,
                wr->trank * thread_chunk);
        for(unsigned int w = 1 ; w < wr->ncores ; w++) {
            const void * sptr = v->abase->vec_subvec(v->abase,
                    mmt_vec_sibling(v, w)->v,
                    wr->trank * thread_chunk);
            v->abase->vec_add(v->abase, dptr, dptr, sptr, thread_chunk);
        }
        pi_log_op(wr, "[%s:%d] thread reduction done", __func__, __LINE__);
    }

    /* Good. Now on each node, thread 0 has the reduced data for the
     * range [i0..i1[ (well, actually, this corresponds to indices
     * distributed in a funny manner in the original matrix, but as far
     * as we care here, it's really the range v->i0..v->i1
     *
     * These data areas must now be reduced or reduce-scattered across
     * nodes. Note that in the case where the MPI library does not
     * support MT operation, we are performing the reduction in place,
     * and copy to the buffer in the other direction is done in a
     * secondary step -- which will be among the reasons which imply a
     * thread serialization before anything interesting can be done after
     * this function.
     */

    /* XXX We have to serialize threads here. At least row-wise, so that
     * the computations above are finished. Easily seen, that's just a
     * couple of lines above.
     *
     * Unfortunately, that's not the whole story. We must also serialize
     * column-wise. Because we're writing on the right vector buffers,
     * which are used as input for the matrix multiplication code --
     * which may be still running at this point because there has been no
     * column serialization in the current procedure yet.
     */

    pi_log_op(v->pi->m, "[%s:%d] secondary loop", __func__, __LINE__);

    pi_log_op(v->pi->m, "[%s:%d] serialize_threads", __func__, __LINE__);
    serialize_threads(wr);
    pi_log_op(v->pi->m, "[%s:%d] serialize_threads done", __func__, __LINE__);

#ifndef MPI_LIBRARY_MT_CAPABLE
    if (wr->trank == 0) {
        /* openmpi-1.8.2 does not seem to have a working non-blocking
         * reduce_scatter, at least not a very efficient one. All
         * I've been able to do is to run MPI_Ireduce_scatter with
         * MPI_UNSIGNED_LONG and MPI_BXOR. With custom types it seems
         * to crash. And anyway, it's very inefficient.
         */

        ASSERT((v->i1 - v->i0) % wr->totalsize == 0);
#if RS_CHOICE == RS_CHOICE_STOCK_RS
        void * dptr = mmt_vec_sibling(v, 0)->v;
        SEVERAL_THREADS_PLAY_MPI_BEGIN(xr) {
            // all recvcounts are equal
            int * rc = malloc(wr->njobs * sizeof(int));
            for(unsigned int k = 0 ; k < wr->njobs ; k++)
                rc[k] = (v->i1 - v->i0) / wr->njobs;
            int err = MPI_Reduce_scatter(dptr, dptr, rc,
                    v->pitype->datatype,
                    BWC_PI_SUM->custom,
                    wr->pals);
            free(rc);
            ASSERT_ALWAYS(!err);
        }
        SEVERAL_THREADS_PLAY_MPI_END();
#elif RS_CHOICE == RS_CHOICE_STOCK_RSBLOCK
        void * dptr = mmt_vec_sibling(v, 0)->v;
        SEVERAL_THREADS_PLAY_MPI_BEGIN(xr) {
            int err = MPI_Reduce_scatter_block(dptr, dptr,
                    (v->i1 - v->i0) / wr->njobs,
                    v->pitype->datatype,
                    BWC_PI_SUM->custom,
                    wr->pals);
            ASSERT_ALWAYS(!err);
        }
        SEVERAL_THREADS_PLAY_MPI_END();
#elif RS_CHOICE == RS_CHOICE_MINE_DROP_IN
        void * dptr = mmt_vec_sibling(v, 0)->v;
        SEVERAL_THREADS_PLAY_MPI_BEGIN(xr) {
            int err = my_MPI_Reduce_scatter_block(MPI_IN_PLACE, dptr,
                    (v->i1 - v->i0) / wr->njobs,
                    v->pitype->datatype,
                    BWC_PI_SUM->custom,
                    wr->pals);
            ASSERT_ALWAYS(!err);
        }
        SEVERAL_THREADS_PLAY_MPI_END();
#elif RS_CHOICE == RS_CHOICE_STOCK_IRSBLOCK
        void * dptr = mmt_vec_sibling(v, 0)->v;
        MPI_Request * req = shared_malloc(xr, xr->ncores * sizeof(MPI_Request));
        SEVERAL_THREADS_PLAY_MPI_BEGIN(xr) {
            int err = MPI_Ireduce_scatter_block(dptr, dptr,
                    (v->i1 - v->i0) / wr->njobs,
                    v->pitype->datatype,
                    BWC_PI_SUM->custom,
                    wr->pals, &req[t__]);
            ASSERT_ALWAYS(!err);
            pi_log_op(wr, "[%s:%d] MPI_Reduce_scatter done", __func__, __LINE__);
        }
        SEVERAL_THREADS_PLAY_MPI_END();
        serialize_threads(xr);
        if (xr->trank == 0) {
            for(unsigned int t = 0 ; t < xr->ncores ; t++) {
                MPI_Wait(&req[t], MPI_STATUS_IGNORE);
            }
        }
        shared_free(xr, req);
#elif RS_CHOICE == RS_CHOICE_STOCK_IRS
        void * dptr = mmt_vec_sibling(v, 0)->v;
        MPI_Request * req = shared_malloc(xr, xr->ncores * sizeof(MPI_Request));
        int * rc = malloc(wr->njobs * sizeof(int));
        for(unsigned int k = 0 ; k < wr->njobs ; k++)
            rc[k] = (v->i1 - v->i0) / wr->njobs;
        SEVERAL_THREADS_PLAY_MPI_BEGIN(xr) {
            int err = MPI_Ireduce_scatter(dptr, dptr,
                    rc,
                    v->pitype->datatype,
                    BWC_PI_SUM->custom,
                    wr->pals, &req[t__]);
            free(rc);
            ASSERT_ALWAYS(!err);
            pi_log_op(wr, "[%s:%d] MPI_Reduce_scatter done", __func__, __LINE__);
        }
        SEVERAL_THREADS_PLAY_MPI_END();
        serialize_threads(xr);
        if (xr->trank == 0) {
            for(unsigned int t = 0 ; t < xr->ncores ; t++) {
                MPI_Wait(&req[t], MPI_STATUS_IGNORE);
            }
        }
        shared_free(xr, req);
#elif RS_CHOICE == RS_CHOICE_MINE
        /* This strategy exposes code which is really similar to
         * RS_CHOICE_MINE_DROP_IN, with the only exception that we
         * have a slightly different interface. There's no reason for
         * both to stay in the long run.
         */
        SEVERAL_THREADS_PLAY_MPI_BEGIN(xr) {
            alternative_reduce_scatter(v);
        }
        SEVERAL_THREADS_PLAY_MPI_END();
#elif RS_CHOICE == RS_CHOICE_MINE_PARALLEL
        mmt_vec_ptr * vs = shared_malloc(xr, xr->ncores * sizeof(mmt_vec_ptr));
        vs[xr->trank] = v;
        serialize_threads(xr);
        alternative_reduce_scatter_parallel(xr, vs);
        shared_free(xr, vs);
#elif RS_CHOICE == RS_CHOICE_MINE_OVERLAPPING
#error "not implemented, but planned"
#endif
    }
    serialize_threads(wr);
}
void
mmt_vec_reduce(mmt_vec_ptr w, mmt_vec_ptr v)
{
    ASSERT_ALWAYS(v != NULL);
    ASSERT_ALWAYS(w != NULL);
    ASSERT_ALWAYS(v->abase == w->abase);
    ASSERT_ALWAYS(v->d != w->d);
    mmt_vec_reduce_inner(v);
    pi_comm_ptr wr = v->pi->wr[v->d];
    // row threads pick what they're interested in in thread0's reduced
    // buffer. Writes are non-overlapping in the mcol buffer here.
    // Different row threads always have different mcol buffers, and
    // sibling col threads write to different locations in their
    // (generally shared) mcol buffer, depending on which row they
    // intersect.

    // Notice that results are being written inside v->all_v[0],
    // just packed at the beginning.

    // row job rj, row thread rt has a col buffer containing
    // picol->totalsize blocks of size eblock. col job cj, col
    // thread ct has in its row buffer pirow->totalsize blocks, but
    // only virtually. Because of the reduce_scatter operation, only
    // pirow->ncores blocks are here.
    //
    // Thus the k-th block in the row buffer is rather understood as
    // the one of index rj * pirow->ncores + k in the data the row
    // threads have collectively computed.
    //
    // This block is of interest to the row thread of index k of
    // course, thus we restrict to rt==k.
    //
    // Now among the picol->totalsize blocks of the col buffer, this
    // will go to position cj * picol->ncores + ct

    size_t eblock = mmt_my_own_size_in_items(v);
    ASSERT_ALWAYS(mmt_my_own_size_in_items(w) == eblock);

    v->abase->vec_set(v->abase,
            mmt_my_own_subvec(w),
            /* Note: reduce-scatter packs everything at the beginning in
             * the leader block, which is why we don't have our usual
             * offset here. */
            v->abase->vec_subvec(v->abase,
                mmt_vec_sibling(v, 0)->v, wr->trank * eblock),
            eblock);
#else   /* MPI_LIBRARY_MT_CAPABLE */
    /* Code deleted 20110119, as I've never been able to have enough
     * trust in an MPI implementation to check this */
    ASSERT_ALWAYS(0);
#endif

    // as usual, we do not serialize on exit. Up to the next routine to
    // do so if needed.
    //
    // what _is_ guaranteed is that in each column, the _leader_ thread
    // has all the necessary data to begin the column broadcast.
    //
    // In most cases, threads must be prevented from starting computation
    // before the leader thread has finished importing data. This means
    // that a column thread serialization is probably needed in most
    // circumstances after this step.
    w->consistency = 1;
}
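
/* Expository sketch (under #if 0) of how the reduce/broadcast pair is
 * typically chained, with w a vector in the direction opposite to v;
 * the names stand for objects initialized elsewhere.
 */
#if 0
{
    /* v inconsistent, e.g. fresh out of matmul_top_mul_cpu */
    mmt_vec_reduce(w, v);       /* w is now partially consistent */
    mmt_vec_broadcast(w);       /* w is now fully consistent */
    /* remember: this pair is not the identity map on vector contents,
     * because of the shuffled product */
}
#endif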

/* This small variant is used only for the transposition routines. We do
 * not want, in that case, to rely on the data moving back and forth
 * between the left and right vectors, because in full generality, we
 * have no guarantee that they are the same size.
 *
 * Therefore we're merely building upon mmt_vec_reduce_inner, only with
 * the weirdo "pack-at-the-beginning" behaviour removed.
 */
void
mmt_vec_reduce_sameside(mmt_vec_ptr v)
{
    pi_comm_ptr wr = v->pi->wr[v->d];
    mmt_vec_reduce_inner(v);
    size_t eblock = mmt_my_own_size_in_items(v);
    v->abase->vec_set(v->abase,
            mmt_my_own_subvec(v),
            v->abase->vec_subvec(v->abase,
                mmt_vec_sibling(v, 0)->v, wr->trank * eblock),
            eblock);
    /* This ensures that we've effectively *moved* the data, not copied
     * it */
    if (wr->trank) {
        v->abase->vec_set_zero(v->abase,
                v->abase->vec_subvec(v->abase,
                    mmt_vec_sibling(v, 0)->v, wr->trank * eblock),
                eblock);
    }
    serialize_threads(wr);
    v->consistency = 1;
}
/* }}} */

/* {{{ mmt_vec_allreduce */
/* This is only for convenience now. Eventually this will be relevant for
 * block Lanczos. Note that allreduce is conceptually much simpler.
 * There is no funny permutation to be considered.
 */
void
mmt_vec_allreduce(mmt_vec_ptr v)
{
    ASSERT_ALWAYS(v != NULL);
    ASSERT_ALWAYS(v->consistency != 2);
    /* reducing across a row is when d == 0 */
    pi_comm_ptr wr = v->pi->wr[v->d];

    pi_log_op(v->pi->m, "[%s:%d] enter first loop", __func__, __LINE__);

    serialize_threads(v->pi->m);
    /* sum up row threads, so that only one thread on each row is used
     * for communication */
    size_t thread_chunk = wr->njobs * mmt_my_own_size_in_items(v);
    if (v->siblings) {
        void * dv = v->abase->vec_subvec(v->abase, v->v,
                wr->trank * thread_chunk);
        for(unsigned int k = 1 ; k < wr->ncores ; k++) {
            void * sv = v->abase->vec_subvec(v->abase,
                    mmt_vec_sibling(v, (wr->trank + k) % wr->ncores)->v,
                    wr->trank * thread_chunk);
            v->abase->vec_add(v->abase, dv, dv, sv, thread_chunk);
        }
    }
    /* Compared to the SEVERAL_THREADS_PLAY_MPI_BEGIN() approach, this
     * one has thread 0 do the work for all other threads, while other
     * threads are waiting.
     */
    SEVERAL_THREADS_PLAY_MPI_BEGIN2(v->pi->m, peer) {
        void * dv = v->abase->vec_subvec(v->abase, v->mpals[peer]->v,
                v->mpals[peer]->pi->wr[v->d]->trank * thread_chunk);
        MPI_Allreduce(MPI_IN_PLACE,
                dv,
                thread_chunk,
                v->pitype->datatype,
                BWC_PI_SUM->custom,
                wr->pals);
    }
    SEVERAL_THREADS_PLAY_MPI_END2(v->pi->m);
    if (v->siblings) {
        void * sv = v->abase->vec_subvec(v->abase, v->v,
                wr->trank * thread_chunk);
        for(unsigned int k = 1 ; k < wr->ncores ; k++) {
            void * dv = v->abase->vec_subvec(v->abase,
                    mmt_vec_sibling(v, (wr->trank + k) % wr->ncores)->v,
                    wr->trank * thread_chunk);
            v->abase->vec_set(v->abase, dv, sv, thread_chunk);
        }
    }
    v->consistency = 2;
}
/* }}} */

/**********************************************************************/
/* bench code */

void matmul_top_comm_bench_helper(int * pk, double * pt,
        void (*f) (mmt_vec_ptr),
        mmt_vec_ptr v)
{
    int k;
    double t0, t1;
    int cont;
    t0 = wct_seconds();
    for (k = 0;; k++) {
        t1 = wct_seconds();
        cont = t1 < t0 + 0.25;
        cont = cont && (t1 < t0 + 1 || k < 100);
        pi_allreduce(NULL, &cont, 1, BWC_PI_INT, BWC_PI_MIN, v->pi->m);
        if (!cont)
            break;
        /* It's difficult to be faithful to the requirements on
         * consistency here. But apparently 1 pleases both operations
         * tested. */
        v->consistency = 1;
        (*f)(v);
    }
    int target = 10 * k / (t1 - t0);
    ASSERT_ALWAYS(target >= 0);
    if (target > 100)
        target = 100;
    if (target == 0)
        target = 1;
    pi_bcast(&target, 1, BWC_PI_INT, 0, 0, v->pi->m);
    t0 = wct_seconds();
    for (k = 0; k < target; k++) {
        pi_log_op(v->pi->m, "[%s] iter%d/%d", __func__, k, target);
        v->consistency = 1;     /* see above */
        (*f)(v);
    }
    serialize(v->pi->m);
    t1 = wct_seconds();
    *pk = k;
    *pt = t1 - t0;
}


void matmul_top_comm_bench(matmul_top_data_ptr mmt, int d)
{
    /* like matmul_top_mul_comm, we'll call mmt_vec_reduce with !d, and
     * mmt_vec_broadcast with d */
    int k;
    double dt;

    void (*funcs[2])(mmt_vec_ptr) = {
        mmt_vec_broadcast,
        mmt_vec_reduce_sameside,
    };
    const char * text[2] = { "bd", "ra" };

    mpfq_vbase_ptr abase = mmt->abase;

    mmt_vec test_vectors[2];
    int is_shared[2] = {0,0};
    mmt_vec_init(mmt, NULL, NULL, test_vectors[0], 0, is_shared[0], mmt->n[0]);
    mmt_vec_init(mmt, NULL, NULL, test_vectors[1], 1, is_shared[1], mmt->n[1]);

    size_t datasize[2];

    {
        pi_comm_ptr pirow = mmt->pi->wr[!d];
        pi_comm_ptr picol = mmt->pi->wr[d];
        /* within each row, all jobs are concerned with the same range
         * vrow->i0 to vrow->i1. This is split into m=pirow->njobs
         * chunks, and reduce_scatter has m-1 communication rounds where
         * all of the m nodes output and receive one such chunk. All this
         * happens for all column threads, so we multiply by
         * picol->ncores, too.
         *
         * Note that vrow->i1 - vrow->i0 is #rows / picol->totalsize
         *
         * Note also that picol->ncores * #rows / picol->totalsize =
         * #rows / picol->njobs, so that the final thing we compute is
         * really:
         *      #rows / mmt->pi->m->njobs * (pirow->njobs - 1)
         */
        size_t data_out_ra = abase->vec_elt_stride(abase,
                picol->ncores * (mmt->n[!d] / picol->totalsize) /
                pirow->njobs * (pirow->njobs - 1));
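
        /* Illustration (made-up figures, not from any actual run): with
         * #rows = 1200, 6 jobs in total and pirow->njobs = 3, the
         * simplified formula above gives 1200 / 6 * (3 - 1) = 400 items
         * (times the element stride) going out of each node for one
         * reduce-scatter. */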

        /* one way to do all-gather is to mimic this, except that each
         * node will output the same chunk at each round. Beyond that,
         * the calculation is similar, and we'll use it as a guide. Note
         * of course that if hardware-level multicast is used, our
         * throughput estimation is way off.
         *
         * as above, this is really:
         *      #cols / mmt->pi->m->njobs * (picol->njobs - 1)
         *
         */
        size_t data_out_ag = abase->vec_elt_stride(abase,
                pirow->ncores * (mmt->n[d] / pirow->totalsize) /
                picol->njobs * (picol->njobs - 1));

        datasize[0] = data_out_ag;
        datasize[1] = data_out_ra;
    }

    for(int s = 0 ; s < 2 ; s++) {
        /* we have our axis, and the other axis */
        pi_comm_ptr wr = mmt->pi->wr[d ^ s];            /* our axis */
        pi_comm_ptr xr = mmt->pi->wr[d ^ s ^ 1];        /* other axis */
        /* our operation has operated on the axis wr ; hence, we must
         * display data relative to the different indices within the
         * communicator xr.
         */
        matmul_top_comm_bench_helper(&k, &dt, funcs[s], test_vectors[d^s]);
        if (verbose_enabled(CADO_VERBOSE_PRINT_BWC_TIMING_GRIDS)) {
            for(unsigned int z = 0 ; z < xr->njobs ; z++) {
                if (xr->jrank == z && wr->jrank == 0) {
                    for(unsigned int w = 0 ; w < xr->ncores ; w++) {
                        if (xr->trank == w && wr->trank == 0) {
                            char buf[16];
                            printf("%s %2d/%d, %s: %2d in %.1fs ; one: %.2fs (xput: %s/s)\n",
                                    wr->th->desc,
                                    xr->jrank * xr->ncores + xr->trank,
                                    xr->totalsize,
                                    text[s],
                                    k, dt, dt/k,
                                    size_disp(datasize[d^s] * k/dt, buf));
                        }
                    }
                }
                serialize(mmt->pi->m);
            }
        }
        serialize(mmt->pi->m);
    }
    mmt_vec_clear(mmt, test_vectors[0]);
    mmt_vec_clear(mmt, test_vectors[1]);
}


/**********************************************************************/
/* Utility stuff for applying standard permutations to vectors. */

/* We have four permutations defined in the context of bwc.
 *
 *      Sr -- row permutation
 *      Sc -- column permutation
 *      P  -- shuffled product permutation
 *      T  -- decorrelating permutation
 *
 * As a convention, in the code as well as in the documentation below, we
 * freely handle the following equivalent representations of a
 * permutation:
 *  - a function f,
 *  - a list of images [f(i), 0<=i<n],
 *  - a list of pairs [<i,f(i)>, 0<=i<n],
 *  - a matrix F of size n*n, with only entries at row i and column f(i) being 1
 *
 * We let Mpad be the "padded" matrix, where #rows and #columns are both
 * multiples of nh*nv
 *
 * T is a constant permutation on the columns of the matrix M. It is
 * mostly working as a preconditioner, meant to eliminate some nasty
 * correlation effects between row and column weights. In every respect,
 * the matrix we work with consistently is the matrix Mpad*T. T is computed
 * by balancing_pre_shuffle and balancing_pre_unshuffle.
 *
 * row i of T has its non-zero coefficient in column
 * balancing_pre_shuffle(i)
 *
 * Sr and Sc are defined in the balancing file (as lists of images). They
 * are such that the matrix Mtwisted = Sr*Mpad*T*Sc^-1 is well balanced
 * across the different jobs and threads. For square matrices, depending
 * on how the balancing was computed, one may be implicitly defined from
 * the other (but not equal, for the reason below).
 *
 * P is defined only for square matrices. It reflects the fact that
 * although the matrix is "geographically" stored as Mtwisted =
 * Sr*Mpad*T*Sc^-1 in the jobs and threads, the matmul_top_mul code
 * multiplies by a matrix which is:
 *      - for matrix times vector: Sc*Mpad*T*Sc^-1
 *        (that is, v <- v*Transpose(Sc*Mpad*T*Sc^-1) )
 *      - for vector times matrix: P*Sc*Mpad*T*Sc^-1*P^-1
 *
 * Implicit definition of Sr for square matrices
 * =============================================
 *
 * Sc, read as the colperm[] array in the balancing file, is such that column
 * i of the twisted matrix is in fact column Sc(i) in the original matrix,
 * which means that we build Mtwisted as [...]*M*Sc^-1
 *
 * for square matrices, with FLAG_REPLICATE, the Sr permutation is chosen
 * implicitly.
 *
 * here's how we compute the (forward) row permutation in the code (here, xr
 * is the colperm array).
 *      ix = (i * nv + j) * elem;
 *      iy = (j * nh + i) * elem;
 *      for(unsigned int k = 0 ; k < elem ; k++) {
 *          m->fw_rowperm[xr[iy+k]] = ix+k;
 *
 * We denote by Sr the permutation, implicitly computed here, such that row i
 * of the twisted matrix is row Sr(i) in the original matrix -- so that
 * Mtwisted is in fact Sr*M*Sc^-1. Here, fw_rowperm is the inverse: row i in
 * the original matrix goes to row fw_rowperm[i] in the twisted matrix. So
 * that fw_rowperm == Sr^-1.
 *
 * Our code does fw_rowperm(Sc(P(x)))=x, for P the permutation which sends
 * sub-block nv*i+j to sub-block nh*j+i. Writing as operations on row vectors
 * (as magma does), this gives:
 *      P * Sc * (Sr^-1) = id
 *      Sr = P * Sc
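 *
 * For instance (an illustration, not tied to any actual run): with nh=2
 * and nv=3, P sends sub-block 1 = 3*0+1 to sub-block 2 = 2*1+0, and
 * leaves sub-block 5 = 3*1+2 in place, since 2*2+1 = 5.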
 *
 * So in this case the twisted matrix is in fact Sr*Mpad*Sc^-1, and Sr = P*Sc;
 *
 * Implicit definition of Sc for square matrices
 * =============================================
 *
 * There is no code doing this at the moment, but one could imagine
 * defining this as well. In such a situation, we write down what Sc
 * would be.
 *
 * Matrix is geographically stored as Sr*Mpad*T*Sc^-1 in the jobs and
 * threads, and the matmul_top_mul code multiplies by a matrix which is:
 *      - for matrix times vector: P^-1*Sr*Mpad*T*Sc^-1
 *      - for vector times matrix: Sr*Mpad*T*Sc^-1*P^-1
 * Therefore we want Sc^-1*P^-1 == Sr^-1, whence Sc == P^-1 * Sr
 *
 * Action of P with matmul_top_mul_comm
 * ====================================
 *
 * After v*Mtwisted, reduce_sameside of the result (in direction 1) followed
 * by broadcast (in direction 0) produces a twisted vector. Since v*Mtwisted
 * is in direction 1, the blocks of the vector are to be read in column-major
 * order. If we reduce then broadcast, then sub-block nh*j+i will go to
 * sub-block nv*i+j. This means that v*Mtwisted will be transformed into
 * v*Mtwisted*P^-1
 *
 * After v*Transpose(Mtwisted), reduce_sameside on the result, which is in
 * direction 0, transforms it to v*Transpose(Mtwisted)*P (here P is the
 * transpose of P^-1: we transpose index blocks in the other
 * direction), which is thus v*Transpose(P^-1*Mtwisted).
 *
 * Conclusion: with Mtwisted = Sr*Mpad*Sc^-1, and Sr = P*Sc, when we do
 * matmul_top_mul_cpu followed by matmul_top_mul_comm, we multiply by:
 *      - for matrix times vector: Sc*Mpad*Sc^-1
 *        (that is, v <- v*Transpose(Sc*Mpad*Sc^-1) )
 *      - for vector times matrix: P*Sc*Mpad*Sc^-1*P^-1
 *
 * Applying P to vectors
 * =====================
 *
 * We can emulate the multiplications by P and P^-1 with appropriate
 * combinations of apply_identity and matmul_top_mul_comm. So we give a few
 * details.
 *
 * Let v be a distributed vector. With mmt_apply_identity, the output is
 * distributed in the other direction, but not consistent. If we do
 * mmt_vec_reduce_sameside (or mmt_vec_allreduce, which does more), the
 * resulting vector (looking at locally-owned pieces only) is the same as
 * v, in the other direction.
 *
 * And then, matmul_top_mul_comm on this resulting vector does the P
 * action as above. Therefore:
 *
 * For v->d == 0, applying in sequence the functions mmt_apply_identity
 * and matmul_top_mul_comm, we get v*P^-1
 *
 * For v->d == 1, the same sequence produces v*Transpose(P^-1)==v*P
 *
 * Doing the converse is feasible. For v->d==0, if we do instead
 * matmul_top_mul_comm then mmt_apply_identity, we get v*P
 *
 * For v->d==1, if we do matmul_top_mul_comm then mmt_apply_identity, we
 * get v*P^-1
 *
 */

void mmt_apply_identity(mmt_vec_ptr w, mmt_vec_ptr v)
{
    /* input: fully consistent */
    /* output: inconsistent !
     * Need mmt_vec_allreduce or mmt_vec_reduce_sameside, or
     * matmul_top_mul_comm, depending on what we want to do. */
    ASSERT_ALWAYS(v->consistency == 2);
    ASSERT_ALWAYS(w->abase == v->abase);
    ASSERT_ALWAYS(v->d != w->d);
    ASSERT_ALWAYS(v->n == w->n);

    mpfq_vbase_ptr A = v->abase;

    serialize_threads(w->pi->m);
    mmt_full_vec_set_zero(w);
    serialize_threads(w->pi->m);

    unsigned int v_off, w_off;
    unsigned int how_many = intersect_two_intervals(&v_off, &w_off,
            v->i0, v->i1, w->i0, w->i1);

    A->vec_set(A,
            A->vec_coeff_ptr(A, w->v, w_off),
            A->vec_coeff_ptr(A, v->v, v_off),
            how_many);
    w->consistency = 1;
}

void mmt_vec_apply_or_unapply_P_inner(matmul_top_data_ptr mmt, mmt_vec_ptr y, int apply)
{
    ASSERT_ALWAYS(y->consistency == 2);
    mmt_vec yt;
    mmt_vec_init(mmt, y->abase, y->pitype, yt, !y->d, 0, y->n);
    if ((apply ^ y->d) == 0) {
        // y->d == 0: get v*P^-1
        // y->d == 1: get v*P
        mmt_apply_identity(yt, y);
        matmul_top_mul_comm(y, yt);
    } else {
        // y->d == 0: get v*P
        // y->d == 1: get v*P^-1
        mmt_vec_downgrade_consistency(y);
        matmul_top_mul_comm(yt, y);
        mmt_apply_identity(y, yt);
        mmt_vec_allreduce(y);
    }
    mmt_vec_clear(mmt, yt);
    ASSERT_ALWAYS(y->consistency == 2);
}

void mmt_vec_unapply_P(matmul_top_data_ptr mmt, mmt_vec_ptr y)
{
    mmt_vec_apply_or_unapply_P_inner(mmt, y, 0);
}

void mmt_vec_apply_P(matmul_top_data_ptr mmt, mmt_vec_ptr y)
{
    mmt_vec_apply_or_unapply_P_inner(mmt, y, 1);
}

/* apply == 1 for apply, apply == 0 for unapply *
 * apply == 1 d == 0 Sr defined:   v <- v * Sr
 * apply == 1 d == 0 Sr implicit:  v <- v * Sc
 * apply == 1 d == 1 Sc defined:   v <- v * Sc
 * apply == 1 d == 1 Sc implicit:  v <- v * Sr
 * apply == 0 d == 0 Sr defined:   v <- v * Sr^-1
 * apply == 0 d == 0 Sr implicit:  v <- v * Sc^-1
 * apply == 0 d == 1 Sc defined:   v <- v * Sc^-1
 * apply == 0 d == 1 Sc implicit:  v <- v * Sr^-1
 *
 * See that when, say, Sr is implicitly defined (to P*Sc), this function
 * only applies Sc, not P !
 */
void mmt_vec_apply_or_unapply_S_inner(matmul_top_data_ptr mmt, int midx, mmt_vec_ptr y, int apply)
{
    ASSERT_ALWAYS(y->consistency == 2);
    /* input: fully consistent */
    /* output: fully consistent */
    int d = y->d;
    mpfq_vbase_ptr A = y->abase;

    serialize_threads(y->pi->m);

    /* We'll have two vectors of size n[d], one named y in direction d,
     * and one named yt in direction !d.
     *
     *
     * In the permutation mmt->perm[d], the pairs (i,j) are such that,
     * given two vectors of size n[d], one named y in direction d,
     * and one named yt in direction !d, we have:
     *      i in [y->i0..y->i1[
     *      j in [yt->i0..yt->i1[
     */
    matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
    permutation_data_ptr s = Mloc->perm[d];

    /* For square matrices, we'll use the other permutation transparently
     * with this piece of code. Note though that when we do so, applying
     * the permutation actually goes in the opposite direction. */
    int xd = d;
    if (!s && (Mloc->bal->h->flags & FLAG_REPLICATE)) {
        ASSERT_ALWAYS(Mloc->n[0] == Mloc->n[1]);
        s = Mloc->perm[!d];
        xd = !d;
    }
    if (!s) {
        /* could well be that we have nothing to do */
        return;
    }

    if ((apply^d^xd) == 0) {
        /*
         * apply == 0 d == 0 Sr defined:   v <- v * Sr^-1
         * apply == 0 d == 1 Sc defined:   v <- v * Sc^-1
         * apply == 1 d == 0 Sr implicit:  v <- v * Sc
         * apply == 1 d == 1 Sc implicit:  v <- v * Sr
         */
        mmt_vec yt;
        mmt_vec_init(mmt, A, y->pitype, yt, !d, 0, y->n);

        mmt_apply_identity(yt, y);
        mmt_vec_allreduce(yt);
        mmt_full_vec_set_zero(y);
        serialize_threads(y->pi->m);
        for(unsigned int k = 0 ; k < s->n ; k++) {
            if (s->x[k][d^xd] < y->i0 || s->x[k][d^xd] >= y->i1)
                continue;
            if (s->x[k][d^xd^1] < yt->i0 || s->x[k][d^xd^1] >= yt->i1)
                continue;
            A->vec_set(A,
                    A->vec_coeff_ptr(A, y->v, s->x[k][d^xd] - y->i0),
                    A->vec_coeff_ptr(A, yt->v, s->x[k][d^xd^1] - yt->i0),
                    1);
        }
        y->consistency = 1;
        serialize_threads(y->pi->m);
        mmt_vec_allreduce(y);
        mmt_vec_clear(mmt, yt);
    } else {
        /*
         * apply == 1 d == 0 Sr defined:   v <- v * Sr
         * apply == 1 d == 1 Sc defined:   v <- v * Sc
         * apply == 0 d == 0 Sr implicit:  v <- v * Sc^-1
         * apply == 0 d == 1 Sc implicit:  v <- v * Sr^-1
         */
        mmt_vec yt;
        mmt_vec_init(mmt, A, y->pitype, yt, !d, 0, y->n);
        for(unsigned int k = 0 ; k < s->n ; k++) {
            if (s->x[k][d^xd] < y->i0 || s->x[k][d^xd] >= y->i1)
                continue;
            if (s->x[k][d^xd^1] < yt->i0 || s->x[k][d^xd^1] >= yt->i1)
                continue;
            A->vec_set(A,
                    A->vec_coeff_ptr(A, yt->v, s->x[k][d^xd^1] - yt->i0),
                    A->vec_coeff_ptr(A, y->v, s->x[k][d^xd] - y->i0),
                    1);
        }
        yt->consistency = 1;
        mmt_vec_allreduce(yt);
        mmt_apply_identity(y, yt);
        mmt_vec_allreduce(y);
        mmt_vec_clear(mmt, yt);
    }
    serialize_threads(y->pi->m);
    ASSERT_ALWAYS(y->consistency == 2);
}

/* multiply v by Sr^-1 if v->d == 0, by Sc^-1 if v->d == 1 */
void mmt_vec_unapply_S(matmul_top_data_ptr mmt, int midx, mmt_vec_ptr y)
{
    matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
    mmt_vec_apply_or_unapply_S_inner(mmt, midx, y, 0);
    if ((Mloc->bal->h->flags & FLAG_REPLICATE) && !Mloc->perm[y->d]) {
        if (y->d == 0) {
            /* implicit Sr^-1 is Sc^-1*P^-1 */
            mmt_vec_unapply_P(mmt, y);
        } else {
            /* implicit Sc^-1 is Sr^-1*P */
            mmt_vec_apply_P(mmt, y);
        }
    }
}

/* multiply v by Sr if v->d == 0, by Sc if v->d == 1 */
void mmt_vec_apply_S(matmul_top_data_ptr mmt, int midx, mmt_vec_ptr y)
{
    matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
    if ((Mloc->bal->h->flags & FLAG_REPLICATE) && !Mloc->perm[y->d]) {
        if (y->d == 0) {
            /* implicit Sr is P * Sc */
            mmt_vec_apply_P(mmt, y);
        } else {
            /* implicit Sc is P^-1 * Sr */
            mmt_vec_unapply_P(mmt, y);
        }
    }
    mmt_vec_apply_or_unapply_S_inner(mmt, midx, y, 1);
}

/* for square matrix products, the inner loops do
 *      v <- v * (Sr=P*Sc)*Mpad*Sc^-1*P^-1     (for vector times matrix)
 *      v <- v * Transpose(P^-1*(Sr=P*Sc)*Mpad*Sc^-1)   (for matrix times vector)
 * while for non-square (no FLAG_REPLICATE), it's simply
 *      Sr*Mpad*Sc^-1
 *
 * therefore the rules for twisting and untwisting are as follows.
 *
 * twisting v->d == 0
 *      we assume we want here to change v to become a good *input* for a
 *      vector times matrix operation. Therefore we do:
 *              v <- v * Sr^-1
 * twisting v->d == 1
 *      v <- v * Sc^-1
 *
 */

void mmt_vec_twist(matmul_top_data_ptr mmt, mmt_vec_ptr y)
{
    mmt_vec_unapply_S(mmt, y->d == 0 ? 0 : (mmt->nmatrices-1), y);
}

void mmt_vec_untwist(matmul_top_data_ptr mmt, mmt_vec_ptr y)
{
    mmt_vec_apply_S(mmt, y->d == 0 ? 0 : (mmt->nmatrices-1), y);
}

/* {{{ mmt_vec_{un,}apply_T -- this applies the fixed column
 * permutation which we use unconditionally in bwc to avoid correlation
 * of row and column weights.
 */
// pshuf indicates two integers a,b such that the COLUMN i of the input
// matrix is in fact mapped to column a*i+b mod n in the matrix we work
// with. pshuf_inv indicates the inverse permutation. a and b do
// not depend on the nh*nv split.
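// Example (illustrative values, not actual parameters): with a=3, b=1
// and n=10, column 4 of the input matrix is mapped to column
// (3*4+1) mod 10 = 3 of the matrix we work with.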
void mmt_vec_apply_or_unapply_T_inner(matmul_top_data_ptr mmt, mmt_vec_ptr y, int apply)
{
    /* apply: coefficient i of the vector goes to coefficient
     * balancing_pre_shuffle[i]
     *
     * For row vectors this means: apply == (v <- v * T)
     * For column vectors this means: apply == (v <- T^-1 * v)
     *
     */
    if (y->d == 0) return;
    matmul_top_matrix_ptr Mloc = mmt->matrices[mmt->nmatrices - 1];
    ASSERT_ALWAYS(y->consistency == 2);
    serialize_threads(y->pi->m);
    mmt_vec yt;
    mmt_vec_init(mmt, y->abase, y->pitype, yt, !y->d, 0, y->n);
    for(unsigned int i = y->i0 ; i < y->i1 ; i++) {
        unsigned int j;
        if (apply) {
            j = balancing_pre_shuffle(Mloc->bal, i);
        } else {
            j = balancing_pre_unshuffle(Mloc->bal, i);
        }
        if (j >= yt->i0 && j < yt->i1) {
            y->abase->vec_set(y->abase,
                    y->abase->vec_coeff_ptr(y->abase, yt->v, j - yt->i0),
                    y->abase->vec_coeff_ptr_const(y->abase, y->v, i - y->i0),
                    1);
        }
    }
    yt->consistency = 0;
    mmt_vec_allreduce(yt);
    mmt_apply_identity(y, yt);
    mmt_vec_allreduce(y);
    mmt_vec_clear(mmt, yt);
}

void mmt_vec_unapply_T(matmul_top_data_ptr mmt, mmt_vec_ptr v)
{
    mmt_vec_apply_or_unapply_T_inner(mmt, v, 0);
}

void mmt_vec_apply_T(matmul_top_data_ptr mmt, mmt_vec_ptr v)
{
    mmt_vec_apply_or_unapply_T_inner(mmt, v, 1);
}
/* }}} */

/* {{{ Application of permutations to indices */

/* Given indices which relate to a vector in direction d, modify them in
 * place so that they correspond to coefficients of the matching twisted vector.
 *
 * So we want: v_i == twist(v)_{twist(i)}
 *
 * for d == 0, twist(v) = v * Sr^-1, so that index i in v goes to
 * position S(i) in the twisted vector. For implicit Sr, we have to
 * take into account the fact that Sr=P*Sc, hence S(i) is in fact
 * Sc[P[i]].
 *
 * The algorithm is simple: only one thread knows about each single
 * (i,Sr(i)) pair. So this one changes the relevant xs[] values, leaving
 * the others to zero. And then we do a global allreduce() on the xs[]
 * values.
 *
 * (here, "relevant" may mean something which includes the P
 * permutation).
 */

/* returns the two intervals such that for all pairs (i,j) in
 * mmt->perm->x, we have ii[0] <= i < ii[1], and jj[0] <= j < jj[1]
 */
static void get_local_permutations_ranges(matmul_top_data_ptr mmt, int d, unsigned int ii[2], unsigned int jj[2])
{
    int pos[2];

    for(int dir = 0 ; dir < 2 ; dir++)  {
        pi_comm_ptr piwr = mmt->pi->wr[dir];
        pos[dir] = piwr->jrank * piwr->ncores + piwr->trank;
    }

    size_t e = mmt->n[d] / mmt->pi->m->totalsize;
    ii[0] = e * pos[!d] * mmt->pi->wr[d]->totalsize;
    ii[1] = e * (pos[!d]+1) * mmt->pi->wr[d]->totalsize;
    jj[0] = e * pos[d] * mmt->pi->wr[!d]->totalsize;
    jj[1] = e * (pos[d]+1) * mmt->pi->wr[!d]->totalsize;
}
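
/* Illustration (made-up geometry): with mmt->n[d] = 1200 and
 * mmt->pi->m->totalsize = 6, we get e = 200. If wr[d]->totalsize = 3
 * and wr[!d]->totalsize = 2, the ii[] ranges cut [0,1200[ into two
 * chunks of 600 indexed by pos[!d], and the jj[] ranges cut it into
 * three chunks of 400 indexed by pos[d]. */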

void indices_twist(matmul_top_data_ptr mmt, uint32_t * xs, unsigned int n, int d)
{
    int midx = d == 0 ? 0 : (mmt->nmatrices - 1);
    matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
    /* d == 1 : twist(v) = v*Sc^-1
     * d == 0 : twist(v) = v*Sr^-1
     */
    unsigned int ii[2], jj[2];

    if (Mloc->perm[d]) {
        /* explicit S */
        /* coordinate S[i] in the original vector becomes i in the
         * twisted vector.
         */
        get_local_permutations_ranges(mmt, d, ii, jj);
        unsigned int * r = malloc((jj[1] - jj[0]) * sizeof(unsigned int));
        memset(r, 0, (jj[1] - jj[0]) * sizeof(unsigned int));
        for(size_t i = 0 ; i < Mloc->perm[d]->n ; i++) {
            ASSERT_ALWAYS(Mloc->perm[d]->x[i][0] >= ii[0]);
            ASSERT_ALWAYS(Mloc->perm[d]->x[i][0] < ii[1]);
            ASSERT_ALWAYS(Mloc->perm[d]->x[i][1] >= jj[0]);
            ASSERT_ALWAYS(Mloc->perm[d]->x[i][1] < jj[1]);
            // r[Mloc->perm[d]->x[i][0] - ii[0]] = Mloc->perm[d]->x[i][1];
            r[Mloc->perm[d]->x[i][1] - jj[0]] = Mloc->perm[d]->x[i][0];
        }
        for(unsigned int k = 0 ; k < n ; k++) {
            unsigned int j = xs[k];
            if (j >= jj[0] && j < jj[1])
                xs[k] = r[j - jj[0]];
            else
                xs[k] = 0;
        }
        free(r);
        pi_allreduce(NULL, xs, n * sizeof(uint32_t), BWC_PI_BYTE, BWC_PI_BXOR, mmt->pi->m);
    } else if (Mloc->perm[!d] && (Mloc->bal->h->flags & FLAG_REPLICATE)) {
        ASSERT_ALWAYS(Mloc->n[0] == Mloc->n[1]);
        /* implicit S -- first we get the bits about the S in the other
         * direction, because the pieces we have are for the other
         * ranges, which is a bit disturbing...
         */
        get_local_permutations_ranges(mmt, !d, ii, jj);
        unsigned int * r = malloc((jj[1] - jj[0]) * sizeof(unsigned int));
        memset(r, 0, (jj[1] - jj[0]) * sizeof(unsigned int));
        for(size_t i = 0 ; i < Mloc->perm[!d]->n ; i++) {
            ASSERT_ALWAYS(Mloc->perm[!d]->x[i][0] >= ii[0]);
            ASSERT_ALWAYS(Mloc->perm[!d]->x[i][0] < ii[1]);
            ASSERT_ALWAYS(Mloc->perm[!d]->x[i][1] >= jj[0]);
            ASSERT_ALWAYS(Mloc->perm[!d]->x[i][1] < jj[1]);
            r[Mloc->perm[!d]->x[i][1] - jj[0]] = Mloc->perm[!d]->x[i][0];
        }
        /* nh and nv are the same for all submatrices, really */
        unsigned int nn[2] = { Mloc->bal->h->nh, Mloc->bal->h->nv };
        /* for d == 0, we have implicit Sr = P * Sc.
         * for d == 1, we have implicit Sc = P^-1 * Sr
         */
        size_t z = Mloc->bal->trows / (nn[0]*nn[1]);
        for(unsigned int k = 0 ; k < n ; k++) {
            unsigned int j = xs[k];
            /* for d == 0, index i goes to P^-1[Sc^-1[i]] */
            if (j >= jj[0] && j < jj[1]) {
                unsigned int i = r[j - jj[0]];
                unsigned int qz = i / z;
                unsigned int rz = i % z;
                /* P sends sub-block nv*i+j to sub-block nh*j+i */
                /* P^-1 sends sub-block nh*j+i to sub-block nv*i+j */
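                /* Illustration (made-up figures): for d == 0 with nh=2
                 * and nv=3, we have nn[d]=2 and nn[!d]=3; sub-block
                 * qz = 1 (qh=0, rh=1) moves to sub-block 1*3+0 = 3,
                 * while qz = 5 (qh=2, rh=1) stays at 1*3+2 = 5; the
                 * offset rz within the sub-block is preserved. */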
                unsigned int qh = qz / nn[d];
                unsigned int rh = qz % nn[d];
                ASSERT_ALWAYS(qz == qh * nn[d] + rh);
                ASSERT_ALWAYS(i == (qh * nn[d] + rh)*z + rz);
                xs[k] = (rh * nn[!d] + qh)*z + rz;
            } else {
                xs[k] = 0;
            }
        }
        free(r);
        pi_allreduce(NULL, xs, n * sizeof(uint32_t), BWC_PI_BYTE, BWC_PI_BXOR, mmt->pi->m);
    }
}
/* }}} */

/**********************************************************************/
#if 0
/* no longer used -- was only used by prep.
 * It's not buggy, but making this work in a context where we have
 * multiple threads is tricky.
 */
void matmul_top_fill_random_source(matmul_top_data_ptr mmt, int d)
{
    matmul_top_fill_random_source_generic(mmt, mmt->vr->stride, NULL, d);
}
#endif

/* For d == 0: do w = v * M
 * For d == 1: do w = M * v
 *
 * We do not necessarily have v->d == d, although this will admittedly be
 * the case most often:
 * - in block Wiedemann, we have a vector split in some direction (say
 *   d==1 when we wanna solve Mw=0), we compute w=Mv, and then there's
 *   the matmul_top_mul_comm step which moves stuff to v again.
 * - in block Lanczos, say we start from v->d == 1 again. We do w=M*v,
 *   so that w->d==0. But then we want to compute M^T * w, which is w^T *
 *   M. So again, w->d == 0 is appropriate with the second product being
 *   in the direction foo*M.
 * - the only case where this does not necessarily happen is when we
 *   have several matrices.
 */
void matmul_top_mul_cpu(matmul_top_data_ptr mmt, int midx, int d, mmt_vec_ptr w, mmt_vec_ptr v)
{
    matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
    ASSERT_ALWAYS(v->consistency == 2);
    ASSERT_ALWAYS(w->abase == v->abase);
    unsigned int di_in = v->i1 - v->i0;
    unsigned int di_out = w->i1 - w->i0;
    ASSERT_ALWAYS(Mloc->mm->dim[!d] == di_out);
    ASSERT_ALWAYS(Mloc->mm->dim[d] == di_in);

    ASSERT_ALWAYS(w->siblings);         /* w must not be shared */

    pi_log_op(mmt->pi->m, "[%s:%d] enter matmul_mul", __func__, __LINE__);

    /* Note that matmul_init copies the calling abase argument to the
     * lower-level mm structure. It can quite probably be qualified as a
     * flaw.
     */
    matmul_mul(Mloc->mm, w->v, v->v, d);
    w->consistency = 0;
}

/* This takes partial results in w, and puts the
 * collected and re-broadcasted results in the areas mmt->wd[d]->v
 *
 * Note that for the shuffled product, this is not equivalent to a trivial
 * operation.
 */
void matmul_top_mul_comm(mmt_vec_ptr v, mmt_vec_ptr w)
{
    /* this takes inconsistent input.
     * XXX if we have fully consistent input, then a reduce() is much
     * undesired !
     */
    ASSERT_ALWAYS(w->consistency != 2);
    pi_log_op(v->pi->m, "[%s:%d] enter mmt_vec_reduce", __func__, __LINE__);
    mmt_vec_reduce(v, w);
    ASSERT_ALWAYS(v->consistency == 1);
    pi_log_op(v->pi->m, "[%s:%d] enter mmt_vec_broadcast", __func__, __LINE__);
    mmt_vec_broadcast(v);
    ASSERT_ALWAYS(v->consistency == 2);

    /* If we have shared input data for the column threads, then we'd
     * better make sure it has arrived completely, because while all
     * threads will need the data, only one is actually importing it.
     */
    if (!v->siblings) {
        pi_log_op(v->pi->wr[v->d], "[%s:%d] serialize threads", __func__, __LINE__);
        serialize_threads(v->pi->wr[v->d]);
    }
}
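
/* Illustrative sketch (compiled out, not part of the interface): one
 * full iteration v <- v * M for a single square matrix, chaining
 * matmul_top_mul_cpu and matmul_top_mul_comm as described above. The
 * function name and the auxiliary vector w (in direction !v->d) are
 * hypothetical. */
#if 0
static void one_iteration_sketch(matmul_top_data_ptr mmt, mmt_vec_ptr v, mmt_vec_ptr w)
{
    ASSERT_ALWAYS(v->consistency == 2);         /* fully consistent input */
    ASSERT_ALWAYS(w->d == !v->d);
    matmul_top_mul_cpu(mmt, 0, v->d, w, v);     /* w <- partial products */
    matmul_top_mul_comm(v, w);                  /* reduce + broadcast into v */
    ASSERT_ALWAYS(v->consistency == 2);
}
#endif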


/* Vector I/O is done by only one job, one thread. It incurs a
 * significant amount of memory allocation, but this is done relatively
 * rarely.
 *
 * Vectors, as handled within the core routines, are permuted. We have
 * coordinates (v_{\sigma(0)}...v_{\sigma(n-1)}), where \sigma is the
 * permutation given by the *_perm files. For block Wiedemann, we assume
 * that we have conjugated permutations on the two sides. This means that
 * no data manipulation is required within the critical loops (this would
 * imply a fuzzy communication pattern, boiling down to essentially using
 * allreduce instead of reduce).
 */

/* As always, comments are written with one given point of view in mind.
 * The vector which gets saved is always the source vector. Here, our
 * arbitrary description choice is that we compute M^i times a vector,
 * hence the source vector is on the right: d == 1.
 */

// comments, variable names and so on are names which for simplicity
// reflect the situation d == 1 (save right vector). The stable state we
// start with is after a matrix-times-vector product (or just before
// one): The left vector has been computed, and the data for indices
// [i0..i1[ is on the nodes seeing those indices vertically as well. Data
// is therefore found in the right vector area, as after the reduce step.
//
// Doing an mmt_vec_broadcast along columns will ensure that each row
// contains the complete data set for our vector.

void mmt_vec_set_random_through_file(mmt_vec_ptr v, const char * filename_pattern, unsigned int itemsondisk, gmp_randstate_t rstate, unsigned int block_position)
{
    /* FIXME: this generates the complete vector on rank 0, saves it, and
     * loads it again. But I'm a bit puzzled by the choice of saving a
     * number of items which is n0[d]. Seems to me that this is in fact
     * incorrect, we want n0[!d] here.
     */
    mpfq_vbase_ptr A = v->abase;
    parallelizing_info_ptr pi = v->pi;
    int tcan_print = v->pi->m->trank == 0 && v->pi->m->jrank == 0;

    mpz_t p;
    mpz_init(p);
    v->abase->field_characteristic(v->abase, p);
    int char2 = mpz_cmp_ui(p, 2) == 0;
    mpz_clear(p);
    int splitwidth = char2 ? 64 : 1;
    unsigned int Adisk_width = splitwidth;
    unsigned int Adisk_multiplex = v->abase->simd_groupsize(v->abase) / Adisk_width;

    ASSERT_ALWAYS(itemsondisk % Adisk_multiplex == 0);
    unsigned int loc_itemsondisk = itemsondisk / Adisk_multiplex;

    if (pi->m->trank == 0 && pi->m->jrank == 0) {
        for(unsigned int b = 0 ; b < Adisk_multiplex ; b++) {
            unsigned int b0 = block_position + b * Adisk_width;
            char * filename;
            int rc;
            rc = asprintf(&filename, filename_pattern, b0, b0 + splitwidth);
            ASSERT_ALWAYS(rc >= 0);

            /* we want to create v->n / Adisk_multiplex entries --
             * but we can't do that with access to just A. So we
             * generate slightly more, and rely on itemsondisk to do
             * the job of properly cutting the overflowing data.
             */

            size_t nitems = iceildiv(v->n, Adisk_multiplex);
            void * y;
            cheating_vec_init(A, &y, nitems);
            A->vec_set_zero(A, y, nitems);
            A->vec_random(A, y, nitems, rstate);
            double tt = -wct_seconds();
            if (tcan_print) {
                printf("Creating random vector %s...", filename);
                fflush(stdout);
            }
            FILE * f = fopen(filename, "wb");
            ASSERT_ALWAYS(f);
            rc = fwrite(y, A->vec_elt_stride(A,1), loc_itemsondisk, f);
            ASSERT_ALWAYS(rc == (int) loc_itemsondisk);
            fclose(f);
            tt += wct_seconds();
            if (tcan_print) {
                size_t sizeondisk = A->vec_elt_stride(A, loc_itemsondisk);
                char buf[20], buf2[20];
                printf(" done [%s in %.2fs, %s/s]\n",
                        size_disp(sizeondisk / Adisk_multiplex, buf),
                        tt,
                        size_disp(sizeondisk / Adisk_multiplex / tt, buf2));
            }
            cheating_vec_clear(A, &y, nitems);

            free(filename);
        }
    }
    int ok = mmt_vec_load(v, filename_pattern, itemsondisk, block_position);
    ASSERT_ALWAYS(ok);
}

unsigned long mmt_vec_hamming_weight(mmt_vec_ptr y) {
    ASSERT_ALWAYS(y->consistency == 2);
    unsigned long w = y->abase->vec_hamming_weight(y->abase, y->v, y->i1 - y->i0);
    /* all threads / cores in wiring wr[y->d] share the same data and
     * thus deduce the same count */
    pi_allreduce(NULL, &w, 1, BWC_PI_UNSIGNED_LONG, BWC_PI_SUM, y->pi->wr[!y->d]);
    return w;
}

/* this is inconsistent in the sense that it's balancing-dependent */
void mmt_vec_set_random_inconsistent(mmt_vec_ptr v, gmp_randstate_t rstate)
{
    ASSERT_ALWAYS(v != NULL);
    mmt_full_vec_set_zero(v);
    v->abase->vec_random(v->abase, mmt_my_own_subvec(v), mmt_my_own_size_in_items(v), rstate);
    v->consistency = 1;
    mmt_vec_allreduce(v);
}

/* _above and _below functions here do not use mmt, but we activate this
 * same interface nevertheless, for consistency with mmt_vec_truncate */
void mmt_vec_truncate_above_index(matmul_top_data_ptr mmt MAYBE_UNUSED, mmt_vec_ptr v, unsigned int idx)
{
    ASSERT_ALWAYS(v != NULL);
    if (idx <= v->i0) idx = v->i0;
    if (v->i0 <= idx && idx < v->i1) {
        if (v->siblings) {
            v->abase->vec_set_zero(v->abase,
                    v->abase->vec_subvec(v->abase, v->v, idx - v->i0),
                    v->i1 - idx);
        } else {
            serialize_threads(v->pi->wr[v->d]);
            if (v->pi->wr[v->d]->trank == 0)
                v->abase->vec_set_zero(v->abase,
                        v->abase->vec_subvec(v->abase, v->v, idx - v->i0),
                        v->i1 - idx);
            serialize_threads(v->pi->wr[v->d]);
        }
    }
}

void mmt_vec_truncate_below_index(matmul_top_data_ptr mmt MAYBE_UNUSED, mmt_vec_ptr v, unsigned int idx)
{
    ASSERT_ALWAYS(v != NULL);
    if (idx >= v->i1) idx = v->i1;
    if (v->i0 <= idx && idx < v->i1) {
        if (v->siblings) {
            v->abase->vec_set_zero(v->abase, v->v, idx - v->i0);
        } else {
            serialize_threads(v->pi->wr[v->d]);
            if (v->pi->wr[v->d]->trank == 0)
                v->abase->vec_set_zero(v->abase, v->v, idx - v->i0);
            serialize_threads(v->pi->wr[v->d]);
        }
    }
}

void mmt_vec_truncate(matmul_top_data_ptr mmt, mmt_vec_ptr v)
{
    mmt_vec_truncate_above_index(mmt, v, mmt->n0[v->d]);
}

void mmt_vec_set_x_indices(mmt_vec_ptr y, uint32_t * gxvecs, int m, unsigned int nx)
{
    int shared = !y->siblings;
    mpfq_vbase_ptr A = y->abase;
    mmt_full_vec_set_zero(y);
    if (!shared || y->pi->wr[y->d]->trank == 0) {
        for(int j = 0 ; j < m ; j++) {
            for(unsigned int k = 0 ; k < nx ; k++) {
                uint32_t i = gxvecs[j*nx+k];
                // set bit j of entry i to 1.
                if (i < y->i0 || i >= y->i1)
                    continue;
                {
                    void * x = A->vec_coeff_ptr(A, y->v, i - y->i0);
                    A->simd_add_ui_at(A, x, x, j, 1);
                }
            }
        }
    }
    y->consistency = 2;
    if (shared)
        serialize_threads(y->pi->wr[y->d]);
}

/* Set to the zero vector, except for the first n entries that are taken
 * from the vector v
 */
void mmt_vec_set_expanded_copy_of_local_data(mmt_vec_ptr y, const void * v, unsigned int n)
{
    int shared = !y->siblings;
    mpfq_vbase_ptr A = y->abase;
    mmt_full_vec_set_zero(y);
    if (!shared || y->pi->wr[y->d]->trank == 0) {
        for(unsigned int i = y->i0 ; i < y->i1 && i < n ; i++) {
            A->set(A,
                    A->vec_coeff_ptr(A, y->v, i - y->i0),
                    A->vec_coeff_ptr_const(A, v, i));
        }
    }
    y->consistency = 2;
    if (shared)
        serialize_threads(y->pi->wr[y->d]);
}


/**********************************************************************/
static void matmul_top_read_submatrix(matmul_top_data_ptr mmt, int midx, param_list_ptr pl, int optimized_direction);

/* returns an allocated string holding the name of the midx-th submatrix */
static char * matrix_list_get_item(param_list_ptr pl, const char * key, int midx)
{
    char * res = NULL;
    char ** mnames;
    int nmatrices;
    int rc = param_list_parse_string_list_alloc(pl, key, &mnames, &nmatrices, ",");
    if (rc == 0)
        return NULL;
    ASSERT_ALWAYS(midx < nmatrices);
    for(int i = 0 ; i < nmatrices ; i++) {
        if (i == midx) {
            res = mnames[i];
        } else {
            free(mnames[i]);
        }
    }
    free(mnames);
    return res;
}

static char* matrix_get_derived_cache_subdir(const char * matrixname, parallelizing_info_ptr pi)
{
    /* input is NULL in the case of random matrices */
    if (!matrixname) return NULL;
    unsigned int nh = pi->wr[1]->totalsize;
    unsigned int nv = pi->wr[0]->totalsize;
    char * copy = strdup(matrixname);
    char * t;
    if (strlen(copy) > 4 && strcmp((t = copy + strlen(copy) - 4), ".bin") == 0) {
        *t = '\0';
    }
    if ((t = strrchr(copy, '/')) == NULL) { /* basename */
        t = copy;
    } else {
        t++;
    }
    int rc = asprintf(&t, "%s.%ux%u", t, nh, nv);
    ASSERT_ALWAYS(rc >= 0);
    free(copy);
    return t;
}
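
/* For instance (illustrative name): matrixname "/data/foo.bin" with
 * nh=4 and nv=2 yields the subdirectory name "foo.4x2"; the derived
 * balancing file computed below is then "foo.4x2/foo.4x2.bin". */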

static void matrix_create_derived_cache_subdir(const char * matrixname, parallelizing_info_ptr pi)
{
    char * d = matrix_get_derived_cache_subdir(matrixname, pi);
    struct stat sbuf[1];
    int rc = stat(d, sbuf);
    if (rc < 0 && errno == ENOENT) {
        rc = mkdir(d, 0777);
        if (rc < 0 && errno != EEXIST) {
            fprintf(stderr, "mkdir(%s): %s\n", d, strerror(errno));
        }
    }
    free(d);
}

/* return an allocated string with the name of a balancing file for this
 * matrix and this mpi/thr split.
 */
static char* matrix_get_derived_balancing_filename(const char * matrixname, parallelizing_info_ptr pi)
{
    /* input is NULL in the case of random matrices */
    if (!matrixname) return NULL;
    char * dn = matrix_get_derived_cache_subdir(matrixname, pi);
    char * t;
    int rc = asprintf(&t, "%s/%s.bin", dn, dn);
    free(dn);
    ASSERT_ALWAYS(rc >= 0);
    return t;
}

static char* matrix_get_derived_cache_filename_stem(const char * matrixname, parallelizing_info_ptr pi, uint32_t checksum)
{
    /* input is NULL in the case of random matrices */
    if (!matrixname) return NULL;
    char * copy = strdup(matrixname);
    char * t;
    if (strlen(copy) > 4 && strcmp((t = copy + strlen(copy) - 4), ".bin") == 0) {
        *t = '\0';
    }
    if ((t = strrchr(copy, '/')) == NULL) { /* basename */
        t = copy;
    } else {
        t++;
    }
    int pos[2];
    for(int d = 0 ; d < 2 ; d++)  {
        pi_comm_ptr wr = pi->wr[d];
        pos[d] = wr->jrank * wr->ncores + wr->trank;
    }
    char * dn = matrix_get_derived_cache_subdir(matrixname, pi);
    int rc = asprintf(&t, "%s/%s.%08" PRIx32 ".h%d.v%d", dn, dn, checksum, pos[1], pos[0]);
    free(dn);
    ASSERT_ALWAYS(rc >= 0);
    free(copy);
    return t;
}

static char* matrix_get_derived_submatrix_filename(const char * matrixname, parallelizing_info_ptr pi)
{
    /* input is NULL in the case of random matrices */
    if (!matrixname) return NULL;
    char * copy = strdup(matrixname);
    char * t;
    if (strlen(copy) > 4 && strcmp((t = copy + strlen(copy) - 4), ".bin") == 0) {
        *t = '\0';
    }
    if ((t = strrchr(copy, '/')) == NULL) { /* basename */
        t = copy;
    } else {
        t++;
    }
    int pos[2];
    for(int d = 0 ; d < 2 ; d++)  {
        pi_comm_ptr wr = pi->wr[d];
        pos[d] = wr->jrank * wr->ncores + wr->trank;
    }
    char * dn = matrix_get_derived_cache_subdir(matrixname, pi);
    int rc = asprintf(&t, "%s/%s.h%d.v%d.bin", dn, dn, pos[1], pos[0]);
    free(dn);
    ASSERT_ALWAYS(rc >= 0);
    free(copy);
    return t;
}

static void matmul_top_init_fill_balancing_header(matmul_top_data_ptr mmt, int i, param_list_ptr pl)
{
    parallelizing_info_ptr pi = mmt->pi;
    matmul_top_matrix_ptr Mloc = mmt->matrices[i];

    if (pi->m->jrank == 0 && pi->m->trank == 0) {
        if (!Mloc->mname) {
            random_matrix_fill_fake_balancing_header(Mloc->bal, pi, param_list_lookup_string(pl, "random_matrix"));
        } else {
            if (access(Mloc->bname, R_OK) != 0) {
                if (errno == ENOENT) {
                    printf("Creating balancing file %s\n", Mloc->bname);
                    struct mf_bal_args mba = {
                        .mfile = Mloc->mname,
                        .bfile = Mloc->bname,
                        .nh = pi->wr[1]->totalsize,
                        .nv = pi->wr[0]->totalsize,
                        .do_perm = { MF_BAL_PERM_AUTO, MF_BAL_PERM_AUTO },
                    };
                    mf_bal_adjust_from_option_string(&mba, param_list_lookup_string(pl, "balancing_options"));
                    /* withcoeffs being a switch for param_list, it is
                     * clobbered by the configure_switch mechanism */
                    mba.withcoeffs = !is_char2(mmt->abase);
                    matrix_create_derived_cache_subdir(Mloc->mname, mmt->pi);

                    mf_bal(&mba);
                } else {
                    fprintf(stderr, "Cannot access balancing file %s: %s\n", Mloc->bname, strerror(errno));
                    exit(EXIT_FAILURE);
                }
            }
            balancing_read_header(Mloc->bal, Mloc->bname);
        }
    }
    pi_bcast(Mloc->bal, sizeof(balancing), BWC_PI_BYTE, 0, 0, mmt->pi->m);

    /* check that balancing dimensions are compatible with our run */
    int ok = 1;
    ok = ok && mmt->pi->wr[0]->totalsize == Mloc->bal->h->nv;
    ok = ok && mmt->pi->wr[1]->totalsize == Mloc->bal->h->nh;
    if (ok) return;

    if (pi->m->jrank == 0 && pi->m->trank == 0) {
        fprintf(stderr, "Matrix %d, %s: balancing file %s"
                " has dimensions %ux%u,"
                " this conflicts with the current run,"
                " which expects dimensions (%ux%u)x(%ux%u).\n",
                i, Mloc->mname, Mloc->bname,
                Mloc->bal->h->nh, Mloc->bal->h->nv,
                mmt->pi->wr[1]->njobs,
                mmt->pi->wr[1]->ncores,
                mmt->pi->wr[0]->njobs,
                mmt->pi->wr[0]->ncores);
    }
    serialize(mmt->pi->m);
    exit(1);
}


static void matmul_top_init_prepare_local_permutations(matmul_top_data_ptr mmt, int i)
{
    matmul_top_matrix_ptr Mloc = mmt->matrices[i];
    /* Here, we get a copy of the rowperm and colperm.
     *
     * For each (job,thread), two pairs of intervals are defined.
     *
     * for row indices: [i0[0], i1[0][ = [mrow->i0, mrow->i1[ and [Xi0[0], Xi1[0][
     * for col indices: [i0[1], i1[1][ = [mcol->i0, mcol->i1[ and [Xi0[1], Xi1[1][
     *
     * The parts we get are:
     *      [i, rowperm[i]] for i in [mrow->i0, mrow->i1[
     *                      and rowperm[i] in [X0, X1[
     *      [j, colperm[j]] for j in [mcol->i0, mcol->i1[
     *                      and colperm[j] in [Y0, Y1[
     * where X0 is what would be mcol->i0 if the matrix had as many columns
     * as rows, and ditto for X1, Y0, Y1.
     */

    unsigned int rowperm_items = 0;
    unsigned int colperm_items = 0;

    /* Define a complete structure for the balancing which is shared
     * among threads, so that we'll be able to access it from all threads
     * simultaneously. We will put things in bal_tmp->rowperm and
     * bal_tmp->colperm, but beyond that, the header part will be wrong
     * at non-root nodes.
     */
    balancing_ptr bal_tmp = shared_malloc_set_zero(mmt->pi->m, sizeof(balancing));

    if (mmt->pi->m->jrank == 0 && mmt->pi->m->trank == 0) {
        if (Mloc->bname)
            balancing_read(bal_tmp, Mloc->bname);
        /* It's fine if we have nothing. This just means that we'll have
         * no balancing to deal with (this occurs only for matrices
         * which are generated at random on the fly). */
        rowperm_items = bal_tmp->rowperm != NULL ? bal_tmp->trows : 0;
        colperm_items = bal_tmp->colperm != NULL ? bal_tmp->tcols : 0;
    }
    pi_bcast(&rowperm_items, 1, BWC_PI_UNSIGNED, 0, 0, mmt->pi->m);
    pi_bcast(&colperm_items, 1, BWC_PI_UNSIGNED, 0, 0, mmt->pi->m);

    if (mmt->pi->m->trank == 0) {
        if (rowperm_items) {
            ASSERT_ALWAYS(rowperm_items == Mloc->bal->trows);
            if (mmt->pi->m->jrank != 0)
                bal_tmp->rowperm = malloc(Mloc->bal->trows * sizeof(uint32_t));
            MPI_Bcast(bal_tmp->rowperm, Mloc->bal->trows * sizeof(uint32_t), MPI_BYTE, 0, mmt->pi->m->pals);
        }
        if (colperm_items) {
            ASSERT_ALWAYS(colperm_items == Mloc->bal->tcols);
            if (mmt->pi->m->jrank != 0)
                bal_tmp->colperm = malloc(Mloc->bal->tcols * sizeof(uint32_t));
            MPI_Bcast(bal_tmp->colperm, Mloc->bal->tcols * sizeof(uint32_t), MPI_BYTE, 0, mmt->pi->m->pals);
        }
    }
    serialize_threads(mmt->pi->m);      /* important ! */

    uint32_t * balperm[2] = { bal_tmp->rowperm, bal_tmp->colperm };
    for(int d = 0 ; d < 2 ; d++)  {
        unsigned int ii[2];
        unsigned int jj[2];
        get_local_permutations_ranges(mmt, d, ii, jj);

        if (!balperm[d]) continue;

        Mloc->perm[d] = permutation_data_alloc();
#ifdef MVAPICH2_NUMVERSION
        /* apparently mvapich2 frowns on realloc() */
        permutation_data_ensure(Mloc->perm[d], ii[1] - ii[0]);
#endif

        /* now create the really local permutation */
        for(unsigned int i = ii[0] ; i < ii[1] ; i++) {
            unsigned int j = balperm[d][i];
            if (j >= jj[0] && j < jj[1]) {
                unsigned int ij[2] = { i, j };
                permutation_data_push(Mloc->perm[d], ij);
            }
        }
#if 0
        const char * text[2] = { "left", "right", };
        printf("[%s] J%uT%u does %zu/%u permutation pairs for %s vectors\n",
                mmt->pi->nodenumber_s,
                mmt->pi->m->jrank, mmt->pi->m->trank,
                Mloc->perm[d]->n, d ? Mloc->bal->tcols : Mloc->bal->trows,
                text[d]);
#endif
    }

    serialize_threads(mmt->pi->m);      /* important ! */

    if (mmt->pi->m->trank == 0) {
        if (bal_tmp->colperm) free(bal_tmp->colperm);
        if (bal_tmp->rowperm) free(bal_tmp->rowperm);
    }

    shared_free(mmt->pi->m, bal_tmp);
}

void matmul_top_init(matmul_top_data_ptr mmt,
        mpfq_vbase_ptr abase,
        /* matmul_ptr mm, */
        parallelizing_info_ptr pi,
        param_list_ptr pl,
        int optimized_direction)
{
    memset(mmt, 0, sizeof(*mmt));

    mmt->abase = abase;
    mmt->pitype = pi_alloc_mpfq_datatype(pi, abase);
    mmt->pi = pi;
    mmt->matrices = NULL;

    int nbals = param_list_get_list_count(pl, "balancing");
    mmt->nmatrices = param_list_get_list_count(pl, "matrix");
    const char * random_description = param_list_lookup_string(pl, "random_matrix");
    const char * static_random_matrix = param_list_lookup_string(pl, "static_random_matrix");


    if (random_description || static_random_matrix) {
        if (nbals || mmt->nmatrices) {
            fprintf(stderr, "random_matrix is incompatible with balancing= and matrix=\n");
            exit(EXIT_FAILURE);
        }
    } else if (nbals && !mmt->nmatrices) {
        fprintf(stderr, "missing parameter matrix=\n");
        exit(EXIT_FAILURE);
    } else if (!nbals && mmt->nmatrices) {
        /* nbals == 0 is a hint towards taking the default balancing file
         * names, that's it */
    } else if (nbals != mmt->nmatrices) {
        fprintf(stderr, "balancing= and matrix= have inconsistent number of items\n");
        exit(EXIT_FAILURE);
    }

    if (random_description)
        mmt->nmatrices = 1;
    if (static_random_matrix)
        mmt->nmatrices = 1;

    mmt->matrices = malloc(mmt->nmatrices * sizeof(matmul_top_matrix));
    memset(mmt->matrices, 0, mmt->nmatrices * sizeof(matmul_top_matrix));

    serialize_threads(mmt->pi->m);

    /* The initialization goes through several passes */
    for(int i = 0 ; i < mmt->nmatrices ; i++) {
        matmul_top_matrix_ptr Mloc = mmt->matrices[i];
        Mloc->mname = matrix_list_get_item(pl, "matrix", i);
        Mloc->bname = matrix_list_get_item(pl, "balancing", i);
        if (static_random_matrix) {
            ASSERT_ALWAYS(i == 0);
            Mloc->mname = strdup(static_random_matrix);
        }
        if (!Mloc->bname) {
            /* returns NULL if mname is NULL */
            Mloc->bname = matrix_get_derived_balancing_filename(Mloc->mname, mmt->pi);
        }
        ASSERT_ALWAYS((Mloc->bname != NULL) == !random_description);

        matmul_top_init_fill_balancing_header(mmt, i, pl);

        Mloc->n[0] = Mloc->bal->trows;
        Mloc->n[1] = Mloc->bal->tcols;
        Mloc->n0[0] = Mloc->bal->h->nrows;
        Mloc->n0[1] = Mloc->bal->h->ncols;
        Mloc->locfile = matrix_get_derived_cache_filename_stem(Mloc->mname, mmt->pi, Mloc->bal->h->checksum);

    }

    mmt->n[0] = mmt->matrices[0]->n[0];
    mmt->n0[0] = mmt->matrices[0]->n0[0];
    mmt->n[1] = mmt->matrices[mmt->nmatrices-1]->n[1];
    mmt->n0[1] = mmt->matrices[mmt->nmatrices-1]->n0[1];

    /* in the second loop below, get_local_permutations_ranges uses
     * mmt->n[], so we do it in a second pass.
     * Now, given that double matrices only barely work at the moment,
     * I'm not absolutely sure that it's really needed.
     */
    for(int i = 0 ; i < mmt->nmatrices ; i++) {
        matmul_top_matrix_ptr Mloc = mmt->matrices[i];

        matmul_top_init_prepare_local_permutations(mmt, i);

        if (!mmt->pi->interleaved) {
            matmul_top_read_submatrix(mmt, i, pl, optimized_direction);
        } else {
            /* Interleaved threads will share their matrix data. The first
             * thread to arrive will do the initialization, and the second
             * one will just grab the pointer. The trick is to be able to
             * pick the pointer in the right location ! We add a generic
             * pointer dictionary feature in the parallelizing_info
             * interface for this purpose.
             */

#define MMT_MM_MAGIC_KEY        0xaa000000UL

            if (mmt->pi->interleaved->idx == 0) {
                matmul_top_read_submatrix(mmt, i, pl, optimized_direction);
                pi_store_generic(mmt->pi, MMT_MM_MAGIC_KEY + i, mmt->pi->m->trank, Mloc->mm);
            } else {
                Mloc->mm = pi_load_generic(mmt->pi, MMT_MM_MAGIC_KEY + i, mmt->pi->m->trank);
            }
        }
    }
}

unsigned int matmul_top_rank_upper_bound(matmul_top_data_ptr mmt)
{
    unsigned int r = MAX(mmt->n0[0], mmt->n0[1]);
    for(int i = 0 ; i < mmt->nmatrices ; i++) {
        matmul_top_matrix_ptr Mloc = mmt->matrices[i];
        r = MAX(r, Mloc->bal->h->nrows - Mloc->bal->h->nzrows);
        r = MAX(r, Mloc->bal->h->ncols - Mloc->bal->h->nzcols);
    }
    return r;
}


static int export_cache_list_if_requested(matmul_top_matrix_ptr Mloc, parallelizing_info_ptr pi, param_list_ptr pl)
{
    const char * cachelist = param_list_lookup_string(pl, "export_cachelist");
    if (!cachelist) return 0;

    char * myline = NULL;
    int rc;
    rc = asprintf(&myline, "%s %s", pi->nodename, Mloc->mm->cachefile_name);
    ASSERT_ALWAYS(rc >= 0);
    ASSERT_ALWAYS(myline != NULL);
    char ** tlines = NULL;
    tlines = shared_malloc_set_zero(pi->m, pi->m->ncores * sizeof(const char *));
    tlines[pi->m->trank] = myline;
    serialize_threads(pi->m);

    /* Also, just out of curiosity, try to see what we have currently */
    struct stat st[1];
    int * has_cache = shared_malloc_set_zero(pi->m, pi->m->totalsize * sizeof(int));
    rc = stat(Mloc->mm->cachefile_name, st);
    unsigned int mynode = pi->m->ncores * pi->m->jrank;
    has_cache[mynode + pi->m->trank] = rc == 0;
    serialize_threads(pi->m);

    int len = 0;
    if (pi->m->trank == 0) {
        MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                has_cache, pi->m->ncores, MPI_INT, pi->m->pals);

        for(unsigned int j = 0 ; j < pi->m->ncores ; j++) {
            int s = strlen(tlines[j]);
            if (s >= len) len = s;
        }
        MPI_Allreduce(MPI_IN_PLACE, &len, 1, MPI_INT, MPI_MAX, pi->m->pals);
        char * info = malloc(pi->m->totalsize * (len + 1));
        char * mybuf = malloc(pi->m->ncores * (len+1));
        memset(mybuf, 0, pi->m->ncores * (len+1));
        for(unsigned int j = 0 ; j < pi->m->ncores ; j++) {
            memcpy(mybuf + j * (len+1), tlines[j], strlen(tlines[j])+1);
        }
        MPI_Allgather(mybuf, pi->m->ncores * (len+1), MPI_BYTE,
                info, pi->m->ncores * (len+1), MPI_BYTE,
                pi->m->pals);
        if (pi->m->jrank == 0) {
            FILE * f = fopen(cachelist, "wb");
            DIE_ERRNO_DIAG(f == NULL, "fopen(%s)", cachelist);
            for(unsigned int j = 0 ; j < pi->m->njobs ; j++) {
                unsigned int j0 = j * pi->m->ncores;
                fprintf(f, "get-cache ");
                for(unsigned int k = 0 ; k < pi->m->ncores ; k++) {
                    char * t = info + (j0 + k) * (len + 1);
                    char * q = strchr(t, ' ');
                    ASSERT_ALWAYS(q);
                    if (!k) {
                        *q = '\0';
                        fprintf(f, "%s", t);
                        *q = ' ';
                    }
                    fprintf(f, "%s", q);
                }
                fprintf(f, "\n");
            }
            for(unsigned int j = 0 ; j < pi->m->njobs ; j++) {
                unsigned int j0 = j * pi->m->ncores;
                fprintf(f, "has-cache ");
                for(unsigned int k = 0 ; k < pi->m->ncores ; k++) {
                    char * t = info + (j0 + k) * (len + 1);
                    char * q = strchr(t, ' ');
                    ASSERT_ALWAYS(q);
                    if (!k) {
                        *q = '\0';
                        fprintf(f, "%s", t);
                        *q = ' ';
                    }
                    if (!has_cache[pi->m->ncores * j + k]) continue;
                    fprintf(f, "%s", q);
                }
                fprintf(f, "\n");
            }
            fclose(f);
        }
        free(info);
        free(mybuf);
    }
    serialize_threads(pi->m);
    shared_free(pi->m, tlines);
    shared_free(pi->m, has_cache);
    serialize_threads(pi->m);
    free(myline);
    serialize(pi->m);

    return 1;
}

static unsigned int local_fraction(unsigned int padded, unsigned int normal, pi_comm_ptr wr)
{
    ASSERT_ALWAYS(padded % wr->totalsize == 0);
    unsigned int i = wr->jrank * wr->ncores + wr->trank;
    unsigned int quo = padded / wr->totalsize;
    return MIN(normal - i * quo, quo);
}
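
/* Illustration (made-up figures): with padded=12, normal=10 and
 * wr->totalsize=4, quo is 3, and the four (job,thread) slots own
 * 3, 3, 3 and 1 items respectively; for the last one,
 * MIN(10 - 3*3, 3) = 1. */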


static void matmul_top_read_submatrix(matmul_top_data_ptr mmt, int midx, param_list_ptr pl, int optimized_direction)
{
    int rebuild = 0;
    param_list_parse_int(pl, "rebuild_cache", &rebuild);
    int can_print = (mmt->pi->m->jrank == 0 && mmt->pi->m->trank == 0);

    matmul_top_matrix_ptr Mloc = mmt->matrices[midx];

    Mloc->mm = matmul_init(mmt->abase,
            Mloc->n[0] / mmt->pi->wr[1]->totalsize,
            Mloc->n[1] / mmt->pi->wr[0]->totalsize,
            Mloc->locfile, NULL /* means: choose mm_impl from pl */,
            pl, optimized_direction);

    // *IF* we need to do a collective read of the matrix, we need to
    // provide the pointer *now*.
    unsigned int sqread = 0;
    param_list_parse_uint(pl, "sequential_cache_read", &sqread);

    int cache_loaded = 0;

    if (export_cache_list_if_requested(Mloc, mmt->pi, pl)) {
        /* If we are being called from dispatch, once all submatrices
         * have had their list of required files printed, the program
         * will exit. */
        return;
    }

    if (!rebuild) {
        if (can_print) {
            printf("Now trying to load matrix cache files\n");
        }
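        /* sequential_cache_read=s means that, on each node, only s
         * threads hit the disk at a time. For instance (hypothetical
         * numbers) with ncores=8 and s=2, threads 0-1 read in round 0,
         * threads 2-3 in round 1, and so on; the other threads wait at
         * the serialization points below.
         */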
        if (sqread) {
            for(unsigned int j = 0 ; j < mmt->pi->m->ncores ; j += sqread) {
                serialize_threads(mmt->pi->m);
                double t_read = -wct_seconds();
                if (j / sqread == mmt->pi->m->trank / sqread)
                    cache_loaded = matmul_reload_cache(Mloc->mm);
                serialize(mmt->pi->m);
                t_read += wct_seconds();
                if (mmt->pi->m->jrank == 0 && mmt->pi->m->trank == j && cache_loaded) {
                    printf("[%s] J%uT%u-%u: read cache %s (and others) in %.2fs (round %u/%u)\n",
                            mmt->pi->nodenumber_s,
                            mmt->pi->m->jrank,
                            mmt->pi->m->trank,
                            MIN(mmt->pi->m->ncores, mmt->pi->m->trank + sqread) - 1,
                            Mloc->mm->cachefile_name,
                            t_read,
                            j / sqread, iceildiv(mmt->pi->m->ncores, sqread)
                          );
                }
            }
        } else {
            double t_read = -wct_seconds();
            cache_loaded = matmul_reload_cache(Mloc->mm);
            serialize(mmt->pi->m);
            t_read += wct_seconds();
            if (mmt->pi->m->jrank == 0 && mmt->pi->m->trank == 0 && cache_loaded) {
                printf("[%s] J%u: read cache %s (and others) in %.2fs\n",
                        mmt->pi->nodenumber_s,
                        mmt->pi->m->jrank,
                        Mloc->mm->cachefile_name,
                        t_read
                      );
            }
        }
        if (!mmt->pi->m->trank) {
            printf("J%u %s done reading (result=%d)\n", mmt->pi->m->jrank, mmt->pi->nodename, cache_loaded);
        }
    }

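    /* Abort if the ranks disagree on whether a cache was found: some
     * ranks having a cache file while others do not indicates a broken
     * deployment, and each rank reports its own status below. (If no
     * rank found a cache, all agree on cache_loaded == 0 and we simply
     * fall through to build it.)
     */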
    if (!pi_data_eq(&cache_loaded, 1, BWC_PI_INT, mmt->pi->m)) {
        if (can_print) {
            fprintf(stderr, "Fatal error: cache files not present at expected locations\n");
        }
        SEVERAL_THREADS_PLAY_MPI_BEGIN(mmt->pi->m) {
            fprintf(stderr, "[%s] J%uT%u: cache %s: %s\n",
                    mmt->pi->nodenumber_s,
                    mmt->pi->m->jrank,
                    mmt->pi->m->trank,
                    Mloc->mm->cachefile_name,
                    cache_loaded ? "ok" : "not ok");
        }
        SEVERAL_THREADS_PLAY_MPI_END();
        serialize(mmt->pi->m);
        abort();
    }

    unsigned int sqb = 0;
    param_list_parse_uint(pl, "sequential_cache_build", &sqb);

    matrix_u32 m;
    memset(m, 0, sizeof(matrix_u32));
    /* see remark in raw_matrix_u32.h about data ownership for type
     * matrix_u32 */

    if (!cache_loaded) {
        // the mm layer is informed of the higher priority computations
        // that will take place. Depending on the implementation, this
        // may cause the direct or transposed ordering to be preferred.
        // Thus we have to read this back from the mm structure.
        m->bfile = Mloc->bname;
        m->mfile = Mloc->mname;
        m->transpose = Mloc->mm->store_transposed;
        m->withcoeffs = !is_char2(mmt->abase);
        if (!(Mloc->mname)) {
            if (can_print) {
                printf("Begin creation of fake matrix data in parallel\n");
            }
            /* Mloc->mm->dim[0,1] contains the dimensions of the padded
             * matrix. This is absolutely fine in the normal case. But in
             * the case of staged matrices, it's a bit different. We must
             * make sure that we generate matrices which have zeroes in
             * the padding area.
             */
            random_matrix_get_u32(mmt->pi, pl, m,
                    local_fraction(Mloc->n[0], Mloc->n0[0], mmt->pi->wr[1]),
                    local_fraction(Mloc->n[1], Mloc->n0[1], mmt->pi->wr[0]));
        } else {
            if (can_print) {
                printf("Matrix dispatching starts\n");
            }

            /* It might be that the leader and the other nodes do _not_
             * share a common filesystem, in which case we must do this
             * here as well.
             */
            if (mmt->pi->m->trank == 0)
                matrix_create_derived_cache_subdir(Mloc->mname, mmt->pi);
            serialize_threads(mmt->pi->m);

            balancing_get_matrix_u32(mmt->pi, pl, m);

            int ssm = 0;
            param_list_parse_int(pl, "save_submatrices", &ssm);
            if (ssm) {
                char * submat = matrix_get_derived_submatrix_filename(Mloc->mname, mmt->pi);
                fprintf(stderr, "DEBUG: creating %s\n", submat);
                FILE * f = fopen(submat, "wb");
                DIE_ERRNO_DIAG(f == NULL, "fopen(%s)", submat);
                fwrite(m->p, sizeof(uint32_t), m->size, f);
                fclose(f);
                free(submat);
            }
        }
    }

    if (can_print && Mloc->bname) {
        balancing bal;
        balancing_init(bal);
        balancing_read_header(bal, Mloc->bname);
        balancing_set_row_col_count(bal);
        printf("Matrix: total %" PRIu32 " rows %" PRIu32 " cols "
                "%" PRIu64 " coeffs\n",
                bal->h->nrows, bal->h->ncols, bal->h->ncoeffs);
        balancing_clear(bal);
    }

    if (!sqb) {
        if (!cache_loaded) {
            // everybody does it in parallel
            if (verbose_enabled(CADO_VERBOSE_PRINT_BWC_CACHE_MAJOR_INFO))
                printf("[%s] J%uT%u building cache for %s\n",
                        mmt->pi->nodenumber_s,
                        mmt->pi->m->jrank,
                        mmt->pi->m->trank,
                        Mloc->locfile);
            matmul_build_cache(Mloc->mm, m);
            matmul_save_cache(Mloc->mm);
        }
    } else {
        if (can_print)
            printf("Building local caches %u at a time\n", sqb);
        for(unsigned int j = 0 ; j < mmt->pi->m->ncores + sqb ; j += sqb) {
            serialize_threads(mmt->pi->m);
            if (cache_loaded) continue;
            if (j / sqb == mmt->pi->m->trank / sqb) {
                if (verbose_enabled(CADO_VERBOSE_PRINT_BWC_CACHE_MAJOR_INFO))
                    printf("[%s] J%uT%u building cache for %s\n",
                            mmt->pi->nodenumber_s,
                            mmt->pi->m->jrank,
                            mmt->pi->m->trank,
                            Mloc->locfile);
                matmul_build_cache(Mloc->mm, m);
            } else if (j / sqb == mmt->pi->m->trank / sqb + 1) {
                matmul_save_cache(Mloc->mm);
            }
        }
    }

    /* see remark in raw_matrix_u32.h about data ownership for type
     * matrix_u32 */

    if (Mloc->mm->cachefile_name && verbose_enabled(CADO_VERBOSE_PRINT_BWC_CACHE_MAJOR_INFO)) {
        my_pthread_mutex_lock(mmt->pi->m->th->m);
        printf("[%s] J%uT%u uses cache file %s\n",
                mmt->pi->nodenumber_s,
                mmt->pi->m->jrank, mmt->pi->m->trank,
                /* cache for mmt->locfile, */
                Mloc->mm->cachefile_name);
        my_pthread_mutex_unlock(mmt->pi->m->th->m);
    }
}

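/* Each rank's report string is gathered into a fixed-size slot of
 * max_report_size bytes, so that a plain allgather suffices: the
 * report of thread t on job j lands at offset
 * max_report_size * (j * ncores + t). This trades a little memory for
 * a much simpler communication pattern.
 */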
void matmul_top_report(matmul_top_data_ptr mmt, double scale, int full)
{
    for(int midx = 0 ; midx < mmt->nmatrices ; midx++) {
        matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
        matmul_report(Mloc->mm, scale);
        size_t max_report_size = 0;
        pi_allreduce(&Mloc->mm->report_string_size, &max_report_size, 1, BWC_PI_SIZE_T, BWC_PI_MAX, mmt->pi->m);
        char * all_reports = malloc(mmt->pi->m->totalsize * max_report_size);
        memset(all_reports, 0, mmt->pi->m->totalsize * max_report_size);
        memcpy(all_reports + max_report_size * (mmt->pi->m->jrank * mmt->pi->m->ncores + mmt->pi->m->trank), Mloc->mm->report_string, Mloc->mm->report_string_size);
        pi_allgather(NULL, 0, 0,
                all_reports, max_report_size, BWC_PI_BYTE, mmt->pi->m);

        if (max_report_size > 1 && mmt->pi->m->jrank == 0 && mmt->pi->m->trank == 0) {
            for(unsigned int j = 0 ; j < mmt->pi->m->njobs ; j++) {
                for(unsigned int t = 0 ; t < mmt->pi->m->ncores ; t++) {
                    char * locreport = all_reports + max_report_size * (j * mmt->pi->m->ncores + t);
                    if (full || (j == 0 && t == 0))
                        printf("##### J%uT%u timing report:\n%s", j, t, locreport);
                }
            }
        }
        serialize(mmt->pi->m);
        free(all_reports);
    }
}

void matmul_top_clear(matmul_top_data_ptr mmt)
{
    pi_free_mpfq_datatype(mmt->pi, mmt->pitype);
    serialize_threads(mmt->pi->m);
    serialize(mmt->pi->m);
    serialize_threads(mmt->pi->m);

    for(int midx = 0 ; midx < mmt->nmatrices ; midx++) {
        matmul_top_matrix_ptr Mloc = mmt->matrices[midx];
        for(int d = 0 ; d < 2 ; d++) {
            permutation_data_free(Mloc->perm[d]);
            // both are expected to hold storage for:
            // (mmt->n[d] / mmt->pi->m->totalsize * mmt->pi->wr[d]->ncores)
            // elements, corresponding to the largest abase encountered.
        }
        serialize(mmt->pi->m);
        if (!mmt->pi->interleaved) {
            matmul_clear(Mloc->mm);
        } else if (mmt->pi->interleaved->idx == 1) {
            matmul_clear(Mloc->mm);
            /* group 0 is the first to leave, thus it doesn't do the
             * freeing.
             */
        }
        free(Mloc->locfile);
        free(Mloc->mname);
        free(Mloc->bname);
    }
    serialize(mmt->pi->m);
    free(mmt->matrices);
}

static permutation_data_ptr permutation_data_alloc()
{
    permutation_data_ptr a = malloc(sizeof(permutation_data));
    a->n = 0;
    a->alloc = 0;
    a->x = NULL;
    return a;
}

static void permutation_data_free(permutation_data_ptr a)
{
    if (!a) return;
    a->n = 0;
    a->alloc = 0;
    if (a->x) free(a->x);
    free(a);
}

static pthread_mutex_t pp = PTHREAD_MUTEX_INITIALIZER;

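/* Amortized growth policy: each time the array is full, we reserve
 * n + 16 + alloc/4 slots, i.e. roughly 25% geometric headroom plus a
 * small constant; starting from an empty array, the capacity grows as
 * 16, 36, 61, 92, ... The mutex only serializes the realloc() calls
 * themselves across threads.
 */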
static void permutation_data_push(permutation_data_ptr a, unsigned int u[2])
{
    if (a->n >= a->alloc) {
        pthread_mutex_lock(&pp);
        a->alloc = a->n + 16 + a->alloc / 4;
        a->x = realloc(a->x, a->alloc * sizeof(unsigned int[2]));
        pthread_mutex_unlock(&pp);
        ASSERT_ALWAYS(a->x != NULL);
    }
    a->x[a->n][0] = u[0];
    a->x[a->n][1] = u[1];
    a->n++;
}

#ifdef MVAPICH2_NUMVERSION
static void permutation_data_ensure(permutation_data_ptr a, size_t n)
{
    if (n >= a->alloc) {
        pthread_mutex_lock(&pp);
        a->alloc = n;
        a->x = realloc(a->x, n * sizeof(unsigned int[2]));
        pthread_mutex_unlock(&pp);
        ASSERT_ALWAYS(a->x != NULL);
    }
}
#endif
