1 #![allow(missing_docs)]
2 //! Watcher implementation for Windows' directory management APIs
3 //!
4 //! For more information see the [ReadDirectoryChangesW reference][ref].
5 //!
6 //! [ref]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa363950(v=vs.85).aspx
7
8 use winapi::shared::minwindef::TRUE;
9 use winapi::shared::winerror::ERROR_OPERATION_ABORTED;
10 use winapi::um::fileapi;
11 use winapi::um::handleapi::{self, INVALID_HANDLE_VALUE};
12 use winapi::um::ioapiset;
13 use winapi::um::minwinbase::{LPOVERLAPPED, OVERLAPPED};
14 use winapi::um::synchapi;
15 use winapi::um::winbase::{self, INFINITE, WAIT_OBJECT_0};
16 use winapi::um::winnt::{self, FILE_NOTIFY_INFORMATION, HANDLE};
17
18 use crate::event::*;
19 use crate::{Config, Error, EventHandler, RecursiveMode, Result, Watcher};
20 use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
21 use std::collections::HashMap;
22 use std::env;
23 use std::ffi::OsString;
24 use std::mem;
25 use std::os::raw::c_void;
26 use std::os::windows::ffi::{OsStrExt, OsStringExt};
27 use std::path::{Path, PathBuf};
28 use std::ptr;
29 use std::slice;
30 use std::sync::{Arc, Mutex};
31 use std::thread;
32
/// Size, in bytes, of the buffer each read request hands to `ReadDirectoryChangesW`.
const BUF_SIZE: u32 = 16384;
34
/// Per-watch data that is cloned into every read request issued for that watch.
#[derive(Clone)]
struct ReadData {
    dir: PathBuf,          // directory that is being watched
    file: Option<PathBuf>, // if a file is being watched, this is its full path
    // released when the watch's read is aborted or could not be started
    complete_sem: HANDLE,
    // subdirectories are monitored only when this is set and `file` is `None`
    is_recursive: bool,
}
42
/// State of one outstanding `ReadDirectoryChangesW` call.
///
/// Boxed and leaked by `start_read`; reclaimed by the completion routine `handle_event`, or by
/// `start_read` itself when the call fails.
struct ReadDirectoryRequest {
    // receives the events decoded from `buffer`
    event_handler: Arc<Mutex<dyn EventHandler>>,
    // passed to the OS as the output buffer; read back as FILE_NOTIFY_INFORMATION records
    buffer: [u8; BUF_SIZE as usize],
    // directory handle the read was issued on; reused for the follow-up read
    handle: HANDLE,
    data: ReadData,
}
49
/// Commands sent from the `ReadDirectoryChangesWatcher` to the server thread.
enum Action {
    // start watching a path; the outcome is reported on the server's command channel
    Watch(PathBuf, RecursiveMode),
    // stop watching a path; no acknowledgement is sent
    Unwatch(PathBuf),
    // stop every watch and end the server loop
    Stop,
    // the reply is delivered through the enclosed sender
    Configure(Config, Sender<Result<bool>>),
}
56
/// Notifications about the server thread itself rather than about the file system.
pub enum MetaEvent {
    /// Sent after a single watch has been stopped.
    SingleWatchComplete,
    /// Sent when the server thread's wait on the wakeup semaphore was satisfied.
    WatcherAwakened,
}
61
/// OS handles the server keeps for one active watch.
struct WatchState {
    dir_handle: HANDLE,   // directory handle the reads are issued on
    complete_sem: HANDLE, // waited on in `stop_watch` until the pending read has been aborted
}
66
/// State owned by the background thread that services all watches.
struct ReadDirectoryChangesServer {
    rx: Receiver<Action>, // commands from the watcher
    event_handler: Arc<Mutex<dyn EventHandler>>, // cloned into every read request
    meta_tx: Sender<MetaEvent>,
    cmd_tx: Sender<Result<PathBuf>>, // carries the outcome of each `Action::Watch`
    watches: HashMap<PathBuf, WatchState>, // active watches, keyed by the watched path
    wakeup_sem: HANDLE, // released by the watcher to cut the server's wait short
}
75
76 impl ReadDirectoryChangesServer {
start( event_handler: Arc<Mutex<dyn EventHandler>>, meta_tx: Sender<MetaEvent>, cmd_tx: Sender<Result<PathBuf>>, wakeup_sem: HANDLE, ) -> Sender<Action>77 fn start(
78 event_handler: Arc<Mutex<dyn EventHandler>>,
79 meta_tx: Sender<MetaEvent>,
80 cmd_tx: Sender<Result<PathBuf>>,
81 wakeup_sem: HANDLE,
82 ) -> Sender<Action> {
83 let (action_tx, action_rx) = unbounded();
84 // it is, in fact, ok to send the semaphore across threads
85 let sem_temp = wakeup_sem as u64;
86 thread::spawn(move || {
87 let wakeup_sem = sem_temp as HANDLE;
88 let server = ReadDirectoryChangesServer {
89 rx: action_rx,
90 event_handler,
91 meta_tx,
92 cmd_tx,
93 watches: HashMap::new(),
94 wakeup_sem,
95 };
96 server.run();
97 });
98 action_tx
99 }
100
run(mut self)101 fn run(mut self) {
102 loop {
103 // process all available actions first
104 let mut stopped = false;
105
106 while let Ok(action) = self.rx.try_recv() {
107 match action {
108 Action::Watch(path, recursive_mode) => {
109 let res = self.add_watch(path, recursive_mode.is_recursive());
110 let _ = self.cmd_tx.send(res);
111 }
112 Action::Unwatch(path) => self.remove_watch(path),
113 Action::Stop => {
114 stopped = true;
115 for ws in self.watches.values() {
116 stop_watch(ws, &self.meta_tx);
117 }
118 break;
119 }
120 Action::Configure(config, tx) => {
121 self.configure_raw_mode(config, tx);
122 }
123 }
124 }
125
126 if stopped {
127 break;
128 }
129
130 unsafe {
131 // wait with alertable flag so that the completion routine fires
132 let waitres = synchapi::WaitForSingleObjectEx(self.wakeup_sem, 100, TRUE);
133 if waitres == WAIT_OBJECT_0 {
134 let _ = self.meta_tx.send(MetaEvent::WatcherAwakened);
135 }
136 }
137 }
138
139 // we have to clean this up, since the watcher may be long gone
140 unsafe {
141 handleapi::CloseHandle(self.wakeup_sem);
142 }
143 }
144
add_watch(&mut self, path: PathBuf, is_recursive: bool) -> Result<PathBuf>145 fn add_watch(&mut self, path: PathBuf, is_recursive: bool) -> Result<PathBuf> {
146 // path must exist and be either a file or directory
147 if !path.is_dir() && !path.is_file() {
148 return Err(Error::generic(
149 "Input watch path is neither a file nor a directory.",
150 ).add_path(path));
151 }
152
153 let (watching_file, dir_target) = {
154 if path.is_dir() {
155 (false, path.clone())
156 } else {
157 // emulate file watching by watching the parent directory
158 (true, path.parent().unwrap().to_path_buf())
159 }
160 };
161
162 let encoded_path: Vec<u16> = dir_target
163 .as_os_str()
164 .encode_wide()
165 .chain(Some(0))
166 .collect();
167 let handle;
168 unsafe {
169 handle = fileapi::CreateFileW(
170 encoded_path.as_ptr(),
171 winnt::FILE_LIST_DIRECTORY,
172 winnt::FILE_SHARE_READ | winnt::FILE_SHARE_DELETE | winnt::FILE_SHARE_WRITE,
173 ptr::null_mut(),
174 fileapi::OPEN_EXISTING,
175 winbase::FILE_FLAG_BACKUP_SEMANTICS | winbase::FILE_FLAG_OVERLAPPED,
176 ptr::null_mut(),
177 );
178
179 if handle == INVALID_HANDLE_VALUE {
180 return Err(if watching_file {
181 Error::generic(
182 "You attempted to watch a single file, but parent \
183 directory could not be opened.",
184 ).add_path(path)
185 } else {
186 // TODO: Call GetLastError for better error info?
187 Error::path_not_found().add_path(path)
188 });
189 }
190 }
191 let wf = if watching_file {
192 Some(path.clone())
193 } else {
194 None
195 };
196 // every watcher gets its own semaphore to signal completion
197 let semaphore =
198 unsafe { synchapi::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
199 if semaphore.is_null() || semaphore == INVALID_HANDLE_VALUE {
200 unsafe {
201 handleapi::CloseHandle(handle);
202 }
203 return Err(Error::generic("Failed to create semaphore for watch.").add_path(path));
204 }
205 let rd = ReadData {
206 dir: dir_target,
207 file: wf,
208 complete_sem: semaphore,
209 is_recursive,
210 };
211 let ws = WatchState {
212 dir_handle: handle,
213 complete_sem: semaphore,
214 };
215 self.watches.insert(path.clone(), ws);
216 start_read(&rd, self.event_handler.clone(), handle);
217 Ok(path)
218 }
219
remove_watch(&mut self, path: PathBuf)220 fn remove_watch(&mut self, path: PathBuf) {
221 if let Some(ws) = self.watches.remove(&path) {
222 stop_watch(&ws, &self.meta_tx);
223 }
224 }
225
configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>)226 fn configure_raw_mode(&mut self, _config: Config, tx: Sender<Result<bool>>) {
227 tx.send(Ok(false))
228 .expect("configuration channel disconnect");
229 }
230 }
231
stop_watch(ws: &WatchState, meta_tx: &Sender<MetaEvent>)232 fn stop_watch(ws: &WatchState, meta_tx: &Sender<MetaEvent>) {
233 unsafe {
234 let cio = ioapiset::CancelIo(ws.dir_handle);
235 let ch = handleapi::CloseHandle(ws.dir_handle);
236 // have to wait for it, otherwise we leak the memory allocated for there read request
237 if cio != 0 && ch != 0 {
238 while synchapi::WaitForSingleObjectEx(ws.complete_sem, INFINITE, TRUE) != WAIT_OBJECT_0
239 {
240 // drain the apc queue, fix for https://github.com/notify-rs/notify/issues/287#issuecomment-801465550
241 }
242 }
243 handleapi::CloseHandle(ws.complete_sem);
244 }
245 let _ = meta_tx.send(MetaEvent::SingleWatchComplete);
246 }
247
start_read(rd: &ReadData, event_handler: Arc<Mutex<dyn EventHandler>>, handle: HANDLE)248 fn start_read(rd: &ReadData, event_handler: Arc<Mutex<dyn EventHandler>>, handle: HANDLE) {
249 let mut request = Box::new(ReadDirectoryRequest {
250 event_handler,
251 handle,
252 buffer: [0u8; BUF_SIZE as usize],
253 data: rd.clone(),
254 });
255
256 let flags = winnt::FILE_NOTIFY_CHANGE_FILE_NAME
257 | winnt::FILE_NOTIFY_CHANGE_DIR_NAME
258 | winnt::FILE_NOTIFY_CHANGE_ATTRIBUTES
259 | winnt::FILE_NOTIFY_CHANGE_SIZE
260 | winnt::FILE_NOTIFY_CHANGE_LAST_WRITE
261 | winnt::FILE_NOTIFY_CHANGE_CREATION
262 | winnt::FILE_NOTIFY_CHANGE_SECURITY;
263
264 let monitor_subdir = if (&request.data.file).is_none() && request.data.is_recursive {
265 1
266 } else {
267 0
268 };
269
270 unsafe {
271 let mut overlapped: Box<OVERLAPPED> = Box::new(mem::zeroed());
272 // When using callback based async requests, we are allowed to use the hEvent member
273 // for our own purposes
274
275 let req_buf = request.buffer.as_mut_ptr() as *mut c_void;
276 let request_p = Box::into_raw(request) as *mut c_void;
277 overlapped.hEvent = request_p;
278
279 // This is using an asynchronous call with a completion routine for receiving notifications
280 // An I/O completion port would probably be more performant
281 let ret = winbase::ReadDirectoryChangesW(
282 handle,
283 req_buf,
284 BUF_SIZE,
285 monitor_subdir,
286 flags,
287 &mut 0u32 as *mut u32, // not used for async reqs
288 &mut *overlapped as *mut OVERLAPPED,
289 Some(handle_event),
290 );
291
292 if ret == 0 {
293 // error reading. retransmute request memory to allow drop.
294 // allow overlapped to drop by omitting forget()
295 let request: Box<ReadDirectoryRequest> = mem::transmute(request_p);
296
297 synchapi::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
298 } else {
299 // read ok. forget overlapped to let the completion routine handle memory
300 mem::forget(overlapped);
301 }
302 }
303 }
304
handle_event( error_code: u32, _bytes_written: u32, overlapped: LPOVERLAPPED, )305 unsafe extern "system" fn handle_event(
306 error_code: u32,
307 _bytes_written: u32,
308 overlapped: LPOVERLAPPED,
309 ) {
310 let overlapped: Box<OVERLAPPED> = Box::from_raw(overlapped);
311 let request: Box<ReadDirectoryRequest> = Box::from_raw(overlapped.hEvent as *mut _);
312
313 if error_code == ERROR_OPERATION_ABORTED {
314 // received when dir is unwatched or watcher is shutdown; return and let overlapped/request
315 // get drop-cleaned
316 synchapi::ReleaseSemaphore(request.data.complete_sem, 1, ptr::null_mut());
317 return;
318 }
319
320 // Get the next request queued up as soon as possible
321 start_read(&request.data, request.event_handler.clone(), request.handle);
322
323 // The FILE_NOTIFY_INFORMATION struct has a variable length due to the variable length
324 // string as its last member. Each struct contains an offset for getting the next entry in
325 // the buffer.
326 let mut cur_offset: *const u8 = request.buffer.as_ptr();
327 let mut cur_entry = cur_offset as *const FILE_NOTIFY_INFORMATION;
328 loop {
329 // filename length is size in bytes, so / 2
330 let len = (*cur_entry).FileNameLength as usize / 2;
331 let encoded_path: &[u16] = slice::from_raw_parts((*cur_entry).FileName.as_ptr(), len);
332 // prepend root to get a full path
333 let path = request
334 .data
335 .dir
336 .join(PathBuf::from(OsString::from_wide(encoded_path)));
337
338 // if we are watching a single file, ignore the event unless the path is exactly
339 // the watched file
340 let skip = match request.data.file {
341 None => false,
342 Some(ref watch_path) => *watch_path != path,
343 };
344
345 if !skip {
346 let newe = Event::new(EventKind::Any).add_path(path);
347
348 fn emit_event(event_handler: &Mutex<dyn EventHandler>, res: Result<Event>) {
349 if let Ok(mut guard) = event_handler.lock() {
350 let f: &mut dyn EventHandler = &mut *guard;
351 f.handle_event(res);
352 }
353 }
354
355 let event_handler = |res| emit_event(&request.event_handler, res);
356
357 if (*cur_entry).Action == winnt::FILE_ACTION_RENAMED_OLD_NAME {
358 let mode = RenameMode::From;
359 let kind = ModifyKind::Name(mode);
360 let kind = EventKind::Modify(kind);
361 let ev = newe.set_kind(kind);
362 event_handler(Ok(ev))
363 } else {
364 match (*cur_entry).Action {
365 winnt::FILE_ACTION_RENAMED_NEW_NAME => {
366 let kind = EventKind::Modify(ModifyKind::Name(RenameMode::To));
367 let ev = newe.set_kind(kind);
368 event_handler(Ok(ev));
369 }
370 winnt::FILE_ACTION_ADDED => {
371 let kind = EventKind::Create(CreateKind::Any);
372 let ev = newe.set_kind(kind);
373 event_handler(Ok(ev));
374 }
375 winnt::FILE_ACTION_REMOVED => {
376 let kind = EventKind::Remove(RemoveKind::Any);
377 let ev = newe.set_kind(kind);
378 event_handler(Ok(ev));
379 }
380 winnt::FILE_ACTION_MODIFIED => {
381 let kind = EventKind::Modify(ModifyKind::Any);
382 let ev = newe.set_kind(kind);
383 event_handler(Ok(ev));
384 }
385 _ => (),
386 };
387 }
388 }
389
390 if (*cur_entry).NextEntryOffset == 0 {
391 break;
392 }
393 cur_offset = cur_offset.offset((*cur_entry).NextEntryOffset as isize);
394 cur_entry = cur_offset as *const FILE_NOTIFY_INFORMATION;
395 }
396 }
397
/// Watcher implementation based on ReadDirectoryChanges
///
/// The watcher is a handle to a background server thread: actions are sent over `tx` and the
/// server is nudged through `wakeup_sem`.
pub struct ReadDirectoryChangesWatcher {
    tx: Sender<Action>,                // actions for the server thread
    cmd_rx: Receiver<Result<PathBuf>>, // acknowledgements for `Action::Watch`
    wakeup_sem: HANDLE,                // released to break the server out of its wait
}
404
405 impl ReadDirectoryChangesWatcher {
create( event_handler: Arc<Mutex<dyn EventHandler>>, meta_tx: Sender<MetaEvent>, ) -> Result<ReadDirectoryChangesWatcher>406 pub fn create(
407 event_handler: Arc<Mutex<dyn EventHandler>>,
408 meta_tx: Sender<MetaEvent>,
409 ) -> Result<ReadDirectoryChangesWatcher> {
410 let (cmd_tx, cmd_rx) = unbounded();
411
412 let wakeup_sem =
413 unsafe { synchapi::CreateSemaphoreW(ptr::null_mut(), 0, 1, ptr::null_mut()) };
414 if wakeup_sem.is_null() || wakeup_sem == INVALID_HANDLE_VALUE {
415 return Err(Error::generic("Failed to create wakeup semaphore."));
416 }
417
418 let action_tx = ReadDirectoryChangesServer::start(event_handler, meta_tx, cmd_tx, wakeup_sem);
419
420 Ok(ReadDirectoryChangesWatcher {
421 tx: action_tx,
422 cmd_rx,
423 wakeup_sem,
424 })
425 }
426
wakeup_server(&mut self)427 fn wakeup_server(&mut self) {
428 // breaks the server out of its wait state. right now this is really just an optimization,
429 // so that if you add a watch you don't block for 100ms in watch() while the
430 // server sleeps.
431 unsafe {
432 synchapi::ReleaseSemaphore(self.wakeup_sem, 1, ptr::null_mut());
433 }
434 }
435
send_action_require_ack(&mut self, action: Action, pb: &PathBuf) -> Result<()>436 fn send_action_require_ack(&mut self, action: Action, pb: &PathBuf) -> Result<()> {
437 self.tx
438 .send(action)
439 .map_err(|_| Error::generic("Error sending to internal channel"))?;
440
441 // wake 'em up, we don't want to wait around for the ack
442 self.wakeup_server();
443
444 let ack_pb = self
445 .cmd_rx
446 .recv()
447 .map_err(|_| Error::generic("Error receiving from command channel"))?
448 .map_err(|e| Error::generic(&format!("Error in watcher: {:?}", e)))?;
449
450 if pb.as_path() != ack_pb.as_path() {
451 Err(Error::generic(&format!(
452 "Expected ack for {:?} but got \
453 ack for {:?}",
454 pb, ack_pb
455 )))
456 } else {
457 Ok(())
458 }
459 }
460
watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()>461 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
462 let pb = if path.is_absolute() {
463 path.to_owned()
464 } else {
465 let p = env::current_dir().map_err(Error::io)?;
466 p.join(path)
467 };
468 // path must exist and be either a file or directory
469 if !pb.is_dir() && !pb.is_file() {
470 return Err(Error::generic(
471 "Input watch path is neither a file nor a directory.",
472 ));
473 }
474 self.send_action_require_ack(Action::Watch(pb.clone(), recursive_mode), &pb)
475 }
476
unwatch_inner(&mut self, path: &Path) -> Result<()>477 fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
478 let pb = if path.is_absolute() {
479 path.to_owned()
480 } else {
481 let p = env::current_dir().map_err(Error::io)?;
482 p.join(path)
483 };
484 let res = self
485 .tx
486 .send(Action::Unwatch(pb))
487 .map_err(|_| Error::generic("Error sending to internal channel"));
488 self.wakeup_server();
489 res
490 }
491 }
492
493 impl Watcher for ReadDirectoryChangesWatcher {
new<F: EventHandler>(event_handler: F) -> Result<Self>494 fn new<F: EventHandler>(event_handler: F) -> Result<Self> {
495 // create dummy channel for meta event
496 // TODO: determine the original purpose of this - can we remove it?
497 let (meta_tx, _) = unbounded();
498 let event_handler = Arc::new(Mutex::new(event_handler));
499 Self::create(event_handler, meta_tx)
500 }
501
watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()>502 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
503 self.watch_inner(path, recursive_mode)
504 }
505
unwatch(&mut self, path: &Path) -> Result<()>506 fn unwatch(&mut self, path: &Path) -> Result<()> {
507 self.unwatch_inner(path)
508 }
509
configure(&mut self, config: Config) -> Result<bool>510 fn configure(&mut self, config: Config) -> Result<bool> {
511 let (tx, rx) = bounded(1);
512 self.tx.send(Action::Configure(config, tx))?;
513 rx.recv()?
514 }
515 }
516
517 impl Drop for ReadDirectoryChangesWatcher {
drop(&mut self)518 fn drop(&mut self) {
519 let _ = self.tx.send(Action::Stop);
520 // better wake it up
521 self.wakeup_server();
522 }
523 }
524
// SAFETY: `ReadDirectoryChangesWatcher` is not automatically Send/Sync only because of the raw
// semaphore `HANDLE` it stores. As said elsewhere it's perfectly safe to send that handle
// across threads.
unsafe impl Send for ReadDirectoryChangesWatcher {}
// SAFETY: every method that takes `self` does so as `&mut self`, so a shared reference gives
// no access to the handle and it's also perfectly safe to share references.
unsafe impl Sync for ReadDirectoryChangesWatcher {}
530