1 use crate::sync::batch_semaphore::*;
2 
3 use futures::future::poll_fn;
4 use loom::future::block_on;
5 use loom::sync::atomic::AtomicUsize;
6 use loom::thread;
7 use std::future::Future;
8 use std::pin::Pin;
9 use std::sync::atomic::Ordering::SeqCst;
10 use std::sync::Arc;
11 use std::task::Poll::Ready;
12 use std::task::{Context, Poll};
13 
/// Runs `NUM + 1` workers (`NUM` spawned threads plus the model's main thread)
/// against a semaphore created with `NUM` permits, asserting that the shared
/// `active` counter never shows more than `NUM` workers between an acquire
/// and its matching release.
#[test]
fn basic_usage() {
    const NUM: usize = 2;

    /// State shared by every worker in a single model iteration.
    struct State {
        semaphore: Semaphore,
        active: AtomicUsize,
    }

    /// Acquires one permit, bumps and then restores the `active` counter
    /// while checking its bounds, and finally hands the permit back.
    async fn worker(state: Arc<State>) {
        state.semaphore.acquire(1).await.unwrap();

        // `fetch_add` reports the value *before* the increment, so it must
        // be strictly below the permit count.
        let before_enter = state.active.fetch_add(1, SeqCst);
        assert!(before_enter < NUM);

        // `fetch_sub` likewise reports the value before the decrement, which
        // still includes this worker.
        let before_leave = state.active.fetch_sub(1, SeqCst);
        assert!(before_leave <= NUM);

        state.semaphore.release(1);
    }

    loom::model(|| {
        let state = Arc::new(State {
            semaphore: Semaphore::new(NUM),
            active: AtomicUsize::new(0),
        });

        // Background workers; they are intentionally not joined here.
        (0..NUM).for_each(|_| {
            let state = Arc::clone(&state);
            thread::spawn(move || block_on(worker(state)));
        });

        // The main thread takes part as one more worker.
        block_on(worker(state));
    });
}
50 
/// Two threads contend for a semaphore created with a single permit; each one
/// acquires the permit and immediately releases it again.
#[test]
fn release() {
    loom::model(|| {
        let sem = Arc::new(Semaphore::new(1));

        // Spawned contender: acquire the permit, then give it back.
        let remote = Arc::clone(&sem);
        thread::spawn(move || {
            block_on(remote.acquire(1)).unwrap();
            remote.release(1);
        });

        // The main thread performs the same acquire/release pair.
        block_on(sem.acquire(1)).unwrap();
        sem.release(1);
    });
}
69 
/// Closes the semaphore from the main thread while `NUM` spawned threads are
/// each running two acquire/release rounds. A failed acquire makes the
/// spawned thread return early with `Err(())` rather than panic.
#[test]
fn basic_closing() {
    const NUM: usize = 2;

    loom::model(|| {
        let sem = Arc::new(Semaphore::new(1));

        for _ in 0..NUM {
            let sem = Arc::clone(&sem);

            thread::spawn(move || -> Result<(), ()> {
                for _ in 0..2 {
                    // Bail out of the thread as soon as an acquire fails.
                    block_on(sem.acquire(1)).map_err(|_| ())?;
                    sem.release(1);
                }

                Ok(())
            });
        }

        sem.close();
    });
}
94 
/// `NUM` threads each try to acquire a permit, release it, and then close the
/// semaphore, so `close` calls race with acquires on the other threads. A
/// failed acquire makes the thread return early with `Err(())`.
#[test]
fn concurrent_close() {
    const NUM: usize = 3;

    loom::model(|| {
        let sem = Arc::new(Semaphore::new(1));

        (0..NUM).for_each(|_| {
            let sem = Arc::clone(&sem);

            thread::spawn(move || -> Result<(), ()> {
                // Skip the release/close steps if the acquire fails.
                block_on(sem.acquire(1)).map_err(|_| ())?;
                sem.release(1);
                sem.close();

                Ok(())
            });
        });
    });
}
115 
/// Three threads each poll two acquire futures exactly once and then drop
/// them, while the main thread releases permits in between joining them.
#[test]
fn concurrent_cancel() {
    /// Creates two single-permit acquire futures, polls each one once, and
    /// drops it right after the poll.
    async fn poll_and_cancel(semaphore: Arc<Semaphore>) {
        let mut pending = [Some(semaphore.acquire(1)), Some(semaphore.acquire(1))];
        poll_fn(|cx| {
            // Poll every acquire future a single time and discard it
            // immediately afterwards. This mimics a future that is polled
            // and then cancelled, for instance by a timeout.
            for slot in pending.iter_mut() {
                if let Some(acquire) = slot.take() {
                    pin!(acquire);
                    let _ = acquire.poll(cx);
                }
            }
            Poll::Ready(())
        })
        .await
    }

    loom::model(|| {
        let semaphore = Arc::new(Semaphore::new(0));

        // Starts one thread that runs `poll_and_cancel` on its own handle.
        let spawn_canceller = || {
            let semaphore = Arc::clone(&semaphore);
            thread::spawn(move || block_on(poll_and_cancel(semaphore)))
        };
        let first = spawn_canceller();
        let second = spawn_canceller();
        let third = spawn_canceller();

        // Release permits after the first thread is done but while the other
        // two may still be polling or dropping their futures.
        first.join().unwrap();
        semaphore.release(10);
        second.join().unwrap();
        third.join().unwrap();
    });
}
159 
/// Two threads acquire batches of 4, 10 and 8 permits in turn from a
/// semaphore created with 10 permits. The `active` counter tracks permits
/// currently held and must never exceed 10; all 10 permits must be available
/// again once both threads have been joined.
#[test]
fn batch() {
    // Limit loom to a single preemption per execution.
    let mut builder = loom::model::Builder::new();
    builder.preemption_bound = Some(1);

    builder.check(|| {
        let semaphore = Arc::new(Semaphore::new(10));
        let active = Arc::new(AtomicUsize::new(0));

        let workers: Vec<_> = (0..2)
            .map(|_| {
                let semaphore = Arc::clone(&semaphore);
                let active = Arc::clone(&active);

                thread::spawn(move || {
                    for &permits in &[4, 10, 8] {
                        block_on(semaphore.acquire(permits)).unwrap();
                        let held = permits as usize;

                        // Record the batch, then check the total in flight.
                        active.fetch_add(held, SeqCst);
                        let in_flight = active.load(SeqCst);
                        assert!(in_flight <= 10);

                        thread::yield_now();

                        active.fetch_sub(held, SeqCst);
                        semaphore.release(held);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(10, semaphore.available_permits());
    });
}
199 
/// The main thread takes 8 of 10 permits up front, so the spawned thread's
/// request for 4 cannot be met from what is left; the main thread then
/// releases its 8 permits concurrently with that acquire. After the join and
/// a release of the waiter's 4 permits, all 10 must be available again.
#[test]
fn release_during_acquire() {
    loom::model(|| {
        let semaphore = Arc::new(Semaphore::new(10));
        semaphore
            .try_acquire(8)
            .expect("try_acquire should succeed; semaphore uncontended");

        let waiter = {
            let semaphore = Arc::clone(&semaphore);
            thread::spawn(move || block_on(semaphore.acquire(4)).unwrap())
        };

        semaphore.release(8);
        waiter.join().unwrap();

        // Return the permits the waiter acquired before checking the total.
        semaphore.release(4);
        assert_eq!(10, semaphore.available_permits());
    });
}
216