// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::mutex::MutexGuard;
use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
use crate::{deadlock, util};
use core::{
    fmt, ptr,
    sync::atomic::{AtomicPtr, Ordering},
};
use lock_api::RawMutex as RawMutex_;
use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
use std::time::{Duration, Instant};

/// A type indicating whether a timed wait on a condition variable returned
/// due to a timeout or not.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct WaitTimeoutResult(bool);

impl WaitTimeoutResult {
    /// Returns whether the wait was known to have timed out.
    #[inline]
    pub fn timed_out(self) -> bool {
        self.0
    }
}

/// A Condition Variable
///
/// Condition variables represent the ability to block a thread such that it
/// consumes no CPU time while waiting for an event to occur. Condition
/// variables are typically associated with a boolean predicate (a condition)
/// and a mutex. The predicate is always verified inside of the mutex before
/// determining that a thread must block.
///
/// Note that this module places one additional restriction over the system
/// condition variables: each condvar can be used with only one mutex at a
/// time. Any attempt to use multiple mutexes on the same condition variable
/// simultaneously will result in a runtime panic. However, it is possible to
/// switch to a different mutex if there are no threads currently waiting on
/// the condition variable.
///
/// # Differences from the standard library `Condvar`
///
/// - No spurious wakeups: A wait will only return a non-timeout result if it
///   was woken up by `notify_one` or `notify_all`.
/// - `Condvar::notify_all` will only wake up a single thread, the rest are
///   requeued to wait for the `Mutex` to be unlocked by the thread that was
///   woken up.
/// - Only requires 1 word of space, whereas the standard library boxes the
///   `Condvar` due to platform limitations.
/// - Can be statically constructed (requires the `const_fn` nightly feature).
/// - Does not require any drop glue when dropped.
/// - Inline fast path for the uncontended case.
///
/// # Examples
///
/// ```
/// use parking_lot::{Mutex, Condvar};
/// use std::sync::Arc;
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// // Inside of our lock, spawn a new thread, and then wait for it to start
/// thread::spawn(move|| {
///     let &(ref lock, ref cvar) = &*pair2;
///     let mut started = lock.lock();
///     *started = true;
///     cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let &(ref lock, ref cvar) = &*pair;
/// let mut started = lock.lock();
/// if !*started {
///     cvar.wait(&mut started);
/// }
/// // Note that we used an if instead of a while loop above. This is only
/// // possible because parking_lot's Condvar will never spuriously wake up.
/// // This means that wait() will only return after notify_one or notify_all is
/// // called.
/// ```
pub struct Condvar {
    state: AtomicPtr<RawMutex>,
}

impl Condvar {
    /// Creates a new condition variable which is ready to be waited on and
    /// notified.
    #[inline]
    pub const fn new() -> Condvar {
        Condvar {
            state: AtomicPtr::new(ptr::null_mut()),
        }
    }

    /// Wakes up one blocked thread on this condvar.
    ///
    /// Returns whether a thread was woken up.
    ///
    /// If there is a blocked thread on this condition variable, then it will
    /// be woken up from its call to `wait` or `wait_timeout`. Calls to
    /// `notify_one` are not buffered in any way.
    ///
    /// To wake up all threads, see `notify_all()`.
    ///
    /// # Examples
    ///
    /// ```
    /// use parking_lot::Condvar;
    ///
    /// let condvar = Condvar::new();
    ///
    /// // do something with condvar, share it with other threads
    ///
    /// if !condvar.notify_one() {
    ///     println!("Nobody was listening for this.");
    /// }
    /// ```
    #[inline]
    pub fn notify_one(&self) -> bool {
        // Nothing to do if there are no waiting threads
        let state = self.state.load(Ordering::Relaxed);
        if state.is_null() {
            return false;
        }

        self.notify_one_slow(state)
    }

    #[cold]
    fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
        unsafe {
            // Unpark one thread and requeue the rest onto the mutex
            let from = self as *const _ as usize;
            let to = mutex as usize;
            let validate = || {
                // Make sure that our atomic state still points to the same
                // mutex. If not then it means that all threads on the current
                // mutex were woken up and a new waiting thread switched to a
                // different mutex. In that case we can get away with doing
                // nothing.
                if self.state.load(Ordering::Relaxed) != mutex {
                    return RequeueOp::Abort;
                }

                // Unpark one thread if the mutex is unlocked, otherwise just
                // requeue everything to the mutex. This is safe to do here
                // since unlocking the mutex when the parked bit is set requires
                // locking the queue. There is the possibility of a race if the
                // mutex gets locked after we check, but that doesn't matter in
                // this case.
                if (*mutex).mark_parked_if_locked() {
                    RequeueOp::RequeueOne
                } else {
                    RequeueOp::UnparkOne
                }
            };
            let callback = |_op, result: UnparkResult| {
                // Clear our state if there are no more waiting threads
                if !result.have_more_threads {
                    self.state.store(ptr::null_mut(), Ordering::Relaxed);
                }
                TOKEN_NORMAL
            };
            let res = parking_lot_core::unpark_requeue(from, to, validate, callback);

            res.unparked_threads + res.requeued_threads != 0
        }
    }

    /// Wakes up all blocked threads on this condvar.
    ///
    /// Returns the number of threads woken up.
    ///
    /// This method will ensure that any current waiters on the condition
    /// variable are awoken. Calls to `notify_all()` are not buffered in any
    /// way.
    ///
    /// To wake up only one thread, see `notify_one()`.
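    ///
    /// # Examples
    ///
    /// A minimal sketch of waking every waiter once a shared flag has been set:
    ///
    /// ```
    /// use parking_lot::{Condvar, Mutex};
    /// use std::sync::Arc;
    /// use std::thread;
    ///
    /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
    /// let pair2 = pair.clone();
    ///
    /// let waiter = thread::spawn(move || {
    ///     let &(ref lock, ref cvar) = &*pair2;
    ///     let mut ready = lock.lock();
    ///     while !*ready {
    ///         cvar.wait(&mut ready);
    ///     }
    /// });
    ///
    /// // Set the flag while holding the mutex, then wake all waiters.
    /// let &(ref lock, ref cvar) = &*pair;
    /// *lock.lock() = true;
    /// cvar.notify_all();
    /// waiter.join().unwrap();
    /// ```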
    #[inline]
    pub fn notify_all(&self) -> usize {
        // Nothing to do if there are no waiting threads
        let state = self.state.load(Ordering::Relaxed);
        if state.is_null() {
            return 0;
        }

        self.notify_all_slow(state)
    }

    #[cold]
    fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
        unsafe {
            // Unpark one thread and requeue the rest onto the mutex
            let from = self as *const _ as usize;
            let to = mutex as usize;
            let validate = || {
                // Make sure that our atomic state still points to the same
                // mutex. If not then it means that all threads on the current
                // mutex were woken up and a new waiting thread switched to a
                // different mutex. In that case we can get away with doing
                // nothing.
                if self.state.load(Ordering::Relaxed) != mutex {
                    return RequeueOp::Abort;
                }

                // Clear our state since we are going to unpark or requeue all
                // threads.
                self.state.store(ptr::null_mut(), Ordering::Relaxed);

                // Unpark one thread if the mutex is unlocked, otherwise just
                // requeue everything to the mutex. This is safe to do here
                // since unlocking the mutex when the parked bit is set requires
                // locking the queue. There is the possibility of a race if the
                // mutex gets locked after we check, but that doesn't matter in
                // this case.
                if (*mutex).mark_parked_if_locked() {
                    RequeueOp::RequeueAll
                } else {
                    RequeueOp::UnparkOneRequeueRest
                }
            };
            let callback = |op, result: UnparkResult| {
                // If we requeued threads to the mutex, mark it as having
                // parked threads. The RequeueAll case is already handled above.
                if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
                    (*mutex).mark_parked();
                }
                TOKEN_NORMAL
            };
            let res = parking_lot_core::unpark_requeue(from, to, validate, callback);

            res.unparked_threads + res.requeued_threads
        }
    }

    /// Blocks the current thread until this condition variable receives a
    /// notification.
    ///
    /// This function will atomically unlock the mutex specified (represented by
    /// `mutex_guard`) and block the current thread. This means that any calls
    /// to `notify_*()` which happen logically after the mutex is unlocked are
    /// candidates to wake this thread up. When this function call returns, the
    /// lock specified will have been re-acquired.
    ///
    /// # Panics
    ///
    /// This function will panic if another thread is waiting on the `Condvar`
    /// with a different `Mutex` object.
    #[inline]
    pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
        self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
    }

    /// Waits on this condition variable for a notification, timing out after
    /// the specified time instant.
    ///
    /// The semantics of this function are equivalent to `wait()` except that
    /// the thread will be blocked roughly until `timeout` is reached. This
    /// method should not be used for precise timing due to anomalies such as
    /// preemption or platform differences that may not cause the maximum
    /// amount of time waited to be precisely `timeout`.
    ///
    /// Note that the best effort is made to ensure that the time waited is
    /// measured with a monotonic clock, and not affected by changes made to
    /// the system time.
    ///
    /// The returned `WaitTimeoutResult` value indicates if the timeout is
    /// known to have elapsed.
    ///
    /// Like `wait`, the lock specified will be re-acquired when this function
    /// returns, regardless of whether the timeout elapsed or not.
    ///
    /// # Panics
    ///
    /// This function will panic if another thread is waiting on the `Condvar`
    /// with a different `Mutex` object.
    #[inline]
    pub fn wait_until<T: ?Sized>(
        &self,
        mutex_guard: &mut MutexGuard<'_, T>,
        timeout: Instant,
    ) -> WaitTimeoutResult {
        self.wait_until_internal(
            unsafe { MutexGuard::mutex(mutex_guard).raw() },
            Some(timeout),
        )
    }

    // This is a non-generic function to reduce the monomorphization cost of
    // using `wait_until`.
    fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
        unsafe {
            let result;
            let mut bad_mutex = false;
            let mut requeued = false;
            {
                let addr = self as *const _ as usize;
                let lock_addr = mutex as *const _ as *mut _;
                let validate = || {
                    // Ensure we don't use two different mutexes with the same
                    // Condvar at the same time. This is done while locked to
                    // avoid races with notify_one
                    let state = self.state.load(Ordering::Relaxed);
                    if state.is_null() {
                        self.state.store(lock_addr, Ordering::Relaxed);
                    } else if state != lock_addr {
                        bad_mutex = true;
                        return false;
                    }
                    true
                };
                let before_sleep = || {
                    // Unlock the mutex before sleeping...
                    mutex.unlock();
                };
                let timed_out = |k, was_last_thread| {
                    // If we were requeued to a mutex, then we did not time out.
                    // We'll just park ourselves on the mutex again when we try
                    // to lock it later.
                    requeued = k != addr;

                    // If we were the last thread on the queue then we need to
                    // clear our state. This is normally done by the
                    // notify_{one,all} functions when not timing out.
                    if !requeued && was_last_thread {
                        self.state.store(ptr::null_mut(), Ordering::Relaxed);
                    }
                };
                result = parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    DEFAULT_PARK_TOKEN,
                    timeout,
                );
            }

            // Panic if we tried to use multiple mutexes with a Condvar. Note
            // that at this point the MutexGuard is still locked. It will be
            // unlocked by the unwinding logic.
            if bad_mutex {
                panic!("attempted to use a condition variable with more than one mutex");
            }

            // ... and re-lock it once we are done sleeping
            if result == ParkResult::Unparked(TOKEN_HANDOFF) {
                deadlock::acquire_resource(mutex as *const _ as usize);
            } else {
                mutex.lock();
            }

            WaitTimeoutResult(!(result.is_unparked() || requeued))
        }
    }

    /// Waits on this condition variable for a notification, timing out after a
    /// specified duration.
    ///
    /// The semantics of this function are equivalent to `wait()` except that
    /// the thread will be blocked for roughly no longer than `timeout`.
    /// This method should not be used for precise timing due to anomalies such
    /// as preemption or platform differences that may not cause the maximum
    /// amount of time waited to be precisely `timeout`.
    ///
    /// Note that the best effort is made to ensure that the time waited is
    /// measured with a monotonic clock, and not affected by changes made to
    /// the system time.
    ///
    /// The returned `WaitTimeoutResult` value indicates if the timeout is
    /// known to have elapsed.
    ///
    /// Like `wait`, the lock specified will be re-acquired when this function
    /// returns, regardless of whether the timeout elapsed or not.
    ///
    /// # Panics
    ///
    /// Panics if the given `timeout` is so large that it can't be added to the current time.
    /// This panic is not possible if the crate is built with the `nightly` feature; in that
    /// case a too-large `timeout` is equivalent to just calling `wait`.
    #[inline]
    pub fn wait_for<T: ?Sized>(
        &self,
        mutex_guard: &mut MutexGuard<'_, T>,
        timeout: Duration,
    ) -> WaitTimeoutResult {
        let deadline = util::to_deadline(timeout);
        self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
    }
}

impl Default for Condvar {
    #[inline]
    fn default() -> Condvar {
        Condvar::new()
    }
}

impl fmt::Debug for Condvar {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Condvar { .. }")
    }
}

#[cfg(test)]
mod tests {
    use crate::{Condvar, Mutex, MutexGuard};
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    #[test]
    fn smoke() {
        let c = Condvar::new();
        c.notify_one();
        c.notify_all();
    }

    #[test]
    fn notify_one() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        c.wait(&mut g);
    }

    #[test]
    fn notify_all() {
        const N: usize = 10;

        let data = Arc::new((Mutex::new(0), Condvar::new()));
        let (tx, rx) = channel();
        for _ in 0..N {
            let data = data.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let &(ref lock, ref cond) = &*data;
                let mut cnt = lock.lock();
                *cnt += 1;
                if *cnt == N {
                    tx.send(()).unwrap();
                }
                while *cnt != 0 {
                    cond.wait(&mut cnt);
                }
                tx.send(()).unwrap();
            });
        }
        drop(tx);

        let &(ref lock, ref cond) = &*data;
        rx.recv().unwrap();
        let mut cnt = lock.lock();
        *cnt = 0;
        cond.notify_all();
        drop(cnt);

        for _ in 0..N {
            rx.recv().unwrap();
        }
    }

    #[test]
    fn notify_one_return_true() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            assert!(c2.notify_one());
        });
        c.wait(&mut g);
    }

    #[test]
    fn notify_one_return_false() {
        let m = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());
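
        // Nothing is waiting on the condvar when the spawned thread calls
        // `notify_one`, so it is expected to return false.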
        let _t = thread::spawn(move || {
            let _g = m.lock();
            assert!(!c.notify_one());
        });
    }

    #[test]
    fn notify_all_return() {
        const N: usize = 10;

        let data = Arc::new((Mutex::new(0), Condvar::new()));
        let (tx, rx) = channel();
        for _ in 0..N {
            let data = data.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let &(ref lock, ref cond) = &*data;
                let mut cnt = lock.lock();
                *cnt += 1;
                if *cnt == N {
                    tx.send(()).unwrap();
                }
                while *cnt != 0 {
                    cond.wait(&mut cnt);
                }
                tx.send(()).unwrap();
            });
        }
        drop(tx);

        let &(ref lock, ref cond) = &*data;
        rx.recv().unwrap();
        let mut cnt = lock.lock();
        *cnt = 0;
        assert_eq!(cond.notify_all(), N);
        drop(cnt);

        for _ in 0..N {
            rx.recv().unwrap();
        }

        assert_eq!(cond.notify_all(), 0);
    }

    #[test]
    fn wait_for() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
        assert!(no_timeout.timed_out());

        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        // Non-nightly panics on too large timeouts. Nightly treats it as indefinite wait.
        let very_long_timeout = if cfg!(feature = "nightly") {
            Duration::from_secs(u64::max_value())
        } else {
            Duration::from_millis(u32::max_value() as u64)
        };

        let timeout_res = c.wait_for(&mut g, very_long_timeout);
        assert!(!timeout_res.timed_out());

        drop(g);
    }

    #[test]
    fn wait_until() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
        assert!(no_timeout.timed_out());
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        let timeout_res = c.wait_until(
            &mut g,
            Instant::now() + Duration::from_millis(u32::max_value() as u64),
        );
        assert!(!timeout_res.timed_out());
        drop(g);
    }

    #[test]
    #[should_panic]
    fn two_mutexes() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let m3 = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        // Make sure we don't leave the child thread dangling
        struct PanicGuard<'a>(&'a Condvar);
        impl<'a> Drop for PanicGuard<'a> {
            fn drop(&mut self) {
                self.0.notify_one();
            }
        }

        let (tx, rx) = channel();
        let g = m.lock();
        let _t = thread::spawn(move || {
            let mut g = m2.lock();
            tx.send(()).unwrap();
            c2.wait(&mut g);
        });
        drop(g);
        rx.recv().unwrap();
        let _g = m.lock();
        let _guard = PanicGuard(&*c);
        c.wait(&mut m3.lock());
    }

    #[test]
    fn two_mutexes_disjoint() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let m3 = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        c.wait(&mut g);
        drop(g);

        let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
    }

    #[test]
    fn test_debug_condvar() {
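        // The Debug implementation prints a fixed placeholder and does not
        // try to inspect any waiter state.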
        let c = Condvar::new();
        assert_eq!(format!("{:?}", c), "Condvar { .. }");
    }

    #[test]
    fn test_condvar_requeue() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();
        let t = thread::spawn(move || {
            let mut g = m2.lock();
            c2.wait(&mut g);
        });

        let mut g = m.lock();
        while !c.notify_one() {
            // Wait for the thread to get into wait()
            MutexGuard::bump(&mut g);
        }
        // The thread should have been requeued to the mutex, which we wake up now.
        drop(g);
        t.join().unwrap();
    }

    #[test]
    fn test_issue_129() {
        let locks = Arc::new((Mutex::new(()), Condvar::new()));

        let (tx, rx) = channel();
        for _ in 0..4 {
            let locks = locks.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let mut guard = locks.0.lock();
                locks.1.wait(&mut guard);
                locks.1.wait_for(&mut guard, Duration::from_millis(1));
                locks.1.notify_one();
                tx.send(()).unwrap();
            });
        }

        thread::sleep(Duration::from_millis(100));
        locks.1.notify_one();

        for _ in 0..4 {
            assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
        }
    }
}

/// This module contains an integration test that is heavily inspired by WebKit's own
/// integration tests for its own Condvar.
#[cfg(test)]
mod webkit_queue_test {
    use crate::{Condvar, Mutex, MutexGuard};
    use std::{collections::VecDeque, sync::Arc, thread, time::Duration};

    #[derive(Clone, Copy)]
    enum Timeout {
        Bounded(Duration),
        Forever,
    }

    #[derive(Clone, Copy)]
    enum NotifyStyle {
        One,
        All,
    }

    struct Queue {
        items: VecDeque<usize>,
        should_continue: bool,
    }

    impl Queue {
        fn new() -> Self {
            Self {
                items: VecDeque::new(),
                should_continue: true,
            }
        }
    }

    fn wait<T: ?Sized>(
        condition: &Condvar,
        lock: &mut MutexGuard<'_, T>,
        predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
        timeout: &Timeout,
    ) {
        while !predicate(lock) {
            match timeout {
                Timeout::Forever => condition.wait(lock),
                Timeout::Bounded(bound) => {
                    condition.wait_for(lock, *bound);
                }
            }
        }
    }

    fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
        match style {
            NotifyStyle::One => {
                condition.notify_one();
            }
            NotifyStyle::All => {
                if should_notify {
                    condition.notify_all();
                }
            }
        }
    }

    fn run_queue_test(
        num_producers: usize,
        num_consumers: usize,
        max_queue_size: usize,
        messages_per_producer: usize,
        notify_style: NotifyStyle,
        timeout: Timeout,
        delay: Duration,
    ) {
        let input_queue = Arc::new(Mutex::new(Queue::new()));
        let empty_condition = Arc::new(Condvar::new());
        let full_condition = Arc::new(Condvar::new());

        let output_vec = Arc::new(Mutex::new(vec![]));

        let consumers = (0..num_consumers)
            .map(|_| {
                consumer_thread(
                    input_queue.clone(),
                    empty_condition.clone(),
                    full_condition.clone(),
                    timeout,
                    notify_style,
                    output_vec.clone(),
                    max_queue_size,
                )
            })
            .collect::<Vec<_>>();
        let producers = (0..num_producers)
            .map(|_| {
                producer_thread(
                    messages_per_producer,
                    input_queue.clone(),
                    empty_condition.clone(),
                    full_condition.clone(),
                    timeout,
                    notify_style,
                    max_queue_size,
                )
            })
            .collect::<Vec<_>>();

        thread::sleep(delay);

        for producer in producers.into_iter() {
            producer.join().expect("Producer thread panicked");
        }

        {
            let mut input_queue = input_queue.lock();
            input_queue.should_continue = false;
        }
        empty_condition.notify_all();

        for consumer in consumers.into_iter() {
            consumer.join().expect("Consumer thread panicked");
        }

        let mut output_vec = output_vec.lock();
        assert_eq!(output_vec.len(), num_producers * messages_per_producer);
        output_vec.sort();
        for msg_idx in 0..messages_per_producer {
            for producer_idx in 0..num_producers {
                assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
            }
        }
    }

    fn consumer_thread(
        input_queue: Arc<Mutex<Queue>>,
        empty_condition: Arc<Condvar>,
        full_condition: Arc<Condvar>,
        timeout: Timeout,
        notify_style: NotifyStyle,
        output_queue: Arc<Mutex<Vec<usize>>>,
        max_queue_size: usize,
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || loop {
            let (should_notify, result) = {
                let mut queue = input_queue.lock();
                wait(
                    &*empty_condition,
                    &mut queue,
                    |state| -> bool { !state.items.is_empty() || !state.should_continue },
                    &timeout,
                );
                if queue.items.is_empty() && !queue.should_continue {
                    return;
                }
                let should_notify = queue.items.len() == max_queue_size;
                let result = queue.items.pop_front();
                std::mem::drop(queue);
                (should_notify, result)
            };
            notify(notify_style, &*full_condition, should_notify);

            if let Some(result) = result {
                output_queue.lock().push(result);
            }
        })
    }

    fn producer_thread(
        num_messages: usize,
        queue: Arc<Mutex<Queue>>,
        empty_condition: Arc<Condvar>,
        full_condition: Arc<Condvar>,
        timeout: Timeout,
        notify_style: NotifyStyle,
        max_queue_size: usize,
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            for message in 0..num_messages {
                let should_notify = {
                    let mut queue = queue.lock();
                    wait(
                        &*full_condition,
                        &mut queue,
                        |state| state.items.len() < max_queue_size,
                        &timeout,
                    );
                    let should_notify = queue.items.is_empty();
                    queue.items.push_back(message);
                    std::mem::drop(queue);
                    should_notify
                };
                notify(notify_style, &*empty_condition, should_notify);
            }
        })
    }

    macro_rules! run_queue_tests {
        ( $( $name:ident(
            num_producers: $num_producers:expr,
            num_consumers: $num_consumers:expr,
            max_queue_size: $max_queue_size:expr,
            messages_per_producer: $messages_per_producer:expr,
            notification_style: $notification_style:expr,
            timeout: $timeout:expr,
            delay_seconds: $delay_seconds:expr);
        )* ) => {
            $(#[test]
            fn $name() {
                let delay = Duration::from_secs($delay_seconds);
                run_queue_test(
                    $num_producers,
                    $num_consumers,
                    $max_queue_size,
                    $messages_per_producer,
                    $notification_style,
                    $timeout,
                    delay,
                );
            })*
        };
    }

    run_queue_tests! {
        sanity_check_queue(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Bounded(Duration::from_secs(1)),
            delay_seconds: 0
        );
        sanity_check_queue_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        new_test_without_timeout_5(
            num_producers: 1,
            num_consumers: 5,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_one_consumer_one_slot(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_one_consumer_one_slot_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 1
        );
        one_producer_one_consumer_hundred_slots(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 1_000_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_one_slot(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_one_slot(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_all(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_one(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_one_slot(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
    }
}