1"""
2fs.watch
3========
4
5Change notification support for FS.
6
7This module defines a standard interface for FS subclasses that support change
8notification callbacks.  It also offers some WrapFS subclasses that can
9simulate such an ability on top of an ordinary FS object.
10
11An FS object that wants to be "watchable" must provide the following methods:
12
13  * ``add_watcher(callback,path="/",events=None,recursive=True)``
14
15      Request that the given callback be executed in response to changes
16      to the given path.  A specific set of change events can be specified.
17      This method returns a Watcher object.
18
19  * ``del_watcher(watcher_or_callback)``
20
21      Remove the given watcher object, or any watchers associated with
22      the given callback.
23
24
If you would prefer to read changes from a filesystem in a blocking fashion
rather than using callbacks, you can use the ``iter_changes`` class to obtain
an iterator over the change events.
28
29
30"""
31
32import sys
33import weakref
34import threading
35import Queue
36import traceback
37
38from fs.path import *
39from fs.errors import *
40from fs.wrapfs import WrapFS
41from fs.base import FS
42from fs.filelike import FileWrapper
43
44from six import b
45
46
class EVENT(object):
    """Common base for every change-notification event.

    Attributes:
      fs   -- the filesystem object the event was raised for.
      path -- the affected path as an absolute, normalised string, or None
              when the event is not tied to a specific path.
    """

    def __init__(self,fs,path):
        super(EVENT, self).__init__()
        self.fs = fs
        self.path = path if path is None else abspath(normpath(path))

    def __str__(self):
        #  Byte-string form: the unicode representation, UTF-8 encoded.
        return unicode(self).encode("utf8")

    def __unicode__(self):
        name = self.__class__.__name__
        address = hex(id(self))
        return u"<fs.watch.%s object (path='%s') at %s>" % (name,self.path,address)

    def clone(self,fs=None,path=None):
        """Return a new event of the same class.

        Whichever of *fs* / *path* is left as None is copied from this event.
        """
        target_fs = self.fs if fs is None else fs
        target_path = self.path if path is None else path
        return self.__class__(target_fs,target_path)
68
69
class ACCESSED(EVENT):
    """Fired when the contents of a file are accessed."""


class CREATED(EVENT):
    """Fired when a new file or directory comes into existence."""


class REMOVED(EVENT):
    """Fired when a file or directory is deleted."""
81
class MODIFIED(EVENT):
    """Event fired when a file or directory is modified.

    Extra attributes:
      data_changed -- True when the change concerns a file's contents; False
                      for metadata-only changes (e.g. xattr updates).
      closed       -- extra boolean flag supplied by the notifier (for
                      example, to mark a modification reported when a file
                      was closed); defaults to False.
    """
    def __init__(self,fs,path,data_changed=False, closed=False):
        super(MODIFIED,self).__init__(fs,path)
        self.data_changed = data_changed
        self.closed = closed

    def clone(self,fs=None,path=None,data_changed=None,closed=None):
        """Return a copy of this event, optionally overriding attributes.

        Any argument left as None is copied from this event.  ``closed`` is
        copied explicitly because the base-class clone reconstructs the event
        with only ``(fs, path)``, which would otherwise reset it to False.
        """
        evt = super(MODIFIED,self).clone(fs,path)
        if data_changed is None:
            data_changed = self.data_changed
        if closed is None:
            closed = self.closed
        evt.data_changed = data_changed
        evt.closed = closed
        return evt
95
class MOVED_DST(EVENT):
    """Fired for the destination path of a move or rename.

    ``source`` holds the normalised absolute path the item came from, or
    None when no source was supplied.
    """
    def __init__(self,fs,path,source=None):
        super(MOVED_DST,self).__init__(fs,path)
        self.source = None if source is None else abspath(normpath(source))

    def __unicode__(self):
        template = u"<fs.watch.%s object (path=%r,src=%r) at %s>"
        return template % (self.__class__.__name__,self.path,self.source,hex(id(self)))

    def clone(self,fs=None,path=None,source=None):
        """Copy this event; arguments left as None keep the current values."""
        duplicate = super(MOVED_DST,self).clone(fs,path)
        duplicate.source = self.source if source is None else source
        return duplicate
113
class MOVED_SRC(EVENT):
    """Fired for the source path of a move or rename.

    ``destination`` holds the normalised absolute path the item went to, or
    None when no destination was supplied.
    """
    def __init__(self,fs,path,destination=None):
        super(MOVED_SRC,self).__init__(fs,path)
        if destination is None:
            self.destination = None
        else:
            self.destination = abspath(normpath(destination))

    def __unicode__(self):
        template = u"<fs.watch.%s object (path=%r,dst=%r) at %s>"
        return template % (self.__class__.__name__,self.path,self.destination,hex(id(self)))

    def clone(self,fs=None,path=None,destination=None):
        """Copy this event; arguments left as None keep the current values."""
        duplicate = super(MOVED_SRC,self).clone(fs,path)
        if destination is None:
            destination = self.destination
        duplicate.destination = destination
        return duplicate
131
class CLOSED(EVENT):
    """Signals that the watched filesystem has been closed."""


class ERROR(EVENT):
    """Signals some miscellaneous error condition."""


class OVERFLOW(ERROR):
    """Signals that some events could not be processed."""
143
144
145
class Watcher(object):
    """Registration record tying a callback to a path on a watchable FS.

    ``events`` is a tuple of event classes to deliver (all events when not
    given).  When ``recursive`` is false, only events for ``path`` itself or
    its direct children are delivered.
    """

    def __init__(self,fs,callback,path="/",events=None,recursive=True):
        wanted = (EVENT,) if events is None else tuple(events)
        #  The FS most likely holds a reference to this Watcher; a strong
        #  reference back would form a cycle containing a __del__ method,
        #  so only a weak reference to the FS is kept.
        self._w_fs = weakref.ref(fs)
        self.callback = callback
        self.path = abspath(normpath(path))
        self.events = wanted
        self.recursive = recursive

    @property
    def fs(self):
        """The owning FS, or None if it has been garbage-collected."""
        return self._w_fs()

    def delete(self):
        """Unregister this watcher from its FS, if the FS still exists."""
        owner = self.fs
        if owner is None:
            return
        owner.del_watcher(self)

    def handle_event(self,event):
        """Invoke the callback if *event* matches this watcher's filters.

        Exceptions raised by the callback are reported on stderr rather than
        propagated to the notifier.
        """
        if not isinstance(event,self.events):
            return
        target = event.path
        if target is not None:
            if not isprefix(self.path,target):
                return
            if (not self.recursive
                    and target != self.path
                    and dirname(target) != self.path):
                return
        try:
            self.callback(event)
        except Exception:
            print >>sys.stderr, "error in FS watcher callback", self.callback
            traceback.print_exc()
187
188
class WatchableFSMixin(FS):
    """Mixin class providing watcher management functions.

    Watchers are stored in a PathMap keyed by the path they watch, each key
    mapping to a list of Watcher objects.
    """

    def __init__(self,*args,**kwds):
        self._watchers = PathMap()
        super(WatchableFSMixin,self).__init__(*args,**kwds)

    def __getstate__(self):
        #  Watchers are not carried across pickling; __setstate__ starts
        #  the restored object with an empty registry.
        state = super(WatchableFSMixin,self).__getstate__()
        state.pop("_watchers",None)
        return state

    def __setstate__(self,state):
        super(WatchableFSMixin,self).__setstate__(state)
        self._watchers = PathMap()

    def add_watcher(self,callback,path="/",events=None,recursive=True):
        """Add a watcher callback to the FS and return the Watcher object."""
        w = Watcher(self,callback,path,events,recursive=recursive)
        #  Register under the watcher's normalised path, the same key that
        #  del_watcher uses to find it again.
        self._watchers.setdefault(w.path,[]).append(w)
        return w

    def del_watcher(self,watcher_or_callback):
        """Delete a watcher, or every watcher registered with a callback.

        Passing a Watcher object removes exactly that watcher (raising
        ValueError if its path's list does not contain it, and KeyError if
        nothing is registered at its path).  Passing any other object removes
        all watchers whose callback compares equal to it.
        """
        if isinstance(watcher_or_callback,Watcher):
            self._watchers[watcher_or_callback.path].remove(watcher_or_callback)
        else:
            callback = watcher_or_callback
            for watchers in self._watchers.itervalues():
                #  Compare with == rather than "is": a bound method such as
                #  obj.meth is a new object on every attribute access, so an
                #  identity test never matches the registered callback.
                watchers[:] = [w for w in watchers if not (w.callback == callback)]

    def _find_watchers(self,callback):
        """Find watchers registered with the given callback."""
        for watchers in self._watchers.itervalues():
            for watcher in watchers:
                #  == for the same bound-method reason as in del_watcher.
                if watcher.callback == callback:
                    yield watcher

    def notify_watchers(self,event_or_class,path=None,*args,**kwds):
        """Notify watchers of the given event data.

        *event_or_class* is either a ready-made EVENT instance or an EVENT
        subclass, which is instantiated as ``cls(self, path, *args, **kwds)``.
        Events without a path (e.g. CLOSED) go to every watcher; otherwise
        watchers registered at the path or any of its ancestors are offered
        the event.
        """
        if isinstance(event_or_class,EVENT):
            event = event_or_class
        else:
            event = event_or_class(self,path,*args,**kwds)
        if path is None:
            path = event.path
        #  Iterate over snapshots: a callback may add or remove watchers
        #  (for instance by deleting itself) while we are dispatching.
        if path is None:
            for watchers in list(self._watchers.itervalues()):
                for watcher in list(watchers):
                    watcher.handle_event(event)
        else:
            for prefix in recursepath(path):
                if prefix in self._watchers:
                    for watcher in list(self._watchers[prefix]):
                        watcher.handle_event(event)
246
247
248
class WatchedFile(FileWrapper):
    """File wrapper for use with WatchableFS.

    This file wrapper provides access to a file opened from a WatchableFS
    instance.  Once the file has been written to or truncated, it fires a
    MODIFIED event (with ``data_changed=True``) on every flush and on close.
    The event fired by ``close`` also carries ``closed=True``, so watchers
    can distinguish the final notification from intermediate flushes.
    """

    def __init__(self,file,fs,path,mode=None):
        super(WatchedFile,self).__init__(file,mode)
        self.fs = fs
        self.path = path
        #  Set once any write or truncate has gone through this wrapper.
        self.was_modified = False

    def _write(self,string,flushing=False):
        self.was_modified = True
        return super(WatchedFile,self)._write(string,flushing=flushing)

    def _truncate(self,size):
        self.was_modified = True
        return super(WatchedFile,self)._truncate(size)

    def flush(self):
        super(WatchedFile,self).flush()
        #  Don't bother if python is being torn down (module globals such
        #  as Watcher may already have been cleared to None).
        if Watcher is not None:
            if self.was_modified:
                self.fs.notify_watchers(MODIFIED,self.path,True)

    def close(self):
        super(WatchedFile,self).close()
        #  Don't bother if python is being torn down.
        if Watcher is not None:
            if self.was_modified:
                self.fs.notify_watchers(MODIFIED,self.path,True,closed=True)
283
284
class WatchableFS(WatchableFSMixin,WrapFS):
    """FS wrapper simulating watcher callbacks.

    This FS wrapper intercepts method calls that modify the underlying FS
    and generates appropriate notification events.  It thus allows watchers
    to monitor changes made through the underlying FS object, but not changes
    that might be made through other interfaces to the same filesystem.
    """

    def __init__(self, *args, **kwds):
        super(WatchableFS, self).__init__(*args, **kwds)

    def close(self):
        super(WatchableFS, self).close()
        self.notify_watchers(CLOSED)

    def open(self, path, mode='r', buffering=-1, encoding=None, errors=None, newline=None, line_buffering=False, **kwargs):
        existed = self.wrapped_fs.isfile(path)
        f = super(WatchableFS, self).open(path,
                                          mode=mode,
                                          buffering=buffering,
                                          encoding=encoding,
                                          errors=errors,
                                          newline=newline,
                                          line_buffering=line_buffering,
                                          **kwargs)
        if not existed:
            self.notify_watchers(CREATED, path)
        self.notify_watchers(ACCESSED, path)
        #  Wrap the file so writes made through it fire MODIFIED events.
        return WatchedFile(f, self, path, mode)

    def setcontents(self, path, data=b'', encoding=None, errors=None, chunk_size=64*1024):
        existed = self.wrapped_fs.isfile(path)
        ret = super(WatchableFS, self).setcontents(path, data=data, encoding=encoding, errors=errors, chunk_size=chunk_size)
        if not existed:
            self.notify_watchers(CREATED, path)
        self.notify_watchers(ACCESSED, path)
        if data:
            self.notify_watchers(MODIFIED, path, True)
        return ret

    def createfile(self, path, wipe=False):
        existed = self.wrapped_fs.isfile(path)
        ret = super(WatchableFS, self).createfile(path, wipe=wipe)
        if not existed:
            self.notify_watchers(CREATED,path)
        self.notify_watchers(ACCESSED,path)
        return ret

    def makedir(self,path,recursive=False,allow_recreate=False):
        existed = self.wrapped_fs.isdir(path)
        try:
            super(WatchableFS,self).makedir(path,allow_recreate=allow_recreate)
        except ParentDirectoryMissingError:
            if not recursive:
                raise
            #  Create missing parents through self so each one fires its
            #  own CREATED event.
            parent = dirname(path)
            if parent != path:
                self.makedir(dirname(path),recursive=True,allow_recreate=True)
            super(WatchableFS,self).makedir(path,allow_recreate=allow_recreate)
        if not existed:
            self.notify_watchers(CREATED,path)

    def remove(self,path):
        super(WatchableFS,self).remove(path)
        self.notify_watchers(REMOVED,path)

    def removedir(self,path,recursive=False,force=False):
        """Remove a directory, firing REMOVED for everything deleted.

        With *force*, contents are removed first via self.removedir/remove
        so that each entry fires its own REMOVED event.  With *recursive*,
        ancestors left empty are also removed, stopping before the root.
        """
        if not force:
            if self.listdir(path):
                raise DirectoryNotEmptyError(path)
        else:
            for nm in self.listdir(path,dirs_only=True):
                try:
                    self.removedir(pathjoin(path,nm),force=True)
                except ResourceNotFoundError:
                    pass
            for nm in self.listdir(path,files_only=True):
                try:
                    self.remove(pathjoin(path,nm))
                except ResourceNotFoundError:
                    pass
        super(WatchableFS,self).removedir(path)
        self.notify_watchers(REMOVED,path)
        if recursive:
            #  Stop at the root: fs.path.dirname("/") is "/", so a plain
            #  truthiness test would try to remove the root directory and
            #  could loop forever.
            parent = dirname(path)
            while parent not in ("", "/") and not self.listdir(parent):
                super(WatchableFS,self).removedir(parent)
                self.notify_watchers(REMOVED,parent)
                parent = dirname(parent)

    def rename(self,src,dst):
        d_existed = self.wrapped_fs.exists(dst)
        super(WatchableFS,self).rename(src,dst)
        if d_existed:
            self.notify_watchers(REMOVED,dst)
        self.notify_watchers(MOVED_DST,dst,src)
        self.notify_watchers(MOVED_SRC,src,dst)

    def copy(self,src,dst,**kwds):
        d = self._pre_copy(src,dst)
        super(WatchableFS,self).copy(src,dst,**kwds)
        self._post_copy(src,dst,d)

    def copydir(self,src,dst,**kwds):
        d = self._pre_copy(src,dst)
        super(WatchableFS,self).copydir(src,dst,**kwds)
        self._post_copy(src,dst,d)

    def move(self,src,dst,**kwds):
        d = self._pre_copy(src,dst)
        super(WatchableFS,self).move(src,dst,**kwds)
        self._post_copy(src,dst,d)
        self._post_move(src,dst,d)

    def movedir(self,src,dst,**kwds):
        d = self._pre_copy(src,dst)
        super(WatchableFS,self).movedir(src,dst,**kwds)
        self._post_copy(src,dst,d)
        self._post_move(src,dst,d)

    def _pre_copy(self,src,dst):
        """Snapshot the trees at *src* and *dst* before a copy/move.

        Returns ``(src_paths, dst_paths)``; see _snapshot_tree for the
        format of each mapping.
        """
        dst_paths = self._snapshot_tree(dst)
        src_paths = self._snapshot_tree(src)
        return (src_paths,dst_paths)

    def _snapshot_tree(self,root):
        """Map every path under *root* (relative to it) to True/False.

        Directories map to True and files to False; the root directory
        itself is the key "".  A missing root gives an empty mapping; a
        root that is a file (ResourceInvalidError from walk) is recorded as
        ``{"": False}``.  File keys include their sub-directory, so source
        and destination snapshots are directly comparable.
        """
        paths = {}
        base = root.rstrip("/")
        try:
            for (dirnm,filenms) in self.wrapped_fs.walk(root):
                #  walk() yields directory paths that start with *root*;
                #  strip it to obtain a path relative to the root.
                if dirnm.startswith(base):
                    dirnm = dirnm[len(base):]
                dirnm = dirnm.lstrip("/")
                paths[dirnm] = True
                for filenm in filenms:
                    paths[pathjoin(dirnm,filenm)] = False
        except ResourceNotFoundError:
            pass
        except ResourceInvalidError:
            paths[""] = False
        return paths

    def _post_copy(self,src,dst,data):
        """Fire CREATED/MODIFIED/REMOVED events under *dst* after a copy."""
        (src_paths,dst_paths) = data
        for src_path,isdir in sorted(src_paths.items()):
            path = pathjoin(dst,src_path)
            if src_path in dst_paths:
                self.notify_watchers(MODIFIED,path,not isdir)
            else:
                self.notify_watchers(CREATED,path)
        for dst_path,isdir in sorted(dst_paths.items()):
            path = pathjoin(dst,dst_path)
            if not self.wrapped_fs.exists(path):
                self.notify_watchers(REMOVED,path)

    def _post_move(self,src,dst,data):
        """Fire REMOVED for every source path, deepest first, after a move."""
        (src_paths,dst_paths) = data
        for src_path,isdir in sorted(src_paths.items(),reverse=True):
            path = pathjoin(src,src_path)
            self.notify_watchers(REMOVED,path)

    def setxattr(self,path,name,value):
        super(WatchableFS,self).setxattr(path,name,value)
        self.notify_watchers(MODIFIED,path,False)

    def delxattr(self,path,name):
        super(WatchableFS,self).delxattr(path,name)
        self.notify_watchers(MODIFIED,path,False)
457
458
459
class PollingWatchableFS(WatchableFS):
    """FS wrapper simulating watcher callbacks by periodic polling.

    This FS wrapper augments the functionality of WatchableFS by periodically
    polling the underlying FS for changes.  It is thus capable of detecting
    changes made to the underlying FS via other interfaces, albeit with a
    (configurable) delay to account for the polling interval.

    Known metadata is cached in a PathMap that is kept current by watchers
    this object registers on itself.  Each polling run compares
    ``wrapped_fs.getinfo`` output against that cache.
    """

    #  Maximum number of extra passes, within one polling run, over
    #  directories whose check raised an FSError.  Directories that still
    #  fail are picked up again by the next run instead of busy-looping.
    poll_retry_limit = 3

    def __init__(self,wrapped_fs,poll_interval=60*5):
        super(PollingWatchableFS,self).__init__(wrapped_fs)
        #  Seconds to wait between polling runs.
        self.poll_interval = poll_interval
        #  Create the cache before registering the callbacks that update it.
        self._path_info = PathMap()
        self.add_watcher(self._on_path_modify,"/",(CREATED,MOVED_DST,))
        self.add_watcher(self._on_path_modify,"/",(MODIFIED,ACCESSED,))
        self.add_watcher(self._on_path_delete,"/",(REMOVED,MOVED_SRC,))
        self._poll_cond = threading.Condition()
        self._poll_close_event = threading.Event()
        self._poll_thread = threading.Thread(target=self._poll_for_changes)
        #  The thread's target is a bound method that holds a reference to
        #  self; as a non-daemon thread, an instance that is never close()d
        #  would keep the interpreter from exiting.
        self._poll_thread.daemon = True
        self._poll_thread.start()

    def close(self):
        #  Stop the polling thread before closing the wrapped FS.
        self._poll_close_event.set()
        self._poll_thread.join()
        super(PollingWatchableFS,self).close()

    def _on_path_modify(self,event):
        """Refresh the cached info for the event's path."""
        path = event.path
        try:
            try:
                self._path_info[path] = self.wrapped_fs.getinfo(path)
            except ResourceNotFoundError:
                self._path_info.clear(path)
        except FSError:
            pass

    def _on_path_delete(self,event):
        """Drop cached info for the event's path and everything below it."""
        self._path_info.clear(event.path)

    def _poll_for_changes(self):
        """Body of the polling thread: scan, signal, sleep, until closed."""
        try:
            while not self._poll_close_event.isSet():
                #  Walk all directories looking for changes, remembering
                #  any that gave us an error so they can be retried.
                error_paths = set()
                for dirnm in self.wrapped_fs.walkdirs():
                    if self._poll_close_event.isSet():
                        break
                    try:
                        self._check_for_changes(dirnm)
                    except FSError:
                        error_paths.add(dirnm)
                #  Retry failed directories a bounded number of times.
                #  Ones that no longer exist as directories are dropped.
                retries = 0
                while (error_paths and retries < self.poll_retry_limit
                       and not self._poll_close_event.isSet()):
                    retries += 1
                    for dirnm in list(error_paths):
                        if self._poll_close_event.isSet():
                            break
                        error_paths.discard(dirnm)
                        if self.wrapped_fs.isdir(dirnm):
                            try:
                                self._check_for_changes(dirnm)
                            except FSError:
                                error_paths.add(dirnm)
                #  Notify that we have completed a polling run
                self._poll_cond.acquire()
                try:
                    self._poll_cond.notifyAll()
                finally:
                    self._poll_cond.release()
                #  Sleep for the specified interval, or until closed.
                self._poll_close_event.wait(timeout=self.poll_interval)
        except FSError:
            if not self.closed:
                raise

    def _check_for_changes(self,dirnm):
        """Compare *dirnm* and its files against the cache, firing events."""
        #  Check the metadata for the directory itself.
        new_info = self.wrapped_fs.getinfo(dirnm)
        try:
            old_info = self._path_info[dirnm]
        except KeyError:
            self.notify_watchers(CREATED,dirnm)
        else:
            if new_info != old_info:
                self.notify_watchers(MODIFIED,dirnm,False)
        #  Check the metadata for each file in the directory.
        #  We assume that if the file's data changes, something in its
        #  metadata will also change; don't want to read through each file!
        #  Subdirectories will be handled by the outer polling loop.
        for filenm in self.wrapped_fs.listdir(dirnm,files_only=True):
            if self._poll_close_event.isSet():
                return
            fpath = pathjoin(dirnm,filenm)
            new_info = self.wrapped_fs.getinfo(fpath)
            try:
                old_info = self._path_info[fpath]
            except KeyError:
                self.notify_watchers(CREATED,fpath)
            else:
                was_accessed = False
                was_modified = False
                for (k,v) in new_info.iteritems():
                    if k not in old_info:
                        was_modified = True
                        break
                    elif old_info[k] != v:
                        if k in ("accessed_time","st_atime",):
                            was_accessed = True
                        elif k:
                            was_modified = True
                            break
                else:
                    for k in old_info:
                        if k not in new_info:
                            was_modified = True
                            break
                if was_modified:
                    self.notify_watchers(MODIFIED,fpath,True)
                elif was_accessed:
                    self.notify_watchers(ACCESSED,fpath)
        #  Check for deletion of cached child entries.  Iterate over a
        #  snapshot: each REMOVED notification runs _on_path_delete, which
        #  clears entries from the very PathMap being iterated.
        for childnm in list(self._path_info.iternames(dirnm)):
            if self._poll_close_event.isSet():
                return
            cpath = pathjoin(dirnm,childnm)
            if not self.wrapped_fs.exists(cpath):
                self.notify_watchers(REMOVED,cpath)
583
584
585
def ensure_watchable(fs,wrapper_class=PollingWatchableFS,*args,**kwds):
    """Return a watch-capable equivalent of *fs*.

    *fs* itself is returned when it is already an instance of
    *wrapper_class* or natively supports ``add_watcher``/``del_watcher``.
    Otherwise the result is ``wrapper_class(fs, *args, **kwds)``, which
    simulates watcher support.
    """
    if isinstance(fs,wrapper_class):
        return fs
    #  Probe for native support by registering, then removing, a no-op
    #  watcher.  AttributeError or FSError means it must be simulated.
    try:
        probe = fs.add_watcher(lambda e: None,"/",recursive=False)
    except (AttributeError,FSError):
        return wrapper_class(fs,*args,**kwds)
    fs.del_watcher(probe)
    return fs
602
603
class iter_changes(object):
    """Blocking iterator over the change events produced by an FS.

    This class can be used to transform the callback-based watcher mechanism
    into a blocking stream of events.  It operates by having the callbacks
    push events onto a queue as they come in, then reading them off one at a
    time.

    Iteration stops in any of these cases:
    - no filesystems are being watched;
    - ``next`` times out;
    - ``close`` has been called and the events queued before it have been
      consumed.
    """

    def __init__(self,fs=None,path="/",events=None,**kwds):
        self.closed = False
        self._queue = Queue.Queue()
        #  Filesystems our _enqueue callback is registered with.
        self._watching = set()
        if fs is not None:
            self.add_watcher(fs,path,events,**kwds)

    def __iter__(self):
        return self

    def __del__(self):
        self.close()

    def next(self,timeout=None):
        """Return the next change event.

        Blocks for up to *timeout* seconds (indefinitely if None).  Raises
        StopIteration in these cases:
        - nothing is being watched;
        - the timeout expires;
        - the close() sentinel is reached.
        """
        if not self._watching:
            raise StopIteration
        try:
            event = self._queue.get(timeout=timeout)
        except Queue.Empty:
            raise StopIteration
        if event is None:
            #  None is the sentinel queued by close().  Put it back so every
            #  later call also stops, instead of blocking forever.
            self._queue.put(None)
            raise StopIteration
        if isinstance(event,CLOSED):
            #  The source FS was closed; stop watching it.  discard() rather
            #  than remove(): a CLOSED event is queued once per watcher we
            #  registered on that FS, so it may already be gone.
            event.fs.del_watcher(self._enqueue)
            self._watching.discard(event.fs)
        return event

    def close(self):
        """Unregister from every watched FS and wake any blocked next()."""
        if not self.closed:
            self.closed = True
            try:
                #  Snapshot: next() may discard entries concurrently.
                for fs in list(self._watching):
                    fs.del_watcher(self._enqueue)
            finally:
                #  Always queue the sentinel so consumers are released even
                #  if unregistering from some FS failed.
                self._queue.put(None)

    def add_watcher(self,fs,path="/",events=None,**kwds):
        """Register this iterator's callback on *fs*; return the Watcher."""
        w = fs.add_watcher(self._enqueue,path,events,**kwds)
        self._watching.add(fs)
        return w

    def _enqueue(self,event):
        #  Watcher callback: hand the event to the consuming thread.
        self._queue.put(event)

    def del_watcher(self,watcher):
        """Remove *watcher* from whichever watched FS holds it.

        Raises ValueError if no watched FS accepts the removal.
        """
        for fs in list(self._watching):
            try:
                fs.del_watcher(watcher)
                break
            except (ValueError,KeyError):
                #  Not registered with this FS (ValueError: not in the path's
                #  list; KeyError: nothing registered at that path).
                pass
        else:
            raise ValueError("watcher not found: %s" % (watcher,))
664
665
666