1 //! Watcher implementation for the inotify Linux API
2 //!
3 //! The inotify API provides a mechanism for monitoring filesystem events.  Inotify can be used to
4 //! monitor individual files, or to monitor directories.  When a directory is monitored, inotify
5 //! will return events for the directory itself, and for files inside the directory.
6 
7 use super::event::*;
8 use super::{Config, Error, EventFn, RecursiveMode, Result, Watcher};
9 use crossbeam_channel::{bounded, unbounded, Sender};
10 use inotify as inotify_sys;
11 use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12 use std::collections::HashMap;
13 use std::env;
14 use std::ffi::OsStr;
15 use std::fs::metadata;
16 use std::os::unix::io::AsRawFd;
17 use std::path::{Path, PathBuf};
18 use std::sync::Arc;
19 use std::thread;
20 use std::time::Duration;
21 use walkdir::WalkDir;
22 
23 const INOTIFY: mio::Token = mio::Token(0);
24 const MESSAGE: mio::Token = mio::Token(1);
25 
26 // The EventLoop will set up a mio::Poll and use it to wait for the following:
27 //
28 // -  messages telling it what to do
29 //
30 // -  events telling it that something has happened on one of the watched files.
31 struct EventLoop {
32     running: bool,
33     poll: mio::Poll,
34     event_loop_waker: Arc<mio::Waker>,
35     event_loop_tx: crossbeam_channel::Sender<EventLoopMsg>,
36     event_loop_rx: crossbeam_channel::Receiver<EventLoopMsg>,
37     inotify: Option<Inotify>,
38     event_fn: Box<dyn EventFn>,
39     watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
40     paths: HashMap<WatchDescriptor, PathBuf>,
41     rename_event: Option<Event>,
42 }
43 
44 /// Watcher implementation based on inotify
45 pub struct INotifyWatcher {
46     channel: crossbeam_channel::Sender<EventLoopMsg>,
47     waker: Arc<mio::Waker>,
48 }
49 
50 enum EventLoopMsg {
51     AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
52     RemoveWatch(PathBuf, Sender<Result<()>>),
53     Shutdown,
54     RenameTimeout(usize),
55     Configure(Config, Sender<Result<bool>>),
56 }
57 
58 #[inline]
send_pending_rename_event(rename_event: &mut Option<Event>, event_fn: &dyn EventFn)59 fn send_pending_rename_event(rename_event: &mut Option<Event>, event_fn: &dyn EventFn) {
60     if let Some(e) = rename_event.take() {
61         event_fn(Ok(e));
62     }
63 }
64 
65 #[inline]
add_watch_by_event( path: &Option<PathBuf>, event: &inotify_sys::Event<&OsStr>, watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>, add_watches: &mut Vec<PathBuf>, )66 fn add_watch_by_event(
67     path: &Option<PathBuf>,
68     event: &inotify_sys::Event<&OsStr>,
69     watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
70     add_watches: &mut Vec<PathBuf>,
71 ) {
72     if let Some(ref path) = *path {
73         if event.mask.contains(EventMask::ISDIR) {
74             if let Some(parent_path) = path.parent() {
75                 if let Some(&(_, _, is_recursive)) = watches.get(parent_path) {
76                     if is_recursive {
77                         add_watches.push(path.to_owned());
78                     }
79                 }
80             }
81         }
82     }
83 }
84 
85 #[inline]
remove_watch_by_event( path: &Option<PathBuf>, watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>, remove_watches: &mut Vec<PathBuf>, )86 fn remove_watch_by_event(
87     path: &Option<PathBuf>,
88     watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
89     remove_watches: &mut Vec<PathBuf>,
90 ) {
91     if let Some(ref path) = *path {
92         if watches.contains_key(path) {
93             remove_watches.push(path.to_owned());
94         }
95     }
96 }
97 
98 impl EventLoop {
new(inotify: Inotify, event_fn: Box<dyn EventFn>) -> Result<Self>99     pub fn new(inotify: Inotify, event_fn: Box<dyn EventFn>) -> Result<Self> {
100         let (event_loop_tx, event_loop_rx) = crossbeam_channel::unbounded::<EventLoopMsg>();
101         let poll = mio::Poll::new()?;
102 
103         let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
104 
105         let inotify_fd = inotify.as_raw_fd();
106         let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
107         poll.registry()
108             .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
109 
110         let event_loop = EventLoop {
111             running: true,
112             poll,
113             event_loop_waker,
114             event_loop_tx,
115             event_loop_rx,
116             inotify: Some(inotify),
117             event_fn,
118             watches: HashMap::new(),
119             paths: HashMap::new(),
120             rename_event: None,
121         };
122         Ok(event_loop)
123     }
124 
125     // Run the event loop.
run(self)126     pub fn run(self) {
127         thread::spawn(|| self.event_loop_thread());
128     }
129 
event_loop_thread(mut self)130     fn event_loop_thread(mut self) {
131         let mut events = mio::Events::with_capacity(16);
132         loop {
133             // Wait for something to happen.
134             match self.poll.poll(&mut events, None) {
135                 Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
136                     // System call was interrupted, we will retry
137                     // TODO: Not covered by tests (to reproduce likely need to setup signal handlers)
138                 }
139                 Err(e) => panic!("poll failed: {}", e),
140                 Ok(()) => {}
141             }
142 
143             // Process whatever happened.
144             for event in &events {
145                 self.handle_event(&event);
146             }
147 
148             // Stop, if we're done.
149             if !self.running {
150                 break;
151             }
152         }
153     }
154 
155     // Handle a single event.
handle_event(&mut self, event: &mio::event::Event)156     fn handle_event(&mut self, event: &mio::event::Event) {
157         match event.token() {
158             MESSAGE => {
159                 // The channel is readable - handle messages.
160                 self.handle_messages()
161             }
162             INOTIFY => {
163                 // inotify has something to tell us.
164                 self.handle_inotify()
165             }
166             _ => unreachable!(),
167         }
168     }
169 
handle_messages(&mut self)170     fn handle_messages(&mut self) {
171         while let Ok(msg) = self.event_loop_rx.try_recv() {
172             match msg {
173                 EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
174                     let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
175                 }
176                 EventLoopMsg::RemoveWatch(path, tx) => {
177                     let _ = tx.send(self.remove_watch(path, false));
178                 }
179                 EventLoopMsg::Shutdown => {
180                     let _ = self.remove_all_watches();
181                     if let Some(inotify) = self.inotify.take() {
182                         let _ = inotify.close();
183                     }
184                     self.running = false;
185                     break;
186                 }
187                 EventLoopMsg::RenameTimeout(cookie) => {
188                     let current_cookie = self.rename_event.as_ref().and_then(|e| e.tracker());
189                     // send pending rename event only if the rename event for which the timer has been created hasn't been handled already; otherwise ignore this timeout
190                     if current_cookie == Some(cookie) {
191                         send_pending_rename_event(&mut self.rename_event, &*self.event_fn);
192                     }
193                 }
194                 EventLoopMsg::Configure(config, tx) => {
195                     self.configure_raw_mode(config, tx);
196                 }
197             }
198         }
199     }
200 
configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>)201     fn configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>) {
202         tx.send(Ok(false))
203             .expect("configuration channel disconnected");
204     }
205 
handle_inotify(&mut self)206     fn handle_inotify(&mut self) {
207         let mut add_watches = Vec::new();
208         let mut remove_watches = Vec::new();
209 
210         if let Some(ref mut inotify) = self.inotify {
211             let mut buffer = [0; 1024];
212             // Read all buffers available.
213             loop {
214                 match inotify.read_events(&mut buffer) {
215                     Ok(events) => {
216                         let mut num_events = 0;
217                         for event in events {
218                             num_events += 1;
219                             if event.mask.contains(EventMask::Q_OVERFLOW) {
220                                 let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
221                                 (self.event_fn)(ev);
222                             }
223 
224                             let path = match event.name {
225                                 Some(name) => {
226                                     self.paths.get(&event.wd).map(|root| root.join(&name))
227                                 }
228                                 None => self.paths.get(&event.wd).cloned(),
229                             };
230 
231                             if event.mask.contains(EventMask::MOVED_FROM) {
232                                 send_pending_rename_event(&mut self.rename_event, &*self.event_fn);
233                                 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
234                                 self.rename_event = Some(
235                                     Event::new(EventKind::Modify(ModifyKind::Name(
236                                         RenameMode::From,
237                                     )))
238                                     .add_some_path(path.clone())
239                                     .set_tracker(event.cookie as usize),
240                                 );
241                             } else {
242                                 let mut evs = Vec::new();
243                                 if event.mask.contains(EventMask::MOVED_TO) {
244                                     if let Some(e) = self.rename_event.take() {
245                                         if e.tracker() == Some(event.cookie as usize) {
246                                             (self.event_fn)(Ok(e.clone()));
247                                             evs.push(
248                                                 Event::new(EventKind::Modify(ModifyKind::Name(
249                                                     RenameMode::To,
250                                                 )))
251                                                 .set_tracker(event.cookie as usize)
252                                                 .add_some_path(path.clone()),
253                                             );
254                                             evs.push(
255                                                 Event::new(EventKind::Modify(ModifyKind::Name(
256                                                     RenameMode::Both,
257                                                 )))
258                                                 .set_tracker(event.cookie as usize)
259                                                 .add_some_path(e.paths.first().cloned())
260                                                 .add_some_path(path.clone()),
261                                             );
262                                         } else {
263                                             // TODO should it be rename?
264                                             evs.push(
265                                                 Event::new(EventKind::Create(
266                                                     if event.mask.contains(EventMask::ISDIR) {
267                                                         CreateKind::Folder
268                                                     } else {
269                                                         CreateKind::File
270                                                     },
271                                                 ))
272                                                 .add_some_path(path.clone()),
273                                             );
274                                         }
275                                     } else {
276                                         // TODO should it be rename?
277                                         evs.push(
278                                             Event::new(EventKind::Create(
279                                                 if event.mask.contains(EventMask::ISDIR) {
280                                                     CreateKind::Folder
281                                                 } else {
282                                                     CreateKind::File
283                                                 },
284                                             ))
285                                             .add_some_path(path.clone()),
286                                         );
287                                     }
288                                     add_watch_by_event(
289                                         &path,
290                                         &event,
291                                         &self.watches,
292                                         &mut add_watches,
293                                     );
294                                 }
295                                 if event.mask.contains(EventMask::MOVE_SELF) {
296                                     evs.push(
297                                         Event::new(EventKind::Modify(ModifyKind::Name(
298                                             RenameMode::From,
299                                         )))
300                                         .add_some_path(path.clone()),
301                                     );
302                                     // TODO stat the path and get to new path
303                                     // - emit To and Both events
304                                     // - change prefix for further events
305                                 }
306                                 if event.mask.contains(EventMask::CREATE) {
307                                     evs.push(
308                                         Event::new(EventKind::Create(
309                                             if event.mask.contains(EventMask::ISDIR) {
310                                                 CreateKind::Folder
311                                             } else {
312                                                 CreateKind::File
313                                             },
314                                         ))
315                                         .add_some_path(path.clone()),
316                                     );
317                                     add_watch_by_event(
318                                         &path,
319                                         &event,
320                                         &self.watches,
321                                         &mut add_watches,
322                                     );
323                                 }
324                                 if event.mask.contains(EventMask::DELETE_SELF)
325                                     || event.mask.contains(EventMask::DELETE)
326                                 {
327                                     evs.push(
328                                         Event::new(EventKind::Remove(
329                                             if event.mask.contains(EventMask::ISDIR) {
330                                                 RemoveKind::Folder
331                                             } else {
332                                                 RemoveKind::File
333                                             },
334                                         ))
335                                         .add_some_path(path.clone()),
336                                     );
337                                     remove_watch_by_event(
338                                         &path,
339                                         &self.watches,
340                                         &mut remove_watches,
341                                     );
342                                 }
343                                 if event.mask.contains(EventMask::MODIFY) {
344                                     evs.push(
345                                         Event::new(EventKind::Modify(ModifyKind::Data(
346                                             DataChange::Any,
347                                         )))
348                                         .add_some_path(path.clone()),
349                                     );
350                                 }
351                                 if event.mask.contains(EventMask::CLOSE_WRITE) {
352                                     evs.push(
353                                         Event::new(EventKind::Access(AccessKind::Close(
354                                             AccessMode::Write,
355                                         )))
356                                         .add_some_path(path.clone()),
357                                     );
358                                 }
359                                 if event.mask.contains(EventMask::CLOSE_NOWRITE) {
360                                     evs.push(
361                                         Event::new(EventKind::Access(AccessKind::Close(
362                                             AccessMode::Read,
363                                         )))
364                                         .add_some_path(path.clone()),
365                                     );
366                                 }
367                                 if event.mask.contains(EventMask::ATTRIB) {
368                                     evs.push(
369                                         Event::new(EventKind::Modify(ModifyKind::Metadata(
370                                             MetadataKind::Any,
371                                         )))
372                                         .add_some_path(path.clone()),
373                                     );
374                                 }
375                                 if event.mask.contains(EventMask::OPEN) {
376                                     evs.push(
377                                         Event::new(EventKind::Access(AccessKind::Open(
378                                             AccessMode::Any,
379                                         )))
380                                         .add_some_path(path.clone()),
381                                     );
382                                 }
383 
384                                 if !evs.is_empty() {
385                                     send_pending_rename_event(
386                                         &mut self.rename_event,
387                                         &*self.event_fn,
388                                     );
389                                 }
390 
391                                 for ev in evs {
392                                     (self.event_fn)(Ok(ev));
393                                 }
394                             }
395                         }
396 
397                         // All events read. Break out.
398                         if num_events == 0 {
399                             break;
400                         }
401 
402                         // When receiving only the first part of a move event (IN_MOVED_FROM) it is unclear
403                         // whether the second part (IN_MOVED_TO) will arrive because the file or directory
404                         // could just have been moved out of the watched directory. So it's necessary to wait
405                         // for possible subsequent events in case it's a complete move event but also to make sure
406                         // that the first part of the event is handled in a timely manner in case no subsequent events arrive.
407                         // TODO: don't do this here, instead leave it entirely to the debounce
408                         // -> related to some rename events being reported as creates.
409 
410                         if let Some(ref rename_event) = self.rename_event {
411                             let event_loop_tx = self.event_loop_tx.clone();
412                             let waker = self.event_loop_waker.clone();
413                             let cookie = rename_event.tracker().unwrap(); // unwrap is safe because rename_event is always set with some cookie
414                             thread::spawn(move || {
415                                 thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event
416                                 event_loop_tx
417                                     .send(EventLoopMsg::RenameTimeout(cookie))
418                                     .unwrap();
419                                 waker.wake().unwrap();
420                             });
421                         }
422                     }
423                     Err(e) => {
424                         (self.event_fn)(Err(Error::io(e)));
425                     }
426                 }
427             }
428         }
429 
430         for path in remove_watches {
431             self.remove_watch(path, true).ok();
432         }
433 
434         for path in add_watches {
435             self.add_watch(path, true, false).ok();
436         }
437     }
438 
add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()>439     fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
440         // If the watch is not recursive, or if we determine (by stat'ing the path to get its
441         // metadata) that the watched path is not a directory, add a single path watch.
442         if !is_recursive || !metadata(&path).map_err(Error::io)?.is_dir() {
443             return self.add_single_watch(path, false, true);
444         }
445 
446         for entry in WalkDir::new(path)
447             .follow_links(true)
448             .into_iter()
449             .filter_map(filter_dir)
450         {
451             self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?;
452             watch_self = false;
453         }
454 
455         Ok(())
456     }
457 
add_single_watch( &mut self, path: PathBuf, is_recursive: bool, watch_self: bool, ) -> Result<()>458     fn add_single_watch(
459         &mut self,
460         path: PathBuf,
461         is_recursive: bool,
462         watch_self: bool,
463     ) -> Result<()> {
464         let mut watchmask = WatchMask::ATTRIB
465             | WatchMask::CREATE
466             | WatchMask::DELETE
467             | WatchMask::CLOSE_WRITE
468             | WatchMask::MODIFY
469             | WatchMask::MOVED_FROM
470             | WatchMask::MOVED_TO;
471 
472         if watch_self {
473             watchmask.insert(WatchMask::DELETE_SELF);
474             watchmask.insert(WatchMask::MOVE_SELF);
475         }
476 
477         if let Some(&(_, old_watchmask, _)) = self.watches.get(&path) {
478             watchmask.insert(old_watchmask);
479             watchmask.insert(WatchMask::MASK_ADD);
480         }
481 
482         if let Some(ref mut inotify) = self.inotify {
483             match inotify.add_watch(&path, watchmask) {
484                 Err(e) => Err(Error::io(e)),
485                 Ok(w) => {
486                     watchmask.remove(WatchMask::MASK_ADD);
487                     self.watches
488                         .insert(path.clone(), (w.clone(), watchmask, is_recursive));
489                     self.paths.insert(w, path);
490                     Ok(())
491                 }
492             }
493         } else {
494             Ok(())
495         }
496     }
497 
remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()>498     fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
499         match self.watches.remove(&path) {
500             None => return Err(Error::watch_not_found()),
501             Some((w, _, is_recursive)) => {
502                 if let Some(ref mut inotify) = self.inotify {
503                     inotify.rm_watch(w.clone()).map_err(Error::io)?;
504                     self.paths.remove(&w);
505 
506                     if is_recursive || remove_recursive {
507                         let mut remove_list = Vec::new();
508                         for (w, p) in &self.paths {
509                             if p.starts_with(&path) {
510                                 inotify.rm_watch(w.clone()).map_err(Error::io)?;
511                                 self.watches.remove(p);
512                                 remove_list.push(w.clone());
513                             }
514                         }
515                         for w in remove_list {
516                             self.paths.remove(&w);
517                         }
518                     }
519                 }
520             }
521         }
522         Ok(())
523     }
524 
remove_all_watches(&mut self) -> Result<()>525     fn remove_all_watches(&mut self) -> Result<()> {
526         if let Some(ref mut inotify) = self.inotify {
527             for w in self.paths.keys() {
528                 inotify.rm_watch(w.clone()).map_err(Error::io)?;
529             }
530             self.watches.clear();
531             self.paths.clear();
532         }
533         Ok(())
534     }
535 }
536 
537 /// return `DirEntry` when it is a directory
filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry>538 fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
539     if let Ok(e) = e {
540         if let Ok(metadata) = e.metadata() {
541             if metadata.is_dir() {
542                 return Some(e);
543             }
544         }
545     }
546     None
547 }
548 
549 impl INotifyWatcher {
from_event_fn(event_fn: Box<dyn EventFn>) -> Result<Self>550     fn from_event_fn(event_fn: Box<dyn EventFn>) -> Result<Self> {
551         let inotify = Inotify::init()?;
552         let event_loop = EventLoop::new(inotify, event_fn)?;
553         let channel = event_loop.event_loop_tx.clone();
554         let waker = event_loop.event_loop_waker.clone();
555         event_loop.run();
556         Ok(INotifyWatcher { channel, waker })
557     }
558 
watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()>559     fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
560         let pb = if path.is_absolute() {
561             path.to_owned()
562         } else {
563             let p = env::current_dir().map_err(Error::io)?;
564             p.join(path)
565         };
566         let (tx, rx) = unbounded();
567         let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
568 
569         // we expect the event loop to live and reply => unwraps must not panic
570         self.channel.send(msg).unwrap();
571         self.waker.wake().unwrap();
572         rx.recv().unwrap()
573     }
574 
unwatch_inner(&mut self, path: &Path) -> Result<()>575     fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
576         let pb = if path.is_absolute() {
577             path.to_owned()
578         } else {
579             let p = env::current_dir().map_err(Error::io)?;
580             p.join(path)
581         };
582         let (tx, rx) = unbounded();
583         let msg = EventLoopMsg::RemoveWatch(pb, tx);
584 
585         // we expect the event loop to live and reply => unwraps must not panic
586         self.channel.send(msg).unwrap();
587         self.waker.wake().unwrap();
588         rx.recv().unwrap()
589     }
590 }
591 
592 impl Watcher for INotifyWatcher {
new_immediate<F: EventFn>(event_fn: F) -> Result<INotifyWatcher>593     fn new_immediate<F: EventFn>(event_fn: F) -> Result<INotifyWatcher> {
594         INotifyWatcher::from_event_fn(Box::new(event_fn))
595     }
596 
watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()>597     fn watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()> {
598         self.watch_inner(path.as_ref(), recursive_mode)
599     }
600 
unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()>601     fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
602         self.unwatch_inner(path.as_ref())
603     }
604 
configure(&mut self, config: Config) -> Result<bool>605     fn configure(&mut self, config: Config) -> Result<bool> {
606         let (tx, rx) = bounded(1);
607         self.channel.send(EventLoopMsg::Configure(config, tx))?;
608         self.waker.wake()?;
609         rx.recv()?
610     }
611 }
612 
613 impl Drop for INotifyWatcher {
drop(&mut self)614     fn drop(&mut self) {
615         // we expect the event loop to live => unwrap must not panic
616         self.channel.send(EventLoopMsg::Shutdown).unwrap();
617         self.waker.wake().unwrap();
618     }
619 }
620