1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 /****************************************************************************/
4 /* */
5 /* File: lowcomm.c */
6 /* */
7 /* Purpose: lowlevel communication layer */
8 /* */
9 /* Author: Klaus Birken */
10 /* Institut fuer Computeranwendungen III */
11 /* Universitaet Stuttgart */
12 /* Pfaffenwaldring 27 */
13 /* 70569 Stuttgart */
14 /* internet: birken@ica3.uni-stuttgart.de */
15 /* */
16 /* History: 960715 kb begin */
17 /* 971007 kb reworked */
18 /* */
19 /* Remarks: */
20 /* This module provides two basic abstractions: */
21 /* - sending of messages without explicit receive calls */
22 /* - message types consisting of a set of components, where */
23 /* components are tables (with entries of equal sizes) and */
24 /* raw data chunks. */
25 /* */
26 /* The LowComm subsystem uses the Notify-subsystem in order to */
27 /* tell receiving processors that corresponding send-calls had */
28 /* been issued. */
29 /* */
30 /* The structure of each message is: */
31 /* */
32 /* description | type */
33 /* -------------------------------------------+--------- */
34 /* magic number | ULONG */
35 /* #components | ULONG */
36 /* offset component1 (from beginning of Msg) | ULONG */
37 /* length component1 (in bytes) | ULONG */
38 /* nItems component1 | ULONG */
39 /* ... | ... */
40 /* offset componentN | ULONG */
41 /* length componentN | ULONG */
42 /* nItems componentN | ULONG */
43 /* component1 */
44 /* ... */
45 /* componentN */
46 /* */
47 /* The LowComm subsystem is able to handle low-memory situations,*/
48 /* where the available memory is not enough for all send- and */
49 /* receive-buffers. See LC_MsgAlloc for details. */
50 /* */
51 /****************************************************************************/
52
53 /****************************************************************************/
54 /* */
55 /* include files */
56 /* system include files */
57 /* application include files */
58 /* */
59 /****************************************************************************/
60
61 /* standard C library */
62 #include <config.h>
63 #include <cstdlib>
64 #include <cstdio>
65 #include <cstring>
66 #include <cassert>
67
68 #include <iomanip>
69 #include <iostream>
70 #include <new>
71
72 #include <dune/common/exceptions.hh>
73 #include <dune/common/stdstreams.hh>
74
75 #include <dune/uggrid/parallel/ddd/dddi.h>
76 #include "lowcomm.h"
77 #include "notify.h"
78
79 #include <dune/uggrid/parallel/ddd/dddcontext.hh>
80
81 USING_UG_NAMESPACES
82
83 /* PPIF namespace: */
84 using namespace PPIF;
85
/* Debug verbosity threshold for this file: a "#if DebugLowComm<=N" block is
   compiled in only when this value does not exceed N.
   0 enables all debug output, 10 switches it off. */
#define DebugLowComm 10
87
88 namespace DDD {
89 namespace Basic {
90
91 /****************************************************************************/
92 /* */
93 /* defines in the following order */
94 /* */
95 /* compile time constants defining static data size (i.e. arrays) */
96 /* other constants */
97 /* macros */
98 /* */
99 /****************************************************************************/
100
101
/* upper bound for the number of components one message type may declare;
   this is also the length of the component array inside MSG_TYPE */
#define MAX_COMPONENTS 8


/* dummy magic number: stored in the first header word of each message
   and compared again when a message has been received */
#define MAGIC_DUMMY 0x1234


/* number of header words stored per component
   (offset, length in bytes, number of items) */
#define HDR_ENTRIES_PER_CHUNK 3
112
/* kind of a message component */
enum CompType {
  CT_NONE  = 0,   /* no component kind assigned */
  CT_TABLE = 1,   /* table: entries of equal size */
  CT_CHUNK = 2    /* raw data chunk */
};
118
119
120 /****************************************************************************/
121 /* */
122 /* data structures */
123 /* */
124 /****************************************************************************/
125
/* static description of one component of a message type */
struct COMP_DESC
{
  const char *name;          /* human-readable component name */
  int type;                  /* component kind (a CompType value) */
  size_t entry_size;         /* bytes per entry; meaningful for tables */
};
132
133
134 struct MSG_TYPE
135 {
136 const char *name; /* textual description of msgtype */
137 int nComps; /* number of components */
138 COMP_DESC comp[MAX_COMPONENTS]; /* component array */
139
140 MSG_TYPE *next; /* linked list of all message types */
141 };
142
143
144 struct CHUNK_DESC
145 {
146 size_t size; /* size of chunk (in bytes) */
147 ULONG entries; /* number of valid entries (for tables) */
148 ULONG offset; /* offset of chunk in MSG */
149 };
150
151
/* life cycle of a message descriptor */
enum MsgState {
  MSTATE_NEW,         /* freshly created by LC_NewSendMsg / LC_NewRecvMsg */
  MSTATE_FREEZED,     /* layout fixed by LC_MsgFreeze */
  MSTATE_ALLOCATED,   /* buffer allocated and header written by LC_MsgAlloc */
  MSTATE_COMM,        /* asynchronous send/receive has been initiated */
  MSTATE_READY        /* transfer was reported complete by a poll */
};
153
154 struct MSG_DESC
155 {
156 int msgState; /* message state of this message (one of MsgState) */
157 MSG_TYPE *msgType; /* message type of this message */
158
159 ULONG magic; /* magic number */
160 CHUNK_DESC *chunks; /* array of chunks */
161
162
163 size_t bufferSize; /* size of message buffer (in bytes) */
164 char *buffer; /* address of message buffer */
165
166
167 MSG_DESC *next; /* linked list inside Send/Recv-queue */
168 DDD_PROC proc;
169 msgid msgId;
170
171 };
172
173
/* bookkeeping data of a table component */
struct TABLE_DESC
{
  size_t entry_size;         /* bytes occupied by one table entry */
  int nMax;                  /* capacity of the table in entries */
  int nValid;                /* how many of the entries are valid */
};
180
181 } /* namespace Basic */
182 } /* namespace DDD */
183
184 /****************************************************************************/
185 /* */
186 /* routines */
187 /* */
188 /****************************************************************************/
189
190 namespace DDD {
191
192 using namespace DDD::Basic;
193
194 /**
195 Initiates LowComm subsystem.
196 This function has to be called exactly once in order
197 to initialize the LowComm subsystem. After a call to
198 this function, the functionality of the LowComm can
199 be used.
200
201 @param aAllocFunc memory allocation function used as the default
202 @param aFreeFunc memory free function used as the default
203 */
204
LC_Init(DDD::DDDContext & context,AllocFunc aAllocFunc,FreeFunc aFreeFunc)205 void LC_Init(DDD::DDDContext& context, AllocFunc aAllocFunc, FreeFunc aFreeFunc)
206 {
207 auto& lcContext = context.lowCommContext();
208 lcContext.DefaultAlloc = aAllocFunc;
209 lcContext.DefaultFree = aFreeFunc;
210 LC_SetMemMgrDefault(context);
211 }
212
213
214
215 /**
216 Exits LowComm subsystem.
217 This function frees memory allocated by the LowComm subsystem
218 and shuts down its communication structures.
219 */
220
LC_Exit(DDD::DDDContext & context)221 void LC_Exit(DDD::DDDContext& context)
222 {
223 auto& lcContext = context.lowCommContext();
224
225 {
226 auto md = lcContext.FreeMsgDescs;
227 while (md != nullptr) {
228 const auto next = md->next;
229 delete md;
230 md = next;
231 }
232 lcContext.FreeMsgDescs = nullptr;
233 }
234
235 {
236 auto mt = lcContext.MsgTypes;
237 while (mt != nullptr) {
238 const auto next = mt->next;
239 delete mt;
240 mt = next;
241 }
242 lcContext.MsgTypes = nullptr;
243 }
244 }
245
246
247
248
249 /**
250 Customizing memory management for LowComm subsystem.
251 Currently this function supports only alloc/free of
252 buffers for messages to be sent.
253 */
LC_SetMemMgrSend(DDD::DDDContext & context,AllocFunc aAllocFunc,FreeFunc aFreeFunc)254 void LC_SetMemMgrSend(DDD::DDDContext& context, AllocFunc aAllocFunc, FreeFunc aFreeFunc)
255 {
256 auto& lcContext = context.lowCommContext();
257 lcContext.SendAlloc = aAllocFunc;
258 lcContext.SendFree = aFreeFunc;
259 }
260
261
262 /**
263 Customizing memory management for LowComm subsystem.
264 Currently this function supports only alloc/free of
265 buffers for messages to be received.
266 */
LC_SetMemMgrRecv(DDD::DDDContext & context,AllocFunc aAllocFunc,FreeFunc aFreeFunc)267 void LC_SetMemMgrRecv(DDD::DDDContext& context, AllocFunc aAllocFunc, FreeFunc aFreeFunc)
268 {
269 auto& lcContext = context.lowCommContext();
270 lcContext.RecvAlloc = aAllocFunc;
271 lcContext.RecvFree = aFreeFunc;
272 }
273
274
275 /**
276 Set memory management for LowComm subsystem to its default state.
277 Set alloc/free of message buffers to the functions provided to
278 \lcfunk{Init}.
279 */
LC_SetMemMgrDefault(DDD::DDDContext & context)280 void LC_SetMemMgrDefault(DDD::DDDContext& context)
281 {
282 auto& lcContext = context.lowCommContext();
283 lcContext.SendAlloc = lcContext.DefaultAlloc;
284 lcContext.SendFree = lcContext.DefaultFree;
285 lcContext.RecvAlloc = lcContext.DefaultAlloc;
286 lcContext.RecvFree = lcContext.DefaultFree;
287 }
288
289
290 /****************************************************************************/
291
292 /*
293 auxiliary functions
294 */
295
NewMsgDesc(DDD::DDDContext & context)296 static MSG_DESC *NewMsgDesc (DDD::DDDContext& context)
297 {
298 auto& lcContext = context.lowCommContext();
299 MSG_DESC *md;
300
301 if (lcContext.FreeMsgDescs != nullptr)
302 {
303 /* get item from freelist */
304 md = lcContext.FreeMsgDescs;
305 lcContext.FreeMsgDescs = lcContext.FreeMsgDescs->next;
306 }
307 else
308 {
309 /* freelist is empty */
310 md = new MSG_DESC;
311 }
312
313 return(md);
314 }
315
316
FreeMsgDesc(DDD::DDDContext & context,MSG_DESC * md)317 static void FreeMsgDesc (DDD::DDDContext& context, MSG_DESC *md)
318 {
319 auto& lcContext = context.lowCommContext();
320
321 /* sort into freelist */
322 md->next = lcContext.FreeMsgDescs;
323 lcContext.FreeMsgDescs = md;
324 }
325
326
327 /****************************************************************************/
328
329
330
331 /*
332 this function has internal access only because LowComm initiates
333 asynchronous receive calls itself.
334 */
335
LC_NewRecvMsg(DDD::DDDContext & context,LC_MSGTYPE mtyp,DDD_PROC source,size_t size)336 static LC_MSGHANDLE LC_NewRecvMsg (DDD::DDDContext& context, LC_MSGTYPE mtyp, DDD_PROC source, size_t size)
337 {
338 auto& lcContext = context.lowCommContext();
339 MSG_DESC *msg = NewMsgDesc(context);
340
341 # if DebugLowComm<=6
342 Dune::dverb << "LC_NewRecvMsg(" << mtyp->name << ") source=" << source << "\n";
343 # endif
344
345 msg->msgState = MSTATE_NEW;
346 msg->msgType = mtyp;
347 msg->proc = source;
348 msg->bufferSize = size;
349
350 /* allocate chunks array */
351 msg->chunks = new CHUNK_DESC[mtyp->nComps];
352
353 /* enter message into recv queue */
354 msg->next = lcContext.RecvQueue;
355 lcContext.RecvQueue = msg;
356
357 return msg;
358 }
359
360
361
362
LC_DeleteMsg(DDD::DDDContext & context,LC_MSGHANDLE md)363 static void LC_DeleteMsg (DDD::DDDContext& context, LC_MSGHANDLE md)
364 {
365 delete[] md->chunks;
366 FreeMsgDesc(context, md);
367 }
368
369
LC_DeleteMsgBuffer(const DDD::DDDContext & context,LC_MSGHANDLE md)370 static void LC_DeleteMsgBuffer (const DDD::DDDContext& context, LC_MSGHANDLE md)
371 {
372 const auto& lcContext = context.lowCommContext();
373
374 if (lcContext.SendFree != nullptr)
375 (*lcContext.SendFree)(md->buffer);
376 }
377
378
379
380
LC_MsgRecv(MSG_DESC * md)381 static void LC_MsgRecv (MSG_DESC *md)
382 {
383 int i, j;
384
385 /* get message address */
386 ULONG *hdr = (ULONG *)md->buffer;
387
388 /* get number of chunks */
389 int n = (int)(hdr[1]);
390
391 /* magic number is hdr[0] */
392 if (hdr[0] != MAGIC_DUMMY)
393 DUNE_THROW(Dune::Exception, "invalid magic number for message from " << md->proc);
394
395 /* number of chunks must be consistent with message type */
396 if (n!=md->msgType->nComps)
397 DUNE_THROW(Dune::Exception,
398 "wrong number of chunks (got " << n << ", expected "
399 << md->msgType->nComps << ") in message from " << md->proc);
400
401 /* get chunk descriptions from message header */
402 for(j=2, i=0; i<n; i++)
403 {
404 md->chunks[i].offset = hdr[j++];
405 md->chunks[i].size = (size_t)(hdr[j++]);
406 md->chunks[i].entries = hdr[j++];
407 }
408
409 #if DebugLowComm<=2
410 Dune:dvverb << "LC_MsgRecv() from=" << md->proc << " ready\n";
411 #endif
412 }
413
414
415
416 /****************************************************************************/
417 /* */
418 /* Function: LC_PollSend */
419 /* */
420 /* Purpose: polls all message-sends one time and returns remaining */
421 /* outstanding messages. whenever a message-send has been */
422 /* completed, its message buffer is freed. */
423 /* */
424 /* Input: - */
425 /* */
426 /* Output: remaining outstanding messages */
427 /* */
428 /****************************************************************************/
429
LC_PollSend(const DDD::DDDContext & context)430 static int LC_PollSend(const DDD::DDDContext& context)
431 {
432 const auto& lcContext = context.lowCommContext();
433
434 MSG_DESC *md;
435 int remaining, error;
436
437 remaining = 0;
438 for(md=lcContext.SendQueue; md != nullptr; md=md->next)
439 {
440 if (md->msgState==MSTATE_COMM)
441 {
442 error = InfoASend(context.ppifContext(), VCHAN_TO(context, md->proc), md->msgId);
443 if (error==-1)
444 DUNE_THROW(Dune::Exception, "InfoASend() failed for message to proc=" << md->proc);
445
446 if (error==1)
447 {
448 /* free message buffer */
449 LC_DeleteMsgBuffer(context, (LC_MSGHANDLE)md);
450
451 md->msgState=MSTATE_READY;
452 }
453 else
454 {
455 /* we keep this message in SendQueue */
456 remaining++;
457 }
458 }
459 }
460
461 #if DebugLowComm<=3
462 Dune::dvverb << "LC_PollSend, " << remaining << " msgs remaining\n";
463 #endif
464
465 return(remaining);
466 }
467
468
469
470 /****************************************************************************/
471 /* */
472 /* Function: LC_PollRecv */
473 /* */
474 /* Purpose: polls all message-recvs one time and returns remaining */
475 /* outstanding messages. this function doesn't free the message */
476 /* buffers. */
477 /* */
478 /* Input: - */
479 /* */
480 /* Output: remaining outstanding messages */
481 /* */
482 /****************************************************************************/
483
LC_PollRecv(const DDD::DDDContext & context)484 static int LC_PollRecv(const DDD::DDDContext& context)
485 {
486 const auto& lcContext = context.lowCommContext();
487
488 MSG_DESC *md;
489 int remaining, error;
490
491 remaining = 0;
492 for(md=lcContext.RecvQueue; md != nullptr; md=md->next)
493 {
494 if (md->msgState==MSTATE_COMM)
495 {
496 error = InfoARecv(context.ppifContext(), VCHAN_TO(context, md->proc), md->msgId);
497 if (error==-1)
498 DUNE_THROW(Dune::Exception,
499 "InfoARecv() failed for recv from proc=" << md->proc);
500
501 if (error==1)
502 {
503 LC_MsgRecv(md);
504
505 md->msgState=MSTATE_READY;
506 }
507 else
508 {
509 remaining++;
510 }
511 }
512 }
513
514 #if DebugLowComm<=3
515 Dune::dvverb << "LC_PollRecv, " << remaining << " msgs remaining\n";
516 #endif
517
518 return(remaining);
519 }
520
521
522 /****************************************************************************/
523 /* */
524 /* Function: LC_FreeSendQueue */
525 /* */
526 /****************************************************************************/
527
LC_FreeSendQueue(DDD::DDDContext & context)528 static void LC_FreeSendQueue (DDD::DDDContext& context)
529 {
530 auto& lcContext = context.lowCommContext();
531 MSG_DESC *md, *next=NULL;
532
533 for(md=lcContext.SendQueue; md != nullptr; md=next)
534 {
535 /* the following assertion is too picky. Freeing of
536 message queues should be possible in all msgStates. */
537 /*assert(md->msgState==MSTATE_READY);*/
538
539 next = md->next;
540 LC_DeleteMsg(context, (LC_MSGHANDLE)md);
541 }
542
543
544 lcContext.SendQueue = nullptr;
545 lcContext.nSends = 0;
546 }
547
548
549 /****************************************************************************/
550 /* */
551 /* Function: LC_FreeRecvQueue */
552 /* */
553 /****************************************************************************/
554
LC_FreeRecvQueue(DDD::DDDContext & context)555 static void LC_FreeRecvQueue (DDD::DDDContext& context)
556 {
557 auto& lcContext = context.lowCommContext();
558 MSG_DESC *md, *next=NULL;
559
560 for(md=lcContext.RecvQueue; md != nullptr; md=next)
561 {
562 /* the following assertion is too picky. Freeing of
563 message queues should be possible in all msgStates. */
564 /*assert(md->msgState==MSTATE_READY);*/
565
566 next = md->next;
567 LC_DeleteMsg(context, (LC_MSGHANDLE)md);
568 }
569
570
571 lcContext.RecvQueue = nullptr;
572 lcContext.nRecvs = 0;
573 }
574
575
576
577 /****************************************************************************/
578
579
580 /* LC_MsgFreeze and LC_MsgAlloc are the two parts of LC_MsgPrepareSend(). */
581
582 /* returns size of message buffer */
583
LC_MsgFreeze(LC_MSGHANDLE md)584 size_t LC_MsgFreeze (LC_MSGHANDLE md)
585 {
586 int i, n = md->msgType->nComps;
587
588 assert(md->msgState==MSTATE_NEW);
589
590 /* compute size of header */
591 md->bufferSize = 2 * sizeof(ULONG);
592 md->bufferSize += (n * HDR_ENTRIES_PER_CHUNK * sizeof(ULONG));
593
594 /* compute size and offset for each chunk */
595 for(i=0; i<n; i++)
596 {
597 md->chunks[i].offset = md->bufferSize;
598 md->bufferSize += md->chunks[i].size;
599 }
600
601 md->msgState=MSTATE_FREEZED;
602
603 return(md->bufferSize);
604 }
605
606
607
LC_MsgAlloc(DDD::DDDContext & context,LC_MSGHANDLE md)608 int LC_MsgAlloc(DDD::DDDContext& context, LC_MSGHANDLE md)
609 {
610 auto& lcContext = context.lowCommContext();
611 ULONG *hdr;
612 int i, j, n = md->msgType->nComps;
613 int remaining=1, give_up = false;
614
615 assert(md->msgState==MSTATE_FREEZED);
616
617 /* the following code tries to allocate the message buffer.
618 if this fails, the previously started asynchronous sends are
619 polled, in order to free their message buffers. if there are
620 no remaining async-sends, we give up. */
621 do {
622 /* allocate buffer for messages */
623 md->buffer = (char *) (*lcContext.SendAlloc)(md->bufferSize);
624 if (md->buffer==NULL)
625 {
626 if (remaining==0)
627 give_up = true;
628 else
629 {
630 # if DebugLowComm<=7
631 Dune::dinfo << "LC_MsgAlloc(" << md->msgType->name
632 << ") detected low memory.\n";
633 # endif
634
635 /* couldn't get msg-buffer. try to poll previous messages. */
636 /* first, poll receives to avoid communication deadlock. */
637 LC_PollRecv(context);
638
639 /* now, try to poll sends and free their message buffers */
640 remaining = LC_PollSend(context);
641
642 # if DebugLowComm<=6
643 Dune::dverb << "LC_MsgAlloc(" << md->msgType->name
644 << ") preliminary poll, sends_left=" << remaining << "\n";
645 # endif
646 }
647 }
648 } while (md->buffer==NULL && !give_up);
649
650 if (give_up)
651 {
652 # if DebugLowComm<=7
653 Dune::dinfo << "LC_MsgAlloc(" << md->msgType->name << ") giving up, no memory.\n";
654 # endif
655 return(false);
656 }
657
658
659 /* enter control data into message header */
660 hdr = (ULONG *)md->buffer;
661 j=0;
662 hdr[j++] = MAGIC_DUMMY; /* magic number */
663 hdr[j++] = n;
664
665 /* enter chunk descriptions into message header */
666 for(i=0; i<n; i++)
667 {
668 hdr[j++] = md->chunks[i].offset;
669 hdr[j++] = md->chunks[i].size;
670 hdr[j++] = md->chunks[i].entries;
671 }
672
673 md->msgState=MSTATE_ALLOCATED;
674
675 return(true);
676 }
677
678
679
680
681 /*
682 allocation of receive message buffers.
683 NOTE: one big memory block is allocated and used for all
684 message buffers.
685 */
LC_PrepareRecv(DDD::DDDContext & context)686 static RETCODE LC_PrepareRecv(DDD::DDDContext& context)
687 {
688 auto& lcContext = context.lowCommContext();
689 MSG_DESC *md;
690 size_t sumSize;
691 char *buffer;
692 int error;
693
694 /* compute sum of message buffer sizes */
695 for(sumSize=0, md=lcContext.RecvQueue; md != nullptr; md=md->next)
696 {
697 assert(md->msgState==MSTATE_NEW);
698 sumSize += md->bufferSize;
699 }
700
701
702 /* allocate buffer for messages */
703 lcContext.theRecvBuffer = (char *) (*lcContext.RecvAlloc)(sumSize);
704 if (lcContext.theRecvBuffer == nullptr)
705 {
706 Dune::dwarn << "Out of memory in LC_PrepareRecv "
707 << "(size of message buffer: " << sumSize << ")";
708 RET_ON_ERROR;
709 }
710
711
712 /* initiate receive calls */
713 buffer = lcContext.theRecvBuffer;
714 for(md=lcContext.RecvQueue; md != nullptr; md=md->next)
715 {
716 md->buffer = buffer;
717 buffer += md->bufferSize;
718
719 md->msgId = RecvASync(context.ppifContext(), VCHAN_TO(context, md->proc),
720 md->buffer, md->bufferSize, &error);
721
722 md->msgState=MSTATE_COMM;
723 }
724
725 RET_ON_OK;
726 }
727
728
729 /****************************************************************************/
730
731 /*
732 MSG_TYPE definition functions
733 */
734
735
736 /****************************************************************************/
737 /* */
738 /* Function: LC_NewMsgType */
739 /* */
740 /****************************************************************************/
741
742 /**
743 Declares new message-type.
744 Before messages may be sent and received with the LowComm
745 subsystem, at least one {\em message-type} must be defined by
746 a global call to this function. Subsequently, calls to
747 \lcfunk{NewMsgTable} and \lcfunk{NewMsgChunk} can be used
748 in order to define the structure of the new message-type.
749
750 Each message-type in the LowComm subsystem consists of a set
751 of {\em message-components}. Possible message-components are:
752 {\em tables} (with entries of equal size) and raw {\em data chunks}.
753 The set of message-components has the same structure for
754 all messages of the same type, but the number of table entries
755 and the size of the data chunks differ from message to message.
756
757 @return identifier of new message-type
758 @param aName name of message-type. This string is used for debugging
759 and logging output.
760 */
761
LC_NewMsgType(DDD::DDDContext & context,const char * aName)762 LC_MSGTYPE LC_NewMsgType(DDD::DDDContext& context, const char *aName)
763 {
764 auto& lcContext = context.lowCommContext();
765 MSG_TYPE *mt;
766
767 mt = new MSG_TYPE;
768 mt->name = aName;
769 mt->nComps = 0;
770
771 /* insert into linked list of message types */
772 mt->next = lcContext.MsgTypes;
773 lcContext.MsgTypes = mt;
774
775 return mt;
776 }
777
778
779
780 /****************************************************************************/
781 /* */
782 /* Function: LC_NewMsgChunk */
783 /* */
784 /****************************************************************************/
785
786 /**
787 Add data chunk to current set of a message-type's message-components.
788 This function is called after a previous call to \lcfunk{NewMsgType}
789 in order to add a new message-component to the message-type.
790 The component added by this function is a chunk of raw data.
791 The size of the chunk is not specified here, use \lcfunk{SetChunkSize}
792 for specifying the data chunk size for a given (concrete) message.
793
794 See \lcfunk{NewMsgTable} for adding message-tables, which are
795 a different kind of message-component.
796
797 @return identifier of new message-component
798 @param aName name of new message component
799 @param mtyp previously declared message-type
800 */
801
LC_NewMsgChunk(const char * aName,LC_MSGTYPE mtyp)802 LC_MSGCOMP LC_NewMsgChunk (const char *aName, LC_MSGTYPE mtyp)
803 {
804 LC_MSGCOMP id = mtyp->nComps++;
805
806 if (id>=MAX_COMPONENTS)
807 DUNE_THROW(Dune::Exception,
808 "too many message components (max. " << MAX_COMPONENTS << ")");
809
810 mtyp->comp[id].type = CT_CHUNK;
811 mtyp->comp[id].name = aName;
812
813 return(id);
814 }
815
816
817
818
819 /****************************************************************************/
820 /* */
821 /* Function: LC_NewMsgTable */
822 /* */
823 /****************************************************************************/
824
825 /**
826 Add table to current set of a message-type's message-components.
827 This function is called after a previous call to \lcfunk{NewMsgType}
828 in order to add a new message-component to the message-type.
829 The component added by this function is a table of data, where
830 each table entry has the same size.
831 The overall size of the whole table is not specified here, but only
832 the size for one table entry. Use \lcfunk{SetTableSize} for setting
833 the number of reserved table entries in a given (concrete) message;
834 use \lcfunk{SetTableLen} in order to specify the number of valid
835 entries in a given message.
836
837 See \lcfunk{NewMsgChunk} for adding data chunks, which are
838 a different kind of message-component.
839
840 @return identifier of new message-component
841 @param aName name of new message component
842 @param mtyp previously declared message-type
843 @param aSize size of each table entry (in byte)
844 */
845
LC_NewMsgTable(const char * aName,LC_MSGTYPE mtyp,size_t aSize)846 LC_MSGCOMP LC_NewMsgTable (const char *aName, LC_MSGTYPE mtyp, size_t aSize)
847 {
848 LC_MSGCOMP id = mtyp->nComps++;
849
850 if (id>=MAX_COMPONENTS)
851 DUNE_THROW(Dune::Exception,
852 "too many message components (max. " << MAX_COMPONENTS << ")");
853
854 mtyp->comp[id].type = CT_TABLE;
855 mtyp->comp[id].entry_size = aSize;
856 mtyp->comp[id].name = aName;
857
858 return(id);
859 }
860
861
862
863 /****************************************************************************/
864
865
866
867 /****************************************************************************/
868 /* */
869 /* Function: LC_NewSendMsg */
870 /* */
871 /****************************************************************************/
872
873 /**
874 Create new message on sending processor.
875 This function creates a new message handle on the sending processor and
876 links it into the LowComm send-queue. The message has a given message-type
877 and a given destination processor. Before the message is actually sent
878 (by calling \lcfunk{MsgSend}), the sizes of the message's components
879 must be set (\lcfunk{SetTableSize}, \lcfunk{SetChunkSize}) and the message
880 buffer must be prepared (via \lcfunk{MsgPrepareSend}). After that,
881 the message's tables and chunks can be filled with data and the message
882 sending process can be initiated by \lcfunk{MsgSend}.
883
884 @return identifier of new message
885 @param mtyp message-type for new message
886 @param aDest destination processor of new message
887 */
888
LC_NewSendMsg(DDD::DDDContext & context,LC_MSGTYPE mtyp,DDD_PROC aDest)889 LC_MSGHANDLE LC_NewSendMsg(DDD::DDDContext& context, LC_MSGTYPE mtyp, DDD_PROC aDest)
890 {
891 auto& lcContext = context.lowCommContext();
892
893 MSG_DESC *msg = NewMsgDesc(context);
894
895 # if DebugLowComm<=6
896 Dune::dverb << "LC_NewSendMsg(" << mtyp->name << ") dest="
897 << aDest << " nSends=" << (lcContext.nSends+1) << "\n";
898 # endif
899
900
901 msg->msgState = MSTATE_NEW;
902 msg->msgType = mtyp;
903 msg->proc = aDest;
904 msg->bufferSize = 0;
905
906 /* allocate chunks array */
907 msg->chunks = new CHUNK_DESC[mtyp->nComps];
908
909 /* enter message into send queue */
910 msg->next = lcContext.SendQueue;
911 lcContext.SendQueue = msg;
912 lcContext.nSends++;
913
914 return msg;
915 }
916
917
918
919
LC_SetChunkSize(LC_MSGHANDLE md,LC_MSGCOMP id,size_t size)920 void LC_SetChunkSize (LC_MSGHANDLE md, LC_MSGCOMP id, size_t size)
921 {
922 assert(md->msgState==MSTATE_NEW);
923 assert(id < md->msgType->nComps);
924
925 md->chunks[id].size = size;
926 md->chunks[id].entries = 1;
927 }
928
929
LC_SetTableSize(LC_MSGHANDLE md,LC_MSGCOMP id,ULONG entries)930 void LC_SetTableSize (LC_MSGHANDLE md, LC_MSGCOMP id, ULONG entries)
931 {
932 assert(md->msgState==MSTATE_NEW);
933 assert(id < md->msgType->nComps);
934
935 md->chunks[id].size = ((int)entries) * md->msgType->comp[id].entry_size;
936 md->chunks[id].entries = entries;
937 }
938
939
940
941 /****************************************************************************/
942 /* */
943 /* Function: LC_MsgPrepareSend */
944 /* */
945 /****************************************************************************/
946
947
948 /* returns size of message buffer */
949
LC_MsgPrepareSend(DDD::DDDContext & context,LC_MSGHANDLE msg)950 size_t LC_MsgPrepareSend (DDD::DDDContext& context, LC_MSGHANDLE msg)
951 {
952 size_t size = LC_MsgFreeze(msg);
953 if (! LC_MsgAlloc(context, msg))
954 throw std::bad_alloc();
955
956 return(size);
957 }
958
959
960
961
LC_MsgGetProc(LC_MSGHANDLE md)962 DDD_PROC LC_MsgGetProc (LC_MSGHANDLE md)
963 {
964 return md->proc;
965 }
966
967
LC_GetPtr(LC_MSGHANDLE md,LC_MSGCOMP id)968 void *LC_GetPtr (LC_MSGHANDLE md, LC_MSGCOMP id)
969 {
970 return ((void *)(((char *)md->buffer) + md->chunks[id].offset));
971 }
972
973
LC_SetTableLen(LC_MSGHANDLE md,LC_MSGCOMP id,ULONG n)974 void LC_SetTableLen (LC_MSGHANDLE md, LC_MSGCOMP id, ULONG n)
975 {
976 ULONG *hdr = (ULONG *)md->buffer;
977
978 hdr[HDR_ENTRIES_PER_CHUNK*id+4] = n;
979 md->chunks[id].entries = n;
980 }
981
982
LC_GetTableLen(LC_MSGHANDLE md,LC_MSGCOMP id)983 ULONG LC_GetTableLen (LC_MSGHANDLE md, LC_MSGCOMP id)
984 {
985 return((ULONG)md->chunks[id].entries);
986 }
987
988
LC_MsgSend(const DDD::DDDContext & context,LC_MSGHANDLE md)989 void LC_MsgSend(const DDD::DDDContext& context, LC_MSGHANDLE md)
990 {
991 int error;
992
993 assert(md->msgState==MSTATE_ALLOCATED);
994
995 /* initiate asynchronous send */
996 md->msgId = SendASync(context.ppifContext(), VCHAN_TO(context, md->proc),
997 md->buffer, md->bufferSize, &error);
998
999 md->msgState=MSTATE_COMM;
1000 }
1001
1002
LC_GetBufferSize(LC_MSGHANDLE md)1003 size_t LC_GetBufferSize (LC_MSGHANDLE md)
1004 {
1005 return(md->bufferSize);
1006 }
1007
1008
1009 /****************************************************************************/
1010
1011
1012 /****************************************************************************/
1013 /* */
1014 /* Function: LC_Connect */
1015 /* */
1016 /****************************************************************************/
1017
LC_Connect(DDD::DDDContext & context,LC_MSGTYPE mtyp)1018 int LC_Connect(DDD::DDDContext& context, LC_MSGTYPE mtyp)
1019 {
1020 auto& lcContext = context.lowCommContext();
1021
1022 DDD_PROC *partners = DDD_ProcArray(context);
1023 NOTIFY_DESC *msgs = DDD_NotifyBegin(context, lcContext.nSends);
1024 MSG_DESC *md;
1025 int i, p;
1026
1027 const auto procs = context.procs();
1028
1029 if (lcContext.nSends<0 || lcContext.nSends>procs-1)
1030 DUNE_THROW(Dune::Exception,
1031 "cannot send " << lcContext.nSends << "messages "
1032 "(must be less than " << (procs-1) << ")");
1033
1034 # if DebugLowComm<=9
1035 Dune::dinfo << "LC_Connect(" << mtyp->name
1036 << ") nSends=" << lcContext.nSends << " ...\n";
1037 # endif
1038
1039
1040
1041 /* fill notify array */
1042 for(i=0, p=0, md=lcContext.SendQueue; md != nullptr; i++, md=md->next)
1043 {
1044 msgs[i].proc = md->proc;
1045 msgs[i].size = md->bufferSize;
1046
1047 /* enhance list of communication partners (destinations) */
1048 partners[p++] = md->proc;
1049
1050 }
1051
1052
1053 /* inform message receivers */
1054 lcContext.nRecvs = DDD_Notify(context);
1055 if (lcContext.nRecvs<0)
1056 {
1057 /* some processor raised an exception */
1058 Dune::dwarn << "Notify() raised exception #"
1059 << (-lcContext.nRecvs) << " in LC_Connect()\n";
1060
1061 /* automatically call LC_Cleanup() */
1062 DDD_NotifyEnd(context);
1063 LC_Cleanup(context);
1064
1065 return lcContext.nRecvs;
1066 }
1067
1068
1069 if (lcContext.nRecvs>procs-1)
1070 {
1071 Dune::dwarn << "cannot receive " << lcContext.nRecvs
1072 << " messages (must be less than " << (procs-1) << ")\n";
1073 DDD_NotifyEnd(context);
1074 return(EXCEPTION_LOWCOMM_CONNECT);
1075 }
1076
1077
1078
1079 # if DebugLowComm<=7
1080 Dune::dinfo << "LC_Connect() nSends=" << lcContext.nSends
1081 << " nRecvs=" << lcContext.nRecvs << "\n";
1082 # endif
1083
1084
1085 /* create array of receive message handles */
1086 if (lcContext.nRecvs>0)
1087 lcContext.theRecvArray = new LC_MSGHANDLE[lcContext.nRecvs];
1088
1089
1090 /* create recv messages from notify array */
1091 for(i=0; i < lcContext.nRecvs; i++)
1092 {
1093 /* create recv message handle and store it in MSGHANDLE array */
1094 lcContext.theRecvArray[i] = LC_NewRecvMsg(context, mtyp, msgs[i].proc, msgs[i].size);
1095
1096 /* enhance list of communication partners (sources) */
1097 partners[p++] = msgs[i].proc;
1098
1099 }
1100
1101
1102 DDD_NotifyEnd(context);
1103
1104
1105 /* get necessary connections to comm-partners */
1106 if (p>0)
1107 {
1108 if (! IS_OK(DDD_GetChannels(context, lcContext.nRecvs+lcContext.nSends)))
1109 {
1110 DDD_PrintError('E', 6620, "couldn't get channels in LC_Connect()");
1111 return(EXCEPTION_LOWCOMM_CONNECT);
1112 }
1113 }
1114
1115
1116 # if DebugLowComm<=5
1117 DDD_DisplayTopo(context);
1118 # endif
1119
1120
1121 if (lcContext.nRecvs>0)
1122 {
1123 if (! IS_OK(LC_PrepareRecv(context)))
1124 return(EXCEPTION_LOWCOMM_CONNECT);
1125 }
1126
1127
1128 # if DebugLowComm<=9
1129 Dune::dinfo << "LC_Connect() ready\n";
1130 # endif
1131
1132 return lcContext.nRecvs;
1133 }
1134
1135
1136
1137 /****************************************************************************/
1138 /* */
1139 /* Function: LC_Abort */
1140 /* */
1141 /****************************************************************************/
1142
LC_Abort(DDD::DDDContext & context,int exception)1143 int LC_Abort(DDD::DDDContext& context, int exception)
1144 {
1145 int retException;
1146
1147 if (exception>EXCEPTION_LOWCOMM_USER)
1148 DUNE_THROW(Dune::Exception,
1149 "exception must be <= EXCEPTION_LOWCOMM_USER");
1150
1151 DDD_NotifyBegin(context, exception);
1152
1153 # if DebugLowComm<=9
1154 Dune::dwarn << "LC_Abort() exception=" << exception << " ...\n";
1155 # endif
1156
1157
1158 /* inform message receivers */
1159 retException = DDD_Notify(context);
1160
1161 DDD_NotifyEnd(context);
1162
1163
1164 # if DebugLowComm<=9
1165 Dune::dwarn << "LC_Abort() ready, exception=" << retException << "\n";
1166 # endif
1167
1168
1169 /* automatically call LC_Cleanup() */
1170 LC_Cleanup(context);
1171
1172 return(retException);
1173 }
1174
1175
1176
1177 /****************************************************************************/
1178 /* */
1179 /* Function: LC_Communicate */
1180 /* */
1181 /****************************************************************************/
1182
LC_Communicate(const DDD::DDDContext & context)1183 LC_MSGHANDLE *LC_Communicate(const DDD::DDDContext& context)
1184 {
1185 auto& lcContext = context.lowCommContext();
1186
1187 # if DebugLowComm<=9
1188 Dune::dinfo << "LC_Communicate() ...\n";
1189 # endif
1190
1191
1192 /* poll asynchronous send and receives */
1193 int leftSend = lcContext.nSends;
1194 int leftRecv = lcContext.nRecvs;
1195 do {
1196 if (leftRecv>0) leftRecv = LC_PollRecv(context);
1197 if (leftSend>0) leftSend = LC_PollSend(context);
1198 } while (leftRecv>0 || leftSend>0);
1199
1200
1201 # if DebugLowComm<=9
1202 Dune::dinfo << "LC_Communicate() ready\n";
1203 # endif
1204
1205 return lcContext.theRecvArray;
1206 }
1207
1208
1209 /****************************************************************************/
1210 /* */
1211 /* Function: LC_Cleanup */
1212 /* */
1213 /****************************************************************************/
1214
LC_Cleanup(DDD::DDDContext & context)1215 void LC_Cleanup(DDD::DDDContext& context)
1216 {
1217 auto& lcContext = context.lowCommContext();
1218
1219 # if DebugLowComm<=9
1220 Dune::dinfo << "LC_Cleanup() ...\n";
1221 # endif
1222
1223 if (lcContext.nRecvs>0)
1224 {
1225 if (lcContext.RecvFree != nullptr)
1226 (lcContext.RecvFree)(lcContext.theRecvBuffer);
1227
1228 lcContext.theRecvBuffer = nullptr;
1229 }
1230
1231 if (lcContext.theRecvArray != nullptr)
1232 {
1233 delete[] lcContext.theRecvArray;
1234 lcContext.theRecvArray = nullptr;
1235 }
1236
1237 /* free recv queue */
1238 LC_FreeRecvQueue(context);
1239
1240 /* free send queue */
1241 LC_FreeSendQueue(context);
1242
1243
1244 # if DebugLowComm<=9
1245 Dune::dinfo << "LC_Cleanup() ready\n";
1246 # endif
1247 }
1248
1249
1250 /****************************************************************************/
1251
#define LC_COLWIDTH   10

/* placeholder text returned by LC_Name() when no name is available */
static const char* LC_DefaultName = "<?>";

/* yield the given name unchanged, or the default name for a null pointer */
static const char* LC_Name(const char* name)
{
  if (name == nullptr)
    return LC_DefaultName;

  return name;
}
1261
LC_PrintMsgList(MSG_DESC * list)1262 static void LC_PrintMsgList (MSG_DESC *list)
1263 {
1264 using std::setw;
1265
1266 std::ostream& out = std::cout;
1267 MSG_DESC *md;
1268 MSG_TYPE *last_mt=NULL;
1269 size_t sum, comp_size[MAX_COMPONENTS];
1270 int i;
1271
1272 for(md=list; md != nullptr; md=md->next)
1273 {
1274 MSG_TYPE *mt = md->msgType;
1275
1276 if (mt!=last_mt)
1277 {
1278 /* msg-type changes, print new header */
1279
1280 /* first, close part of msg-list with summary */
1281 if (last_mt!=NULL)
1282 {
1283 out << " = |";
1284 sum = 0;
1285 for(i=0; i<last_mt->nComps; i++)
1286 {
1287 out << setw(9) << comp_size[i];
1288 sum += comp_size[i]; /* horizontal sum */
1289 }
1290 out << setw(9) << sum << "\n";
1291 }
1292
1293 /* then, construct header */
1294 {
1295 std::string name = LC_Name(mt->name);
1296 out << setw(9) << name.substr(0, 9) << " |";
1297 }
1298 for(i=0; i<mt->nComps; i++)
1299 {
1300 if (mt->comp[i].name!=NULL) {
1301 std::string name = LC_Name(mt->comp[i].name);
1302 out << setw(9) << name.substr(0, 9);
1303 }
1304 else
1305 out << setw(9) << i;
1306
1307 comp_size[i] = 0;
1308 }
1309 out << " =\n";
1310 last_mt = mt;
1311 }
1312
1313 /* construct info about message components */
1314 out << setw(9) << md->proc << " |";
1315 sum = 0;
1316 for(i=0; i<mt->nComps; i++)
1317 {
1318 size_t s = md->chunks[i].size;
1319
1320 out << setw(9) << s;
1321
1322 sum += s; /* horizontal sum */
1323 comp_size[i] += s; /* vertical sum */
1324 }
1325 out << setw(9) << sum << "\n";
1326 }
1327
1328 /* close last part of msg-list with summary */
1329 if (last_mt!=NULL)
1330 {
1331 out << " = |";
1332 sum = 0;
1333 for(i=0; i<last_mt->nComps; i++)
1334 {
1335 out << setw(9) << comp_size[i];
1336 sum += comp_size[i]; /* horizontal sum */
1337 }
1338 out << setw(9) << sum << "\n";
1339 }
1340
1341 }
1342
1343
LC_PrintSendMsgs(const DDD::DDDContext & context)1344 void LC_PrintSendMsgs(const DDD::DDDContext& context)
1345 {
1346 LC_PrintMsgList(context.lowCommContext().SendQueue);
1347 }
1348
LC_PrintRecvMsgs(const DDD::DDDContext & context)1349 void LC_PrintRecvMsgs(const DDD::DDDContext& context)
1350 {
1351 LC_PrintMsgList(context.lowCommContext().RecvQueue);
1352 }
1353
1354
1355 /****************************************************************************/
1356
1357 } /* namespace DDD */
1358