import contextlib
import datetime
import glob
import os
import pprint as _pprint
import random
import shutil
import subprocess
import tempfile
import traceback

import numpy as np
import pytest
from numpy.testing import assert_almost_equal, assert_array_equal, assert_equal


def assert_tail_equal(a, *rest, **kwargs):
    """Assert that every array in ``rest`` equals the first array ``a``.

    Extra keyword arguments are forwarded to ``assert_array_equal``.
    """
    for target in rest:
        assert_array_equal(a, target, **kwargs)


class DiskTestCase:
    """Helper class to store paths and associated allocation frames. This is both
    a cleanup step and a test of resource management. Some platforms will
    refuse to delete an open file, indicating a potentially leaked resource.
    """

    @classmethod
    def setup_method(cls):
        """Create a fresh temporary root directory and an empty path registry.

        Both ``rootdir`` and ``pathmap`` are stored on the class, because this
        hook is a classmethod.
        """
        # Use the class name itself: on a classmethod, `cls.__class__` would be
        # the metaclass (`type`), not the test class.
        prefix = "tiledb-" + cls.__name__
        cls.rootdir = tempfile.mkdtemp(prefix=prefix)
        cls.pathmap = dict()

    @classmethod
    def teardown_method(cls):
        """Delete every directory whose path starts with ``rootdir``.

        Raises:
            OSError: re-raised when a directory cannot be removed, after
                printing the registered paths and the functions that
                allocated them.
        """
        # Remove every directory starting with rootdir
        # This is both a clean-up step and an implicit test
        # of proper resource deallocation (see the notes in `path`)
        for dirpath in glob.glob(cls.rootdir + "*"):
            try:
                shutil.rmtree(dirpath)
            except OSError:
                print(
                    "test '{}' error deleting '{}'".format(cls.__name__, dirpath)
                )
                print("registered paths and originating functions:")
                for path, frame in cls.pathmap.items():
                    print("  '{}' <- '{}'".format(path, frame))
                # Bare raise keeps the original traceback intact.
                raise

    def path(self, basename=None, shared=False):
        """Return a path under ``rootdir`` and record which function asked for it.

        Args:
            basename: name to join onto ``rootdir``. When ``None`` a new
                temporary directory is created inside ``rootdir`` instead.
                An empty string is a valid basename.
            shared: when true and running on Windows (``os.name == "nt"``),
                also run ``net share`` for the resulting path.

        Returns:
            The absolute path (for ``basename``) or the new directory path.
        """
        if basename is not None:
            # Note: this must be `is not None` because we need to match empty string
            out = os.path.abspath(os.path.join(self.rootdir, basename))
        else:
            out = tempfile.mkdtemp(dir=self.rootdir)

        if os.name == "nt" and shared:
            subprocess.run(
                f'cmd //c "net share tiledb-shared={out}"', shell=True, check=True
            )

        # We have had issues in both py and libtiledb in the past
        # where files were not released (eg: destructor not called)
        # Often this is invisible on POSIX platforms, but will
        # cause errors on Windows because two processes cannot access
        # the same file at once.
        # In order to debug this issue, we save the caller where
        # this path was allocated so that we can determine what
        # test created an unreleased file
        frame = traceback.extract_stack(limit=2)[-2][2]
        self.pathmap[out] = frame

        return out

    def assertRaises(self, *args):
        """Return a ``pytest.raises`` context manager for the given arguments."""
        return pytest.raises(*args)

    def assertRaisesRegex(self, e, m):
        """Return ``pytest.raises(e, match=m)``."""
        return pytest.raises(e, match=m)

    # NOTE: the helpers below are plain assertion functions. They are not
    # generators, so they must not be wrapped in `contextlib.contextmanager`.

    def assertEqual(self, *args):
        """Assert that exactly two arguments were given and that they are equal."""
        if len(args) != 2:
            raise Exception(
                "assertEqual expects exactly 2 arguments, got {}".format(len(args))
            )
        assert args[0] == args[1]

    def assertTrue(self, a, msg=None):
        """Assert that ``a`` is truthy, optionally with a failure message."""
        if msg:
            assert a, msg
        else:
            assert a

    def assertFalse(self, a):
        """Assert that ``a`` compares equal to ``False``."""
        # Deliberately `== False` rather than `not a`: e.g. `None` must fail.
        assert a == False  # noqa: E712

    def assertIsInstance(self, v, t):
        """Assert that ``v`` is an instance of ``t``."""
        assert isinstance(v, t)

    def assertSetEqual(self, s1, s2):
        """Assert that both arguments are sets and that they are equal."""
        assert all(isinstance(x, set) for x in (s1, s2))
        assert s1 == s2

    def assertIsNone(self, a1):
        """Assert that ``a1`` is ``None``."""
        assert a1 is None

    def assertTupleEqual(self, a1, a2):
        """Assert that ``a1 == a2``."""
        assert a1 == a2

    def assertAlmostEqual(self, a1, a2):
        """Assert approximate equality via ``numpy.testing.assert_almost_equal``."""
        assert_almost_equal(a1, a2)


# fixture wrapper to use with pytest: mark.parametrize does not
# work with DiskTestCase subclasses (unittest.TestCase methods
# cannot take arguments)
@pytest.fixture(scope="class")
def checked_path():
    """Yield a set-up ``DiskTestCase`` and tear it down afterwards."""
    dtc = DiskTestCase()

    dtc.setup_method()

    yield dtc

    dtc.teardown_method()


# exclude whitespace: if we generate unquoted newline then pandas will be confused
_ws_set = set("\n\t\r")


def gen_chr(max, printable=False):
    """Return one random character with code point in ``[1, max)``.

    Newline, tab and carriage return are never returned. When ``printable``
    is true, characters for which ``str.isprintable()`` is false are
    rejected as well.
    """
    while True:
        # TODO we exclude 0x0 here because the key API does not support embedded NULL
        s = chr(random.randrange(1, max))
        if s in _ws_set:
            continue
        if printable and not s.isprintable():
            continue
        return s


def rand_utf8(size=5):
    """Return a random string of ``size`` characters below code point 0xD7FF."""
    return "".join(gen_chr(0xD7FF) for _ in range(size))


def rand_ascii(size=5, printable=False):
    """Return a random string of ``size`` characters with code points in [1, 127)."""
    return "".join(gen_chr(127, printable) for _ in range(size))


def rand_ascii_bytes(size=5, printable=False):
    """Return ``size`` random characters in [1, 127), UTF-8 encoded as ``bytes``."""
    return b"".join(gen_chr(127, printable).encode("utf-8") for _ in range(size))


def dtype_max(dtype):
    """Return the largest value for a floating, integer or datetime64 dtype.

    Raises:
        TypeError: if ``dtype`` is not a numpy scalar type, or is of an
            unsupported kind.
    """
    if not np.issubdtype(dtype, np.generic):
        raise TypeError("expected numpy dtype!")

    if np.issubdtype(dtype, np.floating):
        return np.finfo(dtype).max

    if np.issubdtype(dtype, np.integer):
        return int(np.iinfo(dtype).max)

    if np.issubdtype(dtype, np.datetime64):
        return np.datetime64(datetime.datetime.max)

    # A bare string cannot be raised in Python 3; raise a real exception.
    raise TypeError("Unknown dtype for dtype_max '{}'".format(str(dtype)))


def dtype_min(dtype):
    """Return the smallest value for a floating, integer or datetime64 dtype.

    Raises:
        TypeError: if ``dtype`` is not a numpy scalar type, or is of an
            unsupported kind.
    """
    if not np.issubdtype(dtype, np.generic):
        raise TypeError("expected numpy dtype!")

    if np.issubdtype(dtype, np.floating):
        return np.finfo(dtype).min

    if np.issubdtype(dtype, np.integer):
        return int(np.iinfo(dtype).min)

    if np.issubdtype(dtype, np.datetime64):
        return np.datetime64(datetime.datetime.min)

    # A bare string cannot be raised in Python 3; raise a real exception.
    raise TypeError("Unknown dtype for dtype_min '{}'".format(str(dtype)))


def rand_int_sequential(size, dtype=np.uint64):
    """Return ``size`` random integers of ``dtype``, sorted ascending.

    Values are drawn from ``[dtype_min(dtype), dtype_max(dtype))``.
    """
    arr = np.random.randint(
        dtype_min(dtype), high=dtype_max(dtype), size=size, dtype=dtype
    )
    return np.sort(arr)


def rand_datetime64_array(size, start=None, stop=None, dtype=None):
    """Return a sorted array of ``size`` random datetime64 values.

    Args:
        size: number of values (passed to ``np.random.randint``).
        start: inclusive lower bound; defaults to int64 min + 1 in the
            dtype's unit.
        stop: exclusive upper bound; defaults to int64 max in the dtype's unit.
        dtype: datetime64 dtype of the result; defaults to ``M8[ns]``.
    """
    # `not dtype` must not be used here: a non-structured np.dtype has
    # len() == 0 and is therefore falsy, which would discard the caller's dtype.
    dtype = np.dtype("M8[ns]") if dtype is None else np.dtype(dtype)

    # generate randint inbounds on the range of the dtype
    units = np.datetime_data(dtype)[0]
    intmin, intmax = np.iinfo(np.int64).min, np.iinfo(np.int64).max

    if start is None:
        start = np.datetime64(intmin + 1, units)
    else:
        start = np.datetime64(start)
    if stop is None:
        stop = np.datetime64(intmax, units)
    else:
        stop = np.datetime64(stop)

    arr = np.random.randint(
        start.astype(dtype).astype(np.int64),
        stop.astype(dtype).astype(np.int64),
        size=size,
        dtype=np.int64,
    )
    arr.sort()

    return arr.astype(dtype)


def intspace(start, stop, num=50, dtype=np.int64):
    """
    Return evenly spaced values over range ensuring that stop is
    always the maximum (will not overflow with int dtype as linspace)

    :param start: first value of the sequence
    :param stop: value written to the final element
    :param num: number of elements to produce
    :param dtype: dtype of the returned array
    :return: array of ``num`` values stepping by ``(stop - start) // num``,
        with the last element set to ``stop``
    :raises ValueError: if ``dtype`` is an integer dtype and the step is < 1
    """
    step = (stop - start) // num

    if np.issubdtype(dtype, np.integer) and step < 1:
        raise ValueError(
            "Cannot use non-integral step value '{}' for integer dtype!".format(step)
        )

    rval = np.zeros(num, dtype=dtype)
    nextval = start
    # Accumulate in the caller's scalar type and assign element by element,
    # so the arithmetic is not done on a whole array at once.
    for i in range(num):
        rval[i] = nextval
        nextval += step

    rval[-1] = stop
    return rval


pp = _pprint.PrettyPrinter(indent=4)


def xprint(*x):
    """Pretty-print each argument with the module-level ``PrettyPrinter``."""
    for xp in x:
        pp.pprint(xp)


def assert_unordered_equal(a1, a2, unordered=True):
    """Assert that arrays are equal after sorting if
    `unordered==True`"""
    if unordered:
        a1 = np.sort(a1)
        a2 = np.sort(a2)
    assert_array_equal(a1, a2)


def assert_subarrays_equal(a, b, ordered=True):
    """Assert that ``a`` and ``b`` have equal shape and element-wise equal items.

    When ``ordered`` is false, both arrays are passed through ``np.sort``
    before the comparison.
    """
    assert_equal(a.shape, b.shape)

    if not ordered:
        a = np.sort(a)
        b = np.sort(b)

    for a_el, b_el in zip(a.flat, b.flat):
        assert_array_equal(a_el, b_el)


def assert_all_arrays_equal(*arrays):
    """Assert equality of consecutive argument pairs: (0, 1), (2, 3), ..."""
    # TODO this should display raise in the calling location if possible
    assert len(arrays) % 2 == 0, "Expected even number of arrays"

    for a1, a2 in zip(arrays[0::2], arrays[1::2]):
        assert_array_equal(a1, a2)