//------------------------------------------------------------------------------
// templates/GB_AxB_cuda_dot3_phase2: fill the global buckets
//------------------------------------------------------------------------------

// Phase 2 of the dot3 method: reduce the per-threadblock bucket counts
// produced by GB_AxB_cuda_dot3_phase1 (nanobuckets and blockbucket) into
// global bucket totals (offset), and convert the per-threadblock counts into
// exclusive cumulative sums.  The phase2end kernel then scatters each entry
// of C into its global bucket.

#define GB_KERNEL
#include <cstdint>
#include "GB_cuda_buckets.h"
#include "matrix.h"
#include <cooperative_groups.h>
#include "local_cub/block/block_scan.cuh"

using namespace cooperative_groups;

// A stateful callback functor that maintains a running prefix to be applied
// during consecutive scan operations.
struct BlockPrefixCallbackOp
{
   // Running prefix
   int64_t running_total;
   // Constructor
   __device__ BlockPrefixCallbackOp(int64_t running_total) : running_total(running_total) {}

   // Callback operator to be entered by the first warp of threads in the block.
   // Thread-0 is responsible for returning a value for seeding the block-wide scan.
   __device__ int64_t operator()(int64_t block_aggregate)
   {
     int64_t old_prefix = running_total;
     running_total += block_aggregate;
     return old_prefix;
   }
};
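
// Example (illustrative only): with a 32-thread block scanning items in tiles
// of 32, CUB calls this functor once per tile with that tile's aggregate.  On
// the first call it returns 0 and records the tile's sum; on the next call it
// returns the first tile's sum, and so on, so each tile's exclusive scan is
// seeded with the running total of all prior tiles.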

__inline__
__device__ void blockBucketExclusiveSum(int bucketId, int64_t *d_data, int nblocks)
{
   #define blocksize  32

   // Specialize BlockScan for a 1D block of 32 threads
   typedef cub::BlockScan<int64_t, 32, cub::BLOCK_SCAN_WARP_SCANS> BlockScan;

   // Allocate shared memory for BlockScan
   __shared__ typename BlockScan::TempStorage temp_storage;

   // Initialize running total
   BlockPrefixCallbackOp prefix_op(0);

   // Have the block iterate over segments of items
   int64_t data=0;

   int64_t *blockbucket= d_data;

   for (int block_id = 0; block_id < nblocks; block_id += blocksize)
   {
      // Load a segment of consecutive items that are blocked across threads

      //printf("block %d entering sum\n",blockIdx.x);
      int loc = block_id + threadIdx.x;
      if ( loc < nblocks)
      {
        //printf("block %di loading tid=%d\n",block_id,tid);
        data  = blockbucket[bucketId*nblocks    +loc ] ;
      }
      __syncthreads();

      //printf("bb%d_%d s0 before prefix= %ld \n", block_id,bucketId,
      //                     blockbucket[bucketId*nblocks + block_id+threadIdx.x] )  ;
      // Collectively compute the block-wide exclusive prefix sum
      BlockScan(temp_storage).ExclusiveSum( data, data, prefix_op);
      __syncthreads();

      if ( loc < nblocks)
      {
        blockbucket[bucketId*nblocks   +loc ]  = data  ;
      }
      __syncthreads();

      //printf("bb%d_%d = %ld \n", block_id, bucketId, blockbucket[bucketId*nblocks+block_id+threadIdx.x] )  ;

      data = 0;
   }
}
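
// Example (illustrative only): if bucket 3 has per-threadblock counts
// blockbucket [3*nblocks + 0..3] = {5, 2, 0, 7}, then
// blockBucketExclusiveSum (3, blockbucket, 4) rewrites that row in place as
// the exclusive cumulative sum {0, 5, 7, 7}: the starting position of each
// threadblock's entries within global bucket 3.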


template< typename T, int tile_sz>
__inline__ __device__
T warp_ReduceSumPlus( thread_block_tile<tile_sz> tile, T val)
{
    // Each iteration halves the number of active threads
    // Each lane adds the partial sum held by the lane i positions above it
    for (int i = tile.size() / 2; i > 0; i /= 2) {
        val +=  tile.shfl_down( val, i);
    }
    return val; // note: only lane 0 of the tile holds the full sum
}
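
// Example (illustrative only): with tile_sz = 32 and every lane holding 1, the
// shfl_down offsets 16, 8, 4, 2, 1 accumulate all 32 values into lane 0, which
// ends up with 32; the remaining lanes hold intermediate partial sums.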

template<typename T, int warpSize>
__inline__ __device__
T block_ReduceSum(thread_block g, T val)
{
  static __shared__ T shared[warpSize]; // Shared mem for up to 32 partial sums
  int lane = threadIdx.x % warpSize;
  int wid = threadIdx.x / warpSize;
  thread_block_tile<warpSize> tile = tiled_partition<warpSize>( g );

  // Each warp performs partial reduction
  val = warp_ReduceSumPlus<T, warpSize>( tile, val);

  // Lane 0 of each warp writes its partial sum to shared memory
  if (lane==0) {
     //printf("thd%d warp%d sum is %d\n", threadIdx.x, wid, val);
     shared[wid]=val; // Write reduced value to shared memory
     //printf("thd%d stored warp %d sum %d\n", threadIdx.x, wid, val);
  }
  __syncthreads();              // Wait for all partial reductions

  if (wid > 0 ) return val ;

  // Final reduce within the first warp: reload the per-warp partial sums.
  // Lanes beyond the number of warps in the block contribute zero.
  val = (threadIdx.x < (blockDim.x + warpSize - 1) / warpSize) ? shared[lane] : 0 ;
  val = warp_ReduceSumPlus<T, warpSize>( tile, val) ;

  return val;
}
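
// Note: block_ReduceSum is not used by the kernels below (they reduce within a
// single 32-thread tile via warp_ReduceSumPlus), but it follows the usual
// two-stage pattern: a warp-level shuffle reduction, per-warp partials staged
// in shared memory, then a final reduction by warp 0.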


// GB_AxB_cuda_dot3_phase2 is a CUDA kernel that takes as input the
// nanobuckets and blockbucket arrays computed by the first phase kernel,
// GB_AxB_cuda_dot3_phase1.  The launch geometry of this kernel must match the
// GB_AxB_cuda_dot3_phase1 kernel, with the same # of threads and threadblocks.

__global__
void GB_AxB_dot3_phase2
(
    // input, not modified:
    int64_t *__restrict__ nanobuckets,    // array of size 12-by-blockDim.x-by-nblocks
    // input/output:
    int64_t *__restrict__ blockbucket,    // global bucket count, of size 12*nblocks;
                                          // rewritten as the exclusive cumsum across
                                          // threadblocks for each bucket
    // output:
    int64_t *__restrict__ bucketp,        // global bucket cumsum, of size 13
    int64_t *__restrict__ bucket,         // global buckets, of size cnz (== mnz)
    int64_t *__restrict__ offset,         // global offsets, for each bucket
    // inputs, not modified:
    GrB_Matrix C,             // output matrix
    const int64_t cnz,        // number of entries in C and M
    const int nblocks         // input number of blocks to reduce
)
{

    //--------------------------------------------------------------------------
    // get C and M
    //--------------------------------------------------------------------------

    //int64_t *Ci = C->i ;       // for zombies, or bucket assignment

    // Ci [p] for an entry C(i,j) contains either GB_FLIP(i) if C(i,j) is a
    // zombie, or (k << 4) + bucket otherwise, where C(:,j) is the kth vector
    // of C (j = Ch [k] if hypersparse or j = k if standard sparse), and
    // where bucket is the bucket assignment for C(i,j).  This phase does not
    // need k, just the bucket for each entry C(i,j).

    //--------------------------------------------------------------------------
    // sum up the bucket counts of prior threadblocks
    //--------------------------------------------------------------------------

    // blockbucket is an array of size 12-by-nblocks, held by row.  The
    // entry blockbucket [bucket * nblocks + t] holds the # of entries
    // in the bucket (in range 0 to 11) found by threadblock t.


    //__shared__ uint64_t offset [12] ;
    uint64_t s_0=0;
    uint64_t s_1=0;
    uint64_t s_2=0;
    uint64_t s_3=0;
    uint64_t s_4=0;
    uint64_t s_5=0;
    uint64_t s_6=0;
    uint64_t s_7=0;
    uint64_t s_8=0;
    uint64_t s_9=0;
    uint64_t s_10=0;
    uint64_t s_11=0;

    thread_block_tile<32> tile = tiled_partition<32>(this_thread_block() );

    //printf("block %d entering sum\n",blockIdx.x);
    int tid = threadIdx.x  + blockIdx.x*blockDim.x;
    #define reduceBucket( B )    \
     for( tid = threadIdx.x + blockIdx.x*blockDim.x; \
          tid < nblocks;  \
          tid += blockDim.x*gridDim.x) \
     {                           \
        s_ ## B  += blockbucket[  B *nblocks +tid] ;  \
     } \
     __syncthreads(); \
     s_ ## B  = warp_ReduceSumPlus<uint64_t , 32>( tile, s_ ## B);
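
     // Each reduceBucket(B) expansion accumulates this thread's share of the
     // per-threadblock counts for bucket B in a grid-stride loop, then reduces
     // those partials within the 32-thread tile, leaving the tile's total in
     // s_B on lane 0.  Since only threadIdx.x == 0 later adds s_B to
     // offset [B], this appears to assume the kernel is launched with 32
     // threads per block (matching the tile size); partial sums held by other
     // warps would otherwise be dropped.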

     reduceBucket( 0 )
     reduceBucket( 1 )
     reduceBucket( 2 )
     reduceBucket( 3 )
     reduceBucket( 4 )
     reduceBucket( 5 )
     reduceBucket( 6 )
     reduceBucket( 7 )
     reduceBucket( 8 )
     reduceBucket( 9 )
     reduceBucket( 10 )
     reduceBucket( 11 )


       //printf("summing blk,tid=%d,%d\n",blockIdx.x,threadIdx.x);
       if (threadIdx.x ==0 )
       {
          atomicAdd( (unsigned long long int*)&(offset[0]), s_0);
          atomicAdd( (unsigned long long int*)&(offset[1]), s_1);
          atomicAdd( (unsigned long long int*)&(offset[2]), s_2);
          atomicAdd( (unsigned long long int*)&(offset[3]), s_3);
          atomicAdd( (unsigned long long int*)&(offset[4]), s_4);
          atomicAdd( (unsigned long long int*)&(offset[5]), s_5);
          atomicAdd( (unsigned long long int*)&(offset[6]), s_6);
          atomicAdd( (unsigned long long int*)&(offset[7]), s_7);
          atomicAdd( (unsigned long long int*)&(offset[8]), s_8);
          atomicAdd( (unsigned long long int*)&(offset[9]), s_9);
          atomicAdd( (unsigned long long int*)&(offset[10]),s_10);
          atomicAdd( (unsigned long long int*)&(offset[11]),s_11);
       }
       __syncthreads();
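
       // Once every threadblock has added its partial sums, offset [b] holds
       // the total number of entries in bucket b over the whole matrix.  The
       // cumulative sum bucketp is computed from these totals on the host
       // (see the commented-out code at the end of this kernel).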


    if( gridDim.x >= 12)
    {
        // Cumulative sum across blocks for each bucket
        if (blockIdx.x <12)
           blockBucketExclusiveSum( blockIdx.x, blockbucket, nblocks ) ;
    }
    else
    {
        if (blockIdx.x == 0)
        {
           blockBucketExclusiveSum( 0, blockbucket, nblocks ) ;
           blockBucketExclusiveSum( 1, blockbucket, nblocks ) ;
           blockBucketExclusiveSum( 2, blockbucket, nblocks ) ;
           blockBucketExclusiveSum( 3, blockbucket, nblocks ) ;
           blockBucketExclusiveSum( 4, blockbucket, nblocks ) ;
           blockBucketExclusiveSum( 5, blockbucket, nblocks ) ;
           blockBucketExclusiveSum( 6, blockbucket, nblocks ) ;
           blockBucketExclusiveSum( 7, blockbucket, nblocks ) ;
           blockBucketExclusiveSum( 8, blockbucket, nblocks ) ;
           blockBucketExclusiveSum( 9, blockbucket, nblocks ) ;
           blockBucketExclusiveSum( 10, blockbucket, nblocks) ;
           blockBucketExclusiveSum( 11, blockbucket, nblocks) ;
        }
    }
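
    // At this point blockbucket [b*nblocks + t] holds the exclusive cumulative
    // sum of bucket b's counts over threadblocks 0..t-1, i.e. where threadblock
    // t's first entry lands within bucket b (relative to the start of that
    // bucket).  With 12 or more threadblocks, one block scans each bucket;
    // otherwise block 0 scans all 12 buckets serially.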




    //--------------------------------------------------------------------------
    // last threadblock saves the cumsum of the 12 global buckets
    //--------------------------------------------------------------------------
    /* do on cpu
    if (blockIdx.x == 0) // gridDim.x - 1)
    {

        // the last threadblock: compute all 12 global bucket sizes, and its
        // cumulative sum
        if (threadIdx.x == 0)
        {
            // the work in this last threadblock is single-threaded
            uint64_t s = 0;
            for (int bucket = 0 ; bucket < 12 ; bucket++)
            {
                // write the global cumsum of all buckets to the final global
                // bucketp.  bucketp [bucket] is the starting position in
                // the bucket.
                bucketp [bucket] = s ;

                // bucket_size is the total # of entries in this bucket, for
                // all threadblocks.  It has nearly been computed already,
                // since offset [bucket] = sum (blockbucket (bucket,0:blockDim.x-1)).
                // All that is left is to add the counts for the last threadblock.
                //int64_t global_bucket_size = offset [bucket];
                     // + blockbucket [bucket * gridDim.x + blockIdx.x] ;

                //printf("bucketp[%d]= %ld\n",bucket, s);
                // s is a cumulative sum of the global bucket sizes
                s += offset[bucket]; // global_bucket_size ;
            }
            // The kth global bucket (for k = 0 to 11) appears in:
            // bucket [bucketp [k]... bucketp [k+1]-1],
            // so the end of the last bucket needs bucketp [12].
            bucketp [12] = (int64_t)s;
                //printf("bucketp[12]= %ld\n", s);
            // all entries in C now appear in the buckets.
            // ASSERT (s == cnz) ;
        }
        __syncthreads ( ) ;
    }
    */
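
    // A host-side sketch of the cumsum described above (illustrative only; the
    // actual CPU code lives outside this file and its exact form may differ):
    //
    //      int64_t s = 0 ;
    //      for (int b = 0 ; b < 12 ; b++)
    //      {
    //          bucketp [b] = s ;       // start of bucket b
    //          s += offset [b] ;       // add the global size of bucket b
    //      }
    //      bucketp [12] = s ;          // total; should equal cnz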

} // phase2


__global__
void GB_AxB_dot3_phase2end
(
    // inputs, not modified:
    int64_t *__restrict__ nanobuckets,          // array of size 12-by-blockDim.x-by-nblocks
    const int64_t *__restrict__ blockbucket,    // per-threadblock bucket cumsum (from phase2),
                                                // of size 12*nblocks
    const int64_t *__restrict__ bucketp,        // global bucket cumsum, of size 13
    // output:
    int64_t *__restrict__ bucket,               // global buckets, of size cnz (== mnz)
    // inputs, not modified:
    const int64_t *__restrict__ offset,         // global offsets, for each bucket
    const GrB_Matrix C,                         // output matrix
    const int64_t cnz                           // number of entries in C and M
)
{


    int64_t *__restrict__ Ci = C->i ;       // for zombies, or bucket assignment
    int64_t *__restrict__ Mp = C->p ;       // for offset calculations
    int64_t mnvec = C->nvec;

    //--------------------------------------------------------------------------
    // load and shift the nanobuckets for this thread block
    //--------------------------------------------------------------------------

    // The taskbucket for this threadblock is an array of size
    // 12-by-blockDim.x, held by row.  It forms a 2D array within the 3D
    // nanobuckets array.
    int64_t *__restrict__ taskbucket = nanobuckets + blockIdx.x * (12 * blockDim.x) ;

    //printf("block%d thd%d blockbucket= %ld\n", blockIdx.x, threadIdx.x,
    //                                           blockbucket[blockIdx.x*gridDim.x+blockIdx.x]);

    // Each thread in this threadblock owns one column of this taskbucket, for
    // its set of 12 nanobuckets.  The nanobuckets are a column of length 12,
    // with stride equal to blockDim.x.
    int64_t *__restrict__ nanobucket = taskbucket + threadIdx.x;

    // Each thread loads its 12 nanobucket values into registers.
    #define LOAD_NANOBUCKET(bucket)                     \
        int64_t my_bucket_ ## bucket =                  \
            nanobucket [bucket * blockDim.x]            \
         + blockbucket [bucket * gridDim.x + blockIdx.x]\
         + bucketp [bucket] ;

    LOAD_NANOBUCKET (0) ;
    LOAD_NANOBUCKET (1) ;
    LOAD_NANOBUCKET (2) ;
    LOAD_NANOBUCKET (3) ;
    LOAD_NANOBUCKET (4) ;
    LOAD_NANOBUCKET (5) ;
    LOAD_NANOBUCKET (6) ;
    LOAD_NANOBUCKET (7) ;
    LOAD_NANOBUCKET (8) ;
    LOAD_NANOBUCKET (9) ;
    LOAD_NANOBUCKET (10) ;
    LOAD_NANOBUCKET (11) ;
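
    // Each my_bucket_k is now the absolute position in the global bucket array
    // at which this thread writes its first entry belonging to bucket k:
    //
    //      bucketp [k]                                 start of bucket k
    //    + blockbucket [k * gridDim.x + blockIdx.x]    offset of this threadblock
    //    + nanobucket [k * blockDim.x]                 offset of this thread
    //
    // This indexing assumes phase2end is launched with gridDim.x == nblocks
    // (the same geometry as phase1/phase2), so that gridDim.x matches the
    // nblocks stride used when the cumsums were written.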

    // Now each thread has an index into the global set of 12 buckets,
    // held in bucket, of where to place its own entries.

    //--------------------------------------------------------------------------
    // construct the global buckets
    //--------------------------------------------------------------------------

    // The slice for task blockIdx.x contains entries pfirst:plast-1 of M and
    // C, which is the part of C operated on by this threadblock.
    int64_t pfirst, plast ;

    /*
    for ( int tid_global = threadIdx.x + blockIdx.x * blockDim.x ;
              tid_global < (mnvec+7)/8 ;
              tid_global += blockDim.x * gridDim.x)
    */
    int chunk_max= (cnz + chunksize -1)/chunksize;
    for ( int chunk = blockIdx.x;
              chunk < chunk_max;
              chunk += gridDim.x )
    {

      //GB_PARTITION (pfirst, plast, cnz, tid_global, (mnvec+7)/8 ) ;
      pfirst = chunksize * chunk ;
      plast  = GB_IMIN( chunksize * (chunk+1), cnz ) ;

      int chunk_end;
      if ( cnz > chunksize) chunk_end = GB_IMIN(  chunksize,
                                                  cnz - chunksize*(chunk) );
      else chunk_end = cnz;
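
      // chunk_end is the number of entries in this chunk, so the chunk covers
      // positions pfirst .. pfirst + chunk_end - 1 (equivalently pfirst..plast-1).
      // chunksize is assumed to be defined by the including/launching code;
      // it is not defined in this file.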

      // find the first vector of the slice for task blockIdx.x: the
      // vector that owns the entry Ai [pfirst] and Ax [pfirst].
      //kfirst = GB_search_for_vector_device (pfirst, Mp, 0, mnvec) ;

      // find the last vector of the slice for task blockIdx.x: the
      // vector that owns the entry Ai [plast-1] and Ax [plast-1].
      //klast = GB_search_for_vector_device (plast-1, Mp, kfirst, mnvec) ;


      for ( int p = pfirst + threadIdx.x;
                p < pfirst + chunk_end;
                p += blockDim.x )
      {
        // get the entry C(i,j), and extract its bucket.  Then
        // place the entry C(i,j) in the global bucket it belongs to.

        // TODO: these writes to global are not coalesced.  Instead: each
        // threadblock could buffer its writes to 12 buffers and when the
        // buffers are full they can be written to global.
        int ibucket = Ci[p] & 0xF;
        //printf(" thd: %d p,Ci[p] = %ld,%ld,%d\n", threadIdx.x, p, Ci[p], irow );
        switch (ibucket)
        {
            case  0: bucket [my_bucket_0++ ] = p ; Ci[p] = Ci[p] >> 4 ; break ; // unshift zombies
            case  1: bucket [my_bucket_1++ ] = p ; break ;
            case  2: bucket [my_bucket_2++ ] = p ; break ;
            case  3: bucket [my_bucket_3++ ] = p ; break ;
            case  4: bucket [my_bucket_4++ ] = p ; break ;
            case  5: bucket [my_bucket_5++ ] = p ; break ;
            case  6: bucket [my_bucket_6++ ] = p ; break ;
            case  7: bucket [my_bucket_7++ ] = p ; break ;
            case  8: bucket [my_bucket_8++ ] = p ; break ;
            case  9: bucket [my_bucket_9++ ] = p ; break ;
            case 10: bucket [my_bucket_10++] = p ; break ;
            case 11: bucket [my_bucket_11++] = p ; break ;
            default: break ;
        }

      }
      //__syncthreads();
   }

} // phase2end
