1 #![warn(clippy::all)]
2
3 //! Filesystem walk.
4 //!
5 //! - Performed in parallel using rayon
6 //! - Entries streamed in sorted order
7 //! - Custom sort/filter/skip/state
8 //!
9 //! # Example
10 //!
11 //! Recursively iterate over the "foo" directory sorting by name:
12 //!
13 //! ```no_run
14 //! # use std::io::Error;
15 //! use jwalk::{WalkDir};
16 //!
17 //! # fn try_main() -> Result<(), Error> {
18 //! for entry in WalkDir::new("foo").sort(true) {
19 //! println!("{}", entry?.path().display());
20 //! }
21 //! # Ok(())
22 //! # }
23 //! ```
24 //! # Extended Example
25 //!
26 //! This example uses the
27 //! [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir)
28 //! callback for custom:
29 //! 1. **Sort** Entries by name
30 //! 2. **Filter** Errors and hidden files
31 //! 3. **Skip** Content of directories at depth 2
32 //! 4. **State** Track depth `read_dir_state`. Mark first entry in each
33 //! directory with [`client_state`](struct.DirEntry.html#field.client_state)
34 //! `= true`.
35 //!
36 //! ```no_run
37 //! # use std::io::Error;
38 //! use std::cmp::Ordering;
39 //! use jwalk::{ WalkDirGeneric };
40 //!
41 //! # fn try_main() -> Result<(), Error> {
42 //! let walk_dir = WalkDirGeneric::<((usize),(bool))>::new("foo")
43 //! .process_read_dir(|read_dir_state, children| {
44 //! // 1. Custom sort
45 //! children.sort_by(|a, b| match (a, b) {
46 //! (Ok(a), Ok(b)) => a.file_name.cmp(&b.file_name),
47 //! (Ok(_), Err(_)) => Ordering::Less,
48 //! (Err(_), Ok(_)) => Ordering::Greater,
49 //! (Err(_), Err(_)) => Ordering::Equal,
50 //! });
51 //! // 2. Custom filter
52 //! children.retain(|dir_entry_result| {
53 //! dir_entry_result.as_ref().map(|dir_entry| {
54 //! dir_entry.file_name
55 //! .to_str()
56 //! .map(|s| s.starts_with('.'))
57 //! .unwrap_or(false)
58 //! }).unwrap_or(false)
59 //! });
60 //! // 3. Custom skip
61 //! children.iter_mut().for_each(|dir_entry_result| {
62 //! if let Ok(dir_entry) = dir_entry_result {
63 //! if dir_entry.depth == 2 {
64 //! dir_entry.read_children_path = None;
65 //! }
66 //! }
67 //! });
68 //! // 4. Custom state
69 //! *read_dir_state += 1;
70 //! children.first_mut().map(|dir_entry_result| {
71 //! if let Ok(dir_entry) = dir_entry_result {
72 //! dir_entry.client_state = true;
73 //! }
74 //! });
75 //! });
76 //!
77 //! for entry in walk_dir {
78 //! println!("{}", entry?.path().display());
79 //! }
80 //! # Ok(())
81 //! # }
82 //! ```
83 //! # Inspiration
84 //!
85 //! This crate is inspired by both [`walkdir`](https://crates.io/crates/walkdir)
86 //! and [`ignore`](https://crates.io/crates/ignore). It attempts to combine the
87 //! parallelism of `ignore` with `walkdir`'s streaming iterator API. Some code,
//! comments, and tests are copied directly from `walkdir`.
89 //!
90 //! # Implementation
91 //!
92 //! The following structures are central to the implementation:
93 //!
94 //! ## `ReadDirSpec`
95 //!
96 //! Specification of a future `read_dir` operation. These are stored in the
97 //! `read_dir_spec_queue` in depth first order. When a rayon thread is ready for
//! work it pulls the first available `ReadDirSpec` from this queue.
99 //!
100 //! ## `ReadDir`
101 //!
102 //! Result of a `read_dir` operation generated by rayon thread. These results
103 //! are stored in the `read_dir_result_queue`, also depth first ordered.
104 //!
105 //! ## `ReadDirIter`
106 //!
107 //! Pulls `ReadDir` results from the `read_dir_result_queue`. This iterator is
108 //! driven by calling thread. Results are returned in strict depth first order.
109 //!
110 //! ## `DirEntryIter`
111 //!
112 //! Wraps a `ReadDirIter` and yields individual `DirEntry` results in strict
113 //! depth first order.
114
115 mod core;
116
117 use rayon::{ThreadPool, ThreadPoolBuilder};
118 use std::cmp::Ordering;
119 use std::default::Default;
120 use std::ffi::OsStr;
121 use std::fmt::Debug;
122 use std::fs;
123 use std::path::{Path, PathBuf};
124 use std::sync::Arc;
125
126 use crate::core::{ReadDir, ReadDirSpec};
127
128 pub use crate::core::{DirEntry, DirEntryIter, Error};
129
/// Builder for walking a directory.
///
/// This is `WalkDirGeneric` instantiated with the unit client state
/// `((), ())`, i.e. a walk that stores no custom state.
pub type WalkDir = WalkDirGeneric<((), ())>;
132
/// A specialized Result type for WalkDir.
///
/// The error type is fixed to this crate's `Error`.
pub type Result<T> = std::result::Result<T, Error>;
135
/// Client state maintained while performing walk.
///
/// Groups the two kinds of custom state a client can use during a walk:
///
/// - `ReadDirState` is passed by mutable reference to the
///   [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir)
///   callback.
/// - `DirEntryState` is the type of the state stored in DirEntry's
///   [`client_state`](struct.DirEntry.html#field.client_state) field.
///
/// Client state can be stored from within the
/// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir) callback.
/// The type of ClientState is determined by WalkDirGeneric type parameter.
pub trait ClientState: Send + Default + Debug + 'static {
    /// State handed to the `process_read_dir` callback.
    type ReadDirState: Clone + Send + Default + Debug + 'static;
    /// State stored with each yielded `DirEntry`.
    type DirEntryState: Send + Default + Debug + 'static;
}
148
/// Generic builder for walking a directory.
///
/// [`ClientState`](trait.ClientState.html) type parameter allows you to specify
/// state to be stored with each DirEntry from within the
/// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir)
/// callback.
///
/// Use [`WalkDir`](type.WalkDir.html) if you don't need to store client state
/// into yielded DirEntries.
pub struct WalkDirGeneric<C: ClientState> {
    // Path at which the walk starts.
    root: PathBuf,
    // Settings accumulated by the builder methods.
    options: WalkDirOptions<C>,
}
162
/// Signature of the callback installed through `process_read_dir`.
///
/// The callback receives the mutable `ReadDirState` and the mutable list of
/// entry results produced for one directory. It is stored behind an `Arc` in
/// `WalkDirOptions`.
type ProcessReadDirFunction<C> = dyn Fn(&mut <C as ClientState>::ReadDirState, &mut Vec<Result<DirEntry<C>>>)
    + Send
    + Sync
    + 'static;
167
/// Degree of parallelism to use when performing walk.
///
/// Parallelism happens at the directory level. It will help when walking deep
/// filesystems with many directories. It won't help when reading a single
/// directory with many files.
///
/// If you plan to perform lots of per file processing you might want to use
/// Rayon to parallelize that processing as well.
#[derive(Clone)]
pub enum Parallelism {
    /// Run on calling thread
    Serial,
    /// Run in default rayon thread pool
    RayonDefaultPool,
    /// Run in existing rayon thread pool
    RayonExistingPool(Arc<ThreadPool>),
    /// Run in new rayon thread pool with # threads. A value of `0` leaves the
    /// thread count at the pool builder's default.
    RayonNewPool(usize),
}
186
/// Settings collected by the `WalkDirGeneric` builder methods.
struct WalkDirOptions<C: ClientState> {
    // Sort entries by `file_name` per directory (default: `false`).
    sort: bool,
    // Minimum depth of yielded entries (default: `0`).
    min_depth: usize,
    // Maximum depth of yielded entries (default: `usize::MAX`).
    max_depth: usize,
    // Skip entries whose name starts with `.` (default: `true`).
    skip_hidden: bool,
    // Follow symbolic links (default: `false`).
    follow_links: bool,
    // Where the directory reads are executed (default: `RayonDefaultPool`).
    parallelism: Parallelism,
    // Optional client callback run on each directory's entries (default: `None`).
    process_read_dir: Option<Arc<ProcessReadDirFunction<C>>>,
}
196
197 impl<C: ClientState> WalkDirGeneric<C> {
198 /// Create a builder for a recursive directory iterator starting at the file
199 /// path root. If root is a directory, then it is the first item yielded by
200 /// the iterator. If root is a file, then it is the first and only item
201 /// yielded by the iterator.
new<P: AsRef<Path>>(root: P) -> Self202 pub fn new<P: AsRef<Path>>(root: P) -> Self {
203 WalkDirGeneric {
204 root: root.as_ref().to_path_buf(),
205 options: WalkDirOptions {
206 sort: false,
207 min_depth: 0,
208 max_depth: ::std::usize::MAX,
209 skip_hidden: true,
210 follow_links: false,
211 parallelism: Parallelism::RayonDefaultPool,
212 process_read_dir: None,
213 },
214 }
215 }
216
217 /// Root path of the walk.
root(&self) -> &Path218 pub fn root(&self) -> &Path {
219 &self.root
220 }
221
222 /// Sort entries by `file_name` per directory. Defaults to `false`. Use
223 /// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir) for custom
224 /// sorting or filtering.
sort(mut self, sort: bool) -> Self225 pub fn sort(mut self, sort: bool) -> Self {
226 self.options.sort = sort;
227 self
228 }
229
230 /// Skip hidden entries. Enabled by default.
skip_hidden(mut self, skip_hidden: bool) -> Self231 pub fn skip_hidden(mut self, skip_hidden: bool) -> Self {
232 self.options.skip_hidden = skip_hidden;
233 self
234 }
235
236 /// Follow symbolic links. By default, this is disabled.
237 ///
238 /// When `yes` is `true`, symbolic links are followed as if they were normal
239 /// directories and files. If a symbolic link is broken or is involved in a
240 /// loop, an error is yielded.
241 ///
242 /// When enabled, the yielded [`DirEntry`] values represent the target of
243 /// the link while the path corresponds to the link. See the [`DirEntry`]
244 /// type for more details.
245 ///
246 /// [`DirEntry`]: struct.DirEntry.html
follow_links(mut self, follow_links: bool) -> Self247 pub fn follow_links(mut self, follow_links: bool) -> Self {
248 self.options.follow_links = follow_links;
249 self
250 }
251
252 /// Set the minimum depth of entries yielded by the iterator.
253 ///
254 /// The smallest depth is `0` and always corresponds to the path given
255 /// to the `new` function on this type. Its direct descendents have depth
256 /// `1`, and their descendents have depth `2`, and so on.
min_depth(mut self, depth: usize) -> Self257 pub fn min_depth(mut self, depth: usize) -> Self {
258 self.options.min_depth = depth;
259 if self.options.min_depth > self.options.max_depth {
260 self.options.min_depth = self.options.max_depth;
261 }
262 self
263 }
264
265 /// Set the maximum depth of entries yield by the iterator.
266 ///
267 /// The smallest depth is `0` and always corresponds to the path given
268 /// to the `new` function on this type. Its direct descendents have depth
269 /// `1`, and their descendents have depth `2`, and so on.
270 ///
271 /// A depth < 2 will automatically change `parallelism` to
272 /// `Parallelism::Serial`. Parrallelism happens at the `fs::read_dir` level.
273 /// It only makes sense to use multiple threads when reading more then one
274 /// directory.
275 ///
276 /// Note that this will not simply filter the entries of the iterator, but
277 /// it will actually avoid descending into directories when the depth is
278 /// exceeded.
max_depth(mut self, depth: usize) -> Self279 pub fn max_depth(mut self, depth: usize) -> Self {
280 self.options.max_depth = depth;
281 if self.options.max_depth < self.options.min_depth {
282 self.options.max_depth = self.options.min_depth;
283 }
284 if self.options.max_depth < 2 {
285 self.options.parallelism = Parallelism::Serial;
286 }
287 self
288 }
289
290 /// Degree of parallelism to use when performing walk. Defaults to
291 /// [`Parallelism::RayonDefaultPool`](enum.Parallelism.html#variant.RayonDefaultPool).
parallelism(mut self, parallelism: Parallelism) -> Self292 pub fn parallelism(mut self, parallelism: Parallelism) -> Self {
293 self.options.parallelism = parallelism;
294 self
295 }
296
297 /// A callback function to process (sort/filter/skip/state) each directory
298 /// of entries before they are yielded. Modify the given array to
299 /// sort/filter entries. Use [`entry.read_children_path =
300 /// None`](struct.DirEntry.html#field.read_children_path) to yield a
301 /// directory entry but skip reading its contents. Use
302 /// [`entry.client_state`](struct.DirEntry.html#field.client_state)
303 /// to store custom state with an entry.
process_read_dir<F>(mut self, process_by: F) -> Self where F: Fn(&mut C::ReadDirState, &mut Vec<Result<DirEntry<C>>>) + Send + Sync + 'static,304 pub fn process_read_dir<F>(mut self, process_by: F) -> Self
305 where
306 F: Fn(&mut C::ReadDirState, &mut Vec<Result<DirEntry<C>>>) + Send + Sync + 'static,
307 {
308 self.options.process_read_dir = Some(Arc::new(process_by));
309 self
310 }
311 }
312
process_dir_entry_result<C: ClientState>( dir_entry_result: Result<DirEntry<C>>, follow_links: bool, ) -> Result<DirEntry<C>>313 fn process_dir_entry_result<C: ClientState>(
314 dir_entry_result: Result<DirEntry<C>>,
315 follow_links: bool,
316 ) -> Result<DirEntry<C>> {
317 match dir_entry_result {
318 Ok(mut dir_entry) => {
319 if follow_links && dir_entry.file_type.is_symlink() {
320 dir_entry = dir_entry.follow_symlink()?;
321 }
322
323 if dir_entry.depth == 0 && dir_entry.file_type.is_symlink() {
324 // As a special case, if we are processing a root entry, then we
325 // always follow it even if it's a symlink and follow_linzks is
326 // false. We are careful to not let this change the semantics of
327 // the DirEntry however. Namely, the DirEntry should still
328 // respect the follow_links setting. When it's disabled, it
329 // should report itself as a symlink. When it's enabled, it
330 // should always report itself as the target.
331 let metadata = fs::metadata(dir_entry.path())
332 .map_err(|err| Error::from_path(0, dir_entry.path(), err))?;
333 if metadata.file_type().is_dir() {
334 dir_entry.read_children_path = Some(Arc::from(dir_entry.path()));
335 }
336 }
337
338 Ok(dir_entry)
339 }
340 Err(err) => Err(err),
341 }
342 }
343
344 impl<C: ClientState> IntoIterator for WalkDirGeneric<C> {
345 type Item = Result<DirEntry<C>>;
346 type IntoIter = DirEntryIter<C>;
347
into_iter(self) -> DirEntryIter<C>348 fn into_iter(self) -> DirEntryIter<C> {
349 let sort = self.options.sort;
350 let max_depth = self.options.max_depth;
351 let min_depth = self.options.min_depth;
352 let parallelism = self.options.parallelism;
353 let skip_hidden = self.options.skip_hidden;
354 let follow_links = self.options.follow_links;
355 let process_read_dir = self.options.process_read_dir.clone();
356 let follow_link_ancestors = if follow_links {
357 Arc::new(vec![Arc::from(self.root.clone()) as Arc<Path>])
358 } else {
359 Arc::new(vec![])
360 };
361
362 let mut root_entry_results = vec![process_dir_entry_result(
363 DirEntry::from_path(0, &self.root, false, follow_link_ancestors),
364 follow_links,
365 )];
366
367 if let Some(process_read_dir) = process_read_dir.as_ref() {
368 process_read_dir(&mut C::ReadDirState::default(), &mut root_entry_results);
369 }
370
371 DirEntryIter::new(
372 root_entry_results,
373 parallelism,
374 min_depth,
375 Arc::new(move |read_dir_spec| {
376 let ReadDirSpec {
377 path,
378 depth,
379 mut client_read_state,
380 mut follow_link_ancestors,
381 } = read_dir_spec;
382
383 let depth = depth + 1;
384
385 if depth > max_depth {
386 return Ok(ReadDir::new(client_read_state, Vec::new()));
387 }
388
389 follow_link_ancestors = if follow_links {
390 let mut ancestors = Vec::with_capacity(follow_link_ancestors.len() + 1);
391 ancestors.extend(follow_link_ancestors.iter().cloned());
392 ancestors.push(path.clone());
393 Arc::new(ancestors)
394 } else {
395 follow_link_ancestors
396 };
397
398 let mut dir_entry_results: Vec<_> = fs::read_dir(path.as_ref())
399 .map_err(|err| Error::from_path(0, path.to_path_buf(), err))?
400 .filter_map(|dir_entry_result| {
401 let fs_dir_entry = match dir_entry_result {
402 Ok(fs_dir_entry) => fs_dir_entry,
403 Err(err) => return Some(Err(Error::from_io(depth, err))),
404 };
405
406 let dir_entry = match DirEntry::from_entry(
407 depth,
408 path.clone(),
409 &fs_dir_entry,
410 follow_link_ancestors.clone(),
411 ) {
412 Ok(dir_entry) => dir_entry,
413 Err(err) => return Some(Err(err)),
414 };
415
416 if skip_hidden && is_hidden(&dir_entry.file_name) {
417 return None;
418 }
419
420 Some(process_dir_entry_result(Ok(dir_entry), follow_links))
421 })
422 .collect();
423
424 if sort {
425 dir_entry_results.sort_by(|a, b| match (a, b) {
426 (Ok(a), Ok(b)) => a.file_name.cmp(&b.file_name),
427 (Ok(_), Err(_)) => Ordering::Less,
428 (Err(_), Ok(_)) => Ordering::Greater,
429 (Err(_), Err(_)) => Ordering::Equal,
430 });
431 }
432
433 if let Some(process_read_dir) = process_read_dir.as_ref() {
434 process_read_dir(&mut client_read_state, &mut dir_entry_results);
435 }
436
437 Ok(ReadDir::new(client_read_state, dir_entry_results))
438 }),
439 )
440 }
441 }
442
443 impl<C: ClientState> Clone for WalkDirOptions<C> {
clone(&self) -> WalkDirOptions<C>444 fn clone(&self) -> WalkDirOptions<C> {
445 WalkDirOptions {
446 sort: false,
447 min_depth: self.min_depth,
448 max_depth: self.max_depth,
449 skip_hidden: self.skip_hidden,
450 follow_links: self.follow_links,
451 parallelism: self.parallelism.clone(),
452 process_read_dir: self.process_read_dir.clone(),
453 }
454 }
455 }
456
457 impl Parallelism {
install<OP>(&self, op: OP) where OP: FnOnce() -> () + Send + 'static,458 pub(crate) fn install<OP>(&self, op: OP)
459 where
460 OP: FnOnce() -> () + Send + 'static,
461 {
462 match self {
463 Parallelism::Serial => op(),
464 Parallelism::RayonDefaultPool => rayon::spawn(op),
465 Parallelism::RayonNewPool(num_threads) => {
466 let mut thread_pool = ThreadPoolBuilder::new();
467 if *num_threads > 0 {
468 thread_pool = thread_pool.num_threads(*num_threads);
469 }
470 if let Ok(thread_pool) = thread_pool.build() {
471 thread_pool.install(op);
472 } else {
473 rayon::spawn(op);
474 }
475 }
476 Parallelism::RayonExistingPool(thread_pool) => thread_pool.install(op),
477 }
478 }
479 }
480
/// Return `true` when `file_name` starts with a `.`.
///
/// The name is converted lossily, so a name that is not valid Unicode is
/// still recognised as hidden when it begins with a dot. An empty name is
/// not hidden.
fn is_hidden(file_name: &OsStr) -> bool {
    // Lossy conversion only replaces invalid sequences; a leading `.` is
    // kept as is. It borrows (no allocation) when the name is valid Unicode.
    file_name.to_string_lossy().starts_with('.')
}
487
/// Any pair `(B, E)` whose element types satisfy the required bounds can be
/// used as client state: the first element type becomes the `ReadDirState`
/// and the second becomes the `DirEntryState`.
impl<B, E> ClientState for (B, E)
where
    B: Clone + Send + Default + Debug + 'static,
    E: Send + Default + Debug + 'static,
{
    type ReadDirState = B;
    type DirEntryState = E;
}
496