1 extern crate futures;
2
3 use std::thread;
4
5 use futures::prelude::*;
6 use futures::executor;
7 use futures::stream;
8 use futures::future;
9 use futures::sync::BiLock;
10
11 mod support;
12 use support::*;
13
14 #[test]
smoke()15 fn smoke() {
16 let future = future::lazy(|| {
17 let (a, b) = BiLock::new(1);
18
19 {
20 let mut lock = match a.poll_lock() {
21 Async::Ready(l) => l,
22 Async::NotReady => panic!("poll not ready"),
23 };
24 assert_eq!(*lock, 1);
25 *lock = 2;
26
27 assert!(b.poll_lock().is_not_ready());
28 assert!(a.poll_lock().is_not_ready());
29 }
30
31 assert!(b.poll_lock().is_ready());
32 assert!(a.poll_lock().is_ready());
33
34 {
35 let lock = match b.poll_lock() {
36 Async::Ready(l) => l,
37 Async::NotReady => panic!("poll not ready"),
38 };
39 assert_eq!(*lock, 2);
40 }
41
42 assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2);
43
44 Ok::<(), ()>(())
45 });
46
47 assert!(executor::spawn(future)
48 .poll_future_notify(¬ify_noop(), 0)
49 .expect("failure in poll")
50 .is_ready());
51 }
52
53 #[test]
concurrent()54 fn concurrent() {
55 const N: usize = 10000;
56 let (a, b) = BiLock::new(0);
57
58 let a = Increment {
59 a: Some(a),
60 remaining: N,
61 };
62 let b = stream::iter_ok::<_, ()>((0..N)).fold(b, |b, _n| {
63 b.lock().map(|mut b| {
64 *b += 1;
65 b.unlock()
66 })
67 });
68
69 let t1 = thread::spawn(move || a.wait());
70 let b = b.wait().expect("b error");
71 let a = t1.join().unwrap().expect("a error");
72
73 match a.poll_lock() {
74 Async::Ready(l) => assert_eq!(*l, 2 * N),
75 Async::NotReady => panic!("poll not ready"),
76 }
77 match b.poll_lock() {
78 Async::Ready(l) => assert_eq!(*l, 2 * N),
79 Async::NotReady => panic!("poll not ready"),
80 }
81
82 assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N);
83
84 struct Increment {
85 remaining: usize,
86 a: Option<BiLock<usize>>,
87 }
88
89 impl Future for Increment {
90 type Item = BiLock<usize>;
91 type Error = ();
92
93 fn poll(&mut self) -> Poll<BiLock<usize>, ()> {
94 loop {
95 if self.remaining == 0 {
96 return Ok(self.a.take().unwrap().into())
97 }
98
99 let a = self.a.as_ref().unwrap();
100 let mut a = match a.poll_lock() {
101 Async::Ready(l) => l,
102 Async::NotReady => return Ok(Async::NotReady),
103 };
104 self.remaining -= 1;
105 *a += 1;
106 }
107 }
108 }
109 }
110