1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 /****************************************************************************/
4 /*                                                                          */
5 /* File:      jcmds.c                                                       */
6 /*                                                                          */
7 /* Purpose:   DDD-commands for Join Environment                             */
8 /*                                                                          */
9 /* Author:    Klaus Birken                                                  */
10 /*            Institut fuer Computeranwendungen III                         */
11 /*            Universitaet Stuttgart                                        */
12 /*            Pfaffenwaldring 27                                            */
13 /*            70569 Stuttgart                                               */
14 /*            email: birken@ica3.uni-stuttgart.de                           */
15 /*            phone: 0049-(0)711-685-7007                                   */
16 /*            fax  : 0049-(0)711-685-7000                                   */
17 /*                                                                          */
18 /* History:   980126 kb  begin                                              */
19 /*                                                                          */
20 /* Remarks:                                                                 */
21 /*                                                                          */
22 /****************************************************************************/
23 
24 /****************************************************************************/
25 /*                                                                          */
26 /* include files                                                            */
27 /*            system include files                                          */
28 /*            application include files                                     */
29 /*                                                                          */
30 /****************************************************************************/
31 
32 /* standard C library */
33 #include <config.h>
34 #include <cstdlib>
35 #include <cstdio>
36 #include <cstring>
37 
38 #include <algorithm>
39 #include <iomanip>
40 
41 #include <dune/common/exceptions.hh>
42 #include <dune/common/stdstreams.hh>
43 
44 #include <dune/uggrid/parallel/ddd/dddcontext.hh>
45 
46 #include <dune/uggrid/parallel/ddd/dddi.h>
47 #include "join.h"
48 
49 
50 USING_UG_NAMESPACE
51 using namespace PPIF;
52 
53 START_UGDIM_NAMESPACE
54 
55 using namespace DDD::Join;
56 
57 /****************************************************************************/
58 /*                                                                          */
59 /* data structures                                                          */
60 /*                                                                          */
61 /****************************************************************************/
62 
63 
64 
65 
66 
67 /****************************************************************************/
68 /*                                                                          */
69 /* definition of exported global variables                                  */
70 /*                                                                          */
71 /****************************************************************************/
72 
73 
74 
75 /****************************************************************************/
76 /*                                                                          */
77 /* definition of variables global to this source file only (static!)        */
78 /*                                                                          */
79 /****************************************************************************/
80 
81 
82 
83 
84 /****************************************************************************/
85 /*                                                                          */
86 /* routines                                                                 */
87 /*                                                                          */
88 /****************************************************************************/
89 
90 
91 /*
92         prepare messages for phase 1.
93 
94  */
95 
PreparePhase1Msgs(DDD::DDDContext & context,std::vector<JIJoin * > & arrayJoin,JOINMSG1 ** theMsgs,size_t * memUsage)96 static int PreparePhase1Msgs (DDD::DDDContext& context, std::vector<JIJoin*>& arrayJoin,
97                               JOINMSG1 **theMsgs, size_t *memUsage)
98 {
99   auto& ctx = context.joinContext();
100 
101   int i, last_i, nMsgs;
102   JIJoin** itemsJ = arrayJoin.data();
103   const int nJ = arrayJoin.size();
104 
105 #       if DebugJoin<=3
106   printf("%4d: PreparePhase1Msgs, nJoins=%d\n",
107          me, nJ);
108   fflush(stdout);
109 #       endif
110 
111   /* init return parameters */
112   *theMsgs = NULL;
113   *memUsage = 0;
114 
115 
116   if (nJ==0)
117     /* no messages */
118     return(0);
119 
120 
121   /* check whether Join objects are really local (without copies) */
122   /* and set local GID to invalid (will be set to new value lateron) */
123   for(i=0; i<nJ; i++)
124   {
125     if (ObjHasCpl(context, itemsJ[i]->hdr))
126       DUNE_THROW(Dune::Exception,
127                  "cannot join " << OBJ_GID(itemsJ[i]->hdr)
128                  << ", object already distributed");
129 
130     OBJ_GID(itemsJ[i]->hdr) = GID_INVALID;
131   }
132 
133 
134   /* set local GID to new value */
135   for(i=0; i<nJ; i++)
136   {
137     DDD_GID local_gid = OBJ_GID(itemsJ[i]->hdr);
138 
139     /* check for double Joins with different new_gid */
140     if (local_gid!=GID_INVALID && local_gid!=itemsJ[i]->new_gid)
141       DUNE_THROW(Dune::Exception,
142                  "several (inconsistent) DDD_JoinObj-commands "
143                  "for local object " << local_gid);
144 
145     OBJ_GID(itemsJ[i]->hdr) = itemsJ[i]->new_gid;
146   }
147 
148 
149   nMsgs = 0;
150   last_i = i = 0;
151   do
152   {
153     size_t bufSize;
154 
155     /* skip until dest-processor is different */
156     while (i<nJ && (itemsJ[i]->dest == itemsJ[last_i]->dest))
157       i++;
158 
159     /* create new message */
160     JOINMSG1* jm = new JOINMSG1;
161     jm->nJoins = i-last_i;
162     jm->arrayJoin = &(itemsJ[last_i]);
163     jm->dest = itemsJ[last_i]->dest;
164     jm->next = *theMsgs;
165     *theMsgs = jm;
166     nMsgs++;
167 
168     /* create new send message */
169     jm->msg_h = LC_NewSendMsg(context, ctx.phase1msg_t, jm->dest);
170 
171     /* init table inside message */
172     LC_SetTableSize(jm->msg_h, ctx.jointab_id, jm->nJoins);
173 
174     /* prepare message for sending away */
175     bufSize = LC_MsgPrepareSend(context, jm->msg_h);
176     *memUsage += bufSize;
177 
178     if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MEMUSAGE)
179     {
180       Dune::dwarn
181         << "DDD MESG [" << std::setw(3) << context.me() << "]: SHOW_MEM "
182         << "send msg phase1   dest=" << std::setw(4) << jm->dest
183         << " size=" << std::setw(10) << bufSize << "\n";
184     }
185 
186     last_i = i;
187 
188   } while (last_i < nJ);
189 
190   return(nMsgs);
191 }
192 
193 
194 
195 /****************************************************************************/
196 /*                                                                          */
197 /* Function:  PackPhase1Msgs                                                */
198 /*                                                                          */
199 /* Purpose:   allocate one message buffer for each outgoing message,        */
200 /*            fill buffer with message contents and initiate asynchronous   */
201 /*            send for each message.                                        */
202 /*                                                                          */
203 /* Input:     theMsgs: list of message-send-infos                           */
204 /*                                                                          */
205 /* Output:    -                                                             */
206 /*                                                                          */
207 /****************************************************************************/
208 
PackPhase1Msgs(DDD::DDDContext & context,JOINMSG1 * theMsgs)209 static void PackPhase1Msgs (DDD::DDDContext& context, JOINMSG1 *theMsgs)
210 {
211   auto& ctx = context.joinContext();
212   JOINMSG1 *jm;
213 
214   for(jm=theMsgs; jm!=NULL; jm=jm->next)
215   {
216     TEJoin *theJoinTab;
217     int i;
218 
219     /* copy data into message */
220     theJoinTab = (TEJoin *)LC_GetPtr(jm->msg_h, ctx.jointab_id);
221     for(i=0; i<jm->nJoins; i++)
222     {
223       theJoinTab[i].gid  = jm->arrayJoin[i]->new_gid;
224       theJoinTab[i].prio = OBJ_PRIO(jm->arrayJoin[i]->hdr);
225     }
226     LC_SetTableLen(jm->msg_h, ctx.jointab_id, jm->nJoins);
227 
228 
229     /* send away */
230     LC_MsgSend(context, jm->msg_h);
231   }
232 }
233 
234 
235 
236 /*
237         unpack phase1 messages.
238  */
239 
UnpackPhase1Msgs(DDD::DDDContext & context,LC_MSGHANDLE * theMsgs,int nRecvMsgs,DDD_HDR * localCplObjs,int nLCO,JIPartner ** p_joinObjs,int * p_nJoinObjs)240 static void UnpackPhase1Msgs (DDD::DDDContext& context,
241                               LC_MSGHANDLE *theMsgs, int nRecvMsgs,
242                               DDD_HDR *localCplObjs, int nLCO,
243                               JIPartner **p_joinObjs, int *p_nJoinObjs)
244 {
245   auto& ctx = context.joinContext();
246   const auto& me = context.me();
247   JIPartner *joinObjs;
248   int nJoinObjs = 0;
249   int m, jo;
250 
251   /* init return values */
252   *p_joinObjs  = NULL;
253   *p_nJoinObjs = 0;
254 
255 
256   for(m=0; m<nRecvMsgs; m++)
257   {
258     LC_MSGHANDLE jm = theMsgs[m];
259     TEJoin *theJoin = (TEJoin *) LC_GetPtr(jm, ctx.jointab_id);
260     int nJ       = (int) LC_GetTableLen(jm, ctx.jointab_id);
261     int i, j;
262 
263     nJoinObjs += nJ;
264 
265     for(i=0, j=0; i<nJ; i++)
266     {
267       while ((j<nLCO) && (OBJ_GID(localCplObjs[j]) < theJoin[i].gid))
268         j++;
269 
270       if ((j<nLCO) && (OBJ_GID(localCplObjs[j])==theJoin[i].gid))
271       {
272         COUPLING *cpl;
273 
274         /* found local object which is join target */
275         /* store shortcut to local object */
276         theJoin[i].hdr = localCplObjs[j];
277 
278         /* generate phase2-JIAddCpl for this object */
279         for(cpl=ObjCplList(context, localCplObjs[j]); cpl!=NULL; cpl=CPL_NEXT(cpl))
280         {
281           JIAddCpl *ji = JIAddCplSet_NewItem(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl2));
282           ji->dest    = CPL_PROC(cpl);
283           ji->te.gid  = theJoin[i].gid;
284           ji->te.proc = LC_MsgGetProc(jm);
285           ji->te.prio = theJoin[i].prio;
286 
287           if (! JIAddCplSet_ItemOK(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl2)))
288             continue;
289 
290 #                                       if DebugJoin<=1
291           printf("%4d: Phase1 Join for " DDD_GID_FMT " from %d, "
292                  "send AddCpl to %d.\n",
293                  me, theJoin[i].gid, ji->te.proc, ji->dest);
294 #                                       endif
295 
296         }
297 
298         /* send phase3-JIAddCpl back to Join-proc */
299         for(cpl=ObjCplList(context, localCplObjs[j]); cpl!=NULL; cpl=CPL_NEXT(cpl))
300         {
301           JIAddCpl *ji = JIAddCplSet_NewItem(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
302           ji->dest    = LC_MsgGetProc(jm);
303           ji->te.gid  = OBJ_GID(localCplObjs[j]);
304           ji->te.proc = CPL_PROC(cpl);
305           ji->te.prio = cpl->prio;
306 
307           if (! JIAddCplSet_ItemOK(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3)))
308             continue;
309         }
310       }
311       else
312       {
313         DUNE_THROW(Dune::Exception,
314                    "no object " << theJoin[i].gid
315                    << " for join from " << LC_MsgGetProc(jm));
316       }
317     }
318   }
319 
320 
321   /* return immediately if no join-objects have been found */
322   if (nJoinObjs==0)
323     return;
324 
325 
326   /* allocate array of objects, which has been contacted by a join */
327   joinObjs = new JIPartner[nJoinObjs];
328 
329   /* set return values */
330   *p_joinObjs  = joinObjs;
331   *p_nJoinObjs = nJoinObjs;
332 
333 
334   /* add one local coupling for each Join */
335   for(m=0, jo=0; m<nRecvMsgs; m++)
336   {
337     LC_MSGHANDLE jm = theMsgs[m];
338     TEJoin *theJoin = (TEJoin *) LC_GetPtr(jm, ctx.jointab_id);
339     int nJ       = (int) LC_GetTableLen(jm, ctx.jointab_id);
340     int i;
341 
342     for(i=0; i<nJ; i++)
343     {
344       AddCoupling(context, theJoin[i].hdr, LC_MsgGetProc(jm), theJoin[i].prio);
345 
346       /* send one phase3-JIAddCpl for symmetric connection */
347       {
348         JIAddCpl *ji = JIAddCplSet_NewItem(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
349         ji->dest    = LC_MsgGetProc(jm);
350         ji->te.gid  = OBJ_GID(theJoin[i].hdr);
351         ji->te.proc = me;
352         ji->te.prio = OBJ_PRIO(theJoin[i].hdr);
353 
354         JIAddCplSet_ItemOK(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
355       }
356 
357       joinObjs[jo].hdr  = theJoin[i].hdr;
358       joinObjs[jo].proc = LC_MsgGetProc(jm);
359       jo++;
360     }
361   }
362 
363 
364   /* sort joinObjs-array according to gid */
365   if (nJoinObjs>1) {
366     std::sort(
367       joinObjs, joinObjs + nJoinObjs,
368       [](const JIPartner& a, const JIPartner& b) {
369         return OBJ_GID(a.hdr) < OBJ_GID(b.hdr);
370       });
371   }
372 }
373 
374 
375 
376 /****************************************************************************/
377 
378 /*
379         prepare messages for phase 2.
380 
381  */
382 
PreparePhase2Msgs(DDD::DDDContext & context,std::vector<JIAddCpl * > & arrayAddCpl,JOINMSG2 ** theMsgs,size_t * memUsage)383 static int PreparePhase2Msgs (DDD::DDDContext& context, std::vector<JIAddCpl*>& arrayAddCpl,
384                               JOINMSG2 **theMsgs, size_t *memUsage)
385 {
386   auto& ctx = context.joinContext();
387   const auto& me = context.me();
388 
389   int i, last_i, nMsgs;
390   JIAddCpl** itemsAC = arrayAddCpl.data();
391   const int nAC = arrayAddCpl.size();
392 
393 #       if DebugJoin<=3
394   printf("%4d: PreparePhase2Msgs, nAddCpls=%d\n",
395          me, nAC);
396   fflush(stdout);
397 #       endif
398 
399   /* init return parameters */
400   *theMsgs = NULL;
401   *memUsage = 0;
402 
403 
404   if (nAC==0)
405     /* no messages */
406     return(0);
407 
408 
409   nMsgs = 0;
410   last_i = i = 0;
411   do
412   {
413     size_t bufSize;
414 
415     /* skip until dest-processor is different */
416     while (i<nAC && (itemsAC[i]->dest == itemsAC[last_i]->dest))
417       i++;
418 
419     /* create new message */
420     JOINMSG2* jm = new JOINMSG2;
421     jm->nAddCpls = i-last_i;
422     jm->arrayAddCpl = &(itemsAC[last_i]);
423     jm->dest = itemsAC[last_i]->dest;
424     jm->next = *theMsgs;
425     *theMsgs = jm;
426     nMsgs++;
427 
428     /* create new send message */
429     jm->msg_h = LC_NewSendMsg(context, ctx.phase2msg_t, jm->dest);
430 
431     /* init table inside message */
432     LC_SetTableSize(jm->msg_h, ctx.addtab_id, jm->nAddCpls);
433 
434     /* prepare message for sending away */
435     bufSize = LC_MsgPrepareSend(context, jm->msg_h);
436     *memUsage += bufSize;
437 
438     if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MEMUSAGE)
439     {
440       Dune::dwarn
441         << "DDD MESG [" << std::setw(3) << me << "]: SHOW_MEM "
442         << "send msg phase2   dest=" << std::setw(4) << jm->dest
443         << " size=" << std::setw(10) << bufSize << "\n";
444     }
445 
446     last_i = i;
447 
448   } while (last_i < nAC);
449 
450   return(nMsgs);
451 }
452 
453 
454 
455 /****************************************************************************/
456 /*                                                                          */
457 /* Function:  PackPhase2Msgs                                                */
458 /*                                                                          */
459 /* Purpose:   allocate one message buffer for each outgoing message,        */
460 /*            fill buffer with message contents and initiate asynchronous   */
461 /*            send for each message.                                        */
462 /*                                                                          */
463 /* Input:     theMsgs: list of message-send-infos                           */
464 /*                                                                          */
465 /* Output:    -                                                             */
466 /*                                                                          */
467 /****************************************************************************/
468 
PackPhase2Msgs(DDD::DDDContext & context,JOINMSG2 * theMsgs)469 static void PackPhase2Msgs(DDD::DDDContext& context, JOINMSG2 *theMsgs)
470 {
471   auto& ctx = context.joinContext();
472 
473   JOINMSG2 *jm;
474 
475   for(jm=theMsgs; jm!=NULL; jm=jm->next)
476   {
477     TEAddCpl *theAddTab;
478     int i;
479 
480     /* copy data into message */
481     theAddTab = (TEAddCpl *)LC_GetPtr(jm->msg_h, ctx.addtab_id);
482     for(i=0; i<jm->nAddCpls; i++)
483     {
484       /* copy complete TEAddCpl item */
485       theAddTab[i] = jm->arrayAddCpl[i]->te;
486     }
487     LC_SetTableLen(jm->msg_h, ctx.addtab_id, jm->nAddCpls);
488 
489 
490     /* send away */
491     LC_MsgSend(context, jm->msg_h);
492   }
493 }
494 
495 
496 
497 /*
498         unpack phase2 messages.
499  */
500 
UnpackPhase2Msgs(DDD::DDDContext & context,LC_MSGHANDLE * theMsgs2,int nRecvMsgs2,JIPartner * joinObjs,int nJoinObjs,DDD_HDR * localCplObjs,int nLCO)501 static void UnpackPhase2Msgs (DDD::DDDContext& context,
502                               LC_MSGHANDLE *theMsgs2, int nRecvMsgs2,
503                               JIPartner *joinObjs, int nJoinObjs,
504                               DDD_HDR *localCplObjs, int nLCO)
505 {
506   auto& ctx = context.joinContext();
507 
508   int m;
509 
510   for(m=0; m<nRecvMsgs2; m++)
511   {
512     LC_MSGHANDLE jm = theMsgs2[m];
513     TEAddCpl *theAC = (TEAddCpl *) LC_GetPtr(jm, ctx.addtab_id);
514     int nAC      = (int) LC_GetTableLen(jm, ctx.addtab_id);
515     int i, j, jo;
516 
517     for(i=0, j=0, jo=0; i<nAC; i++)
518     {
519       while ((j<nLCO) && (OBJ_GID(localCplObjs[j]) < theAC[i].gid))
520         j++;
521 
522       while ((jo<nJoinObjs) && (OBJ_GID(joinObjs[jo].hdr) < theAC[i].gid))
523         jo++;
524 
525       if ((j<nLCO) && (OBJ_GID(localCplObjs[j])==theAC[i].gid))
526       {
527         /* found local object which is AddCpl target */
528         AddCoupling(context, localCplObjs[j], theAC[i].proc, theAC[i].prio);
529 
530 #                               if DebugJoin<=1
531         printf("%4d: Phase2 execute AddCpl(%08x,%d,%d) (from %d).\n",
532                context.me(),
533                theAC[i].gid, theAC[i].proc, theAC[i].prio,
534                LC_MsgGetProc(jm));
535 #                               endif
536 
537         while ((jo<nJoinObjs) && (OBJ_GID(joinObjs[jo].hdr) == theAC[i].gid))
538         {
539           JIAddCpl *ji = JIAddCplSet_NewItem(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
540           ji->dest    = joinObjs[jo].proc;
541           ji->te.gid  = theAC[i].gid;
542           ji->te.proc = theAC[i].proc;
543           ji->te.prio = theAC[i].prio;
544           JIAddCplSet_ItemOK(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
545 
546 
547 #                                       if DebugJoin<=1
548           printf("%4d: Phase2 forward AddCpl(%08x,%d,%d) to %d.\n",
549                  context.me(),
550                  theAC[i].gid, theAC[i].proc, theAC[i].prio,
551                  ji->dest);
552 #                                       endif
553 
554           jo++;
555         }
556       }
557       else
558       {
559         /* this should never happen. AddCpl send from invalid proc. */
560         assert(0);
561       }
562     }
563   }
564 }
565 
566 
567 
568 /****************************************************************************/
569 
570 
571 
572 /*
573         prepare messages for phase 3.
574 
575  */
576 
PreparePhase3Msgs(DDD::DDDContext & context,std::vector<JIAddCpl * > & arrayAddCpl,JOINMSG3 ** theMsgs,size_t * memUsage)577 static int PreparePhase3Msgs (DDD::DDDContext& context, std::vector<JIAddCpl*>& arrayAddCpl,
578                               JOINMSG3 **theMsgs, size_t *memUsage)
579 {
580   auto& ctx = context.joinContext();
581   const auto& me = context.me();
582 
583   int i, last_i, nMsgs;
584   JIAddCpl** itemsAC = arrayAddCpl.data();
585   const int nAC = arrayAddCpl.size();
586 
587 #       if DebugJoin<=3
588   printf("%4d: PreparePhase3Msgs, nAddCpls=%d\n",
589          me, nAC);
590   fflush(stdout);
591 #       endif
592 
593   /* init return parameters */
594   *theMsgs = NULL;
595   *memUsage = 0;
596 
597 
598   if (nAC==0)
599     /* no messages */
600     return(0);
601 
602 
603   nMsgs = 0;
604   last_i = i = 0;
605   do
606   {
607     size_t bufSize;
608 
609     /* skip until dest-processor is different */
610     while (i<nAC && (itemsAC[i]->dest == itemsAC[last_i]->dest))
611       i++;
612 
613     /* create new message */
614     JOINMSG3* jm = new JOINMSG3;
615     jm->nAddCpls = i-last_i;
616     jm->arrayAddCpl = &(itemsAC[last_i]);
617     jm->dest = itemsAC[last_i]->dest;
618     jm->next = *theMsgs;
619     *theMsgs = jm;
620     nMsgs++;
621 
622     /* create new send message */
623     jm->msg_h = LC_NewSendMsg(context, ctx.phase3msg_t, jm->dest);
624 
625     /* init table inside message */
626     LC_SetTableSize(jm->msg_h, ctx.cpltab_id, jm->nAddCpls);
627 
628     /* prepare message for sending away */
629     bufSize = LC_MsgPrepareSend(context, jm->msg_h);
630     *memUsage += bufSize;
631 
632     if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MEMUSAGE)
633     {
634       Dune::dwarn
635         << "DDD MESG [" << std::setw(3) << me << "]: SHOW_MEM "
636         << "send msg phase3   dest=" << std::setw(4) << jm->dest
637         << " size=" << std::setw(10) << bufSize << "\n";
638     }
639 
640     last_i = i;
641 
642   } while (last_i < nAC);
643 
644   return(nMsgs);
645 }
646 
647 
648 
649 /****************************************************************************/
650 /*                                                                          */
651 /* Function:  PackPhase3Msgs                                                */
652 /*                                                                          */
653 /* Purpose:   allocate one message buffer for each outgoing message,        */
654 /*            fill buffer with message contents and initiate asynchronous   */
655 /*            send for each message.                                        */
656 /*                                                                          */
657 /* Input:     theMsgs: list of message-send-infos                           */
658 /*                                                                          */
659 /* Output:    -                                                             */
660 /*                                                                          */
661 /****************************************************************************/
662 
PackPhase3Msgs(DDD::DDDContext & context,JOINMSG3 * theMsgs)663 static void PackPhase3Msgs(DDD::DDDContext& context, JOINMSG3 *theMsgs)
664 {
665   auto& ctx = context.joinContext();
666 
667   JOINMSG3 *jm;
668 
669   for(jm=theMsgs; jm!=NULL; jm=jm->next)
670   {
671     TEAddCpl *theAddTab;
672     int i;
673 
674     /* copy data into message */
675     theAddTab = (TEAddCpl *)LC_GetPtr(jm->msg_h, ctx.cpltab_id);
676     for(i=0; i<jm->nAddCpls; i++)
677     {
678       /* copy complete TEAddCpl item */
679       theAddTab[i] = jm->arrayAddCpl[i]->te;
680     }
681     LC_SetTableLen(jm->msg_h, ctx.cpltab_id, jm->nAddCpls);
682 
683 
684     /* send away */
685     LC_MsgSend(context, jm->msg_h);
686   }
687 }
688 
689 
690 
691 /****************************************************************************/
692 
693 
694 /*
695         unpack phase3 messages.
696  */
697 
UnpackPhase3Msgs(DDD::DDDContext & context,LC_MSGHANDLE * theMsgs,int nRecvMsgs,std::vector<JIJoin * > & arrayJoin)698 static void UnpackPhase3Msgs (DDD::DDDContext& context,
699                               LC_MSGHANDLE *theMsgs, int nRecvMsgs,
700                               std::vector<JIJoin*>& arrayJoin)
701 {
702   auto& ctx = context.joinContext();
703 
704   JIJoin** itemsJ = arrayJoin.data();
705   const int nJ = arrayJoin.size();
706   int m;
707 
708 
709   for(m=0; m<nRecvMsgs; m++)
710   {
711     LC_MSGHANDLE jm = theMsgs[m];
712     TEAddCpl *theAC = (TEAddCpl *) LC_GetPtr(jm, ctx.cpltab_id);
713     int nAC      = (int) LC_GetTableLen(jm, ctx.cpltab_id);
714     int i, j;
715 
716     for(i=0, j=0; i<nAC; i++)
717     {
718       while ((j<nJ) && (OBJ_GID(itemsJ[j]->hdr) < theAC[i].gid))
719         j++;
720 
721       if ((j<nJ) && (OBJ_GID(itemsJ[j]->hdr) == theAC[i].gid))
722       {
723         /* found local object which is AddCpl target */
724         AddCoupling(context, itemsJ[j]->hdr, theAC[i].proc, theAC[i].prio);
725 
726 #                               if DebugJoin<=1
727         printf("%4d: Phase3 execute AddCpl(%08x,%d,%d) (from %d).\n",
728                context.me(),
729                OBJ_GID(itemsJ[j]->hdr), theAC[i].proc, theAC[i].prio,
730                LC_MsgGetProc(jm));
731 #                               endif
732       }
733       else
734       {
735         /* this should never happen. AddCpl send for unknown obj. */
736         assert(0);
737       }
738     }
739   }
740 }
741 
742 
743 
744 
745 /****************************************************************************/
746 /*                                                                          */
747 /* Function:  DDD_JoinEnd                                                   */
748 /*                                                                          */
749 /****************************************************************************/
750 
751 /**
752         End of join phase.
753         This function starts the actual join process. After a call to
754         this function (on all processors) all {\bf Join}-commands since
755         the last call to \funk{JoinBegin} are executed. This involves
756         a set of local communications between the processors.
757  */
758 
DDD_JoinEnd(DDD::DDDContext & context)759 DDD_RET DDD_JoinEnd(DDD::DDDContext& context)
760 {
761   auto& ctx = context.joinContext();
762 
763   int obsolete, nRecvMsgs1, nRecvMsgs2, nRecvMsgs3;
764   JOINMSG1    *sendMsgs1=NULL, *sm1=NULL;
765   JOINMSG2    *sendMsgs2=NULL, *sm2=NULL;
766   JOINMSG3    *sendMsgs3=NULL, *sm3=NULL;
767   LC_MSGHANDLE *recvMsgs1=NULL, *recvMsgs2=NULL, *recvMsgs3=NULL;
768   size_t sendMem=0, recvMem=0;
769   JIPartner   *joinObjs = NULL;
770   int nJoinObjs;
771   const auto& nCpls = context.couplingContext().nCpls;
772 
773 
774 
775   STAT_SET_MODULE(DDD_MODULE_JOIN);
776   STAT_ZEROALL;
777 
778   /* step mode and check whether call to JoinEnd is valid */
779   if (!JoinStepMode(context, JoinMode::JMODE_CMDS))
780     DUNE_THROW(Dune::Exception, "DDD_JoinEnd() aborted");
781 
782 
783   /*
784           PREPARATION PHASE
785    */
786   /* get sorted array of JIJoin-items */
787   std::vector<JIJoin*> arrayJIJoin = JIJoinSet_GetArray(reinterpret_cast<JIJoinSet*>(ctx.setJIJoin));
788   obsolete = JIJoinSet_GetNDiscarded(reinterpret_cast<JIJoinSet*>(ctx.setJIJoin));
789 
790 
791   /*
792           COMMUNICATION PHASE 1
793           all processors, where JoinObj-commands have been issued,
794           send information about these commands to the target
795           processors together with the GID of the objects on the
796           target procs and the local priority.
797    */
798   STAT_RESET;
799   /* prepare msgs for JIJoin-items */
800   PreparePhase1Msgs(context, arrayJIJoin, &sendMsgs1, &sendMem);
801   /* DisplayMemResources(); */
802 
803   /* init communication topology */
804   nRecvMsgs1 = LC_Connect(context, ctx.phase1msg_t);
805   STAT_TIMER(T_JOIN_PREP_MSGS);
806 
807   STAT_RESET;
808   /* build phase1 msgs on sender side and start send */
809   PackPhase1Msgs(context, sendMsgs1);
810   STAT_TIMER(T_JOIN_PACK_SEND);
811 
812 
813   /*
814           now messages are in the net, use spare time
815    */
816   STAT_RESET;
817   /* get sorted list of local objects with couplings */
818   std::vector<DDD_HDR> localCplObjs = LocalCoupledObjectsList(context);
819 
820   if (obsolete>0)
821   {
822     if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_OBSOLETE)
823     {
824       int all = JIJoinSet_GetNItems(reinterpret_cast<JIJoinSet*>(ctx.setJIJoin));
825 
826       using std::setw;
827       Dune::dwarn
828         << "DDD MESG [" << setw(3) << context.me() << "]: " << setw(4) << obsolete
829         << " from " << setw(4) << all << " join-cmds obsolete.\n";
830     }
831   }
832   STAT_TIMER(T_JOIN);
833 
834 
835   /*
836           nothing more to do until incoming messages arrive
837    */
838 
839   /* display information about send-messages on lowcomm-level */
840   if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MSGSALL)
841   {
842     DDD_SyncAll(context);
843     if (context.isMaster())
844       Dune::dwarn << "DDD JOIN_SHOW_MSGSALL: Phase1Msg.Send\n";
845     LC_PrintSendMsgs(context);
846   }
847 
848 
849   /* wait for communication-completion (send AND receive) */
850   STAT_RESET;
851   recvMsgs1 = LC_Communicate(context);
852   STAT_TIMER(T_JOIN_WAIT_RECV);
853 
854 
855   /* display information about message buffer sizes */
856   if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MEMUSAGE)
857   {
858     int k;
859 
860     /* sum up sizes of receive mesg buffers */
861     for(k=0; k<nRecvMsgs1; k++)
862     {
863       recvMem += LC_GetBufferSize(recvMsgs1[k]);
864     }
865 
866     using std::setw;
867     Dune::dwarn
868       << "DDD MESG [" << setw(3) << context.me() << "]: SHOW_MEM msgs "
869       << " send=" << setw(10) << sendMem
870       << " recv=" << setw(10) << recvMem
871       << " all=" << setw(10) << (sendMem+recvMem) << "\n";
872   }
873 
874   /* display information about recv-messages on lowcomm-level */
875   if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MSGSALL)
876   {
877     DDD_SyncAll(context);
878     if (context.isMaster())
879       Dune::dwarn << "DDD JOIN_SHOW_MSGSALL: Phase1Msg.Recv\n";
880     LC_PrintRecvMsgs(context);
881   }
882 
883   /* unpack messages */
884   STAT_RESET;
885   UnpackPhase1Msgs(context, recvMsgs1, nRecvMsgs1, localCplObjs.data(), nCpls,
886                    &joinObjs, &nJoinObjs);
887   LC_Cleanup(context);
888   STAT_TIMER(T_JOIN_UNPACK);
889 
890 
891 
892 
893 
894   /*
895           COMMUNICATION PHASE 2
896           all processors which received notification of JoinObj-commands
897           during phase 1 send AddCpl-requests to all copies of DDD objects,
898           for which Joins had been issued remotely.
899    */
900   /* get sorted array of JIAddCpl-items */
901   std::vector<JIAddCpl*> arrayJIAddCpl2 = JIAddCplSet_GetArray(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl2));
902 
903   STAT_RESET;
904   /* prepare msgs for JIAddCpl-items */
905   PreparePhase2Msgs(context, arrayJIAddCpl2, &sendMsgs2, &sendMem);
906   /* DisplayMemResources(); */
907 
908   /* init communication topology */
909   nRecvMsgs2 = LC_Connect(context, ctx.phase2msg_t);
910   STAT_TIMER(T_JOIN_PREP_MSGS);
911 
912   STAT_RESET;
913   /* build phase2 msgs on sender side and start send */
914   PackPhase2Msgs(context, sendMsgs2);
915   STAT_TIMER(T_JOIN_PACK_SEND);
916 
917   /*
918           now messages are in the net, use spare time
919    */
920 
921   /* reorder Join-commands according to new_gid */
922   /* this ordering is needed in UnpackPhase3 */
923   if (arrayJIJoin.size() > 1)
924   {
925     std::sort(
926       arrayJIJoin.begin(), arrayJIJoin.end(),
927       [](const JIJoin* a, const JIJoin* b) {
928         return a->new_gid < b->new_gid;
929       });
930   }
931 
932 
933   /*
934           nothing more to do until incoming messages arrive
935    */
936 
937   /* display information about send-messages on lowcomm-level */
938   if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MSGSALL)
939   {
940     DDD_SyncAll(context);
941     if (context.isMaster())
942       Dune::dwarn <<"DDD JOIN_SHOW_MSGSALL: Phase2Msg.Send\n";
943     LC_PrintSendMsgs(context);
944   }
945 
946 
947   /* wait for communication-completion (send AND receive) */
948   STAT_RESET;
949   recvMsgs2 = LC_Communicate(context);
950   STAT_TIMER(T_JOIN_WAIT_RECV);
951 
952 
953   /* display information about message buffer sizes */
954   if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MEMUSAGE)
955   {
956     int k;
957 
958     /* sum up sizes of receive mesg buffers */
959     for(k=0; k<nRecvMsgs2; k++)
960     {
961       recvMem += LC_GetBufferSize(recvMsgs2[k]);
962     }
963 
964     using std::setw;
965     Dune::dwarn
966       << "DDD MESG [" << setw(3) << context.me() << "]: SHOW_MEM msgs "
967       << " send=" << setw(10) << sendMem
968       << " recv=" << setw(10) << recvMem
969       << " all=" << setw(10) << (sendMem+recvMem) << "\n";
970   }
971 
972   /* display information about recv-messages on lowcomm-level */
973   if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MSGSALL)
974   {
975     DDD_SyncAll(context);
976     if (context.isMaster())
977       Dune::dwarn << "DDD JOIN_SHOW_MSGSALL: Phase2Msg.Recv\n";
978     LC_PrintRecvMsgs(context);
979   }
980 
981   /* unpack messages */
982   STAT_RESET;
983   UnpackPhase2Msgs(context, recvMsgs2, nRecvMsgs2, joinObjs, nJoinObjs,
984                    localCplObjs.data(), nCpls);
985 
986   LC_Cleanup(context);
987   STAT_TIMER(T_JOIN_UNPACK);
988 
989   for(; sendMsgs2!=NULL; sendMsgs2=sm2)
990   {
991     sm2 = sendMsgs2->next;
992     delete sendMsgs2;
993   }
994 
995 
996 
997 
998 
999 
1000   /*
1001           COMMUNICATION PHASE 3
1002           all processors which received notification of JoinObj-commands
1003           during phase 1 send AddCpl-requests to the procs where the
1004           JoinObj-commands have been issued. One AddCpl-request is sent
1005           for each cpl in the local objects coupling list. One AddCpl-request
1006           is sent for each AddCpl-request received during phase 2.
          (i.e., two kinds of AddCpl-requests are sent to the processors
          on which the JoinObj-commands have been issued.)
1009    */
1010   /* get sorted array of JIAddCpl-items */
1011   std::vector<JIAddCpl*> arrayJIAddCpl3 = JIAddCplSet_GetArray(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
1012 
1013   STAT_RESET;
1014   /* prepare msgs for JIAddCpl-items */
1015   PreparePhase3Msgs(context, arrayJIAddCpl3, &sendMsgs3, &sendMem);
1016   /* DisplayMemResources(); */
1017 
1018   /* init communication topology */
1019   nRecvMsgs3 = LC_Connect(context, ctx.phase3msg_t);
1020   STAT_TIMER(T_JOIN_PREP_MSGS);
1021 
1022   STAT_RESET;
1023   /* build phase3 msgs on sender side and start send */
1024   PackPhase3Msgs(context, sendMsgs3);
1025   STAT_TIMER(T_JOIN_PACK_SEND);
1026 
1027   /*
1028           now messages are in the net, use spare time
1029    */
1030   /* ... */
1031 
1032   /*
1033           nothing more to do until incoming messages arrive
1034    */
1035 
1036   /* display information about send-messages on lowcomm-level */
1037   if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MSGSALL)
1038   {
1039     DDD_SyncAll(context);
1040     if (context.isMaster())
1041       Dune::dwarn << "DDD JOIN_SHOW_MSGSALL: Phase3Msg.Send\n";
1042     LC_PrintSendMsgs(context);
1043   }
1044 
1045 
1046   /* wait for communication-completion (send AND receive) */
1047   STAT_RESET;
1048   recvMsgs3 = LC_Communicate(context);
1049   STAT_TIMER(T_JOIN_WAIT_RECV);
1050 
1051 
1052   /* display information about message buffer sizes */
1053   if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MEMUSAGE)
1054   {
1055     int k;
1056 
1057     /* sum up sizes of receive mesg buffers */
1058     for(k=0; k<nRecvMsgs3; k++)
1059     {
1060       recvMem += LC_GetBufferSize(recvMsgs3[k]);
1061     }
1062 
1063     using std::setw;
1064     Dune::dwarn
1065       << "DDD MESG [" << setw(3) << context.me() << "]: SHOW_MEM msgs "
1066       << " send=" << setw(10) << sendMem
1067       << " recv=" << setw(10) << recvMem
1068       << " all=" << setw(10) << (sendMem+recvMem) << "\n";
1069   }
1070 
1071   /* display information about recv-messages on lowcomm-level */
1072   if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MSGSALL)
1073   {
1074     DDD_SyncAll(context);
1075     if (context.isMaster())
1076       Dune::dwarn << "DDD JOIN_SHOW_MSGSALL: Phase3Msg.Recv\n";
1077     LC_PrintRecvMsgs(context);
1078   }
1079 
1080   /* unpack messages */
1081   STAT_RESET;
1082   UnpackPhase3Msgs(context, recvMsgs3, nRecvMsgs3, arrayJIJoin);
1083   LC_Cleanup(context);
1084   STAT_TIMER(T_JOIN_UNPACK);
1085 
1086   for(; sendMsgs3!=NULL; sendMsgs3=sm3)
1087   {
1088     sm3 = sendMsgs3->next;
1089     delete sendMsgs3;
1090   }
1091 
1092 
1093 
1094 
1095 
1096 
1097   /*
1098           free temporary storage
1099    */
1100   JIJoinSet_Reset(reinterpret_cast<JIJoinSet*>(ctx.setJIJoin));
1101 
1102   JIAddCplSet_Reset(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl2));
1103 
1104   JIAddCplSet_Reset(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
1105 
1106   if (joinObjs!=NULL)
1107     delete[] joinObjs;
1108 
1109   for(; sendMsgs1!=NULL; sendMsgs1=sm1)
1110   {
1111     sm1 = sendMsgs1->next;
1112     delete sendMsgs1;
1113   }
1114 
1115 
1116 
1117 #       if DebugJoin<=4
1118   Dune::dverb << "JoinEnd, before IFAllFromScratch().\n";
1119 #       endif
1120 
1121   /* re-create all interfaces and step JMODE */
1122   STAT_RESET;
1123   IFAllFromScratch(context);
1124   STAT_TIMER(T_JOIN_BUILD_IF);
1125 
1126 
1127   JoinStepMode(context, JoinMode::JMODE_BUSY);
1128 
1129   return(DDD_RET_OK);
1130 }
1131 
1132 
1133 
1134 
1135 
1136 /****************************************************************************/
1137 /*                                                                          */
1138 /* Function:  DDD_JoinObj                                                   */
1139 /*                                                                          */
1140 /****************************************************************************/
1141 
1142 /**
1143         Join local object with a distributed object.
1144 
        \todo TBC
1146 
1147    @param hdr  DDD local object which should be joined.
1148  */
1149 
1150 
1151 
DDD_JoinObj(DDD::DDDContext & context,DDD_HDR hdr,DDD_PROC dest,DDD_GID new_gid)1152 void DDD_JoinObj(DDD::DDDContext& context, DDD_HDR hdr, DDD_PROC dest, DDD_GID new_gid)
1153 {
1154   auto& ctx = context.joinContext();
1155   const auto procs = context.procs();
1156 
1157   if (!ddd_JoinActive(context))
1158     DUNE_THROW(Dune::Exception, "Missing DDD_JoinBegin()");
1159 
1160   if (dest>=procs)
1161     DUNE_THROW(Dune::Exception,
1162                "cannot join " << OBJ_GID(hdr) << " with " << new_gid
1163                << " on processor " << dest << " (procs=" << procs << ")");
1164 
1165   if (dest==context.me())
1166     DUNE_THROW(Dune::Exception,
1167                "cannot join " << OBJ_GID(hdr) << " with myself");
1168 
1169   if (ObjHasCpl(context, hdr))
1170     DUNE_THROW(Dune::Exception,
1171                "cannot join " << OBJ_GID(hdr)
1172                << ", object already distributed");
1173 
1174 
1175 
1176   JIJoin* ji = JIJoinSet_NewItem(reinterpret_cast<JIJoinSet*>(ctx.setJIJoin));
1177   ji->hdr     = hdr;
1178   ji->dest    = dest;
1179   ji->new_gid = new_gid;
1180 
1181   if (! JIJoinSet_ItemOK(reinterpret_cast<JIJoinSet*>(ctx.setJIJoin)))
1182     return;
1183 
1184 #       if DebugJoin<=2
1185   Dune:dvverb << "DDD_JoinObj " << OBJ_GID(hdr)
1186               << ", dest=" << dest << ", new_gid=" << new_gid << "\n";
1187 #       endif
1188 }
1189 
1190 
1191 
1192 
1193 /****************************************************************************/
1194 /*                                                                          */
1195 /* Function:  DDD_JoinBegin                                                 */
1196 /*                                                                          */
1197 /****************************************************************************/
1198 
/**
        Starts join phase.
        A call to this function establishes a global join operation.
        It must be issued on all processors. After this call an arbitrary
        series of <b>Join</b>-commands may be issued. The global join operation
        is carried out via a \c JoinEnd call on each processor.
 */
1206 
DDD_JoinBegin(DDD::DDDContext & context)1207 void DDD_JoinBegin(DDD::DDDContext& context)
1208 {
1209   /* step mode and check whether call to JoinBegin is valid */
1210   if (!JoinStepMode(context, JoinMode::JMODE_IDLE))
1211     DUNE_THROW(Dune::Exception, "DDD_JoinBegin() aborted");
1212 }
1213 
1214 
1215 
1216 END_UGDIM_NAMESPACE
1217