1 use crate::sync::semaphore_ll::*;
2
3 use futures::future::poll_fn;
4 use loom::future::block_on;
5 use loom::thread;
6 use std::future::Future;
7 use std::pin::Pin;
8 use std::sync::atomic::AtomicUsize;
9 use std::sync::atomic::Ordering::SeqCst;
10 use std::sync::Arc;
11 use std::task::Poll::Ready;
12 use std::task::{Context, Poll};
13
14 #[test]
basic_usage()15 fn basic_usage() {
16 const NUM: usize = 2;
17
18 struct Actor {
19 waiter: Permit,
20 shared: Arc<Shared>,
21 }
22
23 struct Shared {
24 semaphore: Semaphore,
25 active: AtomicUsize,
26 }
27
28 impl Future for Actor {
29 type Output = ();
30
31 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
32 let me = &mut *self;
33
34 ready!(me.waiter.poll_acquire(cx, 1, &me.shared.semaphore)).unwrap();
35
36 let actual = me.shared.active.fetch_add(1, SeqCst);
37 assert!(actual <= NUM - 1);
38
39 let actual = me.shared.active.fetch_sub(1, SeqCst);
40 assert!(actual <= NUM);
41
42 me.waiter.release(1, &me.shared.semaphore);
43
44 Ready(())
45 }
46 }
47
48 loom::model(|| {
49 let shared = Arc::new(Shared {
50 semaphore: Semaphore::new(NUM),
51 active: AtomicUsize::new(0),
52 });
53
54 for _ in 0..NUM {
55 let shared = shared.clone();
56
57 thread::spawn(move || {
58 block_on(Actor {
59 waiter: Permit::new(),
60 shared,
61 });
62 });
63 }
64
65 block_on(Actor {
66 waiter: Permit::new(),
67 shared,
68 });
69 });
70 }
71
72 #[test]
release()73 fn release() {
74 loom::model(|| {
75 let semaphore = Arc::new(Semaphore::new(1));
76
77 {
78 let semaphore = semaphore.clone();
79 thread::spawn(move || {
80 let mut permit = Permit::new();
81
82 block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap();
83
84 permit.release(1, &semaphore);
85 });
86 }
87
88 let mut permit = Permit::new();
89
90 block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap();
91
92 permit.release(1, &semaphore);
93 });
94 }
95
96 #[test]
basic_closing()97 fn basic_closing() {
98 const NUM: usize = 2;
99
100 loom::model(|| {
101 let semaphore = Arc::new(Semaphore::new(1));
102
103 for _ in 0..NUM {
104 let semaphore = semaphore.clone();
105
106 thread::spawn(move || {
107 let mut permit = Permit::new();
108
109 for _ in 0..2 {
110 block_on(poll_fn(|cx| {
111 permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ())
112 }))?;
113
114 permit.release(1, &semaphore);
115 }
116
117 Ok::<(), ()>(())
118 });
119 }
120
121 semaphore.close();
122 });
123 }
124
125 #[test]
concurrent_close()126 fn concurrent_close() {
127 const NUM: usize = 3;
128
129 loom::model(|| {
130 let semaphore = Arc::new(Semaphore::new(1));
131
132 for _ in 0..NUM {
133 let semaphore = semaphore.clone();
134
135 thread::spawn(move || {
136 let mut permit = Permit::new();
137
138 block_on(poll_fn(|cx| {
139 permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ())
140 }))?;
141
142 permit.release(1, &semaphore);
143
144 semaphore.close();
145
146 Ok::<(), ()>(())
147 });
148 }
149 });
150 }
151
152 #[test]
batch()153 fn batch() {
154 let mut b = loom::model::Builder::new();
155 b.preemption_bound = Some(1);
156
157 b.check(|| {
158 let semaphore = Arc::new(Semaphore::new(10));
159 let active = Arc::new(AtomicUsize::new(0));
160 let mut ths = vec![];
161
162 for _ in 0..2 {
163 let semaphore = semaphore.clone();
164 let active = active.clone();
165
166 ths.push(thread::spawn(move || {
167 let mut permit = Permit::new();
168
169 for n in &[4, 10, 8] {
170 block_on(poll_fn(|cx| permit.poll_acquire(cx, *n, &semaphore))).unwrap();
171
172 active.fetch_add(*n as usize, SeqCst);
173
174 let num_active = active.load(SeqCst);
175 assert!(num_active <= 10);
176
177 thread::yield_now();
178
179 active.fetch_sub(*n as usize, SeqCst);
180
181 permit.release(*n, &semaphore);
182 }
183 }));
184 }
185
186 for th in ths.into_iter() {
187 th.join().unwrap();
188 }
189
190 assert_eq!(10, semaphore.available_permits());
191 });
192 }
193