1 extern crate fsevent;
2 extern crate tempdir;
3 extern crate time;
4 
5 use fsevent::*;
6 use std::fs;
7 use std::fs::OpenOptions;
8 use std::fs::read_link;
9 use std::io::Write;
10 use std::path::{Component, PathBuf};
11 use std::thread;
12 use std::time::Duration;
13 
14 use std::sync::mpsc::{channel, Receiver};
15 use tempdir::TempDir;
16 
17 const TIMEOUT_S: f64 = 5.0;
18 
validate_recv(rx: Receiver<Event>, evs: Vec<(String, StreamFlags)>)19 fn validate_recv(rx: Receiver<Event>, evs: Vec<(String, StreamFlags)>) {
20     let deadline = time::precise_time_s() + TIMEOUT_S;
21     let mut evs = evs.clone();
22 
23     while time::precise_time_s() < deadline {
24         if let Ok(actual) = rx.try_recv() {
25             let mut found: Option<usize> = None;
26             for i in 0..evs.len() {
27                 let expected = evs.get(i).unwrap();
28                 if actual.path == expected.0 && actual.flag == expected.1 {
29                     found = Some(i);
30                     break;
31                 }
32             }
33             if let Some(i) = found {
34                 evs.remove(i);
35             } else {
36                 assert!(
37                     false,
38                     format!("actual: {:?} not found in expected: {:?}", actual, evs)
39                 );
40             }
41         }
42         if evs.is_empty() {
43             break;
44         }
45     }
46     assert!(
47         evs.is_empty(),
48         "Some expected events did not occur before the test timedout:\n\t\t{:?}",
49         evs
50     );
51 }
52 
53 // TODO: replace with std::fs::canonicalize rust-lang/rust#27706.
resolve_path(path: &str) -> PathBuf54 fn resolve_path(path: &str) -> PathBuf {
55     let mut out = PathBuf::new();
56     let buf = PathBuf::from(path);
57     for p in buf.components() {
58         match p {
59             Component::RootDir => out.push("/"),
60             Component::Normal(osstr) => {
61                 out.push(osstr);
62                 if let Ok(real) = read_link(&out) {
63                     if real.is_relative() {
64                         out.pop();
65                         out.push(real);
66                     } else {
67                         out = real;
68                     }
69                 }
70             }
71             _ => (),
72         }
73     }
74     out
75 }
76 
77 #[test]
observe_folder_sync()78 fn observe_folder_sync() {
79     internal_observe_folder(false);
80 }
81 
82 #[test]
observe_folder_async()83 fn observe_folder_async() {
84     internal_observe_folder(true);
85 }
86 
internal_observe_folder(run_async: bool)87 fn internal_observe_folder(run_async: bool) {
88     let dir = TempDir::new("dur").unwrap();
89     // Resolve path so we don't have to worry about affect of symlinks on the test.
90     let dst = resolve_path(dir.path().to_str().unwrap());
91 
92     let mut dst1 = dst.clone();
93     dst1.push("dest1");
94 
95     let ddst1 = dst1.clone();
96     fs::create_dir(dst1.as_path().to_str().unwrap()).unwrap();
97 
98     let mut dst2 = dst.clone();
99 
100     dst2.push("dest2");
101     let ddst2 = dst2.clone();
102     fs::create_dir(dst2.as_path().to_str().unwrap()).unwrap();
103 
104     let mut dst3 = dst.clone();
105 
106     dst3.push("dest3");
107     let ddst3 = dst3.clone();
108     fs::create_dir(dst3.as_path().to_str().unwrap()).unwrap();
109 
110     let (sender, receiver) = channel();
111 
112     let mut async_fsevent = fsevent::FsEvent::new(vec![]);
113     let fsevent_ref_wrapper = if run_async {
114         async_fsevent
115             .append_path(dst1.as_path().to_str().unwrap())
116             .unwrap();
117         async_fsevent
118             .append_path(dst2.as_path().to_str().unwrap())
119             .unwrap();
120         async_fsevent
121             .append_path(dst3.as_path().to_str().unwrap())
122             .unwrap();
123         Some(async_fsevent.observe_async(sender).unwrap())
124     } else {
125         let _t = thread::spawn(move || {
126             let mut fsevent = fsevent::FsEvent::new(vec![]);
127             fsevent
128                 .append_path(dst1.as_path().to_str().unwrap())
129                 .unwrap();
130             fsevent
131                 .append_path(dst2.as_path().to_str().unwrap())
132                 .unwrap();
133             fsevent
134                 .append_path(dst3.as_path().to_str().unwrap())
135                 .unwrap();
136             fsevent.observe(sender);
137         });
138         None
139     };
140 
141     validate_recv(
142         receiver,
143         vec![
144             (
145                 ddst1.to_str().unwrap().to_string(),
146                 StreamFlags::ITEM_CREATED | StreamFlags::IS_DIR,
147             ),
148             (
149                 ddst2.to_str().unwrap().to_string(),
150                 StreamFlags::ITEM_CREATED | StreamFlags::IS_DIR,
151             ),
152             (
153                 ddst3.to_str().unwrap().to_string(),
154                 StreamFlags::ITEM_CREATED | StreamFlags::IS_DIR,
155             ),
156         ],
157     );
158 
159     match fsevent_ref_wrapper {
160         Some(r) => async_fsevent.shutdown_observe(r),
161         None => {}
162     }
163 }
164 
165 #[test]
validate_watch_single_file_sync()166 fn validate_watch_single_file_sync() {
167     internal_validate_watch_single_file(false);
168 }
169 
170 #[test]
validate_watch_single_file_async()171 fn validate_watch_single_file_async() {
172     internal_validate_watch_single_file(true);
173 }
174 
internal_validate_watch_single_file(run_async: bool)175 fn internal_validate_watch_single_file(run_async: bool) {
176     let dir = TempDir::new("dir").unwrap();
177     // Resolve path so we don't have to worry about affect of symlinks on the test.
178     let mut dst = resolve_path(dir.path().to_str().unwrap());
179     dst.push("out.txt");
180     let (sender, receiver) = channel();
181 
182     let mut file = OpenOptions::new()
183         .write(true)
184         .create(true)
185         .open(dst.clone().as_path())
186         .unwrap();
187     file.write_all(b"create").unwrap();
188     file.flush().unwrap();
189     drop(file);
190 
191     let mut async_fsevent = fsevent::FsEvent::new(vec![]);
192     let _fsevent_ref_wrapper = if run_async {
193         let dst = dst.clone();
194         async_fsevent
195             .append_path(dst.as_path().to_str().unwrap())
196             .unwrap();
197         Some(async_fsevent.observe_async(sender).unwrap())
198     } else {
199         let dst = dst.clone();
200         let _t = thread::spawn(move || {
201             let mut fsevent = fsevent::FsEvent::new(vec![]);
202             fsevent
203                 .append_path(dst.as_path().to_str().unwrap())
204                 .unwrap();
205             fsevent.observe(sender);
206         });
207         None
208     };
209 
210     {
211         let dst = dst.clone();
212         let t3 = thread::spawn(move || {
213             thread::sleep(Duration::new(15, 0)); // Wait another 500ms after observe.
214             let mut file = OpenOptions::new()
215                 .write(true)
216                 .append(true)
217                 .open(dst.as_path())
218                 .unwrap();
219             file.write_all(b"foo").unwrap();
220             file.flush().unwrap();
221         });
222         t3.join().unwrap();
223     }
224 
225     validate_recv(
226         receiver,
227         vec![
228             (
229                 dst.to_str().unwrap().to_string(),
230                 StreamFlags::ITEM_MODIFIED | StreamFlags::ITEM_CREATED | StreamFlags::IS_FILE,
231             ),
232         ],
233     );
234 
235     match _fsevent_ref_wrapper {
236         Some(r) => async_fsevent.shutdown_observe(r),
237         None => {}
238     }
239 }
240