1 //! Watcher implementation for the inotify Linux API
2 //!
3 //! The inotify API provides a mechanism for monitoring filesystem events. Inotify can be used to
4 //! monitor individual files, or to monitor directories. When a directory is monitored, inotify
5 //! will return events for the directory itself, and for files inside the directory.
6
7 use super::event::*;
8 use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
9 use crossbeam_channel::{bounded, unbounded, Sender};
10 use inotify as inotify_sys;
11 use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12 use std::collections::HashMap;
13 use std::env;
14 use std::ffi::OsStr;
15 use std::fs::metadata;
16 use std::os::unix::io::AsRawFd;
17 use std::path::{Path, PathBuf};
18 use std::sync::Arc;
19 use std::thread;
20 use std::time::Duration;
21 use walkdir::WalkDir;
22
23 const INOTIFY: mio::Token = mio::Token(0);
24 const MESSAGE: mio::Token = mio::Token(1);
25
26 // The EventLoop will set up a mio::Poll and use it to wait for the following:
27 //
28 // - messages telling it what to do
29 //
30 // - events telling it that something has happened on one of the watched files.
31 struct EventLoop {
32 running: bool,
33 poll: mio::Poll,
34 event_loop_waker: Arc<mio::Waker>,
35 event_loop_tx: crossbeam_channel::Sender<EventLoopMsg>,
36 event_loop_rx: crossbeam_channel::Receiver<EventLoopMsg>,
37 inotify: Option<Inotify>,
38 event_handler: Box<dyn EventHandler>,
39 watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
40 paths: HashMap<WatchDescriptor, PathBuf>,
41 rename_event: Option<Event>,
42 }
43
44 /// Watcher implementation based on inotify
45 pub struct INotifyWatcher {
46 channel: crossbeam_channel::Sender<EventLoopMsg>,
47 waker: Arc<mio::Waker>,
48 }
49
50 enum EventLoopMsg {
51 AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
52 RemoveWatch(PathBuf, Sender<Result<()>>),
53 Shutdown,
54 RenameTimeout(usize),
55 Configure(Config, Sender<Result<bool>>),
56 }
57
58 #[inline]
send_pending_rename_event(rename_event: &mut Option<Event>, event_handler: &mut dyn EventHandler)59 fn send_pending_rename_event(rename_event: &mut Option<Event>, event_handler: &mut dyn EventHandler) {
60 if let Some(e) = rename_event.take() {
61 event_handler.handle_event(Ok(e));
62 }
63 }
64
65 #[inline]
add_watch_by_event( path: &Option<PathBuf>, event: &inotify_sys::Event<&OsStr>, watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>, add_watches: &mut Vec<PathBuf>, )66 fn add_watch_by_event(
67 path: &Option<PathBuf>,
68 event: &inotify_sys::Event<&OsStr>,
69 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
70 add_watches: &mut Vec<PathBuf>,
71 ) {
72 if let Some(ref path) = *path {
73 if event.mask.contains(EventMask::ISDIR) {
74 if let Some(parent_path) = path.parent() {
75 if let Some(&(_, _, is_recursive)) = watches.get(parent_path) {
76 if is_recursive {
77 add_watches.push(path.to_owned());
78 }
79 }
80 }
81 }
82 }
83 }
84
85 #[inline]
remove_watch_by_event( path: &Option<PathBuf>, watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>, remove_watches: &mut Vec<PathBuf>, )86 fn remove_watch_by_event(
87 path: &Option<PathBuf>,
88 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
89 remove_watches: &mut Vec<PathBuf>,
90 ) {
91 if let Some(ref path) = *path {
92 if watches.contains_key(path) {
93 remove_watches.push(path.to_owned());
94 }
95 }
96 }
97
98 impl EventLoop {
new(inotify: Inotify, event_handler: Box<dyn EventHandler>) -> Result<Self>99 pub fn new(inotify: Inotify, event_handler: Box<dyn EventHandler>) -> Result<Self> {
100 let (event_loop_tx, event_loop_rx) = crossbeam_channel::unbounded::<EventLoopMsg>();
101 let poll = mio::Poll::new()?;
102
103 let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
104
105 let inotify_fd = inotify.as_raw_fd();
106 let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
107 poll.registry()
108 .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
109
110 let event_loop = EventLoop {
111 running: true,
112 poll,
113 event_loop_waker,
114 event_loop_tx,
115 event_loop_rx,
116 inotify: Some(inotify),
117 event_handler,
118 watches: HashMap::new(),
119 paths: HashMap::new(),
120 rename_event: None,
121 };
122 Ok(event_loop)
123 }
124
125 // Run the event loop.
run(self)126 pub fn run(self) {
127 thread::spawn(|| self.event_loop_thread());
128 }
129
event_loop_thread(mut self)130 fn event_loop_thread(mut self) {
131 let mut events = mio::Events::with_capacity(16);
132 loop {
133 // Wait for something to happen.
134 match self.poll.poll(&mut events, None) {
135 Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
136 // System call was interrupted, we will retry
137 // TODO: Not covered by tests (to reproduce likely need to setup signal handlers)
138 }
139 Err(e) => panic!("poll failed: {}", e),
140 Ok(()) => {}
141 }
142
143 // Process whatever happened.
144 for event in &events {
145 self.handle_event(event);
146 }
147
148 // Stop, if we're done.
149 if !self.running {
150 break;
151 }
152 }
153 }
154
155 // Handle a single event.
handle_event(&mut self, event: &mio::event::Event)156 fn handle_event(&mut self, event: &mio::event::Event) {
157 match event.token() {
158 MESSAGE => {
159 // The channel is readable - handle messages.
160 self.handle_messages()
161 }
162 INOTIFY => {
163 // inotify has something to tell us.
164 self.handle_inotify()
165 }
166 _ => unreachable!(),
167 }
168 }
169
handle_messages(&mut self)170 fn handle_messages(&mut self) {
171 while let Ok(msg) = self.event_loop_rx.try_recv() {
172 match msg {
173 EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
174 let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
175 }
176 EventLoopMsg::RemoveWatch(path, tx) => {
177 let _ = tx.send(self.remove_watch(path, false));
178 }
179 EventLoopMsg::Shutdown => {
180 let _ = self.remove_all_watches();
181 if let Some(inotify) = self.inotify.take() {
182 let _ = inotify.close();
183 }
184 self.running = false;
185 break;
186 }
187 EventLoopMsg::RenameTimeout(cookie) => {
188 let current_cookie = self.rename_event.as_ref().and_then(|e| e.tracker());
189 // send pending rename event only if the rename event for which the timer has been created hasn't been handled already; otherwise ignore this timeout
190 if current_cookie == Some(cookie) {
191 send_pending_rename_event(&mut self.rename_event, &mut *self.event_handler);
192 }
193 }
194 EventLoopMsg::Configure(config, tx) => {
195 self.configure_raw_mode(config, tx);
196 }
197 }
198 }
199 }
200
configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>)201 fn configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>) {
202 tx.send(Ok(false))
203 .expect("configuration channel disconnected");
204 }
205
handle_inotify(&mut self)206 fn handle_inotify(&mut self) {
207 let mut add_watches = Vec::new();
208 let mut remove_watches = Vec::new();
209
210 if let Some(ref mut inotify) = self.inotify {
211 let mut buffer = [0; 1024];
212 // Read all buffers available.
213 loop {
214 match inotify.read_events(&mut buffer) {
215 Ok(events) => {
216 let mut num_events = 0;
217 for event in events {
218 num_events += 1;
219 if event.mask.contains(EventMask::Q_OVERFLOW) {
220 let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
221 self.event_handler.handle_event(ev);
222 }
223
224 let path = match event.name {
225 Some(name) => {
226 self.paths.get(&event.wd).map(|root| root.join(&name))
227 }
228 None => self.paths.get(&event.wd).cloned(),
229 };
230
231 if event.mask.contains(EventMask::MOVED_FROM) {
232 send_pending_rename_event(
233 &mut self.rename_event,
234 &mut *self.event_handler,
235 );
236 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
237 self.rename_event = Some(
238 Event::new(EventKind::Modify(ModifyKind::Name(
239 RenameMode::From,
240 )))
241 .add_some_path(path.clone())
242 .set_tracker(event.cookie as usize),
243 );
244 } else {
245 let mut evs = Vec::new();
246 if event.mask.contains(EventMask::MOVED_TO) {
247 if let Some(e) = self.rename_event.take() {
248 if e.tracker() == Some(event.cookie as usize) {
249 self.event_handler.handle_event(Ok(e.clone()));
250 evs.push(
251 Event::new(EventKind::Modify(ModifyKind::Name(
252 RenameMode::To,
253 )))
254 .set_tracker(event.cookie as usize)
255 .add_some_path(path.clone()),
256 );
257 evs.push(
258 Event::new(EventKind::Modify(ModifyKind::Name(
259 RenameMode::Both,
260 )))
261 .set_tracker(event.cookie as usize)
262 .add_some_path(e.paths.first().cloned())
263 .add_some_path(path.clone()),
264 );
265 } else {
266 // TODO should it be rename?
267 evs.push(
268 Event::new(EventKind::Create(
269 if event.mask.contains(EventMask::ISDIR) {
270 CreateKind::Folder
271 } else {
272 CreateKind::File
273 },
274 ))
275 .add_some_path(path.clone()),
276 );
277 }
278 } else {
279 // TODO should it be rename?
280 evs.push(
281 Event::new(EventKind::Create(
282 if event.mask.contains(EventMask::ISDIR) {
283 CreateKind::Folder
284 } else {
285 CreateKind::File
286 },
287 ))
288 .add_some_path(path.clone()),
289 );
290 }
291 add_watch_by_event(
292 &path,
293 &event,
294 &self.watches,
295 &mut add_watches,
296 );
297 }
298 if event.mask.contains(EventMask::MOVE_SELF) {
299 evs.push(
300 Event::new(EventKind::Modify(ModifyKind::Name(
301 RenameMode::From,
302 )))
303 .add_some_path(path.clone()),
304 );
305 // TODO stat the path and get to new path
306 // - emit To and Both events
307 // - change prefix for further events
308 }
309 if event.mask.contains(EventMask::CREATE) {
310 evs.push(
311 Event::new(EventKind::Create(
312 if event.mask.contains(EventMask::ISDIR) {
313 CreateKind::Folder
314 } else {
315 CreateKind::File
316 },
317 ))
318 .add_some_path(path.clone()),
319 );
320 add_watch_by_event(
321 &path,
322 &event,
323 &self.watches,
324 &mut add_watches,
325 );
326 }
327 if event.mask.contains(EventMask::DELETE_SELF)
328 || event.mask.contains(EventMask::DELETE)
329 {
330 evs.push(
331 Event::new(EventKind::Remove(
332 if event.mask.contains(EventMask::ISDIR) {
333 RemoveKind::Folder
334 } else {
335 RemoveKind::File
336 },
337 ))
338 .add_some_path(path.clone()),
339 );
340 remove_watch_by_event(
341 &path,
342 &self.watches,
343 &mut remove_watches,
344 );
345 }
346 if event.mask.contains(EventMask::MODIFY) {
347 evs.push(
348 Event::new(EventKind::Modify(ModifyKind::Data(
349 DataChange::Any,
350 )))
351 .add_some_path(path.clone()),
352 );
353 }
354 if event.mask.contains(EventMask::CLOSE_WRITE) {
355 evs.push(
356 Event::new(EventKind::Access(AccessKind::Close(
357 AccessMode::Write,
358 )))
359 .add_some_path(path.clone()),
360 );
361 }
362 if event.mask.contains(EventMask::CLOSE_NOWRITE) {
363 evs.push(
364 Event::new(EventKind::Access(AccessKind::Close(
365 AccessMode::Read,
366 )))
367 .add_some_path(path.clone()),
368 );
369 }
370 if event.mask.contains(EventMask::ATTRIB) {
371 evs.push(
372 Event::new(EventKind::Modify(ModifyKind::Metadata(
373 MetadataKind::Any,
374 )))
375 .add_some_path(path.clone()),
376 );
377 }
378 if event.mask.contains(EventMask::OPEN) {
379 evs.push(
380 Event::new(EventKind::Access(AccessKind::Open(
381 AccessMode::Any,
382 )))
383 .add_some_path(path.clone()),
384 );
385 }
386
387 if !evs.is_empty() {
388 send_pending_rename_event(
389 &mut self.rename_event,
390 &mut *self.event_handler,
391 );
392 }
393
394 for ev in evs {
395 self.event_handler.handle_event(Ok(ev));
396 }
397 }
398 }
399
400 // All events read. Break out.
401 if num_events == 0 {
402 break;
403 }
404
405 // When receiving only the first part of a move event (IN_MOVED_FROM) it is unclear
406 // whether the second part (IN_MOVED_TO) will arrive because the file or directory
407 // could just have been moved out of the watched directory. So it's necessary to wait
408 // for possible subsequent events in case it's a complete move event but also to make sure
409 // that the first part of the event is handled in a timely manner in case no subsequent events arrive.
410 // TODO: don't do this here, instead leave it entirely to the debounce
411 // -> related to some rename events being reported as creates.
412
413 if let Some(ref rename_event) = self.rename_event {
414 let event_loop_tx = self.event_loop_tx.clone();
415 let waker = self.event_loop_waker.clone();
416 let cookie = rename_event.tracker().unwrap(); // unwrap is safe because rename_event is always set with some cookie
417 thread::spawn(move || {
418 thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event
419 event_loop_tx
420 .send(EventLoopMsg::RenameTimeout(cookie))
421 .unwrap();
422 waker.wake().unwrap();
423 });
424 }
425 }
426 Err(e) => {
427 self.event_handler.handle_event(Err(Error::io(e)));
428 }
429 }
430 }
431 }
432
433 for path in remove_watches {
434 self.remove_watch(path, true).ok();
435 }
436
437 for path in add_watches {
438 self.add_watch(path, true, false).ok();
439 }
440 }
441
add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()>442 fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
443 // If the watch is not recursive, or if we determine (by stat'ing the path to get its
444 // metadata) that the watched path is not a directory, add a single path watch.
445 if !is_recursive || !metadata(&path).map_err(Error::io)?.is_dir() {
446 return self.add_single_watch(path, false, true);
447 }
448
449 for entry in WalkDir::new(path)
450 .follow_links(true)
451 .into_iter()
452 .filter_map(filter_dir)
453 {
454 self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?;
455 watch_self = false;
456 }
457
458 Ok(())
459 }
460
add_single_watch( &mut self, path: PathBuf, is_recursive: bool, watch_self: bool, ) -> Result<()>461 fn add_single_watch(
462 &mut self,
463 path: PathBuf,
464 is_recursive: bool,
465 watch_self: bool,
466 ) -> Result<()> {
467 let mut watchmask = WatchMask::ATTRIB
468 | WatchMask::CREATE
469 | WatchMask::DELETE
470 | WatchMask::CLOSE_WRITE
471 | WatchMask::MODIFY
472 | WatchMask::MOVED_FROM
473 | WatchMask::MOVED_TO;
474
475 if watch_self {
476 watchmask.insert(WatchMask::DELETE_SELF);
477 watchmask.insert(WatchMask::MOVE_SELF);
478 }
479
480 if let Some(&(_, old_watchmask, _)) = self.watches.get(&path) {
481 watchmask.insert(old_watchmask);
482 watchmask.insert(WatchMask::MASK_ADD);
483 }
484
485 if let Some(ref mut inotify) = self.inotify {
486 match inotify.add_watch(&path, watchmask) {
487 Err(e) => {
488 Err(if e.raw_os_error() == Some(libc::ENOSPC) {
489 // do not report inotify limits as "no more space" on linux #266
490 Error::new(ErrorKind::MaxFilesWatch)
491 } else {
492 Error::io(e)
493 }.add_path(path))
494 }
495 Ok(w) => {
496 watchmask.remove(WatchMask::MASK_ADD);
497 self.watches
498 .insert(path.clone(), (w.clone(), watchmask, is_recursive));
499 self.paths.insert(w, path);
500 Ok(())
501 }
502 }
503 } else {
504 Ok(())
505 }
506 }
507
remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()>508 fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
509 match self.watches.remove(&path) {
510 None => return Err(Error::watch_not_found().add_path(path)),
511 Some((w, _, is_recursive)) => {
512 if let Some(ref mut inotify) = self.inotify {
513 inotify.rm_watch(w.clone()).map_err(|e| Error::io(e).add_path(path.clone()))?;
514 self.paths.remove(&w);
515
516 if is_recursive || remove_recursive {
517 let mut remove_list = Vec::new();
518 for (w, p) in &self.paths {
519 if p.starts_with(&path) {
520 inotify.rm_watch(w.clone()).map_err(|e| Error::io(e).add_path(p.into()))?;
521 self.watches.remove(p);
522 remove_list.push(w.clone());
523 }
524 }
525 for w in remove_list {
526 self.paths.remove(&w);
527 }
528 }
529 }
530 }
531 }
532 Ok(())
533 }
534
remove_all_watches(&mut self) -> Result<()>535 fn remove_all_watches(&mut self) -> Result<()> {
536 if let Some(ref mut inotify) = self.inotify {
537 for (w, p) in &self.paths {
538 inotify.rm_watch(w.clone()).map_err(|e| Error::io(e).add_path(p.into()))?;
539 }
540 self.watches.clear();
541 self.paths.clear();
542 }
543 Ok(())
544 }
545 }
546
547 /// return `DirEntry` when it is a directory
filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry>548 fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
549 if let Ok(e) = e {
550 if let Ok(metadata) = e.metadata() {
551 if metadata.is_dir() {
552 return Some(e);
553 }
554 }
555 }
556 None
557 }
558
559 impl INotifyWatcher {
from_event_handler(event_handler: Box<dyn EventHandler>) -> Result<Self>560 fn from_event_handler(event_handler: Box<dyn EventHandler>) -> Result<Self> {
561 let inotify = Inotify::init()?;
562 let event_loop = EventLoop::new(inotify, event_handler)?;
563 let channel = event_loop.event_loop_tx.clone();
564 let waker = event_loop.event_loop_waker.clone();
565 event_loop.run();
566 Ok(INotifyWatcher { channel, waker })
567 }
568
watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()>569 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
570 let pb = if path.is_absolute() {
571 path.to_owned()
572 } else {
573 let p = env::current_dir().map_err(Error::io)?;
574 p.join(path)
575 };
576 let (tx, rx) = unbounded();
577 let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
578
579 // we expect the event loop to live and reply => unwraps must not panic
580 self.channel.send(msg).unwrap();
581 self.waker.wake().unwrap();
582 rx.recv().unwrap()
583 }
584
unwatch_inner(&mut self, path: &Path) -> Result<()>585 fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
586 let pb = if path.is_absolute() {
587 path.to_owned()
588 } else {
589 let p = env::current_dir().map_err(Error::io)?;
590 p.join(path)
591 };
592 let (tx, rx) = unbounded();
593 let msg = EventLoopMsg::RemoveWatch(pb, tx);
594
595 // we expect the event loop to live and reply => unwraps must not panic
596 self.channel.send(msg).unwrap();
597 self.waker.wake().unwrap();
598 rx.recv().unwrap()
599 }
600 }
601
602 impl Watcher for INotifyWatcher {
603 /// Create a new watcher.
new<F: EventHandler>(event_handler: F) -> Result<Self>604 fn new<F: EventHandler>(event_handler: F) -> Result<Self> {
605 Self::from_event_handler(Box::new(event_handler))
606 }
607
watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()>608 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
609 self.watch_inner(path, recursive_mode)
610 }
611
unwatch(&mut self, path: &Path) -> Result<()>612 fn unwatch(&mut self, path: &Path) -> Result<()> {
613 self.unwatch_inner(path)
614 }
615
configure(&mut self, config: Config) -> Result<bool>616 fn configure(&mut self, config: Config) -> Result<bool> {
617 let (tx, rx) = bounded(1);
618 self.channel.send(EventLoopMsg::Configure(config, tx))?;
619 self.waker.wake()?;
620 rx.recv()?
621 }
622 }
623
624 impl Drop for INotifyWatcher {
drop(&mut self)625 fn drop(&mut self) {
626 // we expect the event loop to live => unwrap must not panic
627 self.channel.send(EventLoopMsg::Shutdown).unwrap();
628 self.waker.wake().unwrap();
629 }
630 }
631
632 #[test]
inotify_watcher_is_send_and_sync()633 fn inotify_watcher_is_send_and_sync() {
634 fn check<T: Send + Sync>() {}
635 check::<INotifyWatcher>();
636 }
637