1"""
2A library written in CUDA Python for generating reduction kernels
3"""
4
5from numba.np.numpy_support import from_dtype
6
7
8_WARPSIZE = 32
9_NUMWARPS = 4
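# These kernels assume a warp size of 32 threads; the largest thread block
# they use is _NUMWARPS * _WARPSIZE = 128 threads.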


def _gpu_reduce_factory(fn, nbtype):
    from numba import cuda

    reduce_op = cuda.jit(device=True)(fn)
    inner_sm_size = _WARPSIZE + 1   # pad by one to avoid shared memory bank conflicts
    max_blocksize = _NUMWARPS * _WARPSIZE

    @cuda.jit(device=True)
    def inner_warp_reduction(sm_partials, init):
        """
        Compute reduction within a single warp
        """
        tid = cuda.threadIdx.x
        warpid = tid // _WARPSIZE
        laneid = tid % _WARPSIZE

        sm_this = sm_partials[warpid, :]
        sm_this[laneid] = init
        # XXX expect warp synchronization
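        # Pairwise tree reduction within the warp: the active width is halved
        # each iteration (16, 8, 4, 2, 1), so after five steps the warp's
        # partial result is left in sm_this[0].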
        width = _WARPSIZE // 2
        while width:
            if laneid < width:
                old = sm_this[laneid]
                sm_this[laneid] = reduce_op(old, sm_this[laneid + width])
            width //= 2
            # XXX expect warp synchronization

    @cuda.jit(device=True)
    def device_reduce_full_block(arr, partials, sm_partials):
        """
        Partially reduce `arr` into `partials` using `sm_partials` as working
        space.  The algorithm goes like:

            array chunks of 128:  |   0 | 128 | 256 | 384 | 512 |
                        block-0:  |   x |     |     |   x |     |
                        block-1:  |     |   x |     |     |   x |
                        block-2:  |     |     |   x |     |     |

        The array is divided into chunks of 128 (the size of a threadblock).
        The threadblocks consume the chunks in a round-robin schedule.
        First, a threadblock loads a chunk into temp memory.  Then, all
        subsequent chunks are combined into the temp memory.

        Once all chunks are processed, an inner-block reduction is performed
        on the temp memory so that there is just one scalar result per block.
        The result from each block is stored to `partials` at the dedicated
        slot.
        """
        tid = cuda.threadIdx.x
        blkid = cuda.blockIdx.x
        blksz = cuda.blockDim.x
        gridsz = cuda.gridDim.x

        # block strided loop to compute the reduction
        start = tid + blksz * blkid
        stop = arr.size
        step = blksz * gridsz
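        # For example, with a grid of 2 blocks of 128 threads each, thread 5
        # of block 1 starts at element 133 and strides by 256, reducing
        # elements 133, 389, 645, ... into its private accumulator `tmp`.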

        # load first value
        tmp = arr[start]
        # loop over all values in block-stride
        for i in range(start + step, stop, step):
            tmp = reduce_op(tmp, arr[i])

        cuda.syncthreads()
        # inner-warp reduction
        inner_warp_reduction(sm_partials, tmp)

        cuda.syncthreads()
        # at this point, only the first slot for each warp in sm_partials
        # is valid.

        # finish up block reduction
        # warning: this is assuming 4 warps.
        # assert numwarps == 4
        if tid < 2:
            sm_partials[tid, 0] = reduce_op(sm_partials[tid, 0],
                                            sm_partials[tid + 2, 0])
        if tid == 0:
            partials[blkid] = reduce_op(sm_partials[0, 0], sm_partials[1, 0])

    @cuda.jit(device=True)
    def device_reduce_partial_block(arr, partials, sm_partials):
        """
        This computes a reduction over `arr`.
        This device function must be used by 1 threadblock only.
        The blocksize must match `arr.size` and must not be greater than 128.
        """
        tid = cuda.threadIdx.x
        blkid = cuda.blockIdx.x
        blksz = cuda.blockDim.x
        warpid = tid // _WARPSIZE
        laneid = tid % _WARPSIZE

        size = arr.size
        # load first value
        value = arr[tid]
        sm_partials[warpid, laneid] = value

        cuda.syncthreads()

        if (warpid + 1) * _WARPSIZE < size:
            # fully populated warps
            inner_warp_reduction(sm_partials, value)
        else:
            # partially populated warps
            # NOTE: this uses a very inefficient sequential algorithm
            if laneid == 0:
                sm_this = sm_partials[warpid, :]
                base = warpid * _WARPSIZE
                for i in range(1, size - base):
                    sm_this[0] = reduce_op(sm_this[0], sm_this[i])

        cuda.syncthreads()
        # finish up
        if tid == 0:
            num_active_warps = (blksz + _WARPSIZE - 1) // _WARPSIZE

            result = sm_partials[0, 0]
            for i in range(1, num_active_warps):
                result = reduce_op(result, sm_partials[i, 0])

            partials[blkid] = result

    def gpu_reduce_block_strided(arr, partials, init, use_init):
        """
        Perform a partial reduction on *arr* and write the per-block results
        into *partials*.  The length of *partials* is determined by the
        number of threadblocks.  The initial value *init* is combined into
        the first partial only when *use_init* is true.

        Launch config:

        The blocksize must be a multiple of the warpsize and is limited to
        4 warps (128 threads).
        """
        tid = cuda.threadIdx.x

        sm_partials = cuda.shared.array((_NUMWARPS, inner_sm_size),
                                        dtype=nbtype)
        if cuda.blockDim.x == max_blocksize:
            device_reduce_full_block(arr, partials, sm_partials)
        else:
            device_reduce_partial_block(arr, partials, sm_partials)
        # deal with the initializer
        if use_init and tid == 0 and cuda.blockIdx.x == 0:
            partials[0] = reduce_op(partials[0], init)

    return cuda.jit(gpu_reduce_block_strided)


class Reduce(object):
    """Create a reduction object that reduces values using a given binary
    function. The binary function is compiled once and cached inside this
    object. Keeping this object alive will prevent re-compilation.
    """

    _cache = {}

    def __init__(self, functor):
        """
        :param functor: A function implementing a binary operation for
                        reduction. It will be compiled as a CUDA device
                        function using ``cuda.jit(device=True)``.
        """
        self._functor = functor

    def _compile(self, dtype):
        key = self._functor, dtype
        if key in self._cache:
            kernel = self._cache[key]
        else:
            kernel = _gpu_reduce_factory(self._functor, from_dtype(dtype))
            self._cache[key] = kernel
        return kernel
    def __call__(self, arr, size=None, res=None, init=0, stream=0):
        """Performs a full reduction.

        :param arr: A host or device array.
        :param size: Optional integer specifying the number of elements in
                    ``arr`` to reduce. If this parameter is not specified, the
                    entire array is reduced.
        :param res: Optional device array into which to write the reduction
                    result. The result is written into the first element of
                    this array. If this parameter is specified, then no
                    communication of the reduction output takes place from the
                    device to the host.
        :param init: Optional initial value for the reduction, the type of which
                    must match ``arr.dtype``.
        :param stream: Optional CUDA stream in which to perform the reduction.
                    If no stream is specified, the default stream of 0 is
                    used.
        :return: If ``res`` is specified, ``None`` is returned. Otherwise, the
                result of the reduction is returned.
        """
        from numba import cuda

        # ensure 1d array
        if arr.ndim != 1:
            raise TypeError("only support 1D array")

        # adjust array size
        if size is not None:
            arr = arr[:size]

        init = arr.dtype.type(init)  # ensure the right type

        # return `init` if `arr` is empty
        if arr.size < 1:
            return init

        kernel = self._compile(arr.dtype)

        # Perform the reduction on the GPU
        blocksize = _NUMWARPS * _WARPSIZE
        size_full = (arr.size // blocksize) * blocksize
        size_partial = arr.size - size_full
        full_blockct = min(size_full // blocksize, _WARPSIZE * 2)
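        # For example, for arr.size == 1000 and blocksize == 128:
        # size_full == 896 (7 full chunks), size_partial == 104 and
        # full_blockct == min(7, 64) == 7, so 7 blocks cooperate on the full
        # chunks and a separate launch handles the 104-element tail.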

        # allocate the partials array: one slot per full block, plus one for
        # the partial tail block
        partials_size = full_blockct
        if size_partial:
            partials_size += 1
        partials = cuda.device_array(shape=partials_size, dtype=arr.dtype)

        if size_full:
            # kernel for the fully populated threadblocks
            kernel[full_blockct, blocksize, stream](arr[:size_full],
                                                    partials[:full_blockct],
                                                    init,
                                                    True)

        if size_partial:
            # kernel for partially populated threadblocks
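            # The initializer is folded in here only when no full blocks ran
            # (use_init == not full_blockct); otherwise it was already applied
            # by the full-block launch above.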
            kernel[1, size_partial, stream](arr[size_full:],
                                            partials[full_blockct:],
                                            init,
                                            not full_blockct)

        if partials.size > 1:
            # finish up: reduce all block partials in one final single-block pass
            kernel[1, partials_size, stream](partials, partials, init, False)

        # handle return value
        if res is not None:
            res[:1].copy_to_device(partials[:1], stream=stream)
            return
        else:
            return partials[0]