1 use crate::sync::semaphore_ll::*;
2 
3 use futures::future::poll_fn;
4 use loom::future::block_on;
5 use loom::thread;
6 use std::future::Future;
7 use std::pin::Pin;
8 use std::sync::atomic::AtomicUsize;
9 use std::sync::atomic::Ordering::SeqCst;
10 use std::sync::Arc;
11 use std::task::Poll::Ready;
12 use std::task::{Context, Poll};
13 
/// Loom model in which `NUM + 1` actors share a semaphore created with `NUM` permits.
///
/// Every actor acquires a single permit, increments and then decrements a
/// shared counter (asserting on the values it observes), and finally releases
/// the permit again.
#[test]
fn basic_usage() {
    const NUM: usize = 2;

    // State shared by all actors within one model execution.
    struct Shared {
        semaphore: Semaphore,
        active: AtomicUsize,
    }

    // Future that takes one permit, touches the counter and hands the permit back.
    struct Actor {
        waiter: Permit,
        shared: Arc<Shared>,
    }

    impl Future for Actor {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let Actor { waiter, shared } = &mut *self;

            // NOTE(review): `ready!` is defined elsewhere in the crate; presumably it
            // returns early while the acquire is still pending. An acquire error panics.
            ready!(waiter.poll_acquire(cx, 1, &shared.semaphore)).unwrap();

            // Value observed *before* this actor's increment.
            let actual = shared.active.fetch_add(1, SeqCst);
            assert!(actual <= NUM - 1);

            // Value observed *before* this actor's decrement.
            let actual = shared.active.fetch_sub(1, SeqCst);
            assert!(actual <= NUM);

            waiter.release(1, &shared.semaphore);

            Ready(())
        }
    }

    // Builds a fresh actor and drives it to completion on the calling thread.
    fn run_actor(shared: Arc<Shared>) {
        block_on(Actor {
            waiter: Permit::new(),
            shared,
        });
    }

    loom::model(|| {
        let shared = Arc::new(Shared {
            semaphore: Semaphore::new(NUM),
            active: AtomicUsize::new(0),
        });

        // `NUM` actors run on spawned threads ...
        for _ in 0..NUM {
            let shared = Arc::clone(&shared);
            thread::spawn(move || run_actor(shared));
        }

        // ... and one more runs on the model's main thread.
        run_actor(shared);
    });
}
71 
/// Loom model: a spawned thread and the main thread each acquire the single
/// permit of a one-permit semaphore and then release it.
#[test]
fn release() {
    // Acquires one permit with a fresh `Permit` (panicking if the acquire
    // reports an error) and immediately releases it.
    fn acquire_then_release(semaphore: &Semaphore) {
        let mut permit = Permit::new();

        let outcome = block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, semaphore)));
        outcome.unwrap();

        permit.release(1, semaphore);
    }

    loom::model(|| {
        let semaphore = Arc::new(Semaphore::new(1));

        let remote = Arc::clone(&semaphore);
        thread::spawn(move || acquire_then_release(&remote));

        acquire_then_release(&semaphore);
    });
}
95 
/// Loom model: `NUM` threads each try twice to acquire and release the single
/// permit while the main thread closes the semaphore.
///
/// A worker stops at its first failed acquire; the resulting `Err(())` is the
/// thread's return value and is not inspected by the test.
#[test]
fn basic_closing() {
    const NUM: usize = 2;

    loom::model(|| {
        let semaphore = Arc::new(Semaphore::new(1));

        for _ in 0..NUM {
            let worker_sem = Arc::clone(&semaphore);

            thread::spawn(move || -> Result<(), ()> {
                let mut permit = Permit::new();

                for _ in 0..2 {
                    // The concrete error is discarded; only success/failure matters here.
                    let acquired = block_on(poll_fn(|cx| {
                        permit.poll_acquire(cx, 1, &worker_sem).map_err(|_| ())
                    }));
                    acquired?;

                    permit.release(1, &worker_sem);
                }

                Ok(())
            });
        }

        semaphore.close();
    });
}
124 
/// Loom model: `NUM` threads each acquire the single permit, release it and
/// then close the semaphore themselves.
///
/// A thread whose acquire fails returns `Err(())` right away, without
/// releasing or closing; the return value is not inspected by the test.
#[test]
fn concurrent_close() {
    const NUM: usize = 3;

    loom::model(|| {
        let semaphore = Arc::new(Semaphore::new(1));

        for _ in 0..NUM {
            let worker_sem = Arc::clone(&semaphore);

            thread::spawn(move || -> Result<(), ()> {
                let mut permit = Permit::new();

                // The concrete error is discarded; only success/failure matters here.
                let acquired = block_on(poll_fn(|cx| {
                    permit.poll_acquire(cx, 1, &worker_sem).map_err(|_| ())
                }));
                acquired?;

                permit.release(1, &worker_sem);

                worker_sem.close();

                Ok(())
            });
        }
    });
}
151 
/// Loom model: two threads acquire batches of 4, 10 and 8 permits in turn
/// from a semaphore created with 10 permits.
///
/// While a batch is held, its size is added to a shared counter, which is
/// asserted never to exceed 10. Once both threads have been joined, the
/// semaphore must report 10 available permits again.
#[test]
fn batch() {
    let mut builder = loom::model::Builder::new();
    // NOTE(review): presumably bounded to keep the explored state space small — confirm.
    builder.preemption_bound = Some(1);

    builder.check(|| {
        let semaphore = Arc::new(Semaphore::new(10));
        let active = Arc::new(AtomicUsize::new(0));

        // Both workers are spawned (by `collect`) before any of them is joined.
        let handles: Vec<_> = (0..2)
            .map(|_| {
                let semaphore = Arc::clone(&semaphore);
                let active = Arc::clone(&active);

                thread::spawn(move || {
                    let mut permit = Permit::new();

                    for &n in &[4, 10, 8] {
                        let acquired = block_on(poll_fn(|cx| permit.poll_acquire(cx, n, &semaphore)));
                        acquired.unwrap();

                        active.fetch_add(n as usize, SeqCst);

                        // Separate load after the add, then check the bound.
                        let num_active = active.load(SeqCst);
                        assert!(num_active <= 10);

                        thread::yield_now();

                        active.fetch_sub(n as usize, SeqCst);

                        permit.release(n, &semaphore);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(10, semaphore.available_permits());
    });
}
193