"""Higher level child and data watching APIs.

:Maintainer: Ben Bangert <ben@groovie.org>
:Status: Production

.. note::

    :ref:`DataWatch` and :ref:`ChildrenWatch` may only handle a single
    function, attempts to associate a single instance with multiple functions
    will result in an exception being thrown.

"""
from functools import partial, wraps
import logging
import time
import warnings

from kazoo.exceptions import (
    ConnectionClosedError,
    NoNodeError,
    KazooException
)
from kazoo.protocol.states import KazooState
from kazoo.retry import KazooRetry


log = logging.getLogger(__name__)


# Sentinel object; not referenced by the watchers defined in this module.
_STOP_WATCHING = object()


def _ignore_closed(func):
    """Decorate `func` so that a closed connection is silently ignored

    The wrapper returns whatever `func` returns. If `func` raises
    :exc:`~kazoo.exceptions.ConnectionClosedError` the exception is
    swallowed and ``None`` is returned instead; every other exception
    propagates unchanged.

    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ConnectionClosedError:
            # Deliberate best-effort: a watch firing on a closed client
            # has nothing left to do.
            pass
    return wrapper


class DataWatch(object):
    """Watches a node for data updates and calls the specified
    function each time it changes

    The function will also be called the very first time it is
    registered to get the data.

    Returning `False` from the registered function will disable future
    data change calls. If the client connection is closed (using the
    close command), the DataWatch will no longer get updates.

    If the function supplied takes three arguments, then the third one
    will be a :class:`~kazoo.protocol.states.WatchedEvent`. It will
    only be set if the change to the data occurs as a result of the
    server notifying the watch that there has been a change. Events
    like reconnection or the first call will not include an event.

    If the node does not exist, then the function will be called with
    ``None`` for all values.

    .. tip::

        Because :class:`DataWatch` can watch nodes that don't exist, it
        can be used alternatively as a higher-level Exists watcher that
        survives reconnections and session loss.

    Example with client:

    .. code-block:: python

        @client.DataWatch('/path/to/watch')
        def my_func(data, stat):
            print("Data is %s" % data)
            print("Version is %s" % stat.version)

        # Above function is called immediately and prints

        # Or if you want the event object
        @client.DataWatch('/path/to/watch')
        def my_func(data, stat, event):
            print("Data is %s" % data)
            print("Version is %s" % stat.version)
            print("Event is %s" % event)

    .. versionchanged:: 1.2

        DataWatch now ignores additional arguments that were previously
        passed to it and warns that they are no longer respected.

    """
    def __init__(self, client, path, func=None, *args, **kwargs):
        """Create a data watcher for a path

        :param client: A zookeeper client.
        :type client: :class:`~kazoo.client.KazooClient`
        :param path: The path to watch for data changes on.
        :type path: str
        :param func: Function to call initially and every time the
                     node changes. `func` will be called with the
                     value of the node and a
                     :class:`~kazoo.client.ZnodeStat` instance (and,
                     when it accepts a third argument, the event).
                     May be omitted when the instance is used as a
                     decorator.
        :type func: callable

        Any additional positional or keyword arguments are ignored
        and trigger a :exc:`DeprecationWarning`.

        """
        self._client = client
        self._path = path
        self._func = func
        self._stopped = False
        self._run_lock = client.handler.lock_object()
        self._version = None
        self._retry = KazooRetry(max_tries=None,
                                 sleep_func=client.handler.sleep_func)
        # Not read anywhere in this class; kept so the instance
        # attributes stay unchanged.
        self._include_event = None
        self._ever_called = False
        self._used = False

        if args or kwargs:
            warnings.warn('Passing additional arguments to DataWatch is'
                          ' deprecated. ignore_missing_node is now assumed '
                          ' to be True by default, and the event will be '
                          ' sent if the function can handle receiving it',
                          DeprecationWarning, stacklevel=2)

        # Register our session listener if we're going to resume
        # across session losses
        if func is not None:
            self._used = True
            self._client.add_listener(self._session_watcher)
            self._get_data()

    def __call__(self, func):
        """Callable version for use as a decorator

        :param func: Function to call initially and every time the
                     data changes. `func` will be called with the
                     value of the node and a
                     :class:`~kazoo.client.ZnodeStat` instance (and,
                     when it accepts a third argument, the event).
        :type func: callable
        :returns: `func`, unchanged.
        :raises: :exc:`~kazoo.exceptions.KazooException` if a function
                 has already been associated with this instance.

        """
        if self._used:
            raise KazooException(
                "A function has already been associated with this "
                "DataWatch instance.")

        self._func = func

        self._used = True
        self._client.add_listener(self._session_watcher)
        self._get_data()
        return func

    def _log_func_exception(self, data, stat, event=None):
        """Invoke the registered function, logging any exception

        Stops the watch and removes the session listener when the
        function returns ``False``. Exceptions raised by the function
        are logged and then re-raised.

        """
        try:
            if not self._ever_called:
                self._ever_called = True
            # For backwards compatibility, two-argument callbacks are
            # still supported: the three-argument form is tried first
            # and the call is repeated without the event on TypeError.
            # NOTE: a TypeError raised *inside* a three-argument
            # callback is indistinguishable from an arity mismatch
            # here, so such a callback is invoked a second time with
            # only two arguments.
            try:
                result = self._func(data, stat, event)
            except TypeError:
                result = self._func(data, stat)
            if result is False:
                self._stopped = True
                self._func = None
                self._client.remove_listener(self._session_watcher)
        except Exception as exc:
            log.exception(exc)
            raise

    @_ignore_closed
    def _get_data(self, event=None):
        """Fetch the node data, re-arm the watch and notify the function

        :param event: The event that triggered this fetch, or ``None``
                      for the initial call and session re-connects.

        """
        # Ensure this runs one at a time, possible because the session
        # watcher may trigger a run
        with self._run_lock:
            if self._stopped:
                return

            initial_version = self._version

            try:
                data, stat = self._retry(self._client.get,
                                         self._path, self._watcher)
            except NoNodeError:
                data = None

                # This will set 'stat' to None if the node does not yet
                # exist.
                stat = self._retry(self._client.exists, self._path,
                                   self._watcher)
                if stat:
                    # The node appeared between the get and the exists
                    # call; start over to pick up its data.
                    self._client.handler.spawn(self._get_data)
                    return

            # No node data, clear out version
            if stat is None:
                self._version = None
            else:
                self._version = stat.mzxid

            # Call our function if its the first time ever, or if the
            # version has changed
            if initial_version != self._version or not self._ever_called:
                self._log_func_exception(data, stat, event)

    def _watcher(self, event):
        """Watch callback handed to the client; refetches the data"""
        self._get_data(event=event)

    def _set_watch(self, state):
        """Record `state` as ``_watch_established`` under the run lock

        Not called from within this class, and nothing in this class
        reads the attribute it sets.

        """
        with self._run_lock:
            self._watch_established = state

    def _session_watcher(self, state):
        """Session listener; refetches the data once connected again"""
        if state == KazooState.CONNECTED:
            self._client.handler.spawn(self._get_data)


class ChildrenWatch(object):
    """Watches a node for children updates and calls the specified
    function each time it changes

    The function will also be called the very first time it is
    registered to get children.

    Returning `False` from the registered function will disable future
    children change calls. If the client connection is closed (using
    the close command), the ChildrenWatch will no longer get updates.

    if send_event=True in __init__, then the function will always be
    called with second parameter, ``event``. Upon initial call or when
    recovering a lost session the ``event`` is always ``None``.
    Otherwise it's a :class:`~kazoo.protocol.states.WatchedEvent`
    instance.

    Example with client:

    .. code-block:: python

        @client.ChildrenWatch('/path/to/watch')
        def my_func(children):
            print("Children are %s" % children)

        # Above function is called immediately and prints children

    """
    def __init__(self, client, path, func=None,
                 allow_session_lost=True, send_event=False):
        """Create a children watcher for a path

        :param client: A zookeeper client.
        :type client: :class:`~kazoo.client.KazooClient`
        :param path: The path to watch for children on.
        :type path: str
        :param func: Function to call initially and every time the
                     children change. `func` will be called with a
                     single argument, the list of children (plus the
                     event when `send_event` is true). May be omitted
                     when the instance is used as a decorator.
        :type func: callable
        :param allow_session_lost: Whether the watch should be
                                   re-registered if the zookeeper
                                   session is lost.
        :type allow_session_lost: bool
        :type send_event: bool
        :param send_event: Whether the function should be passed the
                           event sent by ZooKeeper or None upon
                           initialization (see class documentation)

        The path must already exist for the children watcher to
        run.

        """
        self._client = client
        self._path = path
        self._func = func
        self._send_event = send_event
        self._stopped = False
        self._watch_established = False
        self._allow_session_lost = allow_session_lost
        self._run_lock = client.handler.lock_object()
        self._prior_children = None
        self._used = False

        # Register our session listener if we're going to resume
        # across session losses
        if func is not None:
            self._used = True
            if allow_session_lost:
                self._client.add_listener(self._session_watcher)
            self._get_children()

    def __call__(self, func):
        """Callable version for use as a decorator

        :param func: Function to call initially and every time the
                     children change. `func` will be called with a
                     single argument, the list of children (plus the
                     event when `send_event` is true).
        :type func: callable
        :returns: `func`, unchanged.
        :raises: :exc:`~kazoo.exceptions.KazooException` if a function
                 has already been associated with this instance.

        """
        if self._used:
            raise KazooException(
                "A function has already been associated with this "
                "ChildrenWatch instance.")

        self._func = func

        self._used = True
        if self._allow_session_lost:
            self._client.add_listener(self._session_watcher)
        self._get_children()
        return func

    def _remove_session_listener(self):
        """Detach the session listener once the watch has stopped

        A listener is only registered when `allow_session_lost` is
        true, so that is the only case in which one is removed.

        """
        if self._allow_session_lost:
            self._client.remove_listener(self._session_watcher)

    @_ignore_closed
    def _get_children(self, event=None):
        """Fetch the children, re-arm the watch and notify the function

        :param event: The event that triggered this fetch, or ``None``
                      for the initial call and session re-connects.

        """
        with self._run_lock:  # Ensure this runs one at a time
            if self._stopped:
                return

            try:
                children = self._client.retry(self._client.get_children,
                                              self._path, self._watcher)
            except NoNodeError:
                # The watch never restarts once stopped, so the session
                # listener would otherwise stay registered for nothing.
                self._stopped = True
                self._remove_session_listener()
                return

            if not self._watch_established:
                self._watch_established = True

                # The watch was just (re-)established; skip the call
                # when the children are the same as last reported.
                if self._prior_children is not None and \
                   self._prior_children == children:
                    return

            self._prior_children = children

            try:
                if self._send_event:
                    result = self._func(children, event)
                else:
                    result = self._func(children)
                if result is False:
                    self._stopped = True
                    self._func = None
                    self._remove_session_listener()
            except Exception as exc:
                log.exception(exc)
                raise

    def _watcher(self, event):
        """Watch callback handed to the client; refetches the children"""
        if event.type != "NONE":
            self._get_children(event)

    def _session_watcher(self, state):
        """Session listener; re-arms the watch after a reconnect"""
        if state in (KazooState.LOST, KazooState.SUSPENDED):
            self._watch_established = False
        elif (state == KazooState.CONNECTED and
              not self._watch_established and not self._stopped):
            self._client.handler.spawn(self._get_children)


class PatientChildrenWatch(object):
    """Patient Children Watch that returns values after the children
    of a node don't change for a period of time

    A separate watcher for the children of a node, that ignores
    changes within a boundary time and sets the result only when the
    boundary time has elapsed with no children changes.

    Example::

        watcher = PatientChildrenWatch(client, '/some/path',
                                       time_boundary=5)
        async_object = watcher.start()

        # Blocks until the children have not changed for time boundary
        # (5 in this case) seconds, returns children list and an
        # async_result that will be set if the children change in the
        # future
        children, child_async = async_object.get()

    .. note::

        This Watch is different from :class:`DataWatch` and
        :class:`ChildrenWatch` as it only returns once, does not take
        a function that is called, and provides an
        :class:`~kazoo.interfaces.IAsyncResult` object that can be
        checked to see if the children have changed later.

    """
    def __init__(self, client, path, time_boundary=30):
        """Create a patient children watcher for a path

        :param client: A zookeeper client.
        :param path: The path whose children are watched.
        :param time_boundary: Value passed to the handler's sleep
                              function between checks for changes.

        """
        self.client = client
        self.path = path
        self.children = []
        self.time_boundary = time_boundary
        self.children_changed = client.handler.event_object()

    def start(self):
        """Begin the watching process asynchronously

        :returns: An :class:`~kazoo.interfaces.IAsyncResult` instance
                  that will be set when no change has occurred to the
                  children for time boundary seconds.

        """
        self.asy = asy = self.client.handler.async_result()
        self.client.handler.spawn(self._inner_start)
        return asy

    def _inner_start(self):
        """Poll the children until a full time boundary passes quietly

        Sets ``self.asy`` to ``(children, async_result)`` on success,
        or to the raised exception on failure.

        """
        try:
            while True:
                async_result = self.client.handler.async_result()
                self.children = self.client.retry(
                    self.client.get_children, self.path,
                    partial(self._children_watcher, async_result))
                self.client.handler.sleep_func(self.time_boundary)

                if self.children_changed.is_set():
                    # A change arrived during the sleep; go around again.
                    self.children_changed.clear()
                else:
                    break

            self.asy.set((self.children, async_result))
        except Exception as exc:
            self.asy.set_exception(exc)

    def _children_watcher(self, async_result, event):
        """Watch callback; flags the change and timestamps the result"""
        self.children_changed.set()
        async_result.set(time.time())