1 //! Opt-in yield points for improved cooperative scheduling.
2 //!
3 //! A single call to [`poll`] on a top-level task may potentially do a lot of work before it
4 //! returns `Poll::Pending`. If a task runs for a long period of time without yielding back to the
5 //! executor, it can starve other tasks waiting on that executor to execute them, or drive
6 //! underlying resources. Since Rust does not have a runtime, it is difficult to forcibly preempt a
7 //! long-running task. Instead, this module provides an opt-in mechanism for futures to collaborate
8 //! with the executor to avoid starvation.
9 //!
10 //! Consider a future like this one:
11 //!
12 //! ```
13 //! # use tokio::stream::{Stream, StreamExt};
14 //! async fn drop_all<I: Stream + Unpin>(mut input: I) {
15 //!     while let Some(_) = input.next().await {}
16 //! }
17 //! ```
18 //!
19 //! It may look harmless, but consider what happens under heavy load if the input stream is
20 //! _always_ ready. If we spawn `drop_all`, the task will never yield, and will starve other tasks
21 //! and resources on the same executor. With opt-in yield points, this problem is alleviated:
22 //!
23 //! ```ignore
24 //! # use tokio::stream::{Stream, StreamExt};
25 //! async fn drop_all<I: Stream + Unpin>(mut input: I) {
26 //!     while let Some(_) = input.next().await {
27 //!         tokio::coop::proceed().await;
28 //!     }
29 //! }
30 //! ```
31 //!
32 //! The `proceed` future will coordinate with the executor to make sure that every so often control
33 //! is yielded back to the executor so it can run other tasks.
34 //!
35 //! # Placing yield points
36 //!
37 //! Voluntary yield points should be placed _after_ at least some work has been done. If they are
38 //! not, a future sufficiently deep in the task hierarchy may end up _never_ getting to run because
39 //! of the number of yield points that inevitably appear before it is reached. In general, you will
40 //! want yield points to only appear in "leaf" futures -- those that do not themselves poll other
//! futures. By doing this, you avoid double-counting each iteration of the outer future against
//! the cooperative budget.
43 //!
44 //!   [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll
45 
// NOTE: The doctests in this module that reference `tokio::coop` are marked `ignore`, since the
// whole module is (currently) private.
47 
48 use std::cell::Cell;
49 use std::future::Future;
50 use std::pin::Pin;
51 use std::task::{Context, Poll};
52 
/// How many units of cooperative "work" a task may perform before it is asked to yield.
///
/// The exact number is somewhat arbitrary. It must be large enough to amortize the cost of waking
/// and rescheduling a task, yet small enough that other tasks are not starved for long. It also has
/// to leave deeply nested futures enough room to make at least some progress.
///
/// As more yield points appear throughout the ecosystem, this number will probably need to grow.
const BUDGET: usize = 128;

/// Sentinel value stored in `HITS` while no budget is being enforced.
const UNCONSTRAINED: usize = usize::max_value();

thread_local! {
    /// Remaining budget on the current thread, or `UNCONSTRAINED` when budgeting is off.
    static HITS: Cell<usize> = Cell::new(UNCONSTRAINED);
}
70 
71 /// Run the given closure with a cooperative task budget.
72 ///
73 /// Enabling budgeting when it is already enabled is a no-op.
74 #[inline(always)]
budget<F, R>(f: F) -> R where F: FnOnce() -> R,75 pub(crate) fn budget<F, R>(f: F) -> R
76 where
77     F: FnOnce() -> R,
78 {
79     HITS.with(move |hits| {
80         if hits.get() != UNCONSTRAINED {
81             // We are already being budgeted.
82             //
83             // Arguably this should be an error, but it can happen "correctly"
84             // such as with block_on + LocalSet, so we make it a no-op.
85             return f();
86         }
87 
88         struct Guard<'a>(&'a Cell<usize>);
89         impl<'a> Drop for Guard<'a> {
90             fn drop(&mut self) {
91                 self.0.set(UNCONSTRAINED);
92             }
93         }
94 
95         hits.set(BUDGET);
96         let _guard = Guard(hits);
97         f()
98     })
99 }
100 
101 cfg_rt_threaded! {
102     #[inline(always)]
103     pub(crate) fn has_budget_remaining() -> bool {
104         HITS.with(|hits| hits.get() > 0)
105     }
106 }
107 
108 cfg_blocking_impl! {
109     /// Forcibly remove the budgeting constraints early.
110     pub(crate) fn stop() {
111         HITS.with(|hits| {
112             hits.set(UNCONSTRAINED);
113         });
114     }
115 }
116 
117 /// Invoke `f` with a subset of the remaining budget.
118 ///
119 /// This is useful if you have sub-futures that you need to poll, but that you want to restrict
120 /// from using up your entire budget. For example, imagine the following future:
121 ///
122 /// ```rust
123 /// # use std::{future::Future, pin::Pin, task::{Context, Poll}};
124 /// use futures::stream::FuturesUnordered;
125 /// struct MyFuture<F1, F2> {
126 ///     big: FuturesUnordered<F1>,
127 ///     small: F2,
128 /// }
129 ///
130 /// use tokio::stream::Stream;
131 /// impl<F1, F2> Future for MyFuture<F1, F2>
132 ///   where F1: Future, F2: Future
133 /// # , F1: Unpin, F2: Unpin
134 /// {
135 ///     type Output = F2::Output;
136 ///
137 ///     // fn poll(...)
138 /// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F2::Output> {
139 /// #   let this = &mut *self;
140 ///     let mut big = // something to pin self.big
141 /// #                 Pin::new(&mut this.big);
142 ///     let small = // something to pin self.small
143 /// #             Pin::new(&mut this.small);
144 ///
145 ///     // see if any of the big futures have finished
146 ///     while let Some(e) = futures::ready!(big.as_mut().poll_next(cx)) {
147 ///         // do something with e
148 /// #       let _ = e;
149 ///     }
150 ///
151 ///     // see if the small future has finished
152 ///     small.poll(cx)
153 /// }
154 /// # }
155 /// ```
156 ///
157 /// It could be that every time `poll` gets called, `big` ends up spending the entire budget, and
158 /// `small` never gets polled. That would be sad. If you want to stick up for the little future,
159 /// that's what `limit` is for. It lets you portion out a smaller part of the yield budget to a
160 /// particular segment of your code. In the code above, you would write
161 ///
162 /// ```rust,ignore
163 /// # use std::{future::Future, pin::Pin, task::{Context, Poll}};
164 /// # use futures::stream::FuturesUnordered;
165 /// # struct MyFuture<F1, F2> {
166 /// #     big: FuturesUnordered<F1>,
167 /// #     small: F2,
168 /// # }
169 /// #
170 /// # use tokio::stream::Stream;
171 /// # impl<F1, F2> Future for MyFuture<F1, F2>
172 /// #   where F1: Future, F2: Future
173 /// # , F1: Unpin, F2: Unpin
174 /// # {
175 /// # type Output = F2::Output;
176 /// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F2::Output> {
177 /// #   let this = &mut *self;
178 /// #   let mut big = Pin::new(&mut this.big);
179 /// #   let small = Pin::new(&mut this.small);
180 /// #
181 ///     // see if any of the big futures have finished
182 ///     while let Some(e) = futures::ready!(tokio::coop::limit(64, || big.as_mut().poll_next(cx))) {
183 /// #       // do something with e
184 /// #       let _ = e;
185 /// #   }
186 /// #   small.poll(cx)
187 /// # }
188 /// # }
189 /// ```
190 ///
191 /// Now, even if `big` spends its entire budget, `small` will likely be left with some budget left
192 /// to also do useful work. In particular, if the remaining budget was `N` at the start of `poll`,
193 /// `small` will have at least a budget of `N - 64`. It may be more if `big` did not spend its
194 /// entire budget.
195 ///
196 /// Note that you cannot _increase_ your budget by calling `limit`. The budget provided to the code
197 /// inside the buget is the _minimum_ of the _current_ budget and the bound.
198 ///
199 #[allow(unreachable_pub, dead_code)]
limit<R>(bound: usize, f: impl FnOnce() -> R) -> R200 pub fn limit<R>(bound: usize, f: impl FnOnce() -> R) -> R {
201     HITS.with(|hits| {
202         let budget = hits.get();
203         // with_bound cannot _increase_ the remaining budget
204         let bound = std::cmp::min(budget, bound);
205         // When f() exits, how much should we add to what is left?
206         let floor = budget.saturating_sub(bound);
207         // Make sure we restore the remaining budget even on panic
208         struct RestoreBudget<'a>(&'a Cell<usize>, usize);
209         impl<'a> Drop for RestoreBudget<'a> {
210             fn drop(&mut self) {
211                 let left = self.0.get();
212                 self.0.set(self.1 + left);
213             }
214         }
215         // Time to restrict!
216         hits.set(bound);
217         let _restore = RestoreBudget(&hits, floor);
218         f()
219     })
220 }
221 
222 /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
223 #[allow(unreachable_pub, dead_code)]
224 #[inline]
poll_proceed(cx: &mut Context<'_>) -> Poll<()>225 pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
226     HITS.with(|hits| {
227         let n = hits.get();
228         if n == UNCONSTRAINED {
229             // opted out of budgeting
230             Poll::Ready(())
231         } else if n == 0 {
232             cx.waker().wake_by_ref();
233             Poll::Pending
234         } else {
235             hits.set(n.saturating_sub(1));
236             Poll::Ready(())
237         }
238     })
239 }
240 
241 /// Resolves immediately unless the current task has already exceeded its budget.
242 ///
243 /// This should be placed after at least some work has been done. Otherwise a future sufficiently
244 /// deep in the task hierarchy may end up never getting to run because of the number of yield
245 /// points that inevitably appear before it is even reached. For example:
246 ///
247 /// ```ignore
248 /// # use tokio::stream::{Stream, StreamExt};
249 /// async fn drop_all<I: Stream + Unpin>(mut input: I) {
250 ///     while let Some(_) = input.next().await {
251 ///         tokio::coop::proceed().await;
252 ///     }
253 /// }
254 /// ```
255 #[allow(unreachable_pub, dead_code)]
256 #[inline]
proceed()257 pub async fn proceed() {
258     use crate::future::poll_fn;
259     poll_fn(|cx| poll_proceed(cx)).await;
260 }
261 
262 pin_project_lite::pin_project! {
263     /// A future that cooperatively yields to the task scheduler when polling,
264     /// if the task's budget is exhausted.
265     ///
266     /// Internally, this is simply a future combinator which calls
267     /// [`poll_proceed`] in its `poll` implementation before polling the wrapped
268     /// future.
269     ///
270     /// # Examples
271     ///
272     /// ```rust,ignore
273     /// # #[tokio::main]
274     /// # async fn main() {
275     /// use tokio::coop::CoopFutureExt;
276     ///
277     /// async {  /* ... */ }
278     ///     .cooperate()
279     ///     .await;
280     /// # }
281     /// ```
282     ///
283     /// [`poll_proceed`]: fn@poll_proceed
284     #[derive(Debug)]
285     #[allow(unreachable_pub, dead_code)]
286     pub struct CoopFuture<F> {
287         #[pin]
288         future: F,
289     }
290 }
291 
292 impl<F: Future> Future for CoopFuture<F> {
293     type Output = F::Output;
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>294     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
295         ready!(poll_proceed(cx));
296         self.project().future.poll(cx)
297     }
298 }
299 
300 impl<F: Future> CoopFuture<F> {
301     /// Returns a new `CoopFuture` wrapping the given future.
302     ///
303     #[allow(unreachable_pub, dead_code)]
new(future: F) -> Self304     pub fn new(future: F) -> Self {
305         Self { future }
306     }
307 }
308 
309 // Currently only used by `tokio::sync`; and if we make this combinator public,
310 // it should probably be on the `FutureExt` trait instead.
311 cfg_sync! {
312     /// Extension trait providing `Future::cooperate` extension method.
313     ///
314     /// Note: if/when the co-op API becomes public, this method should probably be
315     /// provided by `FutureExt`, instead.
316     pub(crate) trait CoopFutureExt: Future {
317         /// Wrap `self` to cooperatively yield to the scheduler when polling, if the
318         /// task's budget is exhausted.
319         fn cooperate(self) -> CoopFuture<Self>
320         where
321             Self: Sized,
322         {
323             CoopFuture::new(self)
324         }
325     }
326 
327     impl<F> CoopFutureExt for F where F: Future {}
328 }
329 
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    /// Reads the remaining budget on the current thread.
    fn get() -> usize {
        HITS.with(|hits| hits.get())
    }

    /// Exercises `budget`, nested `limit` calls, `poll_proceed`, and `proceed`.
    #[test]
    fn budgeting() {
        use tokio_test::*;

        assert_eq!(get(), UNCONSTRAINED);
        assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
        assert_eq!(get(), UNCONSTRAINED);
        budget(|| {
            assert_eq!(get(), BUDGET);
            assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get(), BUDGET - 1);
            assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get(), BUDGET - 2);
        });
        assert_eq!(get(), UNCONSTRAINED);

        budget(|| {
            limit(3, || {
                assert_eq!(get(), 3);
                assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                assert_eq!(get(), 2);
                limit(4, || {
                    assert_eq!(get(), 2);
                    assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                    assert_eq!(get(), 1);
                });
                assert_eq!(get(), 1);
                assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                assert_eq!(get(), 0);
                assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                assert_eq!(get(), 0);
                assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                assert_eq!(get(), 0);
            });
            assert_eq!(get(), BUDGET - 3);
            assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get(), BUDGET - 4);
            assert_ready!(task::spawn(proceed()).poll());
            assert_eq!(get(), BUDGET - 5);
        });
    }
}
380