1 //! Watcher implementation for the inotify Linux API
2 //!
3 //! The inotify API provides a mechanism for monitoring filesystem events.  Inotify can be used to
4 //! monitor individual files, or to monitor directories.  When a directory is monitored, inotify
5 //! will return events for the directory itself, and for files inside the directory.
6 
7 use super::event::*;
8 use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
9 use crossbeam_channel::{bounded, unbounded, Sender};
10 use inotify as inotify_sys;
11 use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12 use std::collections::HashMap;
13 use std::env;
14 use std::ffi::OsStr;
15 use std::fs::metadata;
16 use std::os::unix::io::AsRawFd;
17 use std::path::{Path, PathBuf};
18 use std::sync::Arc;
19 use std::thread;
20 use std::time::Duration;
21 use walkdir::WalkDir;
22 
23 const INOTIFY: mio::Token = mio::Token(0);
24 const MESSAGE: mio::Token = mio::Token(1);
25 
// The EventLoop will set up a mio::Poll and use it to wait for the following:
//
// -  messages telling it what to do
//
// -  events telling it that something has happened on one of the watched files.
struct EventLoop {
    // Cleared when a `Shutdown` message is handled; the loop thread exits once it is false.
    running: bool,
    // Poll instance the loop thread blocks on (the `INOTIFY` and `MESSAGE` tokens).
    poll: mio::Poll,
    // Waker registered under the `MESSAGE` token; used to interrupt `poll` after a message
    // has been queued on the channel.
    event_loop_waker: Arc<mio::Waker>,
    // Sending half of the control channel; cloned for the watcher and the rename timers.
    event_loop_tx: crossbeam_channel::Sender<EventLoopMsg>,
    // Receiving half of the control channel, drained by `handle_messages`.
    event_loop_rx: crossbeam_channel::Receiver<EventLoopMsg>,
    // The inotify instance; taken and closed on shutdown, hence the `Option`.
    inotify: Option<Inotify>,
    // Receives every translated event and every error.
    event_handler: Box<dyn EventHandler>,
    // Watched path -> (watch descriptor, mask the watch was added with, is_recursive flag).
    watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
    // Reverse lookup: watch descriptor -> watched path.
    paths: HashMap<WatchDescriptor, PathBuf>,
    // A rename "from" event built from MOVED_FROM that has not been forwarded yet; it is
    // held back until a matching MOVED_TO, another event, or the rename timeout arrives.
    rename_event: Option<Event>,
}
43 
/// Watcher implementation based on inotify
///
/// The watcher itself only holds the handles needed to talk to the event loop running on
/// its own thread: requests are queued on `channel` and the loop is then woken via `waker`.
pub struct INotifyWatcher {
    // Sending half of the event loop's control channel.
    channel: crossbeam_channel::Sender<EventLoopMsg>,
    // Wakes the event loop's `mio::Poll` so that queued messages get processed.
    waker: Arc<mio::Waker>,
}
49 
// Control messages processed by `EventLoop::handle_messages`.
enum EventLoopMsg {
    // Start watching the path; the outcome is sent back on the enclosed sender.
    AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
    // Stop watching the path; the outcome is sent back on the enclosed sender.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    // Remove all watches, close inotify and stop the event loop.
    Shutdown,
    // Sent by a timer thread shortly after a MOVED_FROM was seen. The payload is the cookie
    // of that rename; the pending rename event is flushed only if it still carries it.
    RenameTimeout(usize),
    // Configuration request; the reply is sent back on the enclosed sender.
    Configure(Config, Sender<Result<bool>>),
}
57 
58 #[inline]
send_pending_rename_event(rename_event: &mut Option<Event>, event_handler: &mut dyn EventHandler)59 fn send_pending_rename_event(rename_event: &mut Option<Event>, event_handler: &mut dyn EventHandler) {
60     if let Some(e) = rename_event.take() {
61         event_handler.handle_event(Ok(e));
62     }
63 }
64 
65 #[inline]
add_watch_by_event( path: &Option<PathBuf>, event: &inotify_sys::Event<&OsStr>, watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>, add_watches: &mut Vec<PathBuf>, )66 fn add_watch_by_event(
67     path: &Option<PathBuf>,
68     event: &inotify_sys::Event<&OsStr>,
69     watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
70     add_watches: &mut Vec<PathBuf>,
71 ) {
72     if let Some(ref path) = *path {
73         if event.mask.contains(EventMask::ISDIR) {
74             if let Some(parent_path) = path.parent() {
75                 if let Some(&(_, _, is_recursive)) = watches.get(parent_path) {
76                     if is_recursive {
77                         add_watches.push(path.to_owned());
78                     }
79                 }
80             }
81         }
82     }
83 }
84 
85 #[inline]
remove_watch_by_event( path: &Option<PathBuf>, watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>, remove_watches: &mut Vec<PathBuf>, )86 fn remove_watch_by_event(
87     path: &Option<PathBuf>,
88     watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
89     remove_watches: &mut Vec<PathBuf>,
90 ) {
91     if let Some(ref path) = *path {
92         if watches.contains_key(path) {
93             remove_watches.push(path.to_owned());
94         }
95     }
96 }
97 
98 impl EventLoop {
new(inotify: Inotify, event_handler: Box<dyn EventHandler>) -> Result<Self>99     pub fn new(inotify: Inotify, event_handler: Box<dyn EventHandler>) -> Result<Self> {
100         let (event_loop_tx, event_loop_rx) = crossbeam_channel::unbounded::<EventLoopMsg>();
101         let poll = mio::Poll::new()?;
102 
103         let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
104 
105         let inotify_fd = inotify.as_raw_fd();
106         let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
107         poll.registry()
108             .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
109 
110         let event_loop = EventLoop {
111             running: true,
112             poll,
113             event_loop_waker,
114             event_loop_tx,
115             event_loop_rx,
116             inotify: Some(inotify),
117             event_handler,
118             watches: HashMap::new(),
119             paths: HashMap::new(),
120             rename_event: None,
121         };
122         Ok(event_loop)
123     }
124 
125     // Run the event loop.
run(self)126     pub fn run(self) {
127         thread::spawn(|| self.event_loop_thread());
128     }
129 
event_loop_thread(mut self)130     fn event_loop_thread(mut self) {
131         let mut events = mio::Events::with_capacity(16);
132         loop {
133             // Wait for something to happen.
134             match self.poll.poll(&mut events, None) {
135                 Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
136                     // System call was interrupted, we will retry
137                     // TODO: Not covered by tests (to reproduce likely need to setup signal handlers)
138                 }
139                 Err(e) => panic!("poll failed: {}", e),
140                 Ok(()) => {}
141             }
142 
143             // Process whatever happened.
144             for event in &events {
145                 self.handle_event(event);
146             }
147 
148             // Stop, if we're done.
149             if !self.running {
150                 break;
151             }
152         }
153     }
154 
155     // Handle a single event.
handle_event(&mut self, event: &mio::event::Event)156     fn handle_event(&mut self, event: &mio::event::Event) {
157         match event.token() {
158             MESSAGE => {
159                 // The channel is readable - handle messages.
160                 self.handle_messages()
161             }
162             INOTIFY => {
163                 // inotify has something to tell us.
164                 self.handle_inotify()
165             }
166             _ => unreachable!(),
167         }
168     }
169 
handle_messages(&mut self)170     fn handle_messages(&mut self) {
171         while let Ok(msg) = self.event_loop_rx.try_recv() {
172             match msg {
173                 EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
174                     let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
175                 }
176                 EventLoopMsg::RemoveWatch(path, tx) => {
177                     let _ = tx.send(self.remove_watch(path, false));
178                 }
179                 EventLoopMsg::Shutdown => {
180                     let _ = self.remove_all_watches();
181                     if let Some(inotify) = self.inotify.take() {
182                         let _ = inotify.close();
183                     }
184                     self.running = false;
185                     break;
186                 }
187                 EventLoopMsg::RenameTimeout(cookie) => {
188                     let current_cookie = self.rename_event.as_ref().and_then(|e| e.tracker());
189                     // send pending rename event only if the rename event for which the timer has been created hasn't been handled already; otherwise ignore this timeout
190                     if current_cookie == Some(cookie) {
191                         send_pending_rename_event(&mut self.rename_event, &mut *self.event_handler);
192                     }
193                 }
194                 EventLoopMsg::Configure(config, tx) => {
195                     self.configure_raw_mode(config, tx);
196                 }
197             }
198         }
199     }
200 
configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>)201     fn configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>) {
202         tx.send(Ok(false))
203             .expect("configuration channel disconnected");
204     }
205 
handle_inotify(&mut self)206     fn handle_inotify(&mut self) {
207         let mut add_watches = Vec::new();
208         let mut remove_watches = Vec::new();
209 
210         if let Some(ref mut inotify) = self.inotify {
211             let mut buffer = [0; 1024];
212             // Read all buffers available.
213             loop {
214                 match inotify.read_events(&mut buffer) {
215                     Ok(events) => {
216                         let mut num_events = 0;
217                         for event in events {
218                             num_events += 1;
219                             if event.mask.contains(EventMask::Q_OVERFLOW) {
220                                 let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
221                                 self.event_handler.handle_event(ev);
222                             }
223 
224                             let path = match event.name {
225                                 Some(name) => {
226                                     self.paths.get(&event.wd).map(|root| root.join(&name))
227                                 }
228                                 None => self.paths.get(&event.wd).cloned(),
229                             };
230 
231                             if event.mask.contains(EventMask::MOVED_FROM) {
232                                 send_pending_rename_event(
233                                     &mut self.rename_event,
234                                     &mut *self.event_handler,
235                                 );
236                                 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
237                                 self.rename_event = Some(
238                                     Event::new(EventKind::Modify(ModifyKind::Name(
239                                         RenameMode::From,
240                                     )))
241                                     .add_some_path(path.clone())
242                                     .set_tracker(event.cookie as usize),
243                                 );
244                             } else {
245                                 let mut evs = Vec::new();
246                                 if event.mask.contains(EventMask::MOVED_TO) {
247                                     if let Some(e) = self.rename_event.take() {
248                                         if e.tracker() == Some(event.cookie as usize) {
249                                             self.event_handler.handle_event(Ok(e.clone()));
250                                             evs.push(
251                                                 Event::new(EventKind::Modify(ModifyKind::Name(
252                                                     RenameMode::To,
253                                                 )))
254                                                 .set_tracker(event.cookie as usize)
255                                                 .add_some_path(path.clone()),
256                                             );
257                                             evs.push(
258                                                 Event::new(EventKind::Modify(ModifyKind::Name(
259                                                     RenameMode::Both,
260                                                 )))
261                                                 .set_tracker(event.cookie as usize)
262                                                 .add_some_path(e.paths.first().cloned())
263                                                 .add_some_path(path.clone()),
264                                             );
265                                         } else {
266                                             // TODO should it be rename?
267                                             evs.push(
268                                                 Event::new(EventKind::Create(
269                                                     if event.mask.contains(EventMask::ISDIR) {
270                                                         CreateKind::Folder
271                                                     } else {
272                                                         CreateKind::File
273                                                     },
274                                                 ))
275                                                 .add_some_path(path.clone()),
276                                             );
277                                         }
278                                     } else {
279                                         // TODO should it be rename?
280                                         evs.push(
281                                             Event::new(EventKind::Create(
282                                                 if event.mask.contains(EventMask::ISDIR) {
283                                                     CreateKind::Folder
284                                                 } else {
285                                                     CreateKind::File
286                                                 },
287                                             ))
288                                             .add_some_path(path.clone()),
289                                         );
290                                     }
291                                     add_watch_by_event(
292                                         &path,
293                                         &event,
294                                         &self.watches,
295                                         &mut add_watches,
296                                     );
297                                 }
298                                 if event.mask.contains(EventMask::MOVE_SELF) {
299                                     evs.push(
300                                         Event::new(EventKind::Modify(ModifyKind::Name(
301                                             RenameMode::From,
302                                         )))
303                                         .add_some_path(path.clone()),
304                                     );
305                                     // TODO stat the path and get to new path
306                                     // - emit To and Both events
307                                     // - change prefix for further events
308                                 }
309                                 if event.mask.contains(EventMask::CREATE) {
310                                     evs.push(
311                                         Event::new(EventKind::Create(
312                                             if event.mask.contains(EventMask::ISDIR) {
313                                                 CreateKind::Folder
314                                             } else {
315                                                 CreateKind::File
316                                             },
317                                         ))
318                                         .add_some_path(path.clone()),
319                                     );
320                                     add_watch_by_event(
321                                         &path,
322                                         &event,
323                                         &self.watches,
324                                         &mut add_watches,
325                                     );
326                                 }
327                                 if event.mask.contains(EventMask::DELETE_SELF)
328                                     || event.mask.contains(EventMask::DELETE)
329                                 {
330                                     evs.push(
331                                         Event::new(EventKind::Remove(
332                                             if event.mask.contains(EventMask::ISDIR) {
333                                                 RemoveKind::Folder
334                                             } else {
335                                                 RemoveKind::File
336                                             },
337                                         ))
338                                         .add_some_path(path.clone()),
339                                     );
340                                     remove_watch_by_event(
341                                         &path,
342                                         &self.watches,
343                                         &mut remove_watches,
344                                     );
345                                 }
346                                 if event.mask.contains(EventMask::MODIFY) {
347                                     evs.push(
348                                         Event::new(EventKind::Modify(ModifyKind::Data(
349                                             DataChange::Any,
350                                         )))
351                                         .add_some_path(path.clone()),
352                                     );
353                                 }
354                                 if event.mask.contains(EventMask::CLOSE_WRITE) {
355                                     evs.push(
356                                         Event::new(EventKind::Access(AccessKind::Close(
357                                             AccessMode::Write,
358                                         )))
359                                         .add_some_path(path.clone()),
360                                     );
361                                 }
362                                 if event.mask.contains(EventMask::CLOSE_NOWRITE) {
363                                     evs.push(
364                                         Event::new(EventKind::Access(AccessKind::Close(
365                                             AccessMode::Read,
366                                         )))
367                                         .add_some_path(path.clone()),
368                                     );
369                                 }
370                                 if event.mask.contains(EventMask::ATTRIB) {
371                                     evs.push(
372                                         Event::new(EventKind::Modify(ModifyKind::Metadata(
373                                             MetadataKind::Any,
374                                         )))
375                                         .add_some_path(path.clone()),
376                                     );
377                                 }
378                                 if event.mask.contains(EventMask::OPEN) {
379                                     evs.push(
380                                         Event::new(EventKind::Access(AccessKind::Open(
381                                             AccessMode::Any,
382                                         )))
383                                         .add_some_path(path.clone()),
384                                     );
385                                 }
386 
387                                 if !evs.is_empty() {
388                                     send_pending_rename_event(
389                                         &mut self.rename_event,
390                                         &mut *self.event_handler,
391                                     );
392                                 }
393 
394                                 for ev in evs {
395                                     self.event_handler.handle_event(Ok(ev));
396                                 }
397                             }
398                         }
399 
400                         // All events read. Break out.
401                         if num_events == 0 {
402                             break;
403                         }
404 
405                         // When receiving only the first part of a move event (IN_MOVED_FROM) it is unclear
406                         // whether the second part (IN_MOVED_TO) will arrive because the file or directory
407                         // could just have been moved out of the watched directory. So it's necessary to wait
408                         // for possible subsequent events in case it's a complete move event but also to make sure
409                         // that the first part of the event is handled in a timely manner in case no subsequent events arrive.
410                         // TODO: don't do this here, instead leave it entirely to the debounce
411                         // -> related to some rename events being reported as creates.
412 
413                         if let Some(ref rename_event) = self.rename_event {
414                             let event_loop_tx = self.event_loop_tx.clone();
415                             let waker = self.event_loop_waker.clone();
416                             let cookie = rename_event.tracker().unwrap(); // unwrap is safe because rename_event is always set with some cookie
417                             thread::spawn(move || {
418                                 thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event
419                                 event_loop_tx
420                                     .send(EventLoopMsg::RenameTimeout(cookie))
421                                     .unwrap();
422                                 waker.wake().unwrap();
423                             });
424                         }
425                     }
426                     Err(e) => {
427                         self.event_handler.handle_event(Err(Error::io(e)));
428                     }
429                 }
430             }
431         }
432 
433         for path in remove_watches {
434             self.remove_watch(path, true).ok();
435         }
436 
437         for path in add_watches {
438             self.add_watch(path, true, false).ok();
439         }
440     }
441 
add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()>442     fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
443         // If the watch is not recursive, or if we determine (by stat'ing the path to get its
444         // metadata) that the watched path is not a directory, add a single path watch.
445         if !is_recursive || !metadata(&path).map_err(Error::io)?.is_dir() {
446             return self.add_single_watch(path, false, true);
447         }
448 
449         for entry in WalkDir::new(path)
450             .follow_links(true)
451             .into_iter()
452             .filter_map(filter_dir)
453         {
454             self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?;
455             watch_self = false;
456         }
457 
458         Ok(())
459     }
460 
add_single_watch( &mut self, path: PathBuf, is_recursive: bool, watch_self: bool, ) -> Result<()>461     fn add_single_watch(
462         &mut self,
463         path: PathBuf,
464         is_recursive: bool,
465         watch_self: bool,
466     ) -> Result<()> {
467         let mut watchmask = WatchMask::ATTRIB
468             | WatchMask::CREATE
469             | WatchMask::DELETE
470             | WatchMask::CLOSE_WRITE
471             | WatchMask::MODIFY
472             | WatchMask::MOVED_FROM
473             | WatchMask::MOVED_TO;
474 
475         if watch_self {
476             watchmask.insert(WatchMask::DELETE_SELF);
477             watchmask.insert(WatchMask::MOVE_SELF);
478         }
479 
480         if let Some(&(_, old_watchmask, _)) = self.watches.get(&path) {
481             watchmask.insert(old_watchmask);
482             watchmask.insert(WatchMask::MASK_ADD);
483         }
484 
485         if let Some(ref mut inotify) = self.inotify {
486             match inotify.add_watch(&path, watchmask) {
487                 Err(e) => {
488                     Err(if e.raw_os_error() == Some(libc::ENOSPC) {
489                         // do not report inotify limits as "no more space" on linux #266
490                         Error::new(ErrorKind::MaxFilesWatch)
491                     } else {
492                         Error::io(e)
493                     }.add_path(path))
494                 }
495                 Ok(w) => {
496                     watchmask.remove(WatchMask::MASK_ADD);
497                     self.watches
498                         .insert(path.clone(), (w.clone(), watchmask, is_recursive));
499                     self.paths.insert(w, path);
500                     Ok(())
501                 }
502             }
503         } else {
504             Ok(())
505         }
506     }
507 
remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()>508     fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
509         match self.watches.remove(&path) {
510             None => return Err(Error::watch_not_found().add_path(path)),
511             Some((w, _, is_recursive)) => {
512                 if let Some(ref mut inotify) = self.inotify {
513                     inotify.rm_watch(w.clone()).map_err(|e| Error::io(e).add_path(path.clone()))?;
514                     self.paths.remove(&w);
515 
516                     if is_recursive || remove_recursive {
517                         let mut remove_list = Vec::new();
518                         for (w, p) in &self.paths {
519                             if p.starts_with(&path) {
520                                 inotify.rm_watch(w.clone()).map_err(|e| Error::io(e).add_path(p.into()))?;
521                                 self.watches.remove(p);
522                                 remove_list.push(w.clone());
523                             }
524                         }
525                         for w in remove_list {
526                             self.paths.remove(&w);
527                         }
528                     }
529                 }
530             }
531         }
532         Ok(())
533     }
534 
remove_all_watches(&mut self) -> Result<()>535     fn remove_all_watches(&mut self) -> Result<()> {
536         if let Some(ref mut inotify) = self.inotify {
537             for (w, p) in &self.paths {
538                 inotify.rm_watch(w.clone()).map_err(|e| Error::io(e).add_path(p.into()))?;
539             }
540             self.watches.clear();
541             self.paths.clear();
542         }
543         Ok(())
544     }
545 }
546 
547 /// return `DirEntry` when it is a directory
filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry>548 fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
549     if let Ok(e) = e {
550         if let Ok(metadata) = e.metadata() {
551             if metadata.is_dir() {
552                 return Some(e);
553             }
554         }
555     }
556     None
557 }
558 
559 impl INotifyWatcher {
from_event_handler(event_handler: Box<dyn EventHandler>) -> Result<Self>560     fn from_event_handler(event_handler: Box<dyn EventHandler>) -> Result<Self> {
561         let inotify = Inotify::init()?;
562         let event_loop = EventLoop::new(inotify, event_handler)?;
563         let channel = event_loop.event_loop_tx.clone();
564         let waker = event_loop.event_loop_waker.clone();
565         event_loop.run();
566         Ok(INotifyWatcher { channel, waker })
567     }
568 
watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()>569     fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
570         let pb = if path.is_absolute() {
571             path.to_owned()
572         } else {
573             let p = env::current_dir().map_err(Error::io)?;
574             p.join(path)
575         };
576         let (tx, rx) = unbounded();
577         let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
578 
579         // we expect the event loop to live and reply => unwraps must not panic
580         self.channel.send(msg).unwrap();
581         self.waker.wake().unwrap();
582         rx.recv().unwrap()
583     }
584 
unwatch_inner(&mut self, path: &Path) -> Result<()>585     fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
586         let pb = if path.is_absolute() {
587             path.to_owned()
588         } else {
589             let p = env::current_dir().map_err(Error::io)?;
590             p.join(path)
591         };
592         let (tx, rx) = unbounded();
593         let msg = EventLoopMsg::RemoveWatch(pb, tx);
594 
595         // we expect the event loop to live and reply => unwraps must not panic
596         self.channel.send(msg).unwrap();
597         self.waker.wake().unwrap();
598         rx.recv().unwrap()
599     }
600 }
601 
602 impl Watcher for INotifyWatcher {
603     /// Create a new watcher.
new<F: EventHandler>(event_handler: F) -> Result<Self>604     fn new<F: EventHandler>(event_handler: F) -> Result<Self> {
605         Self::from_event_handler(Box::new(event_handler))
606     }
607 
watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()>608     fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
609         self.watch_inner(path, recursive_mode)
610     }
611 
unwatch(&mut self, path: &Path) -> Result<()>612     fn unwatch(&mut self, path: &Path) -> Result<()> {
613         self.unwatch_inner(path)
614     }
615 
configure(&mut self, config: Config) -> Result<bool>616     fn configure(&mut self, config: Config) -> Result<bool> {
617         let (tx, rx) = bounded(1);
618         self.channel.send(EventLoopMsg::Configure(config, tx))?;
619         self.waker.wake()?;
620         rx.recv()?
621     }
622 }
623 
624 impl Drop for INotifyWatcher {
drop(&mut self)625     fn drop(&mut self) {
626         // we expect the event loop to live => unwrap must not panic
627         self.channel.send(EventLoopMsg::Shutdown).unwrap();
628         self.waker.wake().unwrap();
629     }
630 }
631 
// Compile-time check: the watcher must be usable from, and shareable between, threads.
#[test]
fn inotify_watcher_is_send_and_sync() {
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }
    assert_send_sync::<INotifyWatcher>();
}
637