1 extern crate futures;
2 
3 use std::thread;
4 
5 use futures::prelude::*;
6 use futures::executor;
7 use futures::stream;
8 use futures::future;
9 use futures::sync::BiLock;
10 
11 mod support;
12 use support::*;
13 
14 #[test]
smoke()15 fn smoke() {
16     let future = future::lazy(|| {
17         let (a, b) = BiLock::new(1);
18 
19         {
20             let mut lock = match a.poll_lock() {
21                 Async::Ready(l) => l,
22                 Async::NotReady => panic!("poll not ready"),
23             };
24             assert_eq!(*lock, 1);
25             *lock = 2;
26 
27             assert!(b.poll_lock().is_not_ready());
28             assert!(a.poll_lock().is_not_ready());
29         }
30 
31         assert!(b.poll_lock().is_ready());
32         assert!(a.poll_lock().is_ready());
33 
34         {
35             let lock = match b.poll_lock() {
36                 Async::Ready(l) => l,
37                 Async::NotReady => panic!("poll not ready"),
38             };
39             assert_eq!(*lock, 2);
40         }
41 
42         assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2);
43 
44         Ok::<(), ()>(())
45     });
46 
47     assert!(executor::spawn(future)
48                 .poll_future_notify(&notify_noop(), 0)
49                 .expect("failure in poll")
50                 .is_ready());
51 }
52 
53 #[test]
concurrent()54 fn concurrent() {
55     const N: usize = 10000;
56     let (a, b) = BiLock::new(0);
57 
58     let a = Increment {
59         a: Some(a),
60         remaining: N,
61     };
62     let b = stream::iter_ok::<_, ()>((0..N)).fold(b, |b, _n| {
63         b.lock().map(|mut b| {
64             *b += 1;
65             b.unlock()
66         })
67     });
68 
69     let t1 = thread::spawn(move || a.wait());
70     let b = b.wait().expect("b error");
71     let a = t1.join().unwrap().expect("a error");
72 
73     match a.poll_lock() {
74         Async::Ready(l) => assert_eq!(*l, 2 * N),
75         Async::NotReady => panic!("poll not ready"),
76     }
77     match b.poll_lock() {
78         Async::Ready(l) => assert_eq!(*l, 2 * N),
79         Async::NotReady => panic!("poll not ready"),
80     }
81 
82     assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N);
83 
84     struct Increment {
85         remaining: usize,
86         a: Option<BiLock<usize>>,
87     }
88 
89     impl Future for Increment {
90         type Item = BiLock<usize>;
91         type Error = ();
92 
93         fn poll(&mut self) -> Poll<BiLock<usize>, ()> {
94             loop {
95                 if self.remaining == 0 {
96                     return Ok(self.a.take().unwrap().into())
97                 }
98 
99                 let a = self.a.as_ref().unwrap();
100                 let mut a = match a.poll_lock() {
101                     Async::Ready(l) => l,
102                     Async::NotReady => return Ok(Async::NotReady),
103                 };
104                 self.remaining -= 1;
105                 *a += 1;
106             }
107         }
108     }
109 }
110