import uuid

from mock import patch, call, Mock
from nose.tools import eq_, ok_, assert_not_equal, raises

from kazoo.testing import KazooTestCase
from kazoo.exceptions import KazooException
from kazoo.recipe.cache import TreeCache, TreeNode, TreeEvent


class KazooTreeCacheTests(KazooTestCase):
    """Tests for the ``TreeCache`` recipe.

    Events and errors published by the cache under test are captured in two
    queues (see ``make_cache``) so that tests can consume them in order via
    ``wait_cache``.
    """

    def setUp(self):
        """Create the event/error capture queues and reset per-test state."""
        super(KazooTreeCacheTests, self).setUp()
        self._event_queue = self.client.handler.queue_impl()
        self._error_queue = self.client.handler.queue_impl()
        self.path = None
        self.cache = None

    def tearDown(self):
        """Re-raise an error reported by the cache, tolerating FakeException.

        Only the first queued error is inspected. ``FakeException`` is the
        error injected on purpose by ``test_exception_handler``, so it is
        swallowed here; anything else propagates and fails the test.
        """
        super(KazooTreeCacheTests, self).tearDown()
        if not self._error_queue.empty():
            try:
                raise self._error_queue.get()
            except FakeException:
                pass

    def make_cache(self):
        """Return the cache under test, creating and starting it on first use.

        The cache is rooted at a random ``/<uuid hex>`` path. Its events are
        forwarded to ``self._event_queue`` and its faults to
        ``self._error_queue``. Note that the returned cache has already been
        started.
        """
        if self.cache is None:
            self.path = '/' + uuid.uuid4().hex
            self.cache = TreeCache(self.client, self.path)
            self.cache.listen(lambda event: self._event_queue.put(event))
            self.cache.listen_fault(lambda error: self._error_queue.put(error))
            self.cache.start()
        return self.cache

    def wait_cache(self, expect=None, since=None, timeout=10):
        """Consume captured cache events until the requested one arrives.

        :param expect: If given, the event type that the returned event must
                       have (checked with ``eq_``).
        :param since: If given, events are discarded until one of this type
                      has been seen. With ``expect`` unset, that event itself
                      is returned; otherwise the event following it is
                      checked against ``expect`` and returned.
        :param timeout: Passed to each ``get`` on the event queue.
        :returns: The matching event.
        """
        started = since is None
        while True:
            event = self._event_queue.get(timeout=timeout)
            if started:
                if expect is not None:
                    eq_(event.event_type, expect)
                return event
            if event.event_type == since:
                started = True
                if expect is None:
                    # Return the marker event itself so that every exit path
                    # of this helper yields an event (this path used to
                    # return None).
                    return event

    def spy_client(self, method_name):
        """Return a ``patch.object`` that wraps a client method.

        The wrapped method still runs; the patch only records its calls.
        """
        method = getattr(self.client, method_name)
        return patch.object(self.client, method_name, wraps=method)

    def test_start(self):
        self.make_cache()
        self.wait_cache(since=TreeEvent.INITIALIZED)

        stat = self.client.exists(self.path)
        eq_(stat.version, 0)

        eq_(self.cache._state, TreeCache.STATE_STARTED)
        eq_(self.cache._root._state, TreeNode.STATE_LIVE)

    @raises(KazooException)
    def test_start_started(self):
        # make_cache() has already started the cache, so this second start()
        # is the call expected to raise.
        self.make_cache()
        self.cache.start()

    @raises(KazooException)
    def test_start_closed(self):
        # make_cache() has already started the cache. Do not call start()
        # again before close(): that would raise right away and the
        # restart-after-close path would never be exercised.
        self.make_cache()
        self.cache.close()
        self.cache.start()

    def test_close(self):
        self.make_cache()
        self.wait_cache(since=TreeEvent.INITIALIZED)
        self.client.create(self.path + '/foo/bar/baz', makepath=True)
        for _ in range(3):
            self.wait_cache(TreeEvent.NODE_ADDED)

        self.cache.close()

        # nothing should be published since tree closed
        ok_(self._event_queue.empty())

        # tree should be empty
        eq_(self.cache._root._children, {})
        eq_(self.cache._root._data, None)
        eq_(self.cache._state, TreeCache.STATE_CLOSED)

        # node state should not be changed
        assert_not_equal(self.cache._root._state, TreeNode.STATE_DEAD)

    def test_children_operation(self):
        self.make_cache()
        self.wait_cache(since=TreeEvent.INITIALIZED)

        self.client.create(self.path + '/test_children', b'test_children_1')
        event = self.wait_cache(TreeEvent.NODE_ADDED)
        eq_(event.event_type, TreeEvent.NODE_ADDED)
        eq_(event.event_data.path, self.path + '/test_children')
        eq_(event.event_data.data, b'test_children_1')
        eq_(event.event_data.stat.version, 0)

        self.client.set(self.path + '/test_children', b'test_children_2')
        event = self.wait_cache(TreeEvent.NODE_UPDATED)
        eq_(event.event_type, TreeEvent.NODE_UPDATED)
        eq_(event.event_data.path, self.path + '/test_children')
        eq_(event.event_data.data, b'test_children_2')
        eq_(event.event_data.stat.version, 1)

        self.client.delete(self.path + '/test_children')
        event = self.wait_cache(TreeEvent.NODE_REMOVED)
        eq_(event.event_type, TreeEvent.NODE_REMOVED)
        eq_(event.event_data.path, self.path + '/test_children')
        eq_(event.event_data.data, b'test_children_2')
        eq_(event.event_data.stat.version, 1)

    def test_subtree_operation(self):
        self.make_cache()
        self.wait_cache(since=TreeEvent.INITIALIZED)

        # additions are expected from the top of the subtree downwards
        self.client.create(self.path + '/foo/bar/baz', makepath=True)
        for relative_path in ('/foo', '/foo/bar', '/foo/bar/baz'):
            event = self.wait_cache(TreeEvent.NODE_ADDED)
            eq_(event.event_type, TreeEvent.NODE_ADDED)
            eq_(event.event_data.path, self.path + relative_path)
            eq_(event.event_data.data, b'')
            eq_(event.event_data.stat.version, 0)

        # removals are expected from the leaf upwards
        self.client.delete(self.path + '/foo', recursive=True)
        for relative_path in ('/foo/bar/baz', '/foo/bar', '/foo'):
            event = self.wait_cache(TreeEvent.NODE_REMOVED)
            eq_(event.event_type, TreeEvent.NODE_REMOVED)
            eq_(event.event_data.path, self.path + relative_path)

    def test_get_data(self):
        cache = self.make_cache()
        self.wait_cache(since=TreeEvent.INITIALIZED)
        self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True)
        self.wait_cache(TreeEvent.NODE_ADDED)
        self.wait_cache(TreeEvent.NODE_ADDED)
        self.wait_cache(TreeEvent.NODE_ADDED)

        with patch.object(cache, '_client'):  # disable any remote operation
            eq_(cache.get_data(self.path).data, b'')
            eq_(cache.get_data(self.path).stat.version, 0)

            eq_(cache.get_data(self.path + '/foo').data, b'')
            eq_(cache.get_data(self.path + '/foo').stat.version, 0)

            eq_(cache.get_data(self.path + '/foo/bar').data, b'')
            eq_(cache.get_data(self.path + '/foo/bar').stat.version, 0)

            eq_(cache.get_data(self.path + '/foo/bar/baz').data, b'@')
            eq_(cache.get_data(self.path + '/foo/bar/baz').stat.version, 0)

    def test_get_children(self):
        cache = self.make_cache()
        self.wait_cache(since=TreeEvent.INITIALIZED)
        self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True)
        self.wait_cache(TreeEvent.NODE_ADDED)
        self.wait_cache(TreeEvent.NODE_ADDED)
        self.wait_cache(TreeEvent.NODE_ADDED)

        with patch.object(cache, '_client'):  # disable any remote operation
            eq_(cache.get_children(self.path + '/foo/bar/baz'), frozenset())
            eq_(cache.get_children(self.path + '/foo/bar'), frozenset(['baz']))
            eq_(cache.get_children(self.path + '/foo'), frozenset(['bar']))
            eq_(cache.get_children(self.path), frozenset(['foo']))

    @raises(ValueError)
    def test_get_data_out_of_tree(self):
        self.make_cache()
        self.wait_cache(since=TreeEvent.INITIALIZED)
        self.cache.get_data('/out_of_tree')

    @raises(ValueError)
    def test_get_children_out_of_tree(self):
        self.make_cache()
        self.wait_cache(since=TreeEvent.INITIALIZED)
        self.cache.get_children('/out_of_tree')

    def test_get_data_no_node(self):
        cache = self.make_cache()
        self.wait_cache(since=TreeEvent.INITIALIZED)

        with patch.object(cache, '_client'):  # disable any remote operation
            eq_(cache.get_data(self.path + '/non_exists'), None)

    def test_get_children_no_node(self):
        cache = self.make_cache()
        self.wait_cache(since=TreeEvent.INITIALIZED)

        with patch.object(cache, '_client'):  # disable any remote operation
            eq_(cache.get_children(self.path + '/non_exists'), None)

    def test_session_reconnected(self):
        self.make_cache()
        self.wait_cache(since=TreeEvent.INITIALIZED)

        self.client.create(self.path + '/foo')
        event = self.wait_cache(TreeEvent.NODE_ADDED)
        eq_(event.event_data.path, self.path + '/foo')

        with self.spy_client('get_async') as get_data:
            with self.spy_client('get_children_async') as get_children:
                # session suspended
                self.lose_connection(self.client.handler.event_object)
                self.wait_cache(TreeEvent.CONNECTION_SUSPENDED)

                # A series of refresh operations happens here, but no
                # NODE_ADDED events will be raised because the zxids of the
                # nodes stay the same across the reconnect.

                # connection restore
                self.wait_cache(TreeEvent.CONNECTION_RECONNECTED)

                # wait for outstanding operations
                while self.cache._outstanding_ops > 0:
                    self.client.handler.sleep_func(0.1)

                # inspect in-memory nodes
                _node_root = self.cache._root
                _node_foo = self.cache._root._children['foo']

                # make sure that all nodes are refreshed
                get_data.assert_has_calls([
                    call(self.path, watch=_node_root._process_watch),
                    call(self.path + '/foo', watch=_node_foo._process_watch),
                ], any_order=True)
                get_children.assert_has_calls([
                    call(self.path, watch=_node_root._process_watch),
                    call(self.path + '/foo', watch=_node_foo._process_watch),
                ], any_order=True)

    def test_root_recreated(self):
        self.make_cache()
        self.wait_cache(since=TreeEvent.INITIALIZED)

        # remove root node
        self.client.delete(self.path)
        event = self.wait_cache(TreeEvent.NODE_REMOVED)
        eq_(event.event_type, TreeEvent.NODE_REMOVED)
        eq_(event.event_data.data, b'')
        eq_(event.event_data.path, self.path)
        eq_(event.event_data.stat.version, 0)

        # re-create root node
        self.client.ensure_path(self.path)
        event = self.wait_cache(TreeEvent.NODE_ADDED)
        eq_(event.event_type, TreeEvent.NODE_ADDED)
        eq_(event.event_data.data, b'')
        eq_(event.event_data.path, self.path)
        eq_(event.event_data.stat.version, 0)

        self.assertTrue(
            self.cache._outstanding_ops >= 0,
            'unexpected outstanding ops %r' % self.cache._outstanding_ops)

    def test_exception_handler(self):
        error_value = FakeException()
        error_handler = Mock()

        with patch.object(TreeNode, 'on_deleted') as on_deleted:
            # make the patched TreeNode.on_deleted raise when it is called
            on_deleted.side_effect = [error_value]

            self.make_cache()
            self.cache.listen_fault(error_handler)

            self.cache.close()
            error_handler.assert_called_once_with(error_value)

    def test_exception_suppressed(self):
        self.make_cache()
        self.wait_cache(since=TreeEvent.INITIALIZED)

        # provoke ConnectionClosedError
        self.client.stop()
        self.client.close()
        self.client.handler.start()  # keep the async completion
        self.wait_cache(since=TreeEvent.CONNECTION_LOST)

        with patch.object(TreeNode, 'on_created') as on_created:
            self.cache._root._call_client('exists', '/')
            self.cache._root._call_client('get', '/')
            self.cache._root._call_client('get_children', '/')

            self.wait_cache(since=TreeEvent.INITIALIZED)
            on_created.assert_not_called()
            eq_(self.cache._outstanding_ops, 0)


class FakeException(Exception):
    """Marker error injected by tests; ignored by ``tearDown``."""
    pass