1 //  Copyright (c) 2007-2016 Hartmut Kaiser
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 #if !defined(HPX_PARALLEL_SEGMENTED_ALGORITHM_COUNT_DEC_25_2014_0207PM)
7 #define HPX_PARALLEL_SEGMENTED_ALGORITHM_COUNT_DEC_25_2014_0207PM
8 
9 #include <hpx/config.hpp>
10 #include <hpx/traits/segmented_iterator_traits.hpp>
11 
12 #include <hpx/parallel/algorithms/count.hpp>
13 #include <hpx/parallel/algorithms/detail/dispatch.hpp>
14 #include <hpx/parallel/execution_policy.hpp>
15 #include <hpx/parallel/segmented_algorithms/detail/dispatch.hpp>
16 #include <hpx/parallel/util/detail/algorithm_result.hpp>
17 #include <hpx/parallel/util/detail/handle_remote_exceptions.hpp>
18 
19 #include <algorithm>
20 #include <exception>
21 #include <iterator>
22 #include <list>
23 #include <numeric>
24 #include <type_traits>
25 #include <utility>
26 #include <vector>
27 
28 namespace hpx { namespace parallel { inline namespace v1
29 {
30     ///////////////////////////////////////////////////////////////////////////
31     // segmented_count
32     namespace detail
33     {
34         ///////////////////////////////////////////////////////////////////////
35         /// \cond NOINTERNAL
36 
37         // sequential remote implementation
38         template <typename Algo, typename ExPolicy, typename SegIter,
39             typename T, typename Proj>
40         static typename util::detail::algorithm_result<
41             ExPolicy, typename std::iterator_traits<SegIter>::difference_type
42         >::type
segmented_count(Algo && algo,ExPolicy const & policy,SegIter first,SegIter last,T const & value,Proj && proj,std::true_type)43         segmented_count(Algo && algo, ExPolicy const& policy,
44             SegIter first, SegIter last, T const& value,
45             Proj && proj, std::true_type)
46         {
47             typedef hpx::traits::segmented_iterator_traits<SegIter> traits;
48             typedef typename traits::segment_iterator segment_iterator;
49             typedef typename traits::local_iterator local_iterator_type;
50             typedef typename std::iterator_traits<SegIter>::difference_type
51                 value_type;
52             typedef util::detail::algorithm_result<ExPolicy, value_type> result;
53 
54             segment_iterator sit = traits::segment(first);
55             segment_iterator send = traits::segment(last);
56 
57             value_type overall_result = value_type();
58 
59             if (sit == send)
60             {
61                 // all elements are on the same partition
62                 local_iterator_type beg = traits::local(first);
63                 local_iterator_type end = traits::local(last);
64                 if (beg != end)
65                 {
66                     overall_result = dispatch(traits::get_id(sit),
67                         algo, policy, std::true_type(), beg, end,
68                         value, proj);
69                 }
70             }
71             else {
72                 // handle the remaining part of the first partition
73                 local_iterator_type beg = traits::local(first);
74                 local_iterator_type end = traits::end(sit);
75                 if (beg != end)
76                 {
77                     overall_result += dispatch(traits::get_id(sit),
78                         algo, policy, std::true_type(), beg, end,
79                         value, proj);
80                 }
81 
82                 // handle all of the full partitions
83                 for (++sit; sit != send; ++sit)
84                 {
85                     beg = traits::begin(sit);
86                     end = traits::end(sit);
87                     if (beg != end)
88                     {
89                         overall_result += dispatch(traits::get_id(sit),
90                             algo, policy, std::true_type(), beg, end,
91                             value, proj);
92                     }
93                 }
94 
95                 // handle the beginning of the last partition
96                 beg = traits::begin(sit);
97                 end = traits::local(last);
98                 if (beg != end)
99                 {
100                     overall_result += dispatch(traits::get_id(sit),
101                         algo, policy, std::true_type(), beg, end,
102                         value, proj);
103                 }
104             }
105 
106             return result::get(std::move(overall_result));
107         }
108 
109         // parallel remote implementation
110         template <typename Algo, typename ExPolicy, typename SegIter,
111             typename T, typename Proj>
112         static typename util::detail::algorithm_result<
113             ExPolicy, typename std::iterator_traits<SegIter>::difference_type
114         >::type
segmented_count(Algo && algo,ExPolicy const & policy,SegIter first,SegIter last,T const & value,Proj && proj,std::false_type)115         segmented_count(Algo && algo, ExPolicy const& policy,
116             SegIter first, SegIter last, T const& value,
117             Proj && proj, std::false_type)
118         {
119             typedef hpx::traits::segmented_iterator_traits<SegIter> traits;
120             typedef typename traits::segment_iterator segment_iterator;
121             typedef typename traits::local_iterator local_iterator_type;
122 
123             typedef std::integral_constant<bool,
124                     !hpx::traits::is_forward_iterator<SegIter>::value
125                 > forced_seq;
126 
127             typedef typename std::iterator_traits<SegIter>::difference_type
128                 value_type;
129             typedef util::detail::algorithm_result<ExPolicy, value_type> result;
130 
131             segment_iterator sit = traits::segment(first);
132             segment_iterator send = traits::segment(last);
133 
134             std::vector<shared_future<value_type> > segments;
135             segments.reserve(std::distance(sit, send));
136 
137             if (sit == send)
138             {
139                 // all elements are on the same partition
140                 local_iterator_type beg = traits::local(first);
141                 local_iterator_type end = traits::local(last);
142                 if (beg != end)
143                 {
144                     segments.push_back(dispatch_async(traits::get_id(sit),
145                         algo, policy, forced_seq(), beg, end, value, proj));
146                 }
147             }
148             else {
149                 // handle the remaining part of the first partition
150                 local_iterator_type beg = traits::local(first);
151                 local_iterator_type end = traits::end(sit);
152                 if (beg != end)
153                 {
154                     segments.push_back(dispatch_async(traits::get_id(sit),
155                         algo, policy, forced_seq(), beg, end, value, proj));
156                 }
157 
158                 // handle all of the full partitions
159                 for (++sit; sit != send; ++sit)
160                 {
161                     beg = traits::begin(sit);
162                     end = traits::end(sit);
163                     if (beg != end)
164                     {
165                         segments.push_back(dispatch_async(traits::get_id(sit),
166                             algo, policy, forced_seq(), beg, end, value, proj));
167                     }
168                 }
169 
170                 // handle the beginning of the last partition
171                 beg = traits::begin(sit);
172                 end = traits::local(last);
173                 if (beg != end)
174                 {
175                     segments.push_back(dispatch_async(traits::get_id(sit),
176                         algo, policy, forced_seq(), beg, end, value, proj));
177                 }
178             }
179 
180             return result::get(
181                 dataflow(
182                     hpx::util::unwrapping([=](std::vector<value_type> && r)
183                     {
184                         return std::accumulate(r.begin(), r.end(), value_type());
185                     }),
186                     segments));
187         }
188 
189         ///////////////////////////////////////////////////////////////////////
190         // segmented implementation
191         template <typename ExPolicy, typename InIter, typename T, typename Proj>
192         inline typename util::detail::algorithm_result<
193             ExPolicy, typename std::iterator_traits<InIter>::difference_type
194         >::type
count_(ExPolicy && policy,InIter first,InIter last,T const & value,Proj && proj,std::true_type)195         count_(ExPolicy&& policy, InIter first, InIter last, T const& value,
196             Proj && proj, std::true_type)
197         {
198             typedef parallel::execution::is_sequenced_execution_policy<
199                     ExPolicy
200                 >is_seq;
201 
202             typedef typename std::iterator_traits<InIter>::difference_type
203                 difference_type;
204 
205             if (first == last)
206             {
207                 return util::detail::algorithm_result<
208                     ExPolicy, difference_type>::get(difference_type());
209             }
210 
211             return segmented_count(
212                 count<difference_type>(), std::forward<ExPolicy>(policy),
213                 first, last, value, std::forward<Proj>(proj), is_seq());
214         }
215 
216         // forward declare the non-segmented version of this algorithm
217         template <typename ExPolicy, typename InIter, typename T, typename Proj>
218         typename util::detail::algorithm_result<
219             ExPolicy, typename std::iterator_traits<InIter>::difference_type
220         >::type
221         count_(ExPolicy&& policy, InIter first, InIter last, T const& value,
222             Proj && proj, std::false_type);
223 
224         /// \endcond
225     }
226 
227     ///////////////////////////////////////////////////////////////////////////
228     // segmented_count_if
229     namespace detail
230     {
231         ///////////////////////////////////////////////////////////////////////
232         /// \cond NOINTERNAL
233 
234         // sequential remote implementation
235         template <typename Algo, typename ExPolicy, typename SegIter,
236             typename F, typename Proj>
237         static typename util::detail::algorithm_result<
238             ExPolicy, typename std::iterator_traits<SegIter>::difference_type
239         >::type
segmented_count_if(Algo && algo,ExPolicy const & policy,SegIter first,SegIter last,F && f,Proj && proj,std::true_type)240         segmented_count_if(Algo && algo, ExPolicy const& policy,
241             SegIter first, SegIter last, F && f, Proj && proj, std::true_type)
242         {
243             typedef hpx::traits::segmented_iterator_traits<SegIter> traits;
244             typedef typename traits::segment_iterator segment_iterator;
245             typedef typename traits::local_iterator local_iterator_type;
246             typedef typename std::iterator_traits<SegIter>::difference_type
247                 value_type;
248             typedef util::detail::algorithm_result<ExPolicy, value_type> result;
249 
250             segment_iterator sit = traits::segment(first);
251             segment_iterator send = traits::segment(last);
252 
253             value_type overall_result = value_type();
254 
255             if (sit == send)
256             {
257                 // all elements are on the same partition
258                 local_iterator_type beg = traits::local(first);
259                 local_iterator_type end = traits::local(last);
260                 if (beg != end)
261                 {
262                     overall_result = dispatch(traits::get_id(sit),
263                         algo, policy, std::true_type(),
264                         beg, end, std::forward<F>(f),
265                         std::forward<Proj>(proj));
266                 }
267             }
268             else {
269                 // handle the remaining part of the first partition
270                 local_iterator_type beg = traits::local(first);
271                 local_iterator_type end = traits::end(sit);
272                 if (beg != end)
273                 {
274                     overall_result += dispatch(traits::get_id(sit),
275                         algo, policy, std::true_type(),
276                         beg, end, std::forward<F>(f),
277                         std::forward<Proj>(proj));
278                 }
279 
280                 // handle all of the full partitions
281                 for (++sit; sit != send; ++sit)
282                 {
283                     beg = traits::begin(sit);
284                     end = traits::end(sit);
285                     if (beg != end)
286                     {
287                         overall_result += dispatch(traits::get_id(sit),
288                             algo, policy, std::true_type(),
289                             beg, end, std::forward<F>(f),
290                             std::forward<Proj>(proj));
291                     }
292                 }
293 
294                 // handle the beginning of the last partition
295                 beg = traits::begin(sit);
296                 end = traits::local(last);
297                 if (beg != end)
298                 {
299                     overall_result += dispatch(traits::get_id(sit),
300                         algo, policy, std::true_type(),
301                         beg, end, std::forward<F>(f),
302                         std::forward<Proj>(proj));
303                 }
304             }
305 
306             return result::get(std::move(overall_result));
307         }
308 
309         // parallel remote implementation
310         template <typename Algo, typename ExPolicy, typename SegIter,
311             typename F, typename Proj>
312         static typename util::detail::algorithm_result<
313             ExPolicy, typename std::iterator_traits<SegIter>::difference_type
314         >::type
segmented_count_if(Algo && algo,ExPolicy const & policy,SegIter first,SegIter last,F && f,Proj && proj,std::false_type)315         segmented_count_if(Algo && algo, ExPolicy const& policy,
316             SegIter first, SegIter last, F && f, Proj && proj, std::false_type)
317         {
318             typedef hpx::traits::segmented_iterator_traits<SegIter> traits;
319             typedef typename traits::segment_iterator segment_iterator;
320             typedef typename traits::local_iterator local_iterator_type;
321 
322             typedef std::integral_constant<bool,
323                     !hpx::traits::is_forward_iterator<SegIter>::value
324                 > forced_seq;
325 
326             typedef typename std::iterator_traits<SegIter>::difference_type
327                 value_type;
328             typedef util::detail::algorithm_result<ExPolicy, value_type> result;
329 
330             segment_iterator sit = traits::segment(first);
331             segment_iterator send = traits::segment(last);
332 
333             std::vector<shared_future<value_type> > segments;
334             segments.reserve(std::distance(sit, send));
335 
336             if (sit == send)
337             {
338                 // all elements are on the same partition
339                 local_iterator_type beg = traits::local(first);
340                 local_iterator_type end = traits::local(last);
341                 if (beg != end)
342                 {
343                     segments.push_back(dispatch_async(traits::get_id(sit),
344                         algo, policy, forced_seq(),
345                         beg, end, std::forward<F>(f),
346                         std::forward<Proj>(proj)));
347                 }
348             }
349             else {
350                 // handle the remaining part of the first partition
351                 local_iterator_type beg = traits::local(first);
352                 local_iterator_type end = traits::end(sit);
353                 if (beg != end)
354                 {
355                     segments.push_back(dispatch_async(traits::get_id(sit),
356                         algo, policy, forced_seq(),
357                         beg, end, std::forward<F>(f),
358                         std::forward<Proj>(proj)));
359                 }
360 
361                 // handle all of the full partitions
362                 for (++sit; sit != send; ++sit)
363                 {
364                     beg = traits::begin(sit);
365                     end = traits::end(sit);
366                     if (beg != end)
367                     {
368                         segments.push_back(dispatch_async(traits::get_id(sit),
369                             algo, policy, forced_seq(),
370                             beg, end, std::forward<F>(f),
371                             std::forward<Proj>(proj)));
372                     }
373                 }
374 
375                 // handle the beginning of the last partition
376                 beg = traits::begin(sit);
377                 end = traits::local(last);
378                 if (beg != end)
379                 {
380                     segments.push_back(dispatch_async(traits::get_id(sit),
381                         algo, policy, forced_seq(),
382                         beg, end, std::forward<F>(f),
383                         std::forward<Proj>(proj)));
384                 }
385             }
386 
387             return result::get(
388                 dataflow(
389                     [=](std::vector<shared_future<value_type> > && r) -> value_type
390                     {
391                         // handle any remote exceptions, will throw on error
392                         std::list<std::exception_ptr> errors;
393                         parallel::util::detail::handle_remote_exceptions<
394                             ExPolicy
395                         >::call(r, errors);
396 
397                         return std::accumulate(
398                             r.begin(), r.end(), value_type(),
399                             [](value_type const& val, shared_future<value_type>& curr)
400                             {
401                                 return val + curr.get();
402                             });
403                     },
404                     std::move(segments)));
405         }
406 
407         template <typename ExPolicy, typename InIter, typename F, typename Proj>
408         inline typename util::detail::algorithm_result<
409             ExPolicy, typename std::iterator_traits<InIter>::difference_type
410         >::type
count_if_(ExPolicy && policy,InIter first,InIter last,F && f,Proj && proj,std::true_type)411         count_if_(ExPolicy && policy, InIter first, InIter last, F && f,
412             Proj && proj, std::true_type)
413         {
414             typedef parallel::execution::is_sequenced_execution_policy<
415                     ExPolicy
416                 > is_seq;
417 
418             typedef typename std::iterator_traits<InIter>::difference_type
419                 difference_type;
420 
421             if (first == last)
422             {
423                 return util::detail::algorithm_result<
424                     ExPolicy, difference_type>::get(difference_type());
425             }
426 
427             return segmented_count_if(
428                 count_if<difference_type>(), std::forward<ExPolicy>(policy),
429                 first, last, std::forward<F>(f), std::forward<Proj>(proj),
430                 is_seq());
431         }
432 
433         // forward declare the non-segmented version of this algorithm
434         template <typename ExPolicy, typename InIter, typename F, typename Proj>
435         typename util::detail::algorithm_result<
436             ExPolicy, typename std::iterator_traits<InIter>::difference_type
437         >::type
438         count_if_(ExPolicy && policy, InIter first, InIter last, F && f,
439             Proj && proj, std::false_type);
440 
441         /// \endcond
442     }
443 }}}
444 
445 #endif
446