1#%%
2
3import numpy as np
4import tiledb
5
6from tiledb.tests.common import *
7
8
class DomainIndexingSparseTest(DiskTestCase):
    """Exercise ``domain_index`` point and slice lookups on sparse arrays."""

    def test_int_domain_indexing(self):
        """Round-trip a 1-D int64 sparse array through ``domain_index``."""
        uri = self.path("int_domain_indexing")

        x_dim = tiledb.Dim(name="x", domain=(-10, 10), tile=1, dtype=np.int64)
        domain = tiledb.Domain(x_dim)
        attr = tiledb.Attr(name="a", dtype=np.float64)
        schema = tiledb.ArraySchema(domain=domain, sparse=True, attrs=[attr])
        tiledb.SparseArray.create(uri, schema)

        # one random value for every integer coordinate in [-10, 10]
        coords = np.arange(-10, 11, step=1)
        values = np.random.rand(len(coords))

        with tiledb.SparseArray(uri, mode="w") as arr:
            arr[coords] = values

        first, last = coords[0], coords[-1]
        with tiledb.SparseArray(uri) as arr:
            # single-coordinate lookups at both ends of the written range
            assert_array_equal(arr.domain_index[first]["a"], values[0])
            assert_array_equal(arr.domain_index[last]["a"], values[-1])
            # a first:last slice is expected to return every written cell
            assert_array_equal(arr.domain_index[first:last]["a"], values[:])
            # sanity check: the returned coordinates match what was written
            assert_array_equal(arr.domain_index[first:last]["x"], coords[:])

    def test_fp_domain_indexing(self):
        """Query a 3-D float64 sparse array by point and by range."""
        uri = self.path("test_domain_idx")

        # test case from https://github.com/TileDB-Inc/TileDB-Py/issues/201
        extent = 1
        dim_bounds = (
            ("x", (-89.75, 89.75)),
            ("y", (-179.75, 179.75)),
            ("z", (157498, 157863)),
        )
        dims = [
            tiledb.Dim(name=label, domain=bounds, tile=extent, dtype=np.float64)
            for label, bounds in dim_bounds
        ]
        domain = tiledb.Domain(*dims)
        data_attr = tiledb.Attr(name="data", dtype=np.float64)
        schema = tiledb.ArraySchema(domain=domain, sparse=True, attrs=[data_attr])
        tiledb.SparseArray.create(uri, schema)

        # fake data: one (x, y, z) coordinate triple and one value per cell
        n_cells = 359
        xs = np.linspace(-89.75, 89.75, n_cells)
        ys = np.linspace(-179.75, 179.75, n_cells)
        zs = np.linspace(157498, 157857, n_cells)
        cells = np.random.rand(xs.shape[0])

        with tiledb.SparseArray(uri, mode="w") as arr:
            arr[xs, ys, zs] = cells

        with tiledb.SparseArray(uri) as arr:
            # direct lookup of the first cell
            hit = arr.domain_index[xs[0], ys[0], zs[0]]
            assert_array_equal(hit["data"], cells[0])

            # tiny ranges starting at the first cell and ending one
            # representable float further along; only that cell is expected
            x_range = slice(xs[0], np.nextafter(xs[0], 0))
            y_range = slice(ys[0], np.nextafter(ys[0], 0))
            z_range = slice(zs[0], np.nextafter(zs[0], zs[0] + 1))
            hit = arr.domain_index[x_range, y_range, z_range]
            assert_array_equal(hit["data"], cells[0])

            # direct lookup of the last cell
            hit = arr.domain_index[xs[-1], ys[-1], zs[-1]]
            assert_array_equal(hit["data"], cells[-1])

            # a range spanning two neighbouring cells in every dimension
            hit = arr.domain_index[xs[1] : xs[2], ys[1] : ys[2], zs[1] : zs[2]]
            assert_array_equal(hit["data"], cells[1:3])

            # an interior point; also verify the coordinate that comes back
            mid = 145
            hit = arr.domain_index[xs[mid], ys[mid], zs[mid]]
            assert_array_equal(hit["x"], xs[mid])
            assert_array_equal(hit["data"], cells[mid])

            # ranges from the first to the last written coordinate
            hit = arr.domain_index[xs[0] : xs[-1], ys[0] : ys[-1], zs[0] : zs[-1]]
            assert_array_equal(hit["data"], cells[:])

            # TODO uncomment if vectorized indexing is available
            # (whole-domain lookup by coordinate arrays):
            # hit = arr.domain_index[xs, ys, zs]
            # assert_array_equal(hit["data"], cells[:])

    def test_fp_domain_count(self):
        """Count the cells returned for a 2-D float64 array holding one cell."""
        uri = self.path("test_domain_count")
        extent = 1

        x_dim = tiledb.Dim(name="x", domain=(0.0, 2.0), tile=extent, dtype=np.float64)
        y_dim = tiledb.Dim(name="y", domain=(0.0, 2.0), tile=extent, dtype=np.float64)
        domain = tiledb.Domain(x_dim, y_dim)
        data_attr = tiledb.Attr(name="data", dtype=np.float64)
        schema = tiledb.ArraySchema(domain=domain, sparse=True, attrs=[data_attr])
        tiledb.SparseArray.create(uri, schema)

        # fake data: a single cell at (1.0, 1.0)
        xs = [1.0]
        ys = [1.0]
        cells = [1.0]

        with tiledb.SparseArray(uri, mode="w") as arr:
            arr[xs, ys] = cells

        dim_names = ("x", "y")
        with tiledb.SparseArray(uri) as arr:
            # direct lookup of the only cell
            assert_array_equal(arr.domain_index[xs[0], ys[0]]["data"], cells[0])

            # a y-range ending exactly at 1.0 is expected to contain the cell
            for dim_name in dim_names:
                found = arr.domain_index[0:2.0, 0:1.0][dim_name]
                assert_equal(found.shape[0], 1)

            # a y-coordinate one float above 1.0 is expected to match nothing
            just_above = np.nextafter(1.0, 2.0)
            for dim_name in dim_names:
                found = arr.domain_index[0:2.0, just_above][dim_name]
                assert_equal(found.shape[0], 0)
134
class DomainIndexingDenseTest(DiskTestCase):
    """Exercise ``domain_index`` point and slice lookups on dense arrays."""

    def test_int_domain_indexing(self):
        """Round-trip a 1-D int64 dense array through ``domain_index``."""
        uri = self.path("dense_int_domain_indexing")

        x_dim = tiledb.Dim(name="x", domain=(0, 10), tile=1, dtype=np.int64)
        domain = tiledb.Domain(x_dim)
        attr = tiledb.Attr(name="a", dtype=np.float64)
        schema = tiledb.ArraySchema(domain=domain, sparse=False, attrs=[attr])
        tiledb.DenseArray.create(uri, schema)

        # one random value for every integer coordinate in [0, 10]
        coords = np.arange(0, 11, step=1)
        values = np.random.rand(len(coords))

        with tiledb.DenseArray(uri, mode="w") as arr:
            arr[:] = values

        first, last = coords[0], coords[-1]
        with tiledb.DenseArray(uri) as arr:
            # single-coordinate lookups at both ends of the domain
            assert_array_equal(arr.domain_index[first]["a"], values[0])
            assert_array_equal(arr.domain_index[last]["a"], values[-1])
            # a first:last slice is expected to return every written cell
            assert_array_equal(arr.domain_index[first:last]["a"], values[:])
            # sanity check: the returned coordinates match the domain
            assert_array_equal(arr.domain_index[first:last]["x"], coords[:])
160