1 use super::batch_semaphore as ll; // low level implementation
2 use super::{AcquireError, TryAcquireError};
3 use std::sync::Arc;
4
5 /// Counting semaphore performing asynchronous permit acquisition.
6 ///
7 /// A semaphore maintains a set of permits. Permits are used to synchronize
8 /// access to a shared resource. A semaphore differs from a mutex in that it
9 /// can allow more than one concurrent caller to access the shared resource at a
10 /// time.
11 ///
12 /// When `acquire` is called and the semaphore has remaining permits, the
13 /// function immediately returns a permit. However, if no remaining permits are
14 /// available, `acquire` (asynchronously) waits until an outstanding permit is
15 /// dropped. At this point, the freed permit is assigned to the caller.
16 ///
17 /// This `Semaphore` is fair, which means that permits are given out in the order
main(int argc,char ** argv)18 /// they were requested. This fairness is also applied when `acquire_many` gets
19 /// involved, so if a call to `acquire_many` at the front of the queue requests
20 /// more permits than currently available, this can prevent a call to `acquire`
21 /// from completing, even if the semaphore has enough permits complete the call
22 /// to `acquire`.
23 ///
24 /// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
25 /// utility.
26 ///
27 /// # Examples
28 ///
29 /// Basic usage:
30 ///
31 /// ```
32 /// use tokio::sync::{Semaphore, TryAcquireError};
33 ///
34 /// #[tokio::main]
35 /// async fn main() {
36 /// let semaphore = Semaphore::new(3);
37 ///
38 /// let a_permit = semaphore.acquire().await.unwrap();
39 /// let two_permits = semaphore.acquire_many(2).await.unwrap();
40 ///
41 /// assert_eq!(semaphore.available_permits(), 0);
42 ///
43 /// let permit_attempt = semaphore.try_acquire();
44 /// assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
45 /// }
46 /// ```
47 ///
48 /// Use [`Semaphore::acquire_owned`] to move permits across tasks:
49 ///
50 /// ```
51 /// use std::sync::Arc;
52 /// use tokio::sync::Semaphore;
53 ///
54 /// #[tokio::main]
55 /// async fn main() {
56 /// let semaphore = Arc::new(Semaphore::new(3));
57 /// let mut join_handles = Vec::new();
58 ///
59 /// for _ in 0..5 {
60 /// let permit = semaphore.clone().acquire_owned().await.unwrap();
61 /// join_handles.push(tokio::spawn(async move {
62 /// // perform task...
63 /// // explicitly own `permit` in the task
64 /// drop(permit);
65 /// }));
66 /// }
67 ///
68 /// for handle in join_handles {
69 /// handle.await.unwrap();
70 /// }
71 /// }
72 /// ```
73 ///
74 /// [`PollSemaphore`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSemaphore.html
75 /// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
76 #[derive(Debug)]
77 pub struct Semaphore {
78 /// The low level semaphore
79 ll_sem: ll::Semaphore,
80 }
81
82 /// A permit from the semaphore.
83 ///
84 /// This type is created by the [`acquire`] method.
85 ///
86 /// [`acquire`]: crate::sync::Semaphore::acquire()
87 #[must_use]
88 #[derive(Debug)]
89 pub struct SemaphorePermit<'a> {
90 sem: &'a Semaphore,
91 permits: u32,
92 }
93
94 /// An owned permit from the semaphore.
95 ///
96 /// This type is created by the [`acquire_owned`] method.
97 ///
98 /// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned()
99 #[must_use]
100 #[derive(Debug)]
101 pub struct OwnedSemaphorePermit {
102 sem: Arc<Semaphore>,
103 permits: u32,
104 }
105
106 #[test]
107 #[cfg(not(loom))]
108 fn bounds() {
109 fn check_unpin<T: Unpin>() {}
110 // This has to take a value, since the async fn's return type is unnameable.
111 fn check_send_sync_val<T: Send + Sync>(_t: T) {}
112 fn check_send_sync<T: Send + Sync>() {}
113 check_unpin::<Semaphore>();
114 check_unpin::<SemaphorePermit<'_>>();
115 check_send_sync::<Semaphore>();
116
117 let semaphore = Semaphore::new(0);
118 check_send_sync_val(semaphore.acquire());
119 }
120
121 impl Semaphore {
122 /// Creates a new semaphore with the initial number of permits.
123 pub fn new(permits: usize) -> Self {
124 Self {
125 ll_sem: ll::Semaphore::new(permits),
126 }
127 }
128
129 /// Creates a new semaphore with the initial number of permits.
130 ///
131 /// # Examples
132 ///
133 /// ```
134 /// use tokio::sync::Semaphore;
135 ///
136 /// static SEM: Semaphore = Semaphore::const_new(10);
137 /// ```
138 ///
139 #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
140 #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
141 pub const fn const_new(permits: usize) -> Self {
142 Self {
143 ll_sem: ll::Semaphore::const_new(permits),
144 }
145 }
146
147 /// Returns the current number of available permits.
148 pub fn available_permits(&self) -> usize {
149 self.ll_sem.available_permits()
150 }
151
152 /// Adds `n` new permits to the semaphore.
153 ///
154 /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
155 pub fn add_permits(&self, n: usize) {
156 self.ll_sem.release(n);
157 }
158
159 /// Acquires a permit from the semaphore.
160 ///
161 /// If the semaphore has been closed, this returns an [`AcquireError`].
162 /// Otherwise, this returns a [`SemaphorePermit`] representing the
163 /// acquired permit.
164 ///
165 /// # Cancel safety
166 ///
167 /// This method uses a queue to fairly distribute permits in the order they
168 /// were requested. Cancelling a call to `acquire` makes you lose your place
169 /// in the queue.
170 ///
171 /// # Examples
172 ///
173 /// ```
174 /// use tokio::sync::Semaphore;
175 ///
176 /// #[tokio::main]
177 /// async fn main() {
178 /// let semaphore = Semaphore::new(2);
179 ///
180 /// let permit_1 = semaphore.acquire().await.unwrap();
181 /// assert_eq!(semaphore.available_permits(), 1);
182 ///
183 /// let permit_2 = semaphore.acquire().await.unwrap();
184 /// assert_eq!(semaphore.available_permits(), 0);
185 ///
186 /// drop(permit_1);
187 /// assert_eq!(semaphore.available_permits(), 1);
188 /// }
189 /// ```
190 ///
191 /// [`AcquireError`]: crate::sync::AcquireError
192 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
193 pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
194 self.ll_sem.acquire(1).await?;
195 Ok(SemaphorePermit {
196 sem: self,
197 permits: 1,
198 })
199 }
200
201 /// Acquires `n` permits from the semaphore.
202 ///
203 /// If the semaphore has been closed, this returns an [`AcquireError`].
204 /// Otherwise, this returns a [`SemaphorePermit`] representing the
205 /// acquired permits.
206 ///
207 /// # Cancel safety
208 ///
209 /// This method uses a queue to fairly distribute permits in the order they
210 /// were requested. Cancelling a call to `acquire_many` makes you lose your
211 /// place in the queue.
212 ///
213 /// # Examples
214 ///
215 /// ```
216 /// use tokio::sync::Semaphore;
217 ///
218 /// #[tokio::main]
219 /// async fn main() {
220 /// let semaphore = Semaphore::new(5);
221 ///
222 /// let permit = semaphore.acquire_many(3).await.unwrap();
223 /// assert_eq!(semaphore.available_permits(), 2);
224 /// }
225 /// ```
226 ///
227 /// [`AcquireError`]: crate::sync::AcquireError
228 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
229 pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
230 self.ll_sem.acquire(n).await?;
231 Ok(SemaphorePermit {
232 sem: self,
233 permits: n,
234 })
235 }
236
237 /// Tries to acquire a permit from the semaphore.
238 ///
239 /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
240 /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
241 /// this returns a [`SemaphorePermit`] representing the acquired permits.
242 ///
243 /// # Examples
244 ///
245 /// ```
246 /// use tokio::sync::{Semaphore, TryAcquireError};
247 ///
248 /// # fn main() {
249 /// let semaphore = Semaphore::new(2);
250 ///
251 /// let permit_1 = semaphore.try_acquire().unwrap();
252 /// assert_eq!(semaphore.available_permits(), 1);
253 ///
254 /// let permit_2 = semaphore.try_acquire().unwrap();
255 /// assert_eq!(semaphore.available_permits(), 0);
256 ///
257 /// let permit_3 = semaphore.try_acquire();
258 /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
259 /// # }
260 /// ```
261 ///
262 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
263 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
264 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
265 pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
266 match self.ll_sem.try_acquire(1) {
267 Ok(_) => Ok(SemaphorePermit {
268 sem: self,
269 permits: 1,
270 }),
271 Err(e) => Err(e),
272 }
273 }
274
275 /// Tries to acquire `n` permits from the semaphore.
276 ///
277 /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
278 /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
279 /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
280 ///
281 /// # Examples
282 ///
283 /// ```
284 /// use tokio::sync::{Semaphore, TryAcquireError};
285 ///
286 /// # fn main() {
287 /// let semaphore = Semaphore::new(4);
288 ///
289 /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
290 /// assert_eq!(semaphore.available_permits(), 1);
291 ///
292 /// let permit_2 = semaphore.try_acquire_many(2);
293 /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
294 /// # }
295 /// ```
296 ///
297 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
298 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
299 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
300 pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
301 match self.ll_sem.try_acquire(n) {
302 Ok(_) => Ok(SemaphorePermit {
303 sem: self,
304 permits: n,
305 }),
306 Err(e) => Err(e),
307 }
308 }
309
310 /// Acquires a permit from the semaphore.
311 ///
312 /// The semaphore must be wrapped in an [`Arc`] to call this method.
313 /// If the semaphore has been closed, this returns an [`AcquireError`].
314 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
315 /// acquired permit.
316 ///
317 /// # Cancel safety
318 ///
319 /// This method uses a queue to fairly distribute permits in the order they
320 /// were requested. Cancelling a call to `acquire_owned` makes you lose your
321 /// place in the queue.
322 ///
323 /// # Examples
324 ///
325 /// ```
326 /// use std::sync::Arc;
327 /// use tokio::sync::Semaphore;
328 ///
329 /// #[tokio::main]
330 /// async fn main() {
331 /// let semaphore = Arc::new(Semaphore::new(3));
332 /// let mut join_handles = Vec::new();
333 ///
334 /// for _ in 0..5 {
335 /// let permit = semaphore.clone().acquire_owned().await.unwrap();
336 /// join_handles.push(tokio::spawn(async move {
337 /// // perform task...
338 /// // explicitly own `permit` in the task
339 /// drop(permit);
340 /// }));
341 /// }
342 ///
343 /// for handle in join_handles {
344 /// handle.await.unwrap();
345 /// }
346 /// }
347 /// ```
348 ///
349 /// [`Arc`]: std::sync::Arc
350 /// [`AcquireError`]: crate::sync::AcquireError
351 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
352 pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
353 self.ll_sem.acquire(1).await?;
354 Ok(OwnedSemaphorePermit {
355 sem: self,
356 permits: 1,
357 })
358 }
359
360 /// Acquires `n` permits from the semaphore.
361 ///
362 /// The semaphore must be wrapped in an [`Arc`] to call this method.
363 /// If the semaphore has been closed, this returns an [`AcquireError`].
364 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
365 /// acquired permit.
366 ///
367 /// # Cancel safety
368 ///
369 /// This method uses a queue to fairly distribute permits in the order they
370 /// were requested. Cancelling a call to `acquire_many_owned` makes you lose
371 /// your place in the queue.
372 ///
373 /// # Examples
374 ///
375 /// ```
376 /// use std::sync::Arc;
377 /// use tokio::sync::Semaphore;
378 ///
379 /// #[tokio::main]
380 /// async fn main() {
381 /// let semaphore = Arc::new(Semaphore::new(10));
382 /// let mut join_handles = Vec::new();
383 ///
384 /// for _ in 0..5 {
385 /// let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
386 /// join_handles.push(tokio::spawn(async move {
387 /// // perform task...
388 /// // explicitly own `permit` in the task
389 /// drop(permit);
390 /// }));
391 /// }
392 ///
393 /// for handle in join_handles {
394 /// handle.await.unwrap();
395 /// }
396 /// }
397 /// ```
398 ///
399 /// [`Arc`]: std::sync::Arc
400 /// [`AcquireError`]: crate::sync::AcquireError
401 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
402 pub async fn acquire_many_owned(
403 self: Arc<Self>,
404 n: u32,
405 ) -> Result<OwnedSemaphorePermit, AcquireError> {
406 self.ll_sem.acquire(n).await?;
407 Ok(OwnedSemaphorePermit {
408 sem: self,
409 permits: n,
410 })
411 }
412
413 /// Tries to acquire a permit from the semaphore.
414 ///
415 /// The semaphore must be wrapped in an [`Arc`] to call this method. If
416 /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
417 /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
418 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
419 /// acquired permit.
420 ///
421 /// # Examples
422 ///
423 /// ```
424 /// use std::sync::Arc;
425 /// use tokio::sync::{Semaphore, TryAcquireError};
426 ///
427 /// # fn main() {
428 /// let semaphore = Arc::new(Semaphore::new(2));
429 ///
430 /// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
431 /// assert_eq!(semaphore.available_permits(), 1);
432 ///
433 /// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
434 /// assert_eq!(semaphore.available_permits(), 0);
435 ///
436 /// let permit_3 = semaphore.try_acquire_owned();
437 /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
438 /// # }
439 /// ```
440 ///
441 /// [`Arc`]: std::sync::Arc
442 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
443 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
444 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
445 pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> {
446 match self.ll_sem.try_acquire(1) {
447 Ok(_) => Ok(OwnedSemaphorePermit {
448 sem: self,
449 permits: 1,
450 }),
451 Err(e) => Err(e),
452 }
453 }
454
455 /// Tries to acquire `n` permits from the semaphore.
456 ///
457 /// The semaphore must be wrapped in an [`Arc`] to call this method. If
458 /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
459 /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
460 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
461 /// acquired permit.
462 ///
463 /// # Examples
464 ///
465 /// ```
466 /// use std::sync::Arc;
467 /// use tokio::sync::{Semaphore, TryAcquireError};
468 ///
469 /// # fn main() {
470 /// let semaphore = Arc::new(Semaphore::new(4));
471 ///
472 /// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
473 /// assert_eq!(semaphore.available_permits(), 1);
474 ///
475 /// let permit_2 = semaphore.try_acquire_many_owned(2);
476 /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
477 /// # }
478 /// ```
479 ///
480 /// [`Arc`]: std::sync::Arc
481 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
482 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
483 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
484 pub fn try_acquire_many_owned(
485 self: Arc<Self>,
486 n: u32,
487 ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
488 match self.ll_sem.try_acquire(n) {
489 Ok(_) => Ok(OwnedSemaphorePermit {
490 sem: self,
491 permits: n,
492 }),
493 Err(e) => Err(e),
494 }
495 }
496
497 /// Closes the semaphore.
498 ///
499 /// This prevents the semaphore from issuing new permits and notifies all pending waiters.
500 ///
501 /// # Examples
502 ///
503 /// ```
504 /// use tokio::sync::Semaphore;
505 /// use std::sync::Arc;
506 /// use tokio::sync::TryAcquireError;
507 ///
508 /// #[tokio::main]
509 /// async fn main() {
510 /// let semaphore = Arc::new(Semaphore::new(1));
511 /// let semaphore2 = semaphore.clone();
512 ///
513 /// tokio::spawn(async move {
514 /// let permit = semaphore.acquire_many(2).await;
515 /// assert!(permit.is_err());
516 /// println!("waiter received error");
517 /// });
518 ///
519 /// println!("closing semaphore");
520 /// semaphore2.close();
521 ///
522 /// // Cannot obtain more permits
523 /// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
524 /// }
525 /// ```
526 pub fn close(&self) {
527 self.ll_sem.close();
528 }
529
530 /// Returns true if the semaphore is closed
531 pub fn is_closed(&self) -> bool {
532 self.ll_sem.is_closed()
533 }
534 }
535
536 impl<'a> SemaphorePermit<'a> {
537 /// Forgets the permit **without** releasing it back to the semaphore.
538 /// This can be used to reduce the amount of permits available from a
539 /// semaphore.
540 pub fn forget(mut self) {
541 self.permits = 0;
542 }
543 }
544
545 impl OwnedSemaphorePermit {
546 /// Forgets the permit **without** releasing it back to the semaphore.
547 /// This can be used to reduce the amount of permits available from a
548 /// semaphore.
549 pub fn forget(mut self) {
550 self.permits = 0;
551 }
552 }
553
554 impl<'a> Drop for SemaphorePermit<'_> {
555 fn drop(&mut self) {
556 self.sem.add_permits(self.permits as usize);
557 }
558 }
559
560 impl Drop for OwnedSemaphorePermit {
561 fn drop(&mut self) {
562 self.sem.add_permits(self.permits as usize);
563 }
564 }
565