1 //! Watcher implementation for the inotify Linux API
2 //!
3 //! The inotify API provides a mechanism for monitoring filesystem events. Inotify can be used to
4 //! monitor individual files, or to monitor directories. When a directory is monitored, inotify
5 //! will return events for the directory itself, and for files inside the directory.
6
7 use super::event::*;
8 use super::{Config, Error, EventFn, RecursiveMode, Result, Watcher};
9 use crossbeam_channel::{bounded, unbounded, Sender};
10 use inotify as inotify_sys;
11 use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12 use std::collections::HashMap;
13 use std::env;
14 use std::ffi::OsStr;
15 use std::fs::metadata;
16 use std::os::unix::io::AsRawFd;
17 use std::path::{Path, PathBuf};
18 use std::sync::Arc;
19 use std::thread;
20 use std::time::Duration;
21 use walkdir::WalkDir;
22
// Poll token under which the inotify file descriptor is registered for readability.
const INOTIFY: mio::Token = mio::Token(0);
// Poll token used by the waker that signals queued `EventLoopMsg` messages.
const MESSAGE: mio::Token = mio::Token(1);
25
26 // The EventLoop will set up a mio::Poll and use it to wait for the following:
27 //
28 // - messages telling it what to do
29 //
30 // - events telling it that something has happened on one of the watched files.
31 struct EventLoop {
32 running: bool,
33 poll: mio::Poll,
34 event_loop_waker: Arc<mio::Waker>,
35 event_loop_tx: crossbeam_channel::Sender<EventLoopMsg>,
36 event_loop_rx: crossbeam_channel::Receiver<EventLoopMsg>,
37 inotify: Option<Inotify>,
38 event_fn: Box<dyn EventFn>,
39 watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
40 paths: HashMap<WatchDescriptor, PathBuf>,
41 rename_event: Option<Event>,
42 }
43
44 /// Watcher implementation based on inotify
45 pub struct INotifyWatcher {
46 channel: crossbeam_channel::Sender<EventLoopMsg>,
47 waker: Arc<mio::Waker>,
48 }
49
// Control messages consumed by `EventLoop::handle_messages`.
enum EventLoopMsg {
    // Start watching the path; the outcome is sent back on the enclosed sender.
    AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
    // Stop watching the path; the outcome is sent back on the enclosed sender.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    // Remove all watches, close inotify and stop the loop.
    Shutdown,
    // The wait for the second half of the rename with this cookie has elapsed.
    RenameTimeout(usize),
    // Apply a configuration option; the reply is sent back on the enclosed sender.
    Configure(Config, Sender<Result<bool>>),
}
57
58 #[inline]
send_pending_rename_event(rename_event: &mut Option<Event>, event_fn: &dyn EventFn)59 fn send_pending_rename_event(rename_event: &mut Option<Event>, event_fn: &dyn EventFn) {
60 if let Some(e) = rename_event.take() {
61 event_fn(Ok(e));
62 }
63 }
64
65 #[inline]
add_watch_by_event( path: &Option<PathBuf>, event: &inotify_sys::Event<&OsStr>, watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>, add_watches: &mut Vec<PathBuf>, )66 fn add_watch_by_event(
67 path: &Option<PathBuf>,
68 event: &inotify_sys::Event<&OsStr>,
69 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
70 add_watches: &mut Vec<PathBuf>,
71 ) {
72 if let Some(ref path) = *path {
73 if event.mask.contains(EventMask::ISDIR) {
74 if let Some(parent_path) = path.parent() {
75 if let Some(&(_, _, is_recursive)) = watches.get(parent_path) {
76 if is_recursive {
77 add_watches.push(path.to_owned());
78 }
79 }
80 }
81 }
82 }
83 }
84
85 #[inline]
remove_watch_by_event( path: &Option<PathBuf>, watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>, remove_watches: &mut Vec<PathBuf>, )86 fn remove_watch_by_event(
87 path: &Option<PathBuf>,
88 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
89 remove_watches: &mut Vec<PathBuf>,
90 ) {
91 if let Some(ref path) = *path {
92 if watches.contains_key(path) {
93 remove_watches.push(path.to_owned());
94 }
95 }
96 }
97
98 impl EventLoop {
new(inotify: Inotify, event_fn: Box<dyn EventFn>) -> Result<Self>99 pub fn new(inotify: Inotify, event_fn: Box<dyn EventFn>) -> Result<Self> {
100 let (event_loop_tx, event_loop_rx) = crossbeam_channel::unbounded::<EventLoopMsg>();
101 let poll = mio::Poll::new()?;
102
103 let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
104
105 let inotify_fd = inotify.as_raw_fd();
106 let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
107 poll.registry()
108 .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
109
110 let event_loop = EventLoop {
111 running: true,
112 poll,
113 event_loop_waker,
114 event_loop_tx,
115 event_loop_rx,
116 inotify: Some(inotify),
117 event_fn,
118 watches: HashMap::new(),
119 paths: HashMap::new(),
120 rename_event: None,
121 };
122 Ok(event_loop)
123 }
124
125 // Run the event loop.
run(self)126 pub fn run(self) {
127 thread::spawn(|| self.event_loop_thread());
128 }
129
event_loop_thread(mut self)130 fn event_loop_thread(mut self) {
131 let mut events = mio::Events::with_capacity(16);
132 loop {
133 // Wait for something to happen.
134 match self.poll.poll(&mut events, None) {
135 Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
136 // System call was interrupted, we will retry
137 // TODO: Not covered by tests (to reproduce likely need to setup signal handlers)
138 }
139 Err(e) => panic!("poll failed: {}", e),
140 Ok(()) => {}
141 }
142
143 // Process whatever happened.
144 for event in &events {
145 self.handle_event(&event);
146 }
147
148 // Stop, if we're done.
149 if !self.running {
150 break;
151 }
152 }
153 }
154
155 // Handle a single event.
handle_event(&mut self, event: &mio::event::Event)156 fn handle_event(&mut self, event: &mio::event::Event) {
157 match event.token() {
158 MESSAGE => {
159 // The channel is readable - handle messages.
160 self.handle_messages()
161 }
162 INOTIFY => {
163 // inotify has something to tell us.
164 self.handle_inotify()
165 }
166 _ => unreachable!(),
167 }
168 }
169
handle_messages(&mut self)170 fn handle_messages(&mut self) {
171 while let Ok(msg) = self.event_loop_rx.try_recv() {
172 match msg {
173 EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
174 let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
175 }
176 EventLoopMsg::RemoveWatch(path, tx) => {
177 let _ = tx.send(self.remove_watch(path, false));
178 }
179 EventLoopMsg::Shutdown => {
180 let _ = self.remove_all_watches();
181 if let Some(inotify) = self.inotify.take() {
182 let _ = inotify.close();
183 }
184 self.running = false;
185 break;
186 }
187 EventLoopMsg::RenameTimeout(cookie) => {
188 let current_cookie = self.rename_event.as_ref().and_then(|e| e.tracker());
189 // send pending rename event only if the rename event for which the timer has been created hasn't been handled already; otherwise ignore this timeout
190 if current_cookie == Some(cookie) {
191 send_pending_rename_event(&mut self.rename_event, &*self.event_fn);
192 }
193 }
194 EventLoopMsg::Configure(config, tx) => {
195 self.configure_raw_mode(config, tx);
196 }
197 }
198 }
199 }
200
configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>)201 fn configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>) {
202 tx.send(Ok(false))
203 .expect("configuration channel disconnected");
204 }
205
handle_inotify(&mut self)206 fn handle_inotify(&mut self) {
207 let mut add_watches = Vec::new();
208 let mut remove_watches = Vec::new();
209
210 if let Some(ref mut inotify) = self.inotify {
211 let mut buffer = [0; 1024];
212 // Read all buffers available.
213 loop {
214 match inotify.read_events(&mut buffer) {
215 Ok(events) => {
216 let mut num_events = 0;
217 for event in events {
218 num_events += 1;
219 if event.mask.contains(EventMask::Q_OVERFLOW) {
220 let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
221 (self.event_fn)(ev);
222 }
223
224 let path = match event.name {
225 Some(name) => {
226 self.paths.get(&event.wd).map(|root| root.join(&name))
227 }
228 None => self.paths.get(&event.wd).cloned(),
229 };
230
231 if event.mask.contains(EventMask::MOVED_FROM) {
232 send_pending_rename_event(&mut self.rename_event, &*self.event_fn);
233 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
234 self.rename_event = Some(
235 Event::new(EventKind::Modify(ModifyKind::Name(
236 RenameMode::From,
237 )))
238 .add_some_path(path.clone())
239 .set_tracker(event.cookie as usize),
240 );
241 } else {
242 let mut evs = Vec::new();
243 if event.mask.contains(EventMask::MOVED_TO) {
244 if let Some(e) = self.rename_event.take() {
245 if e.tracker() == Some(event.cookie as usize) {
246 (self.event_fn)(Ok(e.clone()));
247 evs.push(
248 Event::new(EventKind::Modify(ModifyKind::Name(
249 RenameMode::To,
250 )))
251 .set_tracker(event.cookie as usize)
252 .add_some_path(path.clone()),
253 );
254 evs.push(
255 Event::new(EventKind::Modify(ModifyKind::Name(
256 RenameMode::Both,
257 )))
258 .set_tracker(event.cookie as usize)
259 .add_some_path(e.paths.first().cloned())
260 .add_some_path(path.clone()),
261 );
262 } else {
263 // TODO should it be rename?
264 evs.push(
265 Event::new(EventKind::Create(
266 if event.mask.contains(EventMask::ISDIR) {
267 CreateKind::Folder
268 } else {
269 CreateKind::File
270 },
271 ))
272 .add_some_path(path.clone()),
273 );
274 }
275 } else {
276 // TODO should it be rename?
277 evs.push(
278 Event::new(EventKind::Create(
279 if event.mask.contains(EventMask::ISDIR) {
280 CreateKind::Folder
281 } else {
282 CreateKind::File
283 },
284 ))
285 .add_some_path(path.clone()),
286 );
287 }
288 add_watch_by_event(
289 &path,
290 &event,
291 &self.watches,
292 &mut add_watches,
293 );
294 }
295 if event.mask.contains(EventMask::MOVE_SELF) {
296 evs.push(
297 Event::new(EventKind::Modify(ModifyKind::Name(
298 RenameMode::From,
299 )))
300 .add_some_path(path.clone()),
301 );
302 // TODO stat the path and get to new path
303 // - emit To and Both events
304 // - change prefix for further events
305 }
306 if event.mask.contains(EventMask::CREATE) {
307 evs.push(
308 Event::new(EventKind::Create(
309 if event.mask.contains(EventMask::ISDIR) {
310 CreateKind::Folder
311 } else {
312 CreateKind::File
313 },
314 ))
315 .add_some_path(path.clone()),
316 );
317 add_watch_by_event(
318 &path,
319 &event,
320 &self.watches,
321 &mut add_watches,
322 );
323 }
324 if event.mask.contains(EventMask::DELETE_SELF)
325 || event.mask.contains(EventMask::DELETE)
326 {
327 evs.push(
328 Event::new(EventKind::Remove(
329 if event.mask.contains(EventMask::ISDIR) {
330 RemoveKind::Folder
331 } else {
332 RemoveKind::File
333 },
334 ))
335 .add_some_path(path.clone()),
336 );
337 remove_watch_by_event(
338 &path,
339 &self.watches,
340 &mut remove_watches,
341 );
342 }
343 if event.mask.contains(EventMask::MODIFY) {
344 evs.push(
345 Event::new(EventKind::Modify(ModifyKind::Data(
346 DataChange::Any,
347 )))
348 .add_some_path(path.clone()),
349 );
350 }
351 if event.mask.contains(EventMask::CLOSE_WRITE) {
352 evs.push(
353 Event::new(EventKind::Access(AccessKind::Close(
354 AccessMode::Write,
355 )))
356 .add_some_path(path.clone()),
357 );
358 }
359 if event.mask.contains(EventMask::CLOSE_NOWRITE) {
360 evs.push(
361 Event::new(EventKind::Access(AccessKind::Close(
362 AccessMode::Read,
363 )))
364 .add_some_path(path.clone()),
365 );
366 }
367 if event.mask.contains(EventMask::ATTRIB) {
368 evs.push(
369 Event::new(EventKind::Modify(ModifyKind::Metadata(
370 MetadataKind::Any,
371 )))
372 .add_some_path(path.clone()),
373 );
374 }
375 if event.mask.contains(EventMask::OPEN) {
376 evs.push(
377 Event::new(EventKind::Access(AccessKind::Open(
378 AccessMode::Any,
379 )))
380 .add_some_path(path.clone()),
381 );
382 }
383
384 if !evs.is_empty() {
385 send_pending_rename_event(
386 &mut self.rename_event,
387 &*self.event_fn,
388 );
389 }
390
391 for ev in evs {
392 (self.event_fn)(Ok(ev));
393 }
394 }
395 }
396
397 // All events read. Break out.
398 if num_events == 0 {
399 break;
400 }
401
402 // When receiving only the first part of a move event (IN_MOVED_FROM) it is unclear
403 // whether the second part (IN_MOVED_TO) will arrive because the file or directory
404 // could just have been moved out of the watched directory. So it's necessary to wait
405 // for possible subsequent events in case it's a complete move event but also to make sure
406 // that the first part of the event is handled in a timely manner in case no subsequent events arrive.
407 // TODO: don't do this here, instead leave it entirely to the debounce
408 // -> related to some rename events being reported as creates.
409
410 if let Some(ref rename_event) = self.rename_event {
411 let event_loop_tx = self.event_loop_tx.clone();
412 let waker = self.event_loop_waker.clone();
413 let cookie = rename_event.tracker().unwrap(); // unwrap is safe because rename_event is always set with some cookie
414 thread::spawn(move || {
415 thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event
416 event_loop_tx
417 .send(EventLoopMsg::RenameTimeout(cookie))
418 .unwrap();
419 waker.wake().unwrap();
420 });
421 }
422 }
423 Err(e) => {
424 (self.event_fn)(Err(Error::io(e)));
425 }
426 }
427 }
428 }
429
430 for path in remove_watches {
431 self.remove_watch(path, true).ok();
432 }
433
434 for path in add_watches {
435 self.add_watch(path, true, false).ok();
436 }
437 }
438
add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()>439 fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
440 // If the watch is not recursive, or if we determine (by stat'ing the path to get its
441 // metadata) that the watched path is not a directory, add a single path watch.
442 if !is_recursive || !metadata(&path).map_err(Error::io)?.is_dir() {
443 return self.add_single_watch(path, false, true);
444 }
445
446 for entry in WalkDir::new(path)
447 .follow_links(true)
448 .into_iter()
449 .filter_map(filter_dir)
450 {
451 self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?;
452 watch_self = false;
453 }
454
455 Ok(())
456 }
457
add_single_watch( &mut self, path: PathBuf, is_recursive: bool, watch_self: bool, ) -> Result<()>458 fn add_single_watch(
459 &mut self,
460 path: PathBuf,
461 is_recursive: bool,
462 watch_self: bool,
463 ) -> Result<()> {
464 let mut watchmask = WatchMask::ATTRIB
465 | WatchMask::CREATE
466 | WatchMask::DELETE
467 | WatchMask::CLOSE_WRITE
468 | WatchMask::MODIFY
469 | WatchMask::MOVED_FROM
470 | WatchMask::MOVED_TO;
471
472 if watch_self {
473 watchmask.insert(WatchMask::DELETE_SELF);
474 watchmask.insert(WatchMask::MOVE_SELF);
475 }
476
477 if let Some(&(_, old_watchmask, _)) = self.watches.get(&path) {
478 watchmask.insert(old_watchmask);
479 watchmask.insert(WatchMask::MASK_ADD);
480 }
481
482 if let Some(ref mut inotify) = self.inotify {
483 match inotify.add_watch(&path, watchmask) {
484 Err(e) => Err(Error::io(e)),
485 Ok(w) => {
486 watchmask.remove(WatchMask::MASK_ADD);
487 self.watches
488 .insert(path.clone(), (w.clone(), watchmask, is_recursive));
489 self.paths.insert(w, path);
490 Ok(())
491 }
492 }
493 } else {
494 Ok(())
495 }
496 }
497
remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()>498 fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
499 match self.watches.remove(&path) {
500 None => return Err(Error::watch_not_found()),
501 Some((w, _, is_recursive)) => {
502 if let Some(ref mut inotify) = self.inotify {
503 inotify.rm_watch(w.clone()).map_err(Error::io)?;
504 self.paths.remove(&w);
505
506 if is_recursive || remove_recursive {
507 let mut remove_list = Vec::new();
508 for (w, p) in &self.paths {
509 if p.starts_with(&path) {
510 inotify.rm_watch(w.clone()).map_err(Error::io)?;
511 self.watches.remove(p);
512 remove_list.push(w.clone());
513 }
514 }
515 for w in remove_list {
516 self.paths.remove(&w);
517 }
518 }
519 }
520 }
521 }
522 Ok(())
523 }
524
remove_all_watches(&mut self) -> Result<()>525 fn remove_all_watches(&mut self) -> Result<()> {
526 if let Some(ref mut inotify) = self.inotify {
527 for w in self.paths.keys() {
528 inotify.rm_watch(w.clone()).map_err(Error::io)?;
529 }
530 self.watches.clear();
531 self.paths.clear();
532 }
533 Ok(())
534 }
535 }
536
537 /// return `DirEntry` when it is a directory
filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry>538 fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
539 if let Ok(e) = e {
540 if let Ok(metadata) = e.metadata() {
541 if metadata.is_dir() {
542 return Some(e);
543 }
544 }
545 }
546 None
547 }
548
549 impl INotifyWatcher {
from_event_fn(event_fn: Box<dyn EventFn>) -> Result<Self>550 fn from_event_fn(event_fn: Box<dyn EventFn>) -> Result<Self> {
551 let inotify = Inotify::init()?;
552 let event_loop = EventLoop::new(inotify, event_fn)?;
553 let channel = event_loop.event_loop_tx.clone();
554 let waker = event_loop.event_loop_waker.clone();
555 event_loop.run();
556 Ok(INotifyWatcher { channel, waker })
557 }
558
watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()>559 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
560 let pb = if path.is_absolute() {
561 path.to_owned()
562 } else {
563 let p = env::current_dir().map_err(Error::io)?;
564 p.join(path)
565 };
566 let (tx, rx) = unbounded();
567 let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
568
569 // we expect the event loop to live and reply => unwraps must not panic
570 self.channel.send(msg).unwrap();
571 self.waker.wake().unwrap();
572 rx.recv().unwrap()
573 }
574
unwatch_inner(&mut self, path: &Path) -> Result<()>575 fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
576 let pb = if path.is_absolute() {
577 path.to_owned()
578 } else {
579 let p = env::current_dir().map_err(Error::io)?;
580 p.join(path)
581 };
582 let (tx, rx) = unbounded();
583 let msg = EventLoopMsg::RemoveWatch(pb, tx);
584
585 // we expect the event loop to live and reply => unwraps must not panic
586 self.channel.send(msg).unwrap();
587 self.waker.wake().unwrap();
588 rx.recv().unwrap()
589 }
590 }
591
592 impl Watcher for INotifyWatcher {
new_immediate<F: EventFn>(event_fn: F) -> Result<INotifyWatcher>593 fn new_immediate<F: EventFn>(event_fn: F) -> Result<INotifyWatcher> {
594 INotifyWatcher::from_event_fn(Box::new(event_fn))
595 }
596
watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()>597 fn watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()> {
598 self.watch_inner(path.as_ref(), recursive_mode)
599 }
600
unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()>601 fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
602 self.unwatch_inner(path.as_ref())
603 }
604
configure(&mut self, config: Config) -> Result<bool>605 fn configure(&mut self, config: Config) -> Result<bool> {
606 let (tx, rx) = bounded(1);
607 self.channel.send(EventLoopMsg::Configure(config, tx))?;
608 self.waker.wake()?;
609 rx.recv()?
610 }
611 }
612
613 impl Drop for INotifyWatcher {
drop(&mut self)614 fn drop(&mut self) {
615 // we expect the event loop to live => unwrap must not panic
616 self.channel.send(EventLoopMsg::Shutdown).unwrap();
617 self.waker.wake().unwrap();
618 }
619 }
620