1 /*
2 Copyright (c) 2009-2014, Jack Poulson
3 Copyright (c) 2011, The University of Texas at Austin
4 All rights reserved.
5
6 Authors:
7 This interface is mainly due to Martin Schatz, but it was put into its
8 current form by Jack Poulson.
9
10 This file is part of Elemental and is under the BSD 2-Clause License,
11 which can be found in the LICENSE file in the root directory, or at
12 http://opensource.org/licenses/BSD-2-Clause
13 */
14 #pragma once
15 #ifndef ELEM_AXPYINTERFACE_IMPL_HPP
16 #define ELEM_AXPYINTERFACE_IMPL_HPP
17
18 namespace elem {
19
20 template<typename T>
21 inline bool
Finished()22 AxpyInterface<T>::Finished()
23 {
24 DEBUG_ONLY(
25 CallStackEntry cse("AxpyInterface::Finished");
26 if( !attachedForLocalToGlobal_ && !attachedForGlobalToLocal_ )
27 LogicError("Not attached");
28 )
29 const Grid& g = ( attachedForLocalToGlobal_ ?
30 localToGlobalMat_->Grid() :
31 globalToLocalMat_->Grid() );
32 const Int p = g.Size();
33
34 bool finished = true;
35 for( Int rank=0; rank<p; ++rank )
36 {
37 if( !sentEomTo_[rank] || !haveEomFrom_[rank] )
38 {
39 finished = false;
40 break;
41 }
42 }
43 return finished;
44 }
45
46 template<typename T>
47 inline void
HandleEoms()48 AxpyInterface<T>::HandleEoms()
49 {
50 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::HandleEoms"))
51 const Grid& g = ( attachedForLocalToGlobal_ ?
52 localToGlobalMat_->Grid() :
53 globalToLocalMat_->Grid() );
54 const Int p = g.Size();
55
56 UpdateRequestStatuses();
57
58 // Try to progress our EOM sends
59 for( Int i=0; i<p; ++i )
60 {
61 if( !sentEomTo_[i] )
62 {
63 bool shouldSendEom = true;
64 const Int numSends = sendingData_[i].size();
65 for( Int j=0; j<numSends; ++j )
66 {
67 if( sendingData_[i][j] )
68 {
69 shouldSendEom = false;
70 break;
71 }
72 }
73 const Int numRequests = sendingRequest_[i].size();
74 for( Int j=0; j<numRequests; ++j )
75 {
76 if( !shouldSendEom || sendingRequest_[i][j] )
77 {
78 shouldSendEom = false;
79 break;
80 }
81 }
82 const Int numReplies = sendingReply_[i].size();
83 for( Int j=0; j<numReplies; ++j )
84 {
85 if( !shouldSendEom || sendingReply_[i][j] )
86 {
87 shouldSendEom = false;
88 break;
89 }
90 }
91 if( shouldSendEom )
92 {
93 mpi::Request& request = eomSendRequests_[i];
94 mpi::TaggedISSend
95 ( &sendDummy_, 1, i, EOM_TAG, g.VCComm(), request );
96 sentEomTo_[i] = true;
97 }
98 }
99 }
100
101 mpi::Status status;
102 if( mpi::IProbe( mpi::ANY_SOURCE, EOM_TAG, g.VCComm(), status ) )
103 {
104 const Int source = status.MPI_SOURCE;
105 mpi::TaggedRecv( &recvDummy_, 1, source, EOM_TAG, g.VCComm() );
106 haveEomFrom_[source] = true;
107 }
108 }
109
110 template<typename T>
111 inline void
HandleLocalToGlobalData()112 AxpyInterface<T>::HandleLocalToGlobalData()
113 {
114 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::HandleLocalToGlobalData"))
115 DistMatrix<T,MC,MR>& Y = *localToGlobalMat_;
116 const Grid& g = Y.Grid();
117 const Int r = g.Height();
118 const Int c = g.Width();
119 const Int myRow = g.Row();
120 const Int myCol = g.Col();
121
122 mpi::Status status;
123 if( mpi::IProbe( mpi::ANY_SOURCE, DATA_TAG, g.VCComm(), status ) )
124 {
125 // Message exists, so recv and pack
126 const Int count = mpi::GetCount<byte>( status );
127 DEBUG_ONLY(
128 if( count < Int(4*sizeof(Int)+sizeof(T)) )
129 LogicError("Count was too small");
130 )
131 const Int source = status.MPI_SOURCE;
132 recvVector_.resize( count );
133 byte* recvBuffer = recvVector_.data();
134 mpi::TaggedRecv( recvBuffer, count, source, DATA_TAG, g.VCComm() );
135
136 // Extract the header
137 byte* head = recvBuffer;
138 const Int i = *reinterpret_cast<const Int*>(head);
139 head += sizeof(Int);
140 const Int j = *reinterpret_cast<const Int*>(head);
141 head += sizeof(Int);
142 const Int height = *reinterpret_cast<const Int*>(head);
143 head += sizeof(Int);
144 const Int width = *reinterpret_cast<const Int*>(head);
145 head += sizeof(Int);
146 const T alpha = *reinterpret_cast<const T*>(head);
147 head += sizeof(T);
148 DEBUG_ONLY(
149 if( height < 0 || width < 0 )
150 RuntimeError
151 ("Unpacked heights were negative:\n",
152 " i= ",i,std::hex,"(",i,")\n",std::dec,
153 " j= ",j,std::hex,"(",j,")\n",std::dec,
154 " height=",height,std::hex,"(",height,")\n",std::dec,
155 " width= ",width,std::hex,"(",width,")\n",std::dec,
156 " alpha= ",alpha);
157 if( i < 0 || j < 0 )
158 RuntimeError
159 ("Unpacked offsets were negative:\n",
160 " i= ",i,std::hex,"(",i,")\n",std::dec,
161 " j= ",j,std::hex,"(",j,")\n",std::dec,
162 " height=",height,std::hex,"(",height,")\n",std::dec,
163 " width= ",width,std::hex,"(",width,")\n",std::dec,
164 " alpha= ",alpha);
165 if( i+height > Y.Height() || j+width > Y.Width() )
166 RuntimeError
167 ("Unpacked submatrix was out of bounds:\n",
168 " i= ",i,std::hex,"(",i,")\n",std::dec,
169 " j= ",j,std::hex,"(",j,")\n",std::dec,
170 " height=",height,std::hex,"(",height,")\n",std::dec,
171 " width= ",width,std::hex,"(",width,")\n",std::dec,
172 " alpha= ",alpha);
173 )
174
175 // Update Y
176 const T* XBuffer = reinterpret_cast<const T*>(head);
177 const Int colAlign = (Y.ColAlign()+i) % r;
178 const Int rowAlign = (Y.RowAlign()+j) % c;
179 const Int colShift = Shift( myRow, colAlign, r );
180 const Int rowShift = Shift( myCol, rowAlign, c );
181
182 const Int localHeight = Length( height, colShift, r );
183 const Int localWidth = Length( width, rowShift, c );
184 const Int iLocalOffset = Length( i, Y.ColShift(), r );
185 const Int jLocalOffset = Length( j, Y.RowShift(), c );
186
187 for( Int t=0; t<localWidth; ++t )
188 {
189 T* YCol = Y.Buffer(iLocalOffset,jLocalOffset+t);
190 const T* XCol = &XBuffer[t*localHeight];
191 for( Int s=0; s<localHeight; ++s )
192 YCol[s] += alpha*XCol[s];
193 }
194
195 // Free the memory for the recv buffer
196 recvVector_.clear();
197 }
198 }
199
200 template<typename T>
201 inline void
HandleGlobalToLocalRequest()202 AxpyInterface<T>::HandleGlobalToLocalRequest()
203 {
204 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::HandleGlobalToLocalRequest"))
205 const DistMatrix<T,MC,MR>& X = *globalToLocalMat_;
206 const Grid& g = X.Grid();
207 const Int r = g.Height();
208 const Int c = g.Width();
209 const Int myRow = g.Row();
210 const Int myCol = g.Col();
211
212 mpi::Status status;
213 if( mpi::IProbe( mpi::ANY_SOURCE, DATA_REQUEST_TAG, g.VCComm(), status ) )
214 {
215 // Request exists, so recv
216 const Int source = status.MPI_SOURCE;
217 const Int recvSize = 4*sizeof(Int);
218 recvVector_.resize( recvSize );
219 byte* recvBuffer = recvVector_.data();
220 mpi::TaggedRecv
221 ( recvBuffer, recvSize, source, DATA_REQUEST_TAG, g.VCComm() );
222
223 // Extract the header
224 const byte* recvHead = recvBuffer;
225 const Int i = *reinterpret_cast<const Int*>(recvHead);
226 recvHead += sizeof(Int);
227 const Int j = *reinterpret_cast<const Int*>(recvHead);
228 recvHead += sizeof(Int);
229 const Int height = *reinterpret_cast<const Int*>(recvHead);
230 recvHead += sizeof(Int);
231 const Int width = *reinterpret_cast<const Int*>(recvHead);
232 recvHead += sizeof(Int);
233
234 const Int colAlign = (X.ColAlign()+i) % r;
235 const Int rowAlign = (X.RowAlign()+j) % c;
236 const Int colShift = Shift( myRow, colAlign, r );
237 const Int rowShift = Shift( myCol, rowAlign, c );
238
239 const Int iLocalOffset = Length( i, X.ColShift(), r );
240 const Int jLocalOffset = Length( j, X.RowShift(), c );
241 const Int localHeight = Length( height, colShift, r );
242 const Int localWidth = Length( width, rowShift, c );
243 const Int numEntries = localHeight*localWidth;
244
245 const Int bufferSize = 2*sizeof(Int) + numEntries*sizeof(T);
246 const Int index =
247 ReadyForSend
248 ( bufferSize, replyVectors_[source],
249 replySendRequests_[source], sendingReply_[source] );
250
251 // Pack the reply header
252 byte* sendBuffer = replyVectors_[source][index].data();
253 byte* sendHead = sendBuffer;
254 *reinterpret_cast<Int*>(sendHead) = myRow; sendHead += sizeof(Int);
255 *reinterpret_cast<Int*>(sendHead) = myCol; sendHead += sizeof(Int);
256
257 // Pack the payload
258 T* sendData = reinterpret_cast<T*>(sendHead);
259 for( Int t=0; t<localWidth; ++t )
260 {
261 T* sendCol = &sendData[t*localHeight];
262 const T* XCol = X.LockedBuffer(iLocalOffset,jLocalOffset+t);
263 MemCopy( sendCol, XCol, localHeight );
264 }
265
266 // Fire off non-blocking send
267 mpi::TaggedISSend
268 ( sendBuffer, bufferSize, source, DATA_REPLY_TAG, g.VCComm(),
269 replySendRequests_[source][index] );
270 }
271 }
272
273 template<typename T>
274 inline
AxpyInterface()275 AxpyInterface<T>::AxpyInterface()
276 : attachedForLocalToGlobal_(false), attachedForGlobalToLocal_(false),
277 localToGlobalMat_(0), globalToLocalMat_(0),
278 sendDummy_(0), recvDummy_(0)
279 { }
280
281 template<typename T>
282 inline
AxpyInterface(AxpyType type,DistMatrix<T,MC,MR> & Z)283 AxpyInterface<T>::AxpyInterface( AxpyType type, DistMatrix<T,MC,MR>& Z )
284 : sendDummy_(0), recvDummy_(0)
285 {
286 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::AxpyInterface"))
287 if( type == LOCAL_TO_GLOBAL )
288 {
289 attachedForLocalToGlobal_ = true;
290 attachedForGlobalToLocal_ = false;
291 localToGlobalMat_ = &Z;
292 globalToLocalMat_ = 0;
293 }
294 else
295 {
296 attachedForLocalToGlobal_ = false;
297 attachedForGlobalToLocal_ = true;
298 localToGlobalMat_ = 0;
299 globalToLocalMat_ = &Z;
300 }
301
302 const Int p = Z.Grid().Size();
303 sentEomTo_.resize( p, false );
304 haveEomFrom_.resize( p, false );
305
306 sendingData_.resize( p );
307 sendingRequest_.resize( p );
308 sendingReply_.resize( p );
309
310 dataVectors_.resize( p );
311 requestVectors_.resize( p );
312 replyVectors_.resize( p );
313
314 dataSendRequests_.resize( p );
315 requestSendRequests_.resize( p );
316 replySendRequests_.resize( p );
317
318 eomSendRequests_.resize( p );
319 }
320
321 template<typename T>
322 inline
AxpyInterface(AxpyType type,const DistMatrix<T,MC,MR> & X)323 AxpyInterface<T>::AxpyInterface
324 ( AxpyType type, const DistMatrix<T,MC,MR>& X )
325 : sendDummy_(0), recvDummy_(0)
326 {
327 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::AxpyInterface"))
328 if( type == LOCAL_TO_GLOBAL )
329 {
330 LogicError("Cannot update a constant matrix");
331 }
332 else
333 {
334 attachedForLocalToGlobal_ = false;
335 attachedForGlobalToLocal_ = true;
336 localToGlobalMat_ = 0;
337 globalToLocalMat_ = &X;
338 }
339
340 const Int p = X.Grid().Size();
341 sentEomTo_.resize( p, false );
342 haveEomFrom_.resize( p, false );
343
344 sendingData_.resize( p );
345 sendingRequest_.resize( p );
346 sendingReply_.resize( p );
347
348 dataVectors_.resize( p );
349 requestVectors_.resize( p );
350 replyVectors_.resize( p );
351
352 dataSendRequests_.resize( p );
353 requestSendRequests_.resize( p );
354 replySendRequests_.resize( p );
355
356 eomSendRequests_.resize( p );
357 }
358
359 template<typename T>
360 inline
~AxpyInterface()361 AxpyInterface<T>::~AxpyInterface()
362 {
363 if( attachedForLocalToGlobal_ || attachedForGlobalToLocal_ )
364 {
365 if( std::uncaught_exception() )
366 {
367 const Grid& g = ( attachedForLocalToGlobal_ ?
368 localToGlobalMat_->Grid() :
369 globalToLocalMat_->Grid() );
370 std::ostringstream os;
371 os << g.Rank()
372 << "Uncaught exception detected during AxpyInterface destructor "
373 "that required a call to Detach. Instead of allowing for the "
374 "possibility of Detach throwing another exception and "
375 "resulting in a 'terminate', we instead immediately dump the "
376 "call stack (if not in RELEASE mode) since the program will "
377 "likely hang:" << std::endl;
378 std::cerr << os.str();
379 DEBUG_ONLY(DumpCallStack())
380 }
381 else
382 {
383 Detach();
384 }
385 }
386 }
387
388 template<typename T>
389 inline void
Attach(AxpyType type,DistMatrix<T,MC,MR> & Z)390 AxpyInterface<T>::Attach( AxpyType type, DistMatrix<T,MC,MR>& Z )
391 {
392 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::Attach"))
393 if( attachedForLocalToGlobal_ || attachedForGlobalToLocal_ )
394 LogicError("Must detach before reattaching.");
395
396 if( type == LOCAL_TO_GLOBAL )
397 {
398 attachedForLocalToGlobal_ = true;
399 localToGlobalMat_ = &Z;
400 }
401 else
402 {
403 attachedForGlobalToLocal_ = true;
404 globalToLocalMat_ = &Z;
405 }
406
407 const Int p = Z.Grid().Size();
408 sentEomTo_.resize( p, false );
409 haveEomFrom_.resize( p, false );
410
411 sendingData_.resize( p );
412 sendingRequest_.resize( p );
413 sendingReply_.resize( p );
414
415 dataVectors_.resize( p );
416 requestVectors_.resize( p );
417 replyVectors_.resize( p );
418
419 dataSendRequests_.resize( p );
420 requestSendRequests_.resize( p );
421 replySendRequests_.resize( p );
422
423 eomSendRequests_.resize( p );
424 }
425
426 template<typename T>
427 inline void
Attach(AxpyType type,const DistMatrix<T,MC,MR> & X)428 AxpyInterface<T>::Attach( AxpyType type, const DistMatrix<T,MC,MR>& X )
429 {
430 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::Attach"))
431 if( attachedForLocalToGlobal_ || attachedForGlobalToLocal_ )
432 LogicError("Must detach before reattaching.");
433
434 if( type == LOCAL_TO_GLOBAL )
435 {
436 LogicError("Cannot update a constant matrix");
437 }
438 else
439 {
440 attachedForGlobalToLocal_ = true;
441 globalToLocalMat_ = &X;
442 }
443
444 const Int p = X.Grid().Size();
445 sentEomTo_.resize( p, false );
446 haveEomFrom_.resize( p, false );
447
448 sendingData_.resize( p );
449 sendingRequest_.resize( p );
450 sendingReply_.resize( p );
451
452 dataVectors_.resize( p );
453 requestVectors_.resize( p );
454 replyVectors_.resize( p );
455
456 dataSendRequests_.resize( p );
457 requestSendRequests_.resize( p );
458 replySendRequests_.resize( p );
459
460 eomSendRequests_.resize( p );
461 }
462
463 template<typename T>
464 inline void
Axpy(T alpha,Matrix<T> & Z,Int i,Int j)465 AxpyInterface<T>::Axpy( T alpha, Matrix<T>& Z, Int i, Int j )
466 {
467 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::Axpy"))
468 if( attachedForLocalToGlobal_ )
469 AxpyLocalToGlobal( alpha, Z, i, j );
470 else if( attachedForGlobalToLocal_ )
471 AxpyGlobalToLocal( alpha, Z, i, j );
472 else
473 LogicError("Cannot axpy before attaching.");
474 }
475
476 template<typename T>
477 inline void
Axpy(T alpha,const Matrix<T> & Z,Int i,Int j)478 AxpyInterface<T>::Axpy( T alpha, const Matrix<T>& Z, Int i, Int j )
479 {
480 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::Axpy"))
481 if( attachedForLocalToGlobal_ )
482 AxpyLocalToGlobal( alpha, Z, i, j );
483 else if( attachedForGlobalToLocal_ )
484 LogicError("Cannot update a constant matrix.");
485 else
486 LogicError("Cannot axpy before attaching.");
487 }
488
489 // Update Y(i:i+height-1,j:j+width-1) += alpha X, where X is height x width
490 template<typename T>
491 inline void
AxpyLocalToGlobal(T alpha,const Matrix<T> & X,Int i,Int j)492 AxpyInterface<T>::AxpyLocalToGlobal
493 ( T alpha, const Matrix<T>& X, Int i, Int j )
494 {
495 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::AxpyLocalToGlobal"))
496 DistMatrix<T,MC,MR>& Y = *localToGlobalMat_;
497 if( i < 0 || j < 0 )
498 LogicError("Submatrix offsets must be non-negative");
499 if( i+X.Height() > Y.Height() || j+X.Width() > Y.Width() )
500 LogicError("Submatrix out of bounds of global matrix");
501
502 const Grid& g = Y.Grid();
503 const Int r = g.Height();
504 const Int c = g.Width();
505 const Int p = g.Size();
506 const Int myProcessRow = g.Row();
507 const Int myProcessCol = g.Col();
508 const Int colAlign = (Y.ColAlign() + i) % r;
509 const Int rowAlign = (Y.RowAlign() + j) % c;
510
511 const Int height = X.Height();
512 const Int width = X.Width();
513
514 Int receivingRow = myProcessRow;
515 Int receivingCol = myProcessCol;
516 for( Int step=0; step<p; ++step )
517 {
518 const Int colShift = Shift( receivingRow, colAlign, r );
519 const Int rowShift = Shift( receivingCol, rowAlign, c );
520 const Int localHeight = Length( height, colShift, r );
521 const Int localWidth = Length( width, rowShift, c );
522 const Int numEntries = localHeight*localWidth;
523
524 if( numEntries != 0 )
525 {
526 const Int destination = receivingRow + r*receivingCol;
527 const Int bufferSize = 4*sizeof(Int) + (numEntries+1)*sizeof(T);
528
529 const Int index =
530 ReadyForSend
531 ( bufferSize, dataVectors_[destination],
532 dataSendRequests_[destination], sendingData_[destination] );
533 DEBUG_ONLY(
534 if( Int(dataVectors_[destination][index].size()) != bufferSize )
535 LogicError("Error in ReadyForSend");
536 )
537
538 // Pack the header
539 byte* sendBuffer = dataVectors_[destination][index].data();
540 byte* head = sendBuffer;
541 *reinterpret_cast<Int*>(head) = i; head += sizeof(Int);
542 *reinterpret_cast<Int*>(head) = j; head += sizeof(Int);
543 *reinterpret_cast<Int*>(head) = height; head += sizeof(Int);
544 *reinterpret_cast<Int*>(head) = width; head += sizeof(Int);
545 *reinterpret_cast<T*>(head) = alpha; head += sizeof(T);
546
547 // Pack the payload
548 T* sendData = reinterpret_cast<T*>(head);
549 const T* XBuffer = X.LockedBuffer();
550 const Int XLDim = X.LDim();
551 for( Int t=0; t<localWidth; ++t )
552 {
553 T* thisSendCol = &sendData[t*localHeight];
554 const T* thisXCol = &XBuffer[(rowShift+t*c)*XLDim];
555 for( Int s=0; s<localHeight; ++s )
556 thisSendCol[s] = thisXCol[colShift+s*r];
557 }
558
559 // Fire off the non-blocking send
560 mpi::TaggedISSend
561 ( sendBuffer, bufferSize, destination, DATA_TAG, g.VCComm(),
562 dataSendRequests_[destination][index] );
563 }
564
565 receivingRow = (receivingRow + 1) % r;
566 if( receivingRow == 0 )
567 receivingCol = (receivingCol + 1) % c;
568 }
569 }
570
571 // Update Y += alpha X(i:i+height-1,j:j+width-1), where X is the dist-matrix
572 template<typename T>
573 inline void
AxpyGlobalToLocal(T alpha,Matrix<T> & Y,Int i,Int j)574 AxpyInterface<T>::AxpyGlobalToLocal
575 ( T alpha, Matrix<T>& Y, Int i, Int j )
576 {
577 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::AxpyGlobalToLocal"))
578 const DistMatrix<T,MC,MR>& X = *globalToLocalMat_;
579
580 const Int height = Y.Height();
581 const Int width = Y.Width();
582 if( i+height > X.Height() || j+width > X.Width() )
583 LogicError("Invalid AxpyGlobalToLocal submatrix");
584
585 const Grid& g = X.Grid();
586 const Int r = g.Height();
587 const Int c = g.Width();
588 const Int p = g.Size();
589
590 // Send out the requests to all processes in the grid
591 for( Int rank=0; rank<p; ++rank )
592 {
593 const Int bufferSize = 4*sizeof(Int);
594 const Int index =
595 ReadyForSend
596 ( bufferSize, requestVectors_[rank],
597 requestSendRequests_[rank], sendingRequest_[rank] );
598
599 // Copy the request header into the send buffer
600 byte* sendBuffer = requestVectors_[rank][index].data();
601 byte* head = sendBuffer;
602 *reinterpret_cast<Int*>(head) = i; head += sizeof(Int);
603 *reinterpret_cast<Int*>(head) = j; head += sizeof(Int);
604 *reinterpret_cast<Int*>(head) = height; head += sizeof(Int);
605 *reinterpret_cast<Int*>(head) = width; head += sizeof(Int);
606
607 // Begin the non-blocking send
608 mpi::TaggedISSend
609 ( sendBuffer, bufferSize, rank, DATA_REQUEST_TAG, g.VCComm(),
610 requestSendRequests_[rank][index] );
611 }
612
613 // Receive all of the replies
614 Int numReplies = 0;
615 while( numReplies < p )
616 {
617 HandleGlobalToLocalRequest();
618
619 mpi::Status status;
620 if( mpi::IProbe( mpi::ANY_SOURCE, DATA_REPLY_TAG, g.VCComm(), status ) )
621 {
622 const Int source = status.MPI_SOURCE;
623
624 // Ensure that we have a recv buffer
625 const Int count = mpi::GetCount<byte>( status );
626 recvVector_.resize( count );
627 byte* recvBuffer = recvVector_.data();
628
629 // Receive the data
630 mpi::TaggedRecv
631 ( recvBuffer, count, source, DATA_REPLY_TAG, g.VCComm() );
632
633 // Unpack the reply header
634 const byte* head = recvBuffer;
635 const Int row = *reinterpret_cast<const Int*>(head);
636 head += sizeof(Int);
637 const Int col = *reinterpret_cast<const Int*>(head);
638 head += sizeof(Int);
639 const T* recvData = reinterpret_cast<const T*>(head);
640
641 // Compute the local heights and offsets
642 const Int colAlign = (X.ColAlign()+i) % r;
643 const Int rowAlign = (X.RowAlign()+j) % c;
644 const Int colShift = Shift( row, colAlign, r );
645 const Int rowShift = Shift( col, rowAlign, c );
646 const Int localHeight = Length( height, colShift, r );
647 const Int localWidth = Length( width, rowShift, c );
648
649 // Unpack the local matrix
650 for( Int t=0; t<localWidth; ++t )
651 {
652 T* YCol = Y.Buffer(0,rowShift+t*c);
653 const T* XCol = &recvData[t*localHeight];
654 for( Int s=0; s<localHeight; ++s )
655 YCol[colShift+s*r] += alpha*XCol[s];
656 }
657
658 ++numReplies;
659 }
660 }
661 }
662
663 template<typename T>
664 inline Int
ReadyForSend(Int sendSize,std::deque<std::vector<byte>> & sendVectors,std::deque<mpi::Request> & requests,std::deque<bool> & requestStatuses)665 AxpyInterface<T>::ReadyForSend
666 ( Int sendSize,
667 std::deque<std::vector<byte>>& sendVectors,
668 std::deque<mpi::Request>& requests,
669 std::deque<bool>& requestStatuses )
670 {
671 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::ReadyForSend"))
672 const Int numCreated = sendVectors.size();
673 DEBUG_ONLY(
674 if( numCreated != Int(requests.size()) ||
675 numCreated != Int(requestStatuses.size()) )
676 LogicError("size mismatch");
677 )
678 for( Int i=0; i<numCreated; ++i )
679 {
680 // If this request is still running, test to see if it finished.
681 if( requestStatuses[i] )
682 {
683 const bool finished = mpi::Test( requests[i] );
684 requestStatuses[i] = !finished;
685 }
686
687 if( !requestStatuses[i] )
688 {
689 requestStatuses[i] = true;
690 sendVectors[i].resize( sendSize );
691 return i;
692 }
693 }
694
695 sendVectors.resize( numCreated+1 );
696 sendVectors[numCreated].resize( sendSize );
697 requests.push_back( mpi::REQUEST_NULL );
698 requestStatuses.push_back( true );
699
700 return numCreated;
701 }
702
703 template<typename T>
704 inline void
UpdateRequestStatuses()705 AxpyInterface<T>::UpdateRequestStatuses()
706 {
707 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::UpdateRequestStatuses"))
708 const Grid& g = ( attachedForLocalToGlobal_ ?
709 localToGlobalMat_->Grid() :
710 globalToLocalMat_->Grid() );
711 const Int p = g.Size();
712
713 for( Int i=0; i<p; ++i )
714 {
715 const Int numDataSendRequests = dataSendRequests_[i].size();
716 for( Int j=0; j<numDataSendRequests; ++j )
717 if( sendingData_[i][j] )
718 sendingData_[i][j] =
719 !mpi::Test( dataSendRequests_[i][j] );
720 const Int numRequestSendRequests = requestSendRequests_[i].size();
721 for( Int j=0; j<numRequestSendRequests; ++j )
722 if( sendingRequest_[i][j] )
723 sendingRequest_[i][j] =
724 !mpi::Test( requestSendRequests_[i][j] );
725 const Int numReplySendRequests = replySendRequests_[i].size();
726 for( Int j=0; j<numReplySendRequests; ++j )
727 if( sendingReply_[i][j] )
728 sendingReply_[i][j] =
729 !mpi::Test( replySendRequests_[i][j] );
730 }
731 }
732
733 template<typename T>
734 inline void
Detach()735 AxpyInterface<T>::Detach()
736 {
737 DEBUG_ONLY(CallStackEntry cse("AxpyInterface::Detach"))
738 if( !attachedForLocalToGlobal_ && !attachedForGlobalToLocal_ )
739 LogicError("Must attach before detaching.");
740
741 const Grid& g = ( attachedForLocalToGlobal_ ?
742 localToGlobalMat_->Grid() :
743 globalToLocalMat_->Grid() );
744
745 while( !Finished() )
746 {
747 if( attachedForLocalToGlobal_ )
748 HandleLocalToGlobalData();
749 else
750 HandleGlobalToLocalRequest();
751 HandleEoms();
752 }
753
754 mpi::Barrier( g.VCComm() );
755
756 attachedForLocalToGlobal_ = false;
757 attachedForGlobalToLocal_ = false;
758 recvVector_.clear();
759 sentEomTo_.clear();
760 haveEomFrom_.clear();
761
762 sendingData_.clear();
763 sendingRequest_.clear();
764 sendingReply_.clear();
765
766 dataVectors_.clear();
767 requestVectors_.clear();
768 replyVectors_.clear();
769
770 dataSendRequests_.clear();
771 requestSendRequests_.clear();
772 replySendRequests_.clear();
773
774 eomSendRequests_.clear();
775 }
776
777 } // namespace elem
778
779 #endif // ifndef ELEM_AXPYINTERFACE_IMPL_HPP
780