#![warn(clippy::all)]

//! Filesystem walk.
//!
//! - Performed in parallel using rayon
//! - Entries streamed in sorted order
//! - Custom sort/filter/skip/state
//!
//! # Example
//!
//! Recursively iterate over the "foo" directory sorting by name:
//!
//! ```no_run
//! # use std::io::Error;
//! use jwalk::{WalkDir};
//!
//! # fn try_main() -> Result<(), Error> {
//! for entry in WalkDir::new("foo").sort(true) {
//!   println!("{}", entry?.path().display());
//! }
//! # Ok(())
//! # }
//! ```
//! # Extended Example
//!
//! This example uses the
//! [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir)
//! callback for custom:
//! 1. **Sort** Entries by name
//! 2. **Filter** Errors and hidden files
//! 3. **Skip** Content of directories at depth 2
//! 4. **State** Track depth in `read_dir_state`. Mark first entry in each
//!    directory with [`client_state`](struct.DirEntry.html#field.client_state)
//!    `= true`.
//!
//! ```no_run
//! # use std::io::Error;
//! use std::cmp::Ordering;
//! use jwalk::{ WalkDirGeneric };
//!
//! # fn try_main() -> Result<(), Error> {
//! let walk_dir = WalkDirGeneric::<((usize),(bool))>::new("foo")
//!     .process_read_dir(|depth, path, read_dir_state, children| {
//!         // 1. Custom sort
//!         children.sort_by(|a, b| match (a, b) {
//!             (Ok(a), Ok(b)) => a.file_name.cmp(&b.file_name),
//!             (Ok(_), Err(_)) => Ordering::Less,
//!             (Err(_), Ok(_)) => Ordering::Greater,
//!             (Err(_), Err(_)) => Ordering::Equal,
//!         });
//!         // 2. Custom filter: drop errors and hidden (dot-prefixed) entries.
//!         // `retain` keeps the entries for which the closure returns `true`.
//!         children.retain(|dir_entry_result| {
//!             dir_entry_result.as_ref().map(|dir_entry| {
//!                 dir_entry.file_name
//!                     .to_str()
//!                     .map(|s| !s.starts_with('.'))
//!                     .unwrap_or(true)
//!             }).unwrap_or(false)
//!         });
//!         // 3. Custom skip
//!         children.iter_mut().for_each(|dir_entry_result| {
//!             if let Ok(dir_entry) = dir_entry_result {
//!                 if dir_entry.depth == 2 {
//!                     dir_entry.read_children_path = None;
//!                 }
//!             }
//!         });
//!         // 4. Custom state
//!         *read_dir_state += 1;
//!         children.first_mut().map(|dir_entry_result| {
//!             if let Ok(dir_entry) = dir_entry_result {
//!                 dir_entry.client_state = true;
//!             }
//!         });
//!     });
//!
//! for entry in walk_dir {
//!   println!("{}", entry?.path().display());
//! }
//! # Ok(())
//! # }
//! ```
//! # Inspiration
//!
//! This crate is inspired by both [`walkdir`](https://crates.io/crates/walkdir)
//! and [`ignore`](https://crates.io/crates/ignore). It attempts to combine the
//! parallelism of `ignore` with `walkdir`'s streaming iterator API. Some code,
//! comments, and tests are copied directly from `walkdir`.
//!
//! # Implementation
//!
//! The following structures are central to the implementation:
//!
//! ## `ReadDirSpec`
//!
//! Specification of a future `read_dir` operation. These are stored in the
//! `read_dir_spec_queue` in depth first order. When a rayon thread is ready for
//! work it pulls the first available `ReadDirSpec` from this queue.
//!
//! ## `ReadDir`
//!
//! Result of a `read_dir` operation generated by rayon thread. These results
//! are stored in the `read_dir_result_queue`, also depth first ordered.
//!
//! ## `ReadDirIter`
//!
//! Pulls `ReadDir` results from the `read_dir_result_queue`. This iterator is
//! driven by calling thread. Results are returned in strict depth first order.
//!
//! ## `DirEntryIter`
//!
//! Wraps a `ReadDirIter` and yields individual `DirEntry` results in strict
//! depth first order.
114 115 mod core; 116 117 use rayon::{ThreadPool, ThreadPoolBuilder}; 118 use std::cmp::Ordering; 119 use std::default::Default; 120 use std::ffi::OsStr; 121 use std::fmt::Debug; 122 use std::fs; 123 use std::path::{Path, PathBuf}; 124 use std::sync::Arc; 125 126 use crate::core::{ReadDir, ReadDirSpec}; 127 128 pub use crate::core::{DirEntry, DirEntryIter, Error}; 129 130 /// Builder for walking a directory. 131 pub type WalkDir = WalkDirGeneric<((), ())>; 132 133 /// A specialized Result type for WalkDir. 134 pub type Result<T> = std::result::Result<T, Error>; 135 136 /// Client state maintained while performing walk. 137 /// 138 /// for state stored in DirEntry's 139 /// [`client_state`](struct.DirEntry.html#field.client_state) field. 140 /// 141 /// Client state can be stored from within the 142 /// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir) callback. 143 /// The type of ClientState is determined by WalkDirGeneric type parameter. 144 pub trait ClientState: Send + Default + Debug + 'static { 145 type ReadDirState: Clone + Send + Default + Debug + 'static; 146 type DirEntryState: Send + Default + Debug + 'static; 147 } 148 149 /// Generic builder for walking a directory. 150 /// 151 /// [`ClientState`](trait.ClientState.html) type parameter allows you to specify 152 /// state to be stored with each DirEntry from within the 153 /// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir) 154 /// callback. 155 /// 156 /// Use [`WalkDir`](type.WalkDir.html) if you don't need to store client state 157 /// into yeilded DirEntries. 158 pub struct WalkDirGeneric<C: ClientState> { 159 root: PathBuf, 160 options: WalkDirOptions<C>, 161 } 162 163 type ProcessReadDirFunction<C> = dyn Fn(Option<usize>, &Path, &mut <C as ClientState>::ReadDirState, &mut Vec<Result<DirEntry<C>>>) 164 + Send 165 + Sync 166 + 'static; 167 168 /// Degree of parallelism to use when performing walk. 
169 /// 170 /// Parallelism happens at the directory level. It will help when walking deep 171 /// filesystems with many directories. It wont help when reading a single 172 /// directory with many files. 173 /// 174 /// If you plan to perform lots of per file processing you might want to use Rayon to 175 #[derive(Clone)] 176 pub enum Parallelism { 177 /// Run on calling thread 178 Serial, 179 /// Run in default rayon thread pool 180 RayonDefaultPool, 181 /// Run in existing rayon thread pool 182 RayonExistingPool(Arc<ThreadPool>), 183 /// Run in new rayon thread pool with # threads 184 RayonNewPool(usize), 185 } 186 187 struct WalkDirOptions<C: ClientState> { 188 sort: bool, 189 min_depth: usize, 190 max_depth: usize, 191 skip_hidden: bool, 192 follow_links: bool, 193 parallelism: Parallelism, 194 root_read_dir_state: C::ReadDirState, 195 process_read_dir: Option<Arc<ProcessReadDirFunction<C>>>, 196 } 197 198 impl<C: ClientState> WalkDirGeneric<C> { 199 /// Create a builder for a recursive directory iterator starting at the file 200 /// path root. If root is a directory, then it is the first item yielded by 201 /// the iterator. If root is a file, then it is the first and only item 202 /// yielded by the iterator. 203 pub fn new<P: AsRef<Path>>(root: P) -> Self { 204 WalkDirGeneric { 205 root: root.as_ref().to_path_buf(), 206 options: WalkDirOptions { 207 sort: false, 208 min_depth: 0, 209 max_depth: ::std::usize::MAX, 210 skip_hidden: true, 211 follow_links: false, 212 parallelism: Parallelism::RayonDefaultPool, 213 root_read_dir_state: C::ReadDirState::default(), 214 process_read_dir: None, 215 }, 216 } 217 } 218 219 /// Root path of the walk. 220 pub fn root(&self) -> &Path { 221 &self.root 222 } 223 224 /// Sort entries by `file_name` per directory. Defaults to `false`. Use 225 /// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir) for custom 226 /// sorting or filtering. 
227 pub fn sort(mut self, sort: bool) -> Self { 228 self.options.sort = sort; 229 self 230 } 231 232 /// Skip hidden entries. Enabled by default. 233 pub fn skip_hidden(mut self, skip_hidden: bool) -> Self { 234 self.options.skip_hidden = skip_hidden; 235 self 236 } 237 238 /// Follow symbolic links. By default, this is disabled. 239 /// 240 /// When `yes` is `true`, symbolic links are followed as if they were normal 241 /// directories and files. If a symbolic link is broken or is involved in a 242 /// loop, an error is yielded. 243 /// 244 /// When enabled, the yielded [`DirEntry`] values represent the target of 245 /// the link while the path corresponds to the link. See the [`DirEntry`] 246 /// type for more details. 247 /// 248 /// [`DirEntry`]: struct.DirEntry.html 249 pub fn follow_links(mut self, follow_links: bool) -> Self { 250 self.options.follow_links = follow_links; 251 self 252 } 253 254 /// Set the minimum depth of entries yielded by the iterator. 255 /// 256 /// The smallest depth is `0` and always corresponds to the path given 257 /// to the `new` function on this type. Its direct descendents have depth 258 /// `1`, and their descendents have depth `2`, and so on. 259 pub fn min_depth(mut self, depth: usize) -> Self { 260 self.options.min_depth = depth; 261 if self.options.min_depth > self.options.max_depth { 262 self.options.min_depth = self.options.max_depth; 263 } 264 self 265 } 266 267 /// Set the maximum depth of entries yield by the iterator. 268 /// 269 /// The smallest depth is `0` and always corresponds to the path given 270 /// to the `new` function on this type. Its direct descendents have depth 271 /// `1`, and their descendents have depth `2`, and so on. 272 /// 273 /// A depth < 2 will automatically change `parallelism` to 274 /// `Parallelism::Serial`. Parrallelism happens at the `fs::read_dir` level. 275 /// It only makes sense to use multiple threads when reading more then one 276 /// directory. 
277 /// 278 /// Note that this will not simply filter the entries of the iterator, but 279 /// it will actually avoid descending into directories when the depth is 280 /// exceeded. 281 pub fn max_depth(mut self, depth: usize) -> Self { 282 self.options.max_depth = depth; 283 if self.options.max_depth < self.options.min_depth { 284 self.options.max_depth = self.options.min_depth; 285 } 286 if self.options.max_depth < 2 { 287 self.options.parallelism = Parallelism::Serial; 288 } 289 self 290 } 291 292 /// Degree of parallelism to use when performing walk. Defaults to 293 /// [`Parallelism::RayonDefaultPool`](enum.Parallelism.html#variant.RayonDefaultPool). 294 pub fn parallelism(mut self, parallelism: Parallelism) -> Self { 295 self.options.parallelism = parallelism; 296 self 297 } 298 299 /// Initial ClientState::ReadDirState that is passed to 300 /// [`process_read_dir`](struct.WalkDirGeneric.html#method.process_read_dir) 301 /// when processing root. Defaults to ClientState::ReadDirState::default(). 302 pub fn root_read_dir_state(mut self, read_dir_state: C::ReadDirState) -> Self { 303 self.options.root_read_dir_state = read_dir_state; 304 self 305 } 306 307 /// A callback function to process (sort/filter/skip/state) each directory 308 /// of entries before they are yielded. Modify the given array to 309 /// sort/filter entries. Use [`entry.read_children_path = 310 /// None`](struct.DirEntry.html#field.read_children_path) to yield a 311 /// directory entry but skip reading its contents. Use 312 /// [`entry.client_state`](struct.DirEntry.html#field.client_state) 313 /// to store custom state with an entry. 
314 pub fn process_read_dir<F>(mut self, process_by: F) -> Self 315 where 316 F: Fn(Option<usize>, &Path, &mut C::ReadDirState, &mut Vec<Result<DirEntry<C>>>) 317 + Send 318 + Sync 319 + 'static, 320 { 321 self.options.process_read_dir = Some(Arc::new(process_by)); 322 self 323 } 324 } 325 326 fn process_dir_entry_result<C: ClientState>( 327 dir_entry_result: Result<DirEntry<C>>, 328 follow_links: bool, 329 ) -> Result<DirEntry<C>> { 330 match dir_entry_result { 331 Ok(mut dir_entry) => { 332 if follow_links && dir_entry.file_type.is_symlink() { 333 dir_entry = dir_entry.follow_symlink()?; 334 } 335 336 if dir_entry.depth == 0 && dir_entry.file_type.is_symlink() { 337 // As a special case, if we are processing a root entry, then we 338 // always follow it even if it's a symlink and follow_links is 339 // false. We are careful to not let this change the semantics of 340 // the DirEntry however. Namely, the DirEntry should still 341 // respect the follow_links setting. When it's disabled, it 342 // should report itself as a symlink. When it's enabled, it 343 // should always report itself as the target. 
344 let metadata = fs::metadata(dir_entry.path()) 345 .map_err(|err| Error::from_path(0, dir_entry.path(), err))?; 346 if metadata.file_type().is_dir() { 347 dir_entry.read_children_path = Some(Arc::from(dir_entry.path())); 348 } 349 } 350 351 Ok(dir_entry) 352 } 353 Err(err) => Err(err), 354 } 355 } 356 357 impl<C: ClientState> IntoIterator for WalkDirGeneric<C> { 358 type Item = Result<DirEntry<C>>; 359 type IntoIter = DirEntryIter<C>; 360 361 fn into_iter(self) -> DirEntryIter<C> { 362 let sort = self.options.sort; 363 let max_depth = self.options.max_depth; 364 let min_depth = self.options.min_depth; 365 let parallelism = self.options.parallelism; 366 let skip_hidden = self.options.skip_hidden; 367 let follow_links = self.options.follow_links; 368 let process_read_dir = self.options.process_read_dir.clone(); 369 let mut root_read_dir_state = self.options.root_read_dir_state; 370 let follow_link_ancestors = if follow_links { 371 Arc::new(vec![Arc::from(self.root.clone()) as Arc<Path>]) 372 } else { 373 Arc::new(vec![]) 374 }; 375 376 let root_entry = DirEntry::from_path(0, &self.root, false, follow_link_ancestors); 377 let root_parent_path = root_entry 378 .as_ref() 379 .map(|root| root.parent_path().to_owned()) 380 .unwrap_or_default(); 381 let mut root_entry_results = vec![process_dir_entry_result(root_entry, follow_links)]; 382 if let Some(process_read_dir) = process_read_dir.as_ref() { 383 process_read_dir( 384 None, 385 &root_parent_path, 386 &mut root_read_dir_state, 387 &mut root_entry_results, 388 ); 389 } 390 391 DirEntryIter::new( 392 root_entry_results, 393 parallelism, 394 min_depth, 395 root_read_dir_state.clone(), 396 Arc::new(move |read_dir_spec| { 397 let ReadDirSpec { 398 path, 399 depth, 400 mut client_read_state, 401 mut follow_link_ancestors, 402 } = read_dir_spec; 403 404 let read_dir_depth = depth; 405 let read_dir_contents_depth = depth + 1; 406 407 if read_dir_contents_depth > max_depth { 408 return Ok(ReadDir::new(client_read_state, 
Vec::new())); 409 } 410 411 follow_link_ancestors = if follow_links { 412 let mut ancestors = Vec::with_capacity(follow_link_ancestors.len() + 1); 413 ancestors.extend(follow_link_ancestors.iter().cloned()); 414 ancestors.push(path.clone()); 415 Arc::new(ancestors) 416 } else { 417 follow_link_ancestors 418 }; 419 420 let mut dir_entry_results: Vec<_> = fs::read_dir(path.as_ref()) 421 .map_err(|err| Error::from_path(0, path.to_path_buf(), err))? 422 .filter_map(|dir_entry_result| { 423 let fs_dir_entry = match dir_entry_result { 424 Ok(fs_dir_entry) => fs_dir_entry, 425 Err(err) => { 426 return Some(Err(Error::from_io(read_dir_contents_depth, err))) 427 } 428 }; 429 430 let dir_entry = match DirEntry::from_entry( 431 read_dir_contents_depth, 432 path.clone(), 433 &fs_dir_entry, 434 follow_link_ancestors.clone(), 435 ) { 436 Ok(dir_entry) => dir_entry, 437 Err(err) => return Some(Err(err)), 438 }; 439 440 if skip_hidden && is_hidden(&dir_entry.file_name) { 441 return None; 442 } 443 444 Some(process_dir_entry_result(Ok(dir_entry), follow_links)) 445 }) 446 .collect(); 447 448 if sort { 449 dir_entry_results.sort_by(|a, b| match (a, b) { 450 (Ok(a), Ok(b)) => a.file_name.cmp(&b.file_name), 451 (Ok(_), Err(_)) => Ordering::Less, 452 (Err(_), Ok(_)) => Ordering::Greater, 453 (Err(_), Err(_)) => Ordering::Equal, 454 }); 455 } 456 457 if let Some(process_read_dir) = process_read_dir.as_ref() { 458 process_read_dir( 459 Some(read_dir_depth), 460 path.as_ref(), 461 &mut client_read_state, 462 &mut dir_entry_results, 463 ); 464 } 465 466 Ok(ReadDir::new(client_read_state, dir_entry_results)) 467 }), 468 ) 469 } 470 } 471 472 impl<C: ClientState> Clone for WalkDirOptions<C> { 473 fn clone(&self) -> WalkDirOptions<C> { 474 WalkDirOptions { 475 sort: false, 476 min_depth: self.min_depth, 477 max_depth: self.max_depth, 478 skip_hidden: self.skip_hidden, 479 follow_links: self.follow_links, 480 parallelism: self.parallelism.clone(), 481 root_read_dir_state: 
self.root_read_dir_state.clone(), 482 process_read_dir: self.process_read_dir.clone(), 483 } 484 } 485 } 486 487 impl Parallelism { 488 pub(crate) fn install<OP>(&self, op: OP) 489 where 490 OP: FnOnce() -> () + Send + 'static, 491 { 492 match self { 493 Parallelism::Serial => op(), 494 Parallelism::RayonDefaultPool => rayon::spawn(op), 495 Parallelism::RayonNewPool(num_threads) => { 496 let mut thread_pool = ThreadPoolBuilder::new(); 497 if *num_threads > 0 { 498 thread_pool = thread_pool.num_threads(*num_threads); 499 } 500 if let Ok(thread_pool) = thread_pool.build() { 501 thread_pool.install(op); 502 } else { 503 rayon::spawn(op); 504 } 505 } 506 Parallelism::RayonExistingPool(thread_pool) => thread_pool.install(op), 507 } 508 } 509 } 510 511 fn is_hidden(file_name: &OsStr) -> bool { 512 file_name 513 .to_str() 514 .map(|s| s.starts_with('.')) 515 .unwrap_or(false) 516 } 517 518 impl<B, E> ClientState for (B, E) 519 where 520 B: Clone + Send + Default + Debug + 'static, 521 E: Send + Default + Debug + 'static, 522 { 523 type ReadDirState = B; 524 type DirEntryState = E; 525 } 526