1 /*****************************************************************************/
2 /* XDMF */
3 /* eXtensible Data Model and Format */
4 /* */
5 /* Id : XdmfDSMBuffer.hpp */
6 /* */
7 /* Author: */
8 /* Andrew Burns */
9 /* andrew.j.burns2@us.army.mil */
10 /* US Army Research Laboratory */
11 /* Aberdeen Proving Ground, MD */
12 /* */
13 /* Copyright @ 2013 US Army Research Laboratory */
14 /* All Rights Reserved */
15 /* See Copyright.txt for details */
16 /* */
17 /* This software is distributed WITHOUT ANY WARRANTY; without */
18 /* even the implied warranty of MERCHANTABILITY or FITNESS */
19 /* FOR A PARTICULAR PURPOSE. See the above copyright notice */
20 /* for more information. */
21 /* */
22 /*****************************************************************************/
23
24 /*=========================================================================
25 This code is derived from an earlier work and is distributed
26 with permission from, and thanks to ...
27 =========================================================================*/
28
29 /*============================================================================
30
31 Project : H5FDdsm
32 Module : H5FDdsmBufferService.cxx H5FDdsmBuffer.cxx
33
34 Authors:
35 John Biddiscombe Jerome Soumagne
36 biddisco@cscs.ch soumagne@cscs.ch
37
38 Copyright (C) CSCS - Swiss National Supercomputing Centre.
39 You may use modify and and distribute this code freely providing
40 1) This copyright notice appears on all copies of source code
41 2) An acknowledgment appears with any substantial usage of the code
42 3) If this code is contributed to any other open source project, it
43 must not be reformatted such that the indentation, bracketing or
44 overall style is modified significantly.
45
46 This software is distributed WITHOUT ANY WARRANTY; without even the
47 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
48
49 This work has received funding from the European Community's Seventh
50 Framework Programme (FP7/2007-2013) under grant agreement 225967 "NextMuSE"
51
52 ============================================================================*/
53
54 #include <XdmfDSMBuffer.hpp>
55 #include <XdmfDSMCommMPI.hpp>
56 #include <XdmfError.hpp>
57 #include <mpi.h>
58 #include <string.h>
59 #include <stdlib.h>
60 #include <algorithm>
61
62 #ifndef _WIN32
63 #include <unistd.h>
64 #endif
65
XdmfDSMBuffer()66 XdmfDSMBuffer::XdmfDSMBuffer()
67 {
68 this->CommChannel = XDMF_DSM_INTER_COMM;
69 this->DsmType = XDMF_DSM_TYPE_UNIFORM;
70 this->IsServer = true;
71 this->StartAddress = this->EndAddress = 0;
72 this->StartServerId = this->EndServerId = -1;
73 this->LocalBufferSizeMBytes = 128;
74 this->Length = 0;
75 this->TotalLength = 0;
76 this->BlockLength = XDMF_DSM_DEFAULT_BLOCK_LENGTH;
77 this->NumPages = 0;
78 this->PagesAssigned = 0;
79 this->Comm = NULL;
80 this->DataPointer = NULL;
81 this->InterCommType = XDMF_DSM_COMM_MPI;
82 this->IsConnected = false;
83 this->ResizeFactor = 1;
84 }
85
~XdmfDSMBuffer()86 XdmfDSMBuffer::~XdmfDSMBuffer()
87 {
88 if (this->DataPointer) {
89 free(this->DataPointer);
90 }
91 this->DataPointer = NULL;
92 }
93
94 class XdmfDSMBuffer::CommandMsg
95 {
96 public:
97 int Opcode;
98 int Source;
99 int Target;
100 int Address;
101 int Length;
102 };
103
104 class XdmfDSMBuffer::InfoMsg
105 {
106 public:
107 int type;
108 unsigned int length;
109 unsigned int total_length;
110 unsigned int block_length;
111 int start_server_id;
112 int end_server_id;
113 };
114
115 int
AddressToId(int Address)116 XdmfDSMBuffer::AddressToId(int Address)
117 {
118 int ServerId = XDMF_DSM_FAIL;
119
120 switch(this->DsmType) {
121 case XDMF_DSM_TYPE_UNIFORM :
122 case XDMF_DSM_TYPE_UNIFORM_RANGE :
123 // Block based allocation should use PageToId
124 // All Servers have same length
125 // This finds out which server the address provided starts on
126 ServerId = this->StartServerId + (Address / this->Length);
127 if(ServerId > this->EndServerId ){
128 try {
129 std::stringstream message;
130 message << "ServerId " << ServerId << " for Address "
131 << Address << " is larger than EndServerId "
132 << this->EndServerId;
133 XdmfError::message(XdmfError::FATAL, message.str());
134 }
135 catch (XdmfError & e) {
136 throw e;
137 }
138 }
139 break;
140 default :
141 // Not Implemented
142 try {
143 std::stringstream message;
144 message << "DsmType " << this->DsmType << " not yet implemented or not uniform";
145 XdmfError::message(XdmfError::FATAL, message.str());
146 }
147 catch (XdmfError & e) {
148 throw e;
149 }
150 break;
151 }
152 return(ServerId);
153 }
154
155 void
BroadcastComm(int * comm,int root)156 XdmfDSMBuffer::BroadcastComm(int *comm, int root)
157 {
158 int status;
159
160 this->Comm->Broadcast(comm,
161 sizeof(int),
162 root,
163 XDMF_DSM_INTRA_COMM);
164 if (status != MPI_SUCCESS) {
165 try {
166 XdmfError(XdmfError::FATAL, "Broadcast of Comm failed");
167 }
168 catch (XdmfError & e) {
169 throw e;
170 }
171 }
172 }
173
174 int
BufferService(int * returnOpcode)175 XdmfDSMBuffer::BufferService(int *returnOpcode)
176 {
177 int opcode, who;
178 int aLength;
179 int address;
180 char *datap;
181 static int syncId = -1;
182
183 if (this->CommChannel == XDMF_DSM_ANY_COMM) {
184 if (this->Comm->GetId() == 0) {
185 try {
186 this->Comm->Probe(&this->CommChannel);
187 }
188 catch (XdmfError & e) {
189 throw e;
190 }
191 }
192 try {
193 this->BroadcastComm(&this->CommChannel, 0);
194 }
195 catch (XdmfError & e) {
196 throw e;
197 }
198 }
199
200 try {
201 this->ReceiveCommandHeader(&opcode,
202 &who,
203 &address,
204 &aLength,
205 this->CommChannel,
206 syncId);
207 }
208 catch (XdmfError & e) {
209 throw e;
210 }
211
212 // Connection is an ID for client or server,
213 // int communicatorId = this->CommChannel;
214
215 switch(opcode) {
216
217 // H5FD_DSM_OPCODE_PUT
218 case XDMF_DSM_OPCODE_PUT:
219 if (((unsigned int) aLength + address) > this->Length) {
220 try {
221 std::stringstream message;
222 message << "Length " << aLength << " too long for Address " << address
223 << "\n" << "Server Start = " << this->StartAddress << " End = "
224 << this->EndAddress;
225 XdmfError::message(XdmfError::FATAL, message.str());
226 }
227 catch (XdmfError & e) {
228 throw e;
229 }
230 }
231 if ((datap = this->DataPointer) == NULL) {
232 try {
233 XdmfError::message(XdmfError::FATAL,
234 "Null Data Pointer when trying to put data");
235 }
236 catch (XdmfError & e) {
237 throw e;
238 }
239 }
240 datap += address;
241 try {
242 this->ReceiveData(who,
243 datap,
244 aLength,
245 XDMF_DSM_PUT_DATA_TAG,
246 0,
247 this->CommChannel);
248 }
249 catch (XdmfError & e) {
250 throw e;
251 }
252 break;
253
254 // H5FD_DSM_OPCODE_GET
255 case XDMF_DSM_OPCODE_GET:
256 if (((unsigned int) aLength + address) > this->Length) {
257 try {
258 std::stringstream message;
259 message << "Length " << aLength << " too long for Address " << address
260 << "\n" << "Server Start = " << this->StartAddress << " End = "
261 << this->EndAddress;
262 XdmfError::message(XdmfError::FATAL, message.str());
263 }
264 catch (XdmfError & e) {
265 throw e;
266 }
267 }
268 if ((datap = this->DataPointer) == NULL) {
269 try {
270 XdmfError::message(XdmfError::FATAL,
271 "Null Data Pointer when trying to put data");
272 }
273 catch (XdmfError & e) {
274 throw e;
275 }
276 }
277 datap += address;
278 try {
279 this->SendData(who,
280 datap,
281 aLength,
282 XDMF_DSM_GET_DATA_TAG,
283 0,
284 this->CommChannel);
285 }
286 catch (XdmfError & e) {
287 throw e;
288 }
289 break;
290
291 // H5FD_DSM_ACCEPT
292 // Comes from client
293 case XDMF_DSM_ACCEPT:
294 {
295 int numConnections;
296 this->ReceiveAcknowledgment(who,
297 numConnections,
298 XDMF_DSM_EXCHANGE_TAG,
299 XDMF_DSM_INTER_COMM);
300 this->Comm->Accept(numConnections);
301 this->SendInfo();
302 break;
303 }
304 // Comes from client, requests a notifcation when a file is touched.
305 // The notification is sent out when clear is called.
306 case XDMF_DSM_SET_NOTIFY:
307 {
308 // Send the notify info to all cores.
309 int strlength = 0;
310 char * notifystring;
311 int waitingCore = 0;
312 if (this->Comm->GetId() == 0)
313 {
314 waitingCore = who;
315 this->ReceiveAcknowledgment(who,
316 strlength,
317 XDMF_DSM_EXCHANGE_TAG,
318 this->CommChannel);
319 notifystring = new char[strlength+1]();
320 this->ReceiveData(who,
321 notifystring,
322 strlength,
323 XDMF_DSM_EXCHANGE_TAG,
324 0,
325 this->CommChannel);
326 notifystring[strlength] = 0;
327 WaitingMap[std::string(notifystring)].push_back(who);
328 // Send XDMF_DSM_SET_NOTIFY to all server cores in order of increasing id
329 for (int i = this->GetStartServerId() + 1; // Since this is core 0 sending it
330 i <= this->GetEndServerId();
331 ++i) {
332 if (i != this->Comm->GetInterId())
333 {
334 this->SendCommandHeader(XDMF_DSM_SET_NOTIFY, i, 0, 0, XDMF_DSM_INTER_COMM);
335 }
336 }
337 }
338 // broadcast to the other server cores
339 // BCAST strlen
340 this->Comm->Broadcast(&strlength,
341 sizeof(int),
342 0,
343 XDMF_DSM_INTRA_COMM);
344 // BCAST notifystring
345 if (this->Comm->GetId() != 0)
346 {
347 notifystring = new char[strlength + 1]();
348 }
349 this->Comm->Broadcast(¬ifystring,
350 strlength,
351 0,
352 XDMF_DSM_INTRA_COMM);
353 notifystring[strlength] = 0;
354 // BCAST locked core
355 this->Comm->Broadcast(&waitingCore,
356 sizeof(int),
357 0,
358 XDMF_DSM_INTRA_COMM);
359
360 if (this->Comm->GetId() != 0)
361 {
362 WaitingMap[std::string(notifystring)].push_back(waitingCore);
363 }
364
365 break;
366 }
367 // sends out and clears the notifcations that are stored for a specific file.
368 case XDMF_DSM_CLEAR_NOTIFY:
369 {
370 // send a command to other cores to clear this notification
371 int strlength = 0;
372 char * notifystring;
373 int clearCode = 0;
374 if (this->Comm->GetId() == 0)
375 {
376 this->ReceiveAcknowledgment(who,
377 strlength,
378 XDMF_DSM_EXCHANGE_TAG,
379 this->CommChannel);
380 notifystring = new char[strlength+1]();
381 this->ReceiveData(who,
382 notifystring,
383 strlength,
384 XDMF_DSM_EXCHANGE_TAG,
385 0,
386 this->CommChannel);
387 notifystring[strlength] = 0;
388 this->ReceiveAcknowledgment(who,
389 clearCode,
390 XDMF_DSM_EXCHANGE_TAG,
391 this->CommChannel);
392 }
393 // broad cast string to be notified
394 if (WaitingMap[std::string(notifystring)].size() > 0)
395 {
396 // Request the help of the rest of the server
397 // Send XDMF_DSM_SET_NOTIFY to all server cores in order of increasing id
398 for (int i = this->GetStartServerId() + 1; // Since this is core 0 sending it
399 i <= this->GetEndServerId();
400 ++i) {
401 if (i != this->Comm->GetInterId())
402 {
403 this->SendCommandHeader(XDMF_DSM_CLEAR_NOTIFY, i, 0, 0, XDMF_DSM_INTER_COMM);
404 }
405 }
406
407 // BCAST strlen and code
408 this->Comm->Broadcast(&strlength,
409 sizeof(int),
410 0,
411 XDMF_DSM_INTRA_COMM);
412 this->Comm->Broadcast(&clearCode,
413 sizeof(int),
414 0,
415 XDMF_DSM_INTRA_COMM);
416 // BCAST notifystring
417 if (this->Comm->GetId() != 0)
418 {
419 notifystring = new char[strlength+1]();
420 }
421 this->Comm->Broadcast(¬ifystring,
422 strlength,
423 0,
424 XDMF_DSM_INTRA_COMM);
425 notifystring[strlength] = 0;
426 // cores notify based on their index, in order to split up the work
427 std::vector<unsigned int> notifiedCores = WaitingMap[std::string(notifystring)];
428 for (unsigned int i = this->Comm->GetId(); i < notifiedCores.size(); i+=this->Comm->GetIntraSize())
429 {
430 unsigned int recvCore = notifiedCores[i];
431 this->SendAcknowledgment(recvCore,
432 clearCode,
433 XDMF_DSM_EXCHANGE_TAG,
434 this->CommChannel);
435 }
436 // Then all cores remove the string from the map of notifications
437 WaitingMap.erase(std::string(notifystring));
438 }
439 break;
440 }
441 case XDMF_DSM_REGISTER_FILE:
442 {
443 // save file description
444 XdmfDSMBuffer::XDMF_file_desc * newfile = new XdmfDSMBuffer::XDMF_file_desc();
445
446 int strlength = 0;
447
448 this->ReceiveAcknowledgment(who,
449 strlength,
450 XDMF_DSM_EXCHANGE_TAG,
451 this->CommChannel);
452
453 newfile->name = new char[strlength + 1];
454
455 this->ReceiveData(who,
456 newfile->name,
457 strlength,
458 XDMF_DSM_EXCHANGE_TAG,
459 0,
460 this->CommChannel);
461
462 newfile->name[strlength] = 0;
463
464 this->ReceiveData(who,
465 (char *)&newfile->start,
466 sizeof(haddr_t),
467 XDMF_DSM_EXCHANGE_TAG,
468 0,
469 this->CommChannel);
470
471 this->ReceiveData(who,
472 (char *)&newfile->end,
473 sizeof(haddr_t),
474 XDMF_DSM_EXCHANGE_TAG,
475 0,
476 this->CommChannel);
477
478 int recvNumPages = 0;
479
480 this->ReceiveAcknowledgment(who,
481 recvNumPages,
482 XDMF_DSM_EXCHANGE_TAG,
483 this->CommChannel);
484
485 newfile->numPages = recvNumPages;
486
487 if (newfile->numPages > 0)
488 {
489 newfile->pages = new unsigned int[newfile->numPages]();
490
491 this->ReceiveData(who,
492 (char *)newfile->pages,
493 newfile->numPages * sizeof(unsigned int),
494 XDMF_DSM_EXCHANGE_TAG,
495 0,
496 this->CommChannel);
497 }
498 else
499 {
500 newfile->pages = NULL;
501 }
502
503 // If old description exists, overwrite it.
504
505 FileDefinitions[std::string(newfile->name)] = newfile;
506
507 break;
508 }
509 case XDMF_DSM_REQUEST_PAGES:
510 {
511 // set aside pages to a file
512 char * requestfile;
513 int strlength = 0;
514
515 this->ReceiveAcknowledgment(who,
516 strlength,
517 XDMF_DSM_EXCHANGE_TAG,
518 this->CommChannel);
519
520 requestfile = new char[strlength + 1];
521
522 this->ReceiveData(who,
523 requestfile,
524 strlength,
525 XDMF_DSM_EXCHANGE_TAG,
526 0,
527 this->CommChannel);
528
529 requestfile[strlength] = 0;
530
531 // This file will have its pages appended to.
532 XdmfDSMBuffer::XDMF_file_desc * filedesc;
533
534 if (FileDefinitions.count(std::string(requestfile)) > 0)
535 {
536 filedesc = FileDefinitions[std::string(requestfile)];
537 }
538 else
539 {
540 filedesc = new XDMF_file_desc();
541 filedesc->start = 0;
542 filedesc->end = 0;
543 filedesc->numPages = 0;
544 filedesc->pages = NULL;
545 }
546
547 int datasize = 0;
548
549 // Request size required for the file
550 this->ReceiveAcknowledgment(who,
551 datasize,
552 XDMF_DSM_EXCHANGE_TAG,
553 this->CommChannel);
554
555 // TODO Error handling block length must be greater than 0
556 // If Block size = 0 then do nothing?
557 // Then return blank data?
558
559 int requestedblocks = ((double)datasize) / this->BlockLength;
560
561 // Round up
562 if (requestedblocks * this->BlockLength != datasize)
563 {
564 ++requestedblocks;
565 }
566
567 while (requestedblocks + PagesAssigned >= this->NumPages * this->Comm->GetIntraSize())
568 {
569 // If requested blocks are out of range, resize
570 for (int i = this->GetStartServerId() + 1; // Since this is core 0 sending it
571 i <= this->GetEndServerId();
572 ++i) {
573 if (i != this->Comm->GetInterId())
574 {
575 this->SendCommandHeader(XDMF_DSM_OPCODE_RESIZE, i, 0, 0, XDMF_DSM_INTER_COMM);
576 }
577 }
578 this->SetLength(this->Length + (this->Length * this->ResizeFactor));
579 }
580
581 unsigned int * newpagelist = new unsigned int[filedesc->numPages + requestedblocks]();
582
583 unsigned int index = 0;
584
585 for (unsigned int i = 0; i < filedesc->numPages; ++i)
586 {
587 newpagelist[index] = filedesc->pages[index];
588 ++index;
589 }
590
591 for (;index < filedesc->numPages + requestedblocks; ++index)
592 {
593 // The number of pages assigned is incremented after the page is added.
594 // The value added is simply an index
595 newpagelist[index] = PagesAssigned++;
596 }
597
598 filedesc->numPages = filedesc->numPages + requestedblocks;
599 unsigned int * oldpointer = filedesc->pages;
600 filedesc->pages = newpagelist;
601
602 if (oldpointer != NULL)
603 {
604 delete oldpointer;
605 }
606
607 // Send back new page allocation pointer
608
609 this->SendAcknowledgment(who,
610 filedesc->numPages,
611 XDMF_DSM_EXCHANGE_TAG,
612 this->CommChannel);
613
614 this->SendData(who,
615 (char *)filedesc->pages,
616 filedesc->numPages * sizeof(unsigned int),
617 XDMF_DSM_EXCHANGE_TAG,
618 0,
619 this->CommChannel);
620
621 this->SendData(who,
622 (char*)&filedesc->start,
623 sizeof(haddr_t),
624 XDMF_DSM_EXCHANGE_TAG,
625 0,
626 this->CommChannel);
627
628 filedesc->end = filedesc->start + (filedesc->numPages * this->BlockLength);
629
630 this->SendData(who,
631 (char*)&(filedesc->end),
632 sizeof(haddr_t),
633 XDMF_DSM_EXCHANGE_TAG,
634 0,
635 this->CommChannel);
636
637 // Notify the current size of the buffer
638 int currentLength = this->Length;
639 this->SendAcknowledgment(who,
640 currentLength,
641 XDMF_DSM_EXCHANGE_TAG,
642 this->CommChannel);
643
644 break;
645 }
646 case XDMF_DSM_REQUEST_FILE:
647 {
648 char * requestfile;
649 int strlength = 0;
650
651 this->ReceiveAcknowledgment(who,
652 strlength,
653 XDMF_DSM_EXCHANGE_TAG,
654 this->CommChannel);
655
656 requestfile = new char[strlength + 1];
657
658 this->ReceiveData(who,
659 requestfile,
660 strlength,
661 XDMF_DSM_EXCHANGE_TAG,
662 0,
663 this->CommChannel);
664
665 requestfile[strlength] = 0;
666
667 // This file will be returned.
668 XdmfDSMBuffer::XDMF_file_desc * filedesc;
669
670 if (FileDefinitions.count(std::string(requestfile)) > 0)
671 {
672 this->SendAcknowledgment(who,
673 XDMF_DSM_SUCCESS,
674 XDMF_DSM_EXCHANGE_TAG,
675 this->CommChannel);
676
677 filedesc = FileDefinitions[std::string(requestfile)];
678
679 this->SendData(who,
680 (char*)&filedesc->start,
681 sizeof(haddr_t),
682 XDMF_DSM_EXCHANGE_TAG,
683 0,
684 this->CommChannel);
685
686 this->SendData(who,
687 (char*)&filedesc->end,
688 sizeof(haddr_t),
689 XDMF_DSM_EXCHANGE_TAG,
690 0,
691 this->CommChannel);
692
693 int sendNumPages = filedesc->numPages;
694
695 this->SendAcknowledgment(who,
696 sendNumPages,
697 XDMF_DSM_EXCHANGE_TAG,
698 this->CommChannel);
699
700 this->SendData(who,
701 (char *)filedesc->pages,
702 filedesc->numPages * sizeof(unsigned int),
703 XDMF_DSM_EXCHANGE_TAG,
704 0,
705 this->CommChannel);
706 }
707 else
708 {
709 this->SendAcknowledgment(who,
710 XDMF_DSM_FAIL,
711 XDMF_DSM_EXCHANGE_TAG,
712 this->CommChannel);
713 }
714
715 break;
716 }
717 case XDMF_DSM_OPCODE_RESIZE:
718 this->SetLength(this->Length + (this->Length * this->ResizeFactor));
719 break;
720 case XDMF_DSM_REQUEST_ACCESS:
721 {
722 int isLocked = 0;
723
724 char * requestfile;
725 int strlength = 0;
726
727 this->ReceiveAcknowledgment(who,
728 strlength,
729 XDMF_DSM_EXCHANGE_TAG,
730 this->CommChannel);
731
732 requestfile = new char[strlength + 1];
733
734 this->ReceiveData(who,
735 requestfile,
736 strlength,
737 XDMF_DSM_EXCHANGE_TAG,
738 0,
739 this->CommChannel);
740
741 requestfile[strlength] = 0;
742
743 // If the requesting core is the one who
744 // already locked the file then tell it that there is not lock.
745 std::map<std::string, int>::iterator isOwner = FileOwners.find(std::string(requestfile));
746
747 if (LockedMap.count(std::string(requestfile)) > 0)
748 {
749 if (isOwner->second != who)
750 {
751 // If the file is locked notify the requesting core and add it to the queue.
752 isLocked = 1;
753 LockedMap[std::string(requestfile)].push(who);
754 }
755 }
756 else
757 {
758 LockedMap[std::string(requestfile)] = std::queue<unsigned int>();
759 FileOwners[std::string(requestfile)] = who;
760 }
761
762 this->SendAcknowledgment(who,
763 isLocked,
764 XDMF_DSM_EXCHANGE_TAG,
765 this->CommChannel);
766
767 break;
768 }
769 case XDMF_DSM_UNLOCK_FILE:
770 {
771 char * requestfile;
772 int strlength = 0;
773
774 this->ReceiveAcknowledgment(who,
775 strlength,
776 XDMF_DSM_EXCHANGE_TAG,
777 this->CommChannel);
778
779 requestfile = new char[strlength + 1];
780
781 this->ReceiveData(who,
782 requestfile,
783 strlength,
784 XDMF_DSM_EXCHANGE_TAG,
785 0,
786 this->CommChannel);
787
788 requestfile[strlength] = 0;
789
790 // If file isn't locked do nothing
791 if (LockedMap.count(std::string(requestfile)) > 0)
792 {
793 // Remove the queue if there are no more waiting
794 if (LockedMap[std::string(requestfile)].size() > 0)
795 {
796 // Pop the next process waiting off the queue
797 unsigned int nextCore = LockedMap[std::string(requestfile)].front();
798 LockedMap[std::string(requestfile)].pop();
799 FileOwners[std::string(requestfile)] = nextCore;
800 this->SendAcknowledgment(nextCore,
801 1,
802 XDMF_DSM_EXCHANGE_TAG,
803 this->CommChannel);
804 }
805 if(LockedMap[std::string(requestfile)].size() == 0)
806 {
807 LockedMap.erase(std::string(requestfile));
808 FileOwners.erase(std::string(requestfile));
809 }
810 }
811
812 break;
813 }
814 case XDMF_DSM_LOCK_ACQUIRE:
815 // Currently unsupported
816 break;
817
818 // H5FD_DSM_LOCK_RELEASE
819 // Comes from client or server depending on communicator
820 case XDMF_DSM_LOCK_RELEASE:
821 // Currently unsupported
822 break;
823
824 // H5FD_DSM_OPCODE_DONE
825 // Always received on server
826 case XDMF_DSM_OPCODE_DONE:
827 break;
828
829 // DEFAULT
830 default :
831 try {
832 std::stringstream message;
833 message << "Error: Unknown Opcode " << opcode;
834 XdmfError::message(XdmfError::FATAL, message.str());
835 }
836 catch (XdmfError & e) {
837 throw e;
838 }
839 }
840
841 if (returnOpcode) *returnOpcode = opcode;
842 return(XDMF_DSM_SUCCESS);
843 }
844
845 void
BufferServiceLoop(int * returnOpcode)846 XdmfDSMBuffer::BufferServiceLoop(int *returnOpcode)
847 {
848 int op, status = XDMF_DSM_SUCCESS;
849 while (status == XDMF_DSM_SUCCESS) {
850 try {
851 status = this->BufferService(&op);
852 }
853 catch (XdmfError & e) {
854 throw e;
855 }
856 if (returnOpcode) *returnOpcode = op;
857 if (op == XDMF_DSM_OPCODE_DONE) {
858 break;
859 }
860 }
861 }
862
863 void
Create(MPI_Comm newComm,int startId,int endId)864 XdmfDSMBuffer::Create(MPI_Comm newComm, int startId, int endId)
865 {
866 //
867 // Create DSM communicator
868 //
869 switch (this->InterCommType) {
870 case XDMF_DSM_COMM_MPI:
871 this->Comm = new XdmfDSMCommMPI();
872 break;
873 default:
874 try {
875 XdmfError::message(XdmfError::FATAL, "DSM communication type not supported");
876 }
877 catch (XdmfError & e) {
878 throw e;
879 }
880 }
881
882 this->Comm->DupComm(newComm);
883 this->Comm->Init();
884
885 // Uniform Dsm : every node has a buffer the same size. (Addresses are sequential)
886 // Block DSM : nodes are set up using paging
887 long length = (long) (this->LocalBufferSizeMBytes)*1024LU*1024LU;
888 switch (this->DsmType) {
889 case XDMF_DSM_TYPE_UNIFORM:
890 case XDMF_DSM_TYPE_UNIFORM_RANGE:
891 this->ConfigureUniform(this->Comm, length, startId, endId);
892 break;
893 case XDMF_DSM_TYPE_BLOCK_CYCLIC:
894 this->ConfigureUniform(this->Comm, length, startId, endId, this->BlockLength, false);
895 break;
896 case XDMF_DSM_TYPE_BLOCK_RANDOM:
897 this->ConfigureUniform(this->Comm, length, startId, endId, this->BlockLength, true);
898 break;
899 default:
900 try {
901 XdmfError(XdmfError::FATAL, "DSM configuration type not supported");
902 }
903 catch (XdmfError & e) {
904 throw e;
905 }
906 }
907 }
908
909 void
ConfigureUniform(XdmfDSMCommMPI * aComm,long aLength,int startId,int endId,long aBlockLength,bool random)910 XdmfDSMBuffer::ConfigureUniform(XdmfDSMCommMPI *aComm, long aLength,
911 int startId, int endId, long aBlockLength,
912 bool random)
913 {
914 if (startId < 0) {
915 startId = 0;
916 }
917 if (endId < 0) {
918 endId = aComm->GetIntraSize() - 1;
919 }
920 this->SetDsmType(XDMF_DSM_TYPE_UNIFORM_RANGE);
921 if ((startId == 0) && (endId == aComm->GetIntraSize() - 1)) {
922 this->SetDsmType(XDMF_DSM_TYPE_UNIFORM);
923 }
924 if (aBlockLength) {
925 if (!random) {
926 this->SetDsmType(XDMF_DSM_TYPE_BLOCK_CYCLIC);
927 }
928 else {
929 this->SetDsmType(XDMF_DSM_TYPE_BLOCK_RANDOM);
930 }
931 this->SetBlockLength(aBlockLength);
932 }
933 this->StartServerId = startId;
934 this->EndServerId = endId;
935 this->SetComm(aComm);
936 if ((aComm->GetId() >= startId) &&
937 (aComm->GetId() <= endId) &&
938 this->IsServer) {
939 try {
940 if (aBlockLength) {
941 // For optimization we make the DSM length fit to a multiple of block size
942 this->SetLength(((long)(aLength / aBlockLength)) * aBlockLength);
943 this->NumPages = ((long)(aLength / aBlockLength));
944 }
945 else {
946 this->SetLength(aLength);
947 }
948 }
949 catch (XdmfError & e) {
950 throw e;
951 }
952 this->StartAddress = (aComm->GetId() - startId) * aLength;
953 this->EndAddress = this->StartAddress + aLength - 1;
954 }
955 else {
956 if (aBlockLength) {
957 this->Length = ((long)(aLength / aBlockLength)) * aBlockLength;
958 }
959 else {
960 this->Length = aLength;
961 }
962 }
963 this->TotalLength = this->GetLength() * (endId - startId + 1);
964 // Set DSM structure
965 std::vector<std::pair<std::string, unsigned int> > newStructure;
966 // determine size of application before the server
967 if (startId > 0)
968 {
969 newStructure.push_back(std::pair<std::string, unsigned int>(aComm->GetApplicationName(), startId));
970 }
971 newStructure.push_back(std::pair<std::string, unsigned int>("Server", (endId + 1) - startId));
972 if(aComm->GetInterSize() - (startId +((endId + 1) - startId)) > 0)
973 {
974 newStructure.push_back(std::pair<std::string, unsigned int>(aComm->GetApplicationName(), aComm->GetInterSize() - (startId +((endId + 1) - startId))));
975 }
976 aComm->SetDsmProcessStructure(newStructure);
977 }
978
979 void
Connect(bool persist)980 XdmfDSMBuffer::Connect(bool persist)
981 {
982 int status;
983
984 do {
985 try {
986 status = this->GetComm()->Connect();
987 }
988 catch (XdmfError & e) {
989 throw e;
990 }
991 if (status == MPI_SUCCESS) {
992 this->SetIsConnected(true);
993 try {
994 this->ReceiveInfo();
995 }
996 catch (XdmfError & e) {
997 throw e;
998 }
999 }
1000 else {
1001 #ifdef _WIN32
1002 Sleep(1000);
1003 // Since windows has a different sleep command
1004 #else
1005 sleep(1);
1006 #endif
1007 }
1008 } while (persist && (status != MPI_SUCCESS));
1009 }
1010
1011 void
Disconnect()1012 XdmfDSMBuffer::Disconnect()
1013 {
1014 // Disconnecting is done manually
1015 try {
1016 this->GetComm()->Disconnect();
1017 }
1018 catch (XdmfError & e) {
1019 throw e;
1020 }
1021 this->SetIsConnected(false);
1022 }
1023
1024 void
Get(long Address,long aLength,void * Data)1025 XdmfDSMBuffer::Get(long Address, long aLength, void *Data)
1026 {
1027 int who, MyId = this->Comm->GetInterId();
1028 int astart, aend, len;
1029 char *datap = (char *)Data;
1030
1031 // While there is length left
1032 while(aLength) {
1033 // Figure out what server core the address is located on
1034 who = this->AddressToId(Address);
1035 if(who == XDMF_DSM_FAIL){
1036 try {
1037 XdmfError::message(XdmfError::FATAL, "Address Error");
1038 }
1039 catch (XdmfError & e) {
1040 throw e;
1041 }
1042 }
1043 // Get the start and end of the block listed
1044 this->GetAddressRangeForId(who, &astart, &aend);
1045 // Determine the amount of data to be written to that core
1046 // Basically, it's how much data will fit from
1047 // the starting point of the address to the end
1048 len = std::min(aLength, aend - Address + 1);
1049 // If the data is on the core running this code, then the put is simple
1050 if(who == MyId){
1051 char *dp;
1052 dp = this->DataPointer;
1053 dp += Address - this->StartAddress;
1054 memcpy(datap, dp, len);
1055 }
1056 else{
1057 // Otherwise send it to the appropriate core to deal with
1058 int dataComm = XDMF_DSM_INTRA_COMM;
1059 if (this->Comm->GetInterComm() != MPI_COMM_NULL) {
1060 dataComm = XDMF_DSM_INTER_COMM;
1061 }
1062 try {
1063 this->SendCommandHeader(XDMF_DSM_OPCODE_GET, who, Address - astart, len, dataComm);
1064 }
1065 catch (XdmfError & e) {
1066 throw e;
1067 }
1068 try {
1069 this->ReceiveData(who, datap, len, XDMF_DSM_GET_DATA_TAG, Address - astart, dataComm);
1070 }
1071 catch (XdmfError & e) {
1072 throw e;
1073 }
1074 }
1075 // Shift all the numbers by the length of the data written
1076 // Until aLength = 0
1077 aLength -= len;
1078 Address += len;
1079 datap += len;
1080 }
1081 }
1082
1083 void
Get(unsigned int * pages,unsigned int numPages,long Address,long aLength,void * Data)1084 XdmfDSMBuffer::Get(unsigned int * pages, unsigned int numPages, long Address, long aLength, void *Data)
1085 {
1086 char * currentStart;
1087 unsigned int currentPageId = Address / this->BlockLength;
1088 unsigned int dsmPage;
1089 long startingAddress = Address % this->BlockLength;
1090 unsigned int tranferedLength;
1091 unsigned int dataPage = 0;
1092
1093 long pointeroffset = 0;
1094
1095 int serverCore;
1096 int writeAddress;
1097
1098 while (aLength) {
1099 if (dataPage == 0) {
1100 tranferedLength = this->BlockLength - startingAddress;
1101 }
1102 else {
1103 tranferedLength = this->BlockLength;
1104 }
1105 if (tranferedLength > aLength) {
1106 tranferedLength = aLength;
1107 }
1108
1109 dsmPage = pages[currentPageId];
1110
1111 currentStart = (char *)Data + pointeroffset;;
1112
1113 // Write page to DSM
1114 // page to DSM server Id
1115 // page to address
1116 // write to location
1117
1118 serverCore = PageToId(dsmPage);
1119 writeAddress = PageToAddress(dsmPage);
1120
1121 if (serverCore == XDMF_DSM_FAIL) {
1122 XdmfError::message(XdmfError::FATAL,
1123 "Error: Unable to determine server core.");
1124 }
1125
1126 if (writeAddress == XDMF_DSM_FAIL) {
1127 XdmfError::message(XdmfError::FATAL,
1128 "Error: Unable to determine write address.");
1129 }
1130
1131 if (dataPage == 0)
1132 {
1133 writeAddress += startingAddress;
1134 }
1135
1136 // If the data is on the core running this code, then the put is simple
1137 if(serverCore == this->Comm->GetInterId()){
1138 char *dp;
1139 dp = this->DataPointer;
1140 dp += writeAddress;
1141 memcpy(currentStart, dp, tranferedLength);
1142 }
1143 else{
1144 // Otherwise send it to the appropriate core to deal with
1145 int dataComm = XDMF_DSM_INTRA_COMM;
1146 if (this->Comm->GetInterComm() != MPI_COMM_NULL) {
1147 dataComm = XDMF_DSM_INTER_COMM;
1148 }
1149 try {
1150 this->SendCommandHeader(XDMF_DSM_OPCODE_GET,
1151 serverCore,
1152 writeAddress,
1153 tranferedLength,
1154 dataComm);
1155 }
1156 catch (XdmfError & e) {
1157 throw e;
1158 }
1159 try {
1160 this->ReceiveData(serverCore,
1161 currentStart,
1162 tranferedLength,
1163 XDMF_DSM_GET_DATA_TAG,
1164 writeAddress,
1165 dataComm);
1166 }
1167 catch (XdmfError & e) {
1168 throw e;
1169 }
1170 }
1171
1172 aLength -= tranferedLength;
1173 pointeroffset += tranferedLength;
1174 // move to the next page
1175 ++currentPageId;
1176 ++dataPage;
1177 }
1178 }
1179
1180 void
GetAddressRangeForId(int Id,int * Start,int * End)1181 XdmfDSMBuffer::GetAddressRangeForId(int Id, int *Start, int *End){
1182 switch(this->DsmType) {
1183 case XDMF_DSM_TYPE_UNIFORM :
1184 case XDMF_DSM_TYPE_UNIFORM_RANGE :
1185 // All Servers have same length
1186 // Start index is equal to the id inside the servers times
1187 // the length of the block per server
1188 // It is the starting index of the server's data block relative
1189 // to the entire block
1190 *Start = (Id - this->StartServerId) * this->Length;
1191 // End index is simply the start index + the length of the
1192 // server's data block.
1193 // The range produced is the start of the server's data block to its end.
1194 *End = *Start + Length - 1;
1195 break;
1196 default :
1197 // Not Implemented
1198 try {
1199 std::stringstream message;
1200 message << "DsmType " << this->DsmType << " not yet implemented";
1201 XdmfError::message(XdmfError::FATAL, message.str());
1202 }
1203 catch (XdmfError & e) {
1204 throw e;
1205 }
1206 break;
1207 }
1208 }
1209
1210 long
GetBlockLength()1211 XdmfDSMBuffer::GetBlockLength()
1212 {
1213 return this->BlockLength;
1214 }
1215
1216 XdmfDSMCommMPI *
GetComm()1217 XdmfDSMBuffer::GetComm()
1218 {
1219 return this->Comm;
1220 }
1221
1222 char *
GetDataPointer()1223 XdmfDSMBuffer::GetDataPointer()
1224 {
1225 return this->DataPointer;
1226 }
1227
1228 int
GetDsmType()1229 XdmfDSMBuffer::GetDsmType()
1230 {
1231 return this->DsmType;
1232 }
1233
1234 int
GetEndAddress()1235 XdmfDSMBuffer::GetEndAddress()
1236 {
1237 return this->EndAddress;
1238 }
1239
1240 int
GetEndServerId()1241 XdmfDSMBuffer::GetEndServerId()
1242 {
1243 return this->EndServerId;
1244 }
1245
1246 int
GetInterCommType()1247 XdmfDSMBuffer::GetInterCommType()
1248 {
1249 return this->InterCommType;
1250 }
1251
1252 bool
GetIsConnected()1253 XdmfDSMBuffer::GetIsConnected()
1254 {
1255 return IsConnected;
1256 }
1257
1258 bool
GetIsServer()1259 XdmfDSMBuffer::GetIsServer()
1260 {
1261 return this->IsServer;
1262 }
1263
1264 long
GetLength()1265 XdmfDSMBuffer::GetLength()
1266 {
1267 // This is the length of the pointer on the current core.
1268 // Different from local buffer size as that value is
1269 // the starting size.
1270 return this->Length;
1271 }
1272
1273 unsigned int
GetLocalBufferSizeMBytes()1274 XdmfDSMBuffer::GetLocalBufferSizeMBytes()
1275 {
1276 // This is the starting value, so it is not updated as the pointer is expanded.
1277 return this->LocalBufferSizeMBytes;
1278 }
1279
1280 double
GetResizeFactor()1281 XdmfDSMBuffer::GetResizeFactor()
1282 {
1283 return this->ResizeFactor;
1284 }
1285
1286 int
GetStartAddress()1287 XdmfDSMBuffer::GetStartAddress()
1288 {
1289 return this->StartAddress;
1290 }
1291
1292 int
GetStartServerId()1293 XdmfDSMBuffer::GetStartServerId()
1294 {
1295 return this->StartServerId;
1296 }
1297
1298 long
GetTotalLength()1299 XdmfDSMBuffer::GetTotalLength()
1300 {
1301 return this->TotalLength;
1302 }
1303
1304 void
Lock(char * filename)1305 XdmfDSMBuffer::Lock(char * filename)
1306 {
1307 int strlength = std::string(filename).size();
1308 // Request access to the file
1309 this->SendCommandHeader(XDMF_DSM_REQUEST_ACCESS,
1310 this->GetStartServerId(),
1311 0,
1312 0,
1313 XDMF_DSM_INTER_COMM);
1314
1315 this->SendAcknowledgment(this->GetStartServerId(),
1316 strlength,
1317 XDMF_DSM_EXCHANGE_TAG,
1318 XDMF_DSM_INTER_COMM);
1319
1320 this->SendData(this->GetStartServerId(),
1321 filename,
1322 strlength,
1323 XDMF_DSM_EXCHANGE_TAG,
1324 0,
1325 XDMF_DSM_INTER_COMM);
1326
1327 int isLocked = 0;
1328
1329 this->ReceiveAcknowledgment(this->GetStartServerId(),
1330 isLocked,
1331 XDMF_DSM_EXCHANGE_TAG,
1332 XDMF_DSM_INTER_COMM);
1333
1334 if (isLocked == 1)
1335 {
1336 // If locked wait for notification that the file is available.
1337 this->ReceiveAcknowledgment(this->GetStartServerId(),
1338 isLocked,
1339 XDMF_DSM_EXCHANGE_TAG,
1340 XDMF_DSM_INTER_COMM);
1341 }
1342 }
1343
1344 int
PageToId(int pageId)1345 XdmfDSMBuffer::PageToId(int pageId)
1346 {
1347 int ServerId = XDMF_DSM_FAIL;
1348
1349 switch(this->DsmType) {
1350 case XDMF_DSM_TYPE_BLOCK_CYCLIC :
1351 case XDMF_DSM_TYPE_BLOCK_RANDOM :
1352 {
1353 // Block based allocation should use PageToId
1354 // All Servers have same length
1355 // This finds out which server the address provided starts on
1356 int serversize = (this->EndServerId - this->StartServerId);
1357 if (serversize < 1)
1358 {
1359 serversize = 1;
1360 }
1361 ServerId = pageId % serversize;// This should only be called by the server
1362 ServerId += this->StartServerId; // Apply the offset of the server if required.
1363 break;
1364 }
1365 default :
1366 // Not Implemented
1367 try {
1368 std::stringstream message;
1369 message << "DsmType " << this->DsmType << " not yet implemented or not paged";
1370 XdmfError::message(XdmfError::FATAL, message.str());
1371 }
1372 catch (XdmfError & e) {
1373 throw e;
1374 }
1375 break;
1376 }
1377 return(ServerId);
1378 }
1379
1380 int
PageToAddress(int pageId)1381 XdmfDSMBuffer::PageToAddress(int pageId)
1382 {
1383 int resultAddress = XDMF_DSM_FAIL;
1384
1385 switch(this->DsmType) {
1386 case XDMF_DSM_TYPE_BLOCK_CYCLIC :
1387 case XDMF_DSM_TYPE_BLOCK_RANDOM :
1388 {
1389 // Block based allocation should use PageToId
1390 // All Servers have same length
1391 // This finds out which server the address provided starts on
1392 // Since this is integers being divided the result is truncated.
1393 int serversize = (this->EndServerId - this->StartServerId);
1394 if (serversize < 1)
1395 {
1396 serversize = 1;
1397 }
1398 resultAddress = this->BlockLength * (pageId / serversize);
1399 break;
1400 }
1401 default :
1402 // Not Implemented
1403 try {
1404 std::stringstream message;
1405 message << "DsmType " << this->DsmType << " not yet implemented or not paged";
1406 XdmfError::message(XdmfError::FATAL, message.str());
1407 }
1408 catch (XdmfError & e) {
1409 throw e;
1410 }
1411 break;
1412 }
1413 return(resultAddress);
1414 }
1415
1416 void
ProbeCommandHeader(int * comm)1417 XdmfDSMBuffer::ProbeCommandHeader(int *comm)
1418 {
1419 // Used for finding a comm that has a waiting command, then sets the comm
1420 int status = XDMF_DSM_FAIL;
1421 MPI_Status signalStatus;
1422
1423 int flag;
1424 MPI_Comm probeComm =
1425 static_cast<XdmfDSMCommMPI *>(this->Comm)->GetIntraComm();
1426
1427 // Spin until a message is found on one of the communicators
1428 while (status != XDMF_DSM_SUCCESS) {
1429 status = MPI_Iprobe(XDMF_DSM_ANY_SOURCE,
1430 XDMF_DSM_ANY_TAG,
1431 probeComm,
1432 &flag,
1433 &signalStatus);
1434 if (status != MPI_SUCCESS)
1435 {
1436 try {
1437 XdmfError::message(XdmfError::FATAL,
1438 "Error: Failed to probe for command header");
1439 }
1440 catch (XdmfError & e) {
1441 throw e;
1442 }
1443 }
1444 if (flag) {
1445 status = XDMF_DSM_SUCCESS;
1446 }
1447 else {
1448 if (static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm() != MPI_COMM_NULL) {
1449 if (probeComm == static_cast<XdmfDSMCommMPI *>(this->Comm)->GetIntraComm()) {
1450 probeComm = static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm();
1451 }
1452 else {
1453 probeComm = static_cast<XdmfDSMCommMPI *>(this->Comm)->GetIntraComm();
1454 }
1455 }
1456 }
1457 }
1458 if (probeComm == static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm()) {
1459 *comm = XDMF_DSM_INTER_COMM;
1460 }
1461 else
1462 {
1463 *comm = XDMF_DSM_INTRA_COMM;
1464 }
1465
1466 probeComm = MPI_COMM_NULL;
1467 }
1468
1469 void
Put(long Address,long aLength,const void * Data)1470 XdmfDSMBuffer::Put(long Address, long aLength, const void *Data)
1471 {
1472 int who, MyId = this->Comm->GetInterId();
1473 int astart, aend, len;
1474 char *datap = (char *)Data;
1475
1476 // While there is length left
1477 while(aLength){
1478 // Figure out what server core the address is located on
1479 who = this->AddressToId(Address);
1480 if(who == XDMF_DSM_FAIL){
1481 try {
1482 XdmfError::message(XdmfError::FATAL, "Address Error");
1483 }
1484 catch (XdmfError & e) {
1485 throw e;
1486 }
1487 }
1488 // Get the start and end of the block listed
1489 this->GetAddressRangeForId(who, &astart, &aend);
1490 // Determine the amount of data to be written to that core
1491 // Basically, it's how much data will fit from the starting point of
1492 // the address to the end
1493 len = std::min(aLength, aend - Address + 1);
1494 // If the data is on the core running this code, then the put is simple
1495 if(who == MyId){
1496 char *dp;
1497 dp = this->DataPointer;
1498 dp += Address - this->StartAddress;
1499 memcpy(dp, datap, len);
1500 }
1501 else{
1502 // Otherwise send it to the appropriate core to deal with
1503 int dataComm = XDMF_DSM_INTRA_COMM;
1504 if (this->Comm->GetInterComm() != MPI_COMM_NULL) {
1505 dataComm = XDMF_DSM_INTER_COMM;
1506 }
1507 try {
1508 this->SendCommandHeader(XDMF_DSM_OPCODE_PUT,
1509 who,
1510 Address - astart,
1511 len,
1512 dataComm);
1513 }
1514 catch (XdmfError & e) {
1515 throw e;
1516 }
1517 try {
1518 this->SendData(who,
1519 datap,
1520 len,
1521 XDMF_DSM_PUT_DATA_TAG,
1522 Address - astart,
1523 dataComm);
1524 }
1525 catch (XdmfError & e) {
1526 throw e;
1527 }
1528 }
1529 // Shift all the numbers by the length of the data written
1530 // Until aLength = 0
1531 aLength -= len;
1532 Address += len;
1533 datap += len;
1534 }
1535 }
1536
1537 void
Put(unsigned int * pages,unsigned int numPages,haddr_t Address,haddr_t aLength,const void * Data)1538 XdmfDSMBuffer::Put(unsigned int * pages, unsigned int numPages, haddr_t Address, haddr_t aLength, const void *Data)
1539 {
1540
1541 char * currentStart;
1542 unsigned int currentPageId = Address / this->BlockLength;
1543 unsigned int dsmPage = 0;
1544 long startingAddress = Address % this->BlockLength;
1545 unsigned int tranferedLength;
1546
1547 long pointeroffset = 0;
1548
1549 unsigned int dataPage = 0;
1550
1551 int serverCore = 0;
1552 int writeAddress = 0;
1553
1554 while (aLength) {
1555 if (dataPage == 0) {
1556 tranferedLength = this->BlockLength - startingAddress;
1557 }
1558 else {
1559 tranferedLength = this->BlockLength;
1560 }
1561 if (tranferedLength > aLength) {
1562 tranferedLength = aLength;
1563 }
1564
1565 dsmPage = pages[currentPageId];
1566
1567 currentStart = (char *)Data + pointeroffset;
1568
1569 // Write page to DSM
1570 // page to DSM server Id
1571 // page to address
1572 // write to location
1573 serverCore = PageToId(dsmPage);
1574
1575 writeAddress = PageToAddress(dsmPage);
1576
1577 if (serverCore == XDMF_DSM_FAIL) {
1578 XdmfError::message(XdmfError::FATAL, "Error: Unable to determine page server core.");
1579 }
1580 if (writeAddress == XDMF_DSM_FAIL) {
1581 XdmfError::message(XdmfError::FATAL, "Error: Unable to determine page address.");
1582 }
1583
1584 if (dataPage == 0)
1585 {
1586 writeAddress += startingAddress;
1587 }
1588
1589 // If the data is on the core running this code, then the put is simple
1590 if(serverCore == this->Comm->GetInterId()) {
1591 char *dp;
1592 dp = this->DataPointer;
1593 dp += writeAddress;
1594 memcpy(dp, currentStart, tranferedLength);
1595 }
1596 else{
1597 // Otherwise send it to the appropriate core to deal with
1598 int dataComm = XDMF_DSM_INTRA_COMM;
1599 if (this->Comm->GetInterComm() != MPI_COMM_NULL) {
1600 dataComm = XDMF_DSM_INTER_COMM;
1601 }
1602 try {
1603 this->SendCommandHeader(XDMF_DSM_OPCODE_PUT,
1604 serverCore,
1605 writeAddress,
1606 tranferedLength,
1607 dataComm);
1608 }
1609 catch (XdmfError & e) {
1610 throw e;
1611 }
1612 try {
1613 this->SendData(serverCore,
1614 currentStart,
1615 tranferedLength,
1616 XDMF_DSM_PUT_DATA_TAG,
1617 writeAddress,
1618 dataComm);
1619 }
1620 catch (XdmfError & e) {
1621 throw e;
1622 }
1623 }
1624
1625 aLength -= tranferedLength;
1626 pointeroffset += tranferedLength;
1627 // move to the next page
1628 ++currentPageId;
1629 ++dataPage;
1630 }
1631 }
1632
1633 void
ReceiveAcknowledgment(int source,int & data,int tag,int comm)1634 XdmfDSMBuffer::ReceiveAcknowledgment(int source, int &data, int tag, int comm)
1635 {
1636 int status;
1637 MPI_Status signalStatus;
1638
1639 this->Comm->Receive(&data,
1640 sizeof(int),
1641 source,
1642 comm,
1643 tag);
1644 status = MPI_SUCCESS;
1645
1646 if (status != MPI_SUCCESS) {
1647 try {
1648 XdmfError::message(XdmfError::FATAL, "Error: Failed to receive data");
1649 }
1650 catch (XdmfError & e) {
1651 throw e;
1652 }
1653 }
1654 }
1655
1656 void
ReceiveCommandHeader(int * opcode,int * source,int * address,int * aLength,int comm,int remoteSource)1657 XdmfDSMBuffer::ReceiveCommandHeader(int *opcode, int *source, int *address, int *aLength, int comm, int remoteSource)
1658 {
1659 CommandMsg cmd;
1660 memset(&cmd, 0, sizeof(CommandMsg));
1661 int status = MPI_ERR_OTHER;
1662 MPI_Status signalStatus;
1663
1664 if (remoteSource < 0) {
1665 remoteSource = MPI_ANY_SOURCE;
1666 }
1667
1668 this->Comm->Receive(&cmd,
1669 sizeof(CommandMsg),
1670 remoteSource,
1671 comm,
1672 XDMF_DSM_COMMAND_TAG);
1673
1674 status = MPI_SUCCESS;
1675
1676 if (status != MPI_SUCCESS) {
1677 try {
1678 XdmfError::message(XdmfError::FATAL, "Error: Failed to receive command header");
1679 }
1680 catch (XdmfError & e) {
1681 throw e;
1682 }
1683 }
1684 else {
1685 *opcode = cmd.Opcode;
1686 *source = cmd.Source;
1687 *address = cmd.Address;
1688 *aLength = cmd.Length;
1689 }
1690 }
1691
1692 void
ReceiveData(int source,char * data,int aLength,int tag,int aAddress,int comm)1693 XdmfDSMBuffer::ReceiveData(int source, char * data, int aLength, int tag, int aAddress, int comm)
1694 {
1695 int status;
1696 MPI_Status signalStatus;
1697 this->Comm->Receive(data,
1698 aLength,
1699 source,
1700 comm,
1701 tag);
1702 status = MPI_SUCCESS;
1703 if (status != MPI_SUCCESS) {
1704 try {
1705 XdmfError::message(XdmfError::FATAL, "Error: Failed to receive data");
1706 }
1707 catch (XdmfError & e) {
1708 throw e;
1709 }
1710 }
1711 }
1712
1713 void
ReceiveInfo()1714 XdmfDSMBuffer::ReceiveInfo()
1715 {
1716 InfoMsg dsmInfo;
1717 int status;
1718
1719 memset(&dsmInfo, 0, sizeof(InfoMsg));
1720
1721 int infoStatus = 0;
1722
1723 if (this->Comm->GetId() == 0) {
1724 infoStatus = 1;
1725 }
1726
1727 int * groupInfoStatus = new int[this->Comm->GetInterSize()]();
1728
1729 this->Comm->AllGather(&infoStatus,
1730 sizeof(int),
1731 &(groupInfoStatus[0]),
1732 sizeof(int),
1733 XDMF_DSM_INTER_COMM);
1734
1735 int sendCore = 0;
1736
1737 for (int i = 0; i < this->Comm->GetInterSize(); ++i) {
1738 if (groupInfoStatus[i] == 2) {
1739 sendCore = i;
1740 }
1741 }
1742
1743 status = MPI_SUCCESS;
1744
1745 this->Comm->Broadcast(&dsmInfo,
1746 sizeof(InfoMsg),
1747 sendCore,
1748 XDMF_DSM_INTER_COMM);
1749 if (status != MPI_SUCCESS) {
1750 try {
1751 XdmfError::message(XdmfError::FATAL, "Error: Failed to broadcast info");
1752 }
1753 catch (XdmfError & e) {
1754 throw e;
1755 }
1756 }
1757 this->SetDsmType(dsmInfo.type);
1758 // We are a client so don't allocate anything but only set a virtual remote length
1759 this->SetLength(dsmInfo.length);
1760 this->TotalLength = dsmInfo.total_length;
1761 this->SetBlockLength(dsmInfo.block_length);
1762 this->StartServerId = dsmInfo.start_server_id;
1763 this->EndServerId = dsmInfo.end_server_id;
1764
1765 MPI_Comm comm = this->Comm->GetInterComm();
1766
1767 // Cray needs to be launched via the colon notation so that it
1768 // can properly create a merged communicator
1769
1770 int rank = this->Comm->GetInterId();
1771 int size = this->Comm->GetInterSize();
1772
1773 int currentCore = 0;
1774 int * checkstatus = new int[size]();
1775 int localCheck = 0;
1776 std::string applicationName = this->Comm->GetApplicationName();
1777
1778 char * coreTag;
1779 int tagSize = 0;
1780
1781 std::vector<int> coreSplit;
1782 unsigned int splitid = 0;
1783
1784 std::vector<std::pair<std::string, unsigned int> > newStructure;
1785
1786 int * splitIds;
1787 unsigned int splitsize = 0;
1788
1789 while (currentCore < size)
1790 {
1791 if (rank == currentCore)
1792 {
1793 tagSize = applicationName.size();
1794 }
1795 MPI_Bcast(&tagSize, 1, MPI_INT, currentCore, comm);
1796 coreTag = new char[tagSize+1]();
1797
1798 if (rank == currentCore)
1799 {
1800 strcpy(coreTag, applicationName.c_str());
1801 }
1802 MPI_Bcast(coreTag, tagSize, MPI_CHAR, currentCore, comm);
1803 coreTag[tagSize] = 0;
1804
1805 if (strcmp(coreTag, applicationName.c_str()) == 0)
1806 {
1807 localCheck = 1;
1808 }
1809 else
1810 {
1811 localCheck = 0;
1812 }
1813
1814 checkstatus[rank] = localCheck;
1815
1816 MPI_Allgather(&localCheck, 1, MPI_INT,
1817 checkstatus, 1, MPI_INT,
1818 comm);
1819
1820 bool insplit = false;
1821 while (checkstatus[currentCore])
1822 {
1823 if (rank == currentCore)
1824 {
1825 insplit = true;
1826 }
1827 coreSplit.push_back(currentCore);
1828 ++currentCore;
1829 if (currentCore >= size)
1830 {
1831 break;
1832 }
1833 }
1834 if (insplit)
1835 {
1836 splitIds = (int *)calloc(coreSplit.size(), sizeof(int));
1837 memcpy(splitIds, &(coreSplit[0]), coreSplit.size() * sizeof(int));
1838 splitsize = coreSplit.size();
1839 }
1840 newStructure.push_back(std::pair<std::string, unsigned int>(std::string(coreTag), coreSplit.size()));
1841 coreSplit.clear();
1842 ++splitid;
1843 }
1844 this->Comm->SetDsmProcessStructure(newStructure);
1845 }
1846
1847 int
RegisterFile(char * name,unsigned int * pages,unsigned int numPages,haddr_t start,haddr_t end)1848 XdmfDSMBuffer::RegisterFile(char * name, unsigned int * pages, unsigned int numPages, haddr_t start, haddr_t end)
1849 {
1850 this->SendCommandHeader(XDMF_DSM_REGISTER_FILE,
1851 this->GetStartServerId(),
1852 0,
1853 0,
1854 XDMF_DSM_INTER_COMM);
1855
1856 int strlength = std::string(name).size();
1857
1858 this->SendAcknowledgment(this->GetStartServerId(),
1859 strlength,
1860 XDMF_DSM_EXCHANGE_TAG,
1861 XDMF_DSM_INTER_COMM);
1862
1863 this->SendData(this->GetStartServerId(),
1864 name,
1865 strlength,
1866 XDMF_DSM_EXCHANGE_TAG,
1867 0,
1868 XDMF_DSM_INTER_COMM);
1869
1870 this->SendData(this->GetStartServerId(),
1871 (char *)&start,
1872 sizeof(haddr_t),
1873 XDMF_DSM_EXCHANGE_TAG,
1874 0,
1875 XDMF_DSM_INTER_COMM);
1876
1877 this->SendData(this->GetStartServerId(),
1878 (char *)&end,
1879 sizeof(haddr_t),
1880 XDMF_DSM_EXCHANGE_TAG,
1881 0,
1882 XDMF_DSM_INTER_COMM);
1883
1884 this->SendAcknowledgment(this->GetStartServerId(),
1885 numPages,
1886 XDMF_DSM_EXCHANGE_TAG,
1887 XDMF_DSM_INTER_COMM);
1888
1889 if (numPages > 0)
1890 {
1891 this->SendData(this->GetStartServerId(),
1892 (char *)pages,
1893 numPages * sizeof(unsigned int),
1894 XDMF_DSM_EXCHANGE_TAG,
1895 0,
1896 XDMF_DSM_INTER_COMM);
1897 }
1898
1899 return XDMF_DSM_SUCCESS;
1900 }
1901
1902 int
RequestFileDescription(char * name,std::vector<unsigned int> & pages,unsigned int & numPages,haddr_t & start,haddr_t & end)1903 XdmfDSMBuffer::RequestFileDescription(char * name, std::vector<unsigned int> & pages, unsigned int & numPages, haddr_t & start, haddr_t & end)
1904 {
1905 this->SendCommandHeader(XDMF_DSM_REQUEST_FILE,
1906 this->GetStartServerId(),
1907 0,
1908 0,
1909 XDMF_DSM_INTER_COMM);
1910
1911 int strlength = std::string(name).size();
1912
1913 this->SendAcknowledgment(this->GetStartServerId(),
1914 strlength,
1915 XDMF_DSM_EXCHANGE_TAG,
1916 XDMF_DSM_INTER_COMM);
1917
1918 this->SendData(this->GetStartServerId(),
1919 name,
1920 strlength,
1921 XDMF_DSM_EXCHANGE_TAG,
1922 0,
1923 XDMF_DSM_INTER_COMM);
1924
1925 int fileExists = XDMF_DSM_SUCCESS;
1926
1927 this->ReceiveAcknowledgment(this->GetStartServerId(),
1928 fileExists,
1929 XDMF_DSM_EXCHANGE_TAG,
1930 XDMF_DSM_INTER_COMM);
1931
1932 if (fileExists == XDMF_DSM_SUCCESS)
1933 {
1934 this->ReceiveData(this->GetStartServerId(),
1935 (char*)&start,
1936 sizeof(haddr_t),
1937 XDMF_DSM_EXCHANGE_TAG,
1938 0,
1939 XDMF_DSM_INTER_COMM);
1940
1941 this->ReceiveData(this->GetStartServerId(),
1942 (char*)&end,
1943 sizeof(haddr_t),
1944 XDMF_DSM_EXCHANGE_TAG,
1945 0,
1946 XDMF_DSM_INTER_COMM);
1947
1948 int recvNumPages = 0;
1949
1950 this->ReceiveAcknowledgment(this->GetStartServerId(),
1951 recvNumPages,
1952 XDMF_DSM_EXCHANGE_TAG,
1953 XDMF_DSM_INTER_COMM);
1954
1955 numPages = recvNumPages;
1956
1957 // Reallocate pointer
1958 pages.clear();
1959
1960 unsigned int * pagelist = new unsigned int[numPages];
1961
1962 this->ReceiveData(this->GetStartServerId(),
1963 (char *)pagelist,
1964 numPages * sizeof(unsigned int),
1965 XDMF_DSM_EXCHANGE_TAG,
1966 0,
1967 XDMF_DSM_INTER_COMM);
1968
1969 for (unsigned int i = 0; i < numPages; ++i)
1970 {
1971 pages.push_back(pagelist[i]);
1972 }
1973
1974 return XDMF_DSM_SUCCESS;
1975 }
1976 else
1977 {
1978 return XDMF_DSM_FAIL;
1979 }
1980 }
1981
1982
1983 void
RequestPages(char * name,haddr_t spaceRequired,std::vector<unsigned int> & pages,unsigned int & numPages,haddr_t & start,haddr_t & end)1984 XdmfDSMBuffer::RequestPages(char * name,
1985 haddr_t spaceRequired,
1986 std::vector<unsigned int> & pages,
1987 unsigned int & numPages,
1988 haddr_t & start,
1989 haddr_t & end)
1990 {
1991 this->SendCommandHeader(XDMF_DSM_REQUEST_PAGES,
1992 this->GetStartServerId(),
1993 0,
1994 0,
1995 XDMF_DSM_INTER_COMM);
1996
1997 // set aside pages to a file
1998 int strlength = std::string(name).size();
1999
2000 this->SendAcknowledgment(this->GetStartServerId(),
2001 strlength,
2002 XDMF_DSM_EXCHANGE_TAG,
2003 XDMF_DSM_INTER_COMM);
2004
2005 this->SendData(this->GetStartServerId(),
2006 name,
2007 strlength,
2008 XDMF_DSM_EXCHANGE_TAG,
2009 0,
2010 XDMF_DSM_INTER_COMM);
2011
2012 // Request size required for the file
2013 this->SendAcknowledgment(this->GetStartServerId(),
2014 spaceRequired,
2015 XDMF_DSM_EXCHANGE_TAG,
2016 XDMF_DSM_INTER_COMM);
2017
2018 // Send back new page allocation pointer
2019
2020 int newPageCount = 0;
2021
2022 this->ReceiveAcknowledgment(this->GetStartServerId(),
2023 newPageCount,
2024 XDMF_DSM_EXCHANGE_TAG,
2025 XDMF_DSM_INTER_COMM);
2026
2027 numPages = newPageCount;
2028
2029 unsigned int * pagelist = new unsigned int[numPages]();
2030 pages.clear();
2031
2032 this->ReceiveData(this->GetStartServerId(),
2033 (char *) pagelist,
2034 numPages * sizeof(unsigned int),
2035 XDMF_DSM_EXCHANGE_TAG,
2036 0,
2037 XDMF_DSM_INTER_COMM);
2038
2039 for (unsigned int i = 0; i < numPages; ++i)
2040 {
2041 pages.push_back(pagelist[i]);
2042 }
2043
2044 // Recieve the new start and end addresses
2045 this->ReceiveData(this->GetStartServerId(),
2046 (char*)&start,
2047 sizeof(haddr_t),
2048 XDMF_DSM_EXCHANGE_TAG,
2049 0,
2050 XDMF_DSM_INTER_COMM);
2051
2052 this->ReceiveData(this->GetStartServerId(),
2053 (char*)&end,
2054 sizeof(haddr_t),
2055 XDMF_DSM_EXCHANGE_TAG,
2056 0,
2057 XDMF_DSM_INTER_COMM);
2058
2059 // If resized, set up the reset the total length.
2060 int currentLength = 0;
2061 this->ReceiveAcknowledgment(this->GetStartServerId(),
2062 currentLength,
2063 XDMF_DSM_EXCHANGE_TAG,
2064 XDMF_DSM_INTER_COMM);
2065
2066 this->UpdateLength(currentLength);
2067 }
2068
2069 void
SendAccept(unsigned int numConnections)2070 XdmfDSMBuffer::SendAccept(unsigned int numConnections)
2071 {
2072 #ifndef XDMF_DSM_IS_CRAY
2073 for (int i = this->StartServerId; i <= this->EndServerId; ++i) {
2074 if (i != this->Comm->GetInterId()){
2075 this->SendCommandHeader(XDMF_DSM_ACCEPT, i, 0, 0, XDMF_DSM_INTER_COMM);
2076 this->SendAcknowledgment(i, numConnections, XDMF_DSM_EXCHANGE_TAG, XDMF_DSM_INTER_COMM);
2077 }
2078 }
2079 this->Comm->Accept(numConnections);
2080 this->SendInfo();
2081 #endif
2082 }
2083
2084 void
SendAcknowledgment(int dest,int data,int tag,int comm)2085 XdmfDSMBuffer::SendAcknowledgment(int dest, int data, int tag, int comm)
2086 {
2087 int status;
2088 this->Comm->Send(&data,
2089 sizeof(int),
2090 dest,
2091 comm,
2092 tag);
2093 status = MPI_SUCCESS;
2094 if (status != MPI_SUCCESS) {
2095 try {
2096 XdmfError::message(XdmfError::FATAL, "Error: Failed to receive data");
2097 }
2098 catch (XdmfError & e) {
2099 throw e;
2100 }
2101 }
2102 }
2103
2104 void
SendCommandHeader(int opcode,int dest,int address,int aLength,int comm)2105 XdmfDSMBuffer::SendCommandHeader(int opcode, int dest, int address, int aLength, int comm)
2106 {
2107 int status;
2108 CommandMsg cmd;
2109 memset(&cmd, 0, sizeof(CommandMsg));
2110 cmd.Opcode = opcode;
2111 if (comm == XDMF_DSM_INTRA_COMM) {
2112 cmd.Source = this->Comm->GetId();
2113 }
2114 else if (comm == XDMF_DSM_INTER_COMM) {
2115 cmd.Source = this->Comm->GetInterId();
2116 }
2117 cmd.Target = dest;
2118 cmd.Address = address;
2119 cmd.Length = aLength;
2120
2121 this->Comm->Send(&cmd,
2122 sizeof(CommandMsg),
2123 dest,
2124 comm,
2125 XDMF_DSM_COMMAND_TAG);
2126 status = MPI_SUCCESS;
2127 if (status != MPI_SUCCESS) {
2128 try {
2129 XdmfError::message(XdmfError::FATAL, "Error: Failed to send command header");
2130 }
2131 catch (XdmfError & e) {
2132 throw e;
2133 }
2134 }
2135 }
2136
2137 void
SendData(int dest,char * data,int aLength,int tag,int aAddress,int comm)2138 XdmfDSMBuffer::SendData(int dest, char * data, int aLength, int tag, int aAddress, int comm)
2139 {
2140 int status;
2141
2142 this->Comm->Send(data,
2143 aLength,
2144 dest,
2145 comm,
2146 tag);
2147 status = MPI_SUCCESS;
2148 if (status != MPI_SUCCESS) {
2149 try {
2150 XdmfError::message(XdmfError::FATAL, "Error: Failed to send data");
2151 }
2152 catch (XdmfError & e) {
2153 throw e;
2154 }
2155 }
2156 }
2157
2158 void
SendDone()2159 XdmfDSMBuffer::SendDone()
2160 {
2161 try {
2162 if (static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm() == MPI_COMM_NULL)
2163 {
2164 for (int i = this->StartServerId; i <= this->EndServerId; ++i) {
2165 if (i != this->Comm->GetId()){
2166 this->SendCommandHeader(XDMF_DSM_OPCODE_DONE, i, 0, 0, XDMF_DSM_INTRA_COMM);
2167 }
2168 }
2169 }
2170 else
2171 {
2172 for (int i = this->StartServerId; i <= this->EndServerId; ++i) {
2173 if (i != this->Comm->GetId()){
2174 this->SendCommandHeader(XDMF_DSM_OPCODE_DONE, i, 0, 0, XDMF_DSM_INTER_COMM);
2175 }
2176 }
2177 }
2178 }
2179 catch (XdmfError & e) {
2180 throw e;
2181 }
2182 }
2183
2184 void
SendInfo()2185 XdmfDSMBuffer::SendInfo()
2186 {
2187 InfoMsg dsmInfo;
2188 int status;
2189
2190 memset(&dsmInfo, 0, sizeof(InfoMsg));
2191 dsmInfo.type = this->GetDsmType();
2192 dsmInfo.length = this->GetLength();
2193 dsmInfo.total_length = this->GetTotalLength();
2194 dsmInfo.block_length = this->GetBlockLength();
2195 dsmInfo.start_server_id = this->GetStartServerId();
2196 dsmInfo.end_server_id = this->GetEndServerId();
2197
2198 int infoStatus = 3;
2199 if (this->Comm->GetId() == 0) {
2200 infoStatus = 2;
2201 }
2202
2203 int * groupInfoStatus = new int[this->Comm->GetInterSize()]();
2204
2205 this->Comm->AllGather(&infoStatus,
2206 sizeof(int),
2207 &(groupInfoStatus[0]),
2208 sizeof(int),
2209 XDMF_DSM_INTER_COMM);
2210
2211 int sendCore = 0;
2212
2213 for (int i = 0; i < this->Comm->GetInterSize(); ++i) {
2214 if (groupInfoStatus[i] == 2) {
2215 sendCore = i;
2216 }
2217 }
2218
2219 status = MPI_SUCCESS;
2220
2221 this->Comm->Broadcast(&dsmInfo,
2222 sizeof(InfoMsg),
2223 sendCore,
2224 XDMF_DSM_INTER_COMM);
2225 if (status != MPI_SUCCESS) {
2226 try {
2227 XdmfError::message(XdmfError::FATAL, "Error: Failed to send info");
2228 }
2229 catch (XdmfError & e) {
2230 throw e;
2231 }
2232 }
2233
2234 MPI_Comm comm = this->Comm->GetInterComm();
2235
2236 // Cray needs to be launched via the colon notation so that it
2237 // can properly create a merged communicator
2238
2239 int rank = this->Comm->GetInterId();
2240 int size = this->Comm->GetInterSize();
2241
2242 int currentCore = 0;
2243 int * checkstatus = new int[size]();
2244 int localCheck = 0;
2245 std::string applicationName = this->Comm->GetApplicationName();
2246
2247 char * coreTag;
2248 int tagSize = 0;
2249
2250 std::vector<int> coreSplit;
2251 unsigned int splitid = 0;
2252
2253 std::vector<std::pair<std::string, unsigned int> > newStructure;
2254
2255 int * splitIds;
2256 unsigned int splitsize = 0;
2257
2258 while (currentCore < size)
2259 {
2260 if (rank == currentCore)
2261 {
2262 tagSize = applicationName.size();
2263 }
2264 MPI_Bcast(&tagSize, 1, MPI_INT, currentCore, comm);
2265 coreTag = new char[tagSize+1]();
2266
2267 if (rank == currentCore)
2268 {
2269 strcpy(coreTag, applicationName.c_str());
2270 }
2271 MPI_Bcast(coreTag, tagSize, MPI_CHAR, currentCore, comm);
2272 coreTag[tagSize] = 0;
2273
2274 if (strcmp(coreTag, applicationName.c_str()) == 0)
2275 {
2276 localCheck = 1;
2277 }
2278 else
2279 {
2280 localCheck = 0;
2281 }
2282
2283 checkstatus[rank] = localCheck;
2284
2285 MPI_Allgather(&localCheck, 1, MPI_INT,
2286 checkstatus, 1, MPI_INT,
2287 comm);
2288
2289 bool insplit = false;
2290 while (checkstatus[currentCore])
2291 {
2292 if (rank == currentCore)
2293 {
2294 insplit = true;
2295 }
2296 coreSplit.push_back(currentCore);
2297 ++currentCore;
2298 if (currentCore >= size)
2299 {
2300 break;
2301 }
2302 }
2303 if (insplit)
2304 {
2305 splitIds = (int *)calloc(coreSplit.size(), sizeof(int));
2306 memcpy(splitIds, &(coreSplit[0]), coreSplit.size() * sizeof(int));
2307 splitsize = coreSplit.size();
2308 }
2309 newStructure.push_back(std::pair<std::string, unsigned int>(std::string(coreTag), coreSplit.size()));
2310 coreSplit.clear();
2311 ++splitid;
2312 }
2313 this->Comm->SetDsmProcessStructure(newStructure);
2314 }
2315
2316 void
SetBlockLength(long newBlock)2317 XdmfDSMBuffer::SetBlockLength(long newBlock)
2318 {
2319 this->BlockLength = newBlock;
2320 }
2321
2322 void
SetComm(XdmfDSMCommMPI * newComm)2323 XdmfDSMBuffer::SetComm(XdmfDSMCommMPI * newComm)
2324 {
2325 this->Comm = newComm;
2326 }
2327
2328 void
SetDsmType(int newDsmType)2329 XdmfDSMBuffer::SetDsmType(int newDsmType)
2330 {
2331 this->DsmType = newDsmType;
2332 }
2333
2334 void
SetInterCommType(int newType)2335 XdmfDSMBuffer::SetInterCommType(int newType)
2336 {
2337 this->InterCommType = newType;
2338 }
2339
2340 void
SetIsConnected(bool newStatus)2341 XdmfDSMBuffer::SetIsConnected(bool newStatus)
2342 {
2343 IsConnected = newStatus;
2344 }
2345
2346 void
SetIsServer(bool newIsServer)2347 XdmfDSMBuffer::SetIsServer(bool newIsServer)
2348 {
2349 this->IsServer = newIsServer;
2350 }
2351
2352 void
SetResizeFactor(double newFactor)2353 XdmfDSMBuffer::SetResizeFactor(double newFactor)
2354 {
2355 if (newFactor >= 0)
2356 {
2357 this->ResizeFactor = newFactor;
2358 }
2359 else
2360 {
2361 this->ResizeFactor = newFactor * -1;
2362 }
2363 }
2364
2365 void
SetLength(long aLength)2366 XdmfDSMBuffer::SetLength(long aLength)
2367 {
2368 this->Length = aLength;
2369 if (this->DataPointer) {
2370 // Try to reallocate
2371 // This should not be called in most cases
2372 this->DataPointer =
2373 static_cast<char *>(realloc(this->DataPointer, this->Length*sizeof(char)));
2374 }
2375 else {
2376 #ifdef _WIN32
2377 this->DataPointer = calloc(this->Length, sizeof(char));
2378 #else
2379 #ifdef _SC_PAGESIZE
2380 posix_memalign((void **)(&this->DataPointer), sysconf(_SC_PAGESIZE), this->Length);
2381 memset(this->DataPointer, 0, this->Length);
2382 #else
2383 // Older linux variation, for backwards compatibility
2384 posix_memalign((void **)(&this->DataPointer), getpagesize(), this->Length);
2385 memset(this->DataPointer, 0, this->Length);
2386 #endif
2387 #endif
2388 }
2389
2390 if (this->DataPointer == NULL) {
2391 std::stringstream message;
2392 message << "Allocation Failed, unable to allocate " << this->Length;
2393 XdmfError::message(XdmfError::FATAL, message.str());
2394 }
2395
2396 if (this->BlockLength > 0)
2397 {
2398 this->NumPages = this->Length / this->BlockLength;
2399 }
2400 }
2401
2402 void
SetLocalBufferSizeMBytes(unsigned int newSize)2403 XdmfDSMBuffer::SetLocalBufferSizeMBytes(unsigned int newSize)
2404 {
2405 this->LocalBufferSizeMBytes = newSize;
2406 }
2407
2408 void
Unlock(char * filename)2409 XdmfDSMBuffer::Unlock(char * filename)
2410 {
2411 int strlength = std::string(filename).size();
2412 this->SendCommandHeader(XDMF_DSM_UNLOCK_FILE,
2413 this->GetStartServerId(),
2414 0,
2415 0,
2416 XDMF_DSM_INTER_COMM);
2417
2418 this->SendAcknowledgment(this->GetStartServerId(),
2419 strlength,
2420 XDMF_DSM_EXCHANGE_TAG,
2421 XDMF_DSM_INTER_COMM);
2422
2423 this->SendData(this->GetStartServerId(),
2424 filename,
2425 strlength,
2426 XDMF_DSM_EXCHANGE_TAG,
2427 0,
2428 XDMF_DSM_INTER_COMM);
2429 }
2430
2431 void
UpdateLength(unsigned int newLength)2432 XdmfDSMBuffer::UpdateLength(unsigned int newLength)
2433 {
2434 this->Length = newLength;
2435 this->TotalLength = this->Length * (this->EndServerId - this->StartServerId);
2436 }
2437
2438 void
WaitRelease(std::string filename,std::string datasetname,int code)2439 XdmfDSMBuffer::WaitRelease(std::string filename, std::string datasetname, int code)
2440 {
2441 // Send Command Header
2442 this->SendCommandHeader(XDMF_DSM_CLEAR_NOTIFY,
2443 this->GetStartServerId(),
2444 0,
2445 0,
2446 XDMF_DSM_INTER_COMM);
2447 // Send string size
2448 this->SendAcknowledgment(this->GetStartServerId(),
2449 filename.size() + datasetname.size(),
2450 XDMF_DSM_EXCHANGE_TAG,
2451 XDMF_DSM_INTER_COMM);
2452 // Send string
2453 char * sendPointer = new char[filename.size() + datasetname.size()]();
2454 unsigned int placementIndex = 0;
2455 for (unsigned int i = 0; i < filename.size(); ++i)
2456 {
2457 sendPointer[placementIndex++] = filename[i];
2458 }
2459 for (unsigned int i = 0; i < datasetname.size(); ++i)
2460 {
2461 sendPointer[placementIndex++] = datasetname[i];
2462 }
2463 this->SendData(this->GetStartServerId(),
2464 sendPointer,
2465 filename.size() + datasetname.size(),
2466 XDMF_DSM_EXCHANGE_TAG,
2467 0,
2468 XDMF_DSM_INTER_COMM);
2469 delete sendPointer;
2470 // Send Release Code
2471 this->SendAcknowledgment(this->GetStartServerId(),
2472 code,
2473 XDMF_DSM_EXCHANGE_TAG,
2474 XDMF_DSM_INTER_COMM);
2475 }
2476
2477 int
WaitOn(std::string filename,std::string datasetname)2478 XdmfDSMBuffer::WaitOn(std::string filename, std::string datasetname)
2479 {
2480 // Send Command Header
2481 this->SendCommandHeader(XDMF_DSM_SET_NOTIFY,
2482 this->GetStartServerId(),
2483 0,
2484 0,
2485 XDMF_DSM_INTER_COMM);
2486 // Send string size
2487 this->SendAcknowledgment(this->GetStartServerId(),
2488 filename.size() + datasetname.size(),
2489 XDMF_DSM_EXCHANGE_TAG,
2490 XDMF_DSM_INTER_COMM);
2491 // Send string
2492 char * sendPointer = new char[filename.size() + datasetname.size()]();
2493 unsigned int placementIndex = 0;
2494 for (unsigned int i = 0; i < filename.size(); ++i)
2495 {
2496 sendPointer[placementIndex++] = filename[i];
2497 }
2498 for (unsigned int i = 0; i < datasetname.size(); ++i)
2499 {
2500 sendPointer[placementIndex++] = datasetname[i];
2501 }
2502 this->SendData(this->GetStartServerId(),
2503 sendPointer,
2504 filename.size() + datasetname.size(),
2505 XDMF_DSM_EXCHANGE_TAG,
2506 0,
2507 XDMF_DSM_INTER_COMM);
2508
2509 // Wait for Release
2510 int code = 0;
2511 this->ReceiveAcknowledgment(MPI_ANY_SOURCE,
2512 code,
2513 XDMF_DSM_EXCHANGE_TAG,
2514 this->CommChannel);
2515 delete sendPointer;
2516 // Return Code from Notification
2517 return code;
2518 }
2519
2520 // C Wrappers
2521
XdmfDSMBufferNew()2522 XDMFDSMBUFFER * XdmfDSMBufferNew()
2523 {
2524 try
2525 {
2526 return (XDMFDSMBUFFER *)((void *)(new XdmfDSMBuffer()));
2527 }
2528 catch (...)
2529 {
2530 return (XDMFDSMBUFFER *)((void *)(new XdmfDSMBuffer()));
2531 }
2532 }
2533
XdmfDSMBufferFree(XDMFDSMBUFFER * item)2534 void XdmfDSMBufferFree(XDMFDSMBUFFER * item)
2535 {
2536 if (item != NULL) {
2537 delete ((XdmfDSMBuffer *)item);
2538 }
2539 item = NULL;
2540 }
2541
XdmfDSMBufferBroadcastComm(XDMFDSMBUFFER * buffer,int * comm,int root,int * status)2542 void XdmfDSMBufferBroadcastComm(XDMFDSMBUFFER * buffer, int *comm, int root, int * status)
2543 {
2544 XDMF_ERROR_WRAP_START(status)
2545 ((XdmfDSMBuffer *)buffer)->BroadcastComm(comm, root);
2546 XDMF_ERROR_WRAP_END(status)
2547 }
2548
XdmfDSMBufferBufferService(XDMFDSMBUFFER * buffer,int * returnOpcode,int * status)2549 int XdmfDSMBufferBufferService(XDMFDSMBUFFER * buffer, int *returnOpcode, int * status)
2550 {
2551 XDMF_ERROR_WRAP_START(status)
2552 return ((XdmfDSMBuffer *)buffer)->BufferService(returnOpcode);
2553 XDMF_ERROR_WRAP_END(status)
2554 return -1;
2555 }
2556
XdmfDSMBufferBufferServiceLoop(XDMFDSMBUFFER * buffer,int * returnOpcode,int * status)2557 void XdmfDSMBufferBufferServiceLoop(XDMFDSMBUFFER * buffer, int *returnOpcode, int * status)
2558 {
2559 XDMF_ERROR_WRAP_START(status)
2560 ((XdmfDSMBuffer *)buffer)->BufferServiceLoop(returnOpcode);
2561 XDMF_ERROR_WRAP_END(status)
2562 }
2563
XdmfDSMBufferConfigureUniform(XDMFDSMBUFFER * buffer,XDMFDSMCOMMMPI * Comm,long Length,int StartId,int EndId,long aBlockLength,int random,int * status)2564 void XdmfDSMBufferConfigureUniform(XDMFDSMBUFFER * buffer,
2565 XDMFDSMCOMMMPI * Comm,
2566 long Length,
2567 int StartId,
2568 int EndId,
2569 long aBlockLength,
2570 int random,
2571 int * status)
2572 {
2573 XDMF_ERROR_WRAP_START(status)
2574 ((XdmfDSMBuffer *)buffer)->ConfigureUniform((XdmfDSMCommMPI *)Comm, Length, StartId, EndId, aBlockLength, random);
2575 XDMF_ERROR_WRAP_END(status)
2576 }
2577
XdmfDSMBufferConnect(XDMFDSMBUFFER * buffer,int persist,int * status)2578 void XdmfDSMBufferConnect(XDMFDSMBUFFER * buffer, int persist, int * status)
2579 {
2580 XDMF_ERROR_WRAP_START(status)
2581 ((XdmfDSMBuffer *)buffer)->Connect(persist);
2582 XDMF_ERROR_WRAP_END(status)
2583 }
2584
XdmfDSMBufferCreate(XDMFDSMBUFFER * buffer,MPI_Comm comm,int startId,int endId,int * status)2585 void XdmfDSMBufferCreate(XDMFDSMBUFFER * buffer, MPI_Comm comm, int startId, int endId, int * status)
2586 {
2587 XDMF_ERROR_WRAP_START(status)
2588 ((XdmfDSMBuffer *)buffer)->Create(comm, startId, endId);
2589 XDMF_ERROR_WRAP_END(status)
2590 }
2591
XdmfDSMBufferDisconnect(XDMFDSMBUFFER * buffer,int * status)2592 void XdmfDSMBufferDisconnect(XDMFDSMBUFFER * buffer, int * status)
2593 {
2594 XDMF_ERROR_WRAP_START(status)
2595 ((XdmfDSMBuffer *)buffer)->Disconnect();
2596 XDMF_ERROR_WRAP_END(status)
2597 }
2598
XdmfDSMBufferGet(XDMFDSMBUFFER * buffer,long Address,long aLength,void * Data,int * status)2599 void XdmfDSMBufferGet(XDMFDSMBUFFER * buffer, long Address, long aLength, void * Data, int * status)
2600 {
2601 XDMF_ERROR_WRAP_START(status)
2602 ((XdmfDSMBuffer *)buffer)->Get(Address, aLength, Data);
2603 XDMF_ERROR_WRAP_END(status)
2604 }
2605
XdmfDSMBufferGetAddressRangeForId(XDMFDSMBUFFER * buffer,int Id,int * Start,int * End,int * status)2606 void XdmfDSMBufferGetAddressRangeForId(XDMFDSMBUFFER * buffer, int Id, int * Start, int * End, int * status)
2607 {
2608 XDMF_ERROR_WRAP_START(status)
2609 ((XdmfDSMBuffer *)buffer)->GetAddressRangeForId(Id, Start, End);
2610 XDMF_ERROR_WRAP_END(status)
2611 }
2612
XdmfDSMBufferGetBlockLength(XDMFDSMBUFFER * buffer)2613 long XdmfDSMBufferGetBlockLength(XDMFDSMBUFFER * buffer)
2614 {
2615 try
2616 {
2617 return ((XdmfDSMBuffer *)buffer)->GetBlockLength();
2618 }
2619 catch (...)
2620 {
2621 return ((XdmfDSMBuffer *)buffer)->GetBlockLength();
2622 }
2623 }
2624
XdmfDSMBufferGetComm(XDMFDSMBUFFER * buffer)2625 XDMFDSMCOMMMPI * XdmfDSMBufferGetComm(XDMFDSMBUFFER * buffer)
2626 {
2627 try
2628 {
2629 return (XDMFDSMCOMMMPI *)((void *)(((XdmfDSMBuffer *)buffer)->GetComm()));
2630 }
2631 catch (...)
2632 {
2633 return (XDMFDSMCOMMMPI *)((void *)(((XdmfDSMBuffer *)buffer)->GetComm()));
2634 }
2635 }
2636
XdmfDSMBufferGetDataPointer(XDMFDSMBUFFER * buffer)2637 char * XdmfDSMBufferGetDataPointer(XDMFDSMBUFFER * buffer)
2638 {
2639 try
2640 {
2641 return ((XdmfDSMBuffer *)buffer)->GetDataPointer();
2642 }
2643 catch (...)
2644 {
2645 return ((XdmfDSMBuffer *)buffer)->GetDataPointer();
2646 }
2647 }
2648
XdmfDSMBufferGetDsmType(XDMFDSMBUFFER * buffer)2649 int XdmfDSMBufferGetDsmType(XDMFDSMBUFFER * buffer)
2650 {
2651 try
2652 {
2653 return ((XdmfDSMBuffer *)buffer)->GetDsmType();
2654 }
2655 catch (...)
2656 {
2657 return ((XdmfDSMBuffer *)buffer)->GetDsmType();
2658 }
2659 }
2660
XdmfDSMBufferGetEndAddress(XDMFDSMBUFFER * buffer)2661 int XdmfDSMBufferGetEndAddress(XDMFDSMBUFFER * buffer)
2662 {
2663 try
2664 {
2665 return ((XdmfDSMBuffer *)buffer)->GetEndAddress();
2666 }
2667 catch (...)
2668 {
2669 return ((XdmfDSMBuffer *)buffer)->GetEndAddress();
2670 }
2671 }
2672
XdmfDSMBufferGetEndServerId(XDMFDSMBUFFER * buffer)2673 int XdmfDSMBufferGetEndServerId(XDMFDSMBUFFER * buffer)
2674 {
2675 try
2676 {
2677 return ((XdmfDSMBuffer *)buffer)->GetEndServerId();
2678 }
2679 catch (...)
2680 {
2681 return ((XdmfDSMBuffer *)buffer)->GetEndServerId();
2682 }
2683 }
2684
XdmfDSMBufferGetInterCommType(XDMFDSMBUFFER * buffer)2685 int XdmfDSMBufferGetInterCommType(XDMFDSMBUFFER * buffer)
2686 {
2687 try
2688 {
2689 return ((XdmfDSMBuffer *)buffer)->GetInterCommType();
2690 }
2691 catch (...)
2692 {
2693 return ((XdmfDSMBuffer *)buffer)->GetInterCommType();
2694 }
2695 }
2696
XdmfDSMBufferGetIsConnected(XDMFDSMBUFFER * buffer)2697 int XdmfDSMBufferGetIsConnected(XDMFDSMBUFFER * buffer)
2698 {
2699 try
2700 {
2701 return ((XdmfDSMBuffer *)buffer)->GetIsConnected();
2702 }
2703 catch (...)
2704 {
2705 return ((XdmfDSMBuffer *)buffer)->GetIsConnected();
2706 }
2707 }
2708
XdmfDSMBufferGetIsServer(XDMFDSMBUFFER * buffer)2709 int XdmfDSMBufferGetIsServer(XDMFDSMBUFFER * buffer)
2710 {
2711 try
2712 {
2713 return ((XdmfDSMBuffer *)buffer)->GetIsServer();
2714 }
2715 catch (...)
2716 {
2717 return ((XdmfDSMBuffer *)buffer)->GetIsServer();
2718 }
2719 }
2720
XdmfDSMBufferGetLength(XDMFDSMBUFFER * buffer)2721 long XdmfDSMBufferGetLength(XDMFDSMBUFFER * buffer)
2722 {
2723 try
2724 {
2725 return ((XdmfDSMBuffer *)buffer)->GetLength();
2726 }
2727 catch (...)
2728 {
2729 return ((XdmfDSMBuffer *)buffer)->GetLength();
2730 }
2731 }
2732
XdmfDSMBufferGetLocalBufferSizeMBytes(XDMFDSMBUFFER * buffer)2733 unsigned int XdmfDSMBufferGetLocalBufferSizeMBytes(XDMFDSMBUFFER * buffer)
2734 {
2735 try
2736 {
2737 return ((XdmfDSMBuffer *)buffer)->GetLocalBufferSizeMBytes();
2738 }
2739 catch (...)
2740 {
2741 return ((XdmfDSMBuffer *)buffer)->GetLocalBufferSizeMBytes();
2742 }
2743 }
2744
XdmfDSMBufferGetResizeFactor(XDMFDSMBUFFER * buffer)2745 double XdmfDSMBufferGetResizeFactor(XDMFDSMBUFFER * buffer)
2746 {
2747 try
2748 {
2749 return ((XdmfDSMBuffer *)buffer)->GetResizeFactor();
2750 }
2751 catch (...)
2752 {
2753 return ((XdmfDSMBuffer *)buffer)->GetResizeFactor();
2754 }
2755 }
2756
XdmfDSMBufferGetStartAddress(XDMFDSMBUFFER * buffer)2757 int XdmfDSMBufferGetStartAddress(XDMFDSMBUFFER * buffer)
2758 {
2759 try
2760 {
2761 return ((XdmfDSMBuffer *)buffer)->GetStartAddress();
2762 }
2763 catch (...)
2764 {
2765 return ((XdmfDSMBuffer *)buffer)->GetStartAddress();
2766 }
2767 }
2768
XdmfDSMBufferGetStartServerId(XDMFDSMBUFFER * buffer)2769 int XdmfDSMBufferGetStartServerId(XDMFDSMBUFFER * buffer)
2770 {
2771 try
2772 {
2773 return ((XdmfDSMBuffer *)buffer)->GetStartServerId();
2774 }
2775 catch (...)
2776 {
2777 return ((XdmfDSMBuffer *)buffer)->GetStartServerId();
2778 }
2779 }
2780
XdmfDSMBufferGetTotalLength(XDMFDSMBUFFER * buffer)2781 long XdmfDSMBufferGetTotalLength(XDMFDSMBUFFER * buffer)
2782 {
2783 try
2784 {
2785 return ((XdmfDSMBuffer *)buffer)->GetTotalLength();
2786 }
2787 catch (...)
2788 {
2789 return ((XdmfDSMBuffer *)buffer)->GetTotalLength();
2790 }
2791 }
2792
XdmfDSMBufferProbeCommandHeader(XDMFDSMBUFFER * buffer,int * comm,int * status)2793 void XdmfDSMBufferProbeCommandHeader(XDMFDSMBUFFER * buffer, int * comm, int * status)
2794 {
2795 XDMF_ERROR_WRAP_START(status)
2796 ((XdmfDSMBuffer *)buffer)->ProbeCommandHeader(comm);
2797 XDMF_ERROR_WRAP_END(status)
2798 }
2799
XdmfDSMBufferPut(XDMFDSMBUFFER * buffer,long Address,long aLength,void * Data,int * status)2800 void XdmfDSMBufferPut(XDMFDSMBUFFER * buffer, long Address, long aLength, void * Data, int * status)
2801 {
2802 XDMF_ERROR_WRAP_START(status)
2803 ((XdmfDSMBuffer *)buffer)->Put(Address, aLength, Data);
2804 XDMF_ERROR_WRAP_END(status)
2805 }
2806
XdmfDSMBufferReceiveAcknowledgment(XDMFDSMBUFFER * buffer,int source,int * data,int tag,int comm,int * status)2807 void XdmfDSMBufferReceiveAcknowledgment(XDMFDSMBUFFER * buffer,
2808 int source,
2809 int * data,
2810 int tag,
2811 int comm,
2812 int * status)
2813 {
2814 XDMF_ERROR_WRAP_START(status)
2815 ((XdmfDSMBuffer *)buffer)->ReceiveAcknowledgment(source, *data, tag, comm);
2816 XDMF_ERROR_WRAP_END(status)
2817 }
2818
XdmfDSMBufferReceiveCommandHeader(XDMFDSMBUFFER * buffer,int * opcode,int * source,int * address,int * aLength,int comm,int remoteSource,int * status)2819 void XdmfDSMBufferReceiveCommandHeader(XDMFDSMBUFFER * buffer,
2820 int * opcode,
2821 int * source,
2822 int * address,
2823 int * aLength,
2824 int comm,
2825 int remoteSource,
2826 int * status)
2827 {
2828 XDMF_ERROR_WRAP_START(status)
2829 ((XdmfDSMBuffer *)buffer)->ReceiveCommandHeader(opcode, source, address, aLength, comm, remoteSource);
2830 XDMF_ERROR_WRAP_END(status)
2831 }
2832
XdmfDSMBufferReceiveData(XDMFDSMBUFFER * buffer,int source,char * data,int aLength,int tag,int aAddress,int comm,int * status)2833 void XdmfDSMBufferReceiveData(XDMFDSMBUFFER * buffer,
2834 int source,
2835 char * data,
2836 int aLength,
2837 int tag,
2838 int aAddress,
2839 int comm,
2840 int * status)
2841 {
2842 XDMF_ERROR_WRAP_START(status)
2843 ((XdmfDSMBuffer *)buffer)->ReceiveData(source, data, aLength, tag, aAddress, comm);
2844 XDMF_ERROR_WRAP_END(status)
2845 }
2846
XdmfDSMBufferReceiveInfo(XDMFDSMBUFFER * buffer,int * status)2847 void XdmfDSMBufferReceiveInfo(XDMFDSMBUFFER * buffer, int * status)
2848 {
2849 XDMF_ERROR_WRAP_START(status)
2850 ((XdmfDSMBuffer *)buffer)->ReceiveInfo();
2851 XDMF_ERROR_WRAP_END(status)
2852 }
2853
XdmfDSMBufferSendAccept(XDMFDSMBUFFER * buffer,unsigned int numConnects)2854 void XdmfDSMBufferSendAccept(XDMFDSMBUFFER * buffer, unsigned int numConnects)
2855 {
2856 ((XdmfDSMBuffer *)buffer)->SendAccept(numConnects);
2857 }
2858
XdmfDSMBufferSendAcknowledgment(XDMFDSMBUFFER * buffer,int dest,int data,int tag,int comm,int * status)2859 void XdmfDSMBufferSendAcknowledgment(XDMFDSMBUFFER * buffer,
2860 int dest,
2861 int data,
2862 int tag,
2863 int comm,
2864 int * status)
2865 {
2866 XDMF_ERROR_WRAP_START(status)
2867 ((XdmfDSMBuffer *)buffer)->SendAcknowledgment(dest, data, tag, comm);
2868 XDMF_ERROR_WRAP_END(status)
2869 }
2870
XdmfDSMBufferSendCommandHeader(XDMFDSMBUFFER * buffer,int opcode,int dest,int address,int aLength,int comm,int * status)2871 void XdmfDSMBufferSendCommandHeader(XDMFDSMBUFFER * buffer,
2872 int opcode,
2873 int dest,
2874 int address,
2875 int aLength,
2876 int comm,
2877 int * status)
2878 {
2879 XDMF_ERROR_WRAP_START(status)
2880 ((XdmfDSMBuffer *)buffer)->SendCommandHeader(opcode, dest, address, aLength, comm);
2881 XDMF_ERROR_WRAP_END(status)
2882 }
2883
XdmfDSMBufferSendData(XDMFDSMBUFFER * buffer,int dest,char * data,int aLength,int tag,int aAddress,int comm,int * status)2884 void XdmfDSMBufferSendData(XDMFDSMBUFFER * buffer,
2885 int dest,
2886 char * data,
2887 int aLength,
2888 int tag,
2889 int aAddress,
2890 int comm,
2891 int * status)
2892 {
2893 XDMF_ERROR_WRAP_START(status)
2894 ((XdmfDSMBuffer *)buffer)->SendData(dest, data, aLength, tag, aAddress, comm);
2895 XDMF_ERROR_WRAP_END(status)
2896 }
2897
XdmfDSMBufferSendDone(XDMFDSMBUFFER * buffer,int * status)2898 void XdmfDSMBufferSendDone(XDMFDSMBUFFER * buffer, int * status)
2899 {
2900 XDMF_ERROR_WRAP_START(status)
2901 ((XdmfDSMBuffer *)buffer)->SendDone();
2902 XDMF_ERROR_WRAP_END(status)
2903 }
2904
XdmfDSMBufferSendInfo(XDMFDSMBUFFER * buffer,int * status)2905 void XdmfDSMBufferSendInfo(XDMFDSMBUFFER * buffer, int * status)
2906 {
2907 XDMF_ERROR_WRAP_START(status)
2908 ((XdmfDSMBuffer *)buffer)->SendInfo();
2909 XDMF_ERROR_WRAP_END(status)
2910 }
2911
XdmfDSMBufferSetBlockLength(XDMFDSMBUFFER * buffer,long newBlock)2912 void XdmfDSMBufferSetBlockLength(XDMFDSMBUFFER * buffer, long newBlock)
2913 {
2914 try
2915 {
2916 ((XdmfDSMBuffer *)buffer)->SetBlockLength(newBlock);
2917 }
2918 catch (...)
2919 {
2920 ((XdmfDSMBuffer *)buffer)->SetBlockLength(newBlock);
2921 }
2922 }
2923
XdmfDSMBufferSetComm(XDMFDSMBUFFER * buffer,XDMFDSMCOMMMPI * newComm)2924 void XdmfDSMBufferSetComm(XDMFDSMBUFFER * buffer, XDMFDSMCOMMMPI * newComm)
2925 {
2926 try
2927 {
2928 ((XdmfDSMBuffer *)buffer)->SetComm((XdmfDSMCommMPI *)newComm);
2929 }
2930 catch (...)
2931 {
2932 ((XdmfDSMBuffer *)buffer)->SetComm((XdmfDSMCommMPI *)newComm);
2933 }
2934 }
2935
XdmfDSMBufferSetDsmType(XDMFDSMBUFFER * buffer,int newDsmType)2936 void XdmfDSMBufferSetDsmType(XDMFDSMBUFFER * buffer, int newDsmType)
2937 {
2938 try
2939 {
2940 ((XdmfDSMBuffer *)buffer)->SetDsmType(newDsmType);
2941 }
2942 catch (...)
2943 {
2944 ((XdmfDSMBuffer *)buffer)->SetDsmType(newDsmType);
2945 }
2946 }
2947
XdmfDSMBufferSetInterCommType(XDMFDSMBUFFER * buffer,int newType)2948 void XdmfDSMBufferSetInterCommType(XDMFDSMBUFFER * buffer, int newType)
2949 {
2950 try
2951 {
2952 ((XdmfDSMBuffer *)buffer)->SetInterCommType(newType);
2953 }
2954 catch (...)
2955 {
2956 ((XdmfDSMBuffer *)buffer)->SetInterCommType(newType);
2957 }
2958 }
2959
XdmfDSMBufferSetIsConnected(XDMFDSMBUFFER * buffer,int newStatus)2960 void XdmfDSMBufferSetIsConnected(XDMFDSMBUFFER * buffer, int newStatus)
2961 {
2962 try
2963 {
2964 ((XdmfDSMBuffer *)buffer)->SetIsConnected(newStatus);
2965 }
2966 catch (...)
2967 {
2968 ((XdmfDSMBuffer *)buffer)->SetIsConnected(newStatus);
2969 }
2970 }
2971
XdmfDSMBufferSetIsServer(XDMFDSMBUFFER * buffer,int newIsServer)2972 void XdmfDSMBufferSetIsServer(XDMFDSMBUFFER * buffer, int newIsServer)
2973 {
2974 try
2975 {
2976 ((XdmfDSMBuffer *)buffer)->SetIsServer(newIsServer);
2977 }
2978 catch (...)
2979 {
2980 ((XdmfDSMBuffer *)buffer)->SetIsServer(newIsServer);
2981 }
2982 }
2983
XdmfDSMBufferSetLocalBufferSizeMBytes(XDMFDSMBUFFER * buffer,unsigned int newSize)2984 void XdmfDSMBufferSetLocalBufferSizeMBytes(XDMFDSMBUFFER * buffer, unsigned int newSize)
2985 {
2986 try
2987 {
2988 ((XdmfDSMBuffer *)buffer)->SetLocalBufferSizeMBytes(newSize);
2989 }
2990 catch (...)
2991 {
2992 ((XdmfDSMBuffer *)buffer)->SetLocalBufferSizeMBytes(newSize);
2993 }
2994 }
2995
XdmfDSMBufferSetResizeFactor(XDMFDSMBUFFER * buffer,double newFactor)2996 void XdmfDSMBufferSetResizeFactor(XDMFDSMBUFFER * buffer, double newFactor)
2997 {
2998 try
2999 {
3000 ((XdmfDSMBuffer *)buffer)->SetResizeFactor(newFactor);
3001 }
3002 catch (...)
3003 {
3004 ((XdmfDSMBuffer *)buffer)->SetResizeFactor(newFactor);
3005 }
3006 }
3007
XdmfDSMBufferWaitRelease(XDMFDSMBUFFER * buffer,char * filename,char * datasetname,int code)3008 void XdmfDSMBufferWaitRelease(XDMFDSMBUFFER * buffer, char * filename, char * datasetname, int code)
3009 {
3010 try
3011 {
3012 ((XdmfDSMBuffer *)buffer)->WaitRelease(std::string(filename), std::string(datasetname), code);
3013 }
3014 catch (...)
3015 {
3016 ((XdmfDSMBuffer *)buffer)->WaitRelease(std::string(filename), std::string(datasetname), code);
3017 }
3018 }
3019
XdmfDSMBufferWaitOn(XDMFDSMBUFFER * buffer,char * filename,char * datasetname)3020 int XdmfDSMBufferWaitOn(XDMFDSMBUFFER * buffer, char * filename, char * datasetname)
3021 {
3022 try
3023 {
3024 return ((XdmfDSMBuffer *)buffer)->WaitOn(std::string(filename), std::string(datasetname));
3025 }
3026 catch (...)
3027 {
3028 return ((XdmfDSMBuffer *)buffer)->WaitOn(std::string(filename), std::string(datasetname));
3029 }
3030 }
3031