1 /*****************************************************************************/
2 /*                                    XDMF                                   */
3 /*                       eXtensible Data Model and Format                    */
4 /*                                                                           */
5 /*  Id : XdmfDSMBuffer.hpp                                                   */
6 /*                                                                           */
7 /*  Author:                                                                  */
8 /*     Andrew Burns                                                          */
9 /*     andrew.j.burns2@us.army.mil                                           */
10 /*     US Army Research Laboratory                                           */
11 /*     Aberdeen Proving Ground, MD                                           */
12 /*                                                                           */
13 /*     Copyright @ 2013 US Army Research Laboratory                          */
14 /*     All Rights Reserved                                                   */
15 /*     See Copyright.txt for details                                         */
16 /*                                                                           */
17 /*     This software is distributed WITHOUT ANY WARRANTY; without            */
18 /*     even the implied warranty of MERCHANTABILITY or FITNESS               */
19 /*     FOR A PARTICULAR PURPOSE.  See the above copyright notice             */
20 /*     for more information.                                                 */
21 /*                                                                           */
22 /*****************************************************************************/
23 
24 /*=========================================================================
25   This code is derived from an earlier work and is distributed
26   with permission from, and thanks to ...
27 =========================================================================*/
28 
29 /*============================================================================
30 
31   Project                 : H5FDdsm
32   Module                  : H5FDdsmBufferService.cxx H5FDdsmBuffer.cxx
33 
34   Authors:
35      John Biddiscombe     Jerome Soumagne
36      biddisco@cscs.ch     soumagne@cscs.ch
37 
38   Copyright (C) CSCS - Swiss National Supercomputing Centre.
39   You may use modify and and distribute this code freely providing
40   1) This copyright notice appears on all copies of source code
41   2) An acknowledgment appears with any substantial usage of the code
42   3) If this code is contributed to any other open source project, it
43   must not be reformatted such that the indentation, bracketing or
44   overall style is modified significantly.
45 
46   This software is distributed WITHOUT ANY WARRANTY; without even the
47   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
48 
49   This work has received funding from the European Community's Seventh
50   Framework Programme (FP7/2007-2013) under grant agreement 225967 "NextMuSE"
51 
52 ============================================================================*/
53 
54 #include <XdmfDSMBuffer.hpp>
55 #include <XdmfDSMCommMPI.hpp>
56 #include <XdmfError.hpp>
57 #include <mpi.h>
58 #include <string.h>
59 #include <stdlib.h>
60 #include <algorithm>
61 
62 #ifndef _WIN32
63   #include <unistd.h>
64 #endif
65 
XdmfDSMBuffer()66 XdmfDSMBuffer::XdmfDSMBuffer()
67 {
68   this->CommChannel = XDMF_DSM_INTER_COMM;
69   this->DsmType = XDMF_DSM_TYPE_UNIFORM;
70   this->IsServer = true;
71   this->StartAddress = this->EndAddress = 0;
72   this->StartServerId = this->EndServerId = -1;
73   this->LocalBufferSizeMBytes = 128;
74   this->Length = 0;
75   this->TotalLength = 0;
76   this->BlockLength = XDMF_DSM_DEFAULT_BLOCK_LENGTH;
77   this->NumPages = 0;
78   this->PagesAssigned = 0;
79   this->Comm = NULL;
80   this->DataPointer = NULL;
81   this->InterCommType = XDMF_DSM_COMM_MPI;
82   this->IsConnected = false;
83   this->ResizeFactor = 1;
84 }
85 
~XdmfDSMBuffer()86 XdmfDSMBuffer::~XdmfDSMBuffer()
87 {
88   if (this->DataPointer) {
89     free(this->DataPointer);
90   }
91   this->DataPointer = NULL;
92 }
93 
94 class XdmfDSMBuffer::CommandMsg
95 {
96   public:
97     int Opcode;
98     int Source;
99     int  Target;
100     int Address;
101     int Length;
102 };
103 
104 class XdmfDSMBuffer::InfoMsg
105 {
106   public:
107     int type;
108     unsigned int length;
109     unsigned int total_length;
110     unsigned int block_length;
111     int start_server_id;
112     int end_server_id;
113 };
114 
115 int
AddressToId(int Address)116 XdmfDSMBuffer::AddressToId(int Address)
117 {
118   int   ServerId = XDMF_DSM_FAIL;
119 
120   switch(this->DsmType) {
121     case XDMF_DSM_TYPE_UNIFORM :
122     case XDMF_DSM_TYPE_UNIFORM_RANGE :
123       // Block based allocation should use PageToId
124       // All Servers have same length
125       // This finds out which server the address provided starts on
126       ServerId = this->StartServerId + (Address / this->Length);
127       if(ServerId > this->EndServerId ){
128         try {
129           std::stringstream message;
130           message << "ServerId " << ServerId << " for Address "
131                   << Address << " is larger than EndServerId "
132                   << this->EndServerId;
133           XdmfError::message(XdmfError::FATAL, message.str());
134         }
135         catch (XdmfError & e) {
136           throw e;
137         }
138       }
139       break;
140     default :
141       // Not Implemented
142       try {
143         std::stringstream message;
144         message << "DsmType " << this->DsmType << " not yet implemented or not uniform";
145         XdmfError::message(XdmfError::FATAL, message.str());
146       }
147       catch (XdmfError & e) {
148         throw e;
149       }
150       break;
151     }
152     return(ServerId);
153 }
154 
155 void
BroadcastComm(int * comm,int root)156 XdmfDSMBuffer::BroadcastComm(int *comm, int root)
157 {
158   int status;
159 
160   this->Comm->Broadcast(comm,
161                         sizeof(int),
162                         root,
163                         XDMF_DSM_INTRA_COMM);
164   if (status != MPI_SUCCESS) {
165     try {
166       XdmfError(XdmfError::FATAL, "Broadcast of Comm failed");
167     }
168     catch (XdmfError & e) {
169       throw e;
170     }
171   }
172 }
173 
174 int
BufferService(int * returnOpcode)175 XdmfDSMBuffer::BufferService(int *returnOpcode)
176 {
177   int        opcode, who;
178   int        aLength;
179   int          address;
180   char        *datap;
181   static int syncId      = -1;
182 
183   if (this->CommChannel == XDMF_DSM_ANY_COMM) {
184     if (this->Comm->GetId() == 0) {
185       try {
186         this->Comm->Probe(&this->CommChannel);
187       }
188       catch (XdmfError & e) {
189         throw e;
190       }
191     }
192     try {
193       this->BroadcastComm(&this->CommChannel, 0);
194     }
195     catch (XdmfError & e) {
196       throw e;
197     }
198   }
199 
200   try {
201     this->ReceiveCommandHeader(&opcode,
202                                &who,
203                                &address,
204                                &aLength,
205                                this->CommChannel,
206                                syncId);
207   }
208   catch (XdmfError & e) {
209     throw e;
210   }
211 
212   // Connection is an ID for client or server,
213 //  int communicatorId = this->CommChannel;
214 
215   switch(opcode) {
216 
217   // H5FD_DSM_OPCODE_PUT
218   case XDMF_DSM_OPCODE_PUT:
219     if (((unsigned int) aLength + address) > this->Length) {
220       try {
221         std::stringstream message;
222         message << "Length " << aLength << " too long for Address " << address
223                 << "\n" << "Server Start = " << this->StartAddress << " End = "
224                 << this->EndAddress;
225         XdmfError::message(XdmfError::FATAL, message.str());
226       }
227       catch (XdmfError & e) {
228         throw e;
229       }
230     }
231     if ((datap = this->DataPointer) == NULL) {
232        try {
233          XdmfError::message(XdmfError::FATAL,
234                             "Null Data Pointer when trying to put data");
235        }
236        catch (XdmfError & e) {
237          throw e;
238        }
239     }
240     datap += address;
241     try {
242       this->ReceiveData(who,
243                         datap,
244                         aLength,
245                         XDMF_DSM_PUT_DATA_TAG,
246                         0,
247                         this->CommChannel);
248     }
249     catch (XdmfError & e) {
250       throw e;
251     }
252     break;
253 
254   // H5FD_DSM_OPCODE_GET
255   case XDMF_DSM_OPCODE_GET:
256     if (((unsigned int) aLength + address) > this->Length) {
257       try {
258         std::stringstream message;
259         message << "Length " << aLength << " too long for Address " << address
260                 << "\n" << "Server Start = " << this->StartAddress << " End = "
261                 << this->EndAddress;
262         XdmfError::message(XdmfError::FATAL, message.str());
263       }
264       catch (XdmfError & e) {
265         throw e;
266       }
267     }
268     if ((datap = this->DataPointer) == NULL) {
269       try {
270          XdmfError::message(XdmfError::FATAL,
271                             "Null Data Pointer when trying to put data");
272        }
273        catch (XdmfError & e) {
274          throw e;
275        }
276     }
277     datap += address;
278     try {
279       this->SendData(who,
280                      datap,
281                      aLength,
282                      XDMF_DSM_GET_DATA_TAG,
283                      0,
284                      this->CommChannel);
285     }
286     catch (XdmfError & e) {
287       throw e;
288     }
289     break;
290 
291   // H5FD_DSM_ACCEPT
292   // Comes from client
293   case XDMF_DSM_ACCEPT:
294   {
295     int numConnections;
296     this->ReceiveAcknowledgment(who,
297                                 numConnections,
298                                 XDMF_DSM_EXCHANGE_TAG,
299                                 XDMF_DSM_INTER_COMM);
300     this->Comm->Accept(numConnections);
301     this->SendInfo();
302     break;
303   }
304   // Comes from client, requests a notifcation when a file is touched.
305   // The notification is sent out when clear is called.
306   case XDMF_DSM_SET_NOTIFY:
307   {
308     // Send the notify info to all cores.
309     int strlength = 0;
310     char * notifystring;
311     int waitingCore = 0;
312     if (this->Comm->GetId() == 0)
313     {
314       waitingCore = who;
315       this->ReceiveAcknowledgment(who,
316                                   strlength,
317                                   XDMF_DSM_EXCHANGE_TAG,
318                                   this->CommChannel);
319       notifystring = new char[strlength+1]();
320       this->ReceiveData(who,
321                         notifystring,
322                         strlength,
323                         XDMF_DSM_EXCHANGE_TAG,
324                         0,
325                         this->CommChannel);
326       notifystring[strlength] = 0;
327       WaitingMap[std::string(notifystring)].push_back(who);
328       // Send XDMF_DSM_SET_NOTIFY to all server cores in order of increasing id
329       for (int i = this->GetStartServerId() + 1; // Since this is core 0 sending it
330            i <= this->GetEndServerId();
331            ++i) {
332         if (i != this->Comm->GetInterId())
333         {
334           this->SendCommandHeader(XDMF_DSM_SET_NOTIFY, i, 0, 0, XDMF_DSM_INTER_COMM);
335         }
336       }
337     }
338     // broadcast to the other server cores
339     // BCAST strlen
340     this->Comm->Broadcast(&strlength,
341                           sizeof(int),
342                           0,
343                           XDMF_DSM_INTRA_COMM);
344     // BCAST notifystring
345     if  (this->Comm->GetId() != 0)
346     {
347       notifystring = new char[strlength + 1]();
348     }
349     this->Comm->Broadcast(&notifystring,
350                           strlength,
351                           0,
352                           XDMF_DSM_INTRA_COMM);
353     notifystring[strlength] = 0;
354     // BCAST locked core
355     this->Comm->Broadcast(&waitingCore,
356                           sizeof(int),
357                           0,
358                           XDMF_DSM_INTRA_COMM);
359 
360     if (this->Comm->GetId() != 0)
361     {
362       WaitingMap[std::string(notifystring)].push_back(waitingCore);
363     }
364 
365     break;
366   }
367   // sends out and clears the notifcations that are stored for a specific file.
368   case XDMF_DSM_CLEAR_NOTIFY:
369   {
370     // send a command to other cores to clear this notification
371     int strlength = 0;
372     char * notifystring;
373     int clearCode = 0;
374     if (this->Comm->GetId() == 0)
375     {
376       this->ReceiveAcknowledgment(who,
377                                   strlength,
378                                   XDMF_DSM_EXCHANGE_TAG,
379                                   this->CommChannel);
380       notifystring = new char[strlength+1]();
381       this->ReceiveData(who,
382                         notifystring,
383                         strlength,
384                         XDMF_DSM_EXCHANGE_TAG,
385                         0,
386                         this->CommChannel);
387       notifystring[strlength] = 0;
388       this->ReceiveAcknowledgment(who,
389                                   clearCode,
390                                   XDMF_DSM_EXCHANGE_TAG,
391                                   this->CommChannel);
392     }
393     // broad cast string to be notified
394     if (WaitingMap[std::string(notifystring)].size() > 0)
395     {
396       // Request the help of the rest of the server
397       // Send XDMF_DSM_SET_NOTIFY to all server cores in order of increasing id
398       for (int i = this->GetStartServerId() + 1; // Since this is core 0 sending it
399            i <= this->GetEndServerId();
400            ++i) {
401         if (i != this->Comm->GetInterId())
402         {
403           this->SendCommandHeader(XDMF_DSM_CLEAR_NOTIFY, i, 0, 0, XDMF_DSM_INTER_COMM);
404         }
405       }
406 
407       // BCAST strlen and code
408       this->Comm->Broadcast(&strlength,
409                             sizeof(int),
410                             0,
411                             XDMF_DSM_INTRA_COMM);
412       this->Comm->Broadcast(&clearCode,
413                             sizeof(int),
414                             0,
415                             XDMF_DSM_INTRA_COMM);
416       // BCAST notifystring
417       if  (this->Comm->GetId() != 0)
418       {
419         notifystring = new char[strlength+1]();
420       }
421       this->Comm->Broadcast(&notifystring,
422                             strlength,
423                             0,
424                             XDMF_DSM_INTRA_COMM);
425       notifystring[strlength] = 0;
426       // cores notify based on their index, in order to split up the work
427       std::vector<unsigned int> notifiedCores = WaitingMap[std::string(notifystring)];
428       for (unsigned int i = this->Comm->GetId(); i < notifiedCores.size(); i+=this->Comm->GetIntraSize())
429       {
430         unsigned int recvCore = notifiedCores[i];
431         this->SendAcknowledgment(recvCore,
432                                  clearCode,
433                                  XDMF_DSM_EXCHANGE_TAG,
434                                  this->CommChannel);
435       }
436       // Then all cores remove the string from the map of notifications
437       WaitingMap.erase(std::string(notifystring));
438     }
439     break;
440   }
441   case XDMF_DSM_REGISTER_FILE:
442   {
443     // save file description
444     XdmfDSMBuffer::XDMF_file_desc * newfile = new XdmfDSMBuffer::XDMF_file_desc();
445 
446     int strlength = 0;
447 
448     this->ReceiveAcknowledgment(who,
449                                 strlength,
450                                 XDMF_DSM_EXCHANGE_TAG,
451                                 this->CommChannel);
452 
453     newfile->name = new char[strlength + 1];
454 
455     this->ReceiveData(who,
456                       newfile->name,
457                       strlength,
458                       XDMF_DSM_EXCHANGE_TAG,
459                       0,
460                       this->CommChannel);
461 
462     newfile->name[strlength] = 0;
463 
464     this->ReceiveData(who,
465                       (char *)&newfile->start,
466                       sizeof(haddr_t),
467                       XDMF_DSM_EXCHANGE_TAG,
468                       0,
469                       this->CommChannel);
470 
471     this->ReceiveData(who,
472                       (char *)&newfile->end,
473                       sizeof(haddr_t),
474                       XDMF_DSM_EXCHANGE_TAG,
475                       0,
476                       this->CommChannel);
477 
478     int recvNumPages = 0;
479 
480     this->ReceiveAcknowledgment(who,
481                                 recvNumPages,
482                                 XDMF_DSM_EXCHANGE_TAG,
483                                 this->CommChannel);
484 
485     newfile->numPages = recvNumPages;
486 
487     if (newfile->numPages > 0)
488     {
489       newfile->pages = new unsigned int[newfile->numPages]();
490 
491       this->ReceiveData(who,
492                         (char *)newfile->pages,
493                         newfile->numPages * sizeof(unsigned int),
494                         XDMF_DSM_EXCHANGE_TAG,
495                         0,
496                         this->CommChannel);
497     }
498     else
499     {
500       newfile->pages = NULL;
501     }
502 
503     // If old description exists, overwrite it.
504 
505     FileDefinitions[std::string(newfile->name)] = newfile;
506 
507     break;
508   }
509   case XDMF_DSM_REQUEST_PAGES:
510   {
511     // set aside pages to a file
512     char * requestfile;
513     int strlength = 0;
514 
515     this->ReceiveAcknowledgment(who,
516                                 strlength,
517                                 XDMF_DSM_EXCHANGE_TAG,
518                                 this->CommChannel);
519 
520     requestfile = new char[strlength + 1];
521 
522     this->ReceiveData(who,
523                       requestfile,
524                       strlength,
525                       XDMF_DSM_EXCHANGE_TAG,
526                       0,
527                       this->CommChannel);
528 
529     requestfile[strlength] = 0;
530 
531     // This file will have its pages appended to.
532     XdmfDSMBuffer::XDMF_file_desc * filedesc;
533 
534     if (FileDefinitions.count(std::string(requestfile)) > 0)
535     {
536       filedesc = FileDefinitions[std::string(requestfile)];
537     }
538     else
539     {
540       filedesc = new XDMF_file_desc();
541       filedesc->start = 0;
542       filedesc->end = 0;
543       filedesc->numPages = 0;
544       filedesc->pages = NULL;
545     }
546 
547     int datasize = 0;
548 
549     // Request size required for the file
550     this->ReceiveAcknowledgment(who,
551                                 datasize,
552                                 XDMF_DSM_EXCHANGE_TAG,
553                                 this->CommChannel);
554 
555     // TODO Error handling block length must be greater than 0
556     // If Block size = 0 then do nothing?
557     // Then return blank data?
558 
559     int requestedblocks = ((double)datasize) / this->BlockLength;
560 
561     // Round up
562     if (requestedblocks * this->BlockLength != datasize)
563     {
564       ++requestedblocks;
565     }
566 
567     while (requestedblocks + PagesAssigned >= this->NumPages * this->Comm->GetIntraSize())
568     {
569       // If requested blocks are out of range, resize
570       for (int i = this->GetStartServerId() + 1; // Since this is core 0 sending it
571            i <= this->GetEndServerId();
572            ++i) {
573         if (i != this->Comm->GetInterId())
574         {
575           this->SendCommandHeader(XDMF_DSM_OPCODE_RESIZE, i, 0, 0, XDMF_DSM_INTER_COMM);
576         }
577       }
578       this->SetLength(this->Length + (this->Length * this->ResizeFactor));
579     }
580 
581     unsigned int * newpagelist = new unsigned int[filedesc->numPages + requestedblocks]();
582 
583     unsigned int index = 0;
584 
585     for (unsigned int i = 0; i < filedesc->numPages; ++i)
586     {
587       newpagelist[index] = filedesc->pages[index];
588       ++index;
589     }
590 
591     for (;index < filedesc->numPages + requestedblocks; ++index)
592     {
593       // The number of pages assigned is incremented after the page is added.
594       // The value added is simply an index
595       newpagelist[index] = PagesAssigned++;
596     }
597 
598     filedesc->numPages = filedesc->numPages + requestedblocks;
599     unsigned int * oldpointer = filedesc->pages;
600     filedesc->pages = newpagelist;
601 
602     if (oldpointer != NULL)
603     {
604       delete oldpointer;
605     }
606 
607     // Send back new page allocation pointer
608 
609     this->SendAcknowledgment(who,
610                              filedesc->numPages,
611                              XDMF_DSM_EXCHANGE_TAG,
612                              this->CommChannel);
613 
614     this->SendData(who,
615                    (char *)filedesc->pages,
616                    filedesc->numPages * sizeof(unsigned int),
617                    XDMF_DSM_EXCHANGE_TAG,
618                    0,
619                    this->CommChannel);
620 
621     this->SendData(who,
622                    (char*)&filedesc->start,
623                    sizeof(haddr_t),
624                    XDMF_DSM_EXCHANGE_TAG,
625                    0,
626                    this->CommChannel);
627 
628     filedesc->end = filedesc->start + (filedesc->numPages * this->BlockLength);
629 
630     this->SendData(who,
631                    (char*)&(filedesc->end),
632                    sizeof(haddr_t),
633                    XDMF_DSM_EXCHANGE_TAG,
634                    0,
635                    this->CommChannel);
636 
637     // Notify the current size of the buffer
638     int currentLength = this->Length;
639     this->SendAcknowledgment(who,
640                              currentLength,
641                              XDMF_DSM_EXCHANGE_TAG,
642                              this->CommChannel);
643 
644     break;
645   }
646   case XDMF_DSM_REQUEST_FILE:
647   {
648     char * requestfile;
649     int strlength = 0;
650 
651     this->ReceiveAcknowledgment(who,
652                                 strlength,
653                                 XDMF_DSM_EXCHANGE_TAG,
654                                 this->CommChannel);
655 
656     requestfile = new char[strlength + 1];
657 
658     this->ReceiveData(who,
659                       requestfile,
660                       strlength,
661                       XDMF_DSM_EXCHANGE_TAG,
662                       0,
663                       this->CommChannel);
664 
665     requestfile[strlength] = 0;
666 
667     // This file will be returned.
668     XdmfDSMBuffer::XDMF_file_desc * filedesc;
669 
670     if (FileDefinitions.count(std::string(requestfile)) > 0)
671     {
672       this->SendAcknowledgment(who,
673                                XDMF_DSM_SUCCESS,
674                                XDMF_DSM_EXCHANGE_TAG,
675                                this->CommChannel);
676 
677       filedesc = FileDefinitions[std::string(requestfile)];
678 
679       this->SendData(who,
680                      (char*)&filedesc->start,
681                      sizeof(haddr_t),
682                      XDMF_DSM_EXCHANGE_TAG,
683                      0,
684                      this->CommChannel);
685 
686       this->SendData(who,
687                      (char*)&filedesc->end,
688                      sizeof(haddr_t),
689                      XDMF_DSM_EXCHANGE_TAG,
690                      0,
691                      this->CommChannel);
692 
693       int sendNumPages = filedesc->numPages;
694 
695       this->SendAcknowledgment(who,
696                                sendNumPages,
697                                XDMF_DSM_EXCHANGE_TAG,
698                                this->CommChannel);
699 
700       this->SendData(who,
701                      (char *)filedesc->pages,
702                      filedesc->numPages * sizeof(unsigned int),
703                      XDMF_DSM_EXCHANGE_TAG,
704                      0,
705                      this->CommChannel);
706     }
707     else
708     {
709       this->SendAcknowledgment(who,
710                                XDMF_DSM_FAIL,
711                                XDMF_DSM_EXCHANGE_TAG,
712                                this->CommChannel);
713     }
714 
715     break;
716   }
717   case XDMF_DSM_OPCODE_RESIZE:
718     this->SetLength(this->Length + (this->Length * this->ResizeFactor));
719     break;
720   case XDMF_DSM_REQUEST_ACCESS:
721   {
722     int isLocked = 0;
723 
724     char * requestfile;
725     int strlength = 0;
726 
727     this->ReceiveAcknowledgment(who,
728                                 strlength,
729                                 XDMF_DSM_EXCHANGE_TAG,
730                                 this->CommChannel);
731 
732     requestfile = new char[strlength + 1];
733 
734     this->ReceiveData(who,
735                       requestfile,
736                       strlength,
737                       XDMF_DSM_EXCHANGE_TAG,
738                       0,
739                       this->CommChannel);
740 
741     requestfile[strlength] = 0;
742 
743     // If the requesting core is the one who
744     // already locked the file then tell it that there is not lock.
745     std::map<std::string, int>::iterator isOwner = FileOwners.find(std::string(requestfile));
746 
747     if (LockedMap.count(std::string(requestfile)) > 0)
748     {
749       if (isOwner->second != who)
750       {
751         // If the file is locked notify the requesting core and add it to the queue.
752         isLocked = 1;
753         LockedMap[std::string(requestfile)].push(who);
754       }
755     }
756     else
757     {
758       LockedMap[std::string(requestfile)] = std::queue<unsigned int>();
759       FileOwners[std::string(requestfile)] = who;
760     }
761 
762     this->SendAcknowledgment(who,
763                              isLocked,
764                              XDMF_DSM_EXCHANGE_TAG,
765                              this->CommChannel);
766 
767     break;
768   }
769   case XDMF_DSM_UNLOCK_FILE:
770   {
771     char * requestfile;
772     int strlength = 0;
773 
774     this->ReceiveAcknowledgment(who,
775                                 strlength,
776                                 XDMF_DSM_EXCHANGE_TAG,
777                                 this->CommChannel);
778 
779     requestfile = new char[strlength + 1];
780 
781     this->ReceiveData(who,
782                       requestfile,
783                       strlength,
784                       XDMF_DSM_EXCHANGE_TAG,
785                       0,
786                       this->CommChannel);
787 
788     requestfile[strlength] = 0;
789 
790     // If file isn't locked do nothing
791     if (LockedMap.count(std::string(requestfile)) > 0)
792     {
793       // Remove the queue if there are no more waiting
794       if (LockedMap[std::string(requestfile)].size() > 0)
795       {
796         // Pop the next process waiting off the queue
797         unsigned int nextCore = LockedMap[std::string(requestfile)].front();
798         LockedMap[std::string(requestfile)].pop();
799         FileOwners[std::string(requestfile)] = nextCore;
800         this->SendAcknowledgment(nextCore,
801                                  1,
802                                  XDMF_DSM_EXCHANGE_TAG,
803                                  this->CommChannel);
804       }
805       if(LockedMap[std::string(requestfile)].size() == 0)
806       {
807         LockedMap.erase(std::string(requestfile));
808         FileOwners.erase(std::string(requestfile));
809       }
810     }
811 
812     break;
813   }
814   case XDMF_DSM_LOCK_ACQUIRE:
815     // Currently unsupported
816     break;
817 
818   // H5FD_DSM_LOCK_RELEASE
819   // Comes from client or server depending on communicator
820   case XDMF_DSM_LOCK_RELEASE:
821     // Currently unsupported
822     break;
823 
824   // H5FD_DSM_OPCODE_DONE
825   // Always received on server
826   case XDMF_DSM_OPCODE_DONE:
827     break;
828 
829   // DEFAULT
830   default :
831     try {
832       std::stringstream message;
833       message << "Error: Unknown Opcode " << opcode;
834       XdmfError::message(XdmfError::FATAL, message.str());
835     }
836     catch (XdmfError & e) {
837       throw e;
838     }
839   }
840 
841   if (returnOpcode) *returnOpcode = opcode;
842   return(XDMF_DSM_SUCCESS);
843 }
844 
845 void
BufferServiceLoop(int * returnOpcode)846 XdmfDSMBuffer::BufferServiceLoop(int *returnOpcode)
847 {
848   int op, status = XDMF_DSM_SUCCESS;
849   while (status == XDMF_DSM_SUCCESS) {
850     try {
851       status = this->BufferService(&op);
852     }
853     catch (XdmfError & e) {
854       throw e;
855     }
856     if (returnOpcode) *returnOpcode = op;
857     if (op == XDMF_DSM_OPCODE_DONE) {
858       break;
859     }
860   }
861 }
862 
863 void
Create(MPI_Comm newComm,int startId,int endId)864 XdmfDSMBuffer::Create(MPI_Comm newComm, int startId, int endId)
865 {
866   //
867   // Create DSM communicator
868   //
869   switch (this->InterCommType) {
870     case XDMF_DSM_COMM_MPI:
871       this->Comm = new XdmfDSMCommMPI();
872       break;
873     default:
874       try {
875         XdmfError::message(XdmfError::FATAL, "DSM communication type not supported");
876       }
877       catch (XdmfError & e) {
878         throw e;
879       }
880   }
881 
882   this->Comm->DupComm(newComm);
883   this->Comm->Init();
884 
885   // Uniform Dsm : every node has a buffer the same size. (Addresses are sequential)
886   // Block DSM : nodes are set up using paging
887   long length = (long) (this->LocalBufferSizeMBytes)*1024LU*1024LU;
888   switch (this->DsmType) {
889     case XDMF_DSM_TYPE_UNIFORM:
890     case XDMF_DSM_TYPE_UNIFORM_RANGE:
891       this->ConfigureUniform(this->Comm, length, startId, endId);
892       break;
893     case XDMF_DSM_TYPE_BLOCK_CYCLIC:
894       this->ConfigureUniform(this->Comm, length, startId, endId, this->BlockLength, false);
895       break;
896     case XDMF_DSM_TYPE_BLOCK_RANDOM:
897       this->ConfigureUniform(this->Comm, length, startId, endId, this->BlockLength, true);
898       break;
899     default:
900       try {
901         XdmfError(XdmfError::FATAL, "DSM configuration type not supported");
902       }
903       catch (XdmfError & e) {
904         throw e;
905       }
906   }
907 }
908 
909 void
ConfigureUniform(XdmfDSMCommMPI * aComm,long aLength,int startId,int endId,long aBlockLength,bool random)910 XdmfDSMBuffer::ConfigureUniform(XdmfDSMCommMPI *aComm, long aLength,
911                                 int startId, int endId, long aBlockLength,
912                                 bool random)
913 {
914   if (startId < 0) {
915     startId = 0;
916   }
917   if (endId < 0) {
918     endId = aComm->GetIntraSize() - 1;
919   }
920   this->SetDsmType(XDMF_DSM_TYPE_UNIFORM_RANGE);
921   if ((startId == 0) && (endId == aComm->GetIntraSize() - 1)) {
922     this->SetDsmType(XDMF_DSM_TYPE_UNIFORM);
923   }
924   if (aBlockLength) {
925     if (!random) {
926       this->SetDsmType(XDMF_DSM_TYPE_BLOCK_CYCLIC);
927     }
928     else {
929       this->SetDsmType(XDMF_DSM_TYPE_BLOCK_RANDOM);
930     }
931     this->SetBlockLength(aBlockLength);
932   }
933   this->StartServerId = startId;
934   this->EndServerId = endId;
935   this->SetComm(aComm);
936   if ((aComm->GetId() >= startId) &&
937       (aComm->GetId() <= endId) &&
938       this->IsServer) {
939     try {
940       if (aBlockLength) {
941         // For optimization we make the DSM length fit to a multiple of block size
942         this->SetLength(((long)(aLength / aBlockLength)) * aBlockLength);
943         this->NumPages = ((long)(aLength / aBlockLength));
944       }
945       else {
946         this->SetLength(aLength);
947       }
948     }
949     catch (XdmfError & e) {
950       throw e;
951     }
952     this->StartAddress = (aComm->GetId() - startId) * aLength;
953     this->EndAddress = this->StartAddress + aLength - 1;
954   }
955   else {
956     if (aBlockLength) {
957       this->Length = ((long)(aLength / aBlockLength)) * aBlockLength;
958     }
959     else {
960       this->Length = aLength;
961     }
962   }
963   this->TotalLength = this->GetLength() * (endId - startId + 1);
964   // Set DSM structure
965   std::vector<std::pair<std::string, unsigned int> > newStructure;
966   // determine size of application before the server
967   if (startId > 0)
968   {
969     newStructure.push_back(std::pair<std::string, unsigned int>(aComm->GetApplicationName(), startId));
970   }
971   newStructure.push_back(std::pair<std::string, unsigned int>("Server", (endId + 1) - startId));
972   if(aComm->GetInterSize() - (startId +((endId + 1) - startId)) > 0)
973   {
974     newStructure.push_back(std::pair<std::string, unsigned int>(aComm->GetApplicationName(), aComm->GetInterSize() - (startId +((endId + 1) - startId))));
975   }
976   aComm->SetDsmProcessStructure(newStructure);
977 }
978 
979 void
Connect(bool persist)980 XdmfDSMBuffer::Connect(bool persist)
981 {
982   int status;
983 
984   do {
985     try {
986       status = this->GetComm()->Connect();
987     }
988     catch (XdmfError & e) {
989       throw e;
990     }
991     if (status == MPI_SUCCESS) {
992       this->SetIsConnected(true);
993       try {
994         this->ReceiveInfo();
995       }
996       catch (XdmfError & e) {
997         throw e;
998       }
999     }
1000     else {
1001 #ifdef _WIN32
1002   Sleep(1000);
1003   // Since windows has a different sleep command
1004 #else
1005   sleep(1);
1006 #endif
1007     }
1008   } while (persist && (status != MPI_SUCCESS));
1009 }
1010 
1011 void
Disconnect()1012 XdmfDSMBuffer::Disconnect()
1013 {
1014   // Disconnecting is done manually
1015   try {
1016     this->GetComm()->Disconnect();
1017   }
1018   catch (XdmfError & e) {
1019     throw e;
1020   }
1021   this->SetIsConnected(false);
1022 }
1023 
1024 void
Get(long Address,long aLength,void * Data)1025 XdmfDSMBuffer::Get(long Address, long aLength, void *Data)
1026 {
1027   int   who, MyId = this->Comm->GetInterId();
1028   int   astart, aend, len;
1029   char   *datap = (char *)Data;
1030 
1031   // While there is length left
1032   while(aLength) {
1033     // Figure out what server core the address is located on
1034     who = this->AddressToId(Address);
1035     if(who == XDMF_DSM_FAIL){
1036       try {
1037         XdmfError::message(XdmfError::FATAL, "Address Error");
1038       }
1039       catch (XdmfError & e) {
1040         throw e;
1041       }
1042     }
1043     // Get the start and end of the block listed
1044     this->GetAddressRangeForId(who, &astart, &aend);
1045     // Determine the amount of data to be written to that core
1046     // Basically, it's how much data will fit from
1047     // the starting point of the address to the end
1048     len = std::min(aLength, aend - Address + 1);
1049     // If the data is on the core running this code, then the put is simple
1050     if(who == MyId){
1051       char *dp;
1052       dp = this->DataPointer;
1053       dp += Address - this->StartAddress;
1054       memcpy(datap, dp, len);
1055     }
1056     else{
1057       // Otherwise send it to the appropriate core to deal with
1058       int   dataComm = XDMF_DSM_INTRA_COMM;
1059       if (this->Comm->GetInterComm() != MPI_COMM_NULL) {
1060         dataComm = XDMF_DSM_INTER_COMM;
1061       }
1062       try {
1063         this->SendCommandHeader(XDMF_DSM_OPCODE_GET, who, Address - astart, len, dataComm);
1064       }
1065       catch (XdmfError & e) {
1066         throw e;
1067       }
1068       try {
1069         this->ReceiveData(who, datap, len, XDMF_DSM_GET_DATA_TAG, Address - astart, dataComm);
1070       }
1071       catch (XdmfError & e) {
1072         throw e;
1073       }
1074     }
1075     // Shift all the numbers by the length of the data written
1076     // Until aLength = 0
1077     aLength -= len;
1078     Address += len;
1079     datap += len;
1080   }
1081 }
1082 
1083 void
Get(unsigned int * pages,unsigned int numPages,long Address,long aLength,void * Data)1084 XdmfDSMBuffer::Get(unsigned int * pages, unsigned int numPages, long Address, long aLength, void *Data)
1085 {
1086   char * currentStart;
1087   unsigned int currentPageId = Address / this->BlockLength;
1088   unsigned int dsmPage;
1089   long startingAddress = Address % this->BlockLength;
1090   unsigned int tranferedLength;
1091   unsigned int dataPage = 0;
1092 
1093   long pointeroffset = 0;
1094 
1095   int serverCore;
1096   int writeAddress;
1097 
1098   while (aLength) {
1099     if (dataPage == 0) {
1100       tranferedLength = this->BlockLength - startingAddress;
1101     }
1102     else {
1103       tranferedLength = this->BlockLength;
1104     }
1105     if (tranferedLength > aLength) {
1106       tranferedLength = aLength;
1107     }
1108 
1109     dsmPage = pages[currentPageId];
1110 
1111     currentStart = (char *)Data + pointeroffset;;
1112 
1113     // Write page to DSM
1114     // page to DSM server Id
1115     // page to address
1116     // write to location
1117 
1118     serverCore = PageToId(dsmPage);
1119     writeAddress = PageToAddress(dsmPage);
1120 
1121     if (serverCore == XDMF_DSM_FAIL) {
1122       XdmfError::message(XdmfError::FATAL,
1123                          "Error: Unable to determine server core.");
1124     }
1125 
1126     if (writeAddress == XDMF_DSM_FAIL) {
1127       XdmfError::message(XdmfError::FATAL,
1128                          "Error: Unable to determine write address.");
1129     }
1130 
1131     if (dataPage == 0)
1132     {
1133       writeAddress += startingAddress;
1134     }
1135 
1136     // If the data is on the core running this code, then the put is simple
1137     if(serverCore == this->Comm->GetInterId()){
1138       char *dp;
1139       dp = this->DataPointer;
1140       dp += writeAddress;
1141       memcpy(currentStart, dp, tranferedLength);
1142     }
1143     else{
1144       // Otherwise send it to the appropriate core to deal with
1145       int dataComm = XDMF_DSM_INTRA_COMM;
1146       if (this->Comm->GetInterComm() != MPI_COMM_NULL) {
1147         dataComm = XDMF_DSM_INTER_COMM;
1148       }
1149       try {
1150         this->SendCommandHeader(XDMF_DSM_OPCODE_GET,
1151                                 serverCore,
1152                                 writeAddress,
1153                                 tranferedLength,
1154                                 dataComm);
1155       }
1156       catch (XdmfError & e) {
1157         throw e;
1158       }
1159       try {
1160         this->ReceiveData(serverCore,
1161                           currentStart,
1162                           tranferedLength,
1163                           XDMF_DSM_GET_DATA_TAG,
1164                           writeAddress,
1165                           dataComm);
1166       }
1167       catch (XdmfError & e) {
1168         throw e;
1169       }
1170     }
1171 
1172     aLength -= tranferedLength;
1173     pointeroffset += tranferedLength;
1174     // move to the next page
1175     ++currentPageId;
1176     ++dataPage;
1177   }
1178 }
1179 
1180 void
GetAddressRangeForId(int Id,int * Start,int * End)1181 XdmfDSMBuffer::GetAddressRangeForId(int Id, int *Start, int *End){
1182     switch(this->DsmType) {
1183       case XDMF_DSM_TYPE_UNIFORM :
1184       case XDMF_DSM_TYPE_UNIFORM_RANGE :
1185         // All Servers have same length
1186         // Start index is equal to the id inside the servers times
1187         // the length of the block per server
1188         // It is the starting index of the server's data block relative
1189         // to the entire block
1190         *Start = (Id - this->StartServerId) * this->Length;
1191         // End index is simply the start index + the length of the
1192         // server's data block.
1193         // The range produced is the start of the server's data block to its end.
1194         *End = *Start + Length - 1;
1195         break;
1196       default :
1197         // Not Implemented
1198         try {
1199           std::stringstream message;
1200           message << "DsmType " << this->DsmType << " not yet implemented";
1201           XdmfError::message(XdmfError::FATAL, message.str());
1202         }
1203         catch (XdmfError & e) {
1204           throw e;
1205         }
1206         break;
1207     }
1208 }
1209 
1210 long
GetBlockLength()1211 XdmfDSMBuffer::GetBlockLength()
1212 {
1213   return this->BlockLength;
1214 }
1215 
1216 XdmfDSMCommMPI *
GetComm()1217 XdmfDSMBuffer::GetComm()
1218 {
1219   return this->Comm;
1220 }
1221 
1222 char *
GetDataPointer()1223 XdmfDSMBuffer::GetDataPointer()
1224 {
1225   return this->DataPointer;
1226 }
1227 
1228 int
GetDsmType()1229 XdmfDSMBuffer::GetDsmType()
1230 {
1231   return this->DsmType;
1232 }
1233 
1234 int
GetEndAddress()1235 XdmfDSMBuffer::GetEndAddress()
1236 {
1237   return this->EndAddress;
1238 }
1239 
1240 int
GetEndServerId()1241 XdmfDSMBuffer::GetEndServerId()
1242 {
1243   return this->EndServerId;
1244 }
1245 
1246 int
GetInterCommType()1247 XdmfDSMBuffer::GetInterCommType()
1248 {
1249   return this->InterCommType;
1250 }
1251 
1252 bool
GetIsConnected()1253 XdmfDSMBuffer::GetIsConnected()
1254 {
1255   return IsConnected;
1256 }
1257 
1258 bool
GetIsServer()1259 XdmfDSMBuffer::GetIsServer()
1260 {
1261   return this->IsServer;
1262 }
1263 
1264 long
GetLength()1265 XdmfDSMBuffer::GetLength()
1266 {
1267   // This is the length of the pointer on the current core.
1268   // Different from local buffer size as that value is
1269   // the starting size.
1270   return this->Length;
1271 }
1272 
1273 unsigned int
GetLocalBufferSizeMBytes()1274 XdmfDSMBuffer::GetLocalBufferSizeMBytes()
1275 {
1276   // This is the starting value, so it is not updated as the pointer is expanded.
1277   return this->LocalBufferSizeMBytes;
1278 }
1279 
1280 double
GetResizeFactor()1281 XdmfDSMBuffer::GetResizeFactor()
1282 {
1283   return this->ResizeFactor;
1284 }
1285 
1286 int
GetStartAddress()1287 XdmfDSMBuffer::GetStartAddress()
1288 {
1289   return this->StartAddress;
1290 }
1291 
1292 int
GetStartServerId()1293 XdmfDSMBuffer::GetStartServerId()
1294 {
1295   return this->StartServerId;
1296 }
1297 
1298 long
GetTotalLength()1299 XdmfDSMBuffer::GetTotalLength()
1300 {
1301   return this->TotalLength;
1302 }
1303 
1304 void
Lock(char * filename)1305 XdmfDSMBuffer::Lock(char * filename)
1306 {
1307   int strlength = std::string(filename).size();
1308   // Request access to the file
1309   this->SendCommandHeader(XDMF_DSM_REQUEST_ACCESS,
1310                           this->GetStartServerId(),
1311                           0,
1312                           0,
1313                           XDMF_DSM_INTER_COMM);
1314 
1315   this->SendAcknowledgment(this->GetStartServerId(),
1316                            strlength,
1317                            XDMF_DSM_EXCHANGE_TAG,
1318                            XDMF_DSM_INTER_COMM);
1319 
1320   this->SendData(this->GetStartServerId(),
1321                  filename,
1322                  strlength,
1323                  XDMF_DSM_EXCHANGE_TAG,
1324                  0,
1325                  XDMF_DSM_INTER_COMM);
1326 
1327   int isLocked = 0;
1328 
1329   this->ReceiveAcknowledgment(this->GetStartServerId(),
1330                               isLocked,
1331                               XDMF_DSM_EXCHANGE_TAG,
1332                               XDMF_DSM_INTER_COMM);
1333 
1334   if (isLocked == 1)
1335   {
1336     // If locked wait for notification that the file is available.
1337     this->ReceiveAcknowledgment(this->GetStartServerId(),
1338                             isLocked,
1339                             XDMF_DSM_EXCHANGE_TAG,
1340                             XDMF_DSM_INTER_COMM);
1341   }
1342 }
1343 
1344 int
PageToId(int pageId)1345 XdmfDSMBuffer::PageToId(int pageId)
1346 {
1347   int   ServerId = XDMF_DSM_FAIL;
1348 
1349   switch(this->DsmType) {
1350     case XDMF_DSM_TYPE_BLOCK_CYCLIC :
1351     case XDMF_DSM_TYPE_BLOCK_RANDOM :
1352     {
1353       // Block based allocation should use PageToId
1354       // All Servers have same length
1355       // This finds out which server the address provided starts on
1356       int serversize = (this->EndServerId - this->StartServerId);
1357       if (serversize < 1)
1358       {
1359         serversize = 1;
1360       }
1361       ServerId = pageId % serversize;// This should only be called by the server
1362       ServerId += this->StartServerId; // Apply the offset of the server if required.
1363       break;
1364     }
1365     default :
1366       // Not Implemented
1367       try {
1368         std::stringstream message;
1369         message << "DsmType " << this->DsmType << " not yet implemented or not paged";
1370         XdmfError::message(XdmfError::FATAL, message.str());
1371       }
1372       catch (XdmfError & e) {
1373         throw e;
1374       }
1375       break;
1376     }
1377     return(ServerId);
1378 }
1379 
1380 int
PageToAddress(int pageId)1381 XdmfDSMBuffer::PageToAddress(int pageId)
1382 {
1383   int   resultAddress = XDMF_DSM_FAIL;
1384 
1385   switch(this->DsmType) {
1386     case XDMF_DSM_TYPE_BLOCK_CYCLIC :
1387     case XDMF_DSM_TYPE_BLOCK_RANDOM :
1388     {
1389       // Block based allocation should use PageToId
1390       // All Servers have same length
1391       // This finds out which server the address provided starts on
1392       // Since this is integers being divided the result is truncated.
1393       int serversize = (this->EndServerId - this->StartServerId);
1394       if (serversize < 1)
1395       {
1396         serversize = 1;
1397       }
1398       resultAddress = this->BlockLength * (pageId / serversize);
1399       break;
1400     }
1401     default :
1402       // Not Implemented
1403       try {
1404         std::stringstream message;
1405         message << "DsmType " << this->DsmType << " not yet implemented or not paged";
1406         XdmfError::message(XdmfError::FATAL, message.str());
1407       }
1408       catch (XdmfError & e) {
1409         throw e;
1410       }
1411       break;
1412     }
1413     return(resultAddress);
1414 }
1415 
1416 void
ProbeCommandHeader(int * comm)1417 XdmfDSMBuffer::ProbeCommandHeader(int *comm)
1418 {
1419   // Used for finding a comm that has a waiting command, then sets the comm
1420   int status = XDMF_DSM_FAIL;
1421   MPI_Status signalStatus;
1422 
1423   int flag;
1424   MPI_Comm probeComm =
1425     static_cast<XdmfDSMCommMPI *>(this->Comm)->GetIntraComm();
1426 
1427   // Spin until a message is found on one of the communicators
1428   while (status != XDMF_DSM_SUCCESS) {
1429     status = MPI_Iprobe(XDMF_DSM_ANY_SOURCE,
1430                         XDMF_DSM_ANY_TAG,
1431                         probeComm,
1432                         &flag,
1433                         &signalStatus);
1434     if (status != MPI_SUCCESS)
1435     {
1436        try {
1437          XdmfError::message(XdmfError::FATAL,
1438                             "Error: Failed to probe for command header");
1439        }
1440        catch (XdmfError & e) {
1441          throw e;
1442        }
1443     }
1444     if (flag) {
1445       status = XDMF_DSM_SUCCESS;
1446     }
1447     else {
1448       if (static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm() != MPI_COMM_NULL) {
1449         if (probeComm == static_cast<XdmfDSMCommMPI *>(this->Comm)->GetIntraComm()) {
1450           probeComm = static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm();
1451         }
1452         else {
1453           probeComm = static_cast<XdmfDSMCommMPI *>(this->Comm)->GetIntraComm();
1454         }
1455       }
1456     }
1457   }
1458   if (probeComm == static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm()) {
1459     *comm = XDMF_DSM_INTER_COMM;
1460   }
1461   else
1462   {
1463     *comm = XDMF_DSM_INTRA_COMM;
1464   }
1465 
1466   probeComm = MPI_COMM_NULL;
1467 }
1468 
1469 void
Put(long Address,long aLength,const void * Data)1470 XdmfDSMBuffer::Put(long Address, long aLength, const void *Data)
1471 {
1472   int   who, MyId = this->Comm->GetInterId();
1473   int   astart, aend, len;
1474   char    *datap = (char *)Data;
1475 
1476   // While there is length left
1477   while(aLength){
1478     // Figure out what server core the address is located on
1479     who = this->AddressToId(Address);
1480     if(who == XDMF_DSM_FAIL){
1481       try {
1482         XdmfError::message(XdmfError::FATAL, "Address Error");
1483       }
1484       catch (XdmfError & e) {
1485         throw e;
1486       }
1487     }
1488     // Get the start and end of the block listed
1489     this->GetAddressRangeForId(who, &astart, &aend);
1490     // Determine the amount of data to be written to that core
1491     // Basically, it's how much data will fit from the starting point of
1492     // the address to the end
1493     len = std::min(aLength, aend - Address + 1);
1494     // If the data is on the core running this code, then the put is simple
1495     if(who == MyId){
1496       char *dp;
1497       dp = this->DataPointer;
1498       dp += Address - this->StartAddress;
1499       memcpy(dp, datap, len);
1500     }
1501     else{
1502       // Otherwise send it to the appropriate core to deal with
1503       int   dataComm = XDMF_DSM_INTRA_COMM;
1504       if (this->Comm->GetInterComm() != MPI_COMM_NULL) {
1505         dataComm = XDMF_DSM_INTER_COMM;
1506       }
1507       try {
1508         this->SendCommandHeader(XDMF_DSM_OPCODE_PUT,
1509                                 who,
1510                                 Address - astart,
1511                                 len,
1512                                 dataComm);
1513       }
1514       catch (XdmfError & e) {
1515         throw e;
1516       }
1517       try {
1518         this->SendData(who,
1519                        datap,
1520                        len,
1521                        XDMF_DSM_PUT_DATA_TAG,
1522                        Address - astart,
1523                        dataComm);
1524       }
1525       catch (XdmfError & e) {
1526         throw e;
1527       }
1528     }
1529     // Shift all the numbers by the length of the data written
1530     // Until aLength = 0
1531     aLength -= len;
1532     Address += len;
1533     datap += len;
1534   }
1535 }
1536 
1537 void
Put(unsigned int * pages,unsigned int numPages,haddr_t Address,haddr_t aLength,const void * Data)1538 XdmfDSMBuffer::Put(unsigned int * pages, unsigned int numPages, haddr_t Address, haddr_t aLength, const void *Data)
1539 {
1540 
1541   char * currentStart;
1542   unsigned int currentPageId = Address / this->BlockLength;
1543   unsigned int dsmPage = 0;
1544   long startingAddress = Address % this->BlockLength;
1545   unsigned int tranferedLength;
1546 
1547   long pointeroffset = 0;
1548 
1549   unsigned int dataPage = 0;
1550 
1551   int serverCore = 0;
1552   int writeAddress = 0;
1553 
1554   while (aLength) {
1555     if (dataPage == 0) {
1556       tranferedLength = this->BlockLength - startingAddress;
1557     }
1558     else {
1559       tranferedLength = this->BlockLength;
1560     }
1561     if (tranferedLength > aLength) {
1562       tranferedLength = aLength;
1563     }
1564 
1565     dsmPage = pages[currentPageId];
1566 
1567     currentStart = (char *)Data + pointeroffset;
1568 
1569     // Write page to DSM
1570     // page to DSM server Id
1571     // page to address
1572     // write to location
1573     serverCore = PageToId(dsmPage);
1574 
1575     writeAddress = PageToAddress(dsmPage);
1576 
1577     if (serverCore == XDMF_DSM_FAIL) {
1578       XdmfError::message(XdmfError::FATAL, "Error: Unable to determine page server core.");
1579     }
1580     if (writeAddress == XDMF_DSM_FAIL) {
1581       XdmfError::message(XdmfError::FATAL, "Error: Unable to determine page address.");
1582     }
1583 
1584     if (dataPage == 0)
1585     {
1586       writeAddress += startingAddress;
1587     }
1588 
1589     // If the data is on the core running this code, then the put is simple
1590     if(serverCore == this->Comm->GetInterId()) {
1591       char *dp;
1592       dp = this->DataPointer;
1593       dp += writeAddress;
1594       memcpy(dp, currentStart, tranferedLength);
1595     }
1596     else{
1597       // Otherwise send it to the appropriate core to deal with
1598       int dataComm = XDMF_DSM_INTRA_COMM;
1599       if (this->Comm->GetInterComm() != MPI_COMM_NULL) {
1600         dataComm = XDMF_DSM_INTER_COMM;
1601       }
1602       try {
1603         this->SendCommandHeader(XDMF_DSM_OPCODE_PUT,
1604                                 serverCore,
1605                                 writeAddress,
1606                                 tranferedLength,
1607                                 dataComm);
1608       }
1609       catch (XdmfError & e) {
1610         throw e;
1611       }
1612       try {
1613         this->SendData(serverCore,
1614                        currentStart,
1615                        tranferedLength,
1616                        XDMF_DSM_PUT_DATA_TAG,
1617                        writeAddress,
1618                        dataComm);
1619       }
1620       catch (XdmfError & e) {
1621         throw e;
1622       }
1623     }
1624 
1625     aLength -= tranferedLength;
1626     pointeroffset += tranferedLength;
1627     // move to the next page
1628     ++currentPageId;
1629     ++dataPage;
1630   }
1631 }
1632 
1633 void
ReceiveAcknowledgment(int source,int & data,int tag,int comm)1634 XdmfDSMBuffer::ReceiveAcknowledgment(int source, int &data, int tag, int comm)
1635 {
1636   int status;
1637   MPI_Status signalStatus;
1638 
1639   this->Comm->Receive(&data,
1640                       sizeof(int),
1641                       source,
1642                       comm,
1643                       tag);
1644   status = MPI_SUCCESS;
1645 
1646   if (status != MPI_SUCCESS) {
1647     try {
1648       XdmfError::message(XdmfError::FATAL, "Error: Failed to receive data");
1649     }
1650     catch (XdmfError & e) {
1651       throw e;
1652     }
1653   }
1654 }
1655 
1656 void
ReceiveCommandHeader(int * opcode,int * source,int * address,int * aLength,int comm,int remoteSource)1657 XdmfDSMBuffer::ReceiveCommandHeader(int *opcode, int *source, int *address, int *aLength, int comm, int remoteSource)
1658 {
1659   CommandMsg cmd;
1660   memset(&cmd, 0, sizeof(CommandMsg));
1661   int status = MPI_ERR_OTHER;
1662   MPI_Status signalStatus;
1663 
1664   if (remoteSource < 0) {
1665     remoteSource = MPI_ANY_SOURCE;
1666   }
1667 
1668   this->Comm->Receive(&cmd,
1669                       sizeof(CommandMsg),
1670                       remoteSource,
1671                       comm,
1672                       XDMF_DSM_COMMAND_TAG);
1673 
1674   status = MPI_SUCCESS;
1675 
1676   if (status != MPI_SUCCESS) {
1677     try {
1678       XdmfError::message(XdmfError::FATAL, "Error: Failed to receive command header");
1679     }
1680     catch (XdmfError & e) {
1681       throw e;
1682     }
1683   }
1684   else {
1685     *opcode  = cmd.Opcode;
1686     *source  = cmd.Source;
1687     *address = cmd.Address;
1688     *aLength = cmd.Length;
1689   }
1690 }
1691 
1692 void
ReceiveData(int source,char * data,int aLength,int tag,int aAddress,int comm)1693 XdmfDSMBuffer::ReceiveData(int source, char * data, int aLength, int tag, int aAddress, int comm)
1694 {
1695   int status;
1696   MPI_Status signalStatus;
1697   this->Comm->Receive(data,
1698                       aLength,
1699                       source,
1700                       comm,
1701                       tag);
1702   status = MPI_SUCCESS;
1703   if (status != MPI_SUCCESS) {
1704     try {
1705       XdmfError::message(XdmfError::FATAL, "Error: Failed to receive data");
1706     }
1707     catch (XdmfError & e) {
1708       throw e;
1709     }
1710   }
1711 }
1712 
1713 void
ReceiveInfo()1714 XdmfDSMBuffer::ReceiveInfo()
1715 {
1716   InfoMsg  dsmInfo;
1717   int status;
1718 
1719   memset(&dsmInfo, 0, sizeof(InfoMsg));
1720 
1721   int infoStatus = 0;
1722 
1723   if (this->Comm->GetId() == 0) {
1724     infoStatus = 1;
1725   }
1726 
1727   int * groupInfoStatus = new int[this->Comm->GetInterSize()]();
1728 
1729   this->Comm->AllGather(&infoStatus,
1730                         sizeof(int),
1731                         &(groupInfoStatus[0]),
1732                         sizeof(int),
1733                         XDMF_DSM_INTER_COMM);
1734 
1735   int sendCore = 0;
1736 
1737   for (int i = 0; i < this->Comm->GetInterSize(); ++i) {
1738     if (groupInfoStatus[i] == 2) {
1739       sendCore = i;
1740     }
1741   }
1742 
1743   status = MPI_SUCCESS;
1744 
1745   this->Comm->Broadcast(&dsmInfo,
1746                         sizeof(InfoMsg),
1747                         sendCore,
1748                         XDMF_DSM_INTER_COMM);
1749   if (status != MPI_SUCCESS) {
1750     try {
1751       XdmfError::message(XdmfError::FATAL, "Error: Failed to broadcast info");
1752     }
1753     catch (XdmfError & e) {
1754       throw e;
1755     }
1756   }
1757   this->SetDsmType(dsmInfo.type);
1758   // We are a client so don't allocate anything but only set a virtual remote length
1759   this->SetLength(dsmInfo.length);
1760   this->TotalLength = dsmInfo.total_length;
1761   this->SetBlockLength(dsmInfo.block_length);
1762   this->StartServerId = dsmInfo.start_server_id;
1763   this->EndServerId = dsmInfo.end_server_id;
1764 
1765   MPI_Comm comm = this->Comm->GetInterComm();
1766 
1767   // Cray needs to be launched via the colon notation so that it
1768   // can properly create a merged communicator
1769 
1770   int rank = this->Comm->GetInterId();
1771   int size = this->Comm->GetInterSize();
1772 
1773   int currentCore = 0;
1774   int * checkstatus = new int[size]();
1775   int localCheck = 0;
1776   std::string applicationName = this->Comm->GetApplicationName();
1777 
1778   char * coreTag;
1779   int tagSize = 0;
1780 
1781   std::vector<int> coreSplit;
1782   unsigned int splitid = 0;
1783 
1784   std::vector<std::pair<std::string, unsigned int> > newStructure;
1785 
1786   int * splitIds;
1787   unsigned int splitsize = 0;
1788 
1789   while (currentCore < size)
1790   {
1791     if (rank == currentCore)
1792     {
1793       tagSize = applicationName.size();
1794     }
1795     MPI_Bcast(&tagSize, 1, MPI_INT, currentCore, comm);
1796     coreTag = new char[tagSize+1]();
1797 
1798     if (rank == currentCore)
1799     {
1800       strcpy(coreTag, applicationName.c_str());
1801     }
1802     MPI_Bcast(coreTag, tagSize, MPI_CHAR, currentCore, comm);
1803     coreTag[tagSize] = 0;
1804 
1805     if (strcmp(coreTag, applicationName.c_str()) == 0)
1806     {
1807       localCheck = 1;
1808     }
1809     else
1810     {
1811       localCheck = 0;
1812     }
1813 
1814     checkstatus[rank] = localCheck;
1815 
1816     MPI_Allgather(&localCheck, 1, MPI_INT,
1817                   checkstatus, 1, MPI_INT,
1818                   comm);
1819 
1820     bool insplit = false;
1821     while (checkstatus[currentCore])
1822     {
1823       if (rank == currentCore)
1824       {
1825         insplit = true;
1826       }
1827       coreSplit.push_back(currentCore);
1828       ++currentCore;
1829       if (currentCore >= size)
1830       {
1831         break;
1832       }
1833     }
1834     if (insplit)
1835     {
1836       splitIds = (int *)calloc(coreSplit.size(), sizeof(int));
1837       memcpy(splitIds, &(coreSplit[0]), coreSplit.size() * sizeof(int));
1838       splitsize = coreSplit.size();
1839     }
1840     newStructure.push_back(std::pair<std::string, unsigned int>(std::string(coreTag), coreSplit.size()));
1841     coreSplit.clear();
1842     ++splitid;
1843   }
1844   this->Comm->SetDsmProcessStructure(newStructure);
1845 }
1846 
1847 int
RegisterFile(char * name,unsigned int * pages,unsigned int numPages,haddr_t start,haddr_t end)1848 XdmfDSMBuffer::RegisterFile(char * name, unsigned int * pages, unsigned int numPages, haddr_t start, haddr_t end)
1849 {
1850   this->SendCommandHeader(XDMF_DSM_REGISTER_FILE,
1851                           this->GetStartServerId(),
1852                           0,
1853                           0,
1854                           XDMF_DSM_INTER_COMM);
1855 
1856   int strlength = std::string(name).size();
1857 
1858   this->SendAcknowledgment(this->GetStartServerId(),
1859                            strlength,
1860                            XDMF_DSM_EXCHANGE_TAG,
1861                            XDMF_DSM_INTER_COMM);
1862 
1863   this->SendData(this->GetStartServerId(),
1864                  name,
1865                  strlength,
1866                  XDMF_DSM_EXCHANGE_TAG,
1867                  0,
1868                  XDMF_DSM_INTER_COMM);
1869 
1870   this->SendData(this->GetStartServerId(),
1871                  (char *)&start,
1872                  sizeof(haddr_t),
1873                  XDMF_DSM_EXCHANGE_TAG,
1874                  0,
1875                  XDMF_DSM_INTER_COMM);
1876 
1877   this->SendData(this->GetStartServerId(),
1878                  (char *)&end,
1879                  sizeof(haddr_t),
1880                  XDMF_DSM_EXCHANGE_TAG,
1881                  0,
1882                  XDMF_DSM_INTER_COMM);
1883 
1884   this->SendAcknowledgment(this->GetStartServerId(),
1885                            numPages,
1886                            XDMF_DSM_EXCHANGE_TAG,
1887                            XDMF_DSM_INTER_COMM);
1888 
1889   if (numPages > 0)
1890   {
1891     this->SendData(this->GetStartServerId(),
1892                    (char *)pages,
1893                    numPages * sizeof(unsigned int),
1894                    XDMF_DSM_EXCHANGE_TAG,
1895                    0,
1896                    XDMF_DSM_INTER_COMM);
1897   }
1898 
1899   return XDMF_DSM_SUCCESS;
1900 }
1901 
1902 int
RequestFileDescription(char * name,std::vector<unsigned int> & pages,unsigned int & numPages,haddr_t & start,haddr_t & end)1903 XdmfDSMBuffer::RequestFileDescription(char * name, std::vector<unsigned int> & pages, unsigned int & numPages, haddr_t & start, haddr_t & end)
1904 {
1905   this->SendCommandHeader(XDMF_DSM_REQUEST_FILE,
1906                           this->GetStartServerId(),
1907                           0,
1908                           0,
1909                           XDMF_DSM_INTER_COMM);
1910 
1911   int strlength = std::string(name).size();
1912 
1913   this->SendAcknowledgment(this->GetStartServerId(),
1914                            strlength,
1915                            XDMF_DSM_EXCHANGE_TAG,
1916                            XDMF_DSM_INTER_COMM);
1917 
1918   this->SendData(this->GetStartServerId(),
1919                  name,
1920                  strlength,
1921                  XDMF_DSM_EXCHANGE_TAG,
1922                  0,
1923                  XDMF_DSM_INTER_COMM);
1924 
1925   int fileExists = XDMF_DSM_SUCCESS;
1926 
1927   this->ReceiveAcknowledgment(this->GetStartServerId(),
1928                               fileExists,
1929                               XDMF_DSM_EXCHANGE_TAG,
1930                               XDMF_DSM_INTER_COMM);
1931 
1932   if (fileExists == XDMF_DSM_SUCCESS)
1933   {
1934     this->ReceiveData(this->GetStartServerId(),
1935                       (char*)&start,
1936                       sizeof(haddr_t),
1937                       XDMF_DSM_EXCHANGE_TAG,
1938                       0,
1939                       XDMF_DSM_INTER_COMM);
1940 
1941     this->ReceiveData(this->GetStartServerId(),
1942                       (char*)&end,
1943                       sizeof(haddr_t),
1944                       XDMF_DSM_EXCHANGE_TAG,
1945                       0,
1946                       XDMF_DSM_INTER_COMM);
1947 
1948     int recvNumPages = 0;
1949 
1950     this->ReceiveAcknowledgment(this->GetStartServerId(),
1951                                 recvNumPages,
1952                                 XDMF_DSM_EXCHANGE_TAG,
1953                                 XDMF_DSM_INTER_COMM);
1954 
1955     numPages = recvNumPages;
1956 
1957     // Reallocate pointer
1958     pages.clear();
1959 
1960     unsigned int * pagelist = new unsigned int[numPages];
1961 
1962     this->ReceiveData(this->GetStartServerId(),
1963                       (char *)pagelist,
1964                       numPages * sizeof(unsigned int),
1965                       XDMF_DSM_EXCHANGE_TAG,
1966                       0,
1967                       XDMF_DSM_INTER_COMM);
1968 
1969     for (unsigned int i = 0; i < numPages; ++i)
1970     {
1971       pages.push_back(pagelist[i]);
1972     }
1973 
1974     return XDMF_DSM_SUCCESS;
1975   }
1976   else
1977   {
1978     return XDMF_DSM_FAIL;
1979   }
1980 }
1981 
1982 
1983 void
RequestPages(char * name,haddr_t spaceRequired,std::vector<unsigned int> & pages,unsigned int & numPages,haddr_t & start,haddr_t & end)1984 XdmfDSMBuffer::RequestPages(char * name,
1985                             haddr_t spaceRequired,
1986                             std::vector<unsigned int> & pages,
1987                             unsigned int & numPages,
1988                             haddr_t & start,
1989                             haddr_t & end)
1990 {
1991   this->SendCommandHeader(XDMF_DSM_REQUEST_PAGES,
1992                           this->GetStartServerId(),
1993                           0,
1994                           0,
1995                           XDMF_DSM_INTER_COMM);
1996 
1997   // set aside pages to a file
1998   int strlength = std::string(name).size();
1999 
2000   this->SendAcknowledgment(this->GetStartServerId(),
2001                            strlength,
2002                            XDMF_DSM_EXCHANGE_TAG,
2003                            XDMF_DSM_INTER_COMM);
2004 
2005   this->SendData(this->GetStartServerId(),
2006                  name,
2007                  strlength,
2008                  XDMF_DSM_EXCHANGE_TAG,
2009                  0,
2010                  XDMF_DSM_INTER_COMM);
2011 
2012   // Request size required for the file
2013   this->SendAcknowledgment(this->GetStartServerId(),
2014                            spaceRequired,
2015                            XDMF_DSM_EXCHANGE_TAG,
2016                            XDMF_DSM_INTER_COMM);
2017 
2018   // Send back new page allocation pointer
2019 
2020   int newPageCount = 0;
2021 
2022   this->ReceiveAcknowledgment(this->GetStartServerId(),
2023                               newPageCount,
2024                               XDMF_DSM_EXCHANGE_TAG,
2025                               XDMF_DSM_INTER_COMM);
2026 
2027   numPages = newPageCount;
2028 
2029   unsigned int * pagelist = new unsigned int[numPages]();
2030   pages.clear();
2031 
2032   this->ReceiveData(this->GetStartServerId(),
2033                     (char *) pagelist,
2034                     numPages * sizeof(unsigned int),
2035                     XDMF_DSM_EXCHANGE_TAG,
2036                     0,
2037                     XDMF_DSM_INTER_COMM);
2038 
2039   for (unsigned int i = 0; i < numPages; ++i)
2040   {
2041     pages.push_back(pagelist[i]);
2042   }
2043 
2044   // Recieve the new start and end addresses
2045   this->ReceiveData(this->GetStartServerId(),
2046                     (char*)&start,
2047                     sizeof(haddr_t),
2048                     XDMF_DSM_EXCHANGE_TAG,
2049                     0,
2050                     XDMF_DSM_INTER_COMM);
2051 
2052   this->ReceiveData(this->GetStartServerId(),
2053                     (char*)&end,
2054                     sizeof(haddr_t),
2055                     XDMF_DSM_EXCHANGE_TAG,
2056                     0,
2057                     XDMF_DSM_INTER_COMM);
2058 
2059   // If resized, set up the reset the total length.
2060   int currentLength = 0;
2061   this->ReceiveAcknowledgment(this->GetStartServerId(),
2062                               currentLength,
2063                               XDMF_DSM_EXCHANGE_TAG,
2064                               XDMF_DSM_INTER_COMM);
2065 
2066   this->UpdateLength(currentLength);
2067 }
2068 
2069 void
SendAccept(unsigned int numConnections)2070 XdmfDSMBuffer::SendAccept(unsigned int numConnections)
2071 {
2072 #ifndef XDMF_DSM_IS_CRAY
2073   for (int i = this->StartServerId; i <= this->EndServerId; ++i) {
2074     if (i != this->Comm->GetInterId()){
2075       this->SendCommandHeader(XDMF_DSM_ACCEPT, i, 0, 0, XDMF_DSM_INTER_COMM);
2076       this->SendAcknowledgment(i, numConnections, XDMF_DSM_EXCHANGE_TAG, XDMF_DSM_INTER_COMM);
2077     }
2078   }
2079   this->Comm->Accept(numConnections);
2080   this->SendInfo();
2081 #endif
2082 }
2083 
2084 void
SendAcknowledgment(int dest,int data,int tag,int comm)2085 XdmfDSMBuffer::SendAcknowledgment(int dest, int data, int tag, int comm)
2086 {
2087   int status;
2088   this->Comm->Send(&data,
2089                    sizeof(int),
2090                    dest,
2091                    comm,
2092                    tag);
2093   status = MPI_SUCCESS;
2094   if (status != MPI_SUCCESS) {
2095     try {
2096       XdmfError::message(XdmfError::FATAL, "Error: Failed to receive data");
2097     }
2098     catch (XdmfError & e) {
2099       throw e;
2100     }
2101   }
2102 }
2103 
2104 void
SendCommandHeader(int opcode,int dest,int address,int aLength,int comm)2105 XdmfDSMBuffer::SendCommandHeader(int opcode, int dest, int address, int aLength, int comm)
2106 {
2107   int status;
2108   CommandMsg cmd;
2109   memset(&cmd, 0, sizeof(CommandMsg));
2110   cmd.Opcode = opcode;
2111   if (comm == XDMF_DSM_INTRA_COMM) {
2112     cmd.Source = this->Comm->GetId();
2113   }
2114   else if (comm == XDMF_DSM_INTER_COMM) {
2115     cmd.Source = this->Comm->GetInterId();
2116   }
2117   cmd.Target = dest;
2118   cmd.Address = address;
2119   cmd.Length = aLength;
2120 
2121   this->Comm->Send(&cmd,
2122                    sizeof(CommandMsg),
2123                    dest,
2124                    comm,
2125                    XDMF_DSM_COMMAND_TAG);
2126   status = MPI_SUCCESS;
2127   if (status != MPI_SUCCESS) {
2128     try {
2129       XdmfError::message(XdmfError::FATAL, "Error: Failed to send command header");
2130     }
2131     catch (XdmfError & e) {
2132       throw e;
2133     }
2134   }
2135 }
2136 
2137 void
SendData(int dest,char * data,int aLength,int tag,int aAddress,int comm)2138 XdmfDSMBuffer::SendData(int dest, char * data, int aLength, int tag, int aAddress, int comm)
2139 {
2140   int status;
2141 
2142   this->Comm->Send(data,
2143                    aLength,
2144                    dest,
2145                    comm,
2146                    tag);
2147   status = MPI_SUCCESS;
2148   if (status != MPI_SUCCESS) {
2149     try {
2150       XdmfError::message(XdmfError::FATAL, "Error: Failed to send data");
2151     }
2152     catch (XdmfError & e) {
2153       throw e;
2154     }
2155   }
2156 }
2157 
2158 void
SendDone()2159 XdmfDSMBuffer::SendDone()
2160 {
2161   try {
2162     if (static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm() == MPI_COMM_NULL)
2163     {
2164       for (int i = this->StartServerId; i <= this->EndServerId; ++i) {
2165         if (i != this->Comm->GetId()){
2166           this->SendCommandHeader(XDMF_DSM_OPCODE_DONE, i, 0, 0, XDMF_DSM_INTRA_COMM);
2167         }
2168       }
2169     }
2170     else
2171     {
2172       for (int i = this->StartServerId; i <= this->EndServerId; ++i) {
2173         if (i != this->Comm->GetId()){
2174           this->SendCommandHeader(XDMF_DSM_OPCODE_DONE, i, 0, 0, XDMF_DSM_INTER_COMM);
2175         }
2176       }
2177     }
2178   }
2179   catch (XdmfError & e) {
2180     throw e;
2181   }
2182 }
2183 
2184 void
SendInfo()2185 XdmfDSMBuffer::SendInfo()
2186 {
2187   InfoMsg  dsmInfo;
2188   int status;
2189 
2190   memset(&dsmInfo, 0, sizeof(InfoMsg));
2191   dsmInfo.type = this->GetDsmType();
2192   dsmInfo.length = this->GetLength();
2193   dsmInfo.total_length = this->GetTotalLength();
2194   dsmInfo.block_length = this->GetBlockLength();
2195   dsmInfo.start_server_id = this->GetStartServerId();
2196   dsmInfo.end_server_id = this->GetEndServerId();
2197 
2198   int infoStatus = 3;
2199   if (this->Comm->GetId() == 0) {
2200     infoStatus = 2;
2201   }
2202 
2203   int * groupInfoStatus = new int[this->Comm->GetInterSize()]();
2204 
2205   this->Comm->AllGather(&infoStatus,
2206                         sizeof(int),
2207                         &(groupInfoStatus[0]),
2208                         sizeof(int),
2209                         XDMF_DSM_INTER_COMM);
2210 
2211   int sendCore = 0;
2212 
2213   for (int i = 0; i < this->Comm->GetInterSize(); ++i) {
2214     if (groupInfoStatus[i] == 2) {
2215       sendCore = i;
2216     }
2217   }
2218 
2219   status = MPI_SUCCESS;
2220 
2221   this->Comm->Broadcast(&dsmInfo,
2222                         sizeof(InfoMsg),
2223                         sendCore,
2224                         XDMF_DSM_INTER_COMM);
2225   if (status != MPI_SUCCESS) {
2226     try {
2227       XdmfError::message(XdmfError::FATAL, "Error: Failed to send info");
2228     }
2229     catch (XdmfError & e) {
2230       throw e;
2231     }
2232   }
2233 
2234   MPI_Comm comm = this->Comm->GetInterComm();
2235 
2236   // Cray needs to be launched via the colon notation so that it
2237   // can properly create a merged communicator
2238 
2239   int rank = this->Comm->GetInterId();
2240   int size = this->Comm->GetInterSize();
2241 
2242   int currentCore = 0;
2243   int * checkstatus = new int[size]();
2244   int localCheck = 0;
2245   std::string applicationName = this->Comm->GetApplicationName();
2246 
2247   char * coreTag;
2248   int tagSize = 0;
2249 
2250   std::vector<int> coreSplit;
2251   unsigned int splitid = 0;
2252 
2253   std::vector<std::pair<std::string, unsigned int> > newStructure;
2254 
2255   int * splitIds;
2256   unsigned int splitsize = 0;
2257 
2258   while (currentCore < size)
2259   {
2260     if (rank == currentCore)
2261     {
2262       tagSize = applicationName.size();
2263     }
2264     MPI_Bcast(&tagSize, 1, MPI_INT, currentCore, comm);
2265     coreTag = new char[tagSize+1]();
2266 
2267     if (rank == currentCore)
2268     {
2269       strcpy(coreTag, applicationName.c_str());
2270     }
2271     MPI_Bcast(coreTag, tagSize, MPI_CHAR, currentCore, comm);
2272     coreTag[tagSize] = 0;
2273 
2274     if (strcmp(coreTag, applicationName.c_str()) == 0)
2275     {
2276       localCheck = 1;
2277     }
2278     else
2279     {
2280       localCheck = 0;
2281     }
2282 
2283     checkstatus[rank] = localCheck;
2284 
2285     MPI_Allgather(&localCheck, 1, MPI_INT,
2286                   checkstatus, 1, MPI_INT,
2287                   comm);
2288 
2289     bool insplit = false;
2290     while (checkstatus[currentCore])
2291     {
2292       if (rank == currentCore)
2293       {
2294         insplit = true;
2295       }
2296       coreSplit.push_back(currentCore);
2297       ++currentCore;
2298       if (currentCore >= size)
2299       {
2300         break;
2301       }
2302     }
2303     if (insplit)
2304     {
2305       splitIds = (int *)calloc(coreSplit.size(), sizeof(int));
2306       memcpy(splitIds, &(coreSplit[0]), coreSplit.size() * sizeof(int));
2307       splitsize = coreSplit.size();
2308     }
2309     newStructure.push_back(std::pair<std::string, unsigned int>(std::string(coreTag), coreSplit.size()));
2310     coreSplit.clear();
2311     ++splitid;
2312   }
2313   this->Comm->SetDsmProcessStructure(newStructure);
2314 }
2315 
2316 void
SetBlockLength(long newBlock)2317 XdmfDSMBuffer::SetBlockLength(long newBlock)
2318 {
2319   this->BlockLength = newBlock;
2320 }
2321 
2322 void
SetComm(XdmfDSMCommMPI * newComm)2323 XdmfDSMBuffer::SetComm(XdmfDSMCommMPI * newComm)
2324 {
2325   this->Comm = newComm;
2326 }
2327 
2328 void
SetDsmType(int newDsmType)2329 XdmfDSMBuffer::SetDsmType(int newDsmType)
2330 {
2331   this->DsmType = newDsmType;
2332 }
2333 
2334 void
SetInterCommType(int newType)2335 XdmfDSMBuffer::SetInterCommType(int newType)
2336 {
2337   this->InterCommType = newType;
2338 }
2339 
2340 void
SetIsConnected(bool newStatus)2341 XdmfDSMBuffer::SetIsConnected(bool newStatus)
2342 {
2343   IsConnected =  newStatus;
2344 }
2345 
2346 void
SetIsServer(bool newIsServer)2347 XdmfDSMBuffer::SetIsServer(bool newIsServer)
2348 {
2349   this->IsServer = newIsServer;
2350 }
2351 
2352 void
SetResizeFactor(double newFactor)2353 XdmfDSMBuffer::SetResizeFactor(double newFactor)
2354 {
2355   if (newFactor >= 0)
2356   {
2357     this->ResizeFactor = newFactor;
2358   }
2359   else
2360   {
2361     this->ResizeFactor = newFactor * -1;
2362   }
2363 }
2364 
2365 void
SetLength(long aLength)2366 XdmfDSMBuffer::SetLength(long aLength)
2367 {
2368   this->Length = aLength;
2369   if (this->DataPointer) {
2370     // Try to reallocate
2371     // This should not be called in most cases
2372     this->DataPointer =
2373       static_cast<char *>(realloc(this->DataPointer, this->Length*sizeof(char)));
2374   }
2375   else {
2376 #ifdef _WIN32
2377     this->DataPointer = calloc(this->Length, sizeof(char));
2378 #else
2379   #ifdef _SC_PAGESIZE
2380     posix_memalign((void **)(&this->DataPointer), sysconf(_SC_PAGESIZE), this->Length);
2381     memset(this->DataPointer, 0, this->Length);
2382   #else
2383     // Older linux variation, for backwards compatibility
2384     posix_memalign((void **)(&this->DataPointer), getpagesize(), this->Length);
2385     memset(this->DataPointer, 0, this->Length);
2386   #endif
2387 #endif
2388   }
2389 
2390   if (this->DataPointer == NULL) {
2391     std::stringstream message;
2392     message << "Allocation Failed, unable to allocate " << this->Length;
2393     XdmfError::message(XdmfError::FATAL, message.str());
2394   }
2395 
2396   if (this->BlockLength > 0)
2397   {
2398     this->NumPages = this->Length / this->BlockLength;
2399   }
2400 }
2401 
2402 void
SetLocalBufferSizeMBytes(unsigned int newSize)2403 XdmfDSMBuffer::SetLocalBufferSizeMBytes(unsigned int newSize)
2404 {
2405   this->LocalBufferSizeMBytes = newSize;
2406 }
2407 
2408 void
Unlock(char * filename)2409 XdmfDSMBuffer::Unlock(char * filename)
2410 {
2411   int strlength = std::string(filename).size();
2412   this->SendCommandHeader(XDMF_DSM_UNLOCK_FILE,
2413                           this->GetStartServerId(),
2414                           0,
2415                           0,
2416                           XDMF_DSM_INTER_COMM);
2417 
2418   this->SendAcknowledgment(this->GetStartServerId(),
2419                            strlength,
2420                            XDMF_DSM_EXCHANGE_TAG,
2421                            XDMF_DSM_INTER_COMM);
2422 
2423   this->SendData(this->GetStartServerId(),
2424                  filename,
2425                  strlength,
2426                  XDMF_DSM_EXCHANGE_TAG,
2427                  0,
2428                  XDMF_DSM_INTER_COMM);
2429 }
2430 
2431 void
UpdateLength(unsigned int newLength)2432 XdmfDSMBuffer::UpdateLength(unsigned int newLength)
2433 {
2434   this->Length = newLength;
2435   this->TotalLength = this->Length * (this->EndServerId - this->StartServerId);
2436 }
2437 
2438 void
WaitRelease(std::string filename,std::string datasetname,int code)2439 XdmfDSMBuffer::WaitRelease(std::string filename, std::string datasetname, int code)
2440 {
2441   // Send Command Header
2442   this->SendCommandHeader(XDMF_DSM_CLEAR_NOTIFY,
2443                           this->GetStartServerId(),
2444                           0,
2445                           0,
2446                           XDMF_DSM_INTER_COMM);
2447   // Send string size
2448   this->SendAcknowledgment(this->GetStartServerId(),
2449                            filename.size() + datasetname.size(),
2450                            XDMF_DSM_EXCHANGE_TAG,
2451                            XDMF_DSM_INTER_COMM);
2452   // Send string
2453   char * sendPointer = new char[filename.size() + datasetname.size()]();
2454   unsigned int placementIndex = 0;
2455   for (unsigned int i = 0; i < filename.size(); ++i)
2456   {
2457     sendPointer[placementIndex++] = filename[i];
2458   }
2459   for (unsigned int i = 0; i < datasetname.size(); ++i)
2460   {
2461     sendPointer[placementIndex++] = datasetname[i];
2462   }
2463   this->SendData(this->GetStartServerId(),
2464                  sendPointer,
2465                  filename.size() + datasetname.size(),
2466                  XDMF_DSM_EXCHANGE_TAG,
2467                  0,
2468                  XDMF_DSM_INTER_COMM);
2469   delete sendPointer;
2470   // Send Release Code
2471   this->SendAcknowledgment(this->GetStartServerId(),
2472                            code,
2473                            XDMF_DSM_EXCHANGE_TAG,
2474                            XDMF_DSM_INTER_COMM);
2475 }
2476 
2477 int
WaitOn(std::string filename,std::string datasetname)2478 XdmfDSMBuffer::WaitOn(std::string filename, std::string datasetname)
2479 {
2480   // Send Command Header
2481   this->SendCommandHeader(XDMF_DSM_SET_NOTIFY,
2482                           this->GetStartServerId(),
2483                           0,
2484                           0,
2485                           XDMF_DSM_INTER_COMM);
2486   // Send string size
2487   this->SendAcknowledgment(this->GetStartServerId(),
2488                            filename.size() + datasetname.size(),
2489                            XDMF_DSM_EXCHANGE_TAG,
2490                            XDMF_DSM_INTER_COMM);
2491   // Send string
2492   char * sendPointer = new char[filename.size() + datasetname.size()]();
2493   unsigned int placementIndex = 0;
2494   for (unsigned int i = 0; i < filename.size(); ++i)
2495   {
2496     sendPointer[placementIndex++] = filename[i];
2497   }
2498   for (unsigned int i = 0; i < datasetname.size(); ++i)
2499   {
2500     sendPointer[placementIndex++] = datasetname[i];
2501   }
2502   this->SendData(this->GetStartServerId(),
2503                  sendPointer,
2504                  filename.size() + datasetname.size(),
2505                  XDMF_DSM_EXCHANGE_TAG,
2506                  0,
2507                  XDMF_DSM_INTER_COMM);
2508 
2509   // Wait for Release
2510   int code = 0;
2511   this->ReceiveAcknowledgment(MPI_ANY_SOURCE,
2512                               code,
2513                               XDMF_DSM_EXCHANGE_TAG,
2514                               this->CommChannel);
2515   delete sendPointer;
2516   // Return Code from Notification
2517   return code;
2518 }
2519 
2520 // C Wrappers
2521 
XdmfDSMBufferNew()2522 XDMFDSMBUFFER * XdmfDSMBufferNew()
2523 {
2524   try
2525   {
2526     return (XDMFDSMBUFFER *)((void *)(new XdmfDSMBuffer()));
2527   }
2528   catch (...)
2529   {
2530     return (XDMFDSMBUFFER *)((void *)(new XdmfDSMBuffer()));
2531   }
2532 }
2533 
XdmfDSMBufferFree(XDMFDSMBUFFER * item)2534 void XdmfDSMBufferFree(XDMFDSMBUFFER * item)
2535 {
2536   if (item != NULL) {
2537     delete ((XdmfDSMBuffer *)item);
2538   }
2539   item = NULL;
2540 }
2541 
XdmfDSMBufferBroadcastComm(XDMFDSMBUFFER * buffer,int * comm,int root,int * status)2542 void XdmfDSMBufferBroadcastComm(XDMFDSMBUFFER * buffer, int *comm, int root, int * status)
2543 {
2544   XDMF_ERROR_WRAP_START(status)
2545   ((XdmfDSMBuffer *)buffer)->BroadcastComm(comm, root);
2546   XDMF_ERROR_WRAP_END(status)
2547 }
2548 
XdmfDSMBufferBufferService(XDMFDSMBUFFER * buffer,int * returnOpcode,int * status)2549 int XdmfDSMBufferBufferService(XDMFDSMBUFFER * buffer, int *returnOpcode, int * status)
2550 {
2551   XDMF_ERROR_WRAP_START(status)
2552   return ((XdmfDSMBuffer *)buffer)->BufferService(returnOpcode);
2553   XDMF_ERROR_WRAP_END(status)
2554   return -1;
2555 }
2556 
XdmfDSMBufferBufferServiceLoop(XDMFDSMBUFFER * buffer,int * returnOpcode,int * status)2557 void XdmfDSMBufferBufferServiceLoop(XDMFDSMBUFFER * buffer, int *returnOpcode, int * status)
2558 {
2559   XDMF_ERROR_WRAP_START(status)
2560   ((XdmfDSMBuffer *)buffer)->BufferServiceLoop(returnOpcode);
2561   XDMF_ERROR_WRAP_END(status)
2562 }
2563 
XdmfDSMBufferConfigureUniform(XDMFDSMBUFFER * buffer,XDMFDSMCOMMMPI * Comm,long Length,int StartId,int EndId,long aBlockLength,int random,int * status)2564 void XdmfDSMBufferConfigureUniform(XDMFDSMBUFFER * buffer,
2565                                    XDMFDSMCOMMMPI * Comm,
2566                                    long Length,
2567                                    int StartId,
2568                                    int EndId,
2569                                    long aBlockLength,
2570                                    int random,
2571                                    int * status)
2572 {
2573   XDMF_ERROR_WRAP_START(status)
2574   ((XdmfDSMBuffer *)buffer)->ConfigureUniform((XdmfDSMCommMPI *)Comm, Length, StartId, EndId, aBlockLength, random);
2575   XDMF_ERROR_WRAP_END(status)
2576 }
2577 
XdmfDSMBufferConnect(XDMFDSMBUFFER * buffer,int persist,int * status)2578 void XdmfDSMBufferConnect(XDMFDSMBUFFER * buffer, int persist, int * status)
2579 {
2580   XDMF_ERROR_WRAP_START(status)
2581   ((XdmfDSMBuffer *)buffer)->Connect(persist);
2582   XDMF_ERROR_WRAP_END(status)
2583 }
2584 
XdmfDSMBufferCreate(XDMFDSMBUFFER * buffer,MPI_Comm comm,int startId,int endId,int * status)2585 void XdmfDSMBufferCreate(XDMFDSMBUFFER * buffer, MPI_Comm comm, int startId, int endId, int * status)
2586 {
2587   XDMF_ERROR_WRAP_START(status)
2588   ((XdmfDSMBuffer *)buffer)->Create(comm, startId, endId);
2589   XDMF_ERROR_WRAP_END(status)
2590 }
2591 
XdmfDSMBufferDisconnect(XDMFDSMBUFFER * buffer,int * status)2592 void XdmfDSMBufferDisconnect(XDMFDSMBUFFER * buffer, int * status)
2593 {
2594   XDMF_ERROR_WRAP_START(status)
2595   ((XdmfDSMBuffer *)buffer)->Disconnect();
2596   XDMF_ERROR_WRAP_END(status)
2597 }
2598 
XdmfDSMBufferGet(XDMFDSMBUFFER * buffer,long Address,long aLength,void * Data,int * status)2599 void XdmfDSMBufferGet(XDMFDSMBUFFER * buffer, long Address, long aLength, void * Data, int * status)
2600 {
2601   XDMF_ERROR_WRAP_START(status)
2602   ((XdmfDSMBuffer *)buffer)->Get(Address, aLength, Data);
2603   XDMF_ERROR_WRAP_END(status)
2604 }
2605 
XdmfDSMBufferGetAddressRangeForId(XDMFDSMBUFFER * buffer,int Id,int * Start,int * End,int * status)2606 void XdmfDSMBufferGetAddressRangeForId(XDMFDSMBUFFER * buffer, int Id, int * Start, int * End, int * status)
2607 {
2608   XDMF_ERROR_WRAP_START(status)
2609   ((XdmfDSMBuffer *)buffer)->GetAddressRangeForId(Id, Start, End);
2610   XDMF_ERROR_WRAP_END(status)
2611 }
2612 
XdmfDSMBufferGetBlockLength(XDMFDSMBUFFER * buffer)2613 long XdmfDSMBufferGetBlockLength(XDMFDSMBUFFER * buffer)
2614 {
2615   try
2616   {
2617     return ((XdmfDSMBuffer *)buffer)->GetBlockLength();
2618   }
2619   catch (...)
2620   {
2621     return ((XdmfDSMBuffer *)buffer)->GetBlockLength();
2622   }
2623 }
2624 
XdmfDSMBufferGetComm(XDMFDSMBUFFER * buffer)2625 XDMFDSMCOMMMPI * XdmfDSMBufferGetComm(XDMFDSMBUFFER * buffer)
2626 {
2627   try
2628   {
2629     return (XDMFDSMCOMMMPI *)((void *)(((XdmfDSMBuffer *)buffer)->GetComm()));
2630   }
2631   catch (...)
2632   {
2633     return (XDMFDSMCOMMMPI *)((void *)(((XdmfDSMBuffer *)buffer)->GetComm()));
2634   }
2635 }
2636 
XdmfDSMBufferGetDataPointer(XDMFDSMBUFFER * buffer)2637 char * XdmfDSMBufferGetDataPointer(XDMFDSMBUFFER * buffer)
2638 {
2639   try
2640   {
2641     return ((XdmfDSMBuffer *)buffer)->GetDataPointer();
2642   }
2643   catch (...)
2644   {
2645     return ((XdmfDSMBuffer *)buffer)->GetDataPointer();
2646   }
2647 }
2648 
XdmfDSMBufferGetDsmType(XDMFDSMBUFFER * buffer)2649 int XdmfDSMBufferGetDsmType(XDMFDSMBUFFER * buffer)
2650 {
2651   try
2652   {
2653     return ((XdmfDSMBuffer *)buffer)->GetDsmType();
2654   }
2655   catch (...)
2656   {
2657     return ((XdmfDSMBuffer *)buffer)->GetDsmType();
2658   }
2659 }
2660 
XdmfDSMBufferGetEndAddress(XDMFDSMBUFFER * buffer)2661 int XdmfDSMBufferGetEndAddress(XDMFDSMBUFFER * buffer)
2662 {
2663   try
2664   {
2665     return ((XdmfDSMBuffer *)buffer)->GetEndAddress();
2666   }
2667   catch (...)
2668   {
2669     return ((XdmfDSMBuffer *)buffer)->GetEndAddress();
2670   }
2671 }
2672 
XdmfDSMBufferGetEndServerId(XDMFDSMBUFFER * buffer)2673 int XdmfDSMBufferGetEndServerId(XDMFDSMBUFFER * buffer)
2674 {
2675   try
2676   {
2677     return ((XdmfDSMBuffer *)buffer)->GetEndServerId();
2678   }
2679   catch (...)
2680   {
2681     return ((XdmfDSMBuffer *)buffer)->GetEndServerId();
2682   }
2683 }
2684 
XdmfDSMBufferGetInterCommType(XDMFDSMBUFFER * buffer)2685 int XdmfDSMBufferGetInterCommType(XDMFDSMBUFFER * buffer)
2686 {
2687   try
2688   {
2689     return ((XdmfDSMBuffer *)buffer)->GetInterCommType();
2690   }
2691   catch (...)
2692   {
2693     return ((XdmfDSMBuffer *)buffer)->GetInterCommType();
2694   }
2695 }
2696 
XdmfDSMBufferGetIsConnected(XDMFDSMBUFFER * buffer)2697 int XdmfDSMBufferGetIsConnected(XDMFDSMBUFFER * buffer)
2698 {
2699   try
2700   {
2701     return ((XdmfDSMBuffer *)buffer)->GetIsConnected();
2702   }
2703   catch (...)
2704   {
2705     return ((XdmfDSMBuffer *)buffer)->GetIsConnected();
2706   }
2707 }
2708 
XdmfDSMBufferGetIsServer(XDMFDSMBUFFER * buffer)2709 int XdmfDSMBufferGetIsServer(XDMFDSMBUFFER * buffer)
2710 {
2711   try
2712   {
2713     return ((XdmfDSMBuffer *)buffer)->GetIsServer();
2714   }
2715   catch (...)
2716   {
2717     return ((XdmfDSMBuffer *)buffer)->GetIsServer();
2718   }
2719 }
2720 
XdmfDSMBufferGetLength(XDMFDSMBUFFER * buffer)2721 long XdmfDSMBufferGetLength(XDMFDSMBUFFER * buffer)
2722 {
2723   try
2724   {
2725     return ((XdmfDSMBuffer *)buffer)->GetLength();
2726   }
2727   catch (...)
2728   {
2729     return ((XdmfDSMBuffer *)buffer)->GetLength();
2730   }
2731 }
2732 
XdmfDSMBufferGetLocalBufferSizeMBytes(XDMFDSMBUFFER * buffer)2733 unsigned int XdmfDSMBufferGetLocalBufferSizeMBytes(XDMFDSMBUFFER * buffer)
2734 {
2735   try
2736   {
2737     return ((XdmfDSMBuffer *)buffer)->GetLocalBufferSizeMBytes();
2738   }
2739   catch (...)
2740   {
2741     return ((XdmfDSMBuffer *)buffer)->GetLocalBufferSizeMBytes();
2742   }
2743 }
2744 
XdmfDSMBufferGetResizeFactor(XDMFDSMBUFFER * buffer)2745 double XdmfDSMBufferGetResizeFactor(XDMFDSMBUFFER * buffer)
2746 {
2747   try
2748   {
2749     return ((XdmfDSMBuffer *)buffer)->GetResizeFactor();
2750   }
2751   catch (...)
2752   {
2753     return ((XdmfDSMBuffer *)buffer)->GetResizeFactor();
2754   }
2755 }
2756 
XdmfDSMBufferGetStartAddress(XDMFDSMBUFFER * buffer)2757 int XdmfDSMBufferGetStartAddress(XDMFDSMBUFFER * buffer)
2758 {
2759   try
2760   {
2761     return ((XdmfDSMBuffer *)buffer)->GetStartAddress();
2762   }
2763   catch (...)
2764   {
2765     return ((XdmfDSMBuffer *)buffer)->GetStartAddress();
2766   }
2767 }
2768 
XdmfDSMBufferGetStartServerId(XDMFDSMBUFFER * buffer)2769 int XdmfDSMBufferGetStartServerId(XDMFDSMBUFFER * buffer)
2770 {
2771   try
2772   {
2773     return ((XdmfDSMBuffer *)buffer)->GetStartServerId();
2774   }
2775   catch (...)
2776   {
2777     return ((XdmfDSMBuffer *)buffer)->GetStartServerId();
2778   }
2779 }
2780 
XdmfDSMBufferGetTotalLength(XDMFDSMBUFFER * buffer)2781 long XdmfDSMBufferGetTotalLength(XDMFDSMBUFFER * buffer)
2782 {
2783   try
2784   {
2785     return ((XdmfDSMBuffer *)buffer)->GetTotalLength();
2786   }
2787   catch (...)
2788   {
2789     return ((XdmfDSMBuffer *)buffer)->GetTotalLength();
2790   }
2791 }
2792 
XdmfDSMBufferProbeCommandHeader(XDMFDSMBUFFER * buffer,int * comm,int * status)2793 void XdmfDSMBufferProbeCommandHeader(XDMFDSMBUFFER * buffer, int * comm, int * status)
2794 {
2795   XDMF_ERROR_WRAP_START(status)
2796   ((XdmfDSMBuffer *)buffer)->ProbeCommandHeader(comm);
2797   XDMF_ERROR_WRAP_END(status)
2798 }
2799 
XdmfDSMBufferPut(XDMFDSMBUFFER * buffer,long Address,long aLength,void * Data,int * status)2800 void XdmfDSMBufferPut(XDMFDSMBUFFER * buffer, long Address, long aLength, void * Data, int * status)
2801 {
2802   XDMF_ERROR_WRAP_START(status)
2803   ((XdmfDSMBuffer *)buffer)->Put(Address, aLength, Data);
2804   XDMF_ERROR_WRAP_END(status)
2805 }
2806 
XdmfDSMBufferReceiveAcknowledgment(XDMFDSMBUFFER * buffer,int source,int * data,int tag,int comm,int * status)2807 void XdmfDSMBufferReceiveAcknowledgment(XDMFDSMBUFFER * buffer,
2808                                         int source,
2809                                         int * data,
2810                                         int tag,
2811                                         int comm,
2812                                         int * status)
2813 {
2814   XDMF_ERROR_WRAP_START(status)
2815   ((XdmfDSMBuffer *)buffer)->ReceiveAcknowledgment(source, *data, tag, comm);
2816   XDMF_ERROR_WRAP_END(status)
2817 }
2818 
XdmfDSMBufferReceiveCommandHeader(XDMFDSMBUFFER * buffer,int * opcode,int * source,int * address,int * aLength,int comm,int remoteSource,int * status)2819 void XdmfDSMBufferReceiveCommandHeader(XDMFDSMBUFFER * buffer,
2820                                        int * opcode,
2821                                        int * source,
2822                                        int * address,
2823                                        int * aLength,
2824                                        int comm,
2825                                        int remoteSource,
2826                                        int * status)
2827 {
2828   XDMF_ERROR_WRAP_START(status)
2829   ((XdmfDSMBuffer *)buffer)->ReceiveCommandHeader(opcode, source, address, aLength, comm, remoteSource);
2830   XDMF_ERROR_WRAP_END(status)
2831 }
2832 
XdmfDSMBufferReceiveData(XDMFDSMBUFFER * buffer,int source,char * data,int aLength,int tag,int aAddress,int comm,int * status)2833 void XdmfDSMBufferReceiveData(XDMFDSMBUFFER * buffer,
2834                               int source,
2835                               char * data,
2836                               int aLength,
2837                               int tag,
2838                               int aAddress,
2839                               int comm,
2840                               int * status)
2841 {
2842   XDMF_ERROR_WRAP_START(status)
2843   ((XdmfDSMBuffer *)buffer)->ReceiveData(source, data, aLength, tag, aAddress, comm);
2844   XDMF_ERROR_WRAP_END(status)
2845 }
2846 
XdmfDSMBufferReceiveInfo(XDMFDSMBUFFER * buffer,int * status)2847 void XdmfDSMBufferReceiveInfo(XDMFDSMBUFFER * buffer, int * status)
2848 {
2849   XDMF_ERROR_WRAP_START(status)
2850   ((XdmfDSMBuffer *)buffer)->ReceiveInfo();
2851   XDMF_ERROR_WRAP_END(status)
2852 }
2853 
XdmfDSMBufferSendAccept(XDMFDSMBUFFER * buffer,unsigned int numConnects)2854 void XdmfDSMBufferSendAccept(XDMFDSMBUFFER * buffer, unsigned int numConnects)
2855 {
2856   ((XdmfDSMBuffer *)buffer)->SendAccept(numConnects);
2857 }
2858 
XdmfDSMBufferSendAcknowledgment(XDMFDSMBUFFER * buffer,int dest,int data,int tag,int comm,int * status)2859 void XdmfDSMBufferSendAcknowledgment(XDMFDSMBUFFER * buffer,
2860                                      int dest,
2861                                      int data,
2862                                      int tag,
2863                                      int comm,
2864                                      int * status)
2865 {
2866   XDMF_ERROR_WRAP_START(status)
2867   ((XdmfDSMBuffer *)buffer)->SendAcknowledgment(dest, data, tag, comm);
2868   XDMF_ERROR_WRAP_END(status)
2869 }
2870 
XdmfDSMBufferSendCommandHeader(XDMFDSMBUFFER * buffer,int opcode,int dest,int address,int aLength,int comm,int * status)2871 void XdmfDSMBufferSendCommandHeader(XDMFDSMBUFFER * buffer,
2872                                     int opcode,
2873                                     int dest,
2874                                     int address,
2875                                     int aLength,
2876                                     int comm,
2877                                     int * status)
2878 {
2879   XDMF_ERROR_WRAP_START(status)
2880   ((XdmfDSMBuffer *)buffer)->SendCommandHeader(opcode, dest, address, aLength, comm);
2881   XDMF_ERROR_WRAP_END(status)
2882 }
2883 
XdmfDSMBufferSendData(XDMFDSMBUFFER * buffer,int dest,char * data,int aLength,int tag,int aAddress,int comm,int * status)2884 void XdmfDSMBufferSendData(XDMFDSMBUFFER * buffer,
2885                            int dest,
2886                            char * data,
2887                            int aLength,
2888                            int tag,
2889                            int aAddress,
2890                            int comm,
2891                            int * status)
2892 {
2893   XDMF_ERROR_WRAP_START(status)
2894   ((XdmfDSMBuffer *)buffer)->SendData(dest, data, aLength, tag, aAddress, comm);
2895   XDMF_ERROR_WRAP_END(status)
2896 }
2897 
XdmfDSMBufferSendDone(XDMFDSMBUFFER * buffer,int * status)2898 void XdmfDSMBufferSendDone(XDMFDSMBUFFER * buffer, int * status)
2899 {
2900   XDMF_ERROR_WRAP_START(status)
2901   ((XdmfDSMBuffer *)buffer)->SendDone();
2902   XDMF_ERROR_WRAP_END(status)
2903 }
2904 
XdmfDSMBufferSendInfo(XDMFDSMBUFFER * buffer,int * status)2905 void XdmfDSMBufferSendInfo(XDMFDSMBUFFER * buffer, int * status)
2906 {
2907   XDMF_ERROR_WRAP_START(status)
2908   ((XdmfDSMBuffer *)buffer)->SendInfo();
2909   XDMF_ERROR_WRAP_END(status)
2910 }
2911 
XdmfDSMBufferSetBlockLength(XDMFDSMBUFFER * buffer,long newBlock)2912 void XdmfDSMBufferSetBlockLength(XDMFDSMBUFFER * buffer, long newBlock)
2913 {
2914   try
2915   {
2916     ((XdmfDSMBuffer *)buffer)->SetBlockLength(newBlock);
2917   }
2918   catch (...)
2919   {
2920     ((XdmfDSMBuffer *)buffer)->SetBlockLength(newBlock);
2921   }
2922 }
2923 
XdmfDSMBufferSetComm(XDMFDSMBUFFER * buffer,XDMFDSMCOMMMPI * newComm)2924 void XdmfDSMBufferSetComm(XDMFDSMBUFFER * buffer, XDMFDSMCOMMMPI * newComm)
2925 {
2926   try
2927   {
2928     ((XdmfDSMBuffer *)buffer)->SetComm((XdmfDSMCommMPI *)newComm);
2929   }
2930   catch (...)
2931   {
2932     ((XdmfDSMBuffer *)buffer)->SetComm((XdmfDSMCommMPI *)newComm);
2933   }
2934 }
2935 
XdmfDSMBufferSetDsmType(XDMFDSMBUFFER * buffer,int newDsmType)2936 void XdmfDSMBufferSetDsmType(XDMFDSMBUFFER * buffer, int newDsmType)
2937 {
2938   try
2939   {
2940     ((XdmfDSMBuffer *)buffer)->SetDsmType(newDsmType);
2941   }
2942   catch (...)
2943   {
2944     ((XdmfDSMBuffer *)buffer)->SetDsmType(newDsmType);
2945   }
2946 }
2947 
XdmfDSMBufferSetInterCommType(XDMFDSMBUFFER * buffer,int newType)2948 void XdmfDSMBufferSetInterCommType(XDMFDSMBUFFER * buffer, int newType)
2949 {
2950   try
2951   {
2952     ((XdmfDSMBuffer *)buffer)->SetInterCommType(newType);
2953   }
2954   catch (...)
2955   {
2956     ((XdmfDSMBuffer *)buffer)->SetInterCommType(newType);
2957   }
2958 }
2959 
XdmfDSMBufferSetIsConnected(XDMFDSMBUFFER * buffer,int newStatus)2960 void XdmfDSMBufferSetIsConnected(XDMFDSMBUFFER * buffer, int newStatus)
2961 {
2962   try
2963   {
2964     ((XdmfDSMBuffer *)buffer)->SetIsConnected(newStatus);
2965   }
2966   catch (...)
2967   {
2968     ((XdmfDSMBuffer *)buffer)->SetIsConnected(newStatus);
2969   }
2970 }
2971 
XdmfDSMBufferSetIsServer(XDMFDSMBUFFER * buffer,int newIsServer)2972 void XdmfDSMBufferSetIsServer(XDMFDSMBUFFER * buffer, int newIsServer)
2973 {
2974   try
2975   {
2976     ((XdmfDSMBuffer *)buffer)->SetIsServer(newIsServer);
2977   }
2978   catch (...)
2979   {
2980     ((XdmfDSMBuffer *)buffer)->SetIsServer(newIsServer);
2981   }
2982 }
2983 
XdmfDSMBufferSetLocalBufferSizeMBytes(XDMFDSMBUFFER * buffer,unsigned int newSize)2984 void XdmfDSMBufferSetLocalBufferSizeMBytes(XDMFDSMBUFFER * buffer, unsigned int newSize)
2985 {
2986   try
2987   {
2988     ((XdmfDSMBuffer *)buffer)->SetLocalBufferSizeMBytes(newSize);
2989   }
2990   catch (...)
2991   {
2992     ((XdmfDSMBuffer *)buffer)->SetLocalBufferSizeMBytes(newSize);
2993   }
2994 }
2995 
XdmfDSMBufferSetResizeFactor(XDMFDSMBUFFER * buffer,double newFactor)2996 void XdmfDSMBufferSetResizeFactor(XDMFDSMBUFFER * buffer, double newFactor)
2997 {
2998   try
2999   {
3000     ((XdmfDSMBuffer *)buffer)->SetResizeFactor(newFactor);
3001   }
3002   catch (...)
3003   {
3004     ((XdmfDSMBuffer *)buffer)->SetResizeFactor(newFactor);
3005   }
3006 }
3007 
XdmfDSMBufferWaitRelease(XDMFDSMBUFFER * buffer,char * filename,char * datasetname,int code)3008 void XdmfDSMBufferWaitRelease(XDMFDSMBUFFER * buffer, char * filename, char * datasetname, int code)
3009 {
3010   try
3011   {
3012     ((XdmfDSMBuffer *)buffer)->WaitRelease(std::string(filename), std::string(datasetname), code);
3013   }
3014   catch (...)
3015   {
3016     ((XdmfDSMBuffer *)buffer)->WaitRelease(std::string(filename), std::string(datasetname), code);
3017   }
3018 }
3019 
XdmfDSMBufferWaitOn(XDMFDSMBUFFER * buffer,char * filename,char * datasetname)3020 int XdmfDSMBufferWaitOn(XDMFDSMBUFFER * buffer, char * filename, char * datasetname)
3021 {
3022   try
3023   {
3024     return ((XdmfDSMBuffer *)buffer)->WaitOn(std::string(filename), std::string(datasetname));
3025   }
3026   catch (...)
3027   {
3028     return ((XdmfDSMBuffer *)buffer)->WaitOn(std::string(filename), std::string(datasetname));
3029   }
3030 }
3031