1import pytest
2from networkx.utils.mapped_queue import _HeapElement, MappedQueue
3
4
5def test_HeapElement_gtlt():
6    bar = _HeapElement(1.1, "a")
7    foo = _HeapElement(1, "b")
8    assert foo < bar
9    assert bar > foo
10    assert foo < 1.1
11    assert 1 < bar
12
13
14def test_HeapElement_eq():
15    bar = _HeapElement(1.1, "a")
16    foo = _HeapElement(1, "a")
17    assert foo == bar
18    assert bar == foo
19    assert foo == "a"
20
21
22def test_HeapElement_iter():
23    foo = _HeapElement(1, "a")
24    bar = _HeapElement(1.1, (3, 2, 1))
25    assert list(foo) == [1, "a"]
26    assert list(bar) == [1.1, 3, 2, 1]
27
28
29def test_HeapElement_getitem():
30    foo = _HeapElement(1, "a")
31    bar = _HeapElement(1.1, (3, 2, 1))
32    assert foo[1] == "a"
33    assert foo[0] == 1
34    assert bar[0] == 1.1
35    assert bar[2] == 2
36    assert bar[3] == 1
37    pytest.raises(IndexError, bar.__getitem__, 4)
38    pytest.raises(IndexError, foo.__getitem__, 2)
39
40
41class TestMappedQueue:
42    def setup(self):
43        pass
44
45    def _check_map(self, q):
46        assert q.position == {elt: pos for pos, elt in enumerate(q.heap)}
47
48    def _make_mapped_queue(self, h):
49        q = MappedQueue()
50        q.heap = h
51        q.position = {elt: pos for pos, elt in enumerate(h)}
52        return q
53
54    def test_heapify(self):
55        h = [5, 4, 3, 2, 1, 0]
56        q = self._make_mapped_queue(h)
57        q._heapify()
58        self._check_map(q)
59
60    def test_init(self):
61        h = [5, 4, 3, 2, 1, 0]
62        q = MappedQueue(h)
63        self._check_map(q)
64
65    def test_len(self):
66        h = [5, 4, 3, 2, 1, 0]
67        q = MappedQueue(h)
68        self._check_map(q)
69        assert len(q) == 6
70
71    def test_siftup_leaf(self):
72        h = [2]
73        h_sifted = [2]
74        q = self._make_mapped_queue(h)
75        q._siftup(0)
76        assert q.heap == h_sifted
77        self._check_map(q)
78
79    def test_siftup_one_child(self):
80        h = [2, 0]
81        h_sifted = [0, 2]
82        q = self._make_mapped_queue(h)
83        q._siftup(0)
84        assert q.heap == h_sifted
85        self._check_map(q)
86
87    def test_siftup_left_child(self):
88        h = [2, 0, 1]
89        h_sifted = [0, 2, 1]
90        q = self._make_mapped_queue(h)
91        q._siftup(0)
92        assert q.heap == h_sifted
93        self._check_map(q)
94
95    def test_siftup_right_child(self):
96        h = [2, 1, 0]
97        h_sifted = [0, 1, 2]
98        q = self._make_mapped_queue(h)
99        q._siftup(0)
100        assert q.heap == h_sifted
101        self._check_map(q)
102
103    def test_siftup_multiple(self):
104        h = [0, 1, 2, 4, 3, 5, 6]
105        h_sifted = [0, 1, 2, 4, 3, 5, 6]
106        q = self._make_mapped_queue(h)
107        q._siftup(0)
108        assert q.heap == h_sifted
109        self._check_map(q)
110
111    def test_siftdown_leaf(self):
112        h = [2]
113        h_sifted = [2]
114        q = self._make_mapped_queue(h)
115        q._siftdown(0, 0)
116        assert q.heap == h_sifted
117        self._check_map(q)
118
119    def test_siftdown_single(self):
120        h = [1, 0]
121        h_sifted = [0, 1]
122        q = self._make_mapped_queue(h)
123        q._siftdown(0, len(h) - 1)
124        assert q.heap == h_sifted
125        self._check_map(q)
126
127    def test_siftdown_multiple(self):
128        h = [1, 2, 3, 4, 5, 6, 7, 0]
129        h_sifted = [0, 1, 3, 2, 5, 6, 7, 4]
130        q = self._make_mapped_queue(h)
131        q._siftdown(0, len(h) - 1)
132        assert q.heap == h_sifted
133        self._check_map(q)
134
135    def test_push(self):
136        to_push = [6, 1, 4, 3, 2, 5, 0]
137        h_sifted = [0, 2, 1, 6, 3, 5, 4]
138        q = MappedQueue()
139        for elt in to_push:
140            q.push(elt)
141        assert q.heap == h_sifted
142        self._check_map(q)
143
144    def test_push_duplicate(self):
145        to_push = [2, 1, 0]
146        h_sifted = [0, 2, 1]
147        q = MappedQueue()
148        for elt in to_push:
149            inserted = q.push(elt)
150            assert inserted
151        assert q.heap == h_sifted
152        self._check_map(q)
153        inserted = q.push(1)
154        assert not inserted
155
156    def test_pop(self):
157        h = [3, 4, 6, 0, 1, 2, 5]
158        h_sorted = sorted(h)
159        q = self._make_mapped_queue(h)
160        q._heapify()
161        popped = [q.pop() for _ in range(len(h))]
162        assert popped == h_sorted
163        self._check_map(q)
164
165    def test_remove_leaf(self):
166        h = [0, 2, 1, 6, 3, 5, 4]
167        h_removed = [0, 2, 1, 6, 4, 5]
168        q = self._make_mapped_queue(h)
169        removed = q.remove(3)
170        assert q.heap == h_removed
171
172    def test_remove_root(self):
173        h = [0, 2, 1, 6, 3, 5, 4]
174        h_removed = [1, 2, 4, 6, 3, 5]
175        q = self._make_mapped_queue(h)
176        removed = q.remove(0)
177        assert q.heap == h_removed
178
179    def test_update_leaf(self):
180        h = [0, 20, 10, 60, 30, 50, 40]
181        h_updated = [0, 15, 10, 60, 20, 50, 40]
182        q = self._make_mapped_queue(h)
183        removed = q.update(30, 15)
184        assert q.heap == h_updated
185
186    def test_update_root(self):
187        h = [0, 20, 10, 60, 30, 50, 40]
188        h_updated = [10, 20, 35, 60, 30, 50, 40]
189        q = self._make_mapped_queue(h)
190        removed = q.update(0, 35)
191        assert q.heap == h_updated
192
193
194class TestMappedDict(TestMappedQueue):
195    def _make_mapped_queue(self, h):
196        priority_dict = {elt: elt for elt in h}
197        return MappedQueue(priority_dict)
198
199    def test_push(self):
200        to_push = [6, 1, 4, 3, 2, 5, 0]
201        h_sifted = [0, 2, 1, 6, 3, 5, 4]
202        q = MappedQueue()
203        for elt in to_push:
204            q.push(elt, priority=elt)
205        assert q.heap == h_sifted
206        self._check_map(q)
207
208    def test_push_duplicate(self):
209        to_push = [2, 1, 0]
210        h_sifted = [0, 2, 1]
211        q = MappedQueue()
212        for elt in to_push:
213            inserted = q.push(elt, priority=elt)
214            assert inserted
215        assert q.heap == h_sifted
216        self._check_map(q)
217        inserted = q.push(1, priority=1)
218        assert not inserted
219
220    def test_update_leaf(self):
221        h = [0, 20, 10, 60, 30, 50, 40]
222        h_updated = [0, 15, 10, 60, 20, 50, 40]
223        q = self._make_mapped_queue(h)
224        removed = q.update(30, 15, priority=15)
225        assert q.heap == h_updated
226
227    def test_update_root(self):
228        h = [0, 20, 10, 60, 30, 50, 40]
229        h_updated = [10, 20, 35, 60, 30, 50, 40]
230        q = self._make_mapped_queue(h)
231        removed = q.update(0, 35, priority=35)
232        assert q.heap == h_updated
233