1 #pragma once
2
3 #include "../cuda_flow.hpp"
4 #include "../cuda_capturer.hpp"
5 #include "../cuda_meta.hpp"
6
7 /**
8 @file cuda_merge.hpp
9 @brief CUDA merge algorithm include file
10 */
11
12 namespace tf::detail {
13
/**
@private
@brief merge bound type

Selects the tie-breaking rule used by the merge predicate
(see cuda_merge_predicate): LOWER keeps an element of @c a ahead of an
equal element of @c b (lower-bound semantics); UPPER advances past equal
elements of @c a first (upper-bound semantics).
*/
enum class cudaMergeBoundType {
  LOWER,
  UPPER
};
22
/**
@private
@brief per-thread result of a serial merge: N merged keys together with
       the (shared-memory) indices each key was taken from
*/
template<typename T, unsigned N>
struct cudaMergePair {
  // merged keys, in output order
  cudaArray<T, N> keys;
  // source index of each key (used later to gather the associated values)
  cudaArray<unsigned, N> indices;
};
29
/**
@private
@brief half-open index ranges [a_begin, a_end) and [b_begin, b_end) into
       the two sorted inputs of a merge, with helpers to re-partition
       them along merge-path diagonals
*/
struct cudaMergeRange {
  unsigned a_begin, a_end, b_begin, b_end;

  // number of elements remaining in each input, and in total
  __device__ unsigned a_count() const { return a_end - a_begin; }
  __device__ unsigned b_count() const { return b_end - b_begin; }
  __device__ unsigned total() const { return a_count() + b_count(); }

  __device__ cudaRange a_range() const {
    return cudaRange { a_begin, a_end };
  }
  __device__ cudaRange b_range() const {
    return cudaRange { b_begin, b_end };
  }

  // Rebase to local coordinates: a occupies [0, a_count()) and b occupies
  // [a_count(), total()) — the layout produced when both streams are
  // loaded back-to-back into one buffer.
  __device__ cudaMergeRange to_local() const {
    return cudaMergeRange { 0, a_count(), a_count(), total() };
  }

  // Partition from mp to the end. mp elements along cross-diagonal diag
  // come from a; the remaining diag - mp come from b.
  __device__ cudaMergeRange partition(unsigned mp0, unsigned diag) const {
    return cudaMergeRange { a_begin + mp0, a_end, b_begin + diag - mp0, b_end };
  }

  // Partition from mp0 to mp1.
  __device__ cudaMergeRange partition(unsigned mp0, unsigned diag0,
    unsigned mp1, unsigned diag1) const {
    return cudaMergeRange {
      a_begin + mp0,
      a_begin + mp1,
      b_begin + diag0 - mp0,
      b_begin + diag1 - mp1
    };
  }

  // whether each input still has unconsumed elements
  __device__ bool a_valid() const {
    return a_begin < a_end;
  }
  __device__ bool b_valid() const {
    return b_begin < b_end;
  }
};
72
/**
@private
@brief binary-searches cross-diagonal @c diag of the merge-path grid and
       returns how many elements of @c a lie before the split point
*/
template<
  cudaMergeBoundType bounds = cudaMergeBoundType::LOWER,
  typename a_keys_it, typename b_keys_it, typename comp_t
>
__device__ auto cuda_merge_path(
  a_keys_it a_keys, unsigned a_count,
  b_keys_it b_keys, unsigned b_count,
  unsigned diag, comp_t comp
) {

  // lo/hi bracket the feasible count of elements taken from a on this
  // diagonal: at least diag - b_count, at most min(diag, a_count).
  unsigned lo = (diag > b_count) ? (diag - b_count) : 0u;
  unsigned hi = (diag < a_count) ? diag : a_count;

  while(lo < hi) {
    unsigned mid = (lo + hi) / 2;
    auto a_key = a_keys[mid];
    auto b_key = b_keys[diag - 1 - mid];

    // UPPER: advance while a_key strictly precedes b_key.
    // LOWER: advance while b_key does not precede a_key (stable split).
    bool advance;
    if(cudaMergeBoundType::UPPER == bounds) {
      advance = comp(a_key, b_key);
    }
    else {
      advance = !comp(b_key, a_key);
    }

    if(advance) {
      lo = mid + 1;
    }
    else {
      hi = mid;
    }
  }
  return lo;
}
100
/**
@private
@brief merge-path search over two sub-ranges that live in the same buffer
*/
template<cudaMergeBoundType bounds, typename keys_it, typename comp_t>
__device__ auto cuda_merge_path(
  keys_it keys, cudaMergeRange range, unsigned diag, comp_t comp
) {
  // Form an iterator to the head of each sub-range and delegate to the
  // two-stream search.
  auto a_first = keys + range.a_begin;
  auto b_first = keys + range.b_begin;
  return cuda_merge_path<bounds>(
    a_first, range.a_count(),
    b_first, range.b_count(),
    diag, comp);
}
112
/**
@private
@brief decides whether the next merged element comes from stream a
       (true) or stream b (false)
*/
template<cudaMergeBoundType bounds, bool range_check, typename T, typename comp_t>
__device__ bool cuda_merge_predicate(
  T a_key, T b_key, cudaMergeRange range, comp_t comp
) {
  // With range checking enabled, an exhausted stream forces the choice.
  if(range_check) {
    if(!range.a_valid()) {
      return false;   // a exhausted: must take from b
    }
    if(!range.b_valid()) {
      return true;    // b exhausted: must take from a
    }
  }
  // UPPER: take from a while a_key strictly precedes b_key.
  // LOWER: take from a while b_key does not precede a_key (a wins ties).
  if(cudaMergeBoundType::UPPER == bounds) {
    return comp(a_key, b_key);
  }
  return !comp(b_key, a_key);
}
132
/**
@private
@brief converts two consecutive merge-path splits (mp0 on this
       partition's leading diagonal, mp1 on its trailing diagonal) into
       the global cudaMergeRange consumed by the partition
*/
inline __device__ auto cuda_compute_merge_range(
  unsigned a_count, unsigned b_count,
  unsigned partition, unsigned spacing,
  unsigned mp0, unsigned mp1
) {
  // Cross-diagonals bounding this partition's tile of the merged output;
  // the last tile is clamped to the total element count.
  unsigned diag0 = spacing * partition;
  unsigned diag1 = min(a_count + b_count, diag0 + spacing);

  // a contributes [mp0, mp1); b contributes the remainder of each diagonal.
  return cudaMergeRange { mp0, mp1, diag0 - mp0, diag1 - mp1 };
}
145
146 /**
147 @private
148
Specialization that emits just one LD instruction. Can only be reliably used
150 with raw pointer types. Fixed not to use pointer arithmetic so that
151 we don't get undefined behaviors with unaligned types.
152 */
template<unsigned nt, unsigned vt, typename T>
__device__ auto cuda_load_two_streams_reg(
  const T* a, unsigned a_count, const T* b, unsigned b_count, unsigned tid
) {

  // Rebase b by a_count so a single flat index addresses either stream:
  // for index >= a_count, b[index] equals the original b[index - a_count].
  // NOTE(review): forming b - a_count may point before b's allocation;
  // it is never dereferenced out of range, but this relies on the flat
  // GPU address space — confirm this is the intended idiom.
  b -= a_count;
  cudaArray<T, vt> x;
  cuda_strided_iterate<nt, vt>([&](auto i, auto index) {
    // Select the stream first, then perform one indexed load (single LD).
    const T* p = (index >= a_count) ? b : a;
    x[i] = p[index];
  }, tid, a_count + b_count);

  return x;
}
167
/**
@private
@brief general iterator version of cuda_load_two_streams_reg

Renamed from load_two_streams_reg to carry the same name as the
raw-pointer specialization above: the two now form one overload set, so
callers such as cuda_load_two_streams_shared resolve to this version
whenever at least one input is not a raw pointer (the enable_if disables
it for the pointer/pointer case, which the specialization handles).
*/
template<unsigned nt, unsigned vt, typename T, typename a_it, typename b_it>
__device__
std::enable_if_t<
  !(std::is_pointer<a_it>::value && std::is_pointer<b_it>::value),
  cudaArray<T, vt>
> cuda_load_two_streams_reg(a_it a, unsigned a_count, b_it b, unsigned b_count, unsigned tid) {
  // Rebase b so one flat index addresses either stream: index < a_count
  // reads a[index]; otherwise b[index] equals the original b[index - a_count].
  b -= a_count;
  cudaArray<T, vt> x;
  cuda_strided_iterate<nt, vt>([&](auto i, auto index) {
    x[i] = (index < a_count) ? a[index] : b[index];
  }, tid, a_count + b_count);
  return x;
}
182
/**
@private
@brief loads a_count elements of a followed by b_count elements of b
       into the block's shared-memory array
*/
template<unsigned nt, unsigned vt, typename A, typename B, typename T, unsigned S>
__device__ void cuda_load_two_streams_shared(A a, unsigned a_count,
  B b, unsigned b_count, unsigned tid, T (&shared)[S], bool sync = true
) {
  // Load into register then make an unconditional strided store into memory.
  // When sync is true the store helper presumably ends with a barrier so
  // the data is visible block-wide — confirm in cuda_reg_to_shared_strided.
  auto x = cuda_load_two_streams_reg<nt, vt, T>(a, a_count, b, b_count, tid);
  cuda_reg_to_shared_strided<nt>(x, tid, shared, sync);
}
192
/**
@private
@brief gathers elements addressed by merged indices from two raw-pointer
       streams (index < a_count reads a, otherwise b)
*/
template<unsigned nt, unsigned vt, typename T>
__device__ auto cuda_gather_two_streams_strided(const T* a,
  unsigned a_count, const T* b, unsigned b_count, cudaArray<unsigned, vt> indices,
  unsigned tid) {

  // Offset mapping a merged index i >= a_count onto b's storage:
  // a[i + b_offset] == b[i - a_count].
  // NOTE(review): b - a between unrelated allocations is technically UB in
  // standard C++; it relies on the flat GPU address space — confirm intended.
  ptrdiff_t b_offset = b - a - a_count;
  auto count = a_count + b_count;

  cudaArray<T, vt> x;
  cuda_strided_iterate<nt, vt>([&](auto i, auto j) {
    ptrdiff_t gather = indices[i];
    if(gather >= a_count) gather += b_offset;
    x[i] = a[gather];
  }, tid, count);

  return x;
}
211
/**
@private
@brief general iterator version of cuda_gather_two_streams_strided;
       disabled for the pointer/pointer case handled by the
       specialization above
*/
template<unsigned nt, unsigned vt, typename T, typename a_it, typename b_it>
__device__
std::enable_if_t<
  !(std::is_pointer<a_it>::value && std::is_pointer<b_it>::value),
  cudaArray<T, vt>
> cuda_gather_two_streams_strided(a_it a,
  unsigned a_count, b_it b, unsigned b_count, cudaArray<unsigned, vt> indices, unsigned tid) {

  // Rebase b so a merged index >= a_count addresses it directly:
  // b[index] equals the original b[index - a_count].
  b -= a_count;
  cudaArray<T, vt> x;
  cuda_strided_iterate<nt, vt>([&](auto i, auto j) {
    x[i] = (indices[i] < a_count) ? a[indices[i]] : b[indices[i]];
  }, tid, a_count + b_count);

  return x;
}
229
/**
@private
@brief gathers the elements addressed by merged indices from the two
       input streams and writes them to c in strided order
*/
template<unsigned nt, unsigned vt, typename a_it, typename b_it, typename c_it>
__device__ void cuda_transfer_two_streams_strided(
  a_it a, unsigned a_count, b_it b, unsigned b_count,
  cudaArray<unsigned, vt> indices, unsigned tid, c_it c
) {

  using T = typename std::iterator_traits<a_it>::value_type;

  // Pull the selected elements from both streams into registers, then
  // store them out with one strided pass.
  auto gathered = cuda_gather_two_streams_strided<nt, vt, T>(
    a, a_count, b, b_count, indices, tid
  );
  cuda_reg_to_mem_strided<nt>(gathered, tid, a_count + b_count, c);
}
244
245
246 /**
247 @private
248
249 This function must be able to dereference keys[a_begin] and keys[b_begin],
250 no matter the indices for each. The caller should allocate at least
nt * vt + 1 elements for keys_shared.
252 */
template<cudaMergeBoundType bounds, unsigned vt, typename T, typename comp_t>
__device__ auto cuda_serial_merge(
  const T* keys_shared, cudaMergeRange range, comp_t comp, bool sync = true
) {

  // Prime one key from each stream; each loop iteration consumes exactly
  // one of them and reloads its successor.
  auto a_key = keys_shared[range.a_begin];
  auto b_key = keys_shared[range.b_begin];

  cudaMergePair<T, vt> merge_pair;
  cuda_iterate<vt>([&](auto i) {
    // range_check = true so an exhausted stream forces the other choice.
    bool p = cuda_merge_predicate<bounds, true>(a_key, b_key, range, comp);
    auto index = p ? range.a_begin : range.b_begin;

    merge_pair.keys[i] = p ? a_key : b_key;
    merge_pair.indices[i] = index;

    // Pre-load the successor of the consumed element. This may read one
    // slot past the valid data, which is why the caller must allocate
    // nt * vt + 1 elements (see the comment above).
    T c_key = keys_shared[++index];
    if(p) a_key = c_key, range.a_begin = index;
    else b_key = c_key, range.b_begin = index;
  });

  // Barrier so the block can safely reuse keys_shared afterwards.
  if(sync) __syncthreads();
  return merge_pair;
}
277
278 /**
279 @private
280
Load arrays a and b from global memory and merge into registers.
282 */
template<cudaMergeBoundType bounds,
  unsigned nt, unsigned vt,
  typename a_it, typename b_it, typename T, typename comp_t, unsigned S
>
__device__ auto block_merge_from_mem(
  a_it a, b_it b, cudaMergeRange range_mem, unsigned tid, comp_t comp, T (&keys_shared)[S]
) {

  // cuda_serial_merge may read one element past the merged data, hence
  // the extra slot.
  static_assert(S >= nt * vt + 1,
    "block_merge_from_mem requires temporary storage of at "
    "least nt * vt + 1 items");

  // Load the data into shared memory.
  cuda_load_two_streams_shared<nt, vt>(
    a + range_mem.a_begin, range_mem.a_count(),
    b + range_mem.b_begin, range_mem.b_count(),
    tid, keys_shared, true
  );

  // Run a merge path to find the start of the serial merge for each thread.
  auto range_local = range_mem.to_local();
  auto diag = vt * tid;
  auto mp = cuda_merge_path<bounds>(keys_shared, range_local, diag, comp);

  // Compute the ranges of the sources in shared memory. The end iterators
  // of the range are inaccurate, but still facilitate exact merging, because
  // only vt elements will be merged.
  auto merged = cuda_serial_merge<bounds, vt>(
    keys_shared, range_local.partition(mp, diag), comp
  );

  return merged;
}  // fix: removed stray ';' after the function body (empty declaration)
316
/**
@private
@brief computes the merge-path split point for every spacing-sized tile
       of the merged output and stores it into buf

buf must hold at least (a_count + b_count + spacing - 1) / spacing + 1
entries (see cuda_merge_buffer_size). The launch is asynchronous on
p.stream(); the caller is responsible for ordering any reads of buf
after it.
*/
template<cudaMergeBoundType bounds,
  typename P, typename a_keys_it, typename b_keys_it, typename comp_t
>
void cuda_merge_path_partitions(
  P&& p,
  a_keys_it a, unsigned a_count,
  b_keys_it b, unsigned b_count,
  unsigned spacing,
  comp_t comp,
  unsigned* buf
) {

  //int num_partitions = (int)div_up(a_count + b_count, spacing) + 1;

  // one split per tile boundary, plus the trailing boundary
  unsigned num_partitions = (a_count + b_count + spacing - 1) / spacing + 1;

  const unsigned nt = 128;
  const unsigned vt = 1;
  const unsigned nv = nt * vt;

  // number of blocks needed to cover all partitions
  unsigned B = (num_partitions + nv - 1) / nv; // nt = 128, vt = 1

  cuda_kernel<<<B, nt, 0, p.stream()>>>([=]__device__(auto tid, auto bid) {
    auto range = cuda_get_tile(bid, nt * vt, num_partitions);
    cuda_strided_iterate<nt, vt>([=](auto, auto j) {
      auto index = range.begin + j;
      // clamp the last diagonal to the total element count
      auto diag = min(spacing * index, a_count + b_count);
      buf[index] = cuda_merge_path<bounds>(a, a_count, b, b_count, diag, comp);
    }, tid, range.count());
  });
}
349
350 //template<typename segments_it>
351 //auto load_balance_partitions(int64_t dest_count, segments_it segments,
352 // int num_segments, int spacing, context_t& context) ->
353 // mem_t<typename std::iterator_traits<segments_it>::value_type> {
354 //
355 // typedef typename std::iterator_traits<segments_it>::value_type int_t;
356 // return merge_path_partitions<bounds_upper>(counting_iterator_t<int_t>(0),
357 // dest_count, segments, num_segments, spacing, less_t<int_t>(), context);
358 //}
359
360 //template<bounds_t bounds, typename keys_it>
361 //mem_t<int> binary_search_partitions(keys_it keys, int count, int num_items,
362 // int spacing, context_t& context) {
363 //
364 // int num_partitions = div_up(count, spacing) + 1;
365 // mem_t<int> mem(num_partitions, context);
366 // int* p = mem.data();
367 // transform([=]MGPU_DEVICE(int index) {
368 // int key = min(spacing * index, count);
369 // p[index] = binary_search<bounds>(keys, num_items, key, less_t<int>());
370 // }, num_partitions, context);
371 // return mem;
372 //}
373
/**
@private
@brief two-pass merge: a partition kernel fills buf with merge-path
       splits, then one merge kernel block processes each tile of
       E::nv output elements

ptr must point to device memory sized by cuda_merge_buffer_size. Both
launches run asynchronously on p.stream().
*/
template<
  typename P,
  typename a_keys_it, typename a_vals_it,
  typename b_keys_it, typename b_vals_it,
  typename c_keys_it, typename c_vals_it,
  typename comp_t
>
void cuda_merge_loop(
  P&& p,
  a_keys_it a_keys, a_vals_it a_vals, unsigned a_count,
  b_keys_it b_keys, b_vals_it b_vals, unsigned b_count,
  c_keys_it c_keys, c_vals_it c_vals,
  comp_t comp,
  void* ptr
) {

  using E = std::decay_t<P>;
  using T = typename std::iterator_traits<a_keys_it>::value_type;
  using V = typename std::iterator_traits<a_vals_it>::value_type;

  // scratch buffer of merge-path splits, one per tile boundary
  auto buf = static_cast<unsigned*>(ptr);

  // key-only merges pass cudaEmpty* as the value iterators
  auto has_values = !std::is_same<V, cudaEmpty>::value;

  // Pass 1: merge-path split for every tile of E::nv merged elements.
  cuda_merge_path_partitions<cudaMergeBoundType::LOWER>(
    p, a_keys, a_count, b_keys, b_count, E::nv, comp, buf
  );

  // one block per tile of E::nv output elements
  unsigned B = (a_count + b_count + E::nv - 1)/ E::nv;

  // we use small kernel
  cuda_kernel<<<B, E::nt, 0, p.stream()>>>([=] __device__ (auto tid, auto bid) {

    // keys and indices are needed in different phases, so they can share
    // the same shared-memory storage; +1 because cuda_serial_merge may
    // read one element past the end
    __shared__ union {
      T keys[E::nv + 1];
      unsigned indices[E::nv];
    } shared;

    // Load the range for this CTA and merge the values into register.
    auto mp0 = buf[bid + 0];
    auto mp1 = buf[bid + 1];
    auto range = cuda_compute_merge_range(a_count, b_count, bid, E::nv, mp0, mp1);

    auto merge = block_merge_from_mem<cudaMergeBoundType::LOWER, E::nt, E::vt>(
      a_keys, b_keys, range, tid, comp, shared.keys
    );

    // Write merged keys out through shared memory to get coalesced stores.
    auto dest_offset = E::nv * bid;
    cuda_reg_to_mem_thread<E::nt>(
      merge.keys, tid, range.total(), c_keys + dest_offset, shared.keys
    );

    if(has_values) {
      // Transpose the indices from thread order to strided order.
      auto indices = cuda_reg_thread_to_strided<E::nt>(
        merge.indices, tid, shared.indices
      );

      // Gather the input values and merge into the output values.
      cuda_transfer_two_streams_strided<E::nt>(
        a_vals + range.a_begin, range.a_count(),
        b_vals + range.b_begin, range.b_count(), indices, tid,
        c_vals + dest_offset
      );
    }
  });
}
442
443 } // end of namespace tf::detail ---------------------------------------------
444
445 namespace tf {
446
447 // ----------------------------------------------------------------------------
448 // standalone merge algorithms
449 // ----------------------------------------------------------------------------
450
451 /**
452 @brief queries the buffer size in bytes needed to call merge kernels
453
454 @tparam P execution polity type
455 @param a_count number of elements in the first input array
456 @param b_count number of elements in the second input array
457
458 The function is used to allocate a buffer for calling
459 tf::cuda_merge.
460 */
template <typename P>
unsigned cuda_merge_buffer_size(unsigned a_count, unsigned b_count) {
  using E = std::decay_t<P>;
  // One merge-path split per tile of E::nv merged elements, plus a
  // trailing boundary entry: ceil((a_count + b_count) / nv) + 1 values.
  unsigned num_splits = (a_count + b_count + E::nv - 1) / E::nv + 1;
  return num_splits * sizeof(unsigned);
}
467
468 // ----------------------------------------------------------------------------
469 // key-value merge
470 // ----------------------------------------------------------------------------
471
472 //template<
473 // typename P,
474 // typename a_keys_it, typename a_vals_it,
475 // typename b_keys_it, typename b_vals_it,
476 // typename c_keys_it, typename c_vals_it,
477 // typename C
478 //>
479 //void cuda_merge(
480 // P&& p,
481 // a_keys_it a_keys_first, a_vals_it a_vals_first, a_keys_it a_keys_last,
482 // b_keys_it b_keys_first, b_vals_it b_vals_first, b_keys_it b_keys_last,
483 // c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
484 //) {
485 //
486 // unsigned a_count = std::distance(a_keys_first, a_keys_last);
487 // unsigned b_count = std::distance(b_keys_first, b_keys_last);
488 //
489 // if(a_count + b_count == 0) {
490 // return;
491 // }
492 //
493 // // allocate temporary buffer
494 // cudaScopedDeviceMemory<std::byte> temp(cuda_merge_buffer_size<P>(a_count, b_count));
495 //
496 // detail::cuda_merge_loop(
497 // p,
498 // a_keys_first, a_vals_first, a_count,
499 // b_keys_first, b_vals_first, b_count,
500 // c_keys_first, c_vals_first, comp,
501 // temp.data()
502 // );
503 //
504 // // synchronize the execution
505 // p.synchronize();
506 //}
507
508 /**
509 @brief performs asynchronous key-value merge over a range of keys and values
510
511 @tparam P execution policy type
512 @tparam a_keys_it first key iterator type
513 @tparam a_vals_it first value iterator type
514 @tparam b_keys_it second key iterator type
515 @tparam b_vals_it second value iterator type
516 @tparam c_keys_it output key iterator type
517 @tparam c_vals_it output value iterator type
518 @tparam C comparator type
519
520 @param p execution policy
521 @param a_keys_first iterator to the beginning of the first key range
522 @param a_keys_last iterator to the end of the first key range
523 @param a_vals_first iterator to the beginning of the first value range
524 @param b_keys_first iterator to the beginning of the second key range
525 @param b_keys_last iterator to the end of the second key range
526 @param b_vals_first iterator to the beginning of the second value range
527 @param c_keys_first iterator to the beginning of the output key range
528 @param c_vals_first iterator to the beginning of the output value range
529 @param comp comparator
530 @param buf pointer to the temporary buffer
531
532 Performs a key-value merge that copies elements from
533 <tt>[a_keys_first, a_keys_last)</tt> and <tt>[b_keys_first, b_keys_last)</tt>
534 into a single range, <tt>[c_keys_first, c_keys_last + (a_keys_last - a_keys_first) + (b_keys_last - b_keys_first))</tt>
535 such that the resulting range is in ascending key order.
536
537 At the same time, the merge copies elements from the two associated ranges
538 <tt>[a_vals_first + (a_keys_last - a_keys_first))</tt> and
539 <tt>[b_vals_first + (b_keys_last - b_keys_first))</tt> into a single range,
540 <tt>[c_vals_first, c_vals_first + (a_keys_last - a_keys_first) + (b_keys_last - b_keys_first))</tt>
541 such that the resulting range is in ascending order
542 implied by each input element's associated key.
543
544 For example, assume:
545 + @c a_keys = {8, 1};
546 + @c a_vals = {1, 2};
547 + @c b_keys = {3, 7};
548 + @c b_vals = {3, 4};
549
550 After the merge, we have:
551 + @c c_keys = {1, 3, 7, 8}
552 + @c c_vals = {2, 3, 4, 1}
553
554 */
template<
  typename P,
  typename a_keys_it, typename a_vals_it,
  typename b_keys_it, typename b_vals_it,
  typename c_keys_it, typename c_vals_it,
  typename C
>
void cuda_merge_by_key(
  P&& p,
  a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
  b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
  c_keys_it c_keys_first, c_vals_it c_vals_first, C comp,
  void* buf
) {

  const unsigned num_a = std::distance(a_keys_first, a_keys_last);
  const unsigned num_b = std::distance(b_keys_first, b_keys_last);

  // nothing to merge
  if(num_a == 0 && num_b == 0) {
    return;
  }

  // Delegate to the asynchronous merge loop. buf must be device memory of
  // at least cuda_merge_buffer_size<P>(num_a, num_b) bytes.
  detail::cuda_merge_loop(
    p,
    a_keys_first, a_vals_first, num_a,
    b_keys_first, b_vals_first, num_b,
    c_keys_first, c_vals_first, comp,
    buf
  );
}
584
585 // ----------------------------------------------------------------------------
586 // key-only merge
587 // ----------------------------------------------------------------------------
588
589 //template<typename P,
590 // typename a_keys_it, typename b_keys_it, typename c_keys_it, typename C
591 //>
592 //void cuda_merge(
593 // P&& p,
594 // a_keys_it a_keys_first, a_keys_it a_keys_last,
595 // b_keys_it b_keys_first, b_keys_it b_keys_last,
596 // c_keys_it c_keys_first,
597 // C comp
598 //) {
599 // cuda_merge(
600 // p,
601 // a_keys_first, (const cudaEmpty*)nullptr, a_keys_last,
602 // b_keys_first, (const cudaEmpty*)nullptr, b_keys_last,
603 // c_keys_first, (cudaEmpty*)nullptr, comp
604 // );
605 //}
606
607 /**
608 @brief performs asynchronous key-only merge over a range of keys
609
610 @tparam P execution policy type
611 @tparam a_keys_it first key iterator type
612 @tparam b_keys_it second key iterator type
613 @tparam c_keys_it output key iterator type
614 @tparam C comparator type
615
616 @param p execution policy
617 @param a_keys_first iterator to the beginning of the first key range
618 @param a_keys_last iterator to the end of the first key range
619 @param b_keys_first iterator to the beginning of the second key range
620 @param b_keys_last iterator to the end of the second key range
621 @param c_keys_first iterator to the beginning of the output key range
622 @param comp comparator
623 @param buf pointer to the temporary buffer
624
625 This function is equivalent to tf::cuda_merge_by_key without values.
626
627 */
template<typename P,
  typename a_keys_it, typename b_keys_it, typename c_keys_it, typename C
>
void cuda_merge(
  P&& p,
  a_keys_it a_keys_first, a_keys_it a_keys_last,
  b_keys_it b_keys_first, b_keys_it b_keys_last,
  c_keys_it c_keys_first,
  C comp,
  void* buf
) {
  // A key-only merge is just the key-value merge with cudaEmpty values;
  // null value iterators of type cudaEmpty* disable the value pass.
  cuda_merge_by_key(
    p,
    a_keys_first, a_keys_last, static_cast<const cudaEmpty*>(nullptr),
    b_keys_first, b_keys_last, static_cast<const cudaEmpty*>(nullptr),
    c_keys_first, static_cast<cudaEmpty*>(nullptr), comp,
    buf
  );
}
647
648 // ----------------------------------------------------------------------------
649 // cudaFlow merge algorithms
650 // ----------------------------------------------------------------------------
651
// Function: merge
// Creates a merge task by capturing a child flow that performs the
// capturer-based merge with the linear optimizer.
template<typename A, typename B, typename C, typename Comp>
cudaTask cudaFlow::merge(
  A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp
) {
  return capture([=](cudaFlowCapturer& cap){
    // apply the linear capturing optimizer to the captured operations
    cap.make_optimizer<cudaLinearCapturing>();
    cap.merge(a_first, a_last, b_first, b_last, c_first, comp);
  });
}
662
// Function: merge (update)
// Rebinds an existing merge task to new iterators and comparator.
template<typename A, typename B, typename C, typename Comp>
void cudaFlow::merge(
  cudaTask task, A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp
) {
  capture(task, [=](cudaFlowCapturer& cap){
    // apply the linear capturing optimizer to the captured operations
    cap.make_optimizer<cudaLinearCapturing>();
    cap.merge(a_first, a_last, b_first, b_last, c_first, comp);
  });
}
673
// Function: merge_by_key
// Creates a key-value merge task by capturing a child flow that performs
// the capturer-based merge_by_key with the linear optimizer.
template<
  typename a_keys_it, typename a_vals_it,
  typename b_keys_it, typename b_vals_it,
  typename c_keys_it, typename c_vals_it,
  typename C
>
cudaTask cudaFlow::merge_by_key(
  a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
  b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
  c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
) {
  return capture([=](cudaFlowCapturer& cap){
    // apply the linear capturing optimizer to the captured operations
    cap.make_optimizer<cudaLinearCapturing>();
    cap.merge_by_key(
      a_keys_first, a_keys_last, a_vals_first,
      b_keys_first, b_keys_last, b_vals_first,
      c_keys_first, c_vals_first,
      comp
    );
  });
}
696
// Function: merge_by_key (update)
// Rebinds an existing key-value merge task to new iterators/comparator.
template<
  typename a_keys_it, typename a_vals_it,
  typename b_keys_it, typename b_vals_it,
  typename c_keys_it, typename c_vals_it,
  typename C
>
void cudaFlow::merge_by_key(
  cudaTask task,
  a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
  b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
  c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
) {
  capture(task, [=](cudaFlowCapturer& cap){
    // apply the linear capturing optimizer to the captured operations
    cap.make_optimizer<cudaLinearCapturing>();
    cap.merge_by_key(
      a_keys_first, a_keys_last, a_vals_first,
      b_keys_first, b_keys_last, b_vals_first,
      c_keys_first, c_vals_first,
      comp
    );
  });
}
720
721 // ----------------------------------------------------------------------------
722 // cudaFlowCapturer merge algorithms
723 // ----------------------------------------------------------------------------
724
// Function: merge
template<typename A, typename B, typename C, typename Comp>
cudaTask cudaFlowCapturer::merge(
  A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp
) {
  // TODO(review): buffer size is fixed at capture time from the current
  // iterator distances; the input ranges must not grow before execution.
  auto bufsz = cuda_merge_buffer_size<cudaDefaultExecutionPolicy>(
    std::distance(a_first, a_last), std::distance(b_first, b_last)
  );

  // NOTE(review): MoC appears to move the scoped device buffer into the
  // callable so the allocation lives as long as the captured task —
  // confirm against MoC's definition.
  return on([=, buf=MoC{cudaScopedDeviceMemory<std::byte>(bufsz)}]
    (cudaStream_t stream) mutable {
    cuda_merge(cudaDefaultExecutionPolicy{stream},
      a_first, a_last, b_first, b_last, c_first, comp, buf.get().data()
    );
  });
}
742
// Procedure: merge (update)
// Rebinds an existing merge task; a fresh scratch buffer is allocated
// for the new input sizes.
template<typename A, typename B, typename C, typename Comp>
void cudaFlowCapturer::merge(
  cudaTask task, A a_first, A a_last, B b_first, B b_last, C c_first, Comp comp
) {
  // TODO(review): buffer size is fixed at capture time from the current
  // iterator distances; the input ranges must not grow before execution.
  auto bufsz = cuda_merge_buffer_size<cudaDefaultExecutionPolicy>(
    std::distance(a_first, a_last), std::distance(b_first, b_last)
  );

  // NOTE(review): MoC appears to move the scoped device buffer into the
  // callable so the allocation lives as long as the captured task —
  // confirm against MoC's definition.
  on(task, [=, buf=MoC{cudaScopedDeviceMemory<std::byte>(bufsz)}]
    (cudaStream_t stream) mutable {
    cuda_merge(cudaDefaultExecutionPolicy{stream},
      a_first, a_last, b_first, b_last, c_first, comp, buf.get().data()
    );
  });
}
760
// Function: merge_by_key
template<
  typename a_keys_it, typename a_vals_it,
  typename b_keys_it, typename b_vals_it,
  typename c_keys_it, typename c_vals_it,
  typename C
>
cudaTask cudaFlowCapturer::merge_by_key(
  a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
  b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
  c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
) {

  // Scratch-buffer size depends only on the number of keys; it is fixed
  // at capture time, so the input ranges must not grow before execution.
  auto bufsz = cuda_merge_buffer_size<cudaDefaultExecutionPolicy>(
    std::distance(a_keys_first, a_keys_last),
    std::distance(b_keys_first, b_keys_last)
  );

  // NOTE(review): MoC appears to move the scoped device buffer into the
  // callable so the allocation lives as long as the captured task —
  // confirm against MoC's definition.
  return on([=, buf=MoC{cudaScopedDeviceMemory<std::byte>(bufsz)}]
    (cudaStream_t stream) mutable {
    cuda_merge_by_key(cudaDefaultExecutionPolicy{stream},
      a_keys_first, a_keys_last, a_vals_first,
      b_keys_first, b_keys_last, b_vals_first,
      c_keys_first, c_vals_first,
      comp,
      buf.get().data()
    );
  });
}
790
// Function: merge_by_key (update)
// Rebinds an existing key-value merge task; a fresh scratch buffer is
// allocated for the new input sizes.
template<
  typename a_keys_it, typename a_vals_it,
  typename b_keys_it, typename b_vals_it,
  typename c_keys_it, typename c_vals_it,
  typename C
>
void cudaFlowCapturer::merge_by_key(
  cudaTask task,
  a_keys_it a_keys_first, a_keys_it a_keys_last, a_vals_it a_vals_first,
  b_keys_it b_keys_first, b_keys_it b_keys_last, b_vals_it b_vals_first,
  c_keys_it c_keys_first, c_vals_it c_vals_first, C comp
) {

  // Scratch-buffer size depends only on the number of keys; it is fixed
  // at capture time, so the input ranges must not grow before execution.
  auto bufsz = cuda_merge_buffer_size<cudaDefaultExecutionPolicy>(
    std::distance(a_keys_first, a_keys_last),
    std::distance(b_keys_first, b_keys_last)
  );

  // NOTE(review): MoC appears to move the scoped device buffer into the
  // callable so the allocation lives as long as the captured task —
  // confirm against MoC's definition.
  on(task, [=, buf=MoC{cudaScopedDeviceMemory<std::byte>(bufsz)}]
    (cudaStream_t stream) mutable {
    cuda_merge_by_key(cudaDefaultExecutionPolicy{stream},
      a_keys_first, a_keys_last, a_vals_first,
      b_keys_first, b_keys_last, b_vals_first,
      c_keys_first, c_vals_first,
      comp,
      buf.get().data()
    );
  });
}
821
822
823
824 } // end of namespace tf -----------------------------------------------------
825