1import contextlib
2import datetime
3import glob
4import os
5import random
6import shutil
7import subprocess
8import tempfile
9import traceback
10
11import numpy as np
12import pytest
13from numpy.testing import assert_almost_equal, assert_array_equal, assert_equal
14
15
16def assert_tail_equal(a, *rest, **kwargs):
17    """Assert that all arrays in target equal first array"""
18    for target in rest:
19        assert_array_equal(a, target, **kwargs)
20
21
22class DiskTestCase:
23    """Helper class to store paths and associated allocation frames. This is both
24    a cleanup step and a test of resource management. Some platforms will
25    refuse to delete an open file, indicating a potentially leaked resource.
26    """
27
28    @classmethod
29    def setup_method(self):
30        prefix = "tiledb-" + self.__class__.__name__
31        self.rootdir = tempfile.mkdtemp(prefix=prefix)
32        self.pathmap = dict()
33
34    @classmethod
35    def teardown_method(self):
36        # Remove every directory starting with rootdir
37        # This is both a clean-up step and an implicit test
38        # of proper resource deallocation (see notes below)
39        for dirpath in glob.glob(self.rootdir + "*"):
40            try:
41                shutil.rmtree(dirpath)
42            except OSError as exc:
43                print(
44                    "test '{}' error deleting '{}'".format(
45                        self.__class__.__name__, dirpath
46                    )
47                )
48                print("registered paths and originating functions:")
49                for path, frame in self.pathmap.items():
50                    print("  '{}' <- '{}'".format(path, frame))
51                raise exc
52
53    def path(self, basename=None, shared=False):
54        if basename is not None:
55            # Note: this must be `is not None` because we need to match empty string
56            out = os.path.abspath(os.path.join(self.rootdir, basename))
57        else:
58            out = tempfile.mkdtemp(dir=self.rootdir)
59
60        if os.name == "nt" and shared:
61            subprocess.run(
62                f'cmd //c "net share tiledb-shared={out}"', shell=True, check=True
63            )
64
65        # We have had issues in both py and libtiledb in the past
66        # where files were not released (eg: destructor not called)
67        # Often this is invisible on POSIX platforms, but will
68        # cause errors on Windows because two processes cannot access
69        # the same file at once.
70        # In order to debug this issue, we save the caller where
71        # this path was allocated so that we can determine what
72        # test created an unreleased file
73        frame = traceback.extract_stack(limit=2)[-2][2]
74        self.pathmap[out] = frame
75
76        return out
77
78    def assertRaises(self, *args):
79        return pytest.raises(*args)
80
81    def assertRaisesRegex(self, e, m):
82        return pytest.raises(e, match=m)
83
84    @contextlib.contextmanager
85    def assertEqual(self, *args):
86        if not len(args) == 2:
87            raise Exception("Unexpected input len > 2 to assertEquals")
88        assert args[0] == args[1]
89
90    @contextlib.contextmanager
91    def assertTrue(self, a, msg=None):
92        if msg:
93            assert a, msg
94        else:
95            assert a
96
97    @contextlib.contextmanager
98    def assertFalse(self, a):
99        assert a == False
100
101    @contextlib.contextmanager
102    def assertIsInstance(self, v, t):
103        assert isinstance(v, t)
104
105    @contextlib.contextmanager
106    def assertSetEqual(self, s1, s2):
107        assert all(isinstance(x, set) for x in (s1, s2))
108        assert s1 == s2
109
110    @contextlib.contextmanager
111    def assertIsNone(self, a1):
112        assert a1 is None
113
114    @contextlib.contextmanager
115    def assertTupleEqual(self, a1, a2):
116        assert a1 == a2
117
118    @contextlib.contextmanager
119    def assertAlmostEqual(self, a1, a2):
120        assert_almost_equal(a1, a2)
121
122
123# fixture wrapper to use with pytest: mark.parametrize does not
124#   work with DiskTestCase subclasses (unittest.TestCase methods
125#   cannot take arguments)
126@pytest.fixture(scope="class")
127def checked_path():
128    dtc = DiskTestCase()
129
130    dtc.setup_method()
131
132    yield dtc
133
134    dtc.teardown_method()
135
136
137# exclude whitespace: if we generate unquoted newline then pandas will be confused
138_ws_set = set("\n\t\r")
139
140
141def gen_chr(max, printable=False):
142    while True:
143        # TODO we exclude 0x0 here because the key API does not embedded NULL
144        s = chr(random.randrange(1, max))
145        if printable and (not s.isprintable()) or (s in _ws_set):
146            continue
147        if len(s) > 0:
148            break
149    return s
150
151
152def rand_utf8(size=5):
153    return "".join([gen_chr(0xD7FF) for _ in range(0, size)])
154
155
156def rand_ascii(size=5, printable=False):
157    return "".join([gen_chr(127, printable) for _ in range(0, size)])
158
159
160def rand_ascii_bytes(size=5, printable=False):
161    return b"".join([gen_chr(127, printable).encode("utf-8") for _ in range(0, size)])
162
163
164def dtype_max(dtype):
165    if not np.issubdtype(dtype, np.generic):
166        raise TypeError("expected numpy dtype!")
167
168    if np.issubdtype(dtype, np.floating):
169        finfo = np.finfo(dtype)
170        return finfo.max
171
172    elif np.issubdtype(dtype, np.integer):
173        iinfo = np.iinfo(dtype)
174        return int(iinfo.max)
175
176    elif np.issubdtype(dtype, np.datetime64):
177        return np.datetime64(datetime.datetime.max)
178
179    raise "Unknown dtype for dtype_max '{}'".format(str(dtype))
180
181
182def dtype_min(dtype):
183    if not np.issubdtype(dtype, np.generic):
184        raise TypeError("expected numpy dtype!")
185
186    if np.issubdtype(dtype, np.floating):
187        finfo = np.finfo(dtype)
188        return finfo.min
189
190    elif np.issubdtype(dtype, np.integer):
191        iinfo = np.iinfo(dtype)
192        return int(iinfo.min)
193
194    elif np.issubdtype(dtype, np.datetime64):
195        return np.datetime64(datetime.datetime.min)
196
197    raise "Unknown dtype for dtype_min '{dtype}'".format(str(dtype))
198
199
200def rand_int_sequential(size, dtype=np.uint64):
201    arr = np.random.randint(
202        dtype_min(dtype), high=dtype_max(dtype), size=size, dtype=dtype
203    )
204    return np.sort(arr)
205
206
207def rand_datetime64_array(size, start=None, stop=None, dtype=None):
208    if not dtype:
209        dtype = np.dtype("M8[ns]")
210
211    # generate randint inbounds on the range of the dtype
212    units = np.datetime_data(dtype)[0]
213    intmin, intmax = np.iinfo(np.int64).min, np.iinfo(np.int64).max
214
215    if start is None:
216        start = np.datetime64(intmin + 1, units)
217    else:
218        start = np.datetime64(start)
219    if stop is None:
220        stop = np.datetime64(intmax, units)
221    else:
222        stop = np.datetime64(stop)
223
224    arr = np.random.randint(
225        start.astype(dtype).astype(np.int64),
226        stop.astype(dtype).astype(np.int64),
227        size=size,
228        dtype=np.int64,
229    )
230    arr.sort()
231
232    return arr.astype(dtype)
233
234
235def intspace(start, stop, num=50, dtype=np.int64):
236    """
237    Return evenly spaced values over range ensuring that stop is
238    always the maximum (will not overflow with int dtype as linspace)
239    :param start:
240    :param stop:
241    :param num:
242    :param dtype:
243    :return:
244    """
245    rval = np.zeros(num, dtype=dtype)
246    step = (stop - start) // num
247    nextval = start
248
249    if np.issubdtype(dtype, np.integer) and step < 1:
250        raise ValueError(
251            "Cannot use non-integral step value '{}' for integer dtype!".format(step)
252        )
253
254    for i in range(num):
255        rval[i] = nextval
256        nextval += step
257
258    rval[-1] = stop
259    return rval
260
261
262import pprint as _pprint
263
264pp = _pprint.PrettyPrinter(indent=4)
265
266
267def xprint(*x):
268    for xp in x:
269        pp.pprint(xp)
270
271
272def assert_unordered_equal(a1, a2, unordered=True):
273    """Assert that arrays are equal after sorting if
274    `unordered==True`"""
275    if unordered:
276        a1 = np.sort(a1)
277        a2 = np.sort(a2)
278    assert_array_equal(a1, a2)
279
280
281def assert_subarrays_equal(a, b, ordered=True):
282    assert_equal(a.shape, b.shape)
283
284    if not ordered:
285        a = np.sort(a)
286        b = np.sort(b)
287
288    for a_el, b_el in zip(a.flat, b.flat):
289        assert_array_equal(a_el, b_el)
290
291
292def assert_all_arrays_equal(*arrays):
293    # TODO this should display raise in the calling location if possible
294    assert len(arrays) % 2 == 0, "Expected even number of arrays"
295
296    for a1, a2 in zip(arrays[0::2], arrays[1::2]):
297        assert_array_equal(a1, a2)
298