1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 /****************************************************************************/
4 /* */
5 /* File: jcmds.c */
6 /* */
7 /* Purpose: DDD-commands for Join Environment */
8 /* */
9 /* Author: Klaus Birken */
10 /* Institut fuer Computeranwendungen III */
11 /* Universitaet Stuttgart */
12 /* Pfaffenwaldring 27 */
13 /* 70569 Stuttgart */
14 /* email: birken@ica3.uni-stuttgart.de */
15 /* phone: 0049-(0)711-685-7007 */
16 /* fax : 0049-(0)711-685-7000 */
17 /* */
18 /* History: 980126 kb begin */
19 /* */
20 /* Remarks: */
21 /* */
22 /****************************************************************************/
23
24 /****************************************************************************/
25 /* */
26 /* include files */
27 /* system include files */
28 /* application include files */
29 /* */
30 /****************************************************************************/
31
32 /* standard C library */
33 #include <config.h>
34 #include <cstdlib>
35 #include <cstdio>
36 #include <cstring>
37
38 #include <algorithm>
39 #include <iomanip>
40
41 #include <dune/common/exceptions.hh>
42 #include <dune/common/stdstreams.hh>
43
44 #include <dune/uggrid/parallel/ddd/dddcontext.hh>
45
46 #include <dune/uggrid/parallel/ddd/dddi.h>
47 #include "join.h"
48
49
50 USING_UG_NAMESPACE
51 using namespace PPIF;
52
53 START_UGDIM_NAMESPACE
54
55 using namespace DDD::Join;
56
57 /****************************************************************************/
58 /* */
59 /* data structures */
60 /* */
61 /****************************************************************************/
62
63
64
65
66
67 /****************************************************************************/
68 /* */
69 /* definition of exported global variables */
70 /* */
71 /****************************************************************************/
72
73
74
75 /****************************************************************************/
76 /* */
77 /* definition of variables global to this source file only (static!) */
78 /* */
79 /****************************************************************************/
80
81
82
83
84 /****************************************************************************/
85 /* */
86 /* routines */
87 /* */
88 /****************************************************************************/
89
90
91 /*
92 prepare messages for phase 1.
93
94 */
95
PreparePhase1Msgs(DDD::DDDContext & context,std::vector<JIJoin * > & arrayJoin,JOINMSG1 ** theMsgs,size_t * memUsage)96 static int PreparePhase1Msgs (DDD::DDDContext& context, std::vector<JIJoin*>& arrayJoin,
97 JOINMSG1 **theMsgs, size_t *memUsage)
98 {
99 auto& ctx = context.joinContext();
100
101 int i, last_i, nMsgs;
102 JIJoin** itemsJ = arrayJoin.data();
103 const int nJ = arrayJoin.size();
104
105 # if DebugJoin<=3
106 printf("%4d: PreparePhase1Msgs, nJoins=%d\n",
107 me, nJ);
108 fflush(stdout);
109 # endif
110
111 /* init return parameters */
112 *theMsgs = NULL;
113 *memUsage = 0;
114
115
116 if (nJ==0)
117 /* no messages */
118 return(0);
119
120
121 /* check whether Join objects are really local (without copies) */
122 /* and set local GID to invalid (will be set to new value lateron) */
123 for(i=0; i<nJ; i++)
124 {
125 if (ObjHasCpl(context, itemsJ[i]->hdr))
126 DUNE_THROW(Dune::Exception,
127 "cannot join " << OBJ_GID(itemsJ[i]->hdr)
128 << ", object already distributed");
129
130 OBJ_GID(itemsJ[i]->hdr) = GID_INVALID;
131 }
132
133
134 /* set local GID to new value */
135 for(i=0; i<nJ; i++)
136 {
137 DDD_GID local_gid = OBJ_GID(itemsJ[i]->hdr);
138
139 /* check for double Joins with different new_gid */
140 if (local_gid!=GID_INVALID && local_gid!=itemsJ[i]->new_gid)
141 DUNE_THROW(Dune::Exception,
142 "several (inconsistent) DDD_JoinObj-commands "
143 "for local object " << local_gid);
144
145 OBJ_GID(itemsJ[i]->hdr) = itemsJ[i]->new_gid;
146 }
147
148
149 nMsgs = 0;
150 last_i = i = 0;
151 do
152 {
153 size_t bufSize;
154
155 /* skip until dest-processor is different */
156 while (i<nJ && (itemsJ[i]->dest == itemsJ[last_i]->dest))
157 i++;
158
159 /* create new message */
160 JOINMSG1* jm = new JOINMSG1;
161 jm->nJoins = i-last_i;
162 jm->arrayJoin = &(itemsJ[last_i]);
163 jm->dest = itemsJ[last_i]->dest;
164 jm->next = *theMsgs;
165 *theMsgs = jm;
166 nMsgs++;
167
168 /* create new send message */
169 jm->msg_h = LC_NewSendMsg(context, ctx.phase1msg_t, jm->dest);
170
171 /* init table inside message */
172 LC_SetTableSize(jm->msg_h, ctx.jointab_id, jm->nJoins);
173
174 /* prepare message for sending away */
175 bufSize = LC_MsgPrepareSend(context, jm->msg_h);
176 *memUsage += bufSize;
177
178 if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MEMUSAGE)
179 {
180 Dune::dwarn
181 << "DDD MESG [" << std::setw(3) << context.me() << "]: SHOW_MEM "
182 << "send msg phase1 dest=" << std::setw(4) << jm->dest
183 << " size=" << std::setw(10) << bufSize << "\n";
184 }
185
186 last_i = i;
187
188 } while (last_i < nJ);
189
190 return(nMsgs);
191 }
192
193
194
195 /****************************************************************************/
196 /* */
197 /* Function: PackPhase1Msgs */
198 /* */
199 /* Purpose: allocate one message buffer for each outgoing message, */
200 /* fill buffer with message contents and initiate asynchronous */
201 /* send for each message. */
202 /* */
203 /* Input: theMsgs: list of message-send-infos */
204 /* */
205 /* Output: - */
206 /* */
207 /****************************************************************************/
208
PackPhase1Msgs(DDD::DDDContext & context,JOINMSG1 * theMsgs)209 static void PackPhase1Msgs (DDD::DDDContext& context, JOINMSG1 *theMsgs)
210 {
211 auto& ctx = context.joinContext();
212 JOINMSG1 *jm;
213
214 for(jm=theMsgs; jm!=NULL; jm=jm->next)
215 {
216 TEJoin *theJoinTab;
217 int i;
218
219 /* copy data into message */
220 theJoinTab = (TEJoin *)LC_GetPtr(jm->msg_h, ctx.jointab_id);
221 for(i=0; i<jm->nJoins; i++)
222 {
223 theJoinTab[i].gid = jm->arrayJoin[i]->new_gid;
224 theJoinTab[i].prio = OBJ_PRIO(jm->arrayJoin[i]->hdr);
225 }
226 LC_SetTableLen(jm->msg_h, ctx.jointab_id, jm->nJoins);
227
228
229 /* send away */
230 LC_MsgSend(context, jm->msg_h);
231 }
232 }
233
234
235
236 /*
237 unpack phase1 messages.
238 */
239
UnpackPhase1Msgs(DDD::DDDContext & context,LC_MSGHANDLE * theMsgs,int nRecvMsgs,DDD_HDR * localCplObjs,int nLCO,JIPartner ** p_joinObjs,int * p_nJoinObjs)240 static void UnpackPhase1Msgs (DDD::DDDContext& context,
241 LC_MSGHANDLE *theMsgs, int nRecvMsgs,
242 DDD_HDR *localCplObjs, int nLCO,
243 JIPartner **p_joinObjs, int *p_nJoinObjs)
244 {
245 auto& ctx = context.joinContext();
246 const auto& me = context.me();
247 JIPartner *joinObjs;
248 int nJoinObjs = 0;
249 int m, jo;
250
251 /* init return values */
252 *p_joinObjs = NULL;
253 *p_nJoinObjs = 0;
254
255
256 for(m=0; m<nRecvMsgs; m++)
257 {
258 LC_MSGHANDLE jm = theMsgs[m];
259 TEJoin *theJoin = (TEJoin *) LC_GetPtr(jm, ctx.jointab_id);
260 int nJ = (int) LC_GetTableLen(jm, ctx.jointab_id);
261 int i, j;
262
263 nJoinObjs += nJ;
264
265 for(i=0, j=0; i<nJ; i++)
266 {
267 while ((j<nLCO) && (OBJ_GID(localCplObjs[j]) < theJoin[i].gid))
268 j++;
269
270 if ((j<nLCO) && (OBJ_GID(localCplObjs[j])==theJoin[i].gid))
271 {
272 COUPLING *cpl;
273
274 /* found local object which is join target */
275 /* store shortcut to local object */
276 theJoin[i].hdr = localCplObjs[j];
277
278 /* generate phase2-JIAddCpl for this object */
279 for(cpl=ObjCplList(context, localCplObjs[j]); cpl!=NULL; cpl=CPL_NEXT(cpl))
280 {
281 JIAddCpl *ji = JIAddCplSet_NewItem(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl2));
282 ji->dest = CPL_PROC(cpl);
283 ji->te.gid = theJoin[i].gid;
284 ji->te.proc = LC_MsgGetProc(jm);
285 ji->te.prio = theJoin[i].prio;
286
287 if (! JIAddCplSet_ItemOK(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl2)))
288 continue;
289
290 # if DebugJoin<=1
291 printf("%4d: Phase1 Join for " DDD_GID_FMT " from %d, "
292 "send AddCpl to %d.\n",
293 me, theJoin[i].gid, ji->te.proc, ji->dest);
294 # endif
295
296 }
297
298 /* send phase3-JIAddCpl back to Join-proc */
299 for(cpl=ObjCplList(context, localCplObjs[j]); cpl!=NULL; cpl=CPL_NEXT(cpl))
300 {
301 JIAddCpl *ji = JIAddCplSet_NewItem(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
302 ji->dest = LC_MsgGetProc(jm);
303 ji->te.gid = OBJ_GID(localCplObjs[j]);
304 ji->te.proc = CPL_PROC(cpl);
305 ji->te.prio = cpl->prio;
306
307 if (! JIAddCplSet_ItemOK(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3)))
308 continue;
309 }
310 }
311 else
312 {
313 DUNE_THROW(Dune::Exception,
314 "no object " << theJoin[i].gid
315 << " for join from " << LC_MsgGetProc(jm));
316 }
317 }
318 }
319
320
321 /* return immediately if no join-objects have been found */
322 if (nJoinObjs==0)
323 return;
324
325
326 /* allocate array of objects, which has been contacted by a join */
327 joinObjs = new JIPartner[nJoinObjs];
328
329 /* set return values */
330 *p_joinObjs = joinObjs;
331 *p_nJoinObjs = nJoinObjs;
332
333
334 /* add one local coupling for each Join */
335 for(m=0, jo=0; m<nRecvMsgs; m++)
336 {
337 LC_MSGHANDLE jm = theMsgs[m];
338 TEJoin *theJoin = (TEJoin *) LC_GetPtr(jm, ctx.jointab_id);
339 int nJ = (int) LC_GetTableLen(jm, ctx.jointab_id);
340 int i;
341
342 for(i=0; i<nJ; i++)
343 {
344 AddCoupling(context, theJoin[i].hdr, LC_MsgGetProc(jm), theJoin[i].prio);
345
346 /* send one phase3-JIAddCpl for symmetric connection */
347 {
348 JIAddCpl *ji = JIAddCplSet_NewItem(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
349 ji->dest = LC_MsgGetProc(jm);
350 ji->te.gid = OBJ_GID(theJoin[i].hdr);
351 ji->te.proc = me;
352 ji->te.prio = OBJ_PRIO(theJoin[i].hdr);
353
354 JIAddCplSet_ItemOK(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
355 }
356
357 joinObjs[jo].hdr = theJoin[i].hdr;
358 joinObjs[jo].proc = LC_MsgGetProc(jm);
359 jo++;
360 }
361 }
362
363
364 /* sort joinObjs-array according to gid */
365 if (nJoinObjs>1) {
366 std::sort(
367 joinObjs, joinObjs + nJoinObjs,
368 [](const JIPartner& a, const JIPartner& b) {
369 return OBJ_GID(a.hdr) < OBJ_GID(b.hdr);
370 });
371 }
372 }
373
374
375
376 /****************************************************************************/
377
378 /*
379 prepare messages for phase 2.
380
381 */
382
PreparePhase2Msgs(DDD::DDDContext & context,std::vector<JIAddCpl * > & arrayAddCpl,JOINMSG2 ** theMsgs,size_t * memUsage)383 static int PreparePhase2Msgs (DDD::DDDContext& context, std::vector<JIAddCpl*>& arrayAddCpl,
384 JOINMSG2 **theMsgs, size_t *memUsage)
385 {
386 auto& ctx = context.joinContext();
387 const auto& me = context.me();
388
389 int i, last_i, nMsgs;
390 JIAddCpl** itemsAC = arrayAddCpl.data();
391 const int nAC = arrayAddCpl.size();
392
393 # if DebugJoin<=3
394 printf("%4d: PreparePhase2Msgs, nAddCpls=%d\n",
395 me, nAC);
396 fflush(stdout);
397 # endif
398
399 /* init return parameters */
400 *theMsgs = NULL;
401 *memUsage = 0;
402
403
404 if (nAC==0)
405 /* no messages */
406 return(0);
407
408
409 nMsgs = 0;
410 last_i = i = 0;
411 do
412 {
413 size_t bufSize;
414
415 /* skip until dest-processor is different */
416 while (i<nAC && (itemsAC[i]->dest == itemsAC[last_i]->dest))
417 i++;
418
419 /* create new message */
420 JOINMSG2* jm = new JOINMSG2;
421 jm->nAddCpls = i-last_i;
422 jm->arrayAddCpl = &(itemsAC[last_i]);
423 jm->dest = itemsAC[last_i]->dest;
424 jm->next = *theMsgs;
425 *theMsgs = jm;
426 nMsgs++;
427
428 /* create new send message */
429 jm->msg_h = LC_NewSendMsg(context, ctx.phase2msg_t, jm->dest);
430
431 /* init table inside message */
432 LC_SetTableSize(jm->msg_h, ctx.addtab_id, jm->nAddCpls);
433
434 /* prepare message for sending away */
435 bufSize = LC_MsgPrepareSend(context, jm->msg_h);
436 *memUsage += bufSize;
437
438 if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MEMUSAGE)
439 {
440 Dune::dwarn
441 << "DDD MESG [" << std::setw(3) << me << "]: SHOW_MEM "
442 << "send msg phase2 dest=" << std::setw(4) << jm->dest
443 << " size=" << std::setw(10) << bufSize << "\n";
444 }
445
446 last_i = i;
447
448 } while (last_i < nAC);
449
450 return(nMsgs);
451 }
452
453
454
455 /****************************************************************************/
456 /* */
457 /* Function: PackPhase2Msgs */
458 /* */
459 /* Purpose: allocate one message buffer for each outgoing message, */
460 /* fill buffer with message contents and initiate asynchronous */
461 /* send for each message. */
462 /* */
463 /* Input: theMsgs: list of message-send-infos */
464 /* */
465 /* Output: - */
466 /* */
467 /****************************************************************************/
468
PackPhase2Msgs(DDD::DDDContext & context,JOINMSG2 * theMsgs)469 static void PackPhase2Msgs(DDD::DDDContext& context, JOINMSG2 *theMsgs)
470 {
471 auto& ctx = context.joinContext();
472
473 JOINMSG2 *jm;
474
475 for(jm=theMsgs; jm!=NULL; jm=jm->next)
476 {
477 TEAddCpl *theAddTab;
478 int i;
479
480 /* copy data into message */
481 theAddTab = (TEAddCpl *)LC_GetPtr(jm->msg_h, ctx.addtab_id);
482 for(i=0; i<jm->nAddCpls; i++)
483 {
484 /* copy complete TEAddCpl item */
485 theAddTab[i] = jm->arrayAddCpl[i]->te;
486 }
487 LC_SetTableLen(jm->msg_h, ctx.addtab_id, jm->nAddCpls);
488
489
490 /* send away */
491 LC_MsgSend(context, jm->msg_h);
492 }
493 }
494
495
496
497 /*
498 unpack phase2 messages.
499 */
500
UnpackPhase2Msgs(DDD::DDDContext & context,LC_MSGHANDLE * theMsgs2,int nRecvMsgs2,JIPartner * joinObjs,int nJoinObjs,DDD_HDR * localCplObjs,int nLCO)501 static void UnpackPhase2Msgs (DDD::DDDContext& context,
502 LC_MSGHANDLE *theMsgs2, int nRecvMsgs2,
503 JIPartner *joinObjs, int nJoinObjs,
504 DDD_HDR *localCplObjs, int nLCO)
505 {
506 auto& ctx = context.joinContext();
507
508 int m;
509
510 for(m=0; m<nRecvMsgs2; m++)
511 {
512 LC_MSGHANDLE jm = theMsgs2[m];
513 TEAddCpl *theAC = (TEAddCpl *) LC_GetPtr(jm, ctx.addtab_id);
514 int nAC = (int) LC_GetTableLen(jm, ctx.addtab_id);
515 int i, j, jo;
516
517 for(i=0, j=0, jo=0; i<nAC; i++)
518 {
519 while ((j<nLCO) && (OBJ_GID(localCplObjs[j]) < theAC[i].gid))
520 j++;
521
522 while ((jo<nJoinObjs) && (OBJ_GID(joinObjs[jo].hdr) < theAC[i].gid))
523 jo++;
524
525 if ((j<nLCO) && (OBJ_GID(localCplObjs[j])==theAC[i].gid))
526 {
527 /* found local object which is AddCpl target */
528 AddCoupling(context, localCplObjs[j], theAC[i].proc, theAC[i].prio);
529
530 # if DebugJoin<=1
531 printf("%4d: Phase2 execute AddCpl(%08x,%d,%d) (from %d).\n",
532 context.me(),
533 theAC[i].gid, theAC[i].proc, theAC[i].prio,
534 LC_MsgGetProc(jm));
535 # endif
536
537 while ((jo<nJoinObjs) && (OBJ_GID(joinObjs[jo].hdr) == theAC[i].gid))
538 {
539 JIAddCpl *ji = JIAddCplSet_NewItem(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
540 ji->dest = joinObjs[jo].proc;
541 ji->te.gid = theAC[i].gid;
542 ji->te.proc = theAC[i].proc;
543 ji->te.prio = theAC[i].prio;
544 JIAddCplSet_ItemOK(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
545
546
547 # if DebugJoin<=1
548 printf("%4d: Phase2 forward AddCpl(%08x,%d,%d) to %d.\n",
549 context.me(),
550 theAC[i].gid, theAC[i].proc, theAC[i].prio,
551 ji->dest);
552 # endif
553
554 jo++;
555 }
556 }
557 else
558 {
559 /* this should never happen. AddCpl send from invalid proc. */
560 assert(0);
561 }
562 }
563 }
564 }
565
566
567
568 /****************************************************************************/
569
570
571
572 /*
573 prepare messages for phase 3.
574
575 */
576
PreparePhase3Msgs(DDD::DDDContext & context,std::vector<JIAddCpl * > & arrayAddCpl,JOINMSG3 ** theMsgs,size_t * memUsage)577 static int PreparePhase3Msgs (DDD::DDDContext& context, std::vector<JIAddCpl*>& arrayAddCpl,
578 JOINMSG3 **theMsgs, size_t *memUsage)
579 {
580 auto& ctx = context.joinContext();
581 const auto& me = context.me();
582
583 int i, last_i, nMsgs;
584 JIAddCpl** itemsAC = arrayAddCpl.data();
585 const int nAC = arrayAddCpl.size();
586
587 # if DebugJoin<=3
588 printf("%4d: PreparePhase3Msgs, nAddCpls=%d\n",
589 me, nAC);
590 fflush(stdout);
591 # endif
592
593 /* init return parameters */
594 *theMsgs = NULL;
595 *memUsage = 0;
596
597
598 if (nAC==0)
599 /* no messages */
600 return(0);
601
602
603 nMsgs = 0;
604 last_i = i = 0;
605 do
606 {
607 size_t bufSize;
608
609 /* skip until dest-processor is different */
610 while (i<nAC && (itemsAC[i]->dest == itemsAC[last_i]->dest))
611 i++;
612
613 /* create new message */
614 JOINMSG3* jm = new JOINMSG3;
615 jm->nAddCpls = i-last_i;
616 jm->arrayAddCpl = &(itemsAC[last_i]);
617 jm->dest = itemsAC[last_i]->dest;
618 jm->next = *theMsgs;
619 *theMsgs = jm;
620 nMsgs++;
621
622 /* create new send message */
623 jm->msg_h = LC_NewSendMsg(context, ctx.phase3msg_t, jm->dest);
624
625 /* init table inside message */
626 LC_SetTableSize(jm->msg_h, ctx.cpltab_id, jm->nAddCpls);
627
628 /* prepare message for sending away */
629 bufSize = LC_MsgPrepareSend(context, jm->msg_h);
630 *memUsage += bufSize;
631
632 if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MEMUSAGE)
633 {
634 Dune::dwarn
635 << "DDD MESG [" << std::setw(3) << me << "]: SHOW_MEM "
636 << "send msg phase3 dest=" << std::setw(4) << jm->dest
637 << " size=" << std::setw(10) << bufSize << "\n";
638 }
639
640 last_i = i;
641
642 } while (last_i < nAC);
643
644 return(nMsgs);
645 }
646
647
648
649 /****************************************************************************/
650 /* */
651 /* Function: PackPhase3Msgs */
652 /* */
653 /* Purpose: allocate one message buffer for each outgoing message, */
654 /* fill buffer with message contents and initiate asynchronous */
655 /* send for each message. */
656 /* */
657 /* Input: theMsgs: list of message-send-infos */
658 /* */
659 /* Output: - */
660 /* */
661 /****************************************************************************/
662
PackPhase3Msgs(DDD::DDDContext & context,JOINMSG3 * theMsgs)663 static void PackPhase3Msgs(DDD::DDDContext& context, JOINMSG3 *theMsgs)
664 {
665 auto& ctx = context.joinContext();
666
667 JOINMSG3 *jm;
668
669 for(jm=theMsgs; jm!=NULL; jm=jm->next)
670 {
671 TEAddCpl *theAddTab;
672 int i;
673
674 /* copy data into message */
675 theAddTab = (TEAddCpl *)LC_GetPtr(jm->msg_h, ctx.cpltab_id);
676 for(i=0; i<jm->nAddCpls; i++)
677 {
678 /* copy complete TEAddCpl item */
679 theAddTab[i] = jm->arrayAddCpl[i]->te;
680 }
681 LC_SetTableLen(jm->msg_h, ctx.cpltab_id, jm->nAddCpls);
682
683
684 /* send away */
685 LC_MsgSend(context, jm->msg_h);
686 }
687 }
688
689
690
691 /****************************************************************************/
692
693
694 /*
695 unpack phase3 messages.
696 */
697
UnpackPhase3Msgs(DDD::DDDContext & context,LC_MSGHANDLE * theMsgs,int nRecvMsgs,std::vector<JIJoin * > & arrayJoin)698 static void UnpackPhase3Msgs (DDD::DDDContext& context,
699 LC_MSGHANDLE *theMsgs, int nRecvMsgs,
700 std::vector<JIJoin*>& arrayJoin)
701 {
702 auto& ctx = context.joinContext();
703
704 JIJoin** itemsJ = arrayJoin.data();
705 const int nJ = arrayJoin.size();
706 int m;
707
708
709 for(m=0; m<nRecvMsgs; m++)
710 {
711 LC_MSGHANDLE jm = theMsgs[m];
712 TEAddCpl *theAC = (TEAddCpl *) LC_GetPtr(jm, ctx.cpltab_id);
713 int nAC = (int) LC_GetTableLen(jm, ctx.cpltab_id);
714 int i, j;
715
716 for(i=0, j=0; i<nAC; i++)
717 {
718 while ((j<nJ) && (OBJ_GID(itemsJ[j]->hdr) < theAC[i].gid))
719 j++;
720
721 if ((j<nJ) && (OBJ_GID(itemsJ[j]->hdr) == theAC[i].gid))
722 {
723 /* found local object which is AddCpl target */
724 AddCoupling(context, itemsJ[j]->hdr, theAC[i].proc, theAC[i].prio);
725
726 # if DebugJoin<=1
727 printf("%4d: Phase3 execute AddCpl(%08x,%d,%d) (from %d).\n",
728 context.me(),
729 OBJ_GID(itemsJ[j]->hdr), theAC[i].proc, theAC[i].prio,
730 LC_MsgGetProc(jm));
731 # endif
732 }
733 else
734 {
735 /* this should never happen. AddCpl send for unknown obj. */
736 assert(0);
737 }
738 }
739 }
740 }
741
742
743
744
745 /****************************************************************************/
746 /* */
747 /* Function: DDD_JoinEnd */
748 /* */
749 /****************************************************************************/
750
751 /**
752 End of join phase.
753 This function starts the actual join process. After a call to
754 this function (on all processors) all {\bf Join}-commands since
755 the last call to \funk{JoinBegin} are executed. This involves
756 a set of local communications between the processors.
757 */
758
DDD_JoinEnd(DDD::DDDContext & context)759 DDD_RET DDD_JoinEnd(DDD::DDDContext& context)
760 {
761 auto& ctx = context.joinContext();
762
763 int obsolete, nRecvMsgs1, nRecvMsgs2, nRecvMsgs3;
764 JOINMSG1 *sendMsgs1=NULL, *sm1=NULL;
765 JOINMSG2 *sendMsgs2=NULL, *sm2=NULL;
766 JOINMSG3 *sendMsgs3=NULL, *sm3=NULL;
767 LC_MSGHANDLE *recvMsgs1=NULL, *recvMsgs2=NULL, *recvMsgs3=NULL;
768 size_t sendMem=0, recvMem=0;
769 JIPartner *joinObjs = NULL;
770 int nJoinObjs;
771 const auto& nCpls = context.couplingContext().nCpls;
772
773
774
775 STAT_SET_MODULE(DDD_MODULE_JOIN);
776 STAT_ZEROALL;
777
778 /* step mode and check whether call to JoinEnd is valid */
779 if (!JoinStepMode(context, JoinMode::JMODE_CMDS))
780 DUNE_THROW(Dune::Exception, "DDD_JoinEnd() aborted");
781
782
783 /*
784 PREPARATION PHASE
785 */
786 /* get sorted array of JIJoin-items */
787 std::vector<JIJoin*> arrayJIJoin = JIJoinSet_GetArray(reinterpret_cast<JIJoinSet*>(ctx.setJIJoin));
788 obsolete = JIJoinSet_GetNDiscarded(reinterpret_cast<JIJoinSet*>(ctx.setJIJoin));
789
790
791 /*
792 COMMUNICATION PHASE 1
793 all processors, where JoinObj-commands have been issued,
794 send information about these commands to the target
795 processors together with the GID of the objects on the
796 target procs and the local priority.
797 */
798 STAT_RESET;
799 /* prepare msgs for JIJoin-items */
800 PreparePhase1Msgs(context, arrayJIJoin, &sendMsgs1, &sendMem);
801 /* DisplayMemResources(); */
802
803 /* init communication topology */
804 nRecvMsgs1 = LC_Connect(context, ctx.phase1msg_t);
805 STAT_TIMER(T_JOIN_PREP_MSGS);
806
807 STAT_RESET;
808 /* build phase1 msgs on sender side and start send */
809 PackPhase1Msgs(context, sendMsgs1);
810 STAT_TIMER(T_JOIN_PACK_SEND);
811
812
813 /*
814 now messages are in the net, use spare time
815 */
816 STAT_RESET;
817 /* get sorted list of local objects with couplings */
818 std::vector<DDD_HDR> localCplObjs = LocalCoupledObjectsList(context);
819
820 if (obsolete>0)
821 {
822 if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_OBSOLETE)
823 {
824 int all = JIJoinSet_GetNItems(reinterpret_cast<JIJoinSet*>(ctx.setJIJoin));
825
826 using std::setw;
827 Dune::dwarn
828 << "DDD MESG [" << setw(3) << context.me() << "]: " << setw(4) << obsolete
829 << " from " << setw(4) << all << " join-cmds obsolete.\n";
830 }
831 }
832 STAT_TIMER(T_JOIN);
833
834
835 /*
836 nothing more to do until incoming messages arrive
837 */
838
839 /* display information about send-messages on lowcomm-level */
840 if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MSGSALL)
841 {
842 DDD_SyncAll(context);
843 if (context.isMaster())
844 Dune::dwarn << "DDD JOIN_SHOW_MSGSALL: Phase1Msg.Send\n";
845 LC_PrintSendMsgs(context);
846 }
847
848
849 /* wait for communication-completion (send AND receive) */
850 STAT_RESET;
851 recvMsgs1 = LC_Communicate(context);
852 STAT_TIMER(T_JOIN_WAIT_RECV);
853
854
855 /* display information about message buffer sizes */
856 if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MEMUSAGE)
857 {
858 int k;
859
860 /* sum up sizes of receive mesg buffers */
861 for(k=0; k<nRecvMsgs1; k++)
862 {
863 recvMem += LC_GetBufferSize(recvMsgs1[k]);
864 }
865
866 using std::setw;
867 Dune::dwarn
868 << "DDD MESG [" << setw(3) << context.me() << "]: SHOW_MEM msgs "
869 << " send=" << setw(10) << sendMem
870 << " recv=" << setw(10) << recvMem
871 << " all=" << setw(10) << (sendMem+recvMem) << "\n";
872 }
873
874 /* display information about recv-messages on lowcomm-level */
875 if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MSGSALL)
876 {
877 DDD_SyncAll(context);
878 if (context.isMaster())
879 Dune::dwarn << "DDD JOIN_SHOW_MSGSALL: Phase1Msg.Recv\n";
880 LC_PrintRecvMsgs(context);
881 }
882
883 /* unpack messages */
884 STAT_RESET;
885 UnpackPhase1Msgs(context, recvMsgs1, nRecvMsgs1, localCplObjs.data(), nCpls,
886 &joinObjs, &nJoinObjs);
887 LC_Cleanup(context);
888 STAT_TIMER(T_JOIN_UNPACK);
889
890
891
892
893
894 /*
895 COMMUNICATION PHASE 2
896 all processors which received notification of JoinObj-commands
897 during phase 1 send AddCpl-requests to all copies of DDD objects,
898 for which Joins had been issued remotely.
899 */
900 /* get sorted array of JIAddCpl-items */
901 std::vector<JIAddCpl*> arrayJIAddCpl2 = JIAddCplSet_GetArray(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl2));
902
903 STAT_RESET;
904 /* prepare msgs for JIAddCpl-items */
905 PreparePhase2Msgs(context, arrayJIAddCpl2, &sendMsgs2, &sendMem);
906 /* DisplayMemResources(); */
907
908 /* init communication topology */
909 nRecvMsgs2 = LC_Connect(context, ctx.phase2msg_t);
910 STAT_TIMER(T_JOIN_PREP_MSGS);
911
912 STAT_RESET;
913 /* build phase2 msgs on sender side and start send */
914 PackPhase2Msgs(context, sendMsgs2);
915 STAT_TIMER(T_JOIN_PACK_SEND);
916
917 /*
918 now messages are in the net, use spare time
919 */
920
921 /* reorder Join-commands according to new_gid */
922 /* this ordering is needed in UnpackPhase3 */
923 if (arrayJIJoin.size() > 1)
924 {
925 std::sort(
926 arrayJIJoin.begin(), arrayJIJoin.end(),
927 [](const JIJoin* a, const JIJoin* b) {
928 return a->new_gid < b->new_gid;
929 });
930 }
931
932
933 /*
934 nothing more to do until incoming messages arrive
935 */
936
937 /* display information about send-messages on lowcomm-level */
938 if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MSGSALL)
939 {
940 DDD_SyncAll(context);
941 if (context.isMaster())
942 Dune::dwarn <<"DDD JOIN_SHOW_MSGSALL: Phase2Msg.Send\n";
943 LC_PrintSendMsgs(context);
944 }
945
946
947 /* wait for communication-completion (send AND receive) */
948 STAT_RESET;
949 recvMsgs2 = LC_Communicate(context);
950 STAT_TIMER(T_JOIN_WAIT_RECV);
951
952
953 /* display information about message buffer sizes */
954 if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MEMUSAGE)
955 {
956 int k;
957
958 /* sum up sizes of receive mesg buffers */
959 for(k=0; k<nRecvMsgs2; k++)
960 {
961 recvMem += LC_GetBufferSize(recvMsgs2[k]);
962 }
963
964 using std::setw;
965 Dune::dwarn
966 << "DDD MESG [" << setw(3) << context.me() << "]: SHOW_MEM msgs "
967 << " send=" << setw(10) << sendMem
968 << " recv=" << setw(10) << recvMem
969 << " all=" << setw(10) << (sendMem+recvMem) << "\n";
970 }
971
972 /* display information about recv-messages on lowcomm-level */
973 if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MSGSALL)
974 {
975 DDD_SyncAll(context);
976 if (context.isMaster())
977 Dune::dwarn << "DDD JOIN_SHOW_MSGSALL: Phase2Msg.Recv\n";
978 LC_PrintRecvMsgs(context);
979 }
980
981 /* unpack messages */
982 STAT_RESET;
983 UnpackPhase2Msgs(context, recvMsgs2, nRecvMsgs2, joinObjs, nJoinObjs,
984 localCplObjs.data(), nCpls);
985
986 LC_Cleanup(context);
987 STAT_TIMER(T_JOIN_UNPACK);
988
989 for(; sendMsgs2!=NULL; sendMsgs2=sm2)
990 {
991 sm2 = sendMsgs2->next;
992 delete sendMsgs2;
993 }
994
995
996
997
998
999
1000 /*
1001 COMMUNICATION PHASE 3
1002 all processors which received notification of JoinObj-commands
1003 during phase 1 send AddCpl-requests to the procs where the
1004 JoinObj-commands have been issued. One AddCpl-request is sent
1005 for each cpl in the local objects coupling list. One AddCpl-request
1006 is sent for each AddCpl-request received during phase 2.
1007 (i.e., two kinds of AddCpl-requests are send to the processors
1008 on which the JoinObj-commands have been issued.
1009 */
1010 /* get sorted array of JIAddCpl-items */
1011 std::vector<JIAddCpl*> arrayJIAddCpl3 = JIAddCplSet_GetArray(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
1012
1013 STAT_RESET;
1014 /* prepare msgs for JIAddCpl-items */
1015 PreparePhase3Msgs(context, arrayJIAddCpl3, &sendMsgs3, &sendMem);
1016 /* DisplayMemResources(); */
1017
1018 /* init communication topology */
1019 nRecvMsgs3 = LC_Connect(context, ctx.phase3msg_t);
1020 STAT_TIMER(T_JOIN_PREP_MSGS);
1021
1022 STAT_RESET;
1023 /* build phase3 msgs on sender side and start send */
1024 PackPhase3Msgs(context, sendMsgs3);
1025 STAT_TIMER(T_JOIN_PACK_SEND);
1026
1027 /*
1028 now messages are in the net, use spare time
1029 */
1030 /* ... */
1031
1032 /*
1033 nothing more to do until incoming messages arrive
1034 */
1035
1036 /* display information about send-messages on lowcomm-level */
1037 if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MSGSALL)
1038 {
1039 DDD_SyncAll(context);
1040 if (context.isMaster())
1041 Dune::dwarn << "DDD JOIN_SHOW_MSGSALL: Phase3Msg.Send\n";
1042 LC_PrintSendMsgs(context);
1043 }
1044
1045
1046 /* wait for communication-completion (send AND receive) */
1047 STAT_RESET;
1048 recvMsgs3 = LC_Communicate(context);
1049 STAT_TIMER(T_JOIN_WAIT_RECV);
1050
1051
1052 /* display information about message buffer sizes */
1053 if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MEMUSAGE)
1054 {
1055 int k;
1056
1057 /* sum up sizes of receive mesg buffers */
1058 for(k=0; k<nRecvMsgs3; k++)
1059 {
1060 recvMem += LC_GetBufferSize(recvMsgs3[k]);
1061 }
1062
1063 using std::setw;
1064 Dune::dwarn
1065 << "DDD MESG [" << setw(3) << context.me() << "]: SHOW_MEM msgs "
1066 << " send=" << setw(10) << sendMem
1067 << " recv=" << setw(10) << recvMem
1068 << " all=" << setw(10) << (sendMem+recvMem) << "\n";
1069 }
1070
1071 /* display information about recv-messages on lowcomm-level */
1072 if (DDD_GetOption(context, OPT_INFO_JOIN) & JOIN_SHOW_MSGSALL)
1073 {
1074 DDD_SyncAll(context);
1075 if (context.isMaster())
1076 Dune::dwarn << "DDD JOIN_SHOW_MSGSALL: Phase3Msg.Recv\n";
1077 LC_PrintRecvMsgs(context);
1078 }
1079
1080 /* unpack messages */
1081 STAT_RESET;
1082 UnpackPhase3Msgs(context, recvMsgs3, nRecvMsgs3, arrayJIJoin);
1083 LC_Cleanup(context);
1084 STAT_TIMER(T_JOIN_UNPACK);
1085
1086 for(; sendMsgs3!=NULL; sendMsgs3=sm3)
1087 {
1088 sm3 = sendMsgs3->next;
1089 delete sendMsgs3;
1090 }
1091
1092
1093
1094
1095
1096
1097 /*
1098 free temporary storage
1099 */
1100 JIJoinSet_Reset(reinterpret_cast<JIJoinSet*>(ctx.setJIJoin));
1101
1102 JIAddCplSet_Reset(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl2));
1103
1104 JIAddCplSet_Reset(reinterpret_cast<JIAddCplSet*>(ctx.setJIAddCpl3));
1105
1106 if (joinObjs!=NULL)
1107 delete[] joinObjs;
1108
1109 for(; sendMsgs1!=NULL; sendMsgs1=sm1)
1110 {
1111 sm1 = sendMsgs1->next;
1112 delete sendMsgs1;
1113 }
1114
1115
1116
1117 # if DebugJoin<=4
1118 Dune::dverb << "JoinEnd, before IFAllFromScratch().\n";
1119 # endif
1120
1121 /* re-create all interfaces and step JMODE */
1122 STAT_RESET;
1123 IFAllFromScratch(context);
1124 STAT_TIMER(T_JOIN_BUILD_IF);
1125
1126
1127 JoinStepMode(context, JoinMode::JMODE_BUSY);
1128
1129 return(DDD_RET_OK);
1130 }
1131
1132
1133
1134
1135
1136 /****************************************************************************/
1137 /* */
1138 /* Function: DDD_JoinObj */
1139 /* */
1140 /****************************************************************************/
1141
1142 /**
1143 Join local object with a distributed object.
1144
1145 \todoTBC
1146
1147 @param hdr DDD local object which should be joined.
1148 */
1149
1150
1151
DDD_JoinObj(DDD::DDDContext & context,DDD_HDR hdr,DDD_PROC dest,DDD_GID new_gid)1152 void DDD_JoinObj(DDD::DDDContext& context, DDD_HDR hdr, DDD_PROC dest, DDD_GID new_gid)
1153 {
1154 auto& ctx = context.joinContext();
1155 const auto procs = context.procs();
1156
1157 if (!ddd_JoinActive(context))
1158 DUNE_THROW(Dune::Exception, "Missing DDD_JoinBegin()");
1159
1160 if (dest>=procs)
1161 DUNE_THROW(Dune::Exception,
1162 "cannot join " << OBJ_GID(hdr) << " with " << new_gid
1163 << " on processor " << dest << " (procs=" << procs << ")");
1164
1165 if (dest==context.me())
1166 DUNE_THROW(Dune::Exception,
1167 "cannot join " << OBJ_GID(hdr) << " with myself");
1168
1169 if (ObjHasCpl(context, hdr))
1170 DUNE_THROW(Dune::Exception,
1171 "cannot join " << OBJ_GID(hdr)
1172 << ", object already distributed");
1173
1174
1175
1176 JIJoin* ji = JIJoinSet_NewItem(reinterpret_cast<JIJoinSet*>(ctx.setJIJoin));
1177 ji->hdr = hdr;
1178 ji->dest = dest;
1179 ji->new_gid = new_gid;
1180
1181 if (! JIJoinSet_ItemOK(reinterpret_cast<JIJoinSet*>(ctx.setJIJoin)))
1182 return;
1183
1184 # if DebugJoin<=2
1185 Dune:dvverb << "DDD_JoinObj " << OBJ_GID(hdr)
1186 << ", dest=" << dest << ", new_gid=" << new_gid << "\n";
1187 # endif
1188 }
1189
1190
1191
1192
1193 /****************************************************************************/
1194 /* */
1195 /* Function: DDD_JoinBegin */
1196 /* */
1197 /****************************************************************************/
1198
1199 /**
1200 Starts join phase.
1201 A call to this function establishes a global join operation.
1202 It must be issued on all processors. After this call an arbitrary
1203 series of {\bf Join}-commands may be issued. The global transfer operation
1204 is carried out via a \funk{JoinEnd} call on each processor.
1205 */
1206
DDD_JoinBegin(DDD::DDDContext & context)1207 void DDD_JoinBegin(DDD::DDDContext& context)
1208 {
1209 /* step mode and check whether call to JoinBegin is valid */
1210 if (!JoinStepMode(context, JoinMode::JMODE_IDLE))
1211 DUNE_THROW(Dune::Exception, "DDD_JoinBegin() aborted");
1212 }
1213
1214
1215
1216 END_UGDIM_NAMESPACE
1217