"""
fs.watch
========

Change notification support for FS.

This module defines a standard interface for FS subclasses that support change
notification callbacks.  It also offers some WrapFS subclasses that can
simulate such an ability on top of an ordinary FS object.

An FS object that wants to be "watchable" must provide the following methods:

  * ``add_watcher(callback,path="/",events=None,recursive=True)``

      Request that the given callback be executed in response to changes
      to the given path.  A specific set of change events can be specified.
      This method returns a Watcher object.

  * ``del_watcher(watcher_or_callback)``

      Remove the given watcher object, or any watchers associated with
      the given callback.


If you would prefer to read changes from a filesystem in a blocking fashion
rather than using callbacks, you can use the 'iter_changes' class to obtain
an iterator over the change events.

"""

import sys
import weakref
import threading
import Queue
import traceback

from fs.path import *
from fs.errors import *
from fs.wrapfs import WrapFS
from fs.base import FS
from fs.filelike import FileWrapper

from six import b


class EVENT(object):
    """Base class for change notification events.

    :param fs: the FS object the event was generated by
    :param path: the path the event refers to, or None for events that are
        not tied to a path; a non-None path is normalised to absolute form
    """

    def __init__(self, fs, path):
        super(EVENT, self).__init__()
        self.fs = fs
        if path is not None:
            path = abspath(normpath(path))
        self.path = path

    def __str__(self):
        return unicode(self).encode("utf8")

    def __unicode__(self):
        return u"<fs.watch.%s object (path='%s') at %s>" % (self.__class__.__name__, self.path, hex(id(self)))

    def clone(self, fs=None, path=None):
        """Return a copy of this event, optionally with a different fs/path."""
        if fs is None:
            fs = self.fs
        if path is None:
            path = self.path
        return self.__class__(fs, path)


class ACCESSED(EVENT):
    """Event fired when a file's contents are accessed."""
    pass


class CREATED(EVENT):
    """Event fired when a new file or directory is created."""
    pass


class REMOVED(EVENT):
    """Event fired when a file or directory is removed."""
    pass


class MODIFIED(EVENT):
    """Event fired when a file or directory is modified.

    :param data_changed: True if the file's contents changed, False if only
        metadata (e.g. xattrs or directory info) changed
    :param closed: True if the event was emitted as a file was closed
    """

    def __init__(self, fs, path, data_changed=False, closed=False):
        super(MODIFIED, self).__init__(fs, path)
        self.data_changed = data_changed
        self.closed = closed

    def clone(self, fs=None, path=None, data_changed=None, closed=None):
        """Return a copy of this event, optionally overriding any attribute.

        Both ``data_changed`` and ``closed`` are carried over from this
        event unless explicitly given; the base clone() only knows about
        fs/path and would otherwise reset them to their defaults.
        """
        evt = super(MODIFIED, self).clone(fs, path)
        if data_changed is None:
            data_changed = self.data_changed
        if closed is None:
            closed = self.closed
        evt.data_changed = data_changed
        evt.closed = closed
        return evt


class MOVED_DST(EVENT):
    """Event fired when a file or directory is the target of a move.

    :param source: the path the item was moved from (normalised), or None
    """

    def __init__(self, fs, path, source=None):
        super(MOVED_DST, self).__init__(fs, path)
        if source is not None:
            source = abspath(normpath(source))
        self.source = source

    def __unicode__(self):
        return u"<fs.watch.%s object (path=%r,src=%r) at %s>" % (self.__class__.__name__, self.path, self.source, hex(id(self)))

    def clone(self, fs=None, path=None, source=None):
        """Return a copy of this event, optionally overriding fs/path/source."""
        evt = super(MOVED_DST, self).clone(fs, path)
        if source is None:
            source = self.source
        evt.source = source
        return evt


class MOVED_SRC(EVENT):
    """Event fired when a file or directory is the source of a move.

    :param destination: the path the item was moved to (normalised), or None
    """

    def __init__(self, fs, path, destination=None):
        super(MOVED_SRC, self).__init__(fs, path)
        if destination is not None:
            destination = abspath(normpath(destination))
        self.destination = destination

    def __unicode__(self):
        return u"<fs.watch.%s object (path=%r,dst=%r) at %s>" % (self.__class__.__name__, self.path, self.destination, hex(id(self)))

    def clone(self, fs=None, path=None, destination=None):
        """Return a copy of this event, optionally overriding fs/path/destination."""
        evt = super(MOVED_SRC, self).clone(fs, path)
        if destination is None:
            destination = self.destination
        evt.destination = destination
        return evt


class CLOSED(EVENT):
    """Event fired when the filesystem is closed."""
    pass


class ERROR(EVENT):
    """Event fired when some miscellaneous error occurs."""
    pass


class OVERFLOW(ERROR):
    """Event fired when some events could not be processed."""
    pass


class Watcher(object):
    """Object encapsulating filesystem watch info.

    :param fs: the FS being watched (held only through a weak reference)
    :param callback: callable invoked with each matching EVENT instance
    :param path: the path being watched; normalised to an absolute path
    :param events: iterable of EVENT subclasses to deliver, or None for all
    :param recursive: if False, only events for ``path`` itself and its
        direct children are delivered
    """

    def __init__(self, fs, callback, path="/", events=None, recursive=True):
        if events is None:
            events = (EVENT,)
        else:
            # isinstance() in handle_event() needs a tuple of classes.
            events = tuple(events)
        # Since the FS probably holds a reference to the Watcher, keeping
        # a reference back to the FS would create a cycle containing a
        # __del__ method.  Use a weakref to avoid this.
        self._w_fs = weakref.ref(fs)
        self.callback = callback
        self.path = abspath(normpath(path))
        self.events = events
        self.recursive = recursive

    @property
    def fs(self):
        """The watched FS, or None if it has been garbage-collected."""
        return self._w_fs()

    def delete(self):
        """Unregister this watcher from its FS, if the FS still exists."""
        fs = self.fs
        if fs is not None:
            fs.del_watcher(self)

    def handle_event(self, event):
        """Invoke the callback if ``event`` passes this watcher's filters.

        Events of the wrong type, or whose path lies outside the watched
        path (or deeper than one level below it when not recursive), are
        ignored.  Path-less events are always delivered.  Exceptions raised
        by the callback are printed to stderr and swallowed so that one
        faulty watcher cannot stop the others from being notified.
        """
        if not isinstance(event, self.events):
            return
        if event.path is not None:
            if not isprefix(self.path, event.path):
                return
            if not self.recursive:
                if event.path != self.path:
                    if dirname(event.path) != self.path:
                        return
        try:
            self.callback(event)
        except Exception:
            print >>sys.stderr, "error in FS watcher callback", self.callback
            traceback.print_exc()


class WatchableFSMixin(FS):
    """Mixin class providing watcher management functions.

    Watchers are stored in a PathMap keyed by their normalised watch path,
    each key mapping to a list of Watcher objects.
    """

    def __init__(self, *args, **kwds):
        self._watchers = PathMap()
        super(WatchableFSMixin, self).__init__(*args, **kwds)

    def __getstate__(self):
        state = super(WatchableFSMixin, self).__getstate__()
        # Watchers hold arbitrary callbacks, so they are not pickled.
        state.pop("_watchers", None)
        return state

    def __setstate__(self, state):
        super(WatchableFSMixin, self).__setstate__(state)
        self._watchers = PathMap()

    def add_watcher(self, callback, path="/", events=None, recursive=True):
        """Add a watcher callback to the FS.

        :param callback: callable invoked with each matching EVENT
        :param path: the path to watch
        :param events: iterable of EVENT subclasses of interest, or None
        :param recursive: whether to report events below direct children
        :returns: the new Watcher object
        """
        w = Watcher(self, callback, path, events, recursive=recursive)
        # Key by the watcher's *normalised* path: notify_watchers() looks
        # watchers up by the normalised prefixes of each event path, and
        # del_watcher() looks them up by ``w.path``.
        self._watchers.setdefault(w.path, []).append(w)
        return w

    def del_watcher(self, watcher_or_callback):
        """Delete a watcher callback from the FS.

        :param watcher_or_callback: either a Watcher previously returned by
            add_watcher(), or a callback, in which case every watcher
            registered with that callback is removed
        :raises KeyError: if a Watcher is given and no watchers at all are
            registered on its path
        :raises ValueError: if a Watcher is given that is not registered
        """
        if isinstance(watcher_or_callback, Watcher):
            self._watchers[watcher_or_callback.path].remove(watcher_or_callback)
        else:
            # Compare with == rather than ``is``: every attribute access on
            # a bound method (e.g. ``obj.meth``) creates a new object, so an
            # identity test never matches bound-method callbacks.
            for watchers in self._watchers.itervalues():
                # Slice assignment mutates the stored list in place.
                watchers[:] = [w for w in watchers
                               if w.callback != watcher_or_callback]

    def _find_watchers(self, callback):
        """Find watchers registered with the given callback."""
        for watchers in self._watchers.itervalues():
            for watcher in watchers:
                if watcher.callback == callback:
                    yield watcher

    def notify_watchers(self, event_or_class, path=None, *args, **kwds):
        """Notify watchers of the given event data.

        :param event_or_class: an EVENT instance, or an EVENT subclass that
            is instantiated as ``event_or_class(self, path, *args, **kwds)``
        :param path: the path of the event; defaults to the event's own
            path.  If there is no path at all, every registered watcher is
            offered the event; otherwise only watchers registered on the
            path or one of its ancestors are.
        """
        if isinstance(event_or_class, EVENT):
            event = event_or_class
        else:
            event = event_or_class(self, path, *args, **kwds)
        if path is None:
            path = event.path
        # Iterate over snapshots so that a callback which adds or removes
        # watchers cannot disturb the iteration in progress.
        if path is None:
            for watchers in list(self._watchers.itervalues()):
                for watcher in list(watchers):
                    watcher.handle_event(event)
        else:
            for prefix in recursepath(path):
                if prefix in self._watchers:
                    for watcher in list(self._watchers[prefix]):
                        watcher.handle_event(event)


class WatchedFile(FileWrapper):
    """File wrapper for use with WatchableFS.

    This file wrapper provides access to a file opened from a WatchableFS
    instance, and fires MODIFIED events when the file is modified.
    """

    def __init__(self, file, fs, path, mode=None):
        super(WatchedFile, self).__init__(file, mode)
        self.fs = fs
        self.path = path
        # Set by any write or truncate; once set, every subsequent flush()
        # and close() reports a MODIFIED event.
        self.was_modified = False

    def _write(self, string, flushing=False):
        self.was_modified = True
        return super(WatchedFile, self)._write(string, flushing=flushing)

    def _truncate(self, size):
        self.was_modified = True
        return super(WatchedFile, self)._truncate(size)

    def flush(self):
        super(WatchedFile, self).flush()
        # Don't bother if python is being torn down (module globals such as
        # Watcher have presumably already been reset to None by then).
        if Watcher is not None:
            if self.was_modified:
                self.fs.notify_watchers(MODIFIED, self.path, True)

    def close(self):
        super(WatchedFile, self).close()
        # Don't bother if python is being torn down.
        if Watcher is not None:
            if self.was_modified:
                # Mark the event as coming from close() so watchers can tell
                # the writer has finished with the file.
                self.fs.notify_watchers(MODIFIED, self.path, True, closed=True)


class WatchableFS(WatchableFSMixin, WrapFS):
    """FS wrapper simulating watcher callbacks.

    This FS wrapper intercepts method calls that modify the underlying FS
    and generates appropriate notification events.  It thus allows watchers
    to monitor changes made through the underlying FS object, but not changes
    that might be made through other interfaces to the same filesystem.
    """

    def __init__(self, *args, **kwds):
        super(WatchableFS, self).__init__(*args, **kwds)

    def close(self):
        """Close the FS, then emit a path-less CLOSED event to all watchers."""
        super(WatchableFS, self).close()
        self.notify_watchers(CLOSED)

    def open(self, path, mode='r', buffering=-1, encoding=None, errors=None, newline=None, line_buffering=False, **kwargs):
        """Open a file, emitting CREATED (if new) and ACCESSED events.

        The returned file is wrapped in a WatchedFile so that writes are
        reported as MODIFIED events.
        """
        existed = self.wrapped_fs.isfile(path)
        f = super(WatchableFS, self).open(path,
                                          mode=mode,
                                          buffering=buffering,
                                          encoding=encoding,
                                          errors=errors,
                                          newline=newline,
                                          line_buffering=line_buffering,
                                          **kwargs)
        if not existed:
            self.notify_watchers(CREATED, path)
        self.notify_watchers(ACCESSED, path)
        return WatchedFile(f, self, path, mode)

    def setcontents(self, path, data=b'', encoding=None, errors=None, chunk_size=64*1024):
        """Set file contents, emitting CREATED/ACCESSED/MODIFIED as needed.

        MODIFIED is only emitted when ``data`` is non-empty.
        """
        existed = self.wrapped_fs.isfile(path)
        ret = super(WatchableFS, self).setcontents(path, data=data, encoding=encoding, errors=errors, chunk_size=chunk_size)
        if not existed:
            self.notify_watchers(CREATED, path)
        self.notify_watchers(ACCESSED, path)
        if data:
            self.notify_watchers(MODIFIED, path, True)
        return ret

    def createfile(self, path, wipe=False):
        """Create a file, emitting CREATED (if new) and ACCESSED events."""
        existed = self.wrapped_fs.isfile(path)
        ret = super(WatchableFS, self).createfile(path, wipe=wipe)
        if not existed:
            self.notify_watchers(CREATED, path)
        self.notify_watchers(ACCESSED, path)
        return ret

    def makedir(self, path, recursive=False, allow_recreate=False):
        """Create a directory, emitting CREATED if it did not exist.

        Recursion is handled here rather than by the wrapped FS so that
        each missing ancestor gets its own CREATED event.
        """
        existed = self.wrapped_fs.isdir(path)
        try:
            super(WatchableFS, self).makedir(path, allow_recreate=allow_recreate)
        except ParentDirectoryMissingError:
            if not recursive:
                raise
            parent = dirname(path)
            if parent != path:
                self.makedir(dirname(path), recursive=True, allow_recreate=True)
            super(WatchableFS, self).makedir(path, allow_recreate=allow_recreate)
        if not existed:
            self.notify_watchers(CREATED, path)

    def remove(self, path):
        """Remove a file and emit a REMOVED event."""
        super(WatchableFS, self).remove(path)
        self.notify_watchers(REMOVED, path)

    def removedir(self, path, recursive=False, force=False):
        """Remove a directory, emitting REMOVED events.

        :param recursive: also remove ancestor directories that are left
            empty, stopping before the root
        :param force: delete the directory's contents first; each removed
            child emits its own REMOVED event
        :raises DirectoryNotEmptyError: if ``force`` is False and the
            directory is not empty
        """
        if not force:
            if self.listdir(path):
                raise DirectoryNotEmptyError(path)
        else:
            for nm in self.listdir(path, dirs_only=True):
                try:
                    self.removedir(pathjoin(path, nm), force=True)
                except ResourceNotFoundError:
                    pass
            for nm in self.listdir(path, files_only=True):
                try:
                    self.remove(pathjoin(path, nm))
                except ResourceNotFoundError:
                    pass
        super(WatchableFS, self).removedir(path)
        self.notify_watchers(REMOVED, path)
        if recursive:
            parent = dirname(path)
            # Stop at the root.  dirname() of a top-level absolute path is
            # "/" (truthy), so a plain truthiness test would try to remove
            # the root itself and could loop forever on it.
            while parent not in ("", "/") and not self.listdir(parent):
                super(WatchableFS, self).removedir(parent)
                self.notify_watchers(REMOVED, parent)
                parent = dirname(parent)

    def rename(self, src, dst):
        """Rename ``src`` to ``dst``, emitting REMOVED/MOVED_DST/MOVED_SRC.

        REMOVED is emitted for ``dst`` first if it was overwritten.
        """
        d_existed = self.wrapped_fs.exists(dst)
        super(WatchableFS, self).rename(src, dst)
        if d_existed:
            self.notify_watchers(REMOVED, dst)
        self.notify_watchers(MOVED_DST, dst, src)
        self.notify_watchers(MOVED_SRC, src, dst)

    def copy(self, src, dst, **kwds):
        d = self._pre_copy(src, dst)
        super(WatchableFS, self).copy(src, dst, **kwds)
        self._post_copy(src, dst, d)

    def copydir(self, src, dst, **kwds):
        d = self._pre_copy(src, dst)
        super(WatchableFS, self).copydir(src, dst, **kwds)
        self._post_copy(src, dst, d)

    def move(self, src, dst, **kwds):
        d = self._pre_copy(src, dst)
        super(WatchableFS, self).move(src, dst, **kwds)
        self._post_copy(src, dst, d)
        self._post_move(src, dst, d)

    def movedir(self, src, dst, **kwds):
        d = self._pre_copy(src, dst)
        super(WatchableFS, self).movedir(src, dst, **kwds)
        self._post_copy(src, dst, d)
        self._post_move(src, dst, d)

    def _pre_copy(self, src, dst):
        """Snapshot the trees under ``src`` and ``dst`` before a copy/move.

        :returns: ``(src_paths, dst_paths)``; each maps a path relative to
            its root ("" for the root itself) to True for a directory or
            False for a file.  A root that is a file is recorded as
            ``{"": False}``; a missing root gives an empty dict.
        """
        dst_paths = {}
        try:
            for (dirnm, filenms) in self.wrapped_fs.walk(dst):
                dirnm = dirnm[len(dst) + 1:]
                dst_paths[dirnm] = True
                for filenm in filenms:
                    # Record files relative to the root, exactly as the
                    # source side does, so _post_copy() checks real paths.
                    dst_paths[pathjoin(dirnm, filenm)] = False
        except ResourceNotFoundError:
            pass
        except ResourceInvalidError:
            dst_paths[""] = False
        src_paths = {}
        try:
            for (dirnm, filenms) in self.wrapped_fs.walk(src):
                dirnm = dirnm[len(src) + 1:]
                src_paths[dirnm] = True
                for filenm in filenms:
                    src_paths[pathjoin(dirnm, filenm)] = False
        except ResourceNotFoundError:
            pass
        except ResourceInvalidError:
            src_paths[""] = False
        return (src_paths, dst_paths)

    def _post_copy(self, src, dst, data):
        """Emit events for the destination tree after a copy/move.

        Paths copied over existing ones get MODIFIED (data_changed for
        files), new paths get CREATED, and pre-existing destination paths
        that no longer exist get REMOVED.
        """
        (src_paths, dst_paths) = data
        for src_path, isdir in sorted(src_paths.items()):
            path = pathjoin(dst, src_path)
            if src_path in dst_paths:
                self.notify_watchers(MODIFIED, path, not isdir)
            else:
                self.notify_watchers(CREATED, path)
        for dst_path, isdir in sorted(dst_paths.items()):
            path = pathjoin(dst, dst_path)
            if not self.wrapped_fs.exists(path):
                self.notify_watchers(REMOVED, path)

    def _post_move(self, src, dst, data):
        """Emit REMOVED for every source path after a move.

        Paths are visited in reverse sorted order so that children are
        reported before their parent directories.
        """
        (src_paths, dst_paths) = data
        for src_path, isdir in sorted(src_paths.items(), reverse=True):
            path = pathjoin(src, src_path)
            self.notify_watchers(REMOVED, path)

    def setxattr(self, path, name, value):
        """Set an extended attribute and emit a metadata-only MODIFIED."""
        super(WatchableFS, self).setxattr(path, name, value)
        self.notify_watchers(MODIFIED, path, False)

    def delxattr(self, path, name):
        """Delete an extended attribute and emit a metadata-only MODIFIED."""
        super(WatchableFS, self).delxattr(path, name)
        self.notify_watchers(MODIFIED, path, False)


class PollingWatchableFS(WatchableFS):
    """FS wrapper simulating watcher callbacks by periodic polling.

    This FS wrapper augments the functionality of WatchableFS by periodically
    polling the underlying FS for changes.  It is thus capable of detecting
    changes made to the underlying FS via other interfaces, albeit with a
    (configurable) delay to account for the polling interval.

    :param wrapped_fs: the FS to wrap
    :param poll_interval: seconds to sleep between polling runs
    """

    def __init__(self, wrapped_fs, poll_interval=60*5):
        super(PollingWatchableFS, self).__init__(wrapped_fs)
        self.poll_interval = poll_interval
        # Keep the cached info in _path_info in sync with our own events,
        # so each polling run only reports genuinely new changes.
        self.add_watcher(self._on_path_modify, "/", (CREATED, MOVED_DST,))
        self.add_watcher(self._on_path_modify, "/", (MODIFIED, ACCESSED,))
        self.add_watcher(self._on_path_delete, "/", (REMOVED, MOVED_SRC,))
        self._path_info = PathMap()
        self._poll_thread = threading.Thread(target=self._poll_for_changes)
        # Notified after each completed polling run.
        self._poll_cond = threading.Condition()
        # Set by close() to stop the polling thread.
        self._poll_close_event = threading.Event()
        self._poll_thread.start()

    def close(self):
        """Stop the polling thread, wait for it, then close the FS."""
        self._poll_close_event.set()
        self._poll_thread.join()
        super(PollingWatchableFS, self).close()

    def _on_path_modify(self, event):
        """Refresh the cached info for the event's path (best effort)."""
        path = event.path
        try:
            try:
                self._path_info[path] = self.wrapped_fs.getinfo(path)
            except ResourceNotFoundError:
                self._path_info.clear(path)
        except FSError:
            pass

    def _on_path_delete(self, event):
        """Drop cached info for the removed path."""
        self._path_info.clear(event.path)

    def _poll_for_changes(self):
        """Polling thread body: scan the wrapped FS until closed."""
        try:
            while not self._poll_close_event.isSet():
                # Walk all directories looking for changes.
                # Come back to any that give us an error.
                error_paths = set()
                for dirnm in self.wrapped_fs.walkdirs():
                    if self._poll_close_event.isSet():
                        break
                    try:
                        self._check_for_changes(dirnm)
                    except FSError:
                        error_paths.add(dirnm)
                # Retry the directories that gave us an error, until
                # we have successfully updated them all
                while error_paths and not self._poll_close_event.isSet():
                    dirnm = error_paths.pop()
                    if self.wrapped_fs.isdir(dirnm):
                        try:
                            self._check_for_changes(dirnm)
                        except FSError:
                            error_paths.add(dirnm)
                # Notify that we have completed a polling run
                self._poll_cond.acquire()
                self._poll_cond.notifyAll()
                self._poll_cond.release()
                # Sleep for the specified interval, or until closed.
                self._poll_close_event.wait(timeout=self.poll_interval)
        except FSError:
            # Errors after the FS has been closed are expected; ignore them.
            if not self.closed:
                raise

    def _check_for_changes(self, dirnm):
        """Compare one directory against the cache and emit events.

        Emits CREATED for uncached entries, MODIFIED/ACCESSED for files
        whose info changed, and REMOVED for cached children that no longer
        exist.  Returns early if the FS is being closed.
        """
        # Check the metadata for the directory itself.
        new_info = self.wrapped_fs.getinfo(dirnm)
        try:
            old_info = self._path_info[dirnm]
        except KeyError:
            self.notify_watchers(CREATED, dirnm)
        else:
            if new_info != old_info:
                self.notify_watchers(MODIFIED, dirnm, False)
        # Check the metadata for each file in the directory.
        # We assume that if the file's data changes, something in its
        # metadata will also change; don't want to read through each file!
        # Subdirectories will be handled by the outer polling loop.
        for filenm in self.wrapped_fs.listdir(dirnm, files_only=True):
            if self._poll_close_event.isSet():
                return
            fpath = pathjoin(dirnm, filenm)
            new_info = self.wrapped_fs.getinfo(fpath)
            try:
                old_info = self._path_info[fpath]
            except KeyError:
                self.notify_watchers(CREATED, fpath)
            else:
                was_accessed = False
                was_modified = False
                for (k, v) in new_info.iteritems():
                    if k not in old_info:
                        was_modified = True
                        break
                    elif old_info[k] != v:
                        # An access-time change alone means "accessed".
                        if k in ("accessed_time", "st_atime",):
                            was_accessed = True
                        elif k:
                            was_modified = True
                            break
                else:
                    # No differences in new keys; check for keys that vanished.
                    for k in old_info:
                        if k not in new_info:
                            was_modified = True
                            break
                if was_modified:
                    self.notify_watchers(MODIFIED, fpath, True)
                elif was_accessed:
                    self.notify_watchers(ACCESSED, fpath)
        # Check for deletion of cached child entries.  Take a snapshot of the
        # names first: emitting REMOVED triggers _on_path_delete(), which
        # clears entries from this same PathMap while we would be iterating.
        for childnm in list(self._path_info.iternames(dirnm)):
            if self._poll_close_event.isSet():
                return
            cpath = pathjoin(dirnm, childnm)
            if not self.wrapped_fs.exists(cpath):
                self.notify_watchers(REMOVED, cpath)


def ensure_watchable(fs, wrapper_class=PollingWatchableFS, *args, **kwds):
    """Ensure that the given fs supports watching, simulating it if necessary.

    Given an FS object, this function returns an equivalent FS that has support
    for watcher callbacks.  This may be the original object if it supports them
    natively, or a wrapper class if they must be simulated.
    """
    if isinstance(fs, wrapper_class):
        return fs
    # Probe for native support by adding (and immediately removing) a
    # throwaway watcher.
    try:
        w = fs.add_watcher(lambda e: None, "/", recursive=False)
    except (AttributeError, FSError):
        return wrapper_class(fs, *args, **kwds)
    else:
        fs.del_watcher(w)
        return fs


class iter_changes(object):
    """Blocking iterator over the change events produced by an FS.

    This class can be used to transform the callback-based watcher mechanism
    into a blocking stream of events.  It operates by having the callbacks
    push events onto a queue as they come in, then reading them off one at a
    time.
    """

    def __init__(self, fs=None, path="/", events=None, **kwds):
        self.closed = False
        self._queue = Queue.Queue()
        # The filesystems this iterator currently has a watcher on.
        self._watching = set()
        if fs is not None:
            self.add_watcher(fs, path, events, **kwds)

    def __iter__(self):
        return self

    def __del__(self):
        self.close()

    def next(self, timeout=None):
        """Return the next event, blocking until one is available.

        :param timeout: maximum seconds to wait, or None to wait forever
        :raises StopIteration: if nothing is being watched, the timeout
            expires, or the iterator has been closed
        """
        if not self._watching:
            raise StopIteration
        try:
            event = self._queue.get(timeout=timeout)
        except Queue.Empty:
            raise StopIteration
        if event is None:
            # Sentinel queued by close(): forget every filesystem so later
            # calls stop immediately instead of blocking on an empty queue.
            self._watching.clear()
            raise StopIteration
        if isinstance(event, CLOSED):
            # That filesystem is gone; stop watching it.  discard() tolerates
            # a repeated CLOSED event for the same FS.
            event.fs.del_watcher(self._enqueue)
            self._watching.discard(event.fs)
        return event

    def close(self):
        """Remove our watchers and wake up any caller blocked in next()."""
        if not self.closed:
            self.closed = True
            for fs in self._watching:
                fs.del_watcher(self._enqueue)
            self._queue.put(None)

    def add_watcher(self, fs, path="/", events=None, **kwds):
        """Start collecting events from ``fs``; returns the new Watcher."""
        w = fs.add_watcher(self._enqueue, path, events, **kwds)
        self._watching.add(fs)
        return w

    def _enqueue(self, event):
        self._queue.put(event)

    def del_watcher(self, watcher):
        """Remove ``watcher`` from whichever watched FS it belongs to.

        :raises ValueError: if none of the watched filesystems has it
        """
        for fs in self._watching:
            try:
                fs.del_watcher(watcher)
                break
            except (ValueError, KeyError):
                # Not registered on this FS.  WatchableFSMixin raises
                # ValueError when the path has other watchers, and KeyError
                # when nothing is registered on that path at all.
                pass
        else:
            raise ValueError("watcher not found: %s" % (watcher,))