//------------------------------------------------------------------------------
// templates/GB_AxB_cuda_dot3_phase2: fill the global buckets
//------------------------------------------------------------------------------

// GB_AxB_dot3_phase2 reduces the per-threadblock bucket counts produced by
// GB_AxB_dot3_phase1 into global bucket sizes and cumulative sums, and
// GB_AxB_dot3_phase2end then scatters the entries of C into the 12 global
// buckets.

#define GB_KERNEL
#include <cstdint>
#include "GB_cuda_buckets.h"
#include "matrix.h"
#include <cooperative_groups.h>
#include "local_cub/block/block_scan.cuh"

using namespace cooperative_groups;
15
// A stateful callback functor that maintains a running prefix to be applied
// during consecutive scan operations.
struct BlockPrefixCallbackOp
{
    // Running prefix
    int64_t running_total;

    // Constructor
    __device__ BlockPrefixCallbackOp(int64_t running_total) : running_total(running_total) {}

    // Callback operator to be entered by the first warp of threads in the block.
    // Thread-0 is responsible for returning a value for seeding the block-wide scan.
    __device__ int64_t operator()(int64_t block_aggregate)
    {
        int64_t old_prefix = running_total;
        running_total += block_aggregate;
        return old_prefix;
    }
};
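
// A minimal usage sketch (the segment width of 4 is illustrative; the scan
// in blockBucketExclusiveSum below uses 32 threads over the real data):
//
//      BlockPrefixCallbackOp prefix_op (0) ;
//      // segment 0 holds [3 1 4 1]: ExclusiveSum writes [0 3 4 8] and hands
//      // the callback block_aggregate = 9, so running_total becomes 9.
//      // segment 1 holds [5 9 2 6]: ExclusiveSum writes [9 14 23 25], each
//      // result seeded by the prefix the callback returned.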
34
__inline__
__device__ void blockBucketExclusiveSum(int bucketId, int64_t *d_data, int nblocks)
{
    #define blocksize 32

    // Specialize BlockScan for a 1D block of 32 threads
    typedef cub::BlockScan<int64_t, blocksize, cub::BLOCK_SCAN_WARP_SCANS> BlockScan;

    // Allocate shared memory for BlockScan
    __shared__ typename BlockScan::TempStorage temp_storage;

    // Initialize the running total; prefix_op carries it across segments
    BlockPrefixCallbackOp prefix_op(0);

    // Have the block iterate over segments of items
    int64_t data = 0;

    int64_t *blockbucket = d_data;

    for (int block_id = 0; block_id < nblocks; block_id += blocksize)
    {
        // Load a segment of consecutive items that are blocked across threads
        int loc = block_id + threadIdx.x;
        if (loc < nblocks)
        {
            data = blockbucket [bucketId*nblocks + loc] ;
        }
        __syncthreads();

        // Collectively compute the block-wide exclusive prefix sum
        BlockScan(temp_storage).ExclusiveSum( data, data, prefix_op);
        __syncthreads();

        // Store the scanned items back to the same segment
        if (loc < nblocks)
        {
            blockbucket [bucketId*nblocks + loc] = data ;
        }
        __syncthreads();

        data = 0;
    }
}
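
// For example, with nblocks = 4 and bucketId = 2, suppose row 2 of
// blockbucket holds the per-threadblock counts [5 0 3 2] for bucket 2.
// After blockBucketExclusiveSum (2, blockbucket, 4) the row becomes
// [0 5 5 8]: blockbucket [2*nblocks + t] is now the position within bucket
// 2 where threadblock t's entries begin.  (Counts here are illustrative.)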

template< typename T, int tile_sz>
__inline__ __device__
T warp_ReduceSumPlus( thread_block_tile<tile_sz> tile, T val)
{
    // Each iteration halves the number of active threads:
    // each thread adds its partial sum[i] to sum[lane+i]
    for (int i = tile.size() / 2; i > 0; i /= 2) {
        val += tile.shfl_down( val, i);
    }
    return val; // note: only thread 0 will return the full sum
}
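
// For example, on a 32-lane tile where every lane passes val = 1, the
// shuffles fold in 16, 8, 4, 2, and then 1 partial sums, and lane 0
// returns 32 (the other lanes return partial sums).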

template<typename T, int warpSize>
__inline__ __device__
T block_ReduceSum(thread_block g, T val)
{
    static __shared__ T shared [warpSize] ;  // shared mem for 32 partial sums
    int lane = threadIdx.x % warpSize;
    int wid  = threadIdx.x / warpSize;
    thread_block_tile<warpSize> tile = tiled_partition<warpSize>( g );

    // Each warp performs a partial reduction
    val = warp_ReduceSumPlus<T, warpSize>( tile, val);

    // Write each warp's reduced value to shared memory
    if (lane == 0) shared [wid] = val;
    __syncthreads();                // wait for all partial reductions

    if (wid > 0) return val;

    // Final reduction within the first warp: reload the per-warp partial
    // sums (lanes beyond the number of warps contribute zero)
    val = (lane < (blockDim.x + warpSize - 1) / warpSize) ? shared [lane] : (T) 0 ;
    val = warp_ReduceSumPlus<T, warpSize>( tile, val);

    return val;
}
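
// block_ReduceSum is not called in this file; a typical use, as a hedged
// sketch (the kernel and sizes here are hypothetical, not part of this
// template), would be:
//
//      __global__ void count_kernel (int *total, int n)
//      {
//          int mine = (threadIdx.x + blockIdx.x*blockDim.x < n) ? 1 : 0 ;
//          int sum = block_ReduceSum<int, 32> (this_thread_block(), mine) ;
//          if (threadIdx.x == 0) atomicAdd (total, sum) ;
//      }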

// GB_AxB_cuda_dot3_phase2 is a CUDA kernel that takes as input the
// nanobuckets and blockbucket arrays computed by the first phase kernel,
// GB_AxB_cuda_dot3_phase1.  The launch geometry of this kernel must match
// the GB_AxB_cuda_dot3_phase1 kernel, with the same # of threads and
// threadblocks.
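
// A hedged sketch of how the three phases might be sequenced by the host
// (nthreads and nblocks are placeholders; the actual driver code lives
// elsewhere in the library):
//
//      GB_AxB_dot3_phase1 <<<nblocks, nthreads>>> (... nanobuckets, blockbucket ...) ;
//      GB_AxB_dot3_phase2 <<<nblocks, nthreads>>> (nanobuckets, blockbucket,
//          bucketp, bucket, offset, C, cnz, nblocks) ;
//      // the 13-entry bucketp cumsum is finalized on the CPU from
//      // offset [0..11] (see the disabled block at the end of phase2)
//      GB_AxB_dot3_phase2end <<<nblocks, nthreads>>> (nanobuckets, blockbucket,
//          bucketp, bucket, offset, C, cnz) ;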

__global__
void GB_AxB_dot3_phase2
(
    // input, not modified:
    int64_t *__restrict__ nanobuckets,  // array of size 12-blockDim.x-by-nblocks
    int64_t *__restrict__ blockbucket,  // global bucket count, of size 12*nblocks
    // output:
    int64_t *__restrict__ bucketp,      // global bucket cumsum, of size 13
    int64_t *__restrict__ bucket,       // global buckets, of size cnz (== mnz)
    int64_t *__restrict__ offset,       // global offsets, for each bucket
    // inputs, not modified:
    GrB_Matrix C,                       // output matrix
    const int64_t cnz,                  // number of entries in C and M
    const int nblocks                   // input number of blocks to reduce
)
{

    //--------------------------------------------------------------------------
    // get C and M
    //--------------------------------------------------------------------------

    //int64_t *Ci = C->i ;  // for zombies, or bucket assignment

    // Ci [p] for an entry C(i,j) contains either GB_FLIP(i) if C(i,j) is a
    // zombie, or (k << 4) + bucket otherwise, where C(:,j) is the kth vector
    // of C (j = Ch [k] if hypersparse or j = k if standard sparse), and
    // where bucket is the bucket assignment for C(i,j).  This phase does not
    // need k, just the bucket for each entry C(i,j).
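
    // Decoding an entry of Ci is thus (as done in phase2end below):
    //
    //      int64_t e  = Ci [p] ;
    //      int bucket = (int) (e & 0xF) ;  // low 4 bits: bucket, 0 to 11
    //      int64_t k  = e >> 4 ;           // high bits: vector index k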

    //--------------------------------------------------------------------------
    // sum up the bucket counts of prior threadblocks
    //--------------------------------------------------------------------------

    // blockbucket is an array of size 12-by-nblocks, held by row.  The
    // entry blockbucket [bucket * nblocks + t] holds the # of entries
    // in the bucket (in range 0 to 11) found by threadblock t.
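
    // For example, with nblocks = 3, if threadblocks 0, 1, and 2 counted
    // 4, 0, and 2 entries for bucket 1, then blockbucket [1*nblocks + 0..2]
    // holds [4 0 2]; this kernel reduces that row to the total 6 (added to
    // offset [1] below) and later overwrites it with the exclusive cumsum
    // [0 4 4].  (Counts here are illustrative.)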

    uint64_t s_0 = 0, s_1 = 0, s_2 = 0, s_3  = 0, s_4  = 0, s_5  = 0,
             s_6 = 0, s_7 = 0, s_8 = 0, s_9  = 0, s_10 = 0, s_11 = 0 ;

    thread_block_tile<32> tile = tiled_partition<32>(this_thread_block() );

    int tid = threadIdx.x + blockIdx.x * blockDim.x ;

    // Accumulate bucket B over all threadblocks with a grid-stride loop,
    // then reduce the per-thread partial sums within the 32-thread tile;
    // only thread 0's tile total is accumulated into offset below.
    #define reduceBucket( B )                              \
        for( tid = threadIdx.x + blockIdx.x*blockDim.x;    \
             tid < nblocks;                                \
             tid += blockDim.x*gridDim.x)                  \
        {                                                  \
            s_ ## B += blockbucket [ B *nblocks + tid] ;   \
        }                                                  \
        __syncthreads();                                   \
        s_ ## B = warp_ReduceSumPlus<uint64_t , 32>( tile, s_ ## B);
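
    // For example, reduceBucket( 3 ) expands to a grid-stride loop that
    // accumulates blockbucket [3*nblocks + tid] into s_3, followed by the
    // 32-lane tile reduction that leaves this block's total for bucket 3
    // in thread 0's s_3.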

    reduceBucket( 0 )
    reduceBucket( 1 )
    reduceBucket( 2 )
    reduceBucket( 3 )
    reduceBucket( 4 )
    reduceBucket( 5 )
    reduceBucket( 6 )
    reduceBucket( 7 )
    reduceBucket( 8 )
    reduceBucket( 9 )
    reduceBucket( 10 )
    reduceBucket( 11 )

    // Thread 0 of each threadblock adds its totals to the global offset
    // array, one atomic per bucket.
    if (threadIdx.x == 0)
    {
        atomicAdd( (unsigned long long int*)&(offset [0]),  s_0);
        atomicAdd( (unsigned long long int*)&(offset [1]),  s_1);
        atomicAdd( (unsigned long long int*)&(offset [2]),  s_2);
        atomicAdd( (unsigned long long int*)&(offset [3]),  s_3);
        atomicAdd( (unsigned long long int*)&(offset [4]),  s_4);
        atomicAdd( (unsigned long long int*)&(offset [5]),  s_5);
        atomicAdd( (unsigned long long int*)&(offset [6]),  s_6);
        atomicAdd( (unsigned long long int*)&(offset [7]),  s_7);
        atomicAdd( (unsigned long long int*)&(offset [8]),  s_8);
        atomicAdd( (unsigned long long int*)&(offset [9]),  s_9);
        atomicAdd( (unsigned long long int*)&(offset [10]), s_10);
        atomicAdd( (unsigned long long int*)&(offset [11]), s_11);
    }
    __syncthreads();

    // Overwrite each row of blockbucket with its exclusive cumsum, so that
    // blockbucket [b*nblocks + t] becomes the start of threadblock t's
    // entries within global bucket b.
    if (gridDim.x >= 12)
    {
        // one threadblock per bucket
        if (blockIdx.x < 12)
            blockBucketExclusiveSum( blockIdx.x, blockbucket, nblocks ) ;
    }
    else
    {
        // a single threadblock scans all 12 buckets
        if (blockIdx.x == 0)
        {
            blockBucketExclusiveSum( 0,  blockbucket, nblocks ) ;
            blockBucketExclusiveSum( 1,  blockbucket, nblocks ) ;
            blockBucketExclusiveSum( 2,  blockbucket, nblocks ) ;
            blockBucketExclusiveSum( 3,  blockbucket, nblocks ) ;
            blockBucketExclusiveSum( 4,  blockbucket, nblocks ) ;
            blockBucketExclusiveSum( 5,  blockbucket, nblocks ) ;
            blockBucketExclusiveSum( 6,  blockbucket, nblocks ) ;
            blockBucketExclusiveSum( 7,  blockbucket, nblocks ) ;
            blockBucketExclusiveSum( 8,  blockbucket, nblocks ) ;
            blockBucketExclusiveSum( 9,  blockbucket, nblocks ) ;
            blockBucketExclusiveSum( 10, blockbucket, nblocks ) ;
            blockBucketExclusiveSum( 11, blockbucket, nblocks ) ;
        }
    }

    //--------------------------------------------------------------------------
    // last threadblock saves the cumsum of the 12 global buckets
    //--------------------------------------------------------------------------

    /* done on the CPU instead:
    if (blockIdx.x == 0) // gridDim.x - 1)
    {
        // the last threadblock: compute all 12 global bucket sizes, and the
        // cumulative sum
        if (threadIdx.x == 0)
        {
            // the work in this last threadblock is single-threaded
            uint64_t s = 0;
            for (int bucket = 0 ; bucket < 12 ; bucket++)
            {
                // write the global cumsum of all buckets to the final global
                // bucketp.  bucketp [bucket] is the starting position in
                // the bucket.
                bucketp [bucket] = s ;

                // The total # of entries in each bucket, over all
                // threadblocks, has already been computed above, since
                // offset [bucket] = sum (blockbucket (bucket,0:nblocks-1)).

                // s is a cumulative sum of the global bucket sizes
                s += offset [bucket] ;
            }
            // The kth global bucket (for k = 0 to 11) appears in:
            // bucket [bucketp [k]... bucketp [k+1]-1],
            // so the end of the last bucket needs bucketp [12].
            bucketp [12] = (int64_t) s ;
            // all entries in C now appear in the buckets.
            // ASSERT (s == cnz) ;
        }
        __syncthreads ( ) ;
    }
    */

} // phase2

__global__
void GB_AxB_dot3_phase2end
(
    // inputs, not modified:
    int64_t *__restrict__ nanobuckets,        // array of size 12-blockDim.x-by-nblocks
    const int64_t *__restrict__ blockbucket,  // global bucket cumsum, of size 12*nblocks
    const int64_t *__restrict__ bucketp,      // global bucket cumsum, of size 13
    // output:
    int64_t *__restrict__ bucket,             // global buckets, of size cnz (== mnz)
    // inputs, not modified:
    const int64_t *__restrict__ offset,       // global offsets, for each bucket
    const GrB_Matrix C,                       // output matrix
    const int64_t cnz                         // number of entries in C and M
)
{

    int64_t *__restrict__ Ci = C->i ;   // for zombies, or bucket assignment
    int64_t *__restrict__ Mp = C->p ;   // for offset calculations
    int64_t mnvec = C->nvec ;

    //--------------------------------------------------------------------------
    // load and shift the nanobuckets for this thread block
    //--------------------------------------------------------------------------

    // The taskbucket for this threadblock is an array of size
    // 12-by-blockDim.x, held by row.  It forms a 2D array within the 3D
    // nanobuckets array.
    int64_t *__restrict__ taskbucket = nanobuckets + blockIdx.x * (12 * blockDim.x) ;

    // Each thread in this threadblock owns one column of this taskbucket,
    // for its set of 12 nanobuckets.  The nanobuckets are a column of
    // length 12, with stride equal to blockDim.x.
    int64_t *__restrict__ nanobucket = taskbucket + threadIdx.x ;

    // Each thread loads its 12 nanobucket values into registers: the
    // starting position for this thread in global bucket b is its own
    // nanobucket count, plus the cumsum of prior threadblocks for bucket b
    // (blockbucket), plus the global start of bucket b (bucketp).
    #define LOAD_NANOBUCKET(bucket)                      \
        int64_t my_bucket_ ## bucket =                   \
            nanobucket [bucket * blockDim.x]             \
          + blockbucket [bucket * gridDim.x + blockIdx.x]\
          + bucketp [bucket] ;
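
    // For example, LOAD_NANOBUCKET (2) expands to
    //
    //      int64_t my_bucket_2 = nanobucket [2 * blockDim.x]
    //          + blockbucket [2 * gridDim.x + blockIdx.x]
    //          + bucketp [2] ;
    //
    // which is this thread's first free slot within global bucket 2.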

    LOAD_NANOBUCKET (0) ;
    LOAD_NANOBUCKET (1) ;
    LOAD_NANOBUCKET (2) ;
    LOAD_NANOBUCKET (3) ;
    LOAD_NANOBUCKET (4) ;
    LOAD_NANOBUCKET (5) ;
    LOAD_NANOBUCKET (6) ;
    LOAD_NANOBUCKET (7) ;
    LOAD_NANOBUCKET (8) ;
    LOAD_NANOBUCKET (9) ;
    LOAD_NANOBUCKET (10) ;
    LOAD_NANOBUCKET (11) ;

    // Now each thread has an index into the global set of 12 buckets,
    // held in bucket, of where to place its own entries.

    //--------------------------------------------------------------------------
    // construct the global buckets
    //--------------------------------------------------------------------------

    // The entries of C are processed in chunks of chunksize entries; the
    // chunk for this iteration covers entries pfirst:plast-1 of M and C,
    // which is the part of C operated on by this threadblock.
    int64_t pfirst, plast ;

    int chunk_max = (cnz + chunksize - 1)/chunksize ;
    for ( int chunk = blockIdx.x ;
              chunk < chunk_max ;
              chunk += gridDim.x )
    {
        pfirst = chunksize * chunk ;
        plast  = GB_IMIN( chunksize * (chunk+1), cnz ) ;
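
        // For example, if cnz = 1000 and chunksize = 128 (chunksize is
        // assumed to be defined by the including file), then chunk_max = 8,
        // and chunk 7 covers pfirst = 896 up to plast = 1000.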

        // find the first vector of the slice for this chunk: the
        // vector that owns the entry Ai [pfirst] and Ax [pfirst].
        //kfirst = GB_search_for_vector_device (pfirst, Mp, 0, mnvec) ;

        // find the last vector of the slice for this chunk: the
        // vector that owns the entry Ai [plast-1] and Ax [plast-1].
        //klast = GB_search_for_vector_device (plast-1, Mp, kfirst, mnvec) ;

        for ( int p = pfirst + threadIdx.x ;
                  p < plast ;
                  p += blockDim.x )
        {
            // get the entry C(i,j), and extract its bucket.  Then
            // place the entry C(i,j) in the global bucket it belongs to.

            // TODO: these writes to global are not coalesced.  Instead: each
            // threadblock could buffer its writes to 12 buffers and when the
            // buffers are full they can be written to global.
            int ibucket = Ci [p] & 0xF ;
            switch (ibucket)
            {
                // bucket 0 also strips the bucket bits from Ci (unshift zombies)
                case  0: bucket [my_bucket_0++ ] = p ; Ci [p] = Ci [p] >> 4 ; break ;
                case  1: bucket [my_bucket_1++ ] = p ; break ;
                case  2: bucket [my_bucket_2++ ] = p ; break ;
                case  3: bucket [my_bucket_3++ ] = p ; break ;
                case  4: bucket [my_bucket_4++ ] = p ; break ;
                case  5: bucket [my_bucket_5++ ] = p ; break ;
                case  6: bucket [my_bucket_6++ ] = p ; break ;
                case  7: bucket [my_bucket_7++ ] = p ; break ;
                case  8: bucket [my_bucket_8++ ] = p ; break ;
                case  9: bucket [my_bucket_9++ ] = p ; break ;
                case 10: bucket [my_bucket_10++] = p ; break ;
                case 11: bucket [my_bucket_11++] = p ; break ;
                default: break ;
            }
        }
    }

} // phase2end
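
// A hedged sketch of the buffered-write scheme suggested by the TODO in
// phase2end (BUFLEN and the buffer names are hypothetical, not part of
// this template): stage bucket indices in shared memory and flush full
// buffers with consecutive threads so the global writes coalesce:
//
//      __shared__ int64_t buf [12][BUFLEN] ;
//      __shared__ int     buflen [12] ;
//      // append p to buf [ibucket][...] instead of writing bucket [] directly;
//      // when buf [b] fills, BUFLEN consecutive threads copy it to
//      // bucket [base_b ... base_b + BUFLEN - 1] and buflen [b] resets to 0.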