1 /* ---------------------------------------------------------------------
2 *
3 *  -- PBLAS routine (version 2.0) --
4 *     University of Tennessee, Knoxville, Oak Ridge National Laboratory,
5 *     and University of California, Berkeley.
6 *     April 1, 1998
7 *
8 *  ---------------------------------------------------------------------
9 */
10 /*
11 *  Include files
12 */
13 #include "pblas.h"
14 #include "PBpblas.h"
15 #include "PBtools.h"
16 #include "PBblacs.h"
17 #include "PBblas.h"
18 
19 #ifdef __STDC__
pzswap_(int * N,double * X,int * IX,int * JX,int * DESCX,int * INCX,double * Y,int * IY,int * JY,int * DESCY,int * INCY)20 void pzswap_( int * N,
21               double * X, int * IX, int * JX, int * DESCX, int * INCX,
22               double * Y, int * IY, int * JY, int * DESCY, int * INCY )
23 #else
24 void pzswap_( N, X, IX, JX, DESCX, INCX, Y, IY, JY, DESCY, INCY )
25 /*
26 *  .. Scalar Arguments ..
27 */
28    int            * INCX, * INCY, * IX, * IY, * JX, * JY, * N;
29 /*
30 *  .. Array Arguments ..
31 */
32    int            * DESCX, * DESCY;
33    double         * X, * Y;
34 #endif
35 {
36 /*
37 *  Purpose
38 *  =======
39 *
40 *  PZSWAP  swaps two subvectors,
41 *
42 *     sub( Y ) := sub( X ) and sub( X ) := sub( Y )
43 *
44 *  where sub( X ) denotes X(IX,JX:JX+N-1) if INCX = M_X,
45 *                         X(IX:IX+N-1,JX) if INCX = 1 and INCX <> M_X,
46 *
47 *        sub( Y ) denotes Y(IY,JY:JY+N-1) if INCY = M_Y,
48 *                         Y(IY:IY+N-1,JY) if INCY = 1 and INCY <> M_Y.
49 *
50 *  Notes
51 *  =====
52 *
53 *  A description  vector  is associated with each 2D block-cyclicly dis-
54 *  tributed matrix.  This  vector  stores  the  information  required to
55 *  establish the  mapping  between a  matrix entry and its corresponding
56 *  process and memory location.
57 *
58 *  In  the  following  comments,   the character _  should  be  read  as
59 *  "of  the  distributed  matrix".  Let  A  be a generic term for any 2D
60 *  block cyclicly distributed matrix.  Its description vector is DESC_A:
61 *
62 *  NOTATION         STORED IN       EXPLANATION
63 *  ---------------- --------------- ------------------------------------
64 *  DTYPE_A (global) DESCA[ DTYPE_ ] The descriptor type.
65 *  CTXT_A  (global) DESCA[ CTXT_  ] The BLACS context handle, indicating
66 *                                   the NPROW x NPCOL BLACS process grid
67 *                                   A  is  distributed over. The context
68 *                                   itself  is  global,  but  the handle
69 *                                   (the integer value) may vary.
70 *  M_A     (global) DESCA[ M_     ] The  number of rows in the distribu-
71 *                                   ted matrix A, M_A >= 0.
72 *  N_A     (global) DESCA[ N_     ] The number of columns in the distri-
73 *                                   buted matrix A, N_A >= 0.
74 *  IMB_A   (global) DESCA[ IMB_   ] The number of rows of the upper left
75 *                                   block of the matrix A, IMB_A > 0.
76 *  INB_A   (global) DESCA[ INB_   ] The  number  of columns of the upper
77 *                                   left   block   of   the  matrix   A,
78 *                                   INB_A > 0.
79 *  MB_A    (global) DESCA[ MB_    ] The blocking factor used to  distri-
80 *                                   bute the last  M_A-IMB_A  rows of A,
81 *                                   MB_A > 0.
82 *  NB_A    (global) DESCA[ NB_    ] The blocking factor used to  distri-
83 *                                   bute the last  N_A-INB_A  columns of
84 *                                   A, NB_A > 0.
85 *  RSRC_A  (global) DESCA[ RSRC_  ] The process row over which the first
86 *                                   row of the matrix  A is distributed,
87 *                                   NPROW > RSRC_A >= 0.
88 *  CSRC_A  (global) DESCA[ CSRC_  ] The  process column  over  which the
89 *                                   first column of  A  is  distributed.
90 *                                   NPCOL > CSRC_A >= 0.
91 *  LLD_A   (local)  DESCA[ LLD_   ] The  leading dimension  of the local
92 *                                   array  storing  the  local blocks of
93 *                                   the distributed matrix A,
94 *                                   IF( Lc( 1, N_A ) > 0 )
95 *                                      LLD_A >= MAX( 1, Lr( 1, M_A ) )
96 *                                   ELSE
97 *                                      LLD_A >= 1.
98 *
99 *  Let K be the number of  rows of a matrix A starting at the global in-
100 *  dex IA,i.e, A( IA:IA+K-1, : ). Lr( IA, K ) denotes the number of rows
101 *  that the process of row coordinate MYROW ( 0 <= MYROW < NPROW ) would
102 *  receive if these K rows were distributed over NPROW processes.  If  K
103 *  is the number of columns of a matrix  A  starting at the global index
104 *  JA, i.e, A( :, JA:JA+K-1, : ), Lc( JA, K ) denotes the number  of co-
105 *  lumns that the process MYCOL ( 0 <= MYCOL < NPCOL ) would  receive if
106 *  these K columns were distributed over NPCOL processes.
107 *
108 *  The values of Lr() and Lc() may be determined via a call to the func-
109 *  tion PB_Cnumroc:
110 *  Lr( IA, K ) = PB_Cnumroc( K, IA, IMB_A, MB_A, MYROW, RSRC_A, NPROW )
111 *  Lc( JA, K ) = PB_Cnumroc( K, JA, INB_A, NB_A, MYCOL, CSRC_A, NPCOL )
112 *
113 *  Arguments
114 *  =========
115 *
116 *  N       (global input) INTEGER
117 *          On entry,  N  specifies the  length of the  subvectors to  be
118 *          swapped. N must be at least zero.
119 *
120 *  X       (local input/local output) COMPLEX*16 array
121 *          On entry, X is an array of dimension (LLD_X, Kx), where LLD_X
122 *          is   at  least  MAX( 1, Lr( 1, IX ) )  when  INCX = M_X   and
123 *          MAX( 1, Lr( 1, IX+N-1 ) )  otherwise,  and,  Kx  is  at least
124 *          Lc( 1, JX+N-1 )  when  INCX = M_X  and Lc( 1, JX ) otherwise.
125 *          Before  entry,  this array  contains the local entries of the
126 *          matrix X. On exit, sub( X ) is overwritten with sub( Y ).
127 *
128 *  IX      (global input) INTEGER
129 *          On entry, IX  specifies X's global row index, which points to
130 *          the beginning of the submatrix sub( X ).
131 *
132 *  JX      (global input) INTEGER
133 *          On entry, JX  specifies X's global column index, which points
134 *          to the beginning of the submatrix sub( X ).
135 *
136 *  DESCX   (global and local input) INTEGER array
137 *          On entry, DESCX  is an integer array of dimension DLEN_. This
138 *          is the array descriptor for the matrix X.
139 *
140 *  INCX    (global input) INTEGER
141 *          On entry,  INCX   specifies  the  global  increment  for  the
142 *          elements of  X.  Only two values of  INCX   are  supported in
143 *          this version, namely 1 and M_X. INCX  must not be zero.
144 *
145 *  Y       (local input/local output) COMPLEX*16 array
146 *          On entry, Y is an array of dimension (LLD_Y, Ky), where LLD_Y
147 *          is   at  least  MAX( 1, Lr( 1, IY ) )  when  INCY = M_Y   and
148 *          MAX( 1, Lr( 1, IY+N-1 ) )  otherwise,  and,  Ky  is  at least
149 *          Lc( 1, JY+N-1 )  when  INCY = M_Y  and Lc( 1, JY ) otherwise.
150 *          Before  entry,  this array  contains the local entries of the
151 *          matrix Y. On exit, sub( Y ) is overwritten with sub( X ).
152 *
153 *  IY      (global input) INTEGER
154 *          On entry, IY  specifies Y's global row index, which points to
155 *          the beginning of the submatrix sub( Y ).
156 *
157 *  JY      (global input) INTEGER
158 *          On entry, JY  specifies Y's global column index, which points
159 *          to the beginning of the submatrix sub( Y ).
160 *
161 *  DESCY   (global and local input) INTEGER array
162 *          On entry, DESCY  is an integer array of dimension DLEN_. This
163 *          is the array descriptor for the matrix Y.
164 *
165 *  INCY    (global input) INTEGER
166 *          On entry,  INCY   specifies  the  global  increment  for  the
167 *          elements of  Y.  Only two values of  INCY   are  supported in
168 *          this version, namely 1 and M_Y. INCY  must not be zero.
169 *
170 *  -- Written on April 1, 1998 by
171 *     Antoine Petitet, University of Tennessee, Knoxville 37996, USA.
172 *
173 *  ---------------------------------------------------------------------
174 */
175 /*
176 *  .. Local Scalars ..
177 */
178    char           Xscope, Yscope, * one, * top, tran, * zero;
179    int            OneBlock, OneDgrid, RRorCC, Square, Xcol, Xi, XisD, XisR,
180                   Xinb1D, XinbD, XisRow, Xii, Xj, Xjj, Xld, Xlinc, Xm, XmyprocD,
181                   XmyprocR, Xn, XnbD, XnpD, XnprocsD, XnprocsR, XprocD, XprocR,
182                   Xroc, Xrow, Ycol, Yi, Yii, Yinb1D, YinbD, YisD, YisR, YisRow,
183                   Yj, Yjj, Yld, Ylinc, Ym, YmyprocD, YmyprocR, Yn, YnbD, YnpD,
184                   YnprocsD, YnprocsR, YprocD, YprocR, Yroc, Yrow, cdst, csrc,
185                   ctxt, dst, gcdPQ, info, ione=1, k, l, lcmPQ, lcmb, mycol,
186                   myrow, npcol, npq, nprow, p, q, rdst, rsrc, src, size;
187    PBTYP_T        * type;
188    PB_VM_T        VM;
189 /*
190 *  .. Local Arrays ..
191 */
192    int            Xd[DLEN_], Yd[DLEN_];
193    char           * buf = NULL;
194 /* ..
195 *  .. Executable Statements ..
196 *
197 */
198    PB_CargFtoC( *IX, *JX, DESCX, &Xi, &Xj, Xd );
199    PB_CargFtoC( *IY, *JY, DESCY, &Yi, &Yj, Yd );
200 #ifndef NO_ARGCHK
201 /*
202 *  Test the input parameters
203 */
204    Cblacs_gridinfo( ( ctxt = Xd[CTXT_] ), &nprow, &npcol, &myrow, &mycol );
205    if( !( info = ( ( nprow == -1 ) ? -( 501 + CTXT_ ) : 0 ) ) )
206    {
207       PB_Cchkvec( ctxt, "PZSWAP", "X", *N, 1, Xi, Xj, Xd, *INCX,  5, &info );
208       PB_Cchkvec( ctxt, "PZSWAP", "Y", *N, 1, Yi, Yj, Yd, *INCY, 10, &info );
209    }
210    if( info ) { PB_Cabort( ctxt, "PZSWAP", info ); return; }
211 #endif
212 /*
213 *  Quick return if possible
214 */
215    if( *N == 0 ) return;
216 /*
217 *  Retrieve process grid information
218 */
219 #ifdef NO_ARGCHK
220    Cblacs_gridinfo( ( ctxt = Xd[CTXT_] ), &nprow, &npcol, &myrow, &mycol );
221 #endif
222 /*
223 *  Determine if sub( X ) is distributed or not
224 */
225    if( ( XisRow = ( *INCX == Xd[M_] ) ) != 0 )
226       XisD = ( ( Xd[CSRC_] >= 0 ) && ( ( XnprocsD = npcol ) > 1 ) );
227    else
228       XisD = ( ( Xd[RSRC_] >= 0 ) && ( ( XnprocsD = nprow ) > 1 ) );
229 /*
230 *  Determine if sub( Y ) is distributed or not
231 */
232    if( ( YisRow = ( *INCY == Yd[M_] ) ) != 0 )
233       YisD = ( ( Yd[CSRC_] >= 0 ) && ( ( YnprocsD = npcol ) > 1 ) );
234    else
235       YisD = ( ( Yd[RSRC_] >= 0 ) && ( ( YnprocsD = nprow ) > 1 ) );
236 /*
237 *  Are sub( X ) and sub( Y ) both row or column vectors ?
238 */
239    RRorCC = ( ( XisRow && YisRow ) || ( !( XisRow ) && !( YisRow ) ) );
240 /*
241 *  XisD && YisD <=> both vector operands are indeed distributed
242 */
243    if( XisD && YisD )
244    {
245 /*
246 *  Retrieve sub( X )'s local information: Xii, Xjj, Xrow, Xcol ...
247 */
248       PB_Cinfog2l( Xi, Xj, Xd, nprow, npcol, myrow, mycol, &Xii, &Xjj,
249                    &Xrow, &Xcol );
250       if( XisRow )
251       {
252          XinbD  = Xd[INB_ ]; XnbD     = Xd[NB_ ]; Xld      = Xd[LLD_];
253          Xlinc  = Xld;
254          XprocD = Xcol;      XmyprocD = mycol;
255          XprocR = Xrow;      XmyprocR = myrow;    XnprocsR = nprow;
256          XisR   = ( ( Xrow == -1 ) || ( XnprocsR == 1 ) );
257          Mfirstnb( Xinb1D, *N, Xj, XinbD, XnbD );
258       }
259       else
260       {
261          XinbD  = Xd[IMB_ ]; XnbD     = Xd[MB_ ]; Xld      = Xd[LLD_];
262          Xlinc  = 1;
263          XprocD = Xrow;      XmyprocD = myrow;
264          XprocR = Xcol;      XmyprocR = mycol;    XnprocsR = npcol;
265          XisR   = ( ( Xcol == -1 ) || ( XnprocsR == 1 ) );
266          Mfirstnb( Xinb1D, *N, Xi, XinbD, XnbD );
267       }
268 /*
269 *  Retrieve sub( Y )'s local information: Yii, Yjj, Yrow, Ycol ...
270 */
271       PB_Cinfog2l( Yi, Yj, Yd, nprow, npcol, myrow, mycol, &Yii, &Yjj,
272                    &Yrow, &Ycol );
273       if( YisRow )
274       {
275          YinbD  = Yd[INB_ ]; YnbD     = Yd[NB_ ]; Yld      = Yd[LLD_];
276          Ylinc  = Yld;
277          YprocD = Ycol;      YmyprocD = mycol;
278          YprocR = Yrow;      YmyprocR = myrow;    YnprocsR = nprow;
279          YisR   = ( ( Yrow == -1 ) || ( YnprocsR == 1 ) );
280          Mfirstnb( Yinb1D, *N, Yj, YinbD, YnbD );
281       }
282       else
283       {
284          YinbD  = Yd[IMB_ ]; YnbD     = Yd[MB_ ]; Yld      = Yd[LLD_];
285          Ylinc  = 1;
286          YprocD = Yrow;      YmyprocD = myrow;
287          YprocR = Ycol;      YmyprocR = mycol;    YnprocsR = npcol;
288          YisR   = ( ( Ycol == -1 ) || ( YnprocsR == 1 ) );
289          Mfirstnb( Yinb1D, *N, Yi, YinbD, YnbD );
290       }
291 /*
292 *  Do sub( X ) and sub( Y ) span more than one process ?
293 */
294       OneDgrid = ( ( XnprocsD ==  1 ) && ( YnprocsD ==  1 ) );
295       OneBlock = ( ( Xinb1D   >= *N ) && ( Yinb1D   >= *N ) );
296 /*
297 *  Are sub( X ) and sub( Y ) distributed in the same manner ?
298 */
299       Square   = ( ( Xinb1D   ==   Yinb1D ) && ( XnbD == YnbD ) &&
300                    ( XnprocsD == YnprocsD ) );
301 
302       if( !( XisR ) )
303       {
304 /*
305 *  sub( X ) is distributed but not replicated
306 */
307          if( YisR )
308          {
309 /*
310 *  If sub( X ) is not replicated, but sub( Y ) is, a process row or column
311 *  YprocR need to be selected. It will contain the non-replicated vector to
312 *  swap sub( X ) with.
313 */
314             if( RRorCC )
315             {
316 /*
317 *  sub( X ) and sub( Y ) are both row or column vectors
318 */
319                if( ( OneDgrid || OneBlock || Square ) && ( XprocD == YprocD ) )
320                {
321 /*
322 *  sub( X ) and sub( Y ) start in the same process row or column XprocD=YprocD.
323 *  Enforce a purely local operation by choosing YprocR to be equal to XprocR.
324 */
325                   YprocR = XprocR;
326                }
327                else
328                {
329 /*
330 *  Otherwise, communication has to occur, so choose the next process row or
331 *  column for YprocR to maximize the number of links, i.e reduce contention.
332 */
333                   YprocR = MModAdd1( XprocR, XnprocsR );
334                }
335             }
336             else
337             {
338 /*
339 *  sub( X ) and sub( Y ) are distributed in orthogonal directions, what is
340 *  chosen for YprocR does not really matter. Select the process origin.
341 */
342                YprocR = XprocD;
343             }
344          }
345          else
346          {
347 /*
348 *  Neither sub( X ) nor sub( Y ) are replicated. If I am not in process row or
349 *  column XprocR and not in process row or column YprocR, then quick return.
350 */
351             if( ( XmyprocR != XprocR ) && ( YmyprocR != YprocR ) )
352                return;
353          }
354       }
355       else
356       {
357 /*
358 *  sub( X ) is distributed and replicated (so no quick return possible)
359 */
360          if( YisR )
361          {
362 /*
363 *  sub( Y ) is distributed and replicated as well
364 */
365             if( RRorCC )
366             {
367 /*
368 *  sub( X ) and sub( Y ) are both row or column vectors
369 */
370                if( ( OneDgrid || OneBlock || Square ) && ( XprocD == YprocD ) )
371                {
372 /*
373 *  sub( X ) and sub( Y ) start in the same process row or column XprocD=YprocD.
374 *  Enforce a purely local operation by choosing XprocR and YprocR to be equal
375 *  to zero.
376 */
377                   XprocR = YprocR = 0;
378                }
379                else
380                {
381 /*
382 *  Otherwise, communication has to occur, so select YprocR to be zero and the
383 *  next process row or column for XprocR in order to maximize the number of
384 *  used links, i.e reduce contention.
385 */
386                   YprocR = 0;
387                   XprocR = MModAdd1( YprocR, YnprocsR );
388                }
389             }
390             else
391             {
392 /*
393 *  sub( X ) and sub( Y ) are distributed in orthogonal directions, select the
394 *  origin processes.
395 */
396                XprocR = YprocD;
397                YprocR = XprocD;
398             }
399          }
400          else
401          {
402 /*
403 *  sub( Y ) is distributed, but not replicated
404 */
405             if( RRorCC )
406             {
407 /*
408 *  sub( X ) and sub( Y ) are both row or column vectors
409 */
410                if( ( OneDgrid || OneBlock || Square ) && ( XprocD == YprocD ) )
411                {
412 /*
413 *  sub( X ) and sub( Y ) start in the same process row or column XprocD=YprocD.
414 *  Enforce a purely local operation by choosing XprocR to be equal to YprocR.
415 */
416                   XprocR = YprocR;
417                }
418                else
419                {
420 /*
421 *  Otherwise, communication has to occur, so choose the next process row or
422 *  column for XprocR to maximize the number of links, i.e reduce contention.
423 */
424                   XprocR = MModAdd1( YprocR, YnprocsR );
425                }
426             }
427             else
428             {
429 /*
430 *  sub( X ) and sub( Y ) are distributed in orthogonal directions, what is
431 *  chosen for XprocR does not really matter. Select the origin process.
432 */
433                XprocR = YprocD;
434             }
435          }
436       }
437 /*
438 *  Even if sub( X ) and/or sub( Y ) are replicated, only two process row or
439 *  column are active, namely XprocR and YprocR. If any of those operands is
440 *  replicated, broadcast will occur (unless there is an easy way out).
441 */
442       type = PB_Cztypeset(); size = type->size;
443 /*
444 *  A purely local operation occurs iff the operands start in the same process
445 *  and, if either the grid is mono-dimensional or there is a single local block
446 *  to be swapped or if both operands are aligned.
447 */
448       if( ( (    RRorCC   && ( XprocD == YprocD ) && ( XprocR == YprocR ) ) ||
449             ( !( RRorCC ) && ( XprocD == YprocR ) && ( XprocR == YprocD ) ) ) &&
450           ( OneDgrid || OneBlock || ( RRorCC && Square ) ) )
451       {
452          if( ( !XisR &&         ( XmyprocR == XprocR ) &&
453                !YisR &&         ( YmyprocR == YprocR ) ) ||
454              ( !XisR && YisR && ( YmyprocR == YprocR ) ) ||
455              ( !YisR && XisR && ( XmyprocR == XprocR ) ) ||
456              (  XisR && YisR                           ) )
457          {
458             XnpD = PB_Cnumroc( *N, 0, Xinb1D, XnbD, XmyprocD, XprocD,
459                                XnprocsD );
460             YnpD = PB_Cnumroc( *N, 0, Yinb1D, YnbD, YmyprocD, YprocD,
461                                YnprocsD );
462             if( ( XnpD > 0 ) && ( YnpD > 0 ) )
463             {
464                zswap_( &XnpD,
465                        Mptr( ((char *) X), Xii, Xjj, Xld, size ), &Xlinc,
466                        Mptr( ((char *) Y), Yii, Yjj, Yld, size ), &Ylinc );
467             }
468             if( RRorCC && XisR && YisR ) return;
469          }
470       }
471       else if( ( RRorCC && OneDgrid ) || OneBlock || Square )
472       {
473 /*
474 *  Otherwise, it may be possible to swap the distributed vectors in a single
475 *  message exchange iff the grid is mono-dimensional and the operands are
476 *  distributed in the same direction, or there is just one block to be exchanged
477 *  or if both operands are similarly distributed in their respective direction.
478 */
479          if( RRorCC && ( XprocR != YprocR ) )
480          {
481 /*
482 *  Both operands are distributed in the same direction, but reside in different
483 *  process rows or columns.
484 */
485             if( XmyprocR == XprocR )
486             {
487                XnpD = PB_Cnumroc( *N, 0, Xinb1D, XnbD, XmyprocD, XprocD,
488                                   XnprocsD );
489                if( XnpD > 0 )
490                {
491                   dst = YprocD + MModSub( XmyprocD, XprocD, XnprocsD );
492                   dst = MPosMod( dst, YnprocsD );
493                   if( XisRow )
494                   {
495                      Czgesd2d( ctxt, 1, XnpD, Mptr( ((char*) X), Xii, Xjj,
496                                Xld, size ), Xld, YprocR, dst );
497                      Czgerv2d( ctxt, 1, XnpD, Mptr( ((char*) X), Xii, Xjj,
498                                Xld, size ), Xld, YprocR, dst );
499                   }
500                   else
501                   {
502 
503                      Czgesd2d( ctxt, XnpD, 1, Mptr( ((char*) X), Xii, Xjj,
504                                Xld, size ), Xld, dst, YprocR );
505                      Czgerv2d( ctxt, XnpD, 1, Mptr( ((char*) X), Xii, Xjj,
506                                Xld, size ), Xld, dst, YprocR );
507                   }
508                }
509             }
510             if( YmyprocR == YprocR )
511             {
512                YnpD = PB_Cnumroc( *N, 0, Yinb1D, YnbD, YmyprocD, YprocD,
513                                   YnprocsD );
514                if( YnpD > 0 )
515                {
516                   dst = XprocD + MModSub( YmyprocD, YprocD, YnprocsD );
517                   dst = MPosMod( dst, XnprocsD );
518                   if( YisRow )
519                   {
520                      Czgesd2d( ctxt, 1, YnpD, Mptr( ((char*) Y), Yii, Yjj,
521                                Yld, size ), Yld, XprocR, dst );
522                      Czgerv2d( ctxt, 1, YnpD, Mptr( ((char*) Y), Yii, Yjj,
523                                Yld, size ), Yld, XprocR, dst );
524                   }
525                   else
526                   {
527                      Czgesd2d( ctxt, YnpD, 1, Mptr( ((char*) Y), Yii, Yjj,
528                                Yld, size ), Yld, dst, XprocR );
529                      Czgerv2d( ctxt, YnpD, 1, Mptr( ((char*) Y), Yii, Yjj,
530                                Yld, size ), Yld, dst, XprocR );
531                   }
532                }
533             }
534          }
535          else
536          {
537 /*
538 *  General case when just one message needs to be exchanged
539 */
540             if( XmyprocR == XprocR )
541             {
542 /*
543 *  The processes owning a piece of sub( X ) send it to the corresponding
544 *  process owning s piece of sub ( Y ).
545 */
546                XnpD = PB_Cnumroc( *N, 0, Xinb1D, XnbD, XmyprocD, XprocD,
547                                   XnprocsD );
548                if( XnpD > 0 )
549                {
550                   dst = YprocD + MModSub( XmyprocD, XprocD, XnprocsD );
551                   dst = MPosMod( dst, YnprocsD );
552                   if( YisRow ) { rdst = YprocR; cdst = dst; }
553                   else         { rdst = dst; cdst = YprocR; }
554 
555                   if( ( myrow == rdst ) && ( mycol == cdst ) )
556                   {
557                      zswap_( &XnpD,  Mptr( ((char *) X), Xii, Xjj, Xld,
558                              size ), &Xlinc, Mptr( ((char *) Y), Yii, Yjj, Yld,
559                              size ), &Ylinc );
560                   }
561                   else
562                   {
563                      if( XisRow )
564                         Czgesd2d( ctxt, 1, XnpD, Mptr( ((char *) X), Xii,
565                                   Xjj, Xld, size ), Xld, rdst, cdst );
566                      else
567                         Czgesd2d( ctxt, XnpD, 1, Mptr( ((char *) X), Xii,
568                                   Xjj, Xld, size ), Xld, rdst, cdst );
569                   }
570                }
571             }
572             if( YmyprocR == YprocR )
573             {
574 /*
575 *  The processes owning a piece of sub( Y ) receive the corresponding piece
576 *  of sub( X ) and send the piece of sub( Y ) they own to the same process.
577 */
578                YnpD = PB_Cnumroc( *N, 0, Yinb1D, YnbD, YmyprocD, YprocD,
579                                   YnprocsD );
580                if( YnpD > 0 )
581                {
582                   src = XprocD + MModSub( YmyprocD, YprocD, YnprocsD );
583                   src = MPosMod( src, XnprocsD );
584                   if( XisRow ) { rsrc = XprocR; csrc = src; }
585                   else         { rsrc = src; csrc = XprocR; }
586 
587                   if( ( myrow != rsrc ) || ( mycol != csrc ) )
588                   {
589                      buf = PB_Cmalloc( YnpD * size );
590                      if( XisRow )
591                         Czgerv2d( ctxt, 1, YnpD, buf,    1, rsrc, csrc );
592                      else
593                         Czgerv2d( ctxt, YnpD, 1, buf, YnpD, rsrc, csrc );
594                      if( YisRow )
595                         Czgesd2d( ctxt, 1, YnpD, Mptr( ((char *) Y), Yii,
596                                   Yjj, Yld, size ), Yld, rsrc, csrc );
597                      else
598                         Czgesd2d( ctxt, YnpD, 1, Mptr( ((char *) Y), Yii,
599                                   Yjj, Yld, size ), Yld, rsrc, csrc );
600                      zcopy_( &YnpD, buf, &ione, Mptr( ((char *) Y), Yii,
601                              Yjj, Yld, size ), &Ylinc );
602                      if( buf ) free( buf );
603                   }
604                }
605             }
606             if( XmyprocR == XprocR )
607             {
608 /*
609 *  The processes owning a piece of sub( X ) receive the corresponding piece
610 *  of sub( Y ).
611 */
612                if( XnpD > 0 )
613                {
614                   if( ( myrow != rdst ) || ( mycol != cdst ) )
615                   {
616                      buf = PB_Cmalloc( XnpD * size );
617                      if( YisRow )
618                         Czgerv2d( ctxt, 1, XnpD, buf,    1, rdst, cdst );
619                      else
620                         Czgerv2d( ctxt, XnpD, 1, buf, XnpD, rdst, cdst );
621                      zcopy_( &XnpD, buf, &ione, Mptr( ((char *) X), Xii,
622                              Xjj, Xld, size ), &Xlinc );
623                      if( buf ) free( buf );
624                   }
625                }
626             }
627          }
628       }
629       else if( ( XmyprocR == XprocR ) || ( YmyprocR == YprocR ) )
630       {
631 /*
632 *  General case
633 */
634          tran   = ( RRorCC ? CNOTRAN : CTRAN );
635          if( XisRow ) { Xscope = CCOLUMN; Xm = 1; rsrc = XprocR; }
636          else         { Xscope = CROW;    Xn = 1; csrc = XprocR; }
637          if( YisRow ) { Yscope = CCOLUMN; Ym = 1; rdst = YprocR; }
638          else         { Yscope = CROW;    Yn = 1; cdst = YprocR; }
639          lcmb   = PB_Clcm( XnprocsD * XnbD, YnprocsD * YnbD );
640          one    = type->one; zero = type->zero;
641          gcdPQ  = PB_Cgcd( XnprocsD, YnprocsD );
642          lcmPQ  = ( XnprocsD / gcdPQ ) * YnprocsD;
643 
644          for( k = 0; k < gcdPQ; k++ )
645          {
646             p = 0; q = k;
647 
648             for( l = 0; l < lcmPQ; l++ )
649             {
650                Xroc = MModAdd( XprocD, p, XnprocsD );
651                Yroc = MModAdd( YprocD, q, YnprocsD );
652 
653                if( ( XmyprocD == Xroc ) || ( YmyprocD == Yroc ) )
654                {
655                   XnpD = PB_Cnumroc( *N, 0, Xinb1D, XnbD, Xroc, XprocD,
656                                      XnprocsD );
657                   YnpD = PB_Cnumroc( *N, 0, Yinb1D, YnbD, Yroc, YprocD,
658                                      YnprocsD );
659                   PB_CVMinit( &VM, 0, XnpD, YnpD, Xinb1D, Yinb1D, XnbD, YnbD,
660                               p, q, XnprocsD, YnprocsD, lcmb );
661                   if( npq = PB_CVMnpq( &VM ) )
662                   {
663                      if( (    RRorCC   && ( Xroc ==   Yroc ) &&
664                            ( XprocR == YprocR ) ) ||
665                          ( !( RRorCC ) && ( Xroc == YprocR ) &&
666                            ( XprocR == Yroc   ) ) )
667                      {
668 /*
669 *  If I am at the intersection of the process cross, or simply common to the
670 *  process rows or columns owning sub( X ) and sub( Y )
671 */
672                         if( ( YmyprocD == Yroc ) && ( YmyprocR == YprocR ) )
673                         {
674                            PB_CVMswp( type, &VM, ROW, &Xscope, &tran, npq,
675                                       Mptr( ((char *) X), Xii, Xjj, Xld, size ),
676                                       Xlinc, Mptr( ((char *) Y), Yii, Yjj, Yld,
677                                       size ), Ylinc );
678                         }
679                      }
680                      else
681                      {
682 /*
683 *  Perform the message exchange: pack the data I own, send it, receive the
684 *  remote data, and unpack it.
685 */
686                         if( ( XmyprocR == XprocR ) && ( XmyprocD == Xroc ) )
687                         {
688                            if( XisRow ) { Xn = npq; }
689                            else         { Xm = npq; }
690                            if( YisRow ) { Yn = npq; cdst = Yroc; }
691                            else         { Ym = npq; rdst = Yroc; }
692                            buf = PB_Cmalloc( npq * size );
693                            PB_CVMpack( type, &VM, ROW, &Xscope, PACKING, NOTRAN,
694                                        npq, 1, one, Mptr( ((char *) X), Xii,
695                                        Xjj, Xld, size ), Xld, zero, buf, Xm );
696                            Czgesd2d( ctxt, Xm, Xn, buf, Xm, rdst, cdst );
697                            Czgerv2d( ctxt, Ym, Yn, buf, Ym, rdst, cdst );
698                            PB_CVMpack( type, &VM, ROW, &Xscope, UNPACKING,
699                                        &tran, npq, 1, zero, Mptr( ((char *) X),
700                                        Xii, Xjj, Xld, size ), Xld, one, buf,
701                                        Ym );
702                            if( buf ) free ( buf );
703                         }
704                         if( ( YmyprocR == YprocR ) && ( YmyprocD == Yroc  ) )
705                         {
706                            if( XisRow ) { Xn = npq; csrc = Xroc; }
707                            else         { Xm = npq; rsrc = Xroc; }
708                            if( YisRow ) { Yn = npq; }
709                            else         { Ym = npq; }
710                            buf = PB_Cmalloc( npq * size );
711                            PB_CVMpack( type, &VM, COLUMN, &Yscope, PACKING,
712                                        NOTRAN, npq, 1, one, Mptr( ((char *) Y),
713                                        Yii, Yjj, Yld, size ), Yld, zero, buf,
714                                        Ym );
715                            Czgesd2d( ctxt, Ym, Yn, buf, Ym, rsrc, csrc );
716                            Czgerv2d( ctxt, Xm, Xn, buf, Xm, rsrc, csrc );
717                            PB_CVMpack( type, &VM, COLUMN, &Yscope, UNPACKING,
718                                        &tran, npq, 1, zero, Mptr( ((char *) Y),
719                                        Yii, Yjj, Yld, size ), Yld, one, buf,
720                                        Xm );
721                            if( buf ) free ( buf );
722                         }
723                      }
724                   }
725                }
726                p = MModAdd1( p, XnprocsD );
727                q = MModAdd1( q, YnprocsD );
728             }
729          }
730       }
731 
732       if( XisR )
733       {
734 /*
735 *  Replicate sub( X ) when necessary
736 */
737          XnpD = PB_Cnumroc( *N, 0, Xinb1D, XnbD, XmyprocD, XprocD, XnprocsD );
738          if( XnpD > 0 )
739          {
740             if( XisRow )
741             {
742                top = PB_Ctop( &ctxt, BCAST, COLUMN, TOP_GET );
743                if( XmyprocR == XprocR )
744                   Czgebs2d( ctxt, COLUMN, top, 1, XnpD, Mptr( ((char *) X),
745                             Xii, Xjj, Xld, size ), Xld );
746                else
747                   Czgebr2d( ctxt, COLUMN, top, 1, XnpD, Mptr( ((char *) X),
748                             Xii, Xjj, Xld, size ), Xld, XprocR, XmyprocD );
749             }
750             else
751             {
752                top = PB_Ctop( &ctxt, BCAST, ROW,    TOP_GET );
753                if( XmyprocR == XprocR )
754                   Czgebs2d( ctxt, ROW,    top, XnpD, 1, Mptr( ((char *) X),
755                             Xii, Xjj, Xld, size ), Xld );
756                else
757                   Czgebr2d( ctxt, ROW,    top, XnpD, 1, Mptr( ((char *) X),
758                             Xii, Xjj, Xld, size ), Xld, XmyprocD, XprocR );
759             }
760          }
761       }
762 
763       if( YisR )
764       {
765 /*
766 *  Replicate sub( Y ) when necessary
767 */
768          YnpD = PB_Cnumroc( *N, 0, Yinb1D, YnbD, YmyprocD, YprocD, YnprocsD );
769          if( YnpD > 0 )
770          {
771             if( YisRow )
772             {
773                top = PB_Ctop( &ctxt, BCAST, COLUMN, TOP_GET );
774                if( YmyprocR == YprocR )
775                   Czgebs2d( ctxt, COLUMN, top, 1, YnpD, Mptr( ((char *) Y),
776                             Yii, Yjj, Yld, size ), Yld );
777                else
778                   Czgebr2d( ctxt, COLUMN, top, 1, YnpD, Mptr( ((char *) Y),
779                             Yii, Yjj, Yld, size ), Yld, YprocR, YmyprocD );
780             }
781             else
782             {
783                top = PB_Ctop( &ctxt, BCAST, ROW,    TOP_GET );
784                if( YmyprocR == YprocR )
785                   Czgebs2d( ctxt, ROW,    top, YnpD, 1, Mptr( ((char *) Y),
786                             Yii, Yjj, Yld, size ), Yld );
787                else
788                   Czgebr2d( ctxt, ROW,    top, YnpD, 1, Mptr( ((char *) Y),
789                             Yii, Yjj, Yld, size ), Yld, YmyprocD, YprocR );
790             }
791          }
792       }
793    }
794    else if( !( XisD ) && YisD )
795    {
796 /*
797 *  sub( X ) is not distributed and sub( Y ) is distributed.
798 */
799       PB_CpswapND( PB_Cztypeset(), *N, ((char *) X), Xi, Xj, Xd, *INCX,
800                    ((char *) Y), Yi, Yj, Yd, *INCY );
801    }
802    else if( XisD && !( YisD ) )
803    {
804 /*
805 *  sub( X ) is distributed and sub( Y ) is not distributed.
806 */
807       PB_CpswapND( PB_Cztypeset(), *N, ((char *) Y), Yi, Yj, Yd, *INCY,
808                    ((char *) X), Xi, Xj, Xd, *INCX );
809    }
810    else
811    {
812 /*
813 *  Neither sub( X ) nor sub( Y ) are distributed.
814 */
815       PB_CpswapNN( PB_Cztypeset(), *N, ((char *) X), Xi, Xj, Xd, *INCX,
816                    ((char *) Y), Yi, Yj, Yd, *INCY );
817    }
818 /*
819 *  End of PZSWAP
820 */
821 }
822