1 // Copyright 2016 Amanieu d'Antras 2 // 3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5 // http://opensource.org/licenses/MIT>, at your option. This file may not be 6 // copied, modified, or distributed except according to those terms. 7 8 use crate::mutex::MutexGuard; 9 use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL}; 10 use crate::{deadlock, util}; 11 use core::{ 12 fmt, ptr, 13 sync::atomic::{AtomicPtr, Ordering}, 14 }; 15 use instant::Instant; 16 use lock_api::RawMutex as RawMutex_; 17 use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN}; 18 use std::time::Duration; 19 20 /// A type indicating whether a timed wait on a condition variable returned 21 /// due to a time out or not. 22 #[derive(Debug, PartialEq, Eq, Copy, Clone)] 23 pub struct WaitTimeoutResult(bool); 24 25 impl WaitTimeoutResult { 26 /// Returns whether the wait was known to have timed out. 27 #[inline] timed_out(self) -> bool28 pub fn timed_out(self) -> bool { 29 self.0 30 } 31 } 32 33 /// A Condition Variable 34 /// 35 /// Condition variables represent the ability to block a thread such that it 36 /// consumes no CPU time while waiting for an event to occur. Condition 37 /// variables are typically associated with a boolean predicate (a condition) 38 /// and a mutex. The predicate is always verified inside of the mutex before 39 /// determining that thread must block. 40 /// 41 /// Note that this module places one additional restriction over the system 42 /// condition variables: each condvar can be used with only one mutex at a 43 /// time. Any attempt to use multiple mutexes on the same condition variable 44 /// simultaneously will result in a runtime panic. However it is possible to 45 /// switch to a different mutex if there are no threads currently waiting on 46 /// the condition variable. 47 /// 48 /// # Differences from the standard library `Condvar` 49 /// 50 /// - No spurious wakeups: A wait will only return a non-timeout result if it 51 /// was woken up by `notify_one` or `notify_all`. 52 /// - `Condvar::notify_all` will only wake up a single thread, the rest are 53 /// requeued to wait for the `Mutex` to be unlocked by the thread that was 54 /// woken up. 55 /// - Only requires 1 word of space, whereas the standard library boxes the 56 /// `Condvar` due to platform limitations. 57 /// - Can be statically constructed (requires the `const_fn` nightly feature). 58 /// - Does not require any drop glue when dropped. 59 /// - Inline fast path for the uncontended case. 60 /// 61 /// # Examples 62 /// 63 /// ``` 64 /// use parking_lot::{Mutex, Condvar}; 65 /// use std::sync::Arc; 66 /// use std::thread; 67 /// 68 /// let pair = Arc::new((Mutex::new(false), Condvar::new())); 69 /// let pair2 = pair.clone(); 70 /// 71 /// // Inside of our lock, spawn a new thread, and then wait for it to start 72 /// thread::spawn(move|| { 73 /// let &(ref lock, ref cvar) = &*pair2; 74 /// let mut started = lock.lock(); 75 /// *started = true; 76 /// cvar.notify_one(); 77 /// }); 78 /// 79 /// // wait for the thread to start up 80 /// let &(ref lock, ref cvar) = &*pair; 81 /// let mut started = lock.lock(); 82 /// if !*started { 83 /// cvar.wait(&mut started); 84 /// } 85 /// // Note that we used an if instead of a while loop above. This is only 86 /// // possible because parking_lot's Condvar will never spuriously wake up. 87 /// // This means that wait() will only return after notify_one or notify_all is 88 /// // called. 89 /// ``` 90 pub struct Condvar { 91 state: AtomicPtr<RawMutex>, 92 } 93 94 impl Condvar { 95 /// Creates a new condition variable which is ready to be waited on and 96 /// notified. 97 #[inline] new() -> Condvar98 pub const fn new() -> Condvar { 99 Condvar { 100 state: AtomicPtr::new(ptr::null_mut()), 101 } 102 } 103 104 /// Wakes up one blocked thread on this condvar. 105 /// 106 /// Returns whether a thread was woken up. 107 /// 108 /// If there is a blocked thread on this condition variable, then it will 109 /// be woken up from its call to `wait` or `wait_timeout`. Calls to 110 /// `notify_one` are not buffered in any way. 111 /// 112 /// To wake up all threads, see `notify_all()`. 113 /// 114 /// # Examples 115 /// 116 /// ``` 117 /// use parking_lot::Condvar; 118 /// 119 /// let condvar = Condvar::new(); 120 /// 121 /// // do something with condvar, share it with other threads 122 /// 123 /// if !condvar.notify_one() { 124 /// println!("Nobody was listening for this."); 125 /// } 126 /// ``` 127 #[inline] notify_one(&self) -> bool128 pub fn notify_one(&self) -> bool { 129 // Nothing to do if there are no waiting threads 130 let state = self.state.load(Ordering::Relaxed); 131 if state.is_null() { 132 return false; 133 } 134 135 self.notify_one_slow(state) 136 } 137 138 #[cold] notify_one_slow(&self, mutex: *mut RawMutex) -> bool139 fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool { 140 unsafe { 141 // Unpark one thread and requeue the rest onto the mutex 142 let from = self as *const _ as usize; 143 let to = mutex as usize; 144 let validate = || { 145 // Make sure that our atomic state still points to the same 146 // mutex. If not then it means that all threads on the current 147 // mutex were woken up and a new waiting thread switched to a 148 // different mutex. In that case we can get away with doing 149 // nothing. 150 if self.state.load(Ordering::Relaxed) != mutex { 151 return RequeueOp::Abort; 152 } 153 154 // Unpark one thread if the mutex is unlocked, otherwise just 155 // requeue everything to the mutex. This is safe to do here 156 // since unlocking the mutex when the parked bit is set requires 157 // locking the queue. There is the possibility of a race if the 158 // mutex gets locked after we check, but that doesn't matter in 159 // this case. 160 if (*mutex).mark_parked_if_locked() { 161 RequeueOp::RequeueOne 162 } else { 163 RequeueOp::UnparkOne 164 } 165 }; 166 let callback = |_op, result: UnparkResult| { 167 // Clear our state if there are no more waiting threads 168 if !result.have_more_threads { 169 self.state.store(ptr::null_mut(), Ordering::Relaxed); 170 } 171 TOKEN_NORMAL 172 }; 173 let res = parking_lot_core::unpark_requeue(from, to, validate, callback); 174 175 res.unparked_threads + res.requeued_threads != 0 176 } 177 } 178 179 /// Wakes up all blocked threads on this condvar. 180 /// 181 /// Returns the number of threads woken up. 182 /// 183 /// This method will ensure that any current waiters on the condition 184 /// variable are awoken. Calls to `notify_all()` are not buffered in any 185 /// way. 186 /// 187 /// To wake up only one thread, see `notify_one()`. 188 #[inline] notify_all(&self) -> usize189 pub fn notify_all(&self) -> usize { 190 // Nothing to do if there are no waiting threads 191 let state = self.state.load(Ordering::Relaxed); 192 if state.is_null() { 193 return 0; 194 } 195 196 self.notify_all_slow(state) 197 } 198 199 #[cold] notify_all_slow(&self, mutex: *mut RawMutex) -> usize200 fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize { 201 unsafe { 202 // Unpark one thread and requeue the rest onto the mutex 203 let from = self as *const _ as usize; 204 let to = mutex as usize; 205 let validate = || { 206 // Make sure that our atomic state still points to the same 207 // mutex. If not then it means that all threads on the current 208 // mutex were woken up and a new waiting thread switched to a 209 // different mutex. In that case we can get away with doing 210 // nothing. 211 if self.state.load(Ordering::Relaxed) != mutex { 212 return RequeueOp::Abort; 213 } 214 215 // Clear our state since we are going to unpark or requeue all 216 // threads. 217 self.state.store(ptr::null_mut(), Ordering::Relaxed); 218 219 // Unpark one thread if the mutex is unlocked, otherwise just 220 // requeue everything to the mutex. This is safe to do here 221 // since unlocking the mutex when the parked bit is set requires 222 // locking the queue. There is the possibility of a race if the 223 // mutex gets locked after we check, but that doesn't matter in 224 // this case. 225 if (*mutex).mark_parked_if_locked() { 226 RequeueOp::RequeueAll 227 } else { 228 RequeueOp::UnparkOneRequeueRest 229 } 230 }; 231 let callback = |op, result: UnparkResult| { 232 // If we requeued threads to the mutex, mark it as having 233 // parked threads. The RequeueAll case is already handled above. 234 if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 { 235 (*mutex).mark_parked(); 236 } 237 TOKEN_NORMAL 238 }; 239 let res = parking_lot_core::unpark_requeue(from, to, validate, callback); 240 241 res.unparked_threads + res.requeued_threads 242 } 243 } 244 245 /// Blocks the current thread until this condition variable receives a 246 /// notification. 247 /// 248 /// This function will atomically unlock the mutex specified (represented by 249 /// `mutex_guard`) and block the current thread. This means that any calls 250 /// to `notify_*()` which happen logically after the mutex is unlocked are 251 /// candidates to wake this thread up. When this function call returns, the 252 /// lock specified will have been re-acquired. 253 /// 254 /// # Panics 255 /// 256 /// This function will panic if another thread is waiting on the `Condvar` 257 /// with a different `Mutex` object. 258 #[inline] wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>)259 pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) { 260 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None); 261 } 262 263 /// Waits on this condition variable for a notification, timing out after 264 /// the specified time instant. 265 /// 266 /// The semantics of this function are equivalent to `wait()` except that 267 /// the thread will be blocked roughly until `timeout` is reached. This 268 /// method should not be used for precise timing due to anomalies such as 269 /// preemption or platform differences that may not cause the maximum 270 /// amount of time waited to be precisely `timeout`. 271 /// 272 /// Note that the best effort is made to ensure that the time waited is 273 /// measured with a monotonic clock, and not affected by the changes made to 274 /// the system time. 275 /// 276 /// The returned `WaitTimeoutResult` value indicates if the timeout is 277 /// known to have elapsed. 278 /// 279 /// Like `wait`, the lock specified will be re-acquired when this function 280 /// returns, regardless of whether the timeout elapsed or not. 281 /// 282 /// # Panics 283 /// 284 /// This function will panic if another thread is waiting on the `Condvar` 285 /// with a different `Mutex` object. 286 #[inline] wait_until<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Instant, ) -> WaitTimeoutResult287 pub fn wait_until<T: ?Sized>( 288 &self, 289 mutex_guard: &mut MutexGuard<'_, T>, 290 timeout: Instant, 291 ) -> WaitTimeoutResult { 292 self.wait_until_internal( 293 unsafe { MutexGuard::mutex(mutex_guard).raw() }, 294 Some(timeout), 295 ) 296 } 297 298 // This is a non-generic function to reduce the monomorphization cost of 299 // using `wait_until`. wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult300 fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult { 301 unsafe { 302 let result; 303 let mut bad_mutex = false; 304 let mut requeued = false; 305 { 306 let addr = self as *const _ as usize; 307 let lock_addr = mutex as *const _ as *mut _; 308 let validate = || { 309 // Ensure we don't use two different mutexes with the same 310 // Condvar at the same time. This is done while locked to 311 // avoid races with notify_one 312 let state = self.state.load(Ordering::Relaxed); 313 if state.is_null() { 314 self.state.store(lock_addr, Ordering::Relaxed); 315 } else if state != lock_addr { 316 bad_mutex = true; 317 return false; 318 } 319 true 320 }; 321 let before_sleep = || { 322 // Unlock the mutex before sleeping... 323 mutex.unlock(); 324 }; 325 let timed_out = |k, was_last_thread| { 326 // If we were requeued to a mutex, then we did not time out. 327 // We'll just park ourselves on the mutex again when we try 328 // to lock it later. 329 requeued = k != addr; 330 331 // If we were the last thread on the queue then we need to 332 // clear our state. This is normally done by the 333 // notify_{one,all} functions when not timing out. 334 if !requeued && was_last_thread { 335 self.state.store(ptr::null_mut(), Ordering::Relaxed); 336 } 337 }; 338 result = parking_lot_core::park( 339 addr, 340 validate, 341 before_sleep, 342 timed_out, 343 DEFAULT_PARK_TOKEN, 344 timeout, 345 ); 346 } 347 348 // Panic if we tried to use multiple mutexes with a Condvar. Note 349 // that at this point the MutexGuard is still locked. It will be 350 // unlocked by the unwinding logic. 351 if bad_mutex { 352 panic!("attempted to use a condition variable with more than one mutex"); 353 } 354 355 // ... and re-lock it once we are done sleeping 356 if result == ParkResult::Unparked(TOKEN_HANDOFF) { 357 deadlock::acquire_resource(mutex as *const _ as usize); 358 } else { 359 mutex.lock(); 360 } 361 362 WaitTimeoutResult(!(result.is_unparked() || requeued)) 363 } 364 } 365 366 /// Waits on this condition variable for a notification, timing out after a 367 /// specified duration. 368 /// 369 /// The semantics of this function are equivalent to `wait()` except that 370 /// the thread will be blocked for roughly no longer than `timeout`. This 371 /// method should not be used for precise timing due to anomalies such as 372 /// preemption or platform differences that may not cause the maximum 373 /// amount of time waited to be precisely `timeout`. 374 /// 375 /// Note that the best effort is made to ensure that the time waited is 376 /// measured with a monotonic clock, and not affected by the changes made to 377 /// the system time. 378 /// 379 /// The returned `WaitTimeoutResult` value indicates if the timeout is 380 /// known to have elapsed. 381 /// 382 /// Like `wait`, the lock specified will be re-acquired when this function 383 /// returns, regardless of whether the timeout elapsed or not. 384 /// 385 /// # Panics 386 /// 387 /// Panics if the given `timeout` is so large that it can't be added to the current time. 388 /// This panic is not possible if the crate is built with the `nightly` feature, then a too 389 /// large `timeout` becomes equivalent to just calling `wait`. 390 #[inline] wait_for<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Duration, ) -> WaitTimeoutResult391 pub fn wait_for<T: ?Sized>( 392 &self, 393 mutex_guard: &mut MutexGuard<'_, T>, 394 timeout: Duration, 395 ) -> WaitTimeoutResult { 396 let deadline = util::to_deadline(timeout); 397 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline) 398 } 399 } 400 401 impl Default for Condvar { 402 #[inline] default() -> Condvar403 fn default() -> Condvar { 404 Condvar::new() 405 } 406 } 407 408 impl fmt::Debug for Condvar { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result409 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 410 f.pad("Condvar { .. }") 411 } 412 } 413 414 #[cfg(test)] 415 mod tests { 416 use crate::{Condvar, Mutex, MutexGuard}; 417 use instant::Instant; 418 use std::sync::mpsc::channel; 419 use std::sync::Arc; 420 use std::thread; 421 use std::time::Duration; 422 423 #[test] smoke()424 fn smoke() { 425 let c = Condvar::new(); 426 c.notify_one(); 427 c.notify_all(); 428 } 429 430 #[test] notify_one()431 fn notify_one() { 432 let m = Arc::new(Mutex::new(())); 433 let m2 = m.clone(); 434 let c = Arc::new(Condvar::new()); 435 let c2 = c.clone(); 436 437 let mut g = m.lock(); 438 let _t = thread::spawn(move || { 439 let _g = m2.lock(); 440 c2.notify_one(); 441 }); 442 c.wait(&mut g); 443 } 444 445 #[test] notify_all()446 fn notify_all() { 447 const N: usize = 10; 448 449 let data = Arc::new((Mutex::new(0), Condvar::new())); 450 let (tx, rx) = channel(); 451 for _ in 0..N { 452 let data = data.clone(); 453 let tx = tx.clone(); 454 thread::spawn(move || { 455 let &(ref lock, ref cond) = &*data; 456 let mut cnt = lock.lock(); 457 *cnt += 1; 458 if *cnt == N { 459 tx.send(()).unwrap(); 460 } 461 while *cnt != 0 { 462 cond.wait(&mut cnt); 463 } 464 tx.send(()).unwrap(); 465 }); 466 } 467 drop(tx); 468 469 let &(ref lock, ref cond) = &*data; 470 rx.recv().unwrap(); 471 let mut cnt = lock.lock(); 472 *cnt = 0; 473 cond.notify_all(); 474 drop(cnt); 475 476 for _ in 0..N { 477 rx.recv().unwrap(); 478 } 479 } 480 481 #[test] notify_one_return_true()482 fn notify_one_return_true() { 483 let m = Arc::new(Mutex::new(())); 484 let m2 = m.clone(); 485 let c = Arc::new(Condvar::new()); 486 let c2 = c.clone(); 487 488 let mut g = m.lock(); 489 let _t = thread::spawn(move || { 490 let _g = m2.lock(); 491 assert!(c2.notify_one()); 492 }); 493 c.wait(&mut g); 494 } 495 496 #[test] notify_one_return_false()497 fn notify_one_return_false() { 498 let m = Arc::new(Mutex::new(())); 499 let c = Arc::new(Condvar::new()); 500 501 let _t = thread::spawn(move || { 502 let _g = m.lock(); 503 assert!(!c.notify_one()); 504 }); 505 } 506 507 #[test] notify_all_return()508 fn notify_all_return() { 509 const N: usize = 10; 510 511 let data = Arc::new((Mutex::new(0), Condvar::new())); 512 let (tx, rx) = channel(); 513 for _ in 0..N { 514 let data = data.clone(); 515 let tx = tx.clone(); 516 thread::spawn(move || { 517 let &(ref lock, ref cond) = &*data; 518 let mut cnt = lock.lock(); 519 *cnt += 1; 520 if *cnt == N { 521 tx.send(()).unwrap(); 522 } 523 while *cnt != 0 { 524 cond.wait(&mut cnt); 525 } 526 tx.send(()).unwrap(); 527 }); 528 } 529 drop(tx); 530 531 let &(ref lock, ref cond) = &*data; 532 rx.recv().unwrap(); 533 let mut cnt = lock.lock(); 534 *cnt = 0; 535 assert_eq!(cond.notify_all(), N); 536 drop(cnt); 537 538 for _ in 0..N { 539 rx.recv().unwrap(); 540 } 541 542 assert_eq!(cond.notify_all(), 0); 543 } 544 545 #[test] wait_for()546 fn wait_for() { 547 let m = Arc::new(Mutex::new(())); 548 let m2 = m.clone(); 549 let c = Arc::new(Condvar::new()); 550 let c2 = c.clone(); 551 552 let mut g = m.lock(); 553 let no_timeout = c.wait_for(&mut g, Duration::from_millis(1)); 554 assert!(no_timeout.timed_out()); 555 556 let _t = thread::spawn(move || { 557 let _g = m2.lock(); 558 c2.notify_one(); 559 }); 560 // Non-nightly panics on too large timeouts. Nightly treats it as indefinite wait. 561 let very_long_timeout = if cfg!(feature = "nightly") { 562 Duration::from_secs(u64::max_value()) 563 } else { 564 Duration::from_millis(u32::max_value() as u64) 565 }; 566 567 let timeout_res = c.wait_for(&mut g, very_long_timeout); 568 assert!(!timeout_res.timed_out()); 569 570 drop(g); 571 } 572 573 #[test] wait_until()574 fn wait_until() { 575 let m = Arc::new(Mutex::new(())); 576 let m2 = m.clone(); 577 let c = Arc::new(Condvar::new()); 578 let c2 = c.clone(); 579 580 let mut g = m.lock(); 581 let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1)); 582 assert!(no_timeout.timed_out()); 583 let _t = thread::spawn(move || { 584 let _g = m2.lock(); 585 c2.notify_one(); 586 }); 587 let timeout_res = c.wait_until( 588 &mut g, 589 Instant::now() + Duration::from_millis(u32::max_value() as u64), 590 ); 591 assert!(!timeout_res.timed_out()); 592 drop(g); 593 } 594 595 #[test] 596 #[should_panic] two_mutexes()597 fn two_mutexes() { 598 let m = Arc::new(Mutex::new(())); 599 let m2 = m.clone(); 600 let m3 = Arc::new(Mutex::new(())); 601 let c = Arc::new(Condvar::new()); 602 let c2 = c.clone(); 603 604 // Make sure we don't leave the child thread dangling 605 struct PanicGuard<'a>(&'a Condvar); 606 impl<'a> Drop for PanicGuard<'a> { 607 fn drop(&mut self) { 608 self.0.notify_one(); 609 } 610 } 611 612 let (tx, rx) = channel(); 613 let g = m.lock(); 614 let _t = thread::spawn(move || { 615 let mut g = m2.lock(); 616 tx.send(()).unwrap(); 617 c2.wait(&mut g); 618 }); 619 drop(g); 620 rx.recv().unwrap(); 621 let _g = m.lock(); 622 let _guard = PanicGuard(&*c); 623 c.wait(&mut m3.lock()); 624 } 625 626 #[test] two_mutexes_disjoint()627 fn two_mutexes_disjoint() { 628 let m = Arc::new(Mutex::new(())); 629 let m2 = m.clone(); 630 let m3 = Arc::new(Mutex::new(())); 631 let c = Arc::new(Condvar::new()); 632 let c2 = c.clone(); 633 634 let mut g = m.lock(); 635 let _t = thread::spawn(move || { 636 let _g = m2.lock(); 637 c2.notify_one(); 638 }); 639 c.wait(&mut g); 640 drop(g); 641 642 let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1)); 643 } 644 645 #[test] test_debug_condvar()646 fn test_debug_condvar() { 647 let c = Condvar::new(); 648 assert_eq!(format!("{:?}", c), "Condvar { .. }"); 649 } 650 651 #[test] test_condvar_requeue()652 fn test_condvar_requeue() { 653 let m = Arc::new(Mutex::new(())); 654 let m2 = m.clone(); 655 let c = Arc::new(Condvar::new()); 656 let c2 = c.clone(); 657 let t = thread::spawn(move || { 658 let mut g = m2.lock(); 659 c2.wait(&mut g); 660 }); 661 662 let mut g = m.lock(); 663 while !c.notify_one() { 664 // Wait for the thread to get into wait() 665 MutexGuard::bump(&mut g); 666 // Yield, so the other thread gets a chance to do something. 667 // (At least Miri needs this, because it doesn't preempt threads.) 668 thread::yield_now(); 669 } 670 // The thread should have been requeued to the mutex, which we wake up now. 671 drop(g); 672 t.join().unwrap(); 673 } 674 675 #[test] test_issue_129()676 fn test_issue_129() { 677 let locks = Arc::new((Mutex::new(()), Condvar::new())); 678 679 let (tx, rx) = channel(); 680 for _ in 0..4 { 681 let locks = locks.clone(); 682 let tx = tx.clone(); 683 thread::spawn(move || { 684 let mut guard = locks.0.lock(); 685 locks.1.wait(&mut guard); 686 locks.1.wait_for(&mut guard, Duration::from_millis(1)); 687 locks.1.notify_one(); 688 tx.send(()).unwrap(); 689 }); 690 } 691 692 thread::sleep(Duration::from_millis(100)); 693 locks.1.notify_one(); 694 695 for _ in 0..4 { 696 assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(())); 697 } 698 } 699 } 700 701 /// This module contains an integration test that is heavily inspired from WebKit's own integration 702 /// tests for it's own Condvar. 703 #[cfg(test)] 704 mod webkit_queue_test { 705 use crate::{Condvar, Mutex, MutexGuard}; 706 use std::{collections::VecDeque, sync::Arc, thread, time::Duration}; 707 708 #[derive(Clone, Copy)] 709 enum Timeout { 710 Bounded(Duration), 711 Forever, 712 } 713 714 #[derive(Clone, Copy)] 715 enum NotifyStyle { 716 One, 717 All, 718 } 719 720 struct Queue { 721 items: VecDeque<usize>, 722 should_continue: bool, 723 } 724 725 impl Queue { new() -> Self726 fn new() -> Self { 727 Self { 728 items: VecDeque::new(), 729 should_continue: true, 730 } 731 } 732 } 733 wait<T: ?Sized>( condition: &Condvar, lock: &mut MutexGuard<'_, T>, predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool, timeout: &Timeout, )734 fn wait<T: ?Sized>( 735 condition: &Condvar, 736 lock: &mut MutexGuard<'_, T>, 737 predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool, 738 timeout: &Timeout, 739 ) { 740 while !predicate(lock) { 741 match timeout { 742 Timeout::Forever => condition.wait(lock), 743 Timeout::Bounded(bound) => { 744 condition.wait_for(lock, *bound); 745 } 746 } 747 } 748 } 749 notify(style: NotifyStyle, condition: &Condvar, should_notify: bool)750 fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) { 751 match style { 752 NotifyStyle::One => { 753 condition.notify_one(); 754 } 755 NotifyStyle::All => { 756 if should_notify { 757 condition.notify_all(); 758 } 759 } 760 } 761 } 762 run_queue_test( num_producers: usize, num_consumers: usize, max_queue_size: usize, messages_per_producer: usize, notify_style: NotifyStyle, timeout: Timeout, delay: Duration, )763 fn run_queue_test( 764 num_producers: usize, 765 num_consumers: usize, 766 max_queue_size: usize, 767 messages_per_producer: usize, 768 notify_style: NotifyStyle, 769 timeout: Timeout, 770 delay: Duration, 771 ) { 772 let input_queue = Arc::new(Mutex::new(Queue::new())); 773 let empty_condition = Arc::new(Condvar::new()); 774 let full_condition = Arc::new(Condvar::new()); 775 776 let output_vec = Arc::new(Mutex::new(vec![])); 777 778 let consumers = (0..num_consumers) 779 .map(|_| { 780 consumer_thread( 781 input_queue.clone(), 782 empty_condition.clone(), 783 full_condition.clone(), 784 timeout, 785 notify_style, 786 output_vec.clone(), 787 max_queue_size, 788 ) 789 }) 790 .collect::<Vec<_>>(); 791 let producers = (0..num_producers) 792 .map(|_| { 793 producer_thread( 794 messages_per_producer, 795 input_queue.clone(), 796 empty_condition.clone(), 797 full_condition.clone(), 798 timeout, 799 notify_style, 800 max_queue_size, 801 ) 802 }) 803 .collect::<Vec<_>>(); 804 805 thread::sleep(delay); 806 807 for producer in producers.into_iter() { 808 producer.join().expect("Producer thread panicked"); 809 } 810 811 { 812 let mut input_queue = input_queue.lock(); 813 input_queue.should_continue = false; 814 } 815 empty_condition.notify_all(); 816 817 for consumer in consumers.into_iter() { 818 consumer.join().expect("Consumer thread panicked"); 819 } 820 821 let mut output_vec = output_vec.lock(); 822 assert_eq!(output_vec.len(), num_producers * messages_per_producer); 823 output_vec.sort(); 824 for msg_idx in 0..messages_per_producer { 825 for producer_idx in 0..num_producers { 826 assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]); 827 } 828 } 829 } 830 consumer_thread( input_queue: Arc<Mutex<Queue>>, empty_condition: Arc<Condvar>, full_condition: Arc<Condvar>, timeout: Timeout, notify_style: NotifyStyle, output_queue: Arc<Mutex<Vec<usize>>>, max_queue_size: usize, ) -> thread::JoinHandle<()>831 fn consumer_thread( 832 input_queue: Arc<Mutex<Queue>>, 833 empty_condition: Arc<Condvar>, 834 full_condition: Arc<Condvar>, 835 timeout: Timeout, 836 notify_style: NotifyStyle, 837 output_queue: Arc<Mutex<Vec<usize>>>, 838 max_queue_size: usize, 839 ) -> thread::JoinHandle<()> { 840 thread::spawn(move || loop { 841 let (should_notify, result) = { 842 let mut queue = input_queue.lock(); 843 wait( 844 &*empty_condition, 845 &mut queue, 846 |state| -> bool { !state.items.is_empty() || !state.should_continue }, 847 &timeout, 848 ); 849 if queue.items.is_empty() && !queue.should_continue { 850 return; 851 } 852 let should_notify = queue.items.len() == max_queue_size; 853 let result = queue.items.pop_front(); 854 std::mem::drop(queue); 855 (should_notify, result) 856 }; 857 notify(notify_style, &*full_condition, should_notify); 858 859 if let Some(result) = result { 860 output_queue.lock().push(result); 861 } 862 }) 863 } 864 producer_thread( num_messages: usize, queue: Arc<Mutex<Queue>>, empty_condition: Arc<Condvar>, full_condition: Arc<Condvar>, timeout: Timeout, notify_style: NotifyStyle, max_queue_size: usize, ) -> thread::JoinHandle<()>865 fn producer_thread( 866 num_messages: usize, 867 queue: Arc<Mutex<Queue>>, 868 empty_condition: Arc<Condvar>, 869 full_condition: Arc<Condvar>, 870 timeout: Timeout, 871 notify_style: NotifyStyle, 872 max_queue_size: usize, 873 ) -> thread::JoinHandle<()> { 874 thread::spawn(move || { 875 for message in 0..num_messages { 876 let should_notify = { 877 let mut queue = queue.lock(); 878 wait( 879 &*full_condition, 880 &mut queue, 881 |state| state.items.len() < max_queue_size, 882 &timeout, 883 ); 884 let should_notify = queue.items.is_empty(); 885 queue.items.push_back(message); 886 std::mem::drop(queue); 887 should_notify 888 }; 889 notify(notify_style, &*empty_condition, should_notify); 890 } 891 }) 892 } 893 894 macro_rules! run_queue_tests { 895 ( $( $name:ident( 896 num_producers: $num_producers:expr, 897 num_consumers: $num_consumers:expr, 898 max_queue_size: $max_queue_size:expr, 899 messages_per_producer: $messages_per_producer:expr, 900 notification_style: $notification_style:expr, 901 timeout: $timeout:expr, 902 delay_seconds: $delay_seconds:expr); 903 )* ) => { 904 $(#[test] 905 fn $name() { 906 let delay = Duration::from_secs($delay_seconds); 907 run_queue_test( 908 $num_producers, 909 $num_consumers, 910 $max_queue_size, 911 $messages_per_producer, 912 $notification_style, 913 $timeout, 914 delay, 915 ); 916 })* 917 }; 918 } 919 920 run_queue_tests! { 921 sanity_check_queue( 922 num_producers: 1, 923 num_consumers: 1, 924 max_queue_size: 1, 925 messages_per_producer: 100_000, 926 notification_style: NotifyStyle::All, 927 timeout: Timeout::Bounded(Duration::from_secs(1)), 928 delay_seconds: 0 929 ); 930 sanity_check_queue_timeout( 931 num_producers: 1, 932 num_consumers: 1, 933 max_queue_size: 1, 934 messages_per_producer: 100_000, 935 notification_style: NotifyStyle::All, 936 timeout: Timeout::Forever, 937 delay_seconds: 0 938 ); 939 new_test_without_timeout_5( 940 num_producers: 1, 941 num_consumers: 5, 942 max_queue_size: 1, 943 messages_per_producer: 100_000, 944 notification_style: NotifyStyle::All, 945 timeout: Timeout::Forever, 946 delay_seconds: 0 947 ); 948 one_producer_one_consumer_one_slot( 949 num_producers: 1, 950 num_consumers: 1, 951 max_queue_size: 1, 952 messages_per_producer: 100_000, 953 notification_style: NotifyStyle::All, 954 timeout: Timeout::Forever, 955 delay_seconds: 0 956 ); 957 one_producer_one_consumer_one_slot_timeout( 958 num_producers: 1, 959 num_consumers: 1, 960 max_queue_size: 1, 961 messages_per_producer: 100_000, 962 notification_style: NotifyStyle::All, 963 timeout: Timeout::Forever, 964 delay_seconds: 1 965 ); 966 one_producer_one_consumer_hundred_slots( 967 num_producers: 1, 968 num_consumers: 1, 969 max_queue_size: 100, 970 messages_per_producer: 1_000_000, 971 notification_style: NotifyStyle::All, 972 timeout: Timeout::Forever, 973 delay_seconds: 0 974 ); 975 ten_producers_one_consumer_one_slot( 976 num_producers: 10, 977 num_consumers: 1, 978 max_queue_size: 1, 979 messages_per_producer: 10000, 980 notification_style: NotifyStyle::All, 981 timeout: Timeout::Forever, 982 delay_seconds: 0 983 ); 984 ten_producers_one_consumer_hundred_slots_notify_all( 985 num_producers: 10, 986 num_consumers: 1, 987 max_queue_size: 100, 988 messages_per_producer: 10000, 989 notification_style: NotifyStyle::All, 990 timeout: Timeout::Forever, 991 delay_seconds: 0 992 ); 993 ten_producers_one_consumer_hundred_slots_notify_one( 994 num_producers: 10, 995 num_consumers: 1, 996 max_queue_size: 100, 997 messages_per_producer: 10000, 998 notification_style: NotifyStyle::One, 999 timeout: Timeout::Forever, 1000 delay_seconds: 0 1001 ); 1002 one_producer_ten_consumers_one_slot( 1003 num_producers: 1, 1004 num_consumers: 10, 1005 max_queue_size: 1, 1006 messages_per_producer: 10000, 1007 notification_style: NotifyStyle::All, 1008 timeout: Timeout::Forever, 1009 delay_seconds: 0 1010 ); 1011 one_producer_ten_consumers_hundred_slots_notify_all( 1012 num_producers: 1, 1013 num_consumers: 10, 1014 max_queue_size: 100, 1015 messages_per_producer: 100_000, 1016 notification_style: NotifyStyle::All, 1017 timeout: Timeout::Forever, 1018 delay_seconds: 0 1019 ); 1020 one_producer_ten_consumers_hundred_slots_notify_one( 1021 num_producers: 1, 1022 num_consumers: 10, 1023 max_queue_size: 100, 1024 messages_per_producer: 100_000, 1025 notification_style: NotifyStyle::One, 1026 timeout: Timeout::Forever, 1027 delay_seconds: 0 1028 ); 1029 ten_producers_ten_consumers_one_slot( 1030 num_producers: 10, 1031 num_consumers: 10, 1032 max_queue_size: 1, 1033 messages_per_producer: 50000, 1034 notification_style: NotifyStyle::All, 1035 timeout: Timeout::Forever, 1036 delay_seconds: 0 1037 ); 1038 ten_producers_ten_consumers_hundred_slots_notify_all( 1039 num_producers: 10, 1040 num_consumers: 10, 1041 max_queue_size: 100, 1042 messages_per_producer: 50000, 1043 notification_style: NotifyStyle::All, 1044 timeout: Timeout::Forever, 1045 delay_seconds: 0 1046 ); 1047 ten_producers_ten_consumers_hundred_slots_notify_one( 1048 num_producers: 10, 1049 num_consumers: 10, 1050 max_queue_size: 100, 1051 messages_per_producer: 50000, 1052 notification_style: NotifyStyle::One, 1053 timeout: Timeout::Forever, 1054 delay_seconds: 0 1055 ); 1056 } 1057 } 1058