1 use super::batch_semaphore as ll; // low level implementation
2 use super::{AcquireError, TryAcquireError};
3 use std::sync::Arc;
4 
5 /// Counting semaphore performing asynchronous permit acquisition.
6 ///
7 /// A semaphore maintains a set of permits. Permits are used to synchronize
8 /// access to a shared resource. A semaphore differs from a mutex in that it
9 /// can allow more than one concurrent caller to access the shared resource at a
10 /// time.
11 ///
12 /// When `acquire` is called and the semaphore has remaining permits, the
13 /// function immediately returns a permit. However, if no remaining permits are
14 /// available, `acquire` (asynchronously) waits until an outstanding permit is
15 /// dropped. At this point, the freed permit is assigned to the caller.
16 ///
17 /// This `Semaphore` is fair, which means that permits are given out in the order
main(int argc,char ** argv)18 /// they were requested. This fairness is also applied when `acquire_many` gets
19 /// involved, so if a call to `acquire_many` at the front of the queue requests
20 /// more permits than currently available, this can prevent a call to `acquire`
21 /// from completing, even if the semaphore has enough permits complete the call
22 /// to `acquire`.
23 ///
24 /// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
25 /// utility.
26 ///
27 /// # Examples
28 ///
29 /// Basic usage:
30 ///
31 /// ```
32 /// use tokio::sync::{Semaphore, TryAcquireError};
33 ///
34 /// #[tokio::main]
35 /// async fn main() {
36 ///     let semaphore = Semaphore::new(3);
37 ///
38 ///     let a_permit = semaphore.acquire().await.unwrap();
39 ///     let two_permits = semaphore.acquire_many(2).await.unwrap();
40 ///
41 ///     assert_eq!(semaphore.available_permits(), 0);
42 ///
43 ///     let permit_attempt = semaphore.try_acquire();
44 ///     assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
45 /// }
46 /// ```
47 ///
48 /// Use [`Semaphore::acquire_owned`] to move permits across tasks:
49 ///
50 /// ```
51 /// use std::sync::Arc;
52 /// use tokio::sync::Semaphore;
53 ///
54 /// #[tokio::main]
55 /// async fn main() {
56 ///     let semaphore = Arc::new(Semaphore::new(3));
57 ///     let mut join_handles = Vec::new();
58 ///
59 ///     for _ in 0..5 {
60 ///         let permit = semaphore.clone().acquire_owned().await.unwrap();
61 ///         join_handles.push(tokio::spawn(async move {
62 ///             // perform task...
63 ///             // explicitly own `permit` in the task
64 ///             drop(permit);
65 ///         }));
66 ///     }
67 ///
68 ///     for handle in join_handles {
69 ///         handle.await.unwrap();
70 ///     }
71 /// }
72 /// ```
73 ///
74 /// [`PollSemaphore`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSemaphore.html
75 /// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
76 #[derive(Debug)]
77 pub struct Semaphore {
78     /// The low level semaphore
79     ll_sem: ll::Semaphore,
80 }
81 
82 /// A permit from the semaphore.
83 ///
84 /// This type is created by the [`acquire`] method.
85 ///
86 /// [`acquire`]: crate::sync::Semaphore::acquire()
87 #[must_use]
88 #[derive(Debug)]
89 pub struct SemaphorePermit<'a> {
90     sem: &'a Semaphore,
91     permits: u32,
92 }
93 
94 /// An owned permit from the semaphore.
95 ///
96 /// This type is created by the [`acquire_owned`] method.
97 ///
98 /// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned()
99 #[must_use]
100 #[derive(Debug)]
101 pub struct OwnedSemaphorePermit {
102     sem: Arc<Semaphore>,
103     permits: u32,
104 }
105 
106 #[test]
107 #[cfg(not(loom))]
108 fn bounds() {
109     fn check_unpin<T: Unpin>() {}
110     // This has to take a value, since the async fn's return type is unnameable.
111     fn check_send_sync_val<T: Send + Sync>(_t: T) {}
112     fn check_send_sync<T: Send + Sync>() {}
113     check_unpin::<Semaphore>();
114     check_unpin::<SemaphorePermit<'_>>();
115     check_send_sync::<Semaphore>();
116 
117     let semaphore = Semaphore::new(0);
118     check_send_sync_val(semaphore.acquire());
119 }
120 
121 impl Semaphore {
122     /// Creates a new semaphore with the initial number of permits.
123     pub fn new(permits: usize) -> Self {
124         Self {
125             ll_sem: ll::Semaphore::new(permits),
126         }
127     }
128 
129     /// Creates a new semaphore with the initial number of permits.
130     ///
131     /// # Examples
132     ///
133     /// ```
134     /// use tokio::sync::Semaphore;
135     ///
136     /// static SEM: Semaphore = Semaphore::const_new(10);
137     /// ```
138     ///
139     #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
140     #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
141     pub const fn const_new(permits: usize) -> Self {
142         Self {
143             ll_sem: ll::Semaphore::const_new(permits),
144         }
145     }
146 
147     /// Returns the current number of available permits.
148     pub fn available_permits(&self) -> usize {
149         self.ll_sem.available_permits()
150     }
151 
152     /// Adds `n` new permits to the semaphore.
153     ///
154     /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
155     pub fn add_permits(&self, n: usize) {
156         self.ll_sem.release(n);
157     }
158 
159     /// Acquires a permit from the semaphore.
160     ///
161     /// If the semaphore has been closed, this returns an [`AcquireError`].
162     /// Otherwise, this returns a [`SemaphorePermit`] representing the
163     /// acquired permit.
164     ///
165     /// # Cancel safety
166     ///
167     /// This method uses a queue to fairly distribute permits in the order they
168     /// were requested. Cancelling a call to `acquire` makes you lose your place
169     /// in the queue.
170     ///
171     /// # Examples
172     ///
173     /// ```
174     /// use tokio::sync::Semaphore;
175     ///
176     /// #[tokio::main]
177     /// async fn main() {
178     ///     let semaphore = Semaphore::new(2);
179     ///
180     ///     let permit_1 = semaphore.acquire().await.unwrap();
181     ///     assert_eq!(semaphore.available_permits(), 1);
182     ///
183     ///     let permit_2 = semaphore.acquire().await.unwrap();
184     ///     assert_eq!(semaphore.available_permits(), 0);
185     ///
186     ///     drop(permit_1);
187     ///     assert_eq!(semaphore.available_permits(), 1);
188     /// }
189     /// ```
190     ///
191     /// [`AcquireError`]: crate::sync::AcquireError
192     /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
193     pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
194         self.ll_sem.acquire(1).await?;
195         Ok(SemaphorePermit {
196             sem: self,
197             permits: 1,
198         })
199     }
200 
201     /// Acquires `n` permits from the semaphore.
202     ///
203     /// If the semaphore has been closed, this returns an [`AcquireError`].
204     /// Otherwise, this returns a [`SemaphorePermit`] representing the
205     /// acquired permits.
206     ///
207     /// # Cancel safety
208     ///
209     /// This method uses a queue to fairly distribute permits in the order they
210     /// were requested. Cancelling a call to `acquire_many` makes you lose your
211     /// place in the queue.
212     ///
213     /// # Examples
214     ///
215     /// ```
216     /// use tokio::sync::Semaphore;
217     ///
218     /// #[tokio::main]
219     /// async fn main() {
220     ///     let semaphore = Semaphore::new(5);
221     ///
222     ///     let permit = semaphore.acquire_many(3).await.unwrap();
223     ///     assert_eq!(semaphore.available_permits(), 2);
224     /// }
225     /// ```
226     ///
227     /// [`AcquireError`]: crate::sync::AcquireError
228     /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
229     pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
230         self.ll_sem.acquire(n).await?;
231         Ok(SemaphorePermit {
232             sem: self,
233             permits: n,
234         })
235     }
236 
237     /// Tries to acquire a permit from the semaphore.
238     ///
239     /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
240     /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
241     /// this returns a [`SemaphorePermit`] representing the acquired permits.
242     ///
243     /// # Examples
244     ///
245     /// ```
246     /// use tokio::sync::{Semaphore, TryAcquireError};
247     ///
248     /// # fn main() {
249     /// let semaphore = Semaphore::new(2);
250     ///
251     /// let permit_1 = semaphore.try_acquire().unwrap();
252     /// assert_eq!(semaphore.available_permits(), 1);
253     ///
254     /// let permit_2 = semaphore.try_acquire().unwrap();
255     /// assert_eq!(semaphore.available_permits(), 0);
256     ///
257     /// let permit_3 = semaphore.try_acquire();
258     /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
259     /// # }
260     /// ```
261     ///
262     /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
263     /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
264     /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
265     pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
266         match self.ll_sem.try_acquire(1) {
267             Ok(_) => Ok(SemaphorePermit {
268                 sem: self,
269                 permits: 1,
270             }),
271             Err(e) => Err(e),
272         }
273     }
274 
275     /// Tries to acquire `n` permits from the semaphore.
276     ///
277     /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
278     /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
279     /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
280     ///
281     /// # Examples
282     ///
283     /// ```
284     /// use tokio::sync::{Semaphore, TryAcquireError};
285     ///
286     /// # fn main() {
287     /// let semaphore = Semaphore::new(4);
288     ///
289     /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
290     /// assert_eq!(semaphore.available_permits(), 1);
291     ///
292     /// let permit_2 = semaphore.try_acquire_many(2);
293     /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
294     /// # }
295     /// ```
296     ///
297     /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
298     /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
299     /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
300     pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
301         match self.ll_sem.try_acquire(n) {
302             Ok(_) => Ok(SemaphorePermit {
303                 sem: self,
304                 permits: n,
305             }),
306             Err(e) => Err(e),
307         }
308     }
309 
310     /// Acquires a permit from the semaphore.
311     ///
312     /// The semaphore must be wrapped in an [`Arc`] to call this method.
313     /// If the semaphore has been closed, this returns an [`AcquireError`].
314     /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
315     /// acquired permit.
316     ///
317     /// # Cancel safety
318     ///
319     /// This method uses a queue to fairly distribute permits in the order they
320     /// were requested. Cancelling a call to `acquire_owned` makes you lose your
321     /// place in the queue.
322     ///
323     /// # Examples
324     ///
325     /// ```
326     /// use std::sync::Arc;
327     /// use tokio::sync::Semaphore;
328     ///
329     /// #[tokio::main]
330     /// async fn main() {
331     ///     let semaphore = Arc::new(Semaphore::new(3));
332     ///     let mut join_handles = Vec::new();
333     ///
334     ///     for _ in 0..5 {
335     ///         let permit = semaphore.clone().acquire_owned().await.unwrap();
336     ///         join_handles.push(tokio::spawn(async move {
337     ///             // perform task...
338     ///             // explicitly own `permit` in the task
339     ///             drop(permit);
340     ///         }));
341     ///     }
342     ///
343     ///     for handle in join_handles {
344     ///         handle.await.unwrap();
345     ///     }
346     /// }
347     /// ```
348     ///
349     /// [`Arc`]: std::sync::Arc
350     /// [`AcquireError`]: crate::sync::AcquireError
351     /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
352     pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
353         self.ll_sem.acquire(1).await?;
354         Ok(OwnedSemaphorePermit {
355             sem: self,
356             permits: 1,
357         })
358     }
359 
360     /// Acquires `n` permits from the semaphore.
361     ///
362     /// The semaphore must be wrapped in an [`Arc`] to call this method.
363     /// If the semaphore has been closed, this returns an [`AcquireError`].
364     /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
365     /// acquired permit.
366     ///
367     /// # Cancel safety
368     ///
369     /// This method uses a queue to fairly distribute permits in the order they
370     /// were requested. Cancelling a call to `acquire_many_owned` makes you lose
371     /// your place in the queue.
372     ///
373     /// # Examples
374     ///
375     /// ```
376     /// use std::sync::Arc;
377     /// use tokio::sync::Semaphore;
378     ///
379     /// #[tokio::main]
380     /// async fn main() {
381     ///     let semaphore = Arc::new(Semaphore::new(10));
382     ///     let mut join_handles = Vec::new();
383     ///
384     ///     for _ in 0..5 {
385     ///         let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
386     ///         join_handles.push(tokio::spawn(async move {
387     ///             // perform task...
388     ///             // explicitly own `permit` in the task
389     ///             drop(permit);
390     ///         }));
391     ///     }
392     ///
393     ///     for handle in join_handles {
394     ///         handle.await.unwrap();
395     ///     }
396     /// }
397     /// ```
398     ///
399     /// [`Arc`]: std::sync::Arc
400     /// [`AcquireError`]: crate::sync::AcquireError
401     /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
402     pub async fn acquire_many_owned(
403         self: Arc<Self>,
404         n: u32,
405     ) -> Result<OwnedSemaphorePermit, AcquireError> {
406         self.ll_sem.acquire(n).await?;
407         Ok(OwnedSemaphorePermit {
408             sem: self,
409             permits: n,
410         })
411     }
412 
413     /// Tries to acquire a permit from the semaphore.
414     ///
415     /// The semaphore must be wrapped in an [`Arc`] to call this method. If
416     /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
417     /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
418     /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
419     /// acquired permit.
420     ///
421     /// # Examples
422     ///
423     /// ```
424     /// use std::sync::Arc;
425     /// use tokio::sync::{Semaphore, TryAcquireError};
426     ///
427     /// # fn main() {
428     /// let semaphore = Arc::new(Semaphore::new(2));
429     ///
430     /// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
431     /// assert_eq!(semaphore.available_permits(), 1);
432     ///
433     /// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
434     /// assert_eq!(semaphore.available_permits(), 0);
435     ///
436     /// let permit_3 = semaphore.try_acquire_owned();
437     /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
438     /// # }
439     /// ```
440     ///
441     /// [`Arc`]: std::sync::Arc
442     /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
443     /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
444     /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
445     pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> {
446         match self.ll_sem.try_acquire(1) {
447             Ok(_) => Ok(OwnedSemaphorePermit {
448                 sem: self,
449                 permits: 1,
450             }),
451             Err(e) => Err(e),
452         }
453     }
454 
455     /// Tries to acquire `n` permits from the semaphore.
456     ///
457     /// The semaphore must be wrapped in an [`Arc`] to call this method. If
458     /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
459     /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
460     /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
461     /// acquired permit.
462     ///
463     /// # Examples
464     ///
465     /// ```
466     /// use std::sync::Arc;
467     /// use tokio::sync::{Semaphore, TryAcquireError};
468     ///
469     /// # fn main() {
470     /// let semaphore = Arc::new(Semaphore::new(4));
471     ///
472     /// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
473     /// assert_eq!(semaphore.available_permits(), 1);
474     ///
475     /// let permit_2 = semaphore.try_acquire_many_owned(2);
476     /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
477     /// # }
478     /// ```
479     ///
480     /// [`Arc`]: std::sync::Arc
481     /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
482     /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
483     /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
484     pub fn try_acquire_many_owned(
485         self: Arc<Self>,
486         n: u32,
487     ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
488         match self.ll_sem.try_acquire(n) {
489             Ok(_) => Ok(OwnedSemaphorePermit {
490                 sem: self,
491                 permits: n,
492             }),
493             Err(e) => Err(e),
494         }
495     }
496 
497     /// Closes the semaphore.
498     ///
499     /// This prevents the semaphore from issuing new permits and notifies all pending waiters.
500     ///
501     /// # Examples
502     ///
503     /// ```
504     /// use tokio::sync::Semaphore;
505     /// use std::sync::Arc;
506     /// use tokio::sync::TryAcquireError;
507     ///
508     /// #[tokio::main]
509     /// async fn main() {
510     ///     let semaphore = Arc::new(Semaphore::new(1));
511     ///     let semaphore2 = semaphore.clone();
512     ///
513     ///     tokio::spawn(async move {
514     ///         let permit = semaphore.acquire_many(2).await;
515     ///         assert!(permit.is_err());
516     ///         println!("waiter received error");
517     ///     });
518     ///
519     ///     println!("closing semaphore");
520     ///     semaphore2.close();
521     ///
522     ///     // Cannot obtain more permits
523     ///     assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
524     /// }
525     /// ```
526     pub fn close(&self) {
527         self.ll_sem.close();
528     }
529 
530     /// Returns true if the semaphore is closed
531     pub fn is_closed(&self) -> bool {
532         self.ll_sem.is_closed()
533     }
534 }
535 
536 impl<'a> SemaphorePermit<'a> {
537     /// Forgets the permit **without** releasing it back to the semaphore.
538     /// This can be used to reduce the amount of permits available from a
539     /// semaphore.
540     pub fn forget(mut self) {
541         self.permits = 0;
542     }
543 }
544 
545 impl OwnedSemaphorePermit {
546     /// Forgets the permit **without** releasing it back to the semaphore.
547     /// This can be used to reduce the amount of permits available from a
548     /// semaphore.
549     pub fn forget(mut self) {
550         self.permits = 0;
551     }
552 }
553 
554 impl<'a> Drop for SemaphorePermit<'_> {
555     fn drop(&mut self) {
556         self.sem.add_permits(self.permits as usize);
557     }
558 }
559 
560 impl Drop for OwnedSemaphorePermit {
561     fn drop(&mut self) {
562         self.sem.add_permits(self.permits as usize);
563     }
564 }
565