1 #include "Bdef.h"
BI_MringComb(BLACSCONTEXT * ctxt,BLACBUFF * bp,BLACBUFF * bp2,int N,VVFUNPTR Xvvop,int dest,int nrings)2 void BI_MringComb(BLACSCONTEXT *ctxt, BLACBUFF *bp, BLACBUFF *bp2,
3                   int N, VVFUNPTR Xvvop, int dest, int nrings)
4 {
5    void BI_Ssend(BLACSCONTEXT *, int, int, BLACBUFF *);
6    void BI_Srecv(BLACSCONTEXT *, int, int, BLACBUFF *);
7    void BI_MpathBS(BLACSCONTEXT *, BLACBUFF *, SDRVPTR, int);
8    void BI_MpathBR(BLACSCONTEXT *, BLACBUFF *, SDRVPTR, int, int);
9 
10    int Np, Iam, msgid, i, inc, mysrc, mydest, Np_1;
11    int mydist, ringlen, myring;
12    int nearedge, faredge;  /* edge closest and farthest from dest */
13    int REBS;               /* Is result leave-on-all? */
14 
15    Np = ctxt->scp->Np;
16    if (Np < 2) return;
17    Iam = ctxt->scp->Iam;
18    msgid = Mscopeid(ctxt);
19    if (REBS = (dest == -1)) dest = 0;
20 
21    if (nrings > 0)
22    {
23       mydist = (Np + dest - Iam) % Np;
24       inc = 1;
25    }
26    else
27    {
28       mydist = (Np + Iam - dest) % Np;
29       inc = -1;
30       nrings = -nrings;
31    }
32    Np_1 = Np - 1;
33    if (nrings > Np_1) nrings = Np_1;
34 
35 /*
36  * If I'm not the destination
37  */
38    if (Iam != dest)
39    {
40       ringlen = Np_1 / nrings;
41       myring = (mydist-1) / ringlen;
42       if (myring >= nrings) myring = nrings - 1;
43       nearedge = (myring*ringlen) + 1;
44       faredge = nearedge + ringlen - 1;
45       if (myring == nrings-1) faredge += Np_1 % nrings;
46       if (mydist == nearedge) mydest = dest;
47       else mydest = (Np + Iam + inc) % Np;
48       if (mydist != faredge)
49       {
50          BI_Srecv(ctxt, (Np + Iam - inc) % Np, msgid, bp2);
51 	 Xvvop(N, bp->Buff, bp2->Buff);
52       }
53       BI_Ssend(ctxt, mydest, msgid, bp);
54       if (REBS) BI_MpathBR(ctxt, bp, BI_Ssend, dest, nrings);
55    }
56 /*
57  * If I'm the destination process
58  */
59    else
60    {
61       if (!ctxt->TopsRepeat)
62       {
63          for(i=nrings; i; i--)
64          {
65             BI_Srecv(ctxt, BANYNODE, msgid, bp2);
66 	    Xvvop(N, bp->Buff, bp2->Buff);
67          }
68       }
69       else
70       {
71          ringlen = Np_1 / nrings;
72          if (inc == 1) mysrc = (Np + Iam - 1) % Np;
73          else mysrc = (Iam + 1) % Np;
74          for(i=nrings; i; i--)
75          {
76             BI_Srecv(ctxt, mysrc, msgid, bp2);
77 	    Xvvop(N, bp->Buff, bp2->Buff);
78             if (inc == 1) mysrc = (Np + mysrc - ringlen) % Np;
79             else mysrc = (mysrc + ringlen) % Np;
80          }
81       }
82       if (REBS) BI_MpathBS(ctxt, bp, BI_Ssend, nrings);
83    }
84 }  /* end BI_MringComb */
85