# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
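"""Tests for interoperability between pyarrow.cuda and numba.cuda.

These tests exercise sharing CUDA contexts and device memory between
the two libraries in both directions.
"""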

import pytest
import pyarrow as pa
import numpy as np

dtypes = ['uint8', 'int16', 'float32']
cuda = pytest.importorskip("pyarrow.cuda")
nb_cuda = pytest.importorskip("numba.cuda")

from numba.cuda.cudadrv.devicearray import DeviceNDArray  # noqa: E402


context_choices = None
context_choice_ids = ['pyarrow.cuda', 'numba.cuda']


def setup_module(module):
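    # Build two equivalent (pyarrow, numba) context pairs: one created
    # by pyarrow and exported to numba, and one taken from numba's
    # current context and imported into pyarrow.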
    np.random.seed(1234)
    ctx1 = cuda.Context()
    nb_ctx1 = ctx1.to_numba()
    nb_ctx2 = nb_cuda.current_context()
    ctx2 = cuda.Context.from_numba(nb_ctx2)
    module.context_choices = [(ctx1, nb_ctx1), (ctx2, nb_ctx2)]


def teardown_module(module):
    del module.context_choices


@pytest.mark.parametrize("c", range(len(context_choice_ids)),
                         ids=context_choice_ids)
def test_context(c):
    ctx, nb_ctx = context_choices[c]
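    # The pyarrow and numba handles must refer to the same CUDA context.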
    assert ctx.handle == nb_ctx.handle.value
    assert ctx.handle == ctx.to_numba().handle.value
    ctx2 = cuda.Context.from_numba(nb_ctx)
    assert ctx.handle == ctx2.handle
    size = 10
    buf = ctx.new_buffer(size)
    assert ctx.handle == buf.context.handle


def make_random_buffer(size, target='host', dtype='uint8', ctx=None):
    """Return a host or device buffer with random data."""
    dtype = np.dtype(dtype)
    if target == 'host':
        assert size >= 0
        buf = pa.allocate_buffer(size*dtype.itemsize)
        arr = np.frombuffer(buf, dtype=dtype)
        arr[:] = np.random.randint(low=0, high=255, size=size,
                                   dtype=np.uint8)
        return arr, buf
    elif target == 'device':
        arr, buf = make_random_buffer(size, target='host', dtype=dtype)
        dbuf = ctx.new_buffer(size * dtype.itemsize)
        dbuf.copy_from_host(buf, position=0, nbytes=buf.size)
        return arr, dbuf
    raise ValueError('invalid target value')


@pytest.mark.parametrize("c", range(len(context_choice_ids)),
                         ids=context_choice_ids)
@pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
@pytest.mark.parametrize("size", [0, 1, 8, 1000])
def test_from_object(c, dtype, size):
    ctx, nb_ctx = context_choices[c]
    arr, cbuf = make_random_buffer(size, target='device', dtype=dtype, ctx=ctx)

    # Creating device buffer from numba DeviceNDArray:
    darr = nb_cuda.to_device(arr)
    cbuf2 = ctx.buffer_from_object(darr)
    assert cbuf2.size == cbuf.size
    arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
    np.testing.assert_equal(arr, arr2)

    # Creating device buffer from a slice of numba DeviceNDArray:
    if size >= 8:
        # 1-D arrays
        for s in [slice(size//4, None, None),
                  slice(size//4, -(size//4), None)]:
            cbuf2 = ctx.buffer_from_object(darr[s])
            arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
            np.testing.assert_equal(arr[s], arr2)

        # Negative strides cannot be tested due to a numba bug,
        # see numba issue #3705.
        if 0:
            rdarr = darr[::-1]
            cbuf2 = ctx.buffer_from_object(rdarr)
            assert cbuf2.size == cbuf.size
            arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
            np.testing.assert_equal(arr, arr2)

        with pytest.raises(ValueError,
                           match=('array data is non-contiguous')):
            ctx.buffer_from_object(darr[::2])

        # a rectangular 2-D array
        s1 = size//4
        s2 = size//s1
        assert s1 * s2 == size
        cbuf2 = ctx.buffer_from_object(darr.reshape(s1, s2))
        assert cbuf2.size == cbuf.size
        arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
        np.testing.assert_equal(arr, arr2)

        with pytest.raises(ValueError,
                           match=('array data is non-contiguous')):
            ctx.buffer_from_object(darr.reshape(s1, s2)[:, ::2])

        # a 3-D array
        s1 = 4
        s2 = size//8
        s3 = size//(s1*s2)
        assert s1 * s2 * s3 == size
        cbuf2 = ctx.buffer_from_object(darr.reshape(s1, s2, s3))
        assert cbuf2.size == cbuf.size
        arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
        np.testing.assert_equal(arr, arr2)

        with pytest.raises(ValueError,
                           match=('array data is non-contiguous')):
            ctx.buffer_from_object(darr.reshape(s1, s2, s3)[::2])

    # Creating device buffer from an object implementing the CUDA array
    # interface:
    class MyObj:
        def __init__(self, darr):
            self.darr = darr

        @property
        def __cuda_array_interface__(self):
            return self.darr.__cuda_array_interface__

    cbuf2 = ctx.buffer_from_object(MyObj(darr))
    assert cbuf2.size == cbuf.size
    arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
    np.testing.assert_equal(arr, arr2)


@pytest.mark.parametrize("c", range(len(context_choice_ids)),
                         ids=context_choice_ids)
@pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
def test_numba_memalloc(c, dtype):
    ctx, nb_ctx = context_choices[c]
    dtype = np.dtype(dtype)
    # Allocate memory using the numba context.
    # Warning: this will not be reflected in the pyarrow context
    # (e.g. bytes_allocated does not change).
    size = 10
    mem = nb_ctx.memalloc(size * dtype.itemsize)
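    # Wrap the raw allocation as a 1-D contiguous numba device array.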
    darr = DeviceNDArray((size,), (dtype.itemsize,), dtype, gpu_data=mem)
    darr[:5] = 99
    darr[5:] = 88
    np.testing.assert_equal(darr.copy_to_host()[:5], 99)
    np.testing.assert_equal(darr.copy_to_host()[5:], 88)

    # wrap numba allocated memory with CudaBuffer
    cbuf = cuda.CudaBuffer.from_numba(mem)
    arr2 = np.frombuffer(cbuf.copy_to_host(), dtype=dtype)
    np.testing.assert_equal(arr2, darr.copy_to_host())


@pytest.mark.parametrize("c", range(len(context_choice_ids)),
                         ids=context_choice_ids)
@pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
def test_pyarrow_memalloc(c, dtype):
    ctx, nb_ctx = context_choices[c]
    size = 10
    arr, cbuf = make_random_buffer(size, target='device', dtype=dtype, ctx=ctx)

    # wrap CudaBuffer with numba device array
    mem = cbuf.to_numba()
    darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem)
    np.testing.assert_equal(darr.copy_to_host(), arr)


@pytest.mark.parametrize("c", range(len(context_choice_ids)),
                         ids=context_choice_ids)
@pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
def test_numba_context(c, dtype):
    ctx, nb_ctx = context_choices[c]
    size = 10
    with nb_cuda.gpus[0]:
        arr, cbuf = make_random_buffer(size, target='device',
                                       dtype=dtype, ctx=ctx)
        assert cbuf.context.handle == nb_ctx.handle.value
        mem = cbuf.to_numba()
        darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem)
        np.testing.assert_equal(darr.copy_to_host(), arr)
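        # A write through the numba view must be visible via the
        # pyarrow buffer after synchronization.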
        darr[0] = 99
        cbuf.context.synchronize()
        arr2 = np.frombuffer(cbuf.copy_to_host(), dtype=dtype)
        assert arr2[0] == 99


@pytest.mark.parametrize("c", range(len(context_choice_ids)),
                         ids=context_choice_ids)
@pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
def test_pyarrow_jit(c, dtype):
    ctx, nb_ctx = context_choices[c]

    @nb_cuda.jit
    def increment_by_one(an_array):
        pos = nb_cuda.grid(1)
        if pos < an_array.size:
            an_array[pos] += 1

    # Apply a numba.cuda kernel to memory held by a CudaBuffer.
    size = 10
    arr, cbuf = make_random_buffer(size, target='device', dtype=dtype, ctx=ctx)
    threadsperblock = 32
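    # Ceiling division: enough blocks so that every element gets a thread.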
    blockspergrid = (arr.size + (threadsperblock - 1)) // threadsperblock
    mem = cbuf.to_numba()
    darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem)
    increment_by_one[blockspergrid, threadsperblock](darr)
    cbuf.context.synchronize()
    arr1 = np.frombuffer(cbuf.copy_to_host(), dtype=arr.dtype)
    np.testing.assert_equal(arr1, arr + 1)