/*
 * Copyright (c) 2003, 2007-14 Matteo Frigo
 * Copyright (c) 2003, 2007-14 Massachusetts Institute of Technology
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 *
 */

/* Distributed transposes using a sequence of carefully scheduled
   pairwise exchanges.  This has the advantage that it can be done
   in-place, or out-of-place while preserving the input, using buffer
   space proportional to the local size divided by the number of
   processes (i.e. to the total array size divided by the number of
   processes squared). */
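
/* (For equal block sizes, the scratch buffer holds a single send block:
   e.g. a 1024 x 1024 array transposed across 16 processes needs roughly
   1024*1024/16^2 = 4096 scratch elements per process.) */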

#include "mpi-transpose.h"
#include <string.h>

typedef struct {
     solver super;
     int preserve_input; /* preserve input even if DESTROY_INPUT was passed */
} S;

typedef struct {
     plan_mpi_transpose super;

     plan *cld1, *cld2, *cld2rest, *cld3;
     INT rest_Ioff, rest_Ooff;

     int n_pes, my_pe, *sched;
     INT *send_block_sizes, *send_block_offsets;
     INT *recv_block_sizes, *recv_block_offsets;
     MPI_Comm comm;
     int preserve_input;
} P;

static void transpose_chunks(int *sched, int n_pes, int my_pe,
			     INT *sbs, INT *sbo, INT *rbs, INT *rbo,
			     MPI_Comm comm,
			     R *I, R *O)
{
     if (sched) {
	  int i;
	  MPI_Status status;

	  /* TODO: explore non-synchronous send/recv? */

	  if (I == O) {
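	       /* In-place: stage each outgoing block in a scratch buffer so
		  that the data received into O + rbo[pe] cannot overwrite a
		  block before it has been sent; the first send block is at
		  least as large as any other, so sbs[0] bounds the copy. */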
	       R *buf = (R*) MALLOC(sizeof(R) * sbs[0], BUFFERS);

	       for (i = 0; i < n_pes; ++i) {
		    int pe = sched[i];
		    if (my_pe == pe) {
			 if (rbo[pe] != sbo[pe])
			      memmove(O + rbo[pe], O + sbo[pe],
				      sbs[pe] * sizeof(R));
		    }
		    else {
			 memcpy(buf, O + sbo[pe], sbs[pe] * sizeof(R));
			 MPI_Sendrecv(buf, (int) (sbs[pe]), FFTW_MPI_TYPE,
				      pe, (my_pe * n_pes + pe) & 0x7fff,
				      O + rbo[pe], (int) (rbs[pe]),
				      FFTW_MPI_TYPE,
				      pe, (pe * n_pes + my_pe) & 0x7fff,
				      comm, &status);
		    }
	       }

	       X(ifree)(buf);
	  }
	  else { /* I != O */
	       for (i = 0; i < n_pes; ++i) {
		    int pe = sched[i];
		    if (my_pe == pe)
			 memcpy(O + rbo[pe], I + sbo[pe], sbs[pe] * sizeof(R));
		    else
			 MPI_Sendrecv(I + sbo[pe], (int) (sbs[pe]),
				      FFTW_MPI_TYPE,
				      pe, (my_pe * n_pes + pe) & 0x7fff,
				      O + rbo[pe], (int) (rbs[pe]),
				      FFTW_MPI_TYPE,
				      pe, (pe * n_pes + my_pe) & 0x7fff,
				      comm, &status);
	       }
	  }
     }
}

static void apply(const plan *ego_, R *I, R *O)
{
     const P *ego = (const P *) ego_;
     plan_rdft *cld1, *cld2, *cld2rest, *cld3;

     /* transpose locally to get contiguous chunks */
     cld1 = (plan_rdft *) ego->cld1;
     if (cld1) {
	  cld1->apply(ego->cld1, I, O);

	  if (ego->preserve_input) I = O;

	  /* transpose chunks globally */
	  transpose_chunks(ego->sched, ego->n_pes, ego->my_pe,
			   ego->send_block_sizes, ego->send_block_offsets,
			   ego->recv_block_sizes, ego->recv_block_offsets,
			   ego->comm, O, I);
     }
     else if (ego->preserve_input) {
	  /* transpose chunks globally */
	  transpose_chunks(ego->sched, ego->n_pes, ego->my_pe,
			   ego->send_block_sizes, ego->send_block_offsets,
			   ego->recv_block_sizes, ego->recv_block_offsets,
			   ego->comm, I, O);

	  I = O;
     }
     else {
	  /* transpose chunks globally */
	  transpose_chunks(ego->sched, ego->n_pes, ego->my_pe,
			   ego->send_block_sizes, ego->send_block_offsets,
			   ego->recv_block_sizes, ego->recv_block_offsets,
			   ego->comm, I, I);
     }

     /* transpose locally, again, to get ordinary row-major;
	this may take two transposes if the block sizes are unequal
	(3 subplans, two of which operate on disjoint data) */
     cld2 = (plan_rdft *) ego->cld2;
     cld2->apply(ego->cld2, I, O);
     cld2rest = (plan_rdft *) ego->cld2rest;
     if (cld2rest) {
	  cld2rest->apply(ego->cld2rest,
			  I + ego->rest_Ioff, O + ego->rest_Ooff);
	  cld3 = (plan_rdft *) ego->cld3;
	  if (cld3)
	       cld3->apply(ego->cld3, O, O);
	  /* else TRANSPOSED_OUT is true and user wants O transposed */
     }
}

static int applicable(const S *ego, const problem *p_,
		      const planner *plnr)
{
     const problem_mpi_transpose *p = (const problem_mpi_transpose *) p_;
     /* Note: this is *not* UGLY for out-of-place, destroy-input plans;
	the planner often prefers transpose-pairwise to transpose-alltoall,
	at least with LAM MPI on my machine. */
     return (1
	     && (!ego->preserve_input || (!NO_DESTROY_INPUTP(plnr)
					  && p->I != p->O))
	     && ONLY_TRANSPOSEDP(p->flags));
}

static void awake(plan *ego_, enum wakefulness wakefulness)
{
     P *ego = (P *) ego_;
     X(plan_awake)(ego->cld1, wakefulness);
     X(plan_awake)(ego->cld2, wakefulness);
     X(plan_awake)(ego->cld2rest, wakefulness);
     X(plan_awake)(ego->cld3, wakefulness);
}

static void destroy(plan *ego_)
{
     P *ego = (P *) ego_;
     X(ifree0)(ego->sched);
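     /* send_block_offsets, recv_block_sizes, and recv_block_offsets point
	into the same allocation as send_block_sizes, so this one free
	releases all four arrays. */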
     X(ifree0)(ego->send_block_sizes);
     MPI_Comm_free(&ego->comm);
     X(plan_destroy_internal)(ego->cld3);
     X(plan_destroy_internal)(ego->cld2rest);
     X(plan_destroy_internal)(ego->cld2);
     X(plan_destroy_internal)(ego->cld1);
}

static void print(const plan *ego_, printer *p)
{
     const P *ego = (const P *) ego_;
     p->print(p, "(mpi-transpose-pairwise%s%(%p%)%(%p%)%(%p%)%(%p%))",
	      ego->preserve_input == 2 ? "/p" : "",
	      ego->cld1, ego->cld2, ego->cld2rest, ego->cld3);
}

/* Given a process which_pe and a number of processes npes, fills
   the array sched[npes] with a sequence of processes to communicate
   with for a deadlock-free, optimum-overlap all-to-all communication.
   (All processes must call this routine to get their own schedules.)
   The schedule can be re-ordered arbitrarily as long as all processes
   apply the same permutation to their schedules.

   The algorithm here is based upon the one described in:
       J. A. M. Schreuder, "Constructing timetables for sport
       competitions," Mathematical Programming Study 13, pp. 58-67 (1980).
   In a sport competition, you have N teams and want every team to
   play every other team in as short a time as possible (maximum overlap
   between games).  This timetabling problem is therefore identical
   to that of an all-to-all communications problem.  In our case, there
   is one wrinkle: as part of the schedule, the process must do
   some data transfer with itself (local data movement), analogous
   to a requirement that each team "play itself" in addition to other
   teams.  With this wrinkle, it turns out that an optimal timetable
   (N parallel games) can be constructed for any N, not just for even
   N as in the original problem described by Schreuder.
*/
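
/* For illustration, with npes = 4 this produces the schedules
       pe 0: {0, 3, 2, 1}   pe 1: {1, 2, 3, 0}
       pe 2: {2, 1, 0, 3}   pe 3: {3, 0, 1, 2}
   i.e. in round i every process has a distinct partner (round 0 being
   the local self-exchange), so all exchanges of a round can proceed
   in parallel without deadlock. */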
static void fill1_comm_sched(int *sched, int which_pe, int npes)
{
     int pe, i, n, s = 0;
     A(which_pe >= 0 && which_pe < npes);
     if (npes % 2 == 0) {
	  n = npes;
	  sched[s++] = which_pe;
     }
     else
	  n = npes + 1;
     for (pe = 0; pe < n - 1; ++pe) {
	  if (npes % 2 == 0) {
	       if (pe == which_pe) sched[s++] = npes - 1;
	       else if (npes - 1 == which_pe) sched[s++] = pe;
	  }
	  else if (pe == which_pe) sched[s++] = pe;

	  if (pe != which_pe && which_pe < n - 1) {
	       i = (pe - which_pe + (n - 1)) % (n - 1);
	       if (i < n/2)
		    sched[s++] = (pe + i) % (n - 1);

	       i = (which_pe - pe + (n - 1)) % (n - 1);
	       if (i < n/2)
		    sched[s++] = (pe - i + (n - 1)) % (n - 1);
	  }
     }
     A(s == npes);
}

/* Sort the communication schedule sched for npes so that the schedule
   on process sortpe is ascending or descending (!ascending).  This is
   necessary to allow in-place transposes when the problem does not
   divide equally among the processes.  In this case there is one
   process where the incoming blocks are bigger/smaller than the
   outgoing blocks and thus have to be received in
   descending/ascending order, respectively, to avoid overwriting data
   before it is sent. */
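
/* (Mechanism: sortsched[0..npes-1] is regenerated as sortpe's own
   schedule, which records in which round sortpe exchanges with each
   process; the rounds of this process's schedule are then permuted so
   that sortpe's partners run 0..npes-1, or in reverse.  Every process
   derives the identical permutation, so the pairing of each round is
   preserved.) */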
static void sort1_comm_sched(int *sched, int npes, int sortpe, int ascending)
{
     int *sortsched, i;
     sortsched = (int *) MALLOC(npes * sizeof(int) * 2, OTHER);
     fill1_comm_sched(sortsched, sortpe, npes);
     if (ascending)
	  for (i = 0; i < npes; ++i)
	       sortsched[npes + sortsched[i]] = sched[i];
     else
	  for (i = 0; i < npes; ++i)
	       sortsched[2*npes - 1 - sortsched[i]] = sched[i];
     for (i = 0; i < npes; ++i)
	  sched[i] = sortsched[npes + i];
     X(ifree)(sortsched);
}

/* make the plans to do the post-MPI transpositions (shared with
   transpose-alltoall) */
int XM(mkplans_posttranspose)(const problem_mpi_transpose *p, planner *plnr,
			      R *I, R *O, int my_pe,
			      plan **cld2, plan **cld2rest, plan **cld3,
			      INT *rest_Ioff, INT *rest_Ooff)
{
     INT vn = p->vn;
     INT b = p->block;
     INT bt = XM(block)(p->ny, p->tblock, my_pe);
     INT nxb = p->nx / b; /* number of equal-sized blocks */
     INT nxr = p->nx - nxb * b; /* leftover rows after equal blocks */

     *cld2 = *cld2rest = *cld3 = NULL;
     *rest_Ioff = *rest_Ooff = 0;

     if (!(p->flags & TRANSPOSED_OUT) && (nxr == 0 || I != O)) {
	  INT nx = p->nx * vn;
	  b *= vn;
	  *cld2 = X(mkplan_f_d)(plnr,
				X(mkproblem_rdft_0_d)(X(mktensor_3d)
						      (nxb, bt * b, b,
						       bt, b, nx,
						       b, 1, 1),
						      I, O),
				0, 0, NO_SLOW);
	  if (!*cld2) goto nada;

	  if (nxr > 0) {
	       *rest_Ioff = nxb * bt * b;
	       *rest_Ooff = nxb * b;
	       b = nxr * vn;
	       *cld2rest = X(mkplan_f_d)(plnr,
					 X(mkproblem_rdft_0_d)(X(mktensor_2d)
							       (bt, b, nx,
								b, 1, 1),
							       I + *rest_Ioff,
							       O + *rest_Ooff),
					 0, 0, NO_SLOW);
	       if (!*cld2rest) goto nada;
	  }
     }
     else {
	  *cld2 = X(mkplan_f_d)(plnr,
				X(mkproblem_rdft_0_d)(
				     X(mktensor_4d)
				     (nxb, bt * b * vn, bt * b * vn,
				      bt, b * vn, vn,
				      b, vn, bt * vn,
				      vn, 1, 1),
				     I, O),
				0, 0, NO_SLOW);
	  if (!*cld2) goto nada;

	  *rest_Ioff = *rest_Ooff = nxb * bt * b * vn;
	  *cld2rest = X(mkplan_f_d)(plnr,
				    X(mkproblem_rdft_0_d)(
					 X(mktensor_3d)
					 (bt, nxr * vn, vn,
					  nxr, vn, bt * vn,
					  vn, 1, 1),
					 I + *rest_Ioff, O + *rest_Ooff),
				    0, 0, NO_SLOW);
	  if (!*cld2rest) goto nada;

	  if (!(p->flags & TRANSPOSED_OUT)) {
	       *cld3 = X(mkplan_f_d)(plnr,
				     X(mkproblem_rdft_0_d)(
					  X(mktensor_3d)
					  (p->nx, bt * vn, vn,
					   bt, vn, p->nx * vn,
					   vn, 1, 1),
					  O, O),
				     0, 0, NO_SLOW);
	       if (!*cld3) goto nada;
	  }
     }

     return 1;

nada:
     X(plan_destroy_internal)(*cld3);
     X(plan_destroy_internal)(*cld2rest);
     X(plan_destroy_internal)(*cld2);
     *cld2 = *cld2rest = *cld3 = NULL;
     return 0;
}

static plan *mkplan(const solver *ego_, const problem *p_, planner *plnr)
{
     const S *ego = (const S *) ego_;
     const problem_mpi_transpose *p;
     P *pln;
     plan *cld1 = 0, *cld2 = 0, *cld2rest = 0, *cld3 = 0;
     INT b, bt, vn, rest_Ioff, rest_Ooff;
     INT *sbs, *sbo, *rbs, *rbo;
     int pe, my_pe, n_pes, sort_pe = -1, ascending = 1;
     R *I, *O;
     static const plan_adt padt = {
          XM(transpose_solve), awake, print, destroy
     };

     UNUSED(ego);

     if (!applicable(ego, p_, plnr))
          return (plan *) 0;

     p = (const problem_mpi_transpose *) p_;
     vn = p->vn;
     I = p->I; O = p->O;

     MPI_Comm_rank(p->comm, &my_pe);
     MPI_Comm_size(p->comm, &n_pes);

     b = XM(block)(p->nx, p->block, my_pe);

     if (!(p->flags & TRANSPOSED_IN)) { /* b x ny x vn -> ny x b x vn */
	  cld1 = X(mkplan_f_d)(plnr,
			       X(mkproblem_rdft_0_d)(X(mktensor_3d)
						     (b, p->ny * vn, vn,
						      p->ny, vn, b * vn,
						      vn, 1, 1),
						     I, O),
			       0, 0, NO_SLOW);
	  if (XM(any_true)(!cld1, p->comm)) goto nada;
     }
     if (ego->preserve_input || NO_DESTROY_INPUTP(plnr)) I = O;

     if (XM(any_true)(!XM(mkplans_posttranspose)(p, plnr, I, O, my_pe,
						 &cld2, &cld2rest, &cld3,
						 &rest_Ioff, &rest_Ooff),
		      p->comm)) goto nada;

     pln = MKPLAN_MPI_TRANSPOSE(P, &padt, apply);

     pln->cld1 = cld1;
     pln->cld2 = cld2;
     pln->cld2rest = cld2rest;
     pln->rest_Ioff = rest_Ioff;
     pln->rest_Ooff = rest_Ooff;
     pln->cld3 = cld3;
     pln->preserve_input = ego->preserve_input ? 2 : NO_DESTROY_INPUTP(plnr);
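     /* preserve_input == 2 marks the solver variant that always preserves
	its input (printed with a "/p" suffix); otherwise it is nonzero
	iff the planner's NO_DESTROY_INPUT flag forces preservation. */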

     MPI_Comm_dup(p->comm, &pln->comm);

     n_pes = (int) X(imax)(XM(num_blocks)(p->nx, p->block),
			   XM(num_blocks)(p->ny, p->tblock));

     /* Compute sizes/offsets of blocks to exchange between processors */
     sbs = (INT *) MALLOC(4 * n_pes * sizeof(INT), PLANS);
     sbo = sbs + n_pes;
     rbs = sbo + n_pes;
     rbo = rbs + n_pes;
     b = XM(block)(p->nx, p->block, my_pe);
     bt = XM(block)(p->ny, p->tblock, my_pe);
     for (pe = 0; pe < n_pes; ++pe) {
	  INT db, dbt; /* destination block sizes */
	  db = XM(block)(p->nx, p->block, pe);
	  dbt = XM(block)(p->ny, p->tblock, pe);

	  sbs[pe] = b * dbt * vn;
	  sbo[pe] = pe * (b * p->tblock) * vn;
	  rbs[pe] = db * bt * vn;
	  rbo[pe] = pe * (p->block * bt) * vn;

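	  /* A process whose blocks are not in the nominal block:tblock
	     proportion exchanges unequal amounts of data in the two
	     directions; the schedule must be sorted around that single
	     process so an in-place exchange does not overwrite unsent
	     data (see sort1_comm_sched). */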
	  if (db * dbt > 0 && db * p->tblock != p->block * dbt) {
	       A(sort_pe == -1); /* only one process should need sorting */
	       sort_pe = pe;
	       ascending = db * p->tblock > p->block * dbt;
	  }
     }
     pln->n_pes = n_pes;
     pln->my_pe = my_pe;
     pln->send_block_sizes = sbs;
     pln->send_block_offsets = sbo;
     pln->recv_block_sizes = rbs;
     pln->recv_block_offsets = rbo;

     if (my_pe >= n_pes) {
	  pln->sched = 0; /* this process is not doing anything */
     }
     else {
	  pln->sched = (int *) MALLOC(n_pes * sizeof(int), PLANS);
	  fill1_comm_sched(pln->sched, my_pe, n_pes);
	  if (sort_pe >= 0)
	       sort1_comm_sched(pln->sched, n_pes, sort_pe, ascending);
     }

     X(ops_zero)(&pln->super.super.ops);
     if (cld1) X(ops_add2)(&cld1->ops, &pln->super.super.ops);
     if (cld2) X(ops_add2)(&cld2->ops, &pln->super.super.ops);
     if (cld2rest) X(ops_add2)(&cld2rest->ops, &pln->super.super.ops);
     if (cld3) X(ops_add2)(&cld3->ops, &pln->super.super.ops);
     /* FIXME: should MPI operations be counted in "other" somehow? */

     return &(pln->super.super);

 nada:
     X(plan_destroy_internal)(cld3);
     X(plan_destroy_internal)(cld2rest);
     X(plan_destroy_internal)(cld2);
     X(plan_destroy_internal)(cld1);
     return (plan *) 0;
}

static solver *mksolver(int preserve_input)
{
     static const solver_adt sadt = { PROBLEM_MPI_TRANSPOSE, mkplan, 0 };
     S *slv = MKSOLVER(S, &sadt);
     slv->preserve_input = preserve_input;
     return &(slv->super);
}

void XM(transpose_pairwise_register)(planner *p)
{
     int preserve_input;
     for (preserve_input = 0; preserve_input <= 1; ++preserve_input)
	  REGISTER_SOLVER(p, mksolver(preserve_input));
}