1 // Copyright 2016 Amanieu d'Antras
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7 
8 use crate::mutex::MutexGuard;
9 use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
10 use crate::{deadlock, util};
11 use core::{
12     fmt, ptr,
13     sync::atomic::{AtomicPtr, Ordering},
14 };
15 use instant::Instant;
16 use lock_api::RawMutex as RawMutex_;
17 use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
18 use std::time::Duration;
19 
/// Outcome of a timed wait on a condition variable: records whether the wait
/// ended because the timeout elapsed rather than because of a notification.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct WaitTimeoutResult(bool);

impl WaitTimeoutResult {
    /// Returns `true` if the wait is known to have ended due to a timeout.
    #[inline]
    pub fn timed_out(self) -> bool {
        let WaitTimeoutResult(elapsed) = self;
        elapsed
    }
}
32 
/// A Condition Variable
///
/// Condition variables represent the ability to block a thread such that it
/// consumes no CPU time while waiting for an event to occur. Condition
/// variables are typically associated with a boolean predicate (a condition)
/// and a mutex. The predicate is always verified inside of the mutex before
/// determining that a thread must block.
///
/// Note that this module places one additional restriction over the system
/// condition variables: each condvar can be used with only one mutex at a
/// time. Any attempt to use multiple mutexes on the same condition variable
/// simultaneously will result in a runtime panic. However it is possible to
/// switch to a different mutex if there are no threads currently waiting on
/// the condition variable.
///
/// # Differences from the standard library `Condvar`
///
/// - No spurious wakeups: A wait will only return a non-timeout result if it
///   was woken up by `notify_one` or `notify_all`.
/// - `Condvar::notify_all` will only wake up a single thread, the rest are
///   requeued to wait for the `Mutex` to be unlocked by the thread that was
///   woken up.
/// - Only requires 1 word of space, whereas the standard library boxes the
///   `Condvar` due to platform limitations.
/// - Can be statically constructed: `Condvar::new` is a `const fn`.
/// - Does not require any drop glue when dropped.
/// - Inline fast path for the uncontended case.
///
/// # Examples
///
/// ```
/// use parking_lot::{Mutex, Condvar};
/// use std::sync::Arc;
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// // Inside of our lock, spawn a new thread, and then wait for it to start
/// thread::spawn(move|| {
///     let &(ref lock, ref cvar) = &*pair2;
///     let mut started = lock.lock();
///     *started = true;
///     cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let &(ref lock, ref cvar) = &*pair;
/// let mut started = lock.lock();
/// if !*started {
///     cvar.wait(&mut started);
/// }
/// // Note that we used an if instead of a while loop above. This is only
/// // possible because parking_lot's Condvar will never spuriously wake up.
/// // This means that wait() will only return after notify_one or notify_all is
/// // called.
/// ```
pub struct Condvar {
    // Pointer to the raw mutex that the current waiters used when they began
    // waiting. It is null while no thread is waiting: waiters set it when
    // they park, and it is cleared once the last waiter is notified or times
    // out.
    state: AtomicPtr<RawMutex>,
}
93 
94 impl Condvar {
95     /// Creates a new condition variable which is ready to be waited on and
96     /// notified.
97     #[inline]
new() -> Condvar98     pub const fn new() -> Condvar {
99         Condvar {
100             state: AtomicPtr::new(ptr::null_mut()),
101         }
102     }
103 
104     /// Wakes up one blocked thread on this condvar.
105     ///
106     /// Returns whether a thread was woken up.
107     ///
108     /// If there is a blocked thread on this condition variable, then it will
109     /// be woken up from its call to `wait` or `wait_timeout`. Calls to
110     /// `notify_one` are not buffered in any way.
111     ///
112     /// To wake up all threads, see `notify_all()`.
113     ///
114     /// # Examples
115     ///
116     /// ```
117     /// use parking_lot::Condvar;
118     ///
119     /// let condvar = Condvar::new();
120     ///
121     /// // do something with condvar, share it with other threads
122     ///
123     /// if !condvar.notify_one() {
124     ///     println!("Nobody was listening for this.");
125     /// }
126     /// ```
127     #[inline]
notify_one(&self) -> bool128     pub fn notify_one(&self) -> bool {
129         // Nothing to do if there are no waiting threads
130         let state = self.state.load(Ordering::Relaxed);
131         if state.is_null() {
132             return false;
133         }
134 
135         self.notify_one_slow(state)
136     }
137 
    // Slow path of `notify_one`, taken when the state pointer was non-null.
    //
    // `mutex` is the state pointer observed by the caller. Returns `true` if
    // a thread was either unparked or requeued onto the mutex.
    #[cold]
    fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
        unsafe {
            // Unpark one thread, or requeue it onto the mutex if the mutex is
            // currently locked. Waiters are parked on the address of this
            // condvar (`from`); `to` is the address of the mutex.
            let from = self as *const _ as usize;
            let to = mutex as usize;
            let validate = || {
                // Make sure that our atomic state still points to the same
                // mutex. If not then it means that all threads on the current
                // mutex were woken up and a new waiting thread switched to a
                // different mutex. In that case we can get away with doing
                // nothing.
                if self.state.load(Ordering::Relaxed) != mutex {
                    return RequeueOp::Abort;
                }

                // Unpark one thread if the mutex is unlocked, otherwise just
                // requeue that one thread to the mutex. This is safe to do
                // here since unlocking the mutex when the parked bit is set
                // requires locking the queue. There is the possibility of a
                // race if the mutex gets locked after we check, but that
                // doesn't matter in this case.
                if (*mutex).mark_parked_if_locked() {
                    RequeueOp::RequeueOne
                } else {
                    RequeueOp::UnparkOne
                }
            };
            let callback = |_op, result: UnparkResult| {
                // Clear our state if there are no more waiting threads
                if !result.have_more_threads {
                    self.state.store(ptr::null_mut(), Ordering::Relaxed);
                }
                TOKEN_NORMAL
            };
            let res = parking_lot_core::unpark_requeue(from, to, validate, callback);

            // Both an unparked and a requeued thread count as "notified".
            res.unparked_threads + res.requeued_threads != 0
        }
    }
178 
179     /// Wakes up all blocked threads on this condvar.
180     ///
181     /// Returns the number of threads woken up.
182     ///
183     /// This method will ensure that any current waiters on the condition
184     /// variable are awoken. Calls to `notify_all()` are not buffered in any
185     /// way.
186     ///
187     /// To wake up only one thread, see `notify_one()`.
188     #[inline]
notify_all(&self) -> usize189     pub fn notify_all(&self) -> usize {
190         // Nothing to do if there are no waiting threads
191         let state = self.state.load(Ordering::Relaxed);
192         if state.is_null() {
193             return 0;
194         }
195 
196         self.notify_all_slow(state)
197     }
198 
    // Slow path of `notify_all`, taken when the state pointer was non-null.
    //
    // `mutex` is the state pointer observed by the caller. Returns the number
    // of threads that were unparked plus the number requeued onto the mutex.
    #[cold]
    fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
        unsafe {
            // Unpark one thread and requeue the rest onto the mutex. Waiters
            // are parked on the address of this condvar (`from`); `to` is the
            // address of the mutex.
            let from = self as *const _ as usize;
            let to = mutex as usize;
            let validate = || {
                // Make sure that our atomic state still points to the same
                // mutex. If not then it means that all threads on the current
                // mutex were woken up and a new waiting thread switched to a
                // different mutex. In that case we can get away with doing
                // nothing.
                if self.state.load(Ordering::Relaxed) != mutex {
                    return RequeueOp::Abort;
                }

                // Clear our state since we are going to unpark or requeue all
                // threads.
                self.state.store(ptr::null_mut(), Ordering::Relaxed);

                // Unpark one thread if the mutex is unlocked, otherwise just
                // requeue everything to the mutex. This is safe to do here
                // since unlocking the mutex when the parked bit is set requires
                // locking the queue. There is the possibility of a race if the
                // mutex gets locked after we check, but that doesn't matter in
                // this case.
                if (*mutex).mark_parked_if_locked() {
                    RequeueOp::RequeueAll
                } else {
                    RequeueOp::UnparkOneRequeueRest
                }
            };
            let callback = |op, result: UnparkResult| {
                // If we requeued threads to the mutex, mark it as having
                // parked threads. The RequeueAll case is already handled above.
                if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
                    (*mutex).mark_parked();
                }
                TOKEN_NORMAL
            };
            let res = parking_lot_core::unpark_requeue(from, to, validate, callback);

            // Report every thread that was taken off the condvar's queue.
            res.unparked_threads + res.requeued_threads
        }
    }
244 
245     /// Blocks the current thread until this condition variable receives a
246     /// notification.
247     ///
248     /// This function will atomically unlock the mutex specified (represented by
249     /// `mutex_guard`) and block the current thread. This means that any calls
250     /// to `notify_*()` which happen logically after the mutex is unlocked are
251     /// candidates to wake this thread up. When this function call returns, the
252     /// lock specified will have been re-acquired.
253     ///
254     /// # Panics
255     ///
256     /// This function will panic if another thread is waiting on the `Condvar`
257     /// with a different `Mutex` object.
258     #[inline]
wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>)259     pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
260         self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
261     }
262 
263     /// Waits on this condition variable for a notification, timing out after
264     /// the specified time instant.
265     ///
266     /// The semantics of this function are equivalent to `wait()` except that
267     /// the thread will be blocked roughly until `timeout` is reached. This
268     /// method should not be used for precise timing due to anomalies such as
269     /// preemption or platform differences that may not cause the maximum
270     /// amount of time waited to be precisely `timeout`.
271     ///
272     /// Note that the best effort is made to ensure that the time waited is
273     /// measured with a monotonic clock, and not affected by the changes made to
274     /// the system time.
275     ///
276     /// The returned `WaitTimeoutResult` value indicates if the timeout is
277     /// known to have elapsed.
278     ///
279     /// Like `wait`, the lock specified will be re-acquired when this function
280     /// returns, regardless of whether the timeout elapsed or not.
281     ///
282     /// # Panics
283     ///
284     /// This function will panic if another thread is waiting on the `Condvar`
285     /// with a different `Mutex` object.
286     #[inline]
wait_until<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Instant, ) -> WaitTimeoutResult287     pub fn wait_until<T: ?Sized>(
288         &self,
289         mutex_guard: &mut MutexGuard<'_, T>,
290         timeout: Instant,
291     ) -> WaitTimeoutResult {
292         self.wait_until_internal(
293             unsafe { MutexGuard::mutex(mutex_guard).raw() },
294             Some(timeout),
295         )
296     }
297 
    // This is a non-generic function to reduce the monomorphization cost of
    // using `wait_until`.
    //
    // Parks the current thread on this condvar, unlocking `mutex` just before
    // sleeping and holding it again on return. `timeout` is the optional
    // deadline; `None` waits until notified. The returned result reports a
    // timeout only if the thread was neither unparked nor requeued onto the
    // mutex.
    //
    // Panics if the condvar is currently associated with a different mutex.
    fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
        unsafe {
            let result;
            // Set by `validate` when the condvar is already bound to another
            // mutex; checked after `park` returns.
            let mut bad_mutex = false;
            // Set by `timed_out` when the timeout fired after this thread had
            // already been moved from the condvar's queue to the mutex's.
            let mut requeued = false;
            {
                // Threads wait on the address of the condvar itself.
                let addr = self as *const _ as usize;
                let lock_addr = mutex as *const _ as *mut _;
                let validate = || {
                    // Ensure we don't use two different mutexes with the same
                    // Condvar at the same time. This is done while locked to
                    // avoid races with notify_one
                    let state = self.state.load(Ordering::Relaxed);
                    if state.is_null() {
                        self.state.store(lock_addr, Ordering::Relaxed);
                    } else if state != lock_addr {
                        bad_mutex = true;
                        return false;
                    }
                    true
                };
                let before_sleep = || {
                    // Unlock the mutex before sleeping...
                    mutex.unlock();
                };
                let timed_out = |k, was_last_thread| {
                    // If we were requeued to a mutex, then we did not time out.
                    // We'll just park ourselves on the mutex again when we try
                    // to lock it later.
                    requeued = k != addr;

                    // If we were the last thread on the queue then we need to
                    // clear our state. This is normally done by the
                    // notify_{one,all} functions when not timing out.
                    if !requeued && was_last_thread {
                        self.state.store(ptr::null_mut(), Ordering::Relaxed);
                    }
                };
                result = parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    DEFAULT_PARK_TOKEN,
                    timeout,
                );
            }

            // Panic if we tried to use multiple mutexes with a Condvar. Note
            // that at this point the MutexGuard is still locked. It will be
            // unlocked by the unwinding logic.
            if bad_mutex {
                panic!("attempted to use a condition variable with more than one mutex");
            }

            // ... and re-lock it once we are done sleeping
            // NOTE(review): a `TOKEN_HANDOFF` wakeup presumably means the
            // unlocking thread passed ownership of the mutex directly to us,
            // which is why this branch only informs the deadlock detector
            // instead of calling `lock()` -- confirm against `RawMutex`.
            if result == ParkResult::Unparked(TOKEN_HANDOFF) {
                deadlock::acquire_resource(mutex as *const _ as usize);
            } else {
                mutex.lock();
            }

            // Only report a timeout if we were neither woken by a
            // notification nor requeued onto the mutex before the deadline.
            WaitTimeoutResult(!(result.is_unparked() || requeued))
        }
    }
365 
366     /// Waits on this condition variable for a notification, timing out after a
367     /// specified duration.
368     ///
369     /// The semantics of this function are equivalent to `wait()` except that
370     /// the thread will be blocked for roughly no longer than `timeout`. This
371     /// method should not be used for precise timing due to anomalies such as
372     /// preemption or platform differences that may not cause the maximum
373     /// amount of time waited to be precisely `timeout`.
374     ///
375     /// Note that the best effort is made to ensure that the time waited is
376     /// measured with a monotonic clock, and not affected by the changes made to
377     /// the system time.
378     ///
379     /// The returned `WaitTimeoutResult` value indicates if the timeout is
380     /// known to have elapsed.
381     ///
382     /// Like `wait`, the lock specified will be re-acquired when this function
383     /// returns, regardless of whether the timeout elapsed or not.
384     ///
385     /// # Panics
386     ///
387     /// Panics if the given `timeout` is so large that it can't be added to the current time.
388     /// This panic is not possible if the crate is built with the `nightly` feature, then a too
389     /// large `timeout` becomes equivalent to just calling `wait`.
390     #[inline]
wait_for<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Duration, ) -> WaitTimeoutResult391     pub fn wait_for<T: ?Sized>(
392         &self,
393         mutex_guard: &mut MutexGuard<'_, T>,
394         timeout: Duration,
395     ) -> WaitTimeoutResult {
396         let deadline = util::to_deadline(timeout);
397         self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
398     }
399 }
400 
401 impl Default for Condvar {
402     #[inline]
default() -> Condvar403     fn default() -> Condvar {
404         Condvar::new()
405     }
406 }
407 
impl fmt::Debug for Condvar {
    /// Formats the condvar as the fixed, opaque string `Condvar { .. }`; no
    /// internal state is printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `pad` (rather than `write_str`) applies any width, fill and
        // alignment flags requested by the caller.
        f.pad("Condvar { .. }")
    }
}
413 
414 #[cfg(test)]
415 mod tests {
416     use crate::{Condvar, Mutex, MutexGuard};
417     use instant::Instant;
418     use std::sync::mpsc::channel;
419     use std::sync::Arc;
420     use std::thread;
421     use std::time::Duration;
422 
423     #[test]
smoke()424     fn smoke() {
425         let c = Condvar::new();
426         c.notify_one();
427         c.notify_all();
428     }
429 
430     #[test]
notify_one()431     fn notify_one() {
432         let m = Arc::new(Mutex::new(()));
433         let m2 = m.clone();
434         let c = Arc::new(Condvar::new());
435         let c2 = c.clone();
436 
437         let mut g = m.lock();
438         let _t = thread::spawn(move || {
439             let _g = m2.lock();
440             c2.notify_one();
441         });
442         c.wait(&mut g);
443     }
444 
445     #[test]
notify_all()446     fn notify_all() {
447         const N: usize = 10;
448 
449         let data = Arc::new((Mutex::new(0), Condvar::new()));
450         let (tx, rx) = channel();
451         for _ in 0..N {
452             let data = data.clone();
453             let tx = tx.clone();
454             thread::spawn(move || {
455                 let &(ref lock, ref cond) = &*data;
456                 let mut cnt = lock.lock();
457                 *cnt += 1;
458                 if *cnt == N {
459                     tx.send(()).unwrap();
460                 }
461                 while *cnt != 0 {
462                     cond.wait(&mut cnt);
463                 }
464                 tx.send(()).unwrap();
465             });
466         }
467         drop(tx);
468 
469         let &(ref lock, ref cond) = &*data;
470         rx.recv().unwrap();
471         let mut cnt = lock.lock();
472         *cnt = 0;
473         cond.notify_all();
474         drop(cnt);
475 
476         for _ in 0..N {
477             rx.recv().unwrap();
478         }
479     }
480 
481     #[test]
notify_one_return_true()482     fn notify_one_return_true() {
483         let m = Arc::new(Mutex::new(()));
484         let m2 = m.clone();
485         let c = Arc::new(Condvar::new());
486         let c2 = c.clone();
487 
488         let mut g = m.lock();
489         let _t = thread::spawn(move || {
490             let _g = m2.lock();
491             assert!(c2.notify_one());
492         });
493         c.wait(&mut g);
494     }
495 
496     #[test]
notify_one_return_false()497     fn notify_one_return_false() {
498         let m = Arc::new(Mutex::new(()));
499         let c = Arc::new(Condvar::new());
500 
501         let _t = thread::spawn(move || {
502             let _g = m.lock();
503             assert!(!c.notify_one());
504         });
505     }
506 
507     #[test]
notify_all_return()508     fn notify_all_return() {
509         const N: usize = 10;
510 
511         let data = Arc::new((Mutex::new(0), Condvar::new()));
512         let (tx, rx) = channel();
513         for _ in 0..N {
514             let data = data.clone();
515             let tx = tx.clone();
516             thread::spawn(move || {
517                 let &(ref lock, ref cond) = &*data;
518                 let mut cnt = lock.lock();
519                 *cnt += 1;
520                 if *cnt == N {
521                     tx.send(()).unwrap();
522                 }
523                 while *cnt != 0 {
524                     cond.wait(&mut cnt);
525                 }
526                 tx.send(()).unwrap();
527             });
528         }
529         drop(tx);
530 
531         let &(ref lock, ref cond) = &*data;
532         rx.recv().unwrap();
533         let mut cnt = lock.lock();
534         *cnt = 0;
535         assert_eq!(cond.notify_all(), N);
536         drop(cnt);
537 
538         for _ in 0..N {
539             rx.recv().unwrap();
540         }
541 
542         assert_eq!(cond.notify_all(), 0);
543     }
544 
545     #[test]
wait_for()546     fn wait_for() {
547         let m = Arc::new(Mutex::new(()));
548         let m2 = m.clone();
549         let c = Arc::new(Condvar::new());
550         let c2 = c.clone();
551 
552         let mut g = m.lock();
553         let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
554         assert!(no_timeout.timed_out());
555 
556         let _t = thread::spawn(move || {
557             let _g = m2.lock();
558             c2.notify_one();
559         });
560         // Non-nightly panics on too large timeouts. Nightly treats it as indefinite wait.
561         let very_long_timeout = if cfg!(feature = "nightly") {
562             Duration::from_secs(u64::max_value())
563         } else {
564             Duration::from_millis(u32::max_value() as u64)
565         };
566 
567         let timeout_res = c.wait_for(&mut g, very_long_timeout);
568         assert!(!timeout_res.timed_out());
569 
570         drop(g);
571     }
572 
573     #[test]
wait_until()574     fn wait_until() {
575         let m = Arc::new(Mutex::new(()));
576         let m2 = m.clone();
577         let c = Arc::new(Condvar::new());
578         let c2 = c.clone();
579 
580         let mut g = m.lock();
581         let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
582         assert!(no_timeout.timed_out());
583         let _t = thread::spawn(move || {
584             let _g = m2.lock();
585             c2.notify_one();
586         });
587         let timeout_res = c.wait_until(
588             &mut g,
589             Instant::now() + Duration::from_millis(u32::max_value() as u64),
590         );
591         assert!(!timeout_res.timed_out());
592         drop(g);
593     }
594 
    // Waiting on a condvar with a second mutex while another thread is still
    // waiting on it with the first mutex must panic.
    #[test]
    #[should_panic]
    fn two_mutexes() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let m3 = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        // Make sure we don't leave the child thread dangling
        struct PanicGuard<'a>(&'a Condvar);
        impl<'a> Drop for PanicGuard<'a> {
            fn drop(&mut self) {
                // Runs while the expected panic unwinds and releases the
                // child that is still waiting on the condvar.
                self.0.notify_one();
            }
        }

        let (tx, rx) = channel();
        // Hold `m` so the child cannot lock it until we drop `g` below.
        let g = m.lock();
        let _t = thread::spawn(move || {
            let mut g = m2.lock();
            tx.send(()).unwrap();
            c2.wait(&mut g);
        });
        drop(g);
        // The child sends while holding `m`, so once we can lock `m` again
        // the child must have released it inside `wait`.
        rx.recv().unwrap();
        let _g = m.lock();
        let _guard = PanicGuard(&*c);
        // The condvar is bound to `m` by the waiting child; using `m3` here
        // triggers the panic.
        c.wait(&mut m3.lock());
    }
625 
626     #[test]
two_mutexes_disjoint()627     fn two_mutexes_disjoint() {
628         let m = Arc::new(Mutex::new(()));
629         let m2 = m.clone();
630         let m3 = Arc::new(Mutex::new(()));
631         let c = Arc::new(Condvar::new());
632         let c2 = c.clone();
633 
634         let mut g = m.lock();
635         let _t = thread::spawn(move || {
636             let _g = m2.lock();
637             c2.notify_one();
638         });
639         c.wait(&mut g);
640         drop(g);
641 
642         let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
643     }
644 
645     #[test]
test_debug_condvar()646     fn test_debug_condvar() {
647         let c = Condvar::new();
648         assert_eq!(format!("{:?}", c), "Condvar { .. }");
649     }
650 
    // Notifies a waiter while the notifying thread still holds the mutex, then
    // releases the mutex and checks that the waiter finishes.
    #[test]
    fn test_condvar_requeue() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();
        let t = thread::spawn(move || {
            let mut g = m2.lock();
            c2.wait(&mut g);
        });

        let mut g = m.lock();
        // `notify_one` returns false until the spawned thread is actually
        // waiting on the condvar, so keep retrying until it succeeds.
        while !c.notify_one() {
            // Wait for the thread to get into wait()
            MutexGuard::bump(&mut g);
            // Yield, so the other thread gets a chance to do something.
            // (At least Miri needs this, because it doesn't preempt threads.)
            thread::yield_now();
        }
        // The thread should have been requeued to the mutex, which we wake up now.
        drop(g);
        t.join().unwrap();
    }
674 
    // Four threads wait on the same condvar. The main thread sends a single
    // notification, and each woken thread passes it on to the next one after
    // a short timed wait. All four threads must finish.
    // NOTE(review): presumably a regression test for issue #129 of the
    // project, judging by the name -- confirm against the issue tracker.
    #[test]
    fn test_issue_129() {
        let locks = Arc::new((Mutex::new(()), Condvar::new()));

        let (tx, rx) = channel();
        for _ in 0..4 {
            let locks = locks.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let mut guard = locks.0.lock();
                locks.1.wait(&mut guard);
                // Nobody notifies during this timed wait; its result is
                // deliberately ignored.
                locks.1.wait_for(&mut guard, Duration::from_millis(1));
                // Hand the notification on to the next waiting thread.
                locks.1.notify_one();
                tx.send(()).unwrap();
            });
        }

        // Give the spawned threads time to start waiting. This is timing
        // based: a thread that is not yet waiting misses the notification.
        thread::sleep(Duration::from_millis(100));
        locks.1.notify_one();

        // Each thread must report completion within the time limit.
        for _ in 0..4 {
            assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
        }
    }
699 }
700 
/// This module contains an integration test that is heavily inspired by WebKit's own integration
/// tests for its own Condvar.
703 #[cfg(test)]
704 mod webkit_queue_test {
705     use crate::{Condvar, Mutex, MutexGuard};
706     use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
707 
    /// How a test thread blocks on a condvar in the `wait` helper.
    #[derive(Clone, Copy)]
    enum Timeout {
        /// Block with `Condvar::wait_for`, using this duration for each wait.
        Bounded(Duration),
        /// Block with `Condvar::wait`, i.e. without any timeout.
        Forever,
    }
713 
    /// Which notification call the `notify` helper issues.
    #[derive(Clone, Copy)]
    enum NotifyStyle {
        /// Call `Condvar::notify_one` on every invocation.
        One,
        /// Call `Condvar::notify_all`, but only when the caller asks for a
        /// notification.
        All,
    }
719 
720     struct Queue {
721         items: VecDeque<usize>,
722         should_continue: bool,
723     }
724 
725     impl Queue {
new() -> Self726         fn new() -> Self {
727             Self {
728                 items: VecDeque::new(),
729                 should_continue: true,
730             }
731         }
732     }
733 
wait<T: ?Sized>( condition: &Condvar, lock: &mut MutexGuard<'_, T>, predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool, timeout: &Timeout, )734     fn wait<T: ?Sized>(
735         condition: &Condvar,
736         lock: &mut MutexGuard<'_, T>,
737         predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
738         timeout: &Timeout,
739     ) {
740         while !predicate(lock) {
741             match timeout {
742                 Timeout::Forever => condition.wait(lock),
743                 Timeout::Bounded(bound) => {
744                     condition.wait_for(lock, *bound);
745                 }
746             }
747         }
748     }
749 
notify(style: NotifyStyle, condition: &Condvar, should_notify: bool)750     fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
751         match style {
752             NotifyStyle::One => {
753                 condition.notify_one();
754             }
755             NotifyStyle::All => {
756                 if should_notify {
757                     condition.notify_all();
758                 }
759             }
760         }
761     }
762 
run_queue_test( num_producers: usize, num_consumers: usize, max_queue_size: usize, messages_per_producer: usize, notify_style: NotifyStyle, timeout: Timeout, delay: Duration, )763     fn run_queue_test(
764         num_producers: usize,
765         num_consumers: usize,
766         max_queue_size: usize,
767         messages_per_producer: usize,
768         notify_style: NotifyStyle,
769         timeout: Timeout,
770         delay: Duration,
771     ) {
772         let input_queue = Arc::new(Mutex::new(Queue::new()));
773         let empty_condition = Arc::new(Condvar::new());
774         let full_condition = Arc::new(Condvar::new());
775 
776         let output_vec = Arc::new(Mutex::new(vec![]));
777 
778         let consumers = (0..num_consumers)
779             .map(|_| {
780                 consumer_thread(
781                     input_queue.clone(),
782                     empty_condition.clone(),
783                     full_condition.clone(),
784                     timeout,
785                     notify_style,
786                     output_vec.clone(),
787                     max_queue_size,
788                 )
789             })
790             .collect::<Vec<_>>();
791         let producers = (0..num_producers)
792             .map(|_| {
793                 producer_thread(
794                     messages_per_producer,
795                     input_queue.clone(),
796                     empty_condition.clone(),
797                     full_condition.clone(),
798                     timeout,
799                     notify_style,
800                     max_queue_size,
801                 )
802             })
803             .collect::<Vec<_>>();
804 
805         thread::sleep(delay);
806 
807         for producer in producers.into_iter() {
808             producer.join().expect("Producer thread panicked");
809         }
810 
811         {
812             let mut input_queue = input_queue.lock();
813             input_queue.should_continue = false;
814         }
815         empty_condition.notify_all();
816 
817         for consumer in consumers.into_iter() {
818             consumer.join().expect("Consumer thread panicked");
819         }
820 
821         let mut output_vec = output_vec.lock();
822         assert_eq!(output_vec.len(), num_producers * messages_per_producer);
823         output_vec.sort();
824         for msg_idx in 0..messages_per_producer {
825             for producer_idx in 0..num_producers {
826                 assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
827             }
828         }
829     }
830 
consumer_thread( input_queue: Arc<Mutex<Queue>>, empty_condition: Arc<Condvar>, full_condition: Arc<Condvar>, timeout: Timeout, notify_style: NotifyStyle, output_queue: Arc<Mutex<Vec<usize>>>, max_queue_size: usize, ) -> thread::JoinHandle<()>831     fn consumer_thread(
832         input_queue: Arc<Mutex<Queue>>,
833         empty_condition: Arc<Condvar>,
834         full_condition: Arc<Condvar>,
835         timeout: Timeout,
836         notify_style: NotifyStyle,
837         output_queue: Arc<Mutex<Vec<usize>>>,
838         max_queue_size: usize,
839     ) -> thread::JoinHandle<()> {
840         thread::spawn(move || loop {
841             let (should_notify, result) = {
842                 let mut queue = input_queue.lock();
843                 wait(
844                     &*empty_condition,
845                     &mut queue,
846                     |state| -> bool { !state.items.is_empty() || !state.should_continue },
847                     &timeout,
848                 );
849                 if queue.items.is_empty() && !queue.should_continue {
850                     return;
851                 }
852                 let should_notify = queue.items.len() == max_queue_size;
853                 let result = queue.items.pop_front();
854                 std::mem::drop(queue);
855                 (should_notify, result)
856             };
857             notify(notify_style, &*full_condition, should_notify);
858 
859             if let Some(result) = result {
860                 output_queue.lock().push(result);
861             }
862         })
863     }
864 
producer_thread( num_messages: usize, queue: Arc<Mutex<Queue>>, empty_condition: Arc<Condvar>, full_condition: Arc<Condvar>, timeout: Timeout, notify_style: NotifyStyle, max_queue_size: usize, ) -> thread::JoinHandle<()>865     fn producer_thread(
866         num_messages: usize,
867         queue: Arc<Mutex<Queue>>,
868         empty_condition: Arc<Condvar>,
869         full_condition: Arc<Condvar>,
870         timeout: Timeout,
871         notify_style: NotifyStyle,
872         max_queue_size: usize,
873     ) -> thread::JoinHandle<()> {
874         thread::spawn(move || {
875             for message in 0..num_messages {
876                 let should_notify = {
877                     let mut queue = queue.lock();
878                     wait(
879                         &*full_condition,
880                         &mut queue,
881                         |state| state.items.len() < max_queue_size,
882                         &timeout,
883                     );
884                     let should_notify = queue.items.is_empty();
885                     queue.items.push_back(message);
886                     std::mem::drop(queue);
887                     should_notify
888                 };
889                 notify(notify_style, &*empty_condition, should_notify);
890             }
891         })
892     }
893 
894     macro_rules! run_queue_tests {
895         ( $( $name:ident(
896             num_producers: $num_producers:expr,
897             num_consumers: $num_consumers:expr,
898             max_queue_size: $max_queue_size:expr,
899             messages_per_producer: $messages_per_producer:expr,
900             notification_style: $notification_style:expr,
901             timeout: $timeout:expr,
902             delay_seconds: $delay_seconds:expr);
903         )* ) => {
904             $(#[test]
905             fn $name() {
906                 let delay = Duration::from_secs($delay_seconds);
907                 run_queue_test(
908                     $num_producers,
909                     $num_consumers,
910                     $max_queue_size,
911                     $messages_per_producer,
912                     $notification_style,
913                     $timeout,
914                     delay,
915                     );
916             })*
917         };
918     }
919 
    // Test matrix for the producer/consumer queue built on `Condvar`.
    //
    // Each entry becomes its own `#[test]` function (see the
    // `run_queue_tests!` definition above) and calls `run_queue_test` with:
    //   - num_producers / num_consumers: how many threads of each kind to spawn
    //   - max_queue_size: capacity producers wait on before pushing
    //   - messages_per_producer: each producer pushes `0..messages_per_producer`
    //   - notification_style: forwarded to `notify` on every push/pop
    //   - timeout: forwarded to `wait` on every blocking wait
    //   - delay_seconds: how long the main thread sleeps after spawning the
    //     workers and before joining the producers
    //
    // Only plain `//` comments may be used inside this invocation; a `///`
    // doc comment would turn into a token and break the macro's pattern.
    run_queue_tests! {
        // Smallest configuration, exercising the bounded (1 s) wait path.
        // NOTE(review): this entry uses `Timeout::Bounded` while the
        // `_timeout`-suffixed entry below uses `Timeout::Forever`; the two
        // names look swapped relative to their settings -- confirm intent.
        sanity_check_queue(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Bounded(Duration::from_secs(1)),
            delay_seconds: 0
        );
        // Same shape as above but with unbounded waits (see the note above
        // about the name).
        sanity_check_queue_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        // One producer feeding five consumers through a single slot.
        new_test_without_timeout_5(
            num_producers: 1,
            num_consumers: 5,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_one_consumer_one_slot(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        // Differs from the previous entry only by the 1 s main-thread delay.
        // NOTE(review): despite the `_timeout` suffix this uses
        // `Timeout::Forever` -- confirm whether a bounded timeout was intended.
        one_producer_one_consumer_one_slot_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 1
        );
        // Largest single-producer run: one million messages.
        one_producer_one_consumer_hundred_slots(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 1_000_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        // --- Ten producers, one consumer ---
        ten_producers_one_consumer_one_slot(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        // --- One producer, ten consumers ---
        one_producer_ten_consumers_one_slot(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_all(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_one(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        // --- Ten producers, ten consumers (500,000 messages per test) ---
        ten_producers_ten_consumers_one_slot(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
    }
1057 }
1058