1 // Copyright 2016 Amanieu d'Antras
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7 
8 use crate::mutex::MutexGuard;
9 use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
10 use crate::{deadlock, util};
11 use core::{
12     fmt, ptr,
13     sync::atomic::{AtomicPtr, Ordering},
14 };
15 use lock_api::RawMutex as RawMutex_;
16 use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
17 use std::time::{Duration, Instant};
18 
19 /// A type indicating whether a timed wait on a condition variable returned
20 /// due to a time out or not.
21 #[derive(Debug, PartialEq, Eq, Copy, Clone)]
22 pub struct WaitTimeoutResult(bool);
23 
24 impl WaitTimeoutResult {
25     /// Returns whether the wait was known to have timed out.
26     #[inline]
timed_out(self) -> bool27     pub fn timed_out(self) -> bool {
28         self.0
29     }
30 }
31 
32 /// A Condition Variable
33 ///
34 /// Condition variables represent the ability to block a thread such that it
35 /// consumes no CPU time while waiting for an event to occur. Condition
36 /// variables are typically associated with a boolean predicate (a condition)
37 /// and a mutex. The predicate is always verified inside of the mutex before
38 /// determining that thread must block.
39 ///
40 /// Note that this module places one additional restriction over the system
41 /// condition variables: each condvar can be used with only one mutex at a
42 /// time. Any attempt to use multiple mutexes on the same condition variable
43 /// simultaneously will result in a runtime panic. However it is possible to
44 /// switch to a different mutex if there are no threads currently waiting on
45 /// the condition variable.
46 ///
47 /// # Differences from the standard library `Condvar`
48 ///
49 /// - No spurious wakeups: A wait will only return a non-timeout result if it
50 ///   was woken up by `notify_one` or `notify_all`.
51 /// - `Condvar::notify_all` will only wake up a single thread, the rest are
52 ///   requeued to wait for the `Mutex` to be unlocked by the thread that was
53 ///   woken up.
54 /// - Only requires 1 word of space, whereas the standard library boxes the
55 ///   `Condvar` due to platform limitations.
56 /// - Can be statically constructed (requires the `const_fn` nightly feature).
57 /// - Does not require any drop glue when dropped.
58 /// - Inline fast path for the uncontended case.
59 ///
60 /// # Examples
61 ///
62 /// ```
63 /// use parking_lot::{Mutex, Condvar};
64 /// use std::sync::Arc;
65 /// use std::thread;
66 ///
67 /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
68 /// let pair2 = pair.clone();
69 ///
70 /// // Inside of our lock, spawn a new thread, and then wait for it to start
71 /// thread::spawn(move|| {
72 ///     let &(ref lock, ref cvar) = &*pair2;
73 ///     let mut started = lock.lock();
74 ///     *started = true;
75 ///     cvar.notify_one();
76 /// });
77 ///
78 /// // wait for the thread to start up
79 /// let &(ref lock, ref cvar) = &*pair;
80 /// let mut started = lock.lock();
81 /// if !*started {
82 ///     cvar.wait(&mut started);
83 /// }
84 /// // Note that we used an if instead of a while loop above. This is only
85 /// // possible because parking_lot's Condvar will never spuriously wake up.
86 /// // This means that wait() will only return after notify_one or notify_all is
87 /// // called.
88 /// ```
89 pub struct Condvar {
90     state: AtomicPtr<RawMutex>,
91 }
92 
93 impl Condvar {
94     /// Creates a new condition variable which is ready to be waited on and
95     /// notified.
96     #[inline]
new() -> Condvar97     pub const fn new() -> Condvar {
98         Condvar {
99             state: AtomicPtr::new(ptr::null_mut()),
100         }
101     }
102 
103     /// Wakes up one blocked thread on this condvar.
104     ///
105     /// Returns whether a thread was woken up.
106     ///
107     /// If there is a blocked thread on this condition variable, then it will
108     /// be woken up from its call to `wait` or `wait_timeout`. Calls to
109     /// `notify_one` are not buffered in any way.
110     ///
111     /// To wake up all threads, see `notify_all()`.
112     ///
113     /// # Examples
114     ///
115     /// ```
116     /// use parking_lot::Condvar;
117     ///
118     /// let condvar = Condvar::new();
119     ///
120     /// // do something with condvar, share it with other threads
121     ///
122     /// if !condvar.notify_one() {
123     ///     println!("Nobody was listening for this.");
124     /// }
125     /// ```
126     #[inline]
notify_one(&self) -> bool127     pub fn notify_one(&self) -> bool {
128         // Nothing to do if there are no waiting threads
129         let state = self.state.load(Ordering::Relaxed);
130         if state.is_null() {
131             return false;
132         }
133 
134         self.notify_one_slow(state)
135     }
136 
137     #[cold]
notify_one_slow(&self, mutex: *mut RawMutex) -> bool138     fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
139         unsafe {
140             // Unpark one thread and requeue the rest onto the mutex
141             let from = self as *const _ as usize;
142             let to = mutex as usize;
143             let validate = || {
144                 // Make sure that our atomic state still points to the same
145                 // mutex. If not then it means that all threads on the current
146                 // mutex were woken up and a new waiting thread switched to a
147                 // different mutex. In that case we can get away with doing
148                 // nothing.
149                 if self.state.load(Ordering::Relaxed) != mutex {
150                     return RequeueOp::Abort;
151                 }
152 
153                 // Unpark one thread if the mutex is unlocked, otherwise just
154                 // requeue everything to the mutex. This is safe to do here
155                 // since unlocking the mutex when the parked bit is set requires
156                 // locking the queue. There is the possibility of a race if the
157                 // mutex gets locked after we check, but that doesn't matter in
158                 // this case.
159                 if (*mutex).mark_parked_if_locked() {
160                     RequeueOp::RequeueOne
161                 } else {
162                     RequeueOp::UnparkOne
163                 }
164             };
165             let callback = |_op, result: UnparkResult| {
166                 // Clear our state if there are no more waiting threads
167                 if !result.have_more_threads {
168                     self.state.store(ptr::null_mut(), Ordering::Relaxed);
169                 }
170                 TOKEN_NORMAL
171             };
172             let res = parking_lot_core::unpark_requeue(from, to, validate, callback);
173 
174             res.unparked_threads + res.requeued_threads != 0
175         }
176     }
177 
178     /// Wakes up all blocked threads on this condvar.
179     ///
180     /// Returns the number of threads woken up.
181     ///
182     /// This method will ensure that any current waiters on the condition
183     /// variable are awoken. Calls to `notify_all()` are not buffered in any
184     /// way.
185     ///
186     /// To wake up only one thread, see `notify_one()`.
187     #[inline]
notify_all(&self) -> usize188     pub fn notify_all(&self) -> usize {
189         // Nothing to do if there are no waiting threads
190         let state = self.state.load(Ordering::Relaxed);
191         if state.is_null() {
192             return 0;
193         }
194 
195         self.notify_all_slow(state)
196     }
197 
198     #[cold]
notify_all_slow(&self, mutex: *mut RawMutex) -> usize199     fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
200         unsafe {
201             // Unpark one thread and requeue the rest onto the mutex
202             let from = self as *const _ as usize;
203             let to = mutex as usize;
204             let validate = || {
205                 // Make sure that our atomic state still points to the same
206                 // mutex. If not then it means that all threads on the current
207                 // mutex were woken up and a new waiting thread switched to a
208                 // different mutex. In that case we can get away with doing
209                 // nothing.
210                 if self.state.load(Ordering::Relaxed) != mutex {
211                     return RequeueOp::Abort;
212                 }
213 
214                 // Clear our state since we are going to unpark or requeue all
215                 // threads.
216                 self.state.store(ptr::null_mut(), Ordering::Relaxed);
217 
218                 // Unpark one thread if the mutex is unlocked, otherwise just
219                 // requeue everything to the mutex. This is safe to do here
220                 // since unlocking the mutex when the parked bit is set requires
221                 // locking the queue. There is the possibility of a race if the
222                 // mutex gets locked after we check, but that doesn't matter in
223                 // this case.
224                 if (*mutex).mark_parked_if_locked() {
225                     RequeueOp::RequeueAll
226                 } else {
227                     RequeueOp::UnparkOneRequeueRest
228                 }
229             };
230             let callback = |op, result: UnparkResult| {
231                 // If we requeued threads to the mutex, mark it as having
232                 // parked threads. The RequeueAll case is already handled above.
233                 if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
234                     (*mutex).mark_parked();
235                 }
236                 TOKEN_NORMAL
237             };
238             let res = parking_lot_core::unpark_requeue(from, to, validate, callback);
239 
240             res.unparked_threads + res.requeued_threads
241         }
242     }
243 
244     /// Blocks the current thread until this condition variable receives a
245     /// notification.
246     ///
247     /// This function will atomically unlock the mutex specified (represented by
248     /// `mutex_guard`) and block the current thread. This means that any calls
249     /// to `notify_*()` which happen logically after the mutex is unlocked are
250     /// candidates to wake this thread up. When this function call returns, the
251     /// lock specified will have been re-acquired.
252     ///
253     /// # Panics
254     ///
255     /// This function will panic if another thread is waiting on the `Condvar`
256     /// with a different `Mutex` object.
257     #[inline]
wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>)258     pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
259         self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
260     }
261 
262     /// Waits on this condition variable for a notification, timing out after
263     /// the specified time instant.
264     ///
265     /// The semantics of this function are equivalent to `wait()` except that
266     /// the thread will be blocked roughly until `timeout` is reached. This
267     /// method should not be used for precise timing due to anomalies such as
268     /// preemption or platform differences that may not cause the maximum
269     /// amount of time waited to be precisely `timeout`.
270     ///
271     /// Note that the best effort is made to ensure that the time waited is
272     /// measured with a monotonic clock, and not affected by the changes made to
273     /// the system time.
274     ///
275     /// The returned `WaitTimeoutResult` value indicates if the timeout is
276     /// known to have elapsed.
277     ///
278     /// Like `wait`, the lock specified will be re-acquired when this function
279     /// returns, regardless of whether the timeout elapsed or not.
280     ///
281     /// # Panics
282     ///
283     /// This function will panic if another thread is waiting on the `Condvar`
284     /// with a different `Mutex` object.
285     #[inline]
wait_until<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Instant, ) -> WaitTimeoutResult286     pub fn wait_until<T: ?Sized>(
287         &self,
288         mutex_guard: &mut MutexGuard<'_, T>,
289         timeout: Instant,
290     ) -> WaitTimeoutResult {
291         self.wait_until_internal(
292             unsafe { MutexGuard::mutex(mutex_guard).raw() },
293             Some(timeout),
294         )
295     }
296 
297     // This is a non-generic function to reduce the monomorphization cost of
298     // using `wait_until`.
wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult299     fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
300         unsafe {
301             let result;
302             let mut bad_mutex = false;
303             let mut requeued = false;
304             {
305                 let addr = self as *const _ as usize;
306                 let lock_addr = mutex as *const _ as *mut _;
307                 let validate = || {
308                     // Ensure we don't use two different mutexes with the same
309                     // Condvar at the same time. This is done while locked to
310                     // avoid races with notify_one
311                     let state = self.state.load(Ordering::Relaxed);
312                     if state.is_null() {
313                         self.state.store(lock_addr, Ordering::Relaxed);
314                     } else if state != lock_addr {
315                         bad_mutex = true;
316                         return false;
317                     }
318                     true
319                 };
320                 let before_sleep = || {
321                     // Unlock the mutex before sleeping...
322                     mutex.unlock();
323                 };
324                 let timed_out = |k, was_last_thread| {
325                     // If we were requeued to a mutex, then we did not time out.
326                     // We'll just park ourselves on the mutex again when we try
327                     // to lock it later.
328                     requeued = k != addr;
329 
330                     // If we were the last thread on the queue then we need to
331                     // clear our state. This is normally done by the
332                     // notify_{one,all} functions when not timing out.
333                     if !requeued && was_last_thread {
334                         self.state.store(ptr::null_mut(), Ordering::Relaxed);
335                     }
336                 };
337                 result = parking_lot_core::park(
338                     addr,
339                     validate,
340                     before_sleep,
341                     timed_out,
342                     DEFAULT_PARK_TOKEN,
343                     timeout,
344                 );
345             }
346 
347             // Panic if we tried to use multiple mutexes with a Condvar. Note
348             // that at this point the MutexGuard is still locked. It will be
349             // unlocked by the unwinding logic.
350             if bad_mutex {
351                 panic!("attempted to use a condition variable with more than one mutex");
352             }
353 
354             // ... and re-lock it once we are done sleeping
355             if result == ParkResult::Unparked(TOKEN_HANDOFF) {
356                 deadlock::acquire_resource(mutex as *const _ as usize);
357             } else {
358                 mutex.lock();
359             }
360 
361             WaitTimeoutResult(!(result.is_unparked() || requeued))
362         }
363     }
364 
365     /// Waits on this condition variable for a notification, timing out after a
366     /// specified duration.
367     ///
368     /// The semantics of this function are equivalent to `wait()` except that
369     /// the thread will be blocked for roughly no longer than `timeout`. This
370     /// method should not be used for precise timing due to anomalies such as
371     /// preemption or platform differences that may not cause the maximum
372     /// amount of time waited to be precisely `timeout`.
373     ///
374     /// Note that the best effort is made to ensure that the time waited is
375     /// measured with a monotonic clock, and not affected by the changes made to
376     /// the system time.
377     ///
378     /// The returned `WaitTimeoutResult` value indicates if the timeout is
379     /// known to have elapsed.
380     ///
381     /// Like `wait`, the lock specified will be re-acquired when this function
382     /// returns, regardless of whether the timeout elapsed or not.
383     ///
384     /// # Panics
385     ///
386     /// Panics if the given `timeout` is so large that it can't be added to the current time.
387     /// This panic is not possible if the crate is built with the `nightly` feature, then a too
388     /// large `timeout` becomes equivalent to just calling `wait`.
389     #[inline]
wait_for<T: ?Sized>( &self, mutex_guard: &mut MutexGuard<'_, T>, timeout: Duration, ) -> WaitTimeoutResult390     pub fn wait_for<T: ?Sized>(
391         &self,
392         mutex_guard: &mut MutexGuard<'_, T>,
393         timeout: Duration,
394     ) -> WaitTimeoutResult {
395         let deadline = util::to_deadline(timeout);
396         self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
397     }
398 }
399 
400 impl Default for Condvar {
401     #[inline]
default() -> Condvar402     fn default() -> Condvar {
403         Condvar::new()
404     }
405 }
406 
407 impl fmt::Debug for Condvar {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result408     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
409         f.pad("Condvar { .. }")
410     }
411 }
412 
413 #[cfg(test)]
414 mod tests {
415     use crate::{Condvar, Mutex, MutexGuard};
416     use std::sync::mpsc::channel;
417     use std::sync::Arc;
418     use std::thread;
419     use std::time::{Duration, Instant};
420 
421     #[test]
smoke()422     fn smoke() {
423         let c = Condvar::new();
424         c.notify_one();
425         c.notify_all();
426     }
427 
428     #[test]
notify_one()429     fn notify_one() {
430         let m = Arc::new(Mutex::new(()));
431         let m2 = m.clone();
432         let c = Arc::new(Condvar::new());
433         let c2 = c.clone();
434 
435         let mut g = m.lock();
436         let _t = thread::spawn(move || {
437             let _g = m2.lock();
438             c2.notify_one();
439         });
440         c.wait(&mut g);
441     }
442 
443     #[test]
notify_all()444     fn notify_all() {
445         const N: usize = 10;
446 
447         let data = Arc::new((Mutex::new(0), Condvar::new()));
448         let (tx, rx) = channel();
449         for _ in 0..N {
450             let data = data.clone();
451             let tx = tx.clone();
452             thread::spawn(move || {
453                 let &(ref lock, ref cond) = &*data;
454                 let mut cnt = lock.lock();
455                 *cnt += 1;
456                 if *cnt == N {
457                     tx.send(()).unwrap();
458                 }
459                 while *cnt != 0 {
460                     cond.wait(&mut cnt);
461                 }
462                 tx.send(()).unwrap();
463             });
464         }
465         drop(tx);
466 
467         let &(ref lock, ref cond) = &*data;
468         rx.recv().unwrap();
469         let mut cnt = lock.lock();
470         *cnt = 0;
471         cond.notify_all();
472         drop(cnt);
473 
474         for _ in 0..N {
475             rx.recv().unwrap();
476         }
477     }
478 
479     #[test]
notify_one_return_true()480     fn notify_one_return_true() {
481         let m = Arc::new(Mutex::new(()));
482         let m2 = m.clone();
483         let c = Arc::new(Condvar::new());
484         let c2 = c.clone();
485 
486         let mut g = m.lock();
487         let _t = thread::spawn(move || {
488             let _g = m2.lock();
489             assert!(c2.notify_one());
490         });
491         c.wait(&mut g);
492     }
493 
494     #[test]
notify_one_return_false()495     fn notify_one_return_false() {
496         let m = Arc::new(Mutex::new(()));
497         let c = Arc::new(Condvar::new());
498 
499         let _t = thread::spawn(move || {
500             let _g = m.lock();
501             assert!(!c.notify_one());
502         });
503     }
504 
505     #[test]
notify_all_return()506     fn notify_all_return() {
507         const N: usize = 10;
508 
509         let data = Arc::new((Mutex::new(0), Condvar::new()));
510         let (tx, rx) = channel();
511         for _ in 0..N {
512             let data = data.clone();
513             let tx = tx.clone();
514             thread::spawn(move || {
515                 let &(ref lock, ref cond) = &*data;
516                 let mut cnt = lock.lock();
517                 *cnt += 1;
518                 if *cnt == N {
519                     tx.send(()).unwrap();
520                 }
521                 while *cnt != 0 {
522                     cond.wait(&mut cnt);
523                 }
524                 tx.send(()).unwrap();
525             });
526         }
527         drop(tx);
528 
529         let &(ref lock, ref cond) = &*data;
530         rx.recv().unwrap();
531         let mut cnt = lock.lock();
532         *cnt = 0;
533         assert_eq!(cond.notify_all(), N);
534         drop(cnt);
535 
536         for _ in 0..N {
537             rx.recv().unwrap();
538         }
539 
540         assert_eq!(cond.notify_all(), 0);
541     }
542 
543     #[test]
wait_for()544     fn wait_for() {
545         let m = Arc::new(Mutex::new(()));
546         let m2 = m.clone();
547         let c = Arc::new(Condvar::new());
548         let c2 = c.clone();
549 
550         let mut g = m.lock();
551         let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
552         assert!(no_timeout.timed_out());
553 
554         let _t = thread::spawn(move || {
555             let _g = m2.lock();
556             c2.notify_one();
557         });
558         // Non-nightly panics on too large timeouts. Nightly treats it as indefinite wait.
559         let very_long_timeout = if cfg!(feature = "nightly") {
560             Duration::from_secs(u64::max_value())
561         } else {
562             Duration::from_millis(u32::max_value() as u64)
563         };
564 
565         let timeout_res = c.wait_for(&mut g, very_long_timeout);
566         assert!(!timeout_res.timed_out());
567 
568         drop(g);
569     }
570 
571     #[test]
wait_until()572     fn wait_until() {
573         let m = Arc::new(Mutex::new(()));
574         let m2 = m.clone();
575         let c = Arc::new(Condvar::new());
576         let c2 = c.clone();
577 
578         let mut g = m.lock();
579         let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
580         assert!(no_timeout.timed_out());
581         let _t = thread::spawn(move || {
582             let _g = m2.lock();
583             c2.notify_one();
584         });
585         let timeout_res = c.wait_until(
586             &mut g,
587             Instant::now() + Duration::from_millis(u32::max_value() as u64),
588         );
589         assert!(!timeout_res.timed_out());
590         drop(g);
591     }
592 
593     #[test]
594     #[should_panic]
two_mutexes()595     fn two_mutexes() {
596         let m = Arc::new(Mutex::new(()));
597         let m2 = m.clone();
598         let m3 = Arc::new(Mutex::new(()));
599         let c = Arc::new(Condvar::new());
600         let c2 = c.clone();
601 
602         // Make sure we don't leave the child thread dangling
603         struct PanicGuard<'a>(&'a Condvar);
604         impl<'a> Drop for PanicGuard<'a> {
605             fn drop(&mut self) {
606                 self.0.notify_one();
607             }
608         }
609 
610         let (tx, rx) = channel();
611         let g = m.lock();
612         let _t = thread::spawn(move || {
613             let mut g = m2.lock();
614             tx.send(()).unwrap();
615             c2.wait(&mut g);
616         });
617         drop(g);
618         rx.recv().unwrap();
619         let _g = m.lock();
620         let _guard = PanicGuard(&*c);
621         c.wait(&mut m3.lock());
622     }
623 
624     #[test]
two_mutexes_disjoint()625     fn two_mutexes_disjoint() {
626         let m = Arc::new(Mutex::new(()));
627         let m2 = m.clone();
628         let m3 = Arc::new(Mutex::new(()));
629         let c = Arc::new(Condvar::new());
630         let c2 = c.clone();
631 
632         let mut g = m.lock();
633         let _t = thread::spawn(move || {
634             let _g = m2.lock();
635             c2.notify_one();
636         });
637         c.wait(&mut g);
638         drop(g);
639 
640         let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
641     }
642 
643     #[test]
test_debug_condvar()644     fn test_debug_condvar() {
645         let c = Condvar::new();
646         assert_eq!(format!("{:?}", c), "Condvar { .. }");
647     }
648 
649     #[test]
test_condvar_requeue()650     fn test_condvar_requeue() {
651         let m = Arc::new(Mutex::new(()));
652         let m2 = m.clone();
653         let c = Arc::new(Condvar::new());
654         let c2 = c.clone();
655         let t = thread::spawn(move || {
656             let mut g = m2.lock();
657             c2.wait(&mut g);
658         });
659 
660         let mut g = m.lock();
661         while !c.notify_one() {
662             // Wait for the thread to get into wait()
663             MutexGuard::bump(&mut g);
664         }
665         // The thread should have been requeued to the mutex, which we wake up now.
666         drop(g);
667         t.join().unwrap();
668     }
669 
670     #[test]
test_issue_129()671     fn test_issue_129() {
672         let locks = Arc::new((Mutex::new(()), Condvar::new()));
673 
674         let (tx, rx) = channel();
675         for _ in 0..4 {
676             let locks = locks.clone();
677             let tx = tx.clone();
678             thread::spawn(move || {
679                 let mut guard = locks.0.lock();
680                 locks.1.wait(&mut guard);
681                 locks.1.wait_for(&mut guard, Duration::from_millis(1));
682                 locks.1.notify_one();
683                 tx.send(()).unwrap();
684             });
685         }
686 
687         thread::sleep(Duration::from_millis(100));
688         locks.1.notify_one();
689 
690         for _ in 0..4 {
691             assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
692         }
693     }
694 }
695 
696 /// This module contains an integration test that is heavily inspired from WebKit's own integration
697 /// tests for it's own Condvar.
698 #[cfg(test)]
699 mod webkit_queue_test {
700     use crate::{Condvar, Mutex, MutexGuard};
701     use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
702 
703     #[derive(Clone, Copy)]
704     enum Timeout {
705         Bounded(Duration),
706         Forever,
707     }
708 
709     #[derive(Clone, Copy)]
710     enum NotifyStyle {
711         One,
712         All,
713     }
714 
715     struct Queue {
716         items: VecDeque<usize>,
717         should_continue: bool,
718     }
719 
720     impl Queue {
new() -> Self721         fn new() -> Self {
722             Self {
723                 items: VecDeque::new(),
724                 should_continue: true,
725             }
726         }
727     }
728 
wait<T: ?Sized>( condition: &Condvar, lock: &mut MutexGuard<'_, T>, predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool, timeout: &Timeout, )729     fn wait<T: ?Sized>(
730         condition: &Condvar,
731         lock: &mut MutexGuard<'_, T>,
732         predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
733         timeout: &Timeout,
734     ) {
735         while !predicate(lock) {
736             match timeout {
737                 Timeout::Forever => condition.wait(lock),
738                 Timeout::Bounded(bound) => {
739                     condition.wait_for(lock, *bound);
740                 }
741             }
742         }
743     }
744 
notify(style: NotifyStyle, condition: &Condvar, should_notify: bool)745     fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
746         match style {
747             NotifyStyle::One => {
748                 condition.notify_one();
749             }
750             NotifyStyle::All => {
751                 if should_notify {
752                     condition.notify_all();
753                 }
754             }
755         }
756     }
757 
run_queue_test( num_producers: usize, num_consumers: usize, max_queue_size: usize, messages_per_producer: usize, notify_style: NotifyStyle, timeout: Timeout, delay: Duration, )758     fn run_queue_test(
759         num_producers: usize,
760         num_consumers: usize,
761         max_queue_size: usize,
762         messages_per_producer: usize,
763         notify_style: NotifyStyle,
764         timeout: Timeout,
765         delay: Duration,
766     ) {
767         let input_queue = Arc::new(Mutex::new(Queue::new()));
768         let empty_condition = Arc::new(Condvar::new());
769         let full_condition = Arc::new(Condvar::new());
770 
771         let output_vec = Arc::new(Mutex::new(vec![]));
772 
773         let consumers = (0..num_consumers)
774             .map(|_| {
775                 consumer_thread(
776                     input_queue.clone(),
777                     empty_condition.clone(),
778                     full_condition.clone(),
779                     timeout,
780                     notify_style,
781                     output_vec.clone(),
782                     max_queue_size,
783                 )
784             })
785             .collect::<Vec<_>>();
786         let producers = (0..num_producers)
787             .map(|_| {
788                 producer_thread(
789                     messages_per_producer,
790                     input_queue.clone(),
791                     empty_condition.clone(),
792                     full_condition.clone(),
793                     timeout,
794                     notify_style,
795                     max_queue_size,
796                 )
797             })
798             .collect::<Vec<_>>();
799 
800         thread::sleep(delay);
801 
802         for producer in producers.into_iter() {
803             producer.join().expect("Producer thread panicked");
804         }
805 
806         {
807             let mut input_queue = input_queue.lock();
808             input_queue.should_continue = false;
809         }
810         empty_condition.notify_all();
811 
812         for consumer in consumers.into_iter() {
813             consumer.join().expect("Consumer thread panicked");
814         }
815 
816         let mut output_vec = output_vec.lock();
817         assert_eq!(output_vec.len(), num_producers * messages_per_producer);
818         output_vec.sort();
819         for msg_idx in 0..messages_per_producer {
820             for producer_idx in 0..num_producers {
821                 assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
822             }
823         }
824     }
825 
consumer_thread( input_queue: Arc<Mutex<Queue>>, empty_condition: Arc<Condvar>, full_condition: Arc<Condvar>, timeout: Timeout, notify_style: NotifyStyle, output_queue: Arc<Mutex<Vec<usize>>>, max_queue_size: usize, ) -> thread::JoinHandle<()>826     fn consumer_thread(
827         input_queue: Arc<Mutex<Queue>>,
828         empty_condition: Arc<Condvar>,
829         full_condition: Arc<Condvar>,
830         timeout: Timeout,
831         notify_style: NotifyStyle,
832         output_queue: Arc<Mutex<Vec<usize>>>,
833         max_queue_size: usize,
834     ) -> thread::JoinHandle<()> {
835         thread::spawn(move || loop {
836             let (should_notify, result) = {
837                 let mut queue = input_queue.lock();
838                 wait(
839                     &*empty_condition,
840                     &mut queue,
841                     |state| -> bool { !state.items.is_empty() || !state.should_continue },
842                     &timeout,
843                 );
844                 if queue.items.is_empty() && !queue.should_continue {
845                     return;
846                 }
847                 let should_notify = queue.items.len() == max_queue_size;
848                 let result = queue.items.pop_front();
849                 std::mem::drop(queue);
850                 (should_notify, result)
851             };
852             notify(notify_style, &*full_condition, should_notify);
853 
854             if let Some(result) = result {
855                 output_queue.lock().push(result);
856             }
857         })
858     }
859 
producer_thread( num_messages: usize, queue: Arc<Mutex<Queue>>, empty_condition: Arc<Condvar>, full_condition: Arc<Condvar>, timeout: Timeout, notify_style: NotifyStyle, max_queue_size: usize, ) -> thread::JoinHandle<()>860     fn producer_thread(
861         num_messages: usize,
862         queue: Arc<Mutex<Queue>>,
863         empty_condition: Arc<Condvar>,
864         full_condition: Arc<Condvar>,
865         timeout: Timeout,
866         notify_style: NotifyStyle,
867         max_queue_size: usize,
868     ) -> thread::JoinHandle<()> {
869         thread::spawn(move || {
870             for message in 0..num_messages {
871                 let should_notify = {
872                     let mut queue = queue.lock();
873                     wait(
874                         &*full_condition,
875                         &mut queue,
876                         |state| state.items.len() < max_queue_size,
877                         &timeout,
878                     );
879                     let should_notify = queue.items.is_empty();
880                     queue.items.push_back(message);
881                     std::mem::drop(queue);
882                     should_notify
883                 };
884                 notify(notify_style, &*empty_condition, should_notify);
885             }
886         })
887     }
888 
889     macro_rules! run_queue_tests {
890         ( $( $name:ident(
891             num_producers: $num_producers:expr,
892             num_consumers: $num_consumers:expr,
893             max_queue_size: $max_queue_size:expr,
894             messages_per_producer: $messages_per_producer:expr,
895             notification_style: $notification_style:expr,
896             timeout: $timeout:expr,
897             delay_seconds: $delay_seconds:expr);
898         )* ) => {
899             $(#[test]
900             fn $name() {
901                 let delay = Duration::from_secs($delay_seconds);
902                 run_queue_test(
903                     $num_producers,
904                     $num_consumers,
905                     $max_queue_size,
906                     $messages_per_producer,
907                     $notification_style,
908                     $timeout,
909                     delay,
910                     );
911             })*
912         };
913     }
914 
915     run_queue_tests! {
916         sanity_check_queue(
917             num_producers: 1,
918             num_consumers: 1,
919             max_queue_size: 1,
920             messages_per_producer: 100_000,
921             notification_style: NotifyStyle::All,
922             timeout: Timeout::Bounded(Duration::from_secs(1)),
923             delay_seconds: 0
924         );
925         sanity_check_queue_timeout(
926             num_producers: 1,
927             num_consumers: 1,
928             max_queue_size: 1,
929             messages_per_producer: 100_000,
930             notification_style: NotifyStyle::All,
931             timeout: Timeout::Forever,
932             delay_seconds: 0
933         );
934         new_test_without_timeout_5(
935             num_producers: 1,
936             num_consumers: 5,
937             max_queue_size: 1,
938             messages_per_producer: 100_000,
939             notification_style: NotifyStyle::All,
940             timeout: Timeout::Forever,
941             delay_seconds: 0
942         );
943         one_producer_one_consumer_one_slot(
944             num_producers: 1,
945             num_consumers: 1,
946             max_queue_size: 1,
947             messages_per_producer: 100_000,
948             notification_style: NotifyStyle::All,
949             timeout: Timeout::Forever,
950             delay_seconds: 0
951         );
952         one_producer_one_consumer_one_slot_timeout(
953             num_producers: 1,
954             num_consumers: 1,
955             max_queue_size: 1,
956             messages_per_producer: 100_000,
957             notification_style: NotifyStyle::All,
958             timeout: Timeout::Forever,
959             delay_seconds: 1
960         );
961         one_producer_one_consumer_hundred_slots(
962             num_producers: 1,
963             num_consumers: 1,
964             max_queue_size: 100,
965             messages_per_producer: 1_000_000,
966             notification_style: NotifyStyle::All,
967             timeout: Timeout::Forever,
968             delay_seconds: 0
969         );
970         ten_producers_one_consumer_one_slot(
971             num_producers: 10,
972             num_consumers: 1,
973             max_queue_size: 1,
974             messages_per_producer: 10000,
975             notification_style: NotifyStyle::All,
976             timeout: Timeout::Forever,
977             delay_seconds: 0
978         );
979         ten_producers_one_consumer_hundred_slots_notify_all(
980             num_producers: 10,
981             num_consumers: 1,
982             max_queue_size: 100,
983             messages_per_producer: 10000,
984             notification_style: NotifyStyle::All,
985             timeout: Timeout::Forever,
986             delay_seconds: 0
987         );
988         ten_producers_one_consumer_hundred_slots_notify_one(
989             num_producers: 10,
990             num_consumers: 1,
991             max_queue_size: 100,
992             messages_per_producer: 10000,
993             notification_style: NotifyStyle::One,
994             timeout: Timeout::Forever,
995             delay_seconds: 0
996         );
997         one_producer_ten_consumers_one_slot(
998             num_producers: 1,
999             num_consumers: 10,
1000             max_queue_size: 1,
1001             messages_per_producer: 10000,
1002             notification_style: NotifyStyle::All,
1003             timeout: Timeout::Forever,
1004             delay_seconds: 0
1005         );
1006         one_producer_ten_consumers_hundred_slots_notify_all(
1007             num_producers: 1,
1008             num_consumers: 10,
1009             max_queue_size: 100,
1010             messages_per_producer: 100_000,
1011             notification_style: NotifyStyle::All,
1012             timeout: Timeout::Forever,
1013             delay_seconds: 0
1014         );
1015         one_producer_ten_consumers_hundred_slots_notify_one(
1016             num_producers: 1,
1017             num_consumers: 10,
1018             max_queue_size: 100,
1019             messages_per_producer: 100_000,
1020             notification_style: NotifyStyle::One,
1021             timeout: Timeout::Forever,
1022             delay_seconds: 0
1023         );
1024         ten_producers_ten_consumers_one_slot(
1025             num_producers: 10,
1026             num_consumers: 10,
1027             max_queue_size: 1,
1028             messages_per_producer: 50000,
1029             notification_style: NotifyStyle::All,
1030             timeout: Timeout::Forever,
1031             delay_seconds: 0
1032         );
1033         ten_producers_ten_consumers_hundred_slots_notify_all(
1034             num_producers: 10,
1035             num_consumers: 10,
1036             max_queue_size: 100,
1037             messages_per_producer: 50000,
1038             notification_style: NotifyStyle::All,
1039             timeout: Timeout::Forever,
1040             delay_seconds: 0
1041         );
1042         ten_producers_ten_consumers_hundred_slots_notify_one(
1043             num_producers: 10,
1044             num_consumers: 10,
1045             max_queue_size: 100,
1046             messages_per_producer: 50000,
1047             notification_style: NotifyStyle::One,
1048             timeout: Timeout::Forever,
1049             delay_seconds: 0
1050         );
1051     }
1052 }
1053