1import time 2import threading 3import uuid 4 5from nose.tools import eq_ 6from nose.tools import raises 7 8from kazoo.exceptions import KazooException 9from kazoo.protocol.states import EventType 10from kazoo.testing import KazooTestCase 11 12 13class KazooDataWatcherTests(KazooTestCase): 14 def setUp(self): 15 super(KazooDataWatcherTests, self).setUp() 16 self.path = "/" + uuid.uuid4().hex 17 self.client.ensure_path(self.path) 18 19 def test_data_watcher(self): 20 update = threading.Event() 21 data = [True] 22 23 # Make it a non-existent path 24 self.path += 'f' 25 26 @self.client.DataWatch(self.path) 27 def changed(d, stat): 28 data.pop() 29 data.append(d) 30 update.set() 31 32 update.wait(10) 33 eq_(data, [None]) 34 update.clear() 35 36 self.client.create(self.path, b'fred') 37 update.wait(10) 38 eq_(data[0], b'fred') 39 update.clear() 40 41 def test_data_watcher_once(self): 42 update = threading.Event() 43 data = [True] 44 45 # Make it a non-existent path 46 self.path += 'f' 47 48 dwatcher = self.client.DataWatch(self.path) 49 50 @dwatcher 51 def changed(d, stat): 52 data.pop() 53 data.append(d) 54 update.set() 55 56 update.wait(10) 57 eq_(data, [None]) 58 update.clear() 59 60 @raises(KazooException) 61 def test_it(): 62 @dwatcher 63 def func(d, stat): 64 data.pop() 65 test_it() 66 67 def test_data_watcher_with_event(self): 68 # Test that the data watcher gets passed the event, if it 69 # accepts three arguments 70 update = threading.Event() 71 data = [True] 72 73 # Make it a non-existent path 74 self.path += 'f' 75 76 @self.client.DataWatch(self.path) 77 def changed(d, stat, event): 78 data.pop() 79 data.append(event) 80 update.set() 81 82 update.wait(10) 83 eq_(data, [None]) 84 update.clear() 85 86 self.client.create(self.path, b'fred') 87 update.wait(10) 88 eq_(data[0].type, EventType.CREATED) 89 update.clear() 90 91 def test_func_style_data_watch(self): 92 update = threading.Event() 93 data = [True] 94 95 # Make it a non-existent path 96 path = self.path + 'f' 97 98 def changed(d, stat): 99 data.pop() 100 data.append(d) 101 update.set() 102 self.client.DataWatch(path, changed) 103 104 update.wait(10) 105 eq_(data, [None]) 106 update.clear() 107 108 self.client.create(path, b'fred') 109 update.wait(10) 110 eq_(data[0], b'fred') 111 update.clear() 112 113 def test_datawatch_across_session_expire(self): 114 update = threading.Event() 115 data = [True] 116 117 @self.client.DataWatch(self.path) 118 def changed(d, stat): 119 data.pop() 120 data.append(d) 121 update.set() 122 123 update.wait(10) 124 eq_(data, [b""]) 125 update.clear() 126 127 self.expire_session(threading.Event) 128 self.client.retry(self.client.set, self.path, b'fred') 129 update.wait(25) 130 eq_(data[0], b'fred') 131 132 def test_func_stops(self): 133 update = threading.Event() 134 data = [True] 135 136 self.path += "f" 137 138 fail_through = [] 139 140 @self.client.DataWatch(self.path) 141 def changed(d, stat): 142 data.pop() 143 data.append(d) 144 update.set() 145 if fail_through: 146 return False 147 148 update.wait(10) 149 eq_(data, [None]) 150 update.clear() 151 152 fail_through.append(True) 153 self.client.create(self.path, b'fred') 154 update.wait(10) 155 eq_(data[0], b'fred') 156 update.clear() 157 158 self.client.set(self.path, b'asdfasdf') 159 update.wait(0.2) 160 eq_(data[0], b'fred') 161 162 d, stat = self.client.get(self.path) 163 eq_(d, b'asdfasdf') 164 165 def test_no_such_node(self): 166 args = [] 167 168 @self.client.DataWatch("/some/path") 169 def changed(d, stat): 170 args.extend([d, stat]) 171 172 eq_(args, [None, None]) 173 174 def test_no_such_node_for_children_watch(self): 175 args = [] 176 path = self.path + '/test_no_such_node_for_children_watch' 177 update = threading.Event() 178 179 def changed(children): 180 args.append(children) 181 update.set() 182 183 # watch a node which does not exist 184 children_watch = self.client.ChildrenWatch(path, changed) 185 eq_(update.is_set(), False) 186 eq_(children_watch._stopped, True) 187 eq_(args, []) 188 189 # watch a node which exists 190 self.client.create(path, b'') 191 children_watch = self.client.ChildrenWatch(path, changed) 192 update.wait(3) 193 eq_(args, [[]]) 194 update.clear() 195 196 # watch changes 197 self.client.create(path + '/fred', b'') 198 update.wait(3) 199 eq_(args, [[], ['fred']]) 200 update.clear() 201 202 # delete children 203 self.client.delete(path + '/fred') 204 update.wait(3) 205 eq_(args, [[], ['fred'], []]) 206 update.clear() 207 208 # delete watching 209 self.client.delete(path) 210 211 # a hack for waiting the watcher stop 212 for retry in range(5): 213 if children_watch._stopped: 214 break 215 children_watch._run_lock.acquire() 216 children_watch._run_lock.release() 217 time.sleep(retry / 10.0) 218 219 eq_(update.is_set(), False) 220 eq_(children_watch._stopped, True) 221 222 def test_bad_watch_func2(self): 223 counter = 0 224 225 @self.client.DataWatch(self.path) 226 def changed(d, stat): 227 if counter > 0: 228 raise Exception("oops") 229 230 raises(Exception)(changed) 231 232 counter += 1 233 self.client.set(self.path, b'asdfasdf') 234 235 def test_watcher_evaluating_to_false(self): 236 class WeirdWatcher(list): 237 def __call__(self, *args): 238 self.called = True 239 watcher = WeirdWatcher() 240 self.client.DataWatch(self.path, watcher) 241 self.client.set(self.path, b'mwahaha') 242 self.assertTrue(watcher.called) 243 244 def test_watcher_repeat_delete(self): 245 a = [] 246 ev = threading.Event() 247 248 self.client.delete(self.path) 249 250 @self.client.DataWatch(self.path) 251 def changed(val, stat): 252 a.append(val) 253 ev.set() 254 255 eq_(a, [None]) 256 ev.wait(10) 257 ev.clear() 258 self.client.create(self.path, b'blah') 259 ev.wait(10) 260 eq_(ev.is_set(), True) 261 ev.clear() 262 eq_(a, [None, b'blah']) 263 self.client.delete(self.path) 264 ev.wait(10) 265 eq_(ev.is_set(), True) 266 ev.clear() 267 eq_(a, [None, b'blah', None]) 268 self.client.create(self.path, b'blah') 269 ev.wait(10) 270 eq_(ev.is_set(), True) 271 ev.clear() 272 eq_(a, [None, b'blah', None, b'blah']) 273 274 def test_watcher_with_closing(self): 275 a = [] 276 ev = threading.Event() 277 278 self.client.delete(self.path) 279 280 @self.client.DataWatch(self.path) 281 def changed(val, stat): 282 a.append(val) 283 ev.set() 284 eq_(a, [None]) 285 286 b = False 287 try: 288 self.client.stop() 289 except: 290 b = True 291 eq_(b, False) 292 293 294class KazooChildrenWatcherTests(KazooTestCase): 295 def setUp(self): 296 super(KazooChildrenWatcherTests, self).setUp() 297 self.path = "/" + uuid.uuid4().hex 298 self.client.ensure_path(self.path) 299 300 def test_child_watcher(self): 301 update = threading.Event() 302 all_children = ['fred'] 303 304 @self.client.ChildrenWatch(self.path) 305 def changed(children): 306 while all_children: 307 all_children.pop() 308 all_children.extend(children) 309 update.set() 310 311 update.wait(10) 312 eq_(all_children, []) 313 update.clear() 314 315 self.client.create(self.path + '/' + 'smith') 316 update.wait(10) 317 eq_(all_children, ['smith']) 318 update.clear() 319 320 self.client.create(self.path + '/' + 'george') 321 update.wait(10) 322 eq_(sorted(all_children), ['george', 'smith']) 323 324 def test_child_watcher_once(self): 325 update = threading.Event() 326 all_children = ['fred'] 327 328 cwatch = self.client.ChildrenWatch(self.path) 329 330 @cwatch 331 def changed(children): 332 while all_children: 333 all_children.pop() 334 all_children.extend(children) 335 update.set() 336 337 update.wait(10) 338 eq_(all_children, []) 339 update.clear() 340 341 @raises(KazooException) 342 def test_it(): 343 @cwatch 344 def changed_again(children): 345 update.set() 346 test_it() 347 348 def test_child_watcher_with_event(self): 349 update = threading.Event() 350 events = [True] 351 352 @self.client.ChildrenWatch(self.path, send_event=True) 353 def changed(children, event): 354 events.pop() 355 events.append(event) 356 update.set() 357 358 update.wait(10) 359 eq_(events, [None]) 360 update.clear() 361 362 self.client.create(self.path + '/' + 'smith') 363 update.wait(10) 364 eq_(events[0].type, EventType.CHILD) 365 update.clear() 366 367 def test_func_style_child_watcher(self): 368 update = threading.Event() 369 all_children = ['fred'] 370 371 def changed(children): 372 while all_children: 373 all_children.pop() 374 all_children.extend(children) 375 update.set() 376 377 self.client.ChildrenWatch(self.path, changed) 378 379 update.wait(10) 380 eq_(all_children, []) 381 update.clear() 382 383 self.client.create(self.path + '/' + 'smith') 384 update.wait(10) 385 eq_(all_children, ['smith']) 386 update.clear() 387 388 self.client.create(self.path + '/' + 'george') 389 update.wait(10) 390 eq_(sorted(all_children), ['george', 'smith']) 391 392 def test_func_stops(self): 393 update = threading.Event() 394 all_children = ['fred'] 395 396 fail_through = [] 397 398 @self.client.ChildrenWatch(self.path) 399 def changed(children): 400 while all_children: 401 all_children.pop() 402 all_children.extend(children) 403 update.set() 404 if fail_through: 405 return False 406 407 update.wait(10) 408 eq_(all_children, []) 409 update.clear() 410 411 fail_through.append(True) 412 self.client.create(self.path + '/' + 'smith') 413 update.wait(10) 414 eq_(all_children, ['smith']) 415 update.clear() 416 417 self.client.create(self.path + '/' + 'george') 418 update.wait(0.5) 419 eq_(all_children, ['smith']) 420 421 def test_child_watch_session_loss(self): 422 update = threading.Event() 423 all_children = ['fred'] 424 425 @self.client.ChildrenWatch(self.path) 426 def changed(children): 427 while all_children: 428 all_children.pop() 429 all_children.extend(children) 430 update.set() 431 432 update.wait(10) 433 eq_(all_children, []) 434 update.clear() 435 436 self.client.create(self.path + '/' + 'smith') 437 update.wait(10) 438 eq_(all_children, ['smith']) 439 update.clear() 440 self.expire_session(threading.Event) 441 442 self.client.retry(self.client.create, 443 self.path + '/' + 'george') 444 update.wait(20) 445 eq_(sorted(all_children), ['george', 'smith']) 446 447 def test_child_stop_on_session_loss(self): 448 update = threading.Event() 449 all_children = ['fred'] 450 451 @self.client.ChildrenWatch(self.path, allow_session_lost=False) 452 def changed(children): 453 while all_children: 454 all_children.pop() 455 all_children.extend(children) 456 update.set() 457 458 update.wait(10) 459 eq_(all_children, []) 460 update.clear() 461 462 self.client.create(self.path + '/' + 'smith') 463 update.wait(10) 464 eq_(all_children, ['smith']) 465 update.clear() 466 self.expire_session(threading.Event) 467 468 self.client.retry(self.client.create, 469 self.path + '/' + 'george') 470 update.wait(4) 471 eq_(update.is_set(), False) 472 eq_(all_children, ['smith']) 473 474 children = self.client.get_children(self.path) 475 eq_(sorted(children), ['george', 'smith']) 476 477 def test_bad_children_watch_func(self): 478 counter = 0 479 480 @self.client.ChildrenWatch(self.path) 481 def changed(children): 482 if counter > 0: 483 raise Exception("oops") 484 485 raises(Exception)(changed) 486 counter += 1 487 self.client.create(self.path + '/' + 'smith') 488 489 490class KazooPatientChildrenWatcherTests(KazooTestCase): 491 def setUp(self): 492 super(KazooPatientChildrenWatcherTests, self).setUp() 493 self.path = "/" + uuid.uuid4().hex 494 495 def _makeOne(self, *args, **kwargs): 496 from kazoo.recipe.watchers import PatientChildrenWatch 497 return PatientChildrenWatch(*args, **kwargs) 498 499 def test_watch(self): 500 self.client.ensure_path(self.path) 501 watcher = self._makeOne(self.client, self.path, 0.1) 502 result = watcher.start() 503 children, asy = result.get() 504 eq_(len(children), 0) 505 eq_(asy.ready(), False) 506 507 self.client.create(self.path + '/' + 'fred') 508 asy.get(timeout=1) 509 eq_(asy.ready(), True) 510 511 def test_exception(self): 512 from kazoo.exceptions import NoNodeError 513 watcher = self._makeOne(self.client, self.path, 0.1) 514 result = watcher.start() 515 516 @raises(NoNodeError) 517 def testit(): 518 result.get() 519 testit() 520 521 def test_watch_iterations(self): 522 self.client.ensure_path(self.path) 523 watcher = self._makeOne(self.client, self.path, 0.5) 524 result = watcher.start() 525 eq_(result.ready(), False) 526 527 time.sleep(0.08) 528 self.client.create(self.path + '/' + uuid.uuid4().hex) 529 eq_(result.ready(), False) 530 time.sleep(0.08) 531 eq_(result.ready(), False) 532 self.client.create(self.path + '/' + uuid.uuid4().hex) 533 time.sleep(0.08) 534 eq_(result.ready(), False) 535 536 children, asy = result.get() 537 eq_(len(children), 2) 538