1# coding: utf-8
2#
3# Copyright 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
4# Copyright 2012 Google, Inc & contributors.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18"""
19:module: watchdog.observers.fsevents
20:synopsis: FSEvents based emitter implementation.
21:author: yesudeep@google.com (Yesudeep Mangalapilly)
22:author: contact@tiger-222.fr (Mickaël Schoentgen)
23:platforms: macOS
24"""
25
26import time
27import logging
28import os
29import threading
30import unicodedata
31import _watchdog_fsevents as _fsevents
32
33from watchdog.events import (
34    FileDeletedEvent,
35    FileModifiedEvent,
36    FileCreatedEvent,
37    FileMovedEvent,
38    DirDeletedEvent,
39    DirModifiedEvent,
40    DirCreatedEvent,
41    DirMovedEvent,
42    generate_sub_created_events,
43    generate_sub_moved_events
44)
45
46from watchdog.observers.api import (
47    BaseObserver,
48    EventEmitter,
49    DEFAULT_EMITTER_TIMEOUT,
50    DEFAULT_OBSERVER_TIMEOUT
51)
52from watchdog.utils.dirsnapshot import DirectorySnapshot
53
54logger = logging.getLogger('fsevents')
55
56
57class FSEventsEmitter(EventEmitter):
58
59    """
60    macOS FSEvents Emitter class.
61
62    :param event_queue:
63        The event queue to fill with events.
64    :param watch:
65        A watch object representing the directory to monitor.
66    :type watch:
67        :class:`watchdog.observers.api.ObservedWatch`
68    :param timeout:
69        Read events blocking timeout (in seconds).
70    :param suppress_history:
71        The FSEvents API may emit historic events up to 30 sec before the watch was
72        started. When ``suppress_history`` is ``True``, those events will be suppressed
73        by creating a directory snapshot of the watched path before starting the stream
74        as a reference to suppress old events. Warning: This may result in significant
75        memory usage in case of a large number of items in the watched path.
    :type timeout:
        ``float``
    :type suppress_history:
        ``bool``
78    """
79
80    def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, suppress_history=False):
81        EventEmitter.__init__(self, event_queue, watch, timeout)
82        self._fs_view = set()
83        self.suppress_history = suppress_history
84        self._start_time = 0.0
85        self._starting_state = None
86        self._lock = threading.Lock()
87        self._absolute_watch_path = os.path.realpath(os.path.abspath(os.path.expanduser(self.watch.path)))
88
89    def on_thread_stop(self):
90        _fsevents.remove_watch(self.watch)
91        _fsevents.stop(self)
92
93    def queue_event(self, event):
94        # fsevents defaults to be recursive, so if the watch was meant to be non-recursive then we need to drop
95        # all the events here which do not have a src_path / dest_path that matches the watched path
96        if self._watch.is_recursive:
97            logger.debug("queue_event %s", event)
98            EventEmitter.queue_event(self, event)
99        else:
100            if not self._is_recursive_event(event):
101                logger.debug("queue_event %s", event)
102                EventEmitter.queue_event(self, event)
103            else:
104                logger.debug("drop event %s", event)
105
106    def _is_recursive_event(self, event):
107        src_path = event.src_path if event.is_directory else os.path.dirname(event.src_path)
108        if src_path == self._absolute_watch_path:
109            return False
110
111        if isinstance(event, (FileMovedEvent, DirMovedEvent)):
112            # when moving something into the watch path we must always take the dirname,
113            # otherwise we miss out on `DirMovedEvent`s
114            dest_path = os.path.dirname(event.dest_path)
115            if dest_path == self._absolute_watch_path:
116                return False
117
118        return True
119
120    def _queue_created_event(self, event, src_path, dirname):
121        cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
122        self.queue_event(cls(src_path))
123        self.queue_event(DirModifiedEvent(dirname))
124
125    def _queue_deleted_event(self, event, src_path, dirname):
126        cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
127        self.queue_event(cls(src_path))
128        self.queue_event(DirModifiedEvent(dirname))
129
130    def _queue_modified_event(self, event, src_path, dirname):
131        cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
132        self.queue_event(cls(src_path))
133
134    def _queue_renamed_event(self, src_event, src_path, dst_path, src_dirname, dst_dirname):
135        cls = DirMovedEvent if src_event.is_directory else FileMovedEvent
136        dst_path = self._encode_path(dst_path)
137        self.queue_event(cls(src_path, dst_path))
138        self.queue_event(DirModifiedEvent(src_dirname))
139        self.queue_event(DirModifiedEvent(dst_dirname))
140
141    def _is_historic_created_event(self, event):
142
143        # We only queue a created event if the item was created after we
144        # started the FSEventsStream.
145
146        in_history = event.inode in self._fs_view
147
148        if self._starting_state:
149            try:
150                old_inode = self._starting_state.inode(event.path)[0]
151                before_start = old_inode == event.inode
152            except KeyError:
153                before_start = False
154        else:
155            before_start = False
156
157        return in_history or before_start
158
159    @staticmethod
160    def _is_meta_mod(event):
161        """Returns True if the event indicates a change in metadata."""
162        return event.is_inode_meta_mod or event.is_xattr_mod or event.is_owner_change
163
164    def queue_events(self, timeout, events):
165
166        if logger.getEffectiveLevel() <= logging.DEBUG:
167            for event in events:
168                flags = ", ".join(attr for attr in dir(event) if getattr(event, attr) is True)
169                logger.debug(f"{event}: {flags}")
170
171        if time.monotonic() - self._start_time > 60:
172            # Event history is no longer needed, let's free some memory.
173            self._starting_state = None
174
175        while events:
176            event = events.pop(0)
177
178            src_path = self._encode_path(event.path)
179            src_dirname = os.path.dirname(src_path)
180
181            try:
182                stat = os.stat(src_path)
183            except OSError:
184                stat = None
185
186            exists = stat and stat.st_ino == event.inode
187
188            # FSevents may coalesce multiple events for the same item + path into a
189            # single event. However, events are never coalesced for different items at
190            # the same path or for the same item at different paths. Therefore, the
191            # event chains "removed -> created" and "created -> renamed -> removed" will
192            # never emit a single native event and a deleted event *always* means that
193            # the item no longer existed at the end of the event chain.
194
195            # Some events will have a spurious `is_created` flag set, coalesced from an
196            # already emitted and processed CreatedEvent. To filter those, we keep track
197            # of all inodes which we know to be already created. This is safer than
198            # keeping track of paths since paths are more likely to be reused than
199            # inodes.
200
201            # Likewise, some events will have a spurious `is_modified`,
202            # `is_inode_meta_mod` or `is_xattr_mod` flag set. We currently do not
203            # suppress those but could do so if the item still exists by caching the
204            # stat result and verifying that it did change.
205
206            if event.is_created and event.is_removed:
207
208                # Events will only be coalesced for the same item / inode.
209                # The sequence deleted -> created therefore cannot occur.
210                # Any combination with renamed cannot occur either.
211
212                if not self._is_historic_created_event(event):
213                    self._queue_created_event(event, src_path, src_dirname)
214
215                self._fs_view.add(event.inode)
216
217                if event.is_modified or self._is_meta_mod(event):
218                    self._queue_modified_event(event, src_path, src_dirname)
219
220                self._queue_deleted_event(event, src_path, src_dirname)
221                self._fs_view.discard(event.inode)
222
223            else:
224
225                if event.is_created and not self._is_historic_created_event(event):
226                    self._queue_created_event(event, src_path, src_dirname)
227
228                self._fs_view.add(event.inode)
229
230                if event.is_modified or self._is_meta_mod(event):
231                    self._queue_modified_event(event, src_path, src_dirname)
232
233                if event.is_renamed:
234
235                    # Check if we have a corresponding destination event in the watched path.
236                    dst_event = next(iter(e for e in events if e.is_renamed and e.inode == event.inode), None)
237
238                    if dst_event:
239                        # Item was moved within the watched folder.
240                        logger.debug("Destination event for rename is %s", dst_event)
241
242                        dst_path = self._encode_path(dst_event.path)
243                        dst_dirname = os.path.dirname(dst_path)
244
245                        self._queue_renamed_event(event, src_path, dst_path, src_dirname, dst_dirname)
246                        self._fs_view.add(event.inode)
247
248                        for sub_event in generate_sub_moved_events(src_path, dst_path):
249                            self.queue_event(sub_event)
250
251                        # Process any coalesced flags for the dst_event.
252
253                        events.remove(dst_event)
254
255                        if dst_event.is_modified or self._is_meta_mod(dst_event):
256                            self._queue_modified_event(dst_event, dst_path, dst_dirname)
257
258                        if dst_event.is_removed:
259                            self._queue_deleted_event(dst_event, dst_path, dst_dirname)
260                            self._fs_view.discard(dst_event.inode)
261
262                    elif exists:
263                        # This is the destination event, item was moved into the watched
264                        # folder.
265                        self._queue_created_event(event, src_path, src_dirname)
266                        self._fs_view.add(event.inode)
267
268                        for sub_event in generate_sub_created_events(src_path):
269                            self.queue_event(sub_event)
270
271                    else:
272                        # This is the source event, item was moved out of the watched
273                        # folder.
274                        self._queue_deleted_event(event, src_path, src_dirname)
275                        self._fs_view.discard(event.inode)
276
277                        # Skip further coalesced processing.
278                        continue
279
280                if event.is_removed:
281                    # Won't occur together with renamed.
282                    self._queue_deleted_event(event, src_path, src_dirname)
283                    self._fs_view.discard(event.inode)
284
285            if event.is_root_changed:
286                # This will be set if root or any of its parents is renamed or deleted.
287                # TODO: find out new path and generate DirMovedEvent?
288                self.queue_event(DirDeletedEvent(self.watch.path))
289                logger.debug("Stopping because root path was changed")
290                self.stop()
291
292                self._fs_view.clear()
293
294    def events_callback(self, paths, inodes, flags, ids):
295        """Callback passed to FSEventStreamCreate(), it will receive all
296        FS events and queue them.
297        """
298        cls = _fsevents.NativeEvent
299        try:
300            events = [
301                cls(path, inode, event_flags, event_id)
302                for path, inode, event_flags, event_id in zip(
303                    paths, inodes, flags, ids
304                )
305            ]
306            with self._lock:
307                self.queue_events(self.timeout, events)
308        except Exception:
309            logger.exception("Unhandled exception in fsevents callback")
310
311    def run(self):
312        self.pathnames = [self.watch.path]
313        self._start_time = time.monotonic()
314        try:
315            _fsevents.add_watch(self, self.watch, self.events_callback, self.pathnames)
316            _fsevents.read_events(self)
317        except Exception:
318            logger.exception("Unhandled exception in FSEventsEmitter")
319
320    def on_thread_start(self):
321        if self.suppress_history:
322
323            if isinstance(self.watch.path, bytes):
324                watch_path = os.fsdecode(self.watch.path)
325            else:
326                watch_path = self.watch.path
327
328            self._starting_state = DirectorySnapshot(watch_path)
329
330    def _encode_path(self, path):
331        """Encode path only if bytes were passed to this emitter. """
332        if isinstance(self.watch.path, bytes):
333            return os.fsencode(path)
334        return path
335
336
class FSEventsObserver(BaseObserver):
    """Observer that uses :class:`FSEventsEmitter` as its emitter class."""

    def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
        BaseObserver.__init__(self, timeout=timeout, emitter_class=FSEventsEmitter)

    def schedule(self, event_handler, path, recursive=False):
        """Schedule a watch, NFC-normalising ``path`` first when it is a ``str``.

        All arguments are otherwise forwarded to ``BaseObserver.schedule`` and
        its result is returned.
        """
        # Fix for issue #26: Trace/BPT error when given a unicode path
        # string. https://github.com/gorakhargosh/watchdog/issues#issue/26
        normalized_path = unicodedata.normalize('NFC', path) if isinstance(path, str) else path
        return BaseObserver.schedule(self, event_handler, normalized_path, recursive)
349