1 #![allow(missing_docs)]
2 //! Watcher implementation for Windows' directory management APIs
3 //!
4 //! For more information see the [ReadDirectoryChangesW reference][ref].
5 //!
6 //! [ref]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa363950(v=vs.85).aspx
7 
8 use winapi::shared::minwindef::TRUE;
9 use winapi::shared::winerror::ERROR_OPERATION_ABORTED;
10 use winapi::um::fileapi;
11 use winapi::um::handleapi::{self, INVALID_HANDLE_VALUE};
12 use winapi::um::ioapiset;
13 use winapi::um::minwinbase::{LPOVERLAPPED, OVERLAPPED};
14 use winapi::um::synchapi;
15 use winapi::um::winbase::{self, INFINITE, WAIT_OBJECT_0};
16 use winapi::um::winnt::{self, FILE_NOTIFY_INFORMATION, HANDLE};
17 
18 use crate::event::*;
19 use crate::{Config, Error, EventHandler, RecursiveMode, Result, Watcher};
20 use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
21 use std::collections::HashMap;
22 use std::env;
23 use std::ffi::OsString;
24 use std::mem;
25 use std::os::raw::c_void;
26 use std::os::windows::ffi::{OsStrExt, OsStringExt};
27 use std::path::{Path, PathBuf};
28 use std::ptr;
29 use std::slice;
30 use std::sync::{Arc, Mutex};
31 use std::thread;
32 
/// Size in bytes of the buffer allocated for, and passed to, each
/// `ReadDirectoryChangesW` request.
const BUF_SIZE: u32 = 16384;
34 
/// Per-watch settings that are cloned into every read request issued for a watch.
#[derive(Clone)]
struct ReadData {
    /// Directory that is being watched.
    dir: PathBuf,
    /// If a file is being watched, this is its full path.
    file: Option<PathBuf>,
    /// Semaphore released when a read fails to start or completes as aborted.
    complete_sem: HANDLE,
    /// Whether the watch was requested as recursive.
    is_recursive: bool,
}
42 
/// State for one `ReadDirectoryChangesW` call. It is boxed in `start_read`,
/// travels through the `hEvent` field of the `OVERLAPPED` structure, and is
/// reclaimed in `handle_event`.
struct ReadDirectoryRequest {
    /// Handler that receives the events decoded from `buffer`.
    event_handler: Arc<Mutex<dyn EventHandler>>,
    /// Buffer handed to `ReadDirectoryChangesW` to receive change records.
    buffer: [u8; BUF_SIZE as usize],
    /// Directory handle the read was issued on.
    handle: HANDLE,
    /// Watch settings, reused when the follow-up read is queued.
    data: ReadData,
}
49 
/// Commands sent over the action channel to the server thread.
enum Action {
    /// Start watching the path with the given recursion mode.
    Watch(PathBuf, RecursiveMode),
    /// Stop watching the path.
    Unwatch(PathBuf),
    /// Stop every watch and end the server loop.
    Stop,
    /// Configuration request; the outcome is reported on the enclosed sender.
    Configure(Config, Sender<Result<bool>>),
}
56 
/// Notifications the server thread emits about its own activity.
///
/// The common value traits are derived so receivers can log, copy and compare
/// the events they get from the meta channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetaEvent {
    /// Sent after a single watch has been stopped.
    SingleWatchComplete,
    /// Sent when the server thread was woken through its wakeup semaphore.
    WatcherAwakened,
}
61 
/// Handles the server keeps for one active watch.
struct WatchState {
    /// Handle opened on the watched directory; cancelled and closed in `stop_watch`.
    dir_handle: HANDLE,
    /// Semaphore `stop_watch` waits on before closing it.
    complete_sem: HANDLE,
}
66 
/// State owned by the background thread that issues directory reads and
/// processes `Action`s.
struct ReadDirectoryChangesServer {
    /// Incoming actions from the watcher handle.
    rx: Receiver<Action>,
    /// Handler cloned into every read request.
    event_handler: Arc<Mutex<dyn EventHandler>>,
    /// Channel for `MetaEvent` notifications; send failures are ignored.
    meta_tx: Sender<MetaEvent>,
    /// Channel on which the result of each `Action::Watch` is reported.
    cmd_tx: Sender<Result<PathBuf>>,
    /// Active watches, keyed by the path passed to `add_watch`.
    watches: HashMap<PathBuf, WatchState>,
    /// Semaphore the run loop waits on between polls; closed when the loop exits.
    wakeup_sem: HANDLE,
}
75 
76 impl ReadDirectoryChangesServer {
start( event_handler: Arc<Mutex<dyn EventHandler>>, meta_tx: Sender<MetaEvent>, cmd_tx: Sender<Result<PathBuf>>, wakeup_sem: HANDLE, ) -> Sender<Action>77     fn start(
78         event_handler: Arc<Mutex<dyn EventHandler>>,
79         meta_tx: Sender<MetaEvent>,
80         cmd_tx: Sender<Result<PathBuf>>,
81         wakeup_sem: HANDLE,
82     ) -> Sender<Action> {
83         let (action_tx, action_rx) = unbounded();
84         // it is, in fact, ok to send the semaphore across threads
85         let sem_temp = wakeup_sem as u64;
86         thread::spawn(move || {
87             let wakeup_sem = sem_temp as HANDLE;
88             let server = ReadDirectoryChangesServer {
89                 rx: action_rx,
90                 event_handler,
91                 meta_tx,
92                 cmd_tx,
93                 watches: HashMap::new(),
94                 wakeup_sem,
95             };
96             server.run();
97         });
98         action_tx
99     }
100 
run(mut self)101     fn run(mut self) {
102         loop {
103             // process all available actions first
104             let mut stopped = false;
105 
106             while let Ok(action) = self.rx.try_recv() {
107                 match action {
108                     Action::Watch(path, recursive_mode) => {
109                         let res = self.add_watch(path, recursive_mode.is_recursive());
110                         let _ = self.cmd_tx.send(res);
111                     }
112                     Action::Unwatch(path) => self.remove_watch(path),
113                     Action::Stop => {
114                         stopped = true;
115                         for ws in self.watches.values() {
116                             stop_watch(ws, &self.meta_tx);
117                         }
118                         break;
119                     }
120                     Action::Configure(config, tx) => {
121                         self.configure_raw_mode(config, tx);
122                     }
123                 }
124             }
125 
126             if stopped {
127                 break;
128             }
129 
130             unsafe {
131                 // wait with alertable flag so that the completion routine fires
132                 let waitres = synchapi::WaitForSingleObjectEx(self.wakeup_sem, 100, TRUE);
133                 if waitres == WAIT_OBJECT_0 {
134                     let _ = self.meta_tx.send(MetaEvent::WatcherAwakened);
135                 }
136             }
137         }
138 
139         // we have to clean this up, since the watcher may be long gone
140         unsafe {
141             handleapi::CloseHandle(self.wakeup_sem);
142         }
143     }
144 
add_watch(&mut self, path: PathBuf, is_recursive: bool) -> Result<PathBuf>145     fn add_watch(&mut self, path: PathBuf, is_recursive: bool) -> Result<PathBuf> {
146         // path must exist and be either a file or directory
147         if !path.is_dir() && !path.is_file() {
148             return Err(Error::generic(
149                 "Input watch path is neither a file nor a directory.",
150             ).add_path(path));
151         }
152 
153         let (watching_file, dir_target) = {
154             if path.is_dir() {
155                 (false, path.clone())
156             } else {
157                 // emulate file watching by watching the parent directory
158                 (true, path.parent().unwrap().to_path_buf())
159             }
160         };
161 
162         let encoded_path: Vec<u16> = dir_target
163             .as_os_str()
164             .encode_wide()
165             .chain(Some(0))
166             .collect();
167         let handle;
168         unsafe {
169             handle = fileapi::CreateFileW(
170                 encoded_path.as_ptr(),
171                 winnt::FILE_LIST_DIRECTORY,
172                 winnt::FILE_SHARE_READ | winnt::FILE_SHARE_DELETE | winnt::FILE_SHARE_WRITE,
173                 ptr::null_mut(),
174                 fileapi::OPEN_EXISTING,
175                 winbase::FILE_FLAG_BACKUP_SEMANTICS | winbase::FILE_FLAG_OVERLAPPED,
176                 ptr::null_mut(),
177             );
178 
179             if handle == INVALID_HANDLE_VALUE {
180                 return Err(if watching_file {
181                     Error::generic(
182                         "You attempted to watch a single file, but parent \
183                          directory could not be opened.",
184                     ).add_path(path)
185                 } else {
186                     // TODO: Call GetLastError for better error info?
187                     Error::path_not_found().add_path(path)
188                 });
189             }
190         }
191         let wf = if watching_file {
192             Some(path.clone())
193         } else {
194             None
195         };
196         // every watcher gets its own semaphore to signal completion
197         let semaphore =
198             unsafe { synchapi::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
199         if semaphore.is_null() || semaphore == INVALID_HANDLE_VALUE {
200             unsafe {
201                 handleapi::CloseHandle(handle);
202             }
203             return Err(Error::generic("Failed to create semaphore for watch.").add_path(path));
204         }
205         let rd = ReadData {
206             dir: dir_target,
207             file: wf,
208             complete_sem: semaphore,
209             is_recursive,
210         };
211         let ws = WatchState {
212             dir_handle: handle,
213             complete_sem: semaphore,
214         };
215         self.watches.insert(path.clone(), ws);
216         start_read(&rd, self.event_handler.clone(), handle);
217         Ok(path)
218     }
219 
remove_watch(&mut self, path: PathBuf)220     fn remove_watch(&mut self, path: PathBuf) {
221         if let Some(ws) = self.watches.remove(&path) {
222             stop_watch(&ws, &self.meta_tx);
223         }
224     }
225 
configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>)226     fn configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>) {
227         tx.send(Ok(false))
228             .expect("configuration channel disconnect");
229     }
230 }
231 
stop_watch(ws: &WatchState, meta_tx: &Sender<MetaEvent>)232 fn stop_watch(ws: &WatchState, meta_tx: &Sender<MetaEvent>) {
233     unsafe {
234         let cio = ioapiset::CancelIo(ws.dir_handle);
235         let ch = handleapi::CloseHandle(ws.dir_handle);
236         // have to wait for it, otherwise we leak the memory allocated for there read request
237         if cio != 0 && ch != 0 {
238             while synchapi::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE) != WAIT_OBJECT_0
239             {
240                 // drain the apc queue, fix for https://github.com/notify-rs/notify/issues/287#issuecomment-801465550
241             }
242         }
243         handleapi::CloseHandle(ws.complete_sem);
244     }
245     let _ = meta_tx.send(MetaEvent::SingleWatchComplete);
246 }
247 
start_read(rd: &ReadData, event_handler: Arc<Mutex<dyn EventHandler>>, handle: HANDLE)248 fn start_read(rd: &ReadData, event_handler: Arc<Mutex<dyn EventHandler>>, handle: HANDLE) {
249     let mut request = Box::new(ReadDirectoryRequest {
250         event_handler,
251         handle,
252         buffer: [0u8; BUF_SIZE as usize],
253         data: rd.clone(),
254     });
255 
256     let flags = winnt::FILE_NOTIFY_CHANGE_FILE_NAME
257         | winnt::FILE_NOTIFY_CHANGE_DIR_NAME
258         | winnt::FILE_NOTIFY_CHANGE_ATTRIBUTES
259         | winnt::FILE_NOTIFY_CHANGE_SIZE
260         | winnt::FILE_NOTIFY_CHANGE_LAST_WRITE
261         | winnt::FILE_NOTIFY_CHANGE_CREATION
262         | winnt::FILE_NOTIFY_CHANGE_SECURITY;
263 
264     let monitor_subdir = if (&request.data.file).is_none() && request.data.is_recursive {
265         1
266     } else {
267         0
268     };
269 
270     unsafe {
271         let mut overlapped: Box<OVERLAPPED> = Box::new(mem::zeroed());
272         // When using callback based async requests, we are allowed to use the hEvent member
273         // for our own purposes
274 
275         let req_buf = request.buffer.as_mut_ptr() as *mut c_void;
276         let request_p = Box::into_raw(request) as *mut c_void;
277         overlapped.hEvent = request_p;
278 
279         // This is using an asynchronous call with a completion routine for receiving notifications
280         // An I/O completion port would probably be more performant
281         let ret = winbase::ReadDirectoryChangesW(
282             handle,
283             req_buf,
284             BUF_SIZE,
285             monitor_subdir,
286             flags,
287             &mut 0u32 as *mut u32, // not used for async reqs
288             &mut *overlapped as *mut OVERLAPPED,
289             Some(handle_event),
290         );
291 
292         if ret == 0 {
293             // error reading. retransmute request memory to allow drop.
294             // allow overlapped to drop by omitting forget()
295             let request: Box<ReadDirectoryRequest> = mem::transmute(request_p);
296 
297             synchapi::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
298         } else {
299             // read ok. forget overlapped to let the completion routine handle memory
300             mem::forget(overlapped);
301         }
302     }
303 }
304 
handle_event( error_code: u32, _bytes_written: u32, overlapped: LPOVERLAPPED, )305 unsafe extern "system" fn handle_event(
306     error_code: u32,
307     _bytes_written: u32,
308     overlapped: LPOVERLAPPED,
309 ) {
310     let overlapped: Box<OVERLAPPED> = Box::from_raw(overlapped);
311     let request: Box<ReadDirectoryRequest> = Box::from_raw(overlapped.hEvent as *mut _);
312 
313     if error_code == ERROR_OPERATION_ABORTED {
314         // received when dir is unwatched or watcher is shutdown; return and let overlapped/request
315         // get drop-cleaned
316         synchapi::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
317         return;
318     }
319 
320     // Get the next request queued up as soon as possible
321     start_read(&request.data, request.event_handler.clone(), request.handle);
322 
323     // The FILE_NOTIFY_INFORMATION struct has a variable length due to the variable length
324     // string as its last member. Each struct contains an offset for getting the next entry in
325     // the buffer.
326     let mut cur_offset: *const u8 = request.buffer.as_ptr();
327     let mut cur_entry = cur_offset as *const FILE_NOTIFY_INFORMATION;
328     loop {
329         // filename length is size in bytes, so / 2
330         let len = (*cur_entry).FileNameLength as usize / 2;
331         let encoded_path: &[u16] = slice::from_raw_parts((*cur_entry).FileName.as_ptr(), len);
332         // prepend root to get a full path
333         let path = request
334             .data
335             .dir
336             .join(PathBuf::from(OsString::from_wide(encoded_path)));
337 
338         // if we are watching a single file, ignore the event unless the path is exactly
339         // the watched file
340         let skip = match request.data.file {
341             None => false,
342             Some(ref watch_path) => *watch_path != path,
343         };
344 
345         if !skip {
346             let newe = Event::new(EventKind::Any).add_path(path);
347 
348             fn emit_event(event_handler: &Mutex<dyn EventHandler>, res: Result<Event>) {
349                 if let Ok(mut guard) = event_handler.lock() {
350                     let f: &mut dyn EventHandler = &mut *guard;
351                     f.handle_event(res);
352                 }
353             }
354 
355             let event_handler = |res| emit_event(&request.event_handler, res);
356 
357             if (*cur_entry).Action == winnt::FILE_ACTION_RENAMED_OLD_NAME {
358                 let mode = RenameMode::From;
359                 let kind = ModifyKind::Name(mode);
360                 let kind = EventKind::Modify(kind);
361                 let ev = newe.set_kind(kind);
362                 event_handler(Ok(ev))
363             } else {
364                 match (*cur_entry).Action {
365                     winnt::FILE_ACTION_RENAMED_NEW_NAME => {
366                         let kind = EventKind::Modify(ModifyKind::Name(RenameMode::To));
367                         let ev = newe.set_kind(kind);
368                         event_handler(Ok(ev));
369                     }
370                     winnt::FILE_ACTION_ADDED => {
371                         let kind = EventKind::Create(CreateKind::Any);
372                         let ev = newe.set_kind(kind);
373                         event_handler(Ok(ev));
374                     }
375                     winnt::FILE_ACTION_REMOVED => {
376                         let kind = EventKind::Remove(RemoveKind::Any);
377                         let ev = newe.set_kind(kind);
378                         event_handler(Ok(ev));
379                     }
380                     winnt::FILE_ACTION_MODIFIED => {
381                         let kind = EventKind::Modify(ModifyKind::Any);
382                         let ev = newe.set_kind(kind);
383                         event_handler(Ok(ev));
384                     }
385                     _ => (),
386                 };
387             }
388         }
389 
390         if (*cur_entry).NextEntryOffset == 0 {
391             break;
392         }
393         cur_offset = cur_offset.offset((*cur_entry).NextEntryOffset as isize);
394         cur_entry = cur_offset as *const FILE_NOTIFY_INFORMATION;
395     }
396 }
397 
/// Watcher implementation based on ReadDirectoryChanges
///
/// This is the handle side: it sends `Action`s to a background server thread
/// and reads watch acknowledgements back from it.
pub struct ReadDirectoryChangesWatcher {
    /// Sender for actions consumed by the server thread.
    tx: Sender<Action>,
    /// Receiver for the result of each watch request.
    cmd_rx: Receiver<Result<PathBuf>>,
    /// Semaphore released to wake the server out of its wait.
    wakeup_sem: HANDLE,
}
404 
405 impl ReadDirectoryChangesWatcher {
create( event_handler: Arc<Mutex<dyn EventHandler>>, meta_tx: Sender<MetaEvent>, ) -> Result<ReadDirectoryChangesWatcher>406     pub fn create(
407         event_handler: Arc<Mutex<dyn EventHandler>>,
408         meta_tx: Sender<MetaEvent>,
409     ) -> Result<ReadDirectoryChangesWatcher> {
410         let (cmd_tx, cmd_rx) = unbounded();
411 
412         let wakeup_sem =
413             unsafe { synchapi::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
414         if wakeup_sem.is_null() || wakeup_sem == INVALID_HANDLE_VALUE {
415             return Err(Error::generic("Failed to create wakeup semaphore."));
416         }
417 
418         let action_tx = ReadDirectoryChangesServer::start(event_handler, meta_tx, cmd_tx, wakeup_sem);
419 
420         Ok(ReadDirectoryChangesWatcher {
421             tx: action_tx,
422             cmd_rx,
423             wakeup_sem,
424         })
425     }
426 
wakeup_server(&mut self)427     fn wakeup_server(&mut self) {
428         // breaks the server out of its wait state.  right now this is really just an optimization,
429         // so that if you add a watch you don't block for 100ms in watch() while the
430         // server sleeps.
431         unsafe {
432             synchapi::ReleaseSemaphore(self.wakeup_sem, 1, ptr::null_mut());
433         }
434     }
435 
send_action_require_ack(&mut self, action: Action, pb: &PathBuf) -> Result<()>436     fn send_action_require_ack(&mut self, action: Action, pb: &PathBuf) -> Result<()> {
437         self.tx
438             .send(action)
439             .map_err(|_| Error::generic("Error sending to internal channel"))?;
440 
441         // wake 'em up, we don't want to wait around for the ack
442         self.wakeup_server();
443 
444         let ack_pb = self
445             .cmd_rx
446             .recv()
447             .map_err(|_| Error::generic("Error receiving from command channel"))?
448             .map_err(|e| Error::generic(&format!("Error in watcher: {:?}", e)))?;
449 
450         if pb.as_path() != ack_pb.as_path() {
451             Err(Error::generic(&format!(
452                 "Expected ack for {:?} but got \
453                  ack for {:?}",
454                 pb, ack_pb
455             )))
456         } else {
457             Ok(())
458         }
459     }
460 
watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()>461     fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
462         let pb = if path.is_absolute() {
463             path.to_owned()
464         } else {
465             let p = env::current_dir().map_err(Error::io)?;
466             p.join(path)
467         };
468         // path must exist and be either a file or directory
469         if !pb.is_dir() && !pb.is_file() {
470             return Err(Error::generic(
471                 "Input watch path is neither a file nor a directory.",
472             ));
473         }
474         self.send_action_require_ack(Action::Watch(pb.clone(), recursive_mode), &pb)
475     }
476 
unwatch_inner(&mut self, path: &Path) -> Result<()>477     fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
478         let pb = if path.is_absolute() {
479             path.to_owned()
480         } else {
481             let p = env::current_dir().map_err(Error::io)?;
482             p.join(path)
483         };
484         let res = self
485             .tx
486             .send(Action::Unwatch(pb))
487             .map_err(|_| Error::generic("Error sending to internal channel"));
488         self.wakeup_server();
489         res
490     }
491 }
492 
493 impl Watcher for ReadDirectoryChangesWatcher {
new<F: EventHandler>(event_handler: F) -> Result<Self>494     fn new<F: EventHandler>(event_handler: F) -> Result<Self> {
495         // create dummy channel for meta event
496         // TODO: determine the original purpose of this - can we remove it?
497         let (meta_tx, _) = unbounded();
498         let event_handler = Arc::new(Mutex::new(event_handler));
499         Self::create(event_handler, meta_tx)
500     }
501 
watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()>502     fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
503         self.watch_inner(path, recursive_mode)
504     }
505 
unwatch(&mut self, path: &Path) -> Result<()>506     fn unwatch(&mut self, path: &Path) -> Result<()> {
507         self.unwatch_inner(path)
508     }
509 
configure(&mut self, config: Config) -> Result<bool>510     fn configure(&mut self, config: Config) -> Result<bool> {
511         let (tx, rx) = bounded(1);
512         self.tx.send(Action::Configure(config, tx))?;
513         rx.recv()?
514     }
515 }
516 
517 impl Drop for ReadDirectoryChangesWatcher {
drop(&mut self)518     fn drop(&mut self) {
519         let _ = self.tx.send(Action::Stop);
520         // better wake it up
521         self.wakeup_server();
522     }
523 }
524 
// `ReadDirectoryChangesWatcher` is not automatically Send/Sync because it stores the raw
// semaphore `HANDLE`. The handle value is likewise moved into the server thread when that
// thread is started, on the basis that sending it across threads is fine.
unsafe impl Send for ReadDirectoryChangesWatcher {}
// Because all public methods take `&mut self`, shared references cannot be used to reach
// the handle concurrently.
unsafe impl Sync for ReadDirectoryChangesWatcher {}
530