1import uuid
2
3from mock import patch, call, Mock
4from nose.tools import eq_, ok_, assert_not_equal, raises
5
6from kazoo.testing import KazooTestCase
7from kazoo.exceptions import KazooException
8from kazoo.recipe.cache import TreeCache, TreeNode, TreeEvent
9
10
11class KazooTreeCacheTests(KazooTestCase):
12
13    def setUp(self):
14        super(KazooTreeCacheTests, self).setUp()
15        self._event_queue = self.client.handler.queue_impl()
16        self._error_queue = self.client.handler.queue_impl()
17        self.path = None
18        self.cache = None
19
20    def tearDown(self):
21        super(KazooTreeCacheTests, self).tearDown()
22        if not self._error_queue.empty():
23            try:
24                raise self._error_queue.get()
25            except FakeException:
26                pass
27
28    def make_cache(self):
29        if self.cache is None:
30            self.path = '/' + uuid.uuid4().hex
31            self.cache = TreeCache(self.client, self.path)
32            self.cache.listen(lambda event: self._event_queue.put(event))
33            self.cache.listen_fault(lambda error: self._error_queue.put(error))
34            self.cache.start()
35        return self.cache
36
37    def wait_cache(self, expect=None, since=None, timeout=10):
38        started = since is None
39        while True:
40            event = self._event_queue.get(timeout=timeout)
41            if started:
42                if expect is not None:
43                    eq_(event.event_type, expect)
44                return event
45            if event.event_type == since:
46                started = True
47                if expect is None:
48                    return
49
50    def spy_client(self, method_name):
51        method = getattr(self.client, method_name)
52        return patch.object(self.client, method_name, wraps=method)
53
54    def test_start(self):
55        self.make_cache()
56        self.wait_cache(since=TreeEvent.INITIALIZED)
57
58        stat = self.client.exists(self.path)
59        eq_(stat.version, 0)
60
61        eq_(self.cache._state, TreeCache.STATE_STARTED)
62        eq_(self.cache._root._state, TreeNode.STATE_LIVE)
63
64    @raises(KazooException)
65    def test_start_started(self):
66        self.make_cache()
67        self.cache.start()
68
69    @raises(KazooException)
70    def test_start_closed(self):
71        self.make_cache()
72        self.cache.start()
73        self.cache.close()
74        self.cache.start()
75
76    def test_close(self):
77        self.make_cache()
78        self.wait_cache(since=TreeEvent.INITIALIZED)
79        self.client.create(self.path + '/foo/bar/baz', makepath=True)
80        for _ in range(3):
81            self.wait_cache(TreeEvent.NODE_ADDED)
82
83        self.cache.close()
84
85        # nothing should be published since tree closed
86        ok_(self._event_queue.empty())
87
88        # tree should be empty
89        eq_(self.cache._root._children, {})
90        eq_(self.cache._root._data, None)
91        eq_(self.cache._state, TreeCache.STATE_CLOSED)
92
93        # node state should not be changed
94        assert_not_equal(self.cache._root._state, TreeNode.STATE_DEAD)
95
96    def test_children_operation(self):
97        self.make_cache()
98        self.wait_cache(since=TreeEvent.INITIALIZED)
99
100        self.client.create(self.path + '/test_children', b'test_children_1')
101        event = self.wait_cache(TreeEvent.NODE_ADDED)
102        eq_(event.event_type, TreeEvent.NODE_ADDED)
103        eq_(event.event_data.path, self.path + '/test_children')
104        eq_(event.event_data.data, b'test_children_1')
105        eq_(event.event_data.stat.version, 0)
106
107        self.client.set(self.path + '/test_children', b'test_children_2')
108        event = self.wait_cache(TreeEvent.NODE_UPDATED)
109        eq_(event.event_type, TreeEvent.NODE_UPDATED)
110        eq_(event.event_data.path, self.path + '/test_children')
111        eq_(event.event_data.data, b'test_children_2')
112        eq_(event.event_data.stat.version, 1)
113
114        self.client.delete(self.path + '/test_children')
115        event = self.wait_cache(TreeEvent.NODE_REMOVED)
116        eq_(event.event_type, TreeEvent.NODE_REMOVED)
117        eq_(event.event_data.path, self.path + '/test_children')
118        eq_(event.event_data.data, b'test_children_2')
119        eq_(event.event_data.stat.version, 1)
120
121    def test_subtree_operation(self):
122        self.make_cache()
123        self.wait_cache(since=TreeEvent.INITIALIZED)
124
125        self.client.create(self.path + '/foo/bar/baz', makepath=True)
126        for relative_path in ('/foo', '/foo/bar', '/foo/bar/baz'):
127            event = self.wait_cache(TreeEvent.NODE_ADDED)
128            eq_(event.event_type, TreeEvent.NODE_ADDED)
129            eq_(event.event_data.path, self.path + relative_path)
130            eq_(event.event_data.data, b'')
131            eq_(event.event_data.stat.version, 0)
132
133        self.client.delete(self.path + '/foo', recursive=True)
134        for relative_path in ('/foo/bar/baz', '/foo/bar', '/foo'):
135            event = self.wait_cache(TreeEvent.NODE_REMOVED)
136            eq_(event.event_type, TreeEvent.NODE_REMOVED)
137            eq_(event.event_data.path, self.path + relative_path)
138
139    def test_get_data(self):
140        cache = self.make_cache()
141        self.wait_cache(since=TreeEvent.INITIALIZED)
142        self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True)
143        self.wait_cache(TreeEvent.NODE_ADDED)
144        self.wait_cache(TreeEvent.NODE_ADDED)
145        self.wait_cache(TreeEvent.NODE_ADDED)
146
147        with patch.object(cache, '_client'):  # disable any remote operation
148            eq_(cache.get_data(self.path).data, b'')
149            eq_(cache.get_data(self.path).stat.version, 0)
150
151            eq_(cache.get_data(self.path + '/foo').data, b'')
152            eq_(cache.get_data(self.path + '/foo').stat.version, 0)
153
154            eq_(cache.get_data(self.path + '/foo/bar').data, b'')
155            eq_(cache.get_data(self.path + '/foo/bar').stat.version, 0)
156
157            eq_(cache.get_data(self.path + '/foo/bar/baz').data, b'@')
158            eq_(cache.get_data(self.path + '/foo/bar/baz').stat.version, 0)
159
160    def test_get_children(self):
161        cache = self.make_cache()
162        self.wait_cache(since=TreeEvent.INITIALIZED)
163        self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True)
164        self.wait_cache(TreeEvent.NODE_ADDED)
165        self.wait_cache(TreeEvent.NODE_ADDED)
166        self.wait_cache(TreeEvent.NODE_ADDED)
167
168        with patch.object(cache, '_client'):  # disable any remote operation
169            eq_(cache.get_children(self.path + '/foo/bar/baz'), frozenset())
170            eq_(cache.get_children(self.path + '/foo/bar'), frozenset(['baz']))
171            eq_(cache.get_children(self.path + '/foo'), frozenset(['bar']))
172            eq_(cache.get_children(self.path), frozenset(['foo']))
173
174    @raises(ValueError)
175    def test_get_data_out_of_tree(self):
176        self.make_cache()
177        self.wait_cache(since=TreeEvent.INITIALIZED)
178        self.cache.get_data('/out_of_tree')
179
180    @raises(ValueError)
181    def test_get_children_out_of_tree(self):
182        self.make_cache()
183        self.wait_cache(since=TreeEvent.INITIALIZED)
184        self.cache.get_children('/out_of_tree')
185
186    def test_get_data_no_node(self):
187        cache = self.make_cache()
188        self.wait_cache(since=TreeEvent.INITIALIZED)
189
190        with patch.object(cache, '_client'):  # disable any remote operation
191            eq_(cache.get_data(self.path + '/non_exists'), None)
192
193    def test_get_children_no_node(self):
194        cache = self.make_cache()
195        self.wait_cache(since=TreeEvent.INITIALIZED)
196
197        with patch.object(cache, '_client'):  # disable any remote operation
198            eq_(cache.get_children(self.path + '/non_exists'), None)
199
200    def test_session_reconnected(self):
201        self.make_cache()
202        self.wait_cache(since=TreeEvent.INITIALIZED)
203
204        self.client.create(self.path + '/foo')
205        event = self.wait_cache(TreeEvent.NODE_ADDED)
206        eq_(event.event_data.path, self.path + '/foo')
207
208        with self.spy_client('get_async') as get_data:
209            with self.spy_client('get_children_async') as get_children:
210                # session suspended
211                self.lose_connection(self.client.handler.event_object)
212                self.wait_cache(TreeEvent.CONNECTION_SUSPENDED)
213
214                # There are a serial refreshing operation here. But NODE_ADDED
215                # events will not be raised because the zxid of nodes are the
216                # same during reconnecting.
217
218                # connection restore
219                self.wait_cache(TreeEvent.CONNECTION_RECONNECTED)
220
221                # wait for outstanding operations
222                while self.cache._outstanding_ops > 0:
223                    self.client.handler.sleep_func(0.1)
224
225                # inspect in-memory nodes
226                _node_root = self.cache._root
227                _node_foo = self.cache._root._children['foo']
228
229                # make sure that all nodes are refreshed
230                get_data.assert_has_calls([
231                    call(self.path, watch=_node_root._process_watch),
232                    call(self.path + '/foo', watch=_node_foo._process_watch),
233                ], any_order=True)
234                get_children.assert_has_calls([
235                    call(self.path, watch=_node_root._process_watch),
236                    call(self.path + '/foo', watch=_node_foo._process_watch),
237                ], any_order=True)
238
239    def test_root_recreated(self):
240        self.make_cache()
241        self.wait_cache(since=TreeEvent.INITIALIZED)
242
243        # remove root node
244        self.client.delete(self.path)
245        event = self.wait_cache(TreeEvent.NODE_REMOVED)
246        eq_(event.event_type, TreeEvent.NODE_REMOVED)
247        eq_(event.event_data.data, b'')
248        eq_(event.event_data.path, self.path)
249        eq_(event.event_data.stat.version, 0)
250
251        # re-create root node
252        self.client.ensure_path(self.path)
253        event = self.wait_cache(TreeEvent.NODE_ADDED)
254        eq_(event.event_type, TreeEvent.NODE_ADDED)
255        eq_(event.event_data.data, b'')
256        eq_(event.event_data.path, self.path)
257        eq_(event.event_data.stat.version, 0)
258
259        self.assertTrue(
260            self.cache._outstanding_ops >= 0,
261            'unexpected outstanding ops %r' % self.cache._outstanding_ops)
262
263    def test_exception_handler(self):
264        error_value = FakeException()
265        error_handler = Mock()
266
267        with patch.object(TreeNode, 'on_deleted') as on_deleted:
268            on_deleted.side_effect = [error_value]
269
270            self.make_cache()
271            self.cache.listen_fault(error_handler)
272
273            self.cache.close()
274            error_handler.assert_called_once_with(error_value)
275
276    def test_exception_suppressed(self):
277        self.make_cache()
278        self.wait_cache(since=TreeEvent.INITIALIZED)
279
280        # stoke up ConnectionClosedError
281        self.client.stop()
282        self.client.close()
283        self.client.handler.start()  # keep the async completion
284        self.wait_cache(since=TreeEvent.CONNECTION_LOST)
285
286        with patch.object(TreeNode, 'on_created') as on_created:
287            self.cache._root._call_client('exists', '/')
288            self.cache._root._call_client('get', '/')
289            self.cache._root._call_client('get_children', '/')
290
291            self.wait_cache(since=TreeEvent.INITIALIZED)
292            on_created.assert_not_called()
293            eq_(self.cache._outstanding_ops, 0)
294
295
296class FakeException(Exception):
297    pass
298