1"""Higher level child and data watching API's.
2
3:Maintainer: Ben Bangert <ben@groovie.org>
4:Status: Production
5
6.. note::
7
    :ref:`DataWatch` and :ref:`ChildrenWatch` may only handle a single
    function; attempting to associate a single instance with multiple
    functions will result in an exception being thrown.
11
12"""
13from functools import partial, wraps
14import logging
15import time
16import warnings
17
18from kazoo.exceptions import (
19    ConnectionClosedError,
20    NoNodeError,
21    KazooException
22)
23from kazoo.protocol.states import KazooState
24from kazoo.retry import KazooRetry
25
26
27log = logging.getLogger(__name__)
28
29
30_STOP_WATCHING = object()
31
32
def _ignore_closed(func):
    """Decorate ``func`` so that a closed connection is silently ignored

    The returned callable forwards every positional and keyword
    argument to ``func`` and hands back its result. When ``func``
    raises :exc:`~kazoo.exceptions.ConnectionClosedError` the error is
    swallowed and ``None`` is returned; any other exception propagates
    unchanged.

    """
    def guarded(*args, **kwargs):
        try:
            outcome = func(*args, **kwargs)
        except ConnectionClosedError:
            # Deliberate best-effort: a closed client just means there
            # is nothing left to do.
            return None
        return outcome

    return wraps(func)(guarded)
41
42
class DataWatch(object):
    """Watch a node's data and call a function whenever it changes

    The registered function is called once as soon as it is
    registered, and again each time the node's data changes.

    Returning `False` from the registered function disables all future
    data change calls. If the client connection is closed (using the
    close command), the DataWatch will no longer get updates.

    A function that accepts three arguments receives a
    :class:`~kazoo.protocol.states.WatchedEvent` as the third one. The
    event is only set when the call results from the server notifying
    the watch of a change; the first call and calls made after a
    reconnection pass ``None`` instead.

    If the node does not exist, the function is called with ``None``
    for all values.

    .. tip::

        Because :class:`DataWatch` can watch nodes that don't exist, it
        can be used alternatively as a higher-level Exists watcher that
        survives reconnections and session loss.

    Example with client:

    .. code-block:: python

        @client.DataWatch('/path/to/watch')
        def my_func(data, stat):
            print("Data is %s" % data)
            print("Version is %s" % stat.version)

        # Above function is called immediately and prints

        # Or if you want the event object
        @client.DataWatch('/path/to/watch')
        def my_func(data, stat, event):
            print("Data is %s" % data)
            print("Version is %s" % stat.version)
            print("Event is %s" % event)

    .. versionchanged:: 1.2

        DataWatch now ignores additional arguments that were previously
        passed to it and warns that they are no longer respected.

    """
    def __init__(self, client, path, func=None, *args, **kwargs):
        """Create a data watcher for a path

        :param client: A zookeeper client.
        :type client: :class:`~kazoo.client.KazooClient`
        :param path: The path to watch for data changes on.
        :type path: str
        :param func: Function to call initially and every time the
                     node changes. It is called with the value of the
                     node and a :class:`~kazoo.client.ZnodeStat`
                     instance, plus the triggering event when it
                     accepts a third argument.
        :type func: callable

        Any extra positional or keyword arguments are ignored and
        trigger a :class:`DeprecationWarning`.

        """
        handler = client.handler

        self._client = client
        self._path = path
        self._func = func
        self._stopped = False
        self._run_lock = handler.lock_object()
        # Last seen ``mzxid`` of the node, ``None`` while it is missing
        self._version = None
        self._retry = KazooRetry(max_tries=None,
                                 sleep_func=handler.sleep_func)
        self._include_event = None
        self._ever_called = False
        self._used = False

        if args or kwargs:
            message = ('Passing additional arguments to DataWatch is'
                       ' deprecated. ignore_missing_node is now assumed '
                       ' to be True by default, and the event will be '
                       ' sent if the function can handle receiving it')
            warnings.warn(message, DeprecationWarning, stacklevel=2)

        # A function handed to the constructor starts the watch right
        # away; otherwise registration waits for the decorator call.
        if func is not None:
            self._begin_watching()

    def __call__(self, func):
        """Callable version for use as a decorator

        :param func: Function to call initially and every time the
                     data changes. It is called with the value of the
                     node and a :class:`~kazoo.client.ZnodeStat`
                     instance, plus the triggering event when it
                     accepts a third argument.
        :type func: callable
        :raises: :exc:`~kazoo.exceptions.KazooException` if a function
                 was already associated with this instance.

        """
        if self._used:
            raise KazooException(
                "A function has already been associated with this "
                "DataWatch instance.")

        self._func = func
        self._begin_watching()
        return func

    def _begin_watching(self):
        # Mark the instance as taken, hook into session state changes
        # so the watch is refreshed on reconnect, then do the first
        # fetch.
        self._used = True
        self._client.add_listener(self._session_watcher)
        self._get_data()

    def _log_func_exception(self, data, stat, event=None):
        """Invoke the registered function, logging whatever it raises

        Any exception is logged and then re-raised. When the function
        returns `False` the watch is stopped for good and the session
        listener is removed.

        """
        try:
            self._ever_called = True

            # Offer the event first and fall back to the two-argument
            # form for functions that do not take it. Note that a
            # TypeError raised from inside a three-argument function
            # lands in the same fallback.
            try:
                verdict = self._func(data, stat, event)
            except TypeError:
                verdict = self._func(data, stat)

            if verdict is not False:
                return

            self._stopped = True
            self._func = None
            self._client.remove_listener(self._session_watcher)
        except Exception as exc:
            log.exception(exc)
            raise

    @_ignore_closed
    def _get_data(self, event=None):
        # The session watcher may trigger a run concurrently with a
        # data watch firing, so only one run may proceed at a time.
        with self._run_lock:
            if self._stopped:
                return

            previous_version = self._version

            try:
                data, stat = self._retry(self._client.get,
                                         self._path, self._watcher)
            except NoNodeError:
                data = None

                # 'stat' is None here when the node still does not
                # exist.
                stat = self._retry(self._client.exists, self._path,
                                   self._watcher)
                if stat:
                    # The node showed up between the two requests;
                    # start over to read its data.
                    self._client.handler.spawn(self._get_data)
                    return

            # A missing node has no version to remember
            self._version = None if stat is None else stat.mzxid

            # Notify on the very first run, or when the version moved
            never_called = not self._ever_called
            if never_called or previous_version != self._version:
                self._log_func_exception(data, stat, event)

    def _watcher(self, event):
        # Watch callback handed to the client's get/exists calls
        self._get_data(event=event)

    def _set_watch(self, state):
        # Records ``state`` under the run lock; nothing else in this
        # class reads the flag.
        with self._run_lock:
            self._watch_established = state

    def _session_watcher(self, state):
        # Refresh the data (and the watch) whenever we are connected
        if state == KazooState.CONNECTED:
            self._client.handler.spawn(self._get_data)
218
219
class ChildrenWatch(object):
    """Watches a node for children updates and calls the specified
    function each time it changes

    The function will also be called the very first time its
    registered to get children.

    Returning `False` from the registered function will disable future
    children change calls. If the client connection is closed (using
    the close command), the ChildrenWatch will no longer get updates.

    If ``send_event=True`` is passed to ``__init__``, the function is
    always called with a second parameter, ``event``. Upon the initial
    call or when recovering a lost session the ``event`` is always
    ``None``. Otherwise it's a
    :class:`~kazoo.protocol.states.WatchedEvent` instance.

    Example with client:

    .. code-block:: python

        @client.ChildrenWatch('/path/to/watch')
        def my_func(children):
            print("Children are %s" % children)

        # Above function is called immediately and prints children

    """
    def __init__(self, client, path, func=None,
                 allow_session_lost=True, send_event=False):
        """Create a children watcher for a path

        :param client: A zookeeper client.
        :type client: :class:`~kazoo.client.KazooClient`
        :param path: The path to watch for children on.
        :type path: str
        :param func: Function to call initially and every time the
                     children change. `func` will be called with a
                     single argument, the list of children (and the
                     event as a second argument when `send_event` is
                     true).
        :type func: callable
        :param allow_session_lost: Whether the watch should be
                                   re-registered if the zookeeper
                                   session is lost.
        :type allow_session_lost: bool
        :type send_event: bool
        :param send_event: Whether the function should be passed the
                           event sent by ZooKeeper or None upon
                           initialization (see class documentation)

        The path must already exist for the children watcher to
        run. If it does not, the watch stops permanently without
        calling the function.

        """
        self._client = client
        self._path = path
        self._func = func
        self._send_event = send_event
        self._stopped = False
        self._watch_established = False
        self._allow_session_lost = allow_session_lost
        self._run_lock = client.handler.lock_object()
        self._prior_children = None
        self._used = False

        # Register our session listener if we're going to resume
        # across session losses
        if func is not None:
            self._used = True
            if allow_session_lost:
                self._client.add_listener(self._session_watcher)
            self._get_children()

    def __call__(self, func):
        """Callable version for use as a decorator

        :param func: Function to call initially and every time the
                     children change. `func` will be called with a
                     single argument, the list of children (and the
                     event as a second argument when `send_event` is
                     true).
        :type func: callable
        :raises: :exc:`~kazoo.exceptions.KazooException` if a function
                 was already associated with this instance.

        """
        if self._used:
            raise KazooException(
                "A function has already been associated with this "
                "ChildrenWatch instance.")

        self._func = func

        self._used = True
        if self._allow_session_lost:
            self._client.add_listener(self._session_watcher)
        self._get_children()
        return func

    def _remove_session_listener(self):
        # The listener is only registered when allow_session_lost is
        # set. Once the watch is stopped for good it would do nothing,
        # so drop it rather than leaving it registered on the client.
        if self._allow_session_lost:
            self._client.remove_listener(self._session_watcher)

    @_ignore_closed
    def _get_children(self, event=None):
        with self._run_lock:  # Ensure this runs one at a time
            if self._stopped:
                return

            try:
                children = self._client.retry(self._client.get_children,
                                              self._path, self._watcher)
            except NoNodeError:
                # The watched path is gone; the watch never resumes.
                self._stopped = True
                self._remove_session_listener()
                return

            if not self._watch_established:
                self._watch_established = True

                # The watch was just (re-)established: skip the call
                # when the children are the same as last reported.
                if self._prior_children is not None and \
                   self._prior_children == children:
                    return

            self._prior_children = children

            try:
                if self._send_event:
                    result = self._func(children, event)
                else:
                    result = self._func(children)
                if result is False:
                    self._stopped = True
                    self._func = None
                    self._remove_session_listener()
            except Exception as exc:
                log.exception(exc)
                raise

    def _watcher(self, event):
        # Events of type "NONE" do not trigger a children fetch
        if event.type != "NONE":
            self._get_children(event)

    def _session_watcher(self, state):
        if state in (KazooState.LOST, KazooState.SUSPENDED):
            # Force the next successful fetch to be treated as a
            # freshly established watch.
            self._watch_established = False
        elif (state == KazooState.CONNECTED and
              not self._watch_established and not self._stopped):
            self._client.handler.spawn(self._get_children)
358
359
class PatientChildrenWatch(object):
    """Children watch that reports once the children of a node have
    stayed unchanged for a period of time

    Changes that arrive inside the time boundary restart the wait; the
    result is only set after a full boundary has elapsed without any
    change to the children.

    Example::

        watcher = PatientChildrenWatch(client, '/some/path',
                                       time_boundary=5)
        async_object = watcher.start()

        # Blocks until the children have not changed for time boundary
        # (5 in this case) seconds, returns children list and an
        # async_result that will be set if the children change in the
        # future
        children, child_async = async_object.get()

    .. note::

        This Watch is different from :class:`DataWatch` and
        :class:`ChildrenWatch` as it only returns once, does not take
        a function that is called, and provides an
        :class:`~kazoo.interfaces.IAsyncResult` object that can be
        checked to see if the children have changed later.

    """
    def __init__(self, client, path, time_boundary=30):
        """Create a patient children watcher for a path

        :param client: A zookeeper client.
        :param path: The path whose children are watched.
        :param time_boundary: Value handed to the handler's
                              ``sleep_func`` between checks for
                              changes.

        """
        self.client = client
        self.path = path
        self.time_boundary = time_boundary
        self.children = []
        self.children_changed = client.handler.event_object()

    def start(self):
        """Begin the watching process asynchronously

        :returns: An :class:`~kazoo.interfaces.IAsyncResult` instance
                  that will be set when no change has occurred to the
                  children for time boundary seconds.

        """
        handler = self.client.handler
        pending = handler.async_result()
        self.asy = pending
        handler.spawn(self._inner_start)
        return pending

    def _inner_start(self):
        # Keep re-reading the children until a whole time boundary
        # passes without the watch firing, then publish the children
        # together with the async result tied to the still-pending
        # watch. Any failure is delivered through the async result.
        try:
            handler = self.client.handler
            settled = False
            while not settled:
                change_notice = handler.async_result()
                on_change = partial(self._children_watcher, change_notice)
                self.children = self.client.retry(
                    self.client.get_children, self.path, on_change)
                handler.sleep_func(self.time_boundary)

                if self.children_changed.is_set():
                    # Something changed while waiting; go around again
                    self.children_changed.clear()
                else:
                    settled = True

            self.asy.set((self.children, change_notice))
        except Exception as exc:
            self.asy.set_exception(exc)

    def _children_watcher(self, async_result, event):
        # Flag the change for the waiting loop and record when it
        # happened on the async result tied to this watch.
        self.children_changed.set()
        async_result.set(time.time())
429