1import time
2import threading
3import uuid
4
5from nose.tools import eq_
6from nose.tools import raises
7
8from kazoo.exceptions import KazooException
9from kazoo.protocol.states import EventType
10from kazoo.testing import KazooTestCase
11
12
13class KazooDataWatcherTests(KazooTestCase):
14    def setUp(self):
15        super(KazooDataWatcherTests, self).setUp()
16        self.path = "/" + uuid.uuid4().hex
17        self.client.ensure_path(self.path)
18
19    def test_data_watcher(self):
20        update = threading.Event()
21        data = [True]
22
23        # Make it a non-existent path
24        self.path += 'f'
25
26        @self.client.DataWatch(self.path)
27        def changed(d, stat):
28            data.pop()
29            data.append(d)
30            update.set()
31
32        update.wait(10)
33        eq_(data, [None])
34        update.clear()
35
36        self.client.create(self.path, b'fred')
37        update.wait(10)
38        eq_(data[0], b'fred')
39        update.clear()
40
41    def test_data_watcher_once(self):
42        update = threading.Event()
43        data = [True]
44
45        # Make it a non-existent path
46        self.path += 'f'
47
48        dwatcher = self.client.DataWatch(self.path)
49
50        @dwatcher
51        def changed(d, stat):
52            data.pop()
53            data.append(d)
54            update.set()
55
56        update.wait(10)
57        eq_(data, [None])
58        update.clear()
59
60        @raises(KazooException)
61        def test_it():
62            @dwatcher
63            def func(d, stat):
64                data.pop()
65        test_it()
66
67    def test_data_watcher_with_event(self):
68        # Test that the data watcher gets passed the event, if it
69        # accepts three arguments
70        update = threading.Event()
71        data = [True]
72
73        # Make it a non-existent path
74        self.path += 'f'
75
76        @self.client.DataWatch(self.path)
77        def changed(d, stat, event):
78            data.pop()
79            data.append(event)
80            update.set()
81
82        update.wait(10)
83        eq_(data, [None])
84        update.clear()
85
86        self.client.create(self.path, b'fred')
87        update.wait(10)
88        eq_(data[0].type, EventType.CREATED)
89        update.clear()
90
91    def test_func_style_data_watch(self):
92        update = threading.Event()
93        data = [True]
94
95        # Make it a non-existent path
96        path = self.path + 'f'
97
98        def changed(d, stat):
99            data.pop()
100            data.append(d)
101            update.set()
102        self.client.DataWatch(path, changed)
103
104        update.wait(10)
105        eq_(data, [None])
106        update.clear()
107
108        self.client.create(path, b'fred')
109        update.wait(10)
110        eq_(data[0], b'fred')
111        update.clear()
112
113    def test_datawatch_across_session_expire(self):
114        update = threading.Event()
115        data = [True]
116
117        @self.client.DataWatch(self.path)
118        def changed(d, stat):
119            data.pop()
120            data.append(d)
121            update.set()
122
123        update.wait(10)
124        eq_(data, [b""])
125        update.clear()
126
127        self.expire_session(threading.Event)
128        self.client.retry(self.client.set, self.path, b'fred')
129        update.wait(25)
130        eq_(data[0], b'fred')
131
132    def test_func_stops(self):
133        update = threading.Event()
134        data = [True]
135
136        self.path += "f"
137
138        fail_through = []
139
140        @self.client.DataWatch(self.path)
141        def changed(d, stat):
142            data.pop()
143            data.append(d)
144            update.set()
145            if fail_through:
146                return False
147
148        update.wait(10)
149        eq_(data, [None])
150        update.clear()
151
152        fail_through.append(True)
153        self.client.create(self.path, b'fred')
154        update.wait(10)
155        eq_(data[0], b'fred')
156        update.clear()
157
158        self.client.set(self.path, b'asdfasdf')
159        update.wait(0.2)
160        eq_(data[0], b'fred')
161
162        d, stat = self.client.get(self.path)
163        eq_(d, b'asdfasdf')
164
165    def test_no_such_node(self):
166        args = []
167
168        @self.client.DataWatch("/some/path")
169        def changed(d, stat):
170            args.extend([d, stat])
171
172        eq_(args, [None, None])
173
174    def test_no_such_node_for_children_watch(self):
175        args = []
176        path = self.path + '/test_no_such_node_for_children_watch'
177        update = threading.Event()
178
179        def changed(children):
180            args.append(children)
181            update.set()
182
183        # watch a node which does not exist
184        children_watch = self.client.ChildrenWatch(path, changed)
185        eq_(update.is_set(), False)
186        eq_(children_watch._stopped, True)
187        eq_(args, [])
188
189        # watch a node which exists
190        self.client.create(path, b'')
191        children_watch = self.client.ChildrenWatch(path, changed)
192        update.wait(3)
193        eq_(args, [[]])
194        update.clear()
195
196        # watch changes
197        self.client.create(path + '/fred', b'')
198        update.wait(3)
199        eq_(args, [[], ['fred']])
200        update.clear()
201
202        # delete children
203        self.client.delete(path + '/fred')
204        update.wait(3)
205        eq_(args, [[], ['fred'], []])
206        update.clear()
207
208        # delete watching
209        self.client.delete(path)
210
211        # a hack for waiting the watcher stop
212        for retry in range(5):
213            if children_watch._stopped:
214                break
215            children_watch._run_lock.acquire()
216            children_watch._run_lock.release()
217            time.sleep(retry / 10.0)
218
219        eq_(update.is_set(), False)
220        eq_(children_watch._stopped, True)
221
222    def test_bad_watch_func2(self):
223        counter = 0
224
225        @self.client.DataWatch(self.path)
226        def changed(d, stat):
227            if counter > 0:
228                raise Exception("oops")
229
230        raises(Exception)(changed)
231
232        counter += 1
233        self.client.set(self.path, b'asdfasdf')
234
235    def test_watcher_evaluating_to_false(self):
236        class WeirdWatcher(list):
237            def __call__(self, *args):
238                self.called = True
239        watcher = WeirdWatcher()
240        self.client.DataWatch(self.path, watcher)
241        self.client.set(self.path, b'mwahaha')
242        self.assertTrue(watcher.called)
243
244    def test_watcher_repeat_delete(self):
245        a = []
246        ev = threading.Event()
247
248        self.client.delete(self.path)
249
250        @self.client.DataWatch(self.path)
251        def changed(val, stat):
252            a.append(val)
253            ev.set()
254
255        eq_(a, [None])
256        ev.wait(10)
257        ev.clear()
258        self.client.create(self.path, b'blah')
259        ev.wait(10)
260        eq_(ev.is_set(), True)
261        ev.clear()
262        eq_(a, [None, b'blah'])
263        self.client.delete(self.path)
264        ev.wait(10)
265        eq_(ev.is_set(), True)
266        ev.clear()
267        eq_(a, [None, b'blah', None])
268        self.client.create(self.path, b'blah')
269        ev.wait(10)
270        eq_(ev.is_set(), True)
271        ev.clear()
272        eq_(a, [None, b'blah', None, b'blah'])
273
274    def test_watcher_with_closing(self):
275        a = []
276        ev = threading.Event()
277
278        self.client.delete(self.path)
279
280        @self.client.DataWatch(self.path)
281        def changed(val, stat):
282            a.append(val)
283            ev.set()
284        eq_(a, [None])
285
286        b = False
287        try:
288            self.client.stop()
289        except:
290            b = True
291        eq_(b, False)
292
293
294class KazooChildrenWatcherTests(KazooTestCase):
295    def setUp(self):
296        super(KazooChildrenWatcherTests, self).setUp()
297        self.path = "/" + uuid.uuid4().hex
298        self.client.ensure_path(self.path)
299
300    def test_child_watcher(self):
301        update = threading.Event()
302        all_children = ['fred']
303
304        @self.client.ChildrenWatch(self.path)
305        def changed(children):
306            while all_children:
307                all_children.pop()
308            all_children.extend(children)
309            update.set()
310
311        update.wait(10)
312        eq_(all_children, [])
313        update.clear()
314
315        self.client.create(self.path + '/' + 'smith')
316        update.wait(10)
317        eq_(all_children, ['smith'])
318        update.clear()
319
320        self.client.create(self.path + '/' + 'george')
321        update.wait(10)
322        eq_(sorted(all_children), ['george', 'smith'])
323
324    def test_child_watcher_once(self):
325        update = threading.Event()
326        all_children = ['fred']
327
328        cwatch = self.client.ChildrenWatch(self.path)
329
330        @cwatch
331        def changed(children):
332            while all_children:
333                all_children.pop()
334            all_children.extend(children)
335            update.set()
336
337        update.wait(10)
338        eq_(all_children, [])
339        update.clear()
340
341        @raises(KazooException)
342        def test_it():
343            @cwatch
344            def changed_again(children):
345                update.set()
346        test_it()
347
348    def test_child_watcher_with_event(self):
349        update = threading.Event()
350        events = [True]
351
352        @self.client.ChildrenWatch(self.path, send_event=True)
353        def changed(children, event):
354            events.pop()
355            events.append(event)
356            update.set()
357
358        update.wait(10)
359        eq_(events, [None])
360        update.clear()
361
362        self.client.create(self.path + '/' + 'smith')
363        update.wait(10)
364        eq_(events[0].type, EventType.CHILD)
365        update.clear()
366
367    def test_func_style_child_watcher(self):
368        update = threading.Event()
369        all_children = ['fred']
370
371        def changed(children):
372            while all_children:
373                all_children.pop()
374            all_children.extend(children)
375            update.set()
376
377        self.client.ChildrenWatch(self.path, changed)
378
379        update.wait(10)
380        eq_(all_children, [])
381        update.clear()
382
383        self.client.create(self.path + '/' + 'smith')
384        update.wait(10)
385        eq_(all_children, ['smith'])
386        update.clear()
387
388        self.client.create(self.path + '/' + 'george')
389        update.wait(10)
390        eq_(sorted(all_children), ['george', 'smith'])
391
392    def test_func_stops(self):
393        update = threading.Event()
394        all_children = ['fred']
395
396        fail_through = []
397
398        @self.client.ChildrenWatch(self.path)
399        def changed(children):
400            while all_children:
401                all_children.pop()
402            all_children.extend(children)
403            update.set()
404            if fail_through:
405                return False
406
407        update.wait(10)
408        eq_(all_children, [])
409        update.clear()
410
411        fail_through.append(True)
412        self.client.create(self.path + '/' + 'smith')
413        update.wait(10)
414        eq_(all_children, ['smith'])
415        update.clear()
416
417        self.client.create(self.path + '/' + 'george')
418        update.wait(0.5)
419        eq_(all_children, ['smith'])
420
421    def test_child_watch_session_loss(self):
422        update = threading.Event()
423        all_children = ['fred']
424
425        @self.client.ChildrenWatch(self.path)
426        def changed(children):
427            while all_children:
428                all_children.pop()
429            all_children.extend(children)
430            update.set()
431
432        update.wait(10)
433        eq_(all_children, [])
434        update.clear()
435
436        self.client.create(self.path + '/' + 'smith')
437        update.wait(10)
438        eq_(all_children, ['smith'])
439        update.clear()
440        self.expire_session(threading.Event)
441
442        self.client.retry(self.client.create,
443                          self.path + '/' + 'george')
444        update.wait(20)
445        eq_(sorted(all_children), ['george', 'smith'])
446
447    def test_child_stop_on_session_loss(self):
448        update = threading.Event()
449        all_children = ['fred']
450
451        @self.client.ChildrenWatch(self.path, allow_session_lost=False)
452        def changed(children):
453            while all_children:
454                all_children.pop()
455            all_children.extend(children)
456            update.set()
457
458        update.wait(10)
459        eq_(all_children, [])
460        update.clear()
461
462        self.client.create(self.path + '/' + 'smith')
463        update.wait(10)
464        eq_(all_children, ['smith'])
465        update.clear()
466        self.expire_session(threading.Event)
467
468        self.client.retry(self.client.create,
469                          self.path + '/' + 'george')
470        update.wait(4)
471        eq_(update.is_set(), False)
472        eq_(all_children, ['smith'])
473
474        children = self.client.get_children(self.path)
475        eq_(sorted(children), ['george', 'smith'])
476
477    def test_bad_children_watch_func(self):
478        counter = 0
479
480        @self.client.ChildrenWatch(self.path)
481        def changed(children):
482            if counter > 0:
483                raise Exception("oops")
484
485        raises(Exception)(changed)
486        counter += 1
487        self.client.create(self.path + '/' + 'smith')
488
489
490class KazooPatientChildrenWatcherTests(KazooTestCase):
491    def setUp(self):
492        super(KazooPatientChildrenWatcherTests, self).setUp()
493        self.path = "/" + uuid.uuid4().hex
494
495    def _makeOne(self, *args, **kwargs):
496        from kazoo.recipe.watchers import PatientChildrenWatch
497        return PatientChildrenWatch(*args, **kwargs)
498
499    def test_watch(self):
500        self.client.ensure_path(self.path)
501        watcher = self._makeOne(self.client, self.path, 0.1)
502        result = watcher.start()
503        children, asy = result.get()
504        eq_(len(children), 0)
505        eq_(asy.ready(), False)
506
507        self.client.create(self.path + '/' + 'fred')
508        asy.get(timeout=1)
509        eq_(asy.ready(), True)
510
511    def test_exception(self):
512        from kazoo.exceptions import NoNodeError
513        watcher = self._makeOne(self.client, self.path, 0.1)
514        result = watcher.start()
515
516        @raises(NoNodeError)
517        def testit():
518            result.get()
519        testit()
520
521    def test_watch_iterations(self):
522        self.client.ensure_path(self.path)
523        watcher = self._makeOne(self.client, self.path, 0.5)
524        result = watcher.start()
525        eq_(result.ready(), False)
526
527        time.sleep(0.08)
528        self.client.create(self.path + '/' + uuid.uuid4().hex)
529        eq_(result.ready(), False)
530        time.sleep(0.08)
531        eq_(result.ready(), False)
532        self.client.create(self.path + '/' + uuid.uuid4().hex)
533        time.sleep(0.08)
534        eq_(result.ready(), False)
535
536        children, asy = result.get()
537        eq_(len(children), 2)
538