1 // Copyright (C) 2006 Douglas Gregor <doug.gregor -at- gmail.com>.
2 
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 /** @file nonblocking.hpp
8  *
9  *  This header defines operations for completing non-blocking
10  *  communication requests.
11  */
12 #ifndef BOOST_MPI_NONBLOCKING_HPP
13 #define BOOST_MPI_NONBLOCKING_HPP
14 
15 #include <boost/mpi/config.hpp>
16 #include <vector>
17 #include <iterator> // for std::iterator_traits
18 #include <boost/optional.hpp>
19 #include <utility> // for std::pair
20 #include <algorithm> // for iter_swap, reverse
21 #include <boost/static_assert.hpp>
22 #include <boost/mpi/request.hpp>
23 #include <boost/mpi/status.hpp>
24 #include <boost/mpi/exception.hpp>
25 
26 namespace boost { namespace mpi {
27 
28 /**
29  *  @brief Wait until any non-blocking request has completed.
30  *
31  *  This routine takes in a set of requests stored in the iterator
32  *  range @c [first,last) and waits until any of these requests has
33  *  been completed. It provides functionality equivalent to
34  *  @c MPI_Waitany.
35  *
36  *  @param first The iterator that denotes the beginning of the
37  *  sequence of request objects.
38  *
39  *  @param last The iterator that denotes the end of the sequence of
40  *  request objects. This may not be equal to @c first.
41  *
42  *  @returns A pair containing the status object that corresponds to
43  *  the completed operation and the iterator referencing the completed
44  *  request.
45  */
46 template<typename ForwardIterator>
47 std::pair<status, ForwardIterator>
wait_any(ForwardIterator first,ForwardIterator last)48 wait_any(ForwardIterator first, ForwardIterator last)
49 {
50   using std::advance;
51 
52   BOOST_ASSERT(first != last);
53 
54   typedef typename std::iterator_traits<ForwardIterator>::difference_type
55     difference_type;
56 
57   bool all_trivial_requests = true;
58   difference_type n = 0;
59   ForwardIterator current = first;
60   while (true) {
61     // Check if we have found a completed request. If so, return it.
62     if (current->active()) {
63       optional<status> result = current->test();
64       if (bool(result)) {
65         return std::make_pair(*result, current);
66       }
67     }
68 
69     // Check if this request (and all others before it) are "trivial"
70     // requests, e.g., they can be represented with a single
71     // MPI_Request.
72     // We could probably ignore non trivial request that are inactive,
73     // but we can assume that a mix of trivial and non trivial requests
74     // is unlikely enough not to care.
75     all_trivial_requests = all_trivial_requests && current->trivial();
76 
77     // Move to the next request.
78     ++n;
79     if (++current == last) {
80       // We have reached the end of the list. If all requests thus far
81       // have been trivial, we can call MPI_Waitany directly, because
82       // it may be more efficient than our busy-wait semantics.
83       if (all_trivial_requests) {
84         std::vector<MPI_Request> requests;
85         requests.reserve(n);
86         for (current = first; current != last; ++current) {
87           requests.push_back(*current->trivial());
88         }
89 
90         // Let MPI wait until one of these operations completes.
91         int index;
92         status stat;
93         BOOST_MPI_CHECK_RESULT(MPI_Waitany,
94                                (n, &requests[0], &index, &stat.m_status));
95 
96         // We don't have a notion of empty requests or status objects,
97         // so this is an error.
98         if (index == MPI_UNDEFINED)
99           boost::throw_exception(exception("MPI_Waitany", MPI_ERR_REQUEST));
100 
101         // Find the iterator corresponding to the completed request.
102         current = first;
103         advance(current, index);
104         *current->trivial() = requests[index];
105         return std::make_pair(stat, current);
106       }
107 
108       // There are some nontrivial requests, so we must continue our
109       // busy waiting loop.
110       n = 0;
111       current = first;
112       all_trivial_requests = true;
113     }
114   }
115 
116   // We cannot ever get here
117   BOOST_ASSERT(false);
118 }
119 
120 /**
121  *  @brief Test whether any non-blocking request has completed.
122  *
123  *  This routine takes in a set of requests stored in the iterator
124  *  range @c [first,last) and tests whether any of these requests has
125  *  been completed. This routine is similar to @c wait_any, but will
126  *  not block waiting for requests to completed. It provides
127  *  functionality equivalent to @c MPI_Testany.
128  *
129  *  @param first The iterator that denotes the beginning of the
130  *  sequence of request objects.
131  *
132  *  @param last The iterator that denotes the end of the sequence of
133  *  request objects.
134  *
135  *  @returns If any outstanding requests have completed, a pair
136  *  containing the status object that corresponds to the completed
137  *  operation and the iterator referencing the completed
138  *  request. Otherwise, an empty @c optional<>.
139  */
140 template<typename ForwardIterator>
141 optional<std::pair<status, ForwardIterator> >
test_any(ForwardIterator first,ForwardIterator last)142 test_any(ForwardIterator first, ForwardIterator last)
143 {
144   while (first != last) {
145     // Check if we have found a completed request. If so, return it.
146     if (optional<status> result = first->test()) {
147       return std::make_pair(*result, first);
148     }
149     ++first;
150   }
151 
152   // We found nothing
153   return optional<std::pair<status, ForwardIterator> >();
154 }
155 
156 /**
157  *  @brief Wait until all non-blocking requests have completed.
158  *
159  *  This routine takes in a set of requests stored in the iterator
160  *  range @c [first,last) and waits until all of these requests have
161  *  been completed. It provides functionality equivalent to
162  *  @c MPI_Waitall.
163  *
164  *  @param first The iterator that denotes the beginning of the
165  *  sequence of request objects.
166  *
167  *  @param last The iterator that denotes the end of the sequence of
168  *  request objects.
169  *
170  *  @param out If provided, an output iterator through which the
171  *  status of each request will be emitted. The @c status objects are
172  *  emitted in the same order as the requests are retrieved from
173  *  @c [first,last).
174  *
175  *  @returns If an @p out parameter was provided, the value @c out
176  *  after all of the @c status objects have been emitted.
177  */
178 template<typename ForwardIterator, typename OutputIterator>
179 OutputIterator
wait_all(ForwardIterator first,ForwardIterator last,OutputIterator out)180 wait_all(ForwardIterator first, ForwardIterator last, OutputIterator out)
181 {
182   typedef typename std::iterator_traits<ForwardIterator>::difference_type
183     difference_type;
184 
185   using std::distance;
186 
187   difference_type num_outstanding_requests = distance(first, last);
188 
189   std::vector<status> results(num_outstanding_requests);
190   std::vector<bool> completed(num_outstanding_requests);
191 
192   while (num_outstanding_requests > 0) {
193     bool all_trivial_requests = true;
194     difference_type idx = 0;
195     for (ForwardIterator current = first; current != last; ++current, ++idx) {
196       if (!completed[idx]) {
197         if (optional<status> stat = current->test()) {
198           // This outstanding request has been completed. We're done.
199           results[idx] = *stat;
200           completed[idx] = true;
201           --num_outstanding_requests;
202           all_trivial_requests = false;
203         } else {
204           // Check if this request (and all others before it) are "trivial"
205           // requests, e.g., they can be represented with a single
206           // MPI_Request.
207           all_trivial_requests = all_trivial_requests && current->trivial();
208         }
209       }
210     }
211 
212     // If we have yet to fulfill any requests and all of the requests
213     // are trivial (i.e., require only a single MPI_Request to be
214     // fulfilled), call MPI_Waitall directly.
215     if (all_trivial_requests
216         && num_outstanding_requests == (difference_type)results.size()) {
217       std::vector<MPI_Request> requests;
218       requests.reserve(num_outstanding_requests);
219       for (ForwardIterator current = first; current != last; ++current)
220         requests.push_back(*current->trivial());
221 
222       // Let MPI wait until all of these operations completes.
223       std::vector<MPI_Status> stats(num_outstanding_requests);
224       BOOST_MPI_CHECK_RESULT(MPI_Waitall,
225                              (num_outstanding_requests, &requests[0],
226                               &stats[0]));
227 
228       for (std::vector<MPI_Status>::iterator i = stats.begin();
229            i != stats.end(); ++i, ++out) {
230         status stat;
231         stat.m_status = *i;
232         *out = stat;
233       }
234 
235       return out;
236     }
237 
238     all_trivial_requests = false;
239   }
240 
241   return std::copy(results.begin(), results.end(), out);
242 }
243 
244 /**
245  * \overload
246  */
247 template<typename ForwardIterator>
248 void
wait_all(ForwardIterator first,ForwardIterator last)249 wait_all(ForwardIterator first, ForwardIterator last)
250 {
251   typedef typename std::iterator_traits<ForwardIterator>::difference_type
252     difference_type;
253 
254   using std::distance;
255 
256   difference_type num_outstanding_requests = distance(first, last);
257 
258   std::vector<bool> completed(num_outstanding_requests, false);
259 
260   while (num_outstanding_requests > 0) {
261     bool all_trivial_requests = true;
262 
263     difference_type idx = 0;
264     for (ForwardIterator current = first; current != last; ++current, ++idx) {
265       if (!completed[idx]) {
266         if (optional<status> stat = current->test()) {
267           // This outstanding request has been completed.
268           completed[idx] = true;
269           --num_outstanding_requests;
270           all_trivial_requests = false;
271         } else {
272           // Check if this request (and all others before it) are "trivial"
273           // requests, e.g., they can be represented with a single
274           // MPI_Request.
275           all_trivial_requests = all_trivial_requests && current->trivial();
276         }
277       }
278     }
279 
280     // If we have yet to fulfill any requests and all of the requests
281     // are trivial (i.e., require only a single MPI_Request to be
282     // fulfilled), call MPI_Waitall directly.
283     if (all_trivial_requests
284         && num_outstanding_requests == (difference_type)completed.size()) {
285       std::vector<MPI_Request> requests;
286       requests.reserve(num_outstanding_requests);
287       for (ForwardIterator current = first; current != last; ++current)
288         requests.push_back(*current->trivial());
289 
290       // Let MPI wait until all of these operations completes.
291       BOOST_MPI_CHECK_RESULT(MPI_Waitall,
292                              (num_outstanding_requests, &requests[0],
293                               MPI_STATUSES_IGNORE));
294 
295       // Signal completion
296       num_outstanding_requests = 0;
297     }
298   }
299 }
300 
301 /**
302  *  @brief Tests whether all non-blocking requests have completed.
303  *
304  *  This routine takes in a set of requests stored in the iterator
305  *  range @c [first,last) and determines whether all of these requests
306  *  have been completed. However, due to limitations of the underlying
307  *  MPI implementation, if any of the requests refers to a
308  *  non-blocking send or receive of a serialized data type, @c
309  *  test_all will always return the equivalent of @c false (i.e., the
310  *  requests cannot all be finished at this time). This routine
311  *  performs the same functionality as @c wait_all, except that this
312  *  routine will not block. This routine provides functionality
313  *  equivalent to @c MPI_Testall.
314  *
315  *  @param first The iterator that denotes the beginning of the
316  *  sequence of request objects.
317  *
318  *  @param last The iterator that denotes the end of the sequence of
319  *  request objects.
320  *
321  *  @param out If provided and all requests hav been completed, an
322  *  output iterator through which the status of each request will be
323  *  emitted. The @c status objects are emitted in the same order as
324  *  the requests are retrieved from @c [first,last).
325  *
326  *  @returns If an @p out parameter was provided, the value @c out
327  *  after all of the @c status objects have been emitted (if all
328  *  requests were completed) or an empty @c optional<>. If no @p out
329  *  parameter was provided, returns @c true if all requests have
330  *  completed or @c false otherwise.
331  */
332 template<typename ForwardIterator, typename OutputIterator>
333 optional<OutputIterator>
test_all(ForwardIterator first,ForwardIterator last,OutputIterator out)334 test_all(ForwardIterator first, ForwardIterator last, OutputIterator out)
335 {
336   std::vector<MPI_Request> requests;
337   for (; first != last; ++first) {
338     // If we have a non-trivial request, then no requests can be
339     // completed.
340     if (!first->trivial()) {
341       return optional<OutputIterator>();
342     }
343     requests.push_back(*first->trivial());
344   }
345 
346   int flag = 0;
347   int n = requests.size();
348   std::vector<MPI_Status> stats(n);
349   BOOST_MPI_CHECK_RESULT(MPI_Testall, (n, &requests[0], &flag, &stats[0]));
350   if (flag) {
351     for (int i = 0; i < n; ++i, ++out) {
352       status stat;
353       stat.m_status = stats[i];
354       *out = stat;
355     }
356     return out;
357   } else {
358     return optional<OutputIterator>();
359   }
360 }
361 
362 /**
363  *  \overload
364  */
365 template<typename ForwardIterator>
366 bool
test_all(ForwardIterator first,ForwardIterator last)367 test_all(ForwardIterator first, ForwardIterator last)
368 {
369   std::vector<MPI_Request> requests;
370   for (; first != last; ++first) {
371     // If we have a non-trivial request, then no requests can be
372     // completed.
373     if (!first->trivial()) {
374       return false;
375     }
376     requests.push_back(*first->trivial());
377   }
378 
379   int flag = 0;
380   int n = requests.size();
381   BOOST_MPI_CHECK_RESULT(MPI_Testall,
382                          (n, &requests[0], &flag, MPI_STATUSES_IGNORE));
383   return flag != 0;
384 }
385 
386 /**
387  *  @brief Wait until some non-blocking requests have completed.
388  *
389  *  This routine takes in a set of requests stored in the iterator
390  *  range @c [first,last) and waits until at least one of the requests
391  *  has completed. It then completes all of the requests it can,
392  *  partitioning the input sequence into pending requests followed by
393  *  completed requests. If an output iterator is provided, @c status
394  *  objects will be emitted for each of the completed requests. This
395  *  routine provides functionality equivalent to @c MPI_Waitsome.
396  *
397  *  @param first The iterator that denotes the beginning of the
398  *  sequence of request objects.
399  *
400  *  @param last The iterator that denotes the end of the sequence of
401  *  request objects. This may not be equal to @c first.
402  *
403  *  @param out If provided, the @c status objects corresponding to
404  *  completed requests will be emitted through this output iterator.
405 
406  *  @returns If the @p out parameter was provided, a pair containing
407  *  the output iterator @p out after all of the @c status objects have
408  *  been written through it and an iterator referencing the first
409  *  completed request. If no @p out parameter was provided, only the
410  *  iterator referencing the first completed request will be emitted.
411  */
412 template<typename BidirectionalIterator, typename OutputIterator>
413 std::pair<OutputIterator, BidirectionalIterator>
wait_some(BidirectionalIterator first,BidirectionalIterator last,OutputIterator out)414 wait_some(BidirectionalIterator first, BidirectionalIterator last,
415           OutputIterator out)
416 {
417   using std::advance;
418 
419   if (first == last)
420     return std::make_pair(out, first);
421 
422   typedef typename std::iterator_traits<BidirectionalIterator>::difference_type
423     difference_type;
424 
425   bool all_trivial_requests = true;
426   difference_type n = 0;
427   BidirectionalIterator current = first;
428   BidirectionalIterator start_of_completed = last;
429   while (true) {
430     // Check if we have found a completed request.
431     if (optional<status> result = current->test()) {
432       using std::iter_swap;
433 
434       // Emit the resulting status object
435       *out++ = *result;
436 
437       // We're expanding the set of completed requests
438       --start_of_completed;
439 
440       if (current == start_of_completed) {
441         // If we have hit the end of the list of pending
442         // requests. Finish up by fixing the order of the completed
443         // set to match the order in which we emitted status objects,
444         // then return.
445         std::reverse(start_of_completed, last);
446         return std::make_pair(out, start_of_completed);
447       }
448 
449       // Swap the request we just completed with the last request that
450       // has not yet been tested.
451       iter_swap(current, start_of_completed);
452 
453       continue;
454     }
455 
456     // Check if this request (and all others before it) are "trivial"
457     // requests, e.g., they can be represented with a single
458     // MPI_Request.
459     all_trivial_requests = all_trivial_requests && current->trivial();
460 
461     // Move to the next request.
462     ++n;
463     if (++current == start_of_completed) {
464       if (start_of_completed != last) {
465         // We have satisfied some requests. Make the order of the
466         // completed requests match that of the status objects we've
467         // already emitted and we're done.
468         std::reverse(start_of_completed, last);
469         return std::make_pair(out, start_of_completed);
470       }
471 
472       // We have reached the end of the list. If all requests thus far
473       // have been trivial, we can call MPI_Waitsome directly, because
474       // it may be more efficient than our busy-wait semantics.
475       if (all_trivial_requests) {
476         std::vector<MPI_Request> requests;
477         std::vector<int> indices(n);
478         std::vector<MPI_Status> stats(n);
479         requests.reserve(n);
480         for (current = first; current != last; ++current)
481           requests.push_back(*current->trivial());
482 
483         // Let MPI wait until some of these operations complete.
484         int num_completed;
485         BOOST_MPI_CHECK_RESULT(MPI_Waitsome,
486                                (n, &requests[0], &num_completed, &indices[0],
487                                 &stats[0]));
488 
489         // Translate the index-based result of MPI_Waitsome into a
490         // partitioning on the requests.
491         int current_offset = 0;
492         current = first;
493         for (int index = 0; index < num_completed; ++index, ++out) {
494           using std::iter_swap;
495 
496           // Move "current" to the request object at this index
497           advance(current, indices[index] - current_offset);
498           current_offset = indices[index];
499 
500           // Emit the status object
501           status stat;
502           stat.m_status = stats[index];
503           *out = stat;
504 
505           // Finish up the request and swap it into the "completed
506           // requests" partition.
507           *current->trivial() = requests[indices[index]];
508           --start_of_completed;
509           iter_swap(current, start_of_completed);
510         }
511 
512         // We have satisfied some requests. Make the order of the
513         // completed requests match that of the status objects we've
514         // already emitted and we're done.
515         std::reverse(start_of_completed, last);
516         return std::make_pair(out, start_of_completed);
517       }
518 
519       // There are some nontrivial requests, so we must continue our
520       // busy waiting loop.
521       n = 0;
522       current = first;
523     }
524   }
525 
526   // We cannot ever get here
527   BOOST_ASSERT(false);
528 }
529 
530 /**
531  *  \overload
532  */
533 template<typename BidirectionalIterator>
534 BidirectionalIterator
wait_some(BidirectionalIterator first,BidirectionalIterator last)535 wait_some(BidirectionalIterator first, BidirectionalIterator last)
536 {
537   using std::advance;
538 
539   if (first == last)
540     return first;
541 
542   typedef typename std::iterator_traits<BidirectionalIterator>::difference_type
543     difference_type;
544 
545   bool all_trivial_requests = true;
546   difference_type n = 0;
547   BidirectionalIterator current = first;
548   BidirectionalIterator start_of_completed = last;
549   while (true) {
550     // Check if we have found a completed request.
551     if (optional<status> result = current->test()) {
552       using std::iter_swap;
553 
554       // We're expanding the set of completed requests
555       --start_of_completed;
556 
557       // If we have hit the end of the list of pending requests, we're
558       // done.
559       if (current == start_of_completed)
560         return start_of_completed;
561 
562       // Swap the request we just completed with the last request that
563       // has not yet been tested.
564       iter_swap(current, start_of_completed);
565 
566       continue;
567     }
568 
569     // Check if this request (and all others before it) are "trivial"
570     // requests, e.g., they can be represented with a single
571     // MPI_Request.
572     all_trivial_requests = all_trivial_requests && current->trivial();
573 
574     // Move to the next request.
575     ++n;
576     if (++current == start_of_completed) {
577         // If we have satisfied some requests, we're done.
578       if (start_of_completed != last)
579         return start_of_completed;
580 
581       // We have reached the end of the list. If all requests thus far
582       // have been trivial, we can call MPI_Waitsome directly, because
583       // it may be more efficient than our busy-wait semantics.
584       if (all_trivial_requests) {
585         std::vector<MPI_Request> requests;
586         std::vector<int> indices(n);
587         requests.reserve(n);
588         for (current = first; current != last; ++current)
589           requests.push_back(*current->trivial());
590 
591         // Let MPI wait until some of these operations complete.
592         int num_completed;
593         BOOST_MPI_CHECK_RESULT(MPI_Waitsome,
594                                (n, &requests[0], &num_completed, &indices[0],
595                                 MPI_STATUSES_IGNORE));
596 
597         // Translate the index-based result of MPI_Waitsome into a
598         // partitioning on the requests.
599         int current_offset = 0;
600         current = first;
601         for (int index = 0; index < num_completed; ++index) {
602           using std::iter_swap;
603 
604           // Move "current" to the request object at this index
605           advance(current, indices[index] - current_offset);
606           current_offset = indices[index];
607 
608           // Finish up the request and swap it into the "completed
609           // requests" partition.
610           *current->trivial() = requests[indices[index]];
611           --start_of_completed;
612           iter_swap(current, start_of_completed);
613         }
614 
615         // We have satisfied some requests, so we are done.
616         return start_of_completed;
617       }
618 
619       // There are some nontrivial requests, so we must continue our
620       // busy waiting loop.
621       n = 0;
622       current = first;
623     }
624   }
625 
626   // We cannot ever get here
627   BOOST_ASSERT(false);
628 }
629 
630 /**
631  *  @brief Test whether some non-blocking requests have completed.
632  *
633  *  This routine takes in a set of requests stored in the iterator
634  *  range @c [first,last) and tests to see if any of the requests has
635  *  completed. It completes all of the requests it can, partitioning
636  *  the input sequence into pending requests followed by completed
637  *  requests. If an output iterator is provided, @c status objects
638  *  will be emitted for each of the completed requests. This routine
639  *  is similar to @c wait_some, but does not wait until any requests
640  *  have completed. This routine provides functionality equivalent to
641  *  @c MPI_Testsome.
642  *
643  *  @param first The iterator that denotes the beginning of the
644  *  sequence of request objects.
645  *
646  *  @param last The iterator that denotes the end of the sequence of
647  *  request objects. This may not be equal to @c first.
648  *
649  *  @param out If provided, the @c status objects corresponding to
650  *  completed requests will be emitted through this output iterator.
651 
652  *  @returns If the @p out parameter was provided, a pair containing
653  *  the output iterator @p out after all of the @c status objects have
654  *  been written through it and an iterator referencing the first
655  *  completed request. If no @p out parameter was provided, only the
656  *  iterator referencing the first completed request will be emitted.
657  */
658 template<typename BidirectionalIterator, typename OutputIterator>
659 std::pair<OutputIterator, BidirectionalIterator>
test_some(BidirectionalIterator first,BidirectionalIterator last,OutputIterator out)660 test_some(BidirectionalIterator first, BidirectionalIterator last,
661           OutputIterator out)
662 {
663   BidirectionalIterator current = first;
664   BidirectionalIterator start_of_completed = last;
665   while (current != start_of_completed) {
666     // Check if we have found a completed request.
667     if (optional<status> result = current->test()) {
668       using std::iter_swap;
669 
670       // Emit the resulting status object
671       *out++ = *result;
672 
673       // We're expanding the set of completed requests
674       --start_of_completed;
675 
676       // Swap the request we just completed with the last request that
677       // has not yet been tested.
678       iter_swap(current, start_of_completed);
679 
680       continue;
681     }
682 
683     // Move to the next request.
684     ++current;
685   }
686 
687   // Finish up by fixing the order of the completed set to match the
688   // order in which we emitted status objects, then return.
689   std::reverse(start_of_completed, last);
690   return std::make_pair(out, start_of_completed);
691 }
692 
693 /**
694  *  \overload
695  */
696 template<typename BidirectionalIterator>
697 BidirectionalIterator
test_some(BidirectionalIterator first,BidirectionalIterator last)698 test_some(BidirectionalIterator first, BidirectionalIterator last)
699 {
700   BidirectionalIterator current = first;
701   BidirectionalIterator start_of_completed = last;
702   while (current != start_of_completed) {
703     // Check if we have found a completed request.
704     if (optional<status> result = current->test()) {
705       using std::iter_swap;
706 
707       // We're expanding the set of completed requests
708       --start_of_completed;
709 
710       // Swap the request we just completed with the last request that
711       // has not yet been tested.
712       iter_swap(current, start_of_completed);
713 
714       continue;
715     }
716 
717     // Move to the next request.
718     ++current;
719   }
720 
721   return start_of_completed;
722 }
723 
724 } } // end namespace boost::mpi
725 
726 
727 #endif // BOOST_MPI_NONBLOCKING_HPP
728