# coding: utf-8
#
# Copyright 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
# Copyright 2012 Google, Inc & contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
:module: watchdog.observers.fsevents
:synopsis: FSEvents based emitter implementation.
:author: yesudeep@google.com (Yesudeep Mangalapilly)
:author: contact@tiger-222.fr (Mickaël Schoentgen)
:platforms: macOS
"""

import time
import logging
import os
import threading
import unicodedata
import _watchdog_fsevents as _fsevents

from watchdog.events import (
    FileDeletedEvent,
    FileModifiedEvent,
    FileCreatedEvent,
    FileMovedEvent,
    DirDeletedEvent,
    DirModifiedEvent,
    DirCreatedEvent,
    DirMovedEvent,
    generate_sub_created_events,
    generate_sub_moved_events
)

from watchdog.observers.api import (
    BaseObserver,
    EventEmitter,
    DEFAULT_EMITTER_TIMEOUT,
    DEFAULT_OBSERVER_TIMEOUT
)
from watchdog.utils.dirsnapshot import DirectorySnapshot

logger = logging.getLogger('fsevents')


class FSEventsEmitter(EventEmitter):

    """
    macOS FSEvents Emitter class.

    :param event_queue:
        The event queue to fill with events.
    :param watch:
        A watch object representing the directory to monitor.
    :type watch:
        :class:`watchdog.observers.api.ObservedWatch`
    :param timeout:
        Read events blocking timeout (in seconds).
    :type timeout:
        ``float``
    :param suppress_history:
        The FSEvents API may emit historic events up to 30 sec before the watch was
        started. When ``suppress_history`` is ``True``, those events will be suppressed
        by creating a directory snapshot of the watched path before starting the stream
        as a reference to suppress old events. Warning: This may result in significant
        memory usage in case of a large number of items in the watched path.
    :type suppress_history:
        ``bool``
    """

    def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, suppress_history=False):
        EventEmitter.__init__(self, event_queue, watch, timeout)
        # Inodes of items for which a created event has already been handled;
        # used to filter spurious, coalesced `is_created` flags.
        self._fs_view = set()
        self.suppress_history = suppress_history
        # Set to ``time.monotonic()`` in :meth:`run`.
        self._start_time = 0.0
        # Directory snapshot taken in :meth:`on_thread_start` when
        # ``suppress_history`` is enabled; ``None`` otherwise.
        self._starting_state = None
        # Serializes calls to :meth:`queue_events` from :meth:`events_callback`.
        self._lock = threading.Lock()
        self._absolute_watch_path = os.path.realpath(os.path.abspath(os.path.expanduser(self.watch.path)))

    def on_thread_stop(self):
        """Remove this emitter's watch from the native module and stop it."""
        _fsevents.remove_watch(self.watch)
        _fsevents.stop(self)

    def queue_event(self, event):
        """Queue ``event`` unless it must be dropped for a non-recursive watch.

        :param event:
            The watchdog event to queue.
        """
        # fsevents defaults to be recursive, so if the watch was meant to be non-recursive then we need to drop
        # all the events here which do not have a src_path / dest_path that matches the watched path
        if self._watch.is_recursive:
            logger.debug("queue_event %s", event)
            EventEmitter.queue_event(self, event)
        else:
            if not self._is_recursive_event(event):
                logger.debug("queue_event %s", event)
                EventEmitter.queue_event(self, event)
            else:
                logger.debug("drop event %s", event)

    def _is_recursive_event(self, event):
        """Return ``True`` if ``event`` does not belong directly to the watched path.

        For directory events the event path itself is compared to the watched
        path; for file events the parent directory of the event path is used.
        Moved events are additionally accepted when the parent directory of
        their destination is the watched path.
        """
        src_path = event.src_path if event.is_directory else os.path.dirname(event.src_path)
        if src_path == self._absolute_watch_path:
            return False

        if isinstance(event, (FileMovedEvent, DirMovedEvent)):
            # when moving something into the watch path we must always take the dirname,
            # otherwise we miss out on `DirMovedEvent`s
            dest_path = os.path.dirname(event.dest_path)
            if dest_path == self._absolute_watch_path:
                return False

        return True

    def _queue_created_event(self, event, src_path, dirname):
        """Queue a created event for ``src_path`` and a modified event for ``dirname``."""
        cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
        self.queue_event(cls(src_path))
        self.queue_event(DirModifiedEvent(dirname))

    def _queue_deleted_event(self, event, src_path, dirname):
        """Queue a deleted event for ``src_path`` and a modified event for ``dirname``."""
        cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
        self.queue_event(cls(src_path))
        self.queue_event(DirModifiedEvent(dirname))

    def _queue_modified_event(self, event, src_path, dirname):
        """Queue a modified event for ``src_path``.

        ``dirname`` is accepted for signature parity with the other
        ``_queue_*_event`` helpers but is not used.
        """
        cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
        self.queue_event(cls(src_path))

    def _queue_renamed_event(self, src_event, src_path, dst_path, src_dirname, dst_dirname):
        """Queue a moved event plus modified events for both parent directories."""
        cls = DirMovedEvent if src_event.is_directory else FileMovedEvent
        dst_path = self._encode_path(dst_path)
        self.queue_event(cls(src_path, dst_path))
        self.queue_event(DirModifiedEvent(src_dirname))
        self.queue_event(DirModifiedEvent(dst_dirname))

    def _is_historic_created_event(self, event):
        """Return ``True`` if a created event for ``event`` should be suppressed.

        That is the case when the inode is already tracked in ``_fs_view`` or
        when the starting snapshot (if any) lists the same inode at
        ``event.path``.
        """

        # We only queue a created event if the item was created after we
        # started the FSEventsStream.

        in_history = event.inode in self._fs_view

        if self._starting_state:
            try:
                old_inode = self._starting_state.inode(event.path)[0]
                before_start = old_inode == event.inode
            except KeyError:
                # Path is not part of the starting snapshot.
                before_start = False
        else:
            before_start = False

        return in_history or before_start

    @staticmethod
    def _is_meta_mod(event):
        """Returns True if the event indicates a change in metadata."""
        return event.is_inode_meta_mod or event.is_xattr_mod or event.is_owner_change

    def queue_events(self, timeout, events):
        """Translate a batch of native events into watchdog events and queue them.

        :param timeout:
            Not used by this implementation.
        :param events:
            A list of native events. The list is consumed in place: events are
            popped from the front, and the destination event paired with a
            rename is removed as well.
        """

        if logger.getEffectiveLevel() <= logging.DEBUG:
            for event in events:
                flags = ", ".join(attr for attr in dir(event) if getattr(event, attr) is True)
                logger.debug("%s: %s", event, flags)

        if time.monotonic() - self._start_time > 60:
            # Event history is no longer needed, let's free some memory.
            self._starting_state = None

        while events:
            event = events.pop(0)

            src_path = self._encode_path(event.path)
            src_dirname = os.path.dirname(src_path)

            try:
                stat = os.stat(src_path)
            except OSError:
                stat = None

            # The item only counts as existing if the path still refers to
            # the same inode the event was reported for.
            exists = stat and stat.st_ino == event.inode

            # FSevents may coalesce multiple events for the same item + path into a
            # single event. However, events are never coalesced for different items at
            # the same path or for the same item at different paths. Therefore, the
            # event chains "removed -> created" and "created -> renamed -> removed" will
            # never emit a single native event and a deleted event *always* means that
            # the item no longer existed at the end of the event chain.

            # Some events will have a spurious `is_created` flag set, coalesced from an
            # already emitted and processed CreatedEvent. To filter those, we keep track
            # of all inodes which we know to be already created. This is safer than
            # keeping track of paths since paths are more likely to be reused than
            # inodes.

            # Likewise, some events will have a spurious `is_modified`,
            # `is_inode_meta_mod` or `is_xattr_mod` flag set. We currently do not
            # suppress those but could do so if the item still exists by caching the
            # stat result and verifying that it did change.

            if event.is_created and event.is_removed:

                # Events will only be coalesced for the same item / inode.
                # The sequence deleted -> created therefore cannot occur.
                # Any combination with renamed cannot occur either.

                if not self._is_historic_created_event(event):
                    self._queue_created_event(event, src_path, src_dirname)

                self._fs_view.add(event.inode)

                if event.is_modified or self._is_meta_mod(event):
                    self._queue_modified_event(event, src_path, src_dirname)

                self._queue_deleted_event(event, src_path, src_dirname)
                self._fs_view.discard(event.inode)

            else:

                if event.is_created and not self._is_historic_created_event(event):
                    self._queue_created_event(event, src_path, src_dirname)

                self._fs_view.add(event.inode)

                if event.is_modified or self._is_meta_mod(event):
                    self._queue_modified_event(event, src_path, src_dirname)

                if event.is_renamed:

                    # Check if we have a corresponding destination event in the watched path.
                    dst_event = next((e for e in events if e.is_renamed and e.inode == event.inode), None)

                    if dst_event:
                        # Item was moved within the watched folder.
                        logger.debug("Destination event for rename is %s", dst_event)

                        dst_path = self._encode_path(dst_event.path)
                        dst_dirname = os.path.dirname(dst_path)

                        self._queue_renamed_event(event, src_path, dst_path, src_dirname, dst_dirname)
                        self._fs_view.add(event.inode)

                        for sub_event in generate_sub_moved_events(src_path, dst_path):
                            self.queue_event(sub_event)

                        # Process any coalesced flags for the dst_event.

                        events.remove(dst_event)

                        if dst_event.is_modified or self._is_meta_mod(dst_event):
                            self._queue_modified_event(dst_event, dst_path, dst_dirname)

                        if dst_event.is_removed:
                            self._queue_deleted_event(dst_event, dst_path, dst_dirname)
                            self._fs_view.discard(dst_event.inode)

                    elif exists:
                        # This is the destination event, item was moved into the watched
                        # folder.
                        self._queue_created_event(event, src_path, src_dirname)
                        self._fs_view.add(event.inode)

                        for sub_event in generate_sub_created_events(src_path):
                            self.queue_event(sub_event)

                    else:
                        # This is the source event, item was moved out of the watched
                        # folder.
                        self._queue_deleted_event(event, src_path, src_dirname)
                        self._fs_view.discard(event.inode)

                    # Skip further coalesced processing.
                    continue

                if event.is_removed:
                    # Won't occur together with renamed.
                    self._queue_deleted_event(event, src_path, src_dirname)
                    self._fs_view.discard(event.inode)

                if event.is_root_changed:
                    # This will be set if root or any of its parents is renamed or deleted.
                    # TODO: find out new path and generate DirMovedEvent?
                    self.queue_event(DirDeletedEvent(self.watch.path))
                    logger.debug("Stopping because root path was changed")
                    self.stop()

                    # NOTE(review): assumed to belong to the root-changed
                    # branch (the watch is stopping, so tracked inodes are
                    # dropped) - confirm against the original indentation.
                    self._fs_view.clear()

    def events_callback(self, paths, inodes, flags, ids):
        """Callback passed to FSEventStreamCreate(), it will receive all
        FS events and queue them.

        The four arguments are parallel sequences; one native event is built
        per ``(path, inode, flags, id)`` tuple. Any exception is logged and
        swallowed so that it does not propagate into the caller.
        """
        cls = _fsevents.NativeEvent
        try:
            events = [
                cls(path, inode, event_flags, event_id)
                for path, inode, event_flags, event_id in zip(
                    paths, inodes, flags, ids
                )
            ]
            with self._lock:
                self.queue_events(self.timeout, events)
        except Exception:
            logger.exception("Unhandled exception in fsevents callback")

    def run(self):
        """Register the watch with the native module and read its events.

        Records the start time used by :meth:`queue_events` to decide when the
        starting snapshot can be released. Exceptions are logged, not raised.
        """
        self.pathnames = [self.watch.path]
        self._start_time = time.monotonic()
        try:
            _fsevents.add_watch(self, self.watch, self.events_callback, self.pathnames)
            # NOTE(review): presumably blocks until the emitter is stopped -
            # confirm against the native module.
            _fsevents.read_events(self)
        except Exception:
            logger.exception("Unhandled exception in FSEventsEmitter")

    def on_thread_start(self):
        """Take the reference directory snapshot if ``suppress_history`` is set."""
        if self.suppress_history:

            # The snapshot is always taken with a ``str`` path.
            if isinstance(self.watch.path, bytes):
                watch_path = os.fsdecode(self.watch.path)
            else:
                watch_path = self.watch.path

            self._starting_state = DirectorySnapshot(watch_path)

    def _encode_path(self, path):
        """Encode path only if bytes were passed to this emitter. """
        if isinstance(self.watch.path, bytes):
            return os.fsencode(path)
        return path


class FSEventsObserver(BaseObserver):

    """Observer that uses :class:`FSEventsEmitter` as its emitter class.

    :param timeout:
        Passed through to :class:`BaseObserver` as ``timeout``.
    """

    def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
        BaseObserver.__init__(self, emitter_class=FSEventsEmitter,
                              timeout=timeout)

    def schedule(self, event_handler, path, recursive=False):
        """Schedule a watch, NFC-normalizing ``path`` first when it is a ``str``.

        :returns:
            Whatever :meth:`BaseObserver.schedule` returns.
        """
        # Fix for issue #26: Trace/BPT error when given a unicode path
        # string. https://github.com/gorakhargosh/watchdog/issues#issue/26
        if isinstance(path, str):
            path = unicodedata.normalize('NFC', path)
        return BaseObserver.schedule(self, event_handler, path, recursive)