#pragma once

#include "../cuda_flow.hpp"
#include "../cuda_capturer.hpp"
#include "../cuda_meta.hpp"

/**
@file cuda_merge.hpp
@brief CUDA merge algorithm include file
*/

namespace tf::detail {

/**
@private
@brief merge bound type
*/
enum class cudaMergeBoundType {
  LOWER,
  UPPER
};

/** @private */
template<typename T, unsigned N>
struct cudaMergePair {
  cudaArray<T, N> keys;
  cudaArray<unsigned, N> indices;
};

/** @private */
struct cudaMergeRange {
  unsigned a_begin, a_end, b_begin, b_end;

  __device__ unsigned a_count() const { return a_end - a_begin; }
  __device__ unsigned b_count() const { return b_end - b_begin; }
  __device__ unsigned total() const { return a_count() + b_count(); }

  __device__ cudaRange a_range() const {
    return cudaRange { a_begin, a_end };
  }
  __device__ cudaRange b_range() const {
    return cudaRange { b_begin, b_end };
  }

  __device__ cudaMergeRange to_local() const {
    return cudaMergeRange { 0, a_count(), a_count(), total() };
  }

  // Partition from mp to the end.
  __device__ cudaMergeRange partition(unsigned mp0, unsigned diag) const {
    return cudaMergeRange { a_begin + mp0, a_end, b_begin + diag - mp0, b_end };
  }

  // Partition from mp0 to mp1.
  __device__ cudaMergeRange partition(unsigned mp0, unsigned diag0,
    unsigned mp1, unsigned diag1) const {
    return cudaMergeRange {
      a_begin + mp0,
      a_begin + mp1,
      b_begin + diag0 - mp0,
      b_begin + diag1 - mp1
    };
  }

  __device__ bool a_valid() const {
    return a_begin < a_end;
  }
  __device__ bool b_valid() const {
    return b_begin < b_end;
  }
};

/** @private */
template<
  cudaMergeBoundType bounds = cudaMergeBoundType::LOWER,
  typename a_keys_it, typename b_keys_it, typename comp_t
>
__device__ auto cuda_merge_path(
  a_keys_it a_keys, unsigned a_count,
  b_keys_it b_keys, unsigned b_count,
  unsigned diag, comp_t comp
) {

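  // Binary-search along the cross diagonal `diag` of the merge path:
  // find the split `beg` such that the first `beg` keys of a together with
  // the first `diag - beg` keys of b form the first `diag` merged outputs.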
  unsigned beg = (diag > b_count) ? diag - b_count : 0;
  unsigned end = diag < a_count ? diag : a_count;

  while(beg < end) {
    auto mid = (beg + end) / 2;
    auto a_key = a_keys[mid];
    auto b_key = b_keys[diag - 1 - mid];
    bool pred = (cudaMergeBoundType::UPPER == bounds) ?
      comp(a_key, b_key) :
      !comp(b_key, a_key);

    if(pred) beg = mid + 1;
    else end = mid;
  }
  return beg;
}

/** @private */
template<cudaMergeBoundType bounds, typename keys_it, typename comp_t>
__device__ auto cuda_merge_path(
  keys_it keys, cudaMergeRange range, unsigned diag, comp_t comp
) {

  return cuda_merge_path<bounds>(
    keys + range.a_begin, range.a_count(),
    keys + range.b_begin, range.b_count(),
    diag, comp);
}

/** @private */
template<cudaMergeBoundType bounds, bool range_check, typename T, typename comp_t>
__device__ bool cuda_merge_predicate(
  T a_key, T b_key, cudaMergeRange range, comp_t comp
) {

  bool p;
  if(range_check && !range.a_valid()) {
    p = false;
  }
  else if(range_check && !range.b_valid()) {
    p = true;
  }
  else {
    p = (cudaMergeBoundType::UPPER == bounds) ? comp(a_key, b_key) :
                                               !comp(b_key, a_key);
  }
  return p;
}

/** @private */
inline __device__ auto cuda_compute_merge_range(
  unsigned a_count, unsigned b_count,
  unsigned partition, unsigned spacing,
  unsigned mp0, unsigned mp1
) {

  auto diag0 = spacing * partition;
  auto diag1 = min(a_count + b_count, diag0 + spacing);

  return cudaMergeRange { mp0, mp1, diag0 - mp0, diag1 - mp1 };
}

/**
@private

Specialization that emits just one LD instruction. Can only be reliably used
with raw pointer types. Fixed not to use pointer arithmetic so that
we don't get undefined behavior with unaligned types.
*/
template<unsigned nt, unsigned vt, typename T>
__device__ auto cuda_load_two_streams_reg(
  const T* a, unsigned a_count, const T* b, unsigned b_count, unsigned tid
) {

  b -= a_count;
  cudaArray<T, vt> x;
  cuda_strided_iterate<nt, vt>([&](auto i, auto index) {
    const T* p = (index >= a_count) ? b : a;
    x[i] = p[index];
  }, tid, a_count + b_count);

  return x;
}

/** @private */
template<unsigned nt, unsigned vt, typename T, typename a_it, typename b_it>
__device__
std::enable_if_t<
  !(std::is_pointer<a_it>::value && std::is_pointer<b_it>::value),
  cudaArray<T, vt>
> cuda_load_two_streams_reg(a_it a, unsigned a_count, b_it b, unsigned b_count, unsigned tid) {
  b -= a_count;
  cudaArray<T, vt> x;
  cuda_strided_iterate<nt, vt>([&](auto i, auto index) {
    x[i] = (index < a_count) ? a[index] : b[index];
  }, tid, a_count + b_count);
  return x;
}

/** @private */
template<unsigned nt, unsigned vt, typename A, typename B, typename T, unsigned S>
__device__ void cuda_load_two_streams_shared(A a, unsigned a_count,
  B b, unsigned b_count, unsigned tid, T (&shared)[S], bool sync = true
) {
  // Load into register then make an unconditional strided store into memory.
  auto x = cuda_load_two_streams_reg<nt, vt, T>(a, a_count, b, b_count, tid);
  cuda_reg_to_shared_strided<nt>(x, tid, shared, sync);
}

/** @private */
template<unsigned nt, unsigned vt, typename T>
__device__ auto cuda_gather_two_streams_strided(const T* a,
  unsigned a_count, const T* b, unsigned b_count, cudaArray<unsigned, vt> indices,
  unsigned tid) {

  ptrdiff_t b_offset = b - a - a_count;
  auto count = a_count + b_count;

  cudaArray<T, vt> x;
  cuda_strided_iterate<nt, vt>([&](auto i, auto j) {
    ptrdiff_t gather = indices[i];
    if(gather >= a_count) gather += b_offset;
    x[i] = a[gather];
  }, tid, count);

  return x;
}

/** @private */
template<unsigned nt, unsigned vt, typename T, typename a_it, typename b_it>
__device__
std::enable_if_t<
  !(std::is_pointer<a_it>::value && std::is_pointer<b_it>::value),
  cudaArray<T, vt>
> cuda_gather_two_streams_strided(a_it a,
  unsigned a_count, b_it b, unsigned b_count, cudaArray<unsigned, vt> indices, unsigned tid) {

  b -= a_count;
  cudaArray<T, vt> x;
  cuda_strided_iterate<nt, vt>([&](auto i, auto j) {
    x[i] = (indices[i] < a_count) ? a[indices[i]] : b[indices[i]];
  }, tid, a_count + b_count);

  return x;
}

/** @private */
template<unsigned nt, unsigned vt, typename a_it, typename b_it, typename c_it>
__device__ void cuda_transfer_two_streams_strided(
  a_it a, unsigned a_count, b_it b, unsigned b_count,
  cudaArray<unsigned, vt> indices, unsigned tid, c_it c
) {

  using T = typename std::iterator_traits<a_it>::value_type;
  auto x = cuda_gather_two_streams_strided<nt, vt, T>(
    a, a_count, b, b_count, indices, tid
  );

  cuda_reg_to_mem_strided<nt>(x, tid, a_count + b_count, c);
}

/**
@private

This function must be able to dereference keys[a_begin] and keys[b_begin],
no matter the indices for each. The caller should allocate at least
nt * vt + 1 elements for keys_shared.
*/
template<cudaMergeBoundType bounds, unsigned vt, typename T, typename comp_t>
__device__ auto cuda_serial_merge(
  const T* keys_shared, cudaMergeRange range, comp_t comp, bool sync = true
) {

  auto a_key = keys_shared[range.a_begin];
  auto b_key = keys_shared[range.b_begin];

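  // each thread serially merges vt consecutive outputs from the two
  // shared-memory sub-ranges, recording the source index of every key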
  cudaMergePair<T, vt> merge_pair;
  cuda_iterate<vt>([&](auto i) {
    bool p = cuda_merge_predicate<bounds, true>(a_key, b_key, range, comp);
    auto index = p ? range.a_begin : range.b_begin;

    merge_pair.keys[i] = p ? a_key : b_key;
    merge_pair.indices[i] = index;

    T c_key = keys_shared[++index];
    if(p) a_key = c_key, range.a_begin = index;
    else b_key = c_key, range.b_begin = index;
  });

  if(sync) __syncthreads();
  return merge_pair;
}

/**
@private

Load arrays a and b from global memory and merge into register.
*/
template<cudaMergeBoundType bounds,
  unsigned nt, unsigned vt,
  typename a_it, typename b_it, typename T, typename comp_t, unsigned S
>
__device__ auto block_merge_from_mem(
  a_it a, b_it b, cudaMergeRange range_mem, unsigned tid, comp_t comp, T (&keys_shared)[S]
) {

  static_assert(S >= nt * vt + 1,
    "block_merge_from_mem requires temporary storage of at "
    "least nt * vt + 1 items");

  // Load the data into shared memory.
  cuda_load_two_streams_shared<nt, vt>(
    a + range_mem.a_begin, range_mem.a_count(),
    b + range_mem.b_begin, range_mem.b_count(),
    tid, keys_shared, true
  );

  // Run a merge path to find the start of the serial merge for each thread.
  auto range_local = range_mem.to_local();
  auto diag = vt * tid;
  auto mp = cuda_merge_path<bounds>(keys_shared, range_local, diag, comp);

  // Compute the ranges of the sources in shared memory. The end iterators
  // of the range are inaccurate, but still facilitate exact merging, because
  // only vt elements will be merged.
  auto merged = cuda_serial_merge<bounds, vt>(
    keys_shared, range_local.partition(mp, diag), comp
  );

  return merged;
}

/** @private */
template<cudaMergeBoundType bounds,
  typename P, typename a_keys_it, typename b_keys_it, typename comp_t
>
void cuda_merge_path_partitions(
  P&& p,
  a_keys_it a, unsigned a_count,
  b_keys_it b, unsigned b_count,
  unsigned spacing,
  comp_t comp,
  unsigned* buf
) {

  //int num_partitions = (int)div_up(a_count + b_count, spacing) + 1;

  unsigned num_partitions = (a_count + b_count + spacing - 1) / spacing + 1;

  const unsigned nt = 128;
  const unsigned vt = 1;
  const unsigned nv = nt * vt;

  unsigned B = (num_partitions + nv - 1) / nv;  // nt = 128, vt = 1

  cuda_kernel<<<B, nt, 0, p.stream()>>>([=]__device__(auto tid, auto bid) {
    auto range = cuda_get_tile(bid, nt * vt, num_partitions);
    cuda_strided_iterate<nt, vt>([=](auto, auto j) {
      auto index = range.begin + j;
      auto diag = min(spacing * index, a_count + b_count);
      buf[index] = cuda_merge_path<bounds>(a, a_count, b, b_count, diag, comp);
    }, tid, range.count());
  });
}

//template<typename segments_it>
//auto load_balance_partitions(int64_t dest_count, segments_it segments,
//  int num_segments, int spacing, context_t& context) ->
//  mem_t<typename std::iterator_traits<segments_it>::value_type> {
//
//  typedef typename std::iterator_traits<segments_it>::value_type int_t;
//  return merge_path_partitions<bounds_upper>(counting_iterator_t<int_t>(0),
//    dest_count, segments, num_segments, spacing, less_t<int_t>(), context);
//}

//template<bounds_t bounds, typename keys_it>
//mem_t<int> binary_search_partitions(keys_it keys, int count, int num_items,
//  int spacing, context_t& context) {
//
//  int num_partitions = div_up(count, spacing) + 1;
//  mem_t<int> mem(num_partitions, context);
//  int* p = mem.data();
//  transform([=]MGPU_DEVICE(int index) {
//    int key = min(spacing * index, count);
//    p[index] = binary_search<bounds>(keys, num_items, key, less_t<int>());
//  }, num_partitions, context);
//  return mem;
//}

/** @private */
template<
  typename P,
  typename a_keys_it, typename a_vals_it,
  typename b_keys_it, typename b_vals_it,
  typename c_keys_it, typename c_vals_it,
  typename comp_t
>
void cuda_merge_loop(
  P&& p,
  a_keys_it a_keys, a_vals_it a_vals, unsigned a_count,
  b_keys_it b_keys, b_vals_it b_vals, unsigned b_count,
  c_keys_it c_keys, c_vals_it c_vals,
  comp_t comp,
  void* ptr
) {

  using E = std::decay_t<P>;
  using T = typename std::iterator_traits<a_keys_it>::value_type;
  using V = typename std::iterator_traits<a_vals_it>::value_type;

  auto buf = static_cast<unsigned*>(ptr);

  auto has_values = !std::is_same<V, cudaEmpty>::value;

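  // phase 1: compute the merge-path split point at every tile boundary
  // (one per E::nv outputs, plus a trailing one) and store them in buf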
  cuda_merge_path_partitions<cudaMergeBoundType::LOWER>(
    p, a_keys, a_count, b_keys, b_count, E::nv, comp, buf
  );

  unsigned B = (a_count + b_count + E::nv - 1) / E::nv;

  // phase 2: launch one block of E::nt threads per tile of E::nv merged outputs
  cuda_kernel<<<B, E::nt, 0, p.stream()>>>([=] __device__ (auto tid, auto bid) {

    __shared__ union {
      T keys[E::nv + 1];
      unsigned indices[E::nv];
    } shared;

    // Load the range for this CTA and merge the values into register.
    auto mp0 = buf[bid + 0];
    auto mp1 = buf[bid + 1];
    auto range = cuda_compute_merge_range(a_count, b_count, bid, E::nv, mp0, mp1);

    auto merge = block_merge_from_mem<cudaMergeBoundType::LOWER, E::nt, E::vt>(
      a_keys, b_keys, range, tid, comp, shared.keys
    );

    auto dest_offset = E::nv * bid;
    cuda_reg_to_mem_thread<E::nt>(
      merge.keys, tid, range.total(), c_keys + dest_offset, shared.keys
    );

    if(has_values) {
      // Transpose the indices from thread order to strided order.
      auto indices = cuda_reg_thread_to_strided<E::nt>(
        merge.indices, tid, shared.indices
      );

      // Gather the input values and merge into the output values.
      cuda_transfer_two_streams_strided<E::nt>(
        a_vals + range.a_begin, range.a_count(),
        b_vals + range.b_begin, range.b_count(), indices, tid,
        c_vals + dest_offset
      );
    }
  });
}

}  // end of namespace tf::detail ---------------------------------------------

namespace tf {

// ----------------------------------------------------------------------------
// standalone merge algorithms
// ----------------------------------------------------------------------------

/**
@brief queries the buffer size in bytes needed to call merge kernels

@tparam P execution policy type
@param a_count number of elements in the first input array
@param b_count number of elements in the second input array

The function is used to allocate a buffer for calling
tf::cuda_merge.
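
Below is a minimal usage sketch (assuming two sorted device arrays of
@c a_count and @c b_count elements; the scratch buffer must reside in
device-accessible memory because the merge kernels write to it):

@code{.cpp}
auto bytes = tf::cuda_merge_buffer_size<tf::cudaDefaultExecutionPolicy>(
  a_count, b_count
);
void* buf = nullptr;
cudaMalloc(&buf, bytes);
// ... pass buf to tf::cuda_merge or tf::cuda_merge_by_key ...
cudaFree(buf);
@endcode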
*/
template <typename P>
unsigned cuda_merge_buffer_size(unsigned a_count, unsigned b_count) {
  using E = std::decay_t<P>;
  unsigned sz = (a_count + b_count + E::nv - 1) / E::nv + 1;
  return sz * sizeof(unsigned);
}

// ----------------------------------------------------------------------------
// key-value merge
// ----------------------------------------------------------------------------

//template<
//  typename P,
//  typename a_keys_it, typename a_vals_it,
//  typename b_keys_it, typename b_vals_it,
//  typename c_keys_it, typename c_vals_it,
//  typename C
//>
//void cuda_merge(
//  P&& p,
//  a_keys_it a_keys_first, a_vals_it a_vals_first, a_keys_it a_keys_last,
//  b_keys_it b_keys_first, b_vals_it b_vals_first, b_keys_it b_keys_last,
//  c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
//) {
//
//  unsigned a_count = std::distance(a_keys_first, a_keys_last);
//  unsigned b_count = std::distance(b_keys_first, b_keys_last);
//
//  if(a_count + b_count == 0) {
//    return;
//  }
//
//  // allocate temporary buffer
//  cudaScopedDeviceMemory<std::byte> temp(cuda_merge_buffer_size<P>(a_count, b_count));
//
//  detail::cuda_merge_loop(
//    p,
//    a_keys_first, a_vals_first, a_count,
//    b_keys_first, b_vals_first, b_count,
//    c_keys_first, c_vals_first, comp,
//    temp.data()
//  );
//
//  // synchronize the execution
//  p.synchronize();
//}

/**
@brief performs asynchronous key-value merge over a range of keys and values

@tparam P execution policy type
@tparam a_keys_it first key iterator type
@tparam a_vals_it first value iterator type
@tparam b_keys_it second key iterator type
@tparam b_vals_it second value iterator type
@tparam c_keys_it output key iterator type
@tparam c_vals_it output value iterator type
@tparam C comparator type

@param p execution policy
@param a_keys_first iterator to the beginning of the first key range
@param a_keys_last iterator to the end of the first key range
@param a_vals_first iterator to the beginning of the first value range
@param b_keys_first iterator to the beginning of the second key range
@param b_keys_last iterator to the end of the second key range
@param b_vals_first iterator to the beginning of the second value range
@param c_keys_first iterator to the beginning of the output key range
@param c_vals_first iterator to the beginning of the output value range
@param comp comparator
@param buf pointer to the temporary buffer

Performs a key-value merge that copies elements from
<tt>[a_keys_first, a_keys_last)</tt> and <tt>[b_keys_first, b_keys_last)</tt>
into a single range, <tt>[c_keys_first, c_keys_first + (a_keys_last - a_keys_first) + (b_keys_last - b_keys_first))</tt>,
such that the resulting range is in ascending key order.

At the same time, the merge copies elements from the two associated ranges
<tt>[a_vals_first, a_vals_first + (a_keys_last - a_keys_first))</tt> and
<tt>[b_vals_first, b_vals_first + (b_keys_last - b_keys_first))</tt> into a single range,
<tt>[c_vals_first, c_vals_first + (a_keys_last - a_keys_first) + (b_keys_last - b_keys_first))</tt>,
such that the resulting range is in the ascending order
implied by each input element's associated key.
The temporary buffer @c buf must provide at least the number of bytes
returned by tf::cuda_merge_buffer_size, allocated in device-accessible memory.

For example, assume:
  + @c a_keys = {1, 8};
  + @c a_vals = {2, 1};
  + @c b_keys = {3, 7};
  + @c b_vals = {3, 4};

After the merge, we have:
  + @c c_keys = {1, 3, 7, 8}
  + @c c_vals = {2, 3, 4, 1}

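Below is a minimal call sketch for this example (assuming the key and value
arrays live in device-accessible memory, the outputs can hold four elements,
and @c stream is a valid @c cudaStream_t):

@code{.cpp}
auto bytes = tf::cuda_merge_buffer_size<tf::cudaDefaultExecutionPolicy>(2, 2);
void* buf = nullptr;
cudaMalloc(&buf, bytes);

tf::cuda_merge_by_key(
  tf::cudaDefaultExecutionPolicy{stream},
  a_keys, a_keys + 2, a_vals,
  b_keys, b_keys + 2, b_vals,
  c_keys, c_vals,
  [] __device__ (int x, int y) { return x < y; },
  buf
);
cudaStreamSynchronize(stream);  // the merge runs asynchronously on stream
cudaFree(buf);
@endcode
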
*/
template<
  typename P,
  typename a_keys_it, typename a_vals_it,
  typename b_keys_it, typename b_vals_it,
  typename c_keys_it, typename c_vals_it,
  typename C
>
void cuda_merge_by_key(
  P&& p,
  a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
  b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
  c_keys_it c_keys_first, c_vals_it c_vals_first, C comp,
  void* buf
) {

  unsigned a_count = std::distance(a_keys_first, a_keys_last);
  unsigned b_count = std::distance(b_keys_first, b_keys_last);

  if(a_count + b_count == 0) {
    return;
  }

  detail::cuda_merge_loop(p,
    a_keys_first, a_vals_first, a_count,
    b_keys_first, b_vals_first, b_count,
    c_keys_first, c_vals_first, comp,
    buf
  );
}

// ----------------------------------------------------------------------------
// key-only merge
// ----------------------------------------------------------------------------

//template<typename P,
//  typename a_keys_it, typename b_keys_it, typename c_keys_it, typename C
//>
//void cuda_merge(
//  P&& p,
//  a_keys_it a_keys_first, a_keys_it a_keys_last,
//  b_keys_it b_keys_first, b_keys_it b_keys_last,
//  c_keys_it c_keys_first,
//  C comp
//) {
//  cuda_merge(
//    p,
//    a_keys_first, (const cudaEmpty*)nullptr, a_keys_last,
//    b_keys_first, (const cudaEmpty*)nullptr, b_keys_last,
//    c_keys_first, (cudaEmpty*)nullptr, comp
//  );
//}

/**
@brief performs asynchronous key-only merge over a range of keys

@tparam P execution policy type
@tparam a_keys_it first key iterator type
@tparam b_keys_it second key iterator type
@tparam c_keys_it output key iterator type
@tparam C comparator type

@param p execution policy
@param a_keys_first iterator to the beginning of the first key range
@param a_keys_last iterator to the end of the first key range
@param b_keys_first iterator to the beginning of the second key range
@param b_keys_last iterator to the end of the second key range
@param c_keys_first iterator to the beginning of the output key range
@param comp comparator
@param buf pointer to the temporary buffer

This function is equivalent to tf::cuda_merge_by_key without values.
The temporary buffer must be sized with tf::cuda_merge_buffer_size.

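A minimal key-only sketch (assuming sorted device arrays @c a_keys and
@c b_keys of @c a_count and @c b_count integers and a valid @c stream):

@code{.cpp}
auto bytes = tf::cuda_merge_buffer_size<tf::cudaDefaultExecutionPolicy>(
  a_count, b_count
);
void* buf = nullptr;
cudaMalloc(&buf, bytes);

tf::cuda_merge(
  tf::cudaDefaultExecutionPolicy{stream},
  a_keys, a_keys + a_count,
  b_keys, b_keys + b_count,
  c_keys,
  [] __device__ (int x, int y) { return x < y; },
  buf
);
cudaStreamSynchronize(stream);
cudaFree(buf);
@endcode
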
*/
template<typename P,
  typename a_keys_it, typename b_keys_it, typename c_keys_it, typename C
>
void cuda_merge(
  P&& p,
  a_keys_it a_keys_first, a_keys_it a_keys_last,
  b_keys_it b_keys_first, b_keys_it b_keys_last,
  c_keys_it c_keys_first,
  C comp,
  void* buf
) {
  cuda_merge_by_key(
    p,
    a_keys_first, a_keys_last, (const cudaEmpty*)nullptr,
    b_keys_first, b_keys_last, (const cudaEmpty*)nullptr,
    c_keys_first, (cudaEmpty*)nullptr, comp,
    buf
  );
}

// ----------------------------------------------------------------------------
// cudaFlow merge algorithms
// ----------------------------------------------------------------------------

// Function: merge
template<typename A, typename B, typename C, typename Comp>
cudaTask cudaFlow::merge(
  A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp
) {
  return capture([=](cudaFlowCapturer& cap){
    cap.make_optimizer<cudaLinearCapturing>();
    cap.merge(a_first, a_last, b_first, b_last, c_first, comp);
  });
}

// Function: merge
template<typename A, typename B, typename C, typename Comp>
void cudaFlow::merge(
  cudaTask task, A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp
) {
  capture(task, [=](cudaFlowCapturer& cap){
    cap.make_optimizer<cudaLinearCapturing>();
    cap.merge(a_first, a_last, b_first, b_last, c_first, comp);
  });
}

// Function: merge_by_key
template<
  typename a_keys_it, typename a_vals_it,
  typename b_keys_it, typename b_vals_it,
  typename c_keys_it, typename c_vals_it,
  typename C
>
cudaTask cudaFlow::merge_by_key(
  a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
  b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
  c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
) {
  return capture([=](cudaFlowCapturer& cap){
    cap.make_optimizer<cudaLinearCapturing>();
    cap.merge_by_key(
      a_keys_first, a_keys_last, a_vals_first,
      b_keys_first, b_keys_last, b_vals_first,
      c_keys_first, c_vals_first,
      comp
    );
  });
}

// Function: merge_by_key
template<
  typename a_keys_it, typename a_vals_it,
  typename b_keys_it, typename b_vals_it,
  typename c_keys_it, typename c_vals_it,
  typename C
>
void cudaFlow::merge_by_key(
  cudaTask task,
  a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
  b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
  c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
) {
  capture(task, [=](cudaFlowCapturer& cap){
    cap.make_optimizer<cudaLinearCapturing>();
    cap.merge_by_key(
      a_keys_first, a_keys_last, a_vals_first,
      b_keys_first, b_keys_last, b_vals_first,
      c_keys_first, c_vals_first,
      comp
    );
  });
}

// ----------------------------------------------------------------------------
// cudaFlowCapturer merge algorithms
// ----------------------------------------------------------------------------

// Function: merge
template<typename A, typename B, typename C, typename Comp>
cudaTask cudaFlowCapturer::merge(
  A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp
) {
  // TODO
  auto bufsz = cuda_merge_buffer_size<cudaDefaultExecutionPolicy>(
    std::distance(a_first, a_last), std::distance(b_first, b_last)
  );

  return on([=, buf=MoC{cudaScopedDeviceMemory<std::byte>(bufsz)}]
  (cudaStream_t stream) mutable {
    cuda_merge(cudaDefaultExecutionPolicy{stream},
      a_first, a_last, b_first, b_last, c_first, comp, buf.get().data()
    );
  });
}

// Procedure: merge (update)
template<typename A, typename B, typename C, typename Comp>
void cudaFlowCapturer::merge(
  cudaTask task, A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp
) {
  // TODO
  auto bufsz = cuda_merge_buffer_size<cudaDefaultExecutionPolicy>(
    std::distance(a_first, a_last), std::distance(b_first, b_last)
  );

  on(task, [=, buf=MoC{cudaScopedDeviceMemory<std::byte>(bufsz)}]
  (cudaStream_t stream) mutable {
    cuda_merge(cudaDefaultExecutionPolicy{stream},
      a_first, a_last, b_first, b_last, c_first, comp, buf.get().data()
    );
  });
}

// Function: merge_by_key
template<
  typename a_keys_it, typename a_vals_it,
  typename b_keys_it, typename b_vals_it,
  typename c_keys_it, typename c_vals_it,
  typename C
>
cudaTask cudaFlowCapturer::merge_by_key(
  a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
  b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
  c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
) {

  auto bufsz = cuda_merge_buffer_size<cudaDefaultExecutionPolicy>(
    std::distance(a_keys_first, a_keys_last),
    std::distance(b_keys_first, b_keys_last)
  );

  return on([=, buf=MoC{cudaScopedDeviceMemory<std::byte>(bufsz)}]
  (cudaStream_t stream) mutable {
    cuda_merge_by_key(cudaDefaultExecutionPolicy{stream},
      a_keys_first, a_keys_last, a_vals_first,
      b_keys_first, b_keys_last, b_vals_first,
      c_keys_first, c_vals_first,
      comp,
      buf.get().data()
    );
  });
}

// Function: merge_by_key
template<
  typename a_keys_it, typename a_vals_it,
  typename b_keys_it, typename b_vals_it,
  typename c_keys_it, typename c_vals_it,
  typename C
>
void cudaFlowCapturer::merge_by_key(
  cudaTask task,
  a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
  b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
  c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
) {

  auto bufsz = cuda_merge_buffer_size<cudaDefaultExecutionPolicy>(
    std::distance(a_keys_first, a_keys_last),
    std::distance(b_keys_first, b_keys_last)
  );

  on(task, [=, buf=MoC{cudaScopedDeviceMemory<std::byte>(bufsz)}]
  (cudaStream_t stream) mutable {
    cuda_merge_by_key(cudaDefaultExecutionPolicy{stream},
      a_keys_first, a_keys_last, a_vals_first,
      b_keys_first, b_keys_last, b_vals_first,
      c_keys_first, c_vals_first,
      comp,
      buf.get().data()
    );
  });
}

}  // end of namespace tf -----------------------------------------------------