1 extern crate fsevent;
2 extern crate tempdir;
3 extern crate time;
4
5 use fsevent::*;
6 use std::fs;
7 use std::fs::OpenOptions;
8 use std::fs::read_link;
9 use std::io::Write;
10 use std::path::{Component, PathBuf};
11 use std::thread;
12 use std::time::Duration;
13
14 use std::sync::mpsc::{channel, Receiver};
15 use tempdir::TempDir;
16
// Upper bound, in seconds, on how long `validate_recv` keeps waiting for the
// expected events before it gives up and fails the test.
17 const TIMEOUT_S: f64 = 5.0;
18
validate_recv(rx: Receiver<Event>, evs: Vec<(String, StreamFlags)>)19 fn validate_recv(rx: Receiver<Event>, evs: Vec<(String, StreamFlags)>) {
20 let deadline = time::precise_time_s() + TIMEOUT_S;
21 let mut evs = evs.clone();
22
23 while time::precise_time_s() < deadline {
24 if let Ok(actual) = rx.try_recv() {
25 let mut found: Option<usize> = None;
26 for i in 0..evs.len() {
27 let expected = evs.get(i).unwrap();
28 if actual.path == expected.0 && actual.flag == expected.1 {
29 found = Some(i);
30 break;
31 }
32 }
33 if let Some(i) = found {
34 evs.remove(i);
35 } else {
36 assert!(
37 false,
38 format!("actual: {:?} not found in expected: {:?}", actual, evs)
39 );
40 }
41 }
42 if evs.is_empty() {
43 break;
44 }
45 }
46 assert!(
47 evs.is_empty(),
48 "Some expected events did not occur before the test timedout:\n\t\t{:?}",
49 evs
50 );
51 }
52
// TODO: replace with std::fs::canonicalize rust-lang/rust#27706.
/// Resolves symbolic links in `path`, one component at a time.
///
/// Each normal component is appended to the result and then checked with
/// `read_link`: a relative link target replaces that component, an absolute
/// target replaces everything resolved so far. Components that are not
/// symlinks (or do not exist) are kept as written, so the path does not have
/// to exist.
///
/// `.` components are dropped. A `..` component removes the last resolved
/// component; at the root it is a no-op, and on a relative path with nothing
/// left to remove it is kept in the output.
fn resolve_path(path: &str) -> PathBuf {
    let mut out = PathBuf::new();
    let input = PathBuf::from(path);
    for component in input.components() {
        match component {
            Component::RootDir => out.push("/"),
            Component::ParentDir => {
                // Inspect the tail first so the borrow of `out` has ended
                // before `out` is mutated.
                let (drop_last, keep_dots) = match out.components().next_back() {
                    Some(Component::Normal(_)) => (true, false),
                    // The parent of the root is the root itself.
                    Some(Component::RootDir) | Some(Component::Prefix(_)) => (false, false),
                    // Empty, or already climbing above the starting point.
                    _ => (false, true),
                };
                if drop_last {
                    out.pop();
                } else if keep_dots {
                    out.push("..");
                }
            }
            Component::Normal(name) => {
                out.push(name);
                if let Ok(target) = read_link(&out) {
                    if target.is_relative() {
                        // A relative target is relative to the link's directory.
                        out.pop();
                        out.push(target);
                    } else {
                        out = target;
                    }
                }
            }
            Component::CurDir | Component::Prefix(_) => {}
        }
    }
    out
}
76
/// Runs the folder-observation scenario with `run_async = false`, i.e. with
/// `observe` called on a spawned thread.
#[test]
fn observe_folder_sync() {
    let run_async = false;
    internal_observe_folder(run_async);
}
81
/// Runs the folder-observation scenario with `run_async = true`, i.e. through
/// `observe_async`.
#[test]
fn observe_folder_async() {
    let run_async = true;
    internal_observe_folder(run_async);
}
86
internal_observe_folder(run_async: bool)87 fn internal_observe_folder(run_async: bool) {
88 let dir = TempDir::new("dur").unwrap();
89 // Resolve path so we don't have to worry about affect of symlinks on the test.
90 let dst = resolve_path(dir.path().to_str().unwrap());
91
92 let mut dst1 = dst.clone();
93 dst1.push("dest1");
94
95 let ddst1 = dst1.clone();
96 fs::create_dir(dst1.as_path().to_str().unwrap()).unwrap();
97
98 let mut dst2 = dst.clone();
99
100 dst2.push("dest2");
101 let ddst2 = dst2.clone();
102 fs::create_dir(dst2.as_path().to_str().unwrap()).unwrap();
103
104 let mut dst3 = dst.clone();
105
106 dst3.push("dest3");
107 let ddst3 = dst3.clone();
108 fs::create_dir(dst3.as_path().to_str().unwrap()).unwrap();
109
110 let (sender, receiver) = channel();
111
112 let mut async_fsevent = fsevent::FsEvent::new(vec![]);
113 let fsevent_ref_wrapper = if run_async {
114 async_fsevent
115 .append_path(dst1.as_path().to_str().unwrap())
116 .unwrap();
117 async_fsevent
118 .append_path(dst2.as_path().to_str().unwrap())
119 .unwrap();
120 async_fsevent
121 .append_path(dst3.as_path().to_str().unwrap())
122 .unwrap();
123 Some(async_fsevent.observe_async(sender).unwrap())
124 } else {
125 let _t = thread::spawn(move || {
126 let mut fsevent = fsevent::FsEvent::new(vec![]);
127 fsevent
128 .append_path(dst1.as_path().to_str().unwrap())
129 .unwrap();
130 fsevent
131 .append_path(dst2.as_path().to_str().unwrap())
132 .unwrap();
133 fsevent
134 .append_path(dst3.as_path().to_str().unwrap())
135 .unwrap();
136 fsevent.observe(sender);
137 });
138 None
139 };
140
141 validate_recv(
142 receiver,
143 vec![
144 (
145 ddst1.to_str().unwrap().to_string(),
146 StreamFlags::ITEM_CREATED | StreamFlags::IS_DIR,
147 ),
148 (
149 ddst2.to_str().unwrap().to_string(),
150 StreamFlags::ITEM_CREATED | StreamFlags::IS_DIR,
151 ),
152 (
153 ddst3.to_str().unwrap().to_string(),
154 StreamFlags::ITEM_CREATED | StreamFlags::IS_DIR,
155 ),
156 ],
157 );
158
159 match fsevent_ref_wrapper {
160 Some(r) => async_fsevent.shutdown_observe(r),
161 None => {}
162 }
163 }
164
/// Runs the single-file scenario with `run_async = false`, i.e. with
/// `observe` called on a spawned thread.
#[test]
fn validate_watch_single_file_sync() {
    let run_async = false;
    internal_validate_watch_single_file(run_async);
}
169
/// Runs the single-file scenario with `run_async = true`, i.e. through
/// `observe_async`.
#[test]
fn validate_watch_single_file_async() {
    let run_async = true;
    internal_validate_watch_single_file(run_async);
}
174
internal_validate_watch_single_file(run_async: bool)175 fn internal_validate_watch_single_file(run_async: bool) {
176 let dir = TempDir::new("dir").unwrap();
177 // Resolve path so we don't have to worry about affect of symlinks on the test.
178 let mut dst = resolve_path(dir.path().to_str().unwrap());
179 dst.push("out.txt");
180 let (sender, receiver) = channel();
181
182 let mut file = OpenOptions::new()
183 .write(true)
184 .create(true)
185 .open(dst.clone().as_path())
186 .unwrap();
187 file.write_all(b"create").unwrap();
188 file.flush().unwrap();
189 drop(file);
190
191 let mut async_fsevent = fsevent::FsEvent::new(vec![]);
192 let _fsevent_ref_wrapper = if run_async {
193 let dst = dst.clone();
194 async_fsevent
195 .append_path(dst.as_path().to_str().unwrap())
196 .unwrap();
197 Some(async_fsevent.observe_async(sender).unwrap())
198 } else {
199 let dst = dst.clone();
200 let _t = thread::spawn(move || {
201 let mut fsevent = fsevent::FsEvent::new(vec![]);
202 fsevent
203 .append_path(dst.as_path().to_str().unwrap())
204 .unwrap();
205 fsevent.observe(sender);
206 });
207 None
208 };
209
210 {
211 let dst = dst.clone();
212 let t3 = thread::spawn(move || {
213 thread::sleep(Duration::new(15, 0)); // Wait another 500ms after observe.
214 let mut file = OpenOptions::new()
215 .write(true)
216 .append(true)
217 .open(dst.as_path())
218 .unwrap();
219 file.write_all(b"foo").unwrap();
220 file.flush().unwrap();
221 });
222 t3.join().unwrap();
223 }
224
225 validate_recv(
226 receiver,
227 vec![
228 (
229 dst.to_str().unwrap().to_string(),
230 StreamFlags::ITEM_MODIFIED | StreamFlags::ITEM_CREATED | StreamFlags::IS_FILE,
231 ),
232 ],
233 );
234
235 match _fsevent_ref_wrapper {
236 Some(r) => async_fsevent.shutdown_observe(r),
237 None => {}
238 }
239 }
240