1 //! Opt-in yield points for improved cooperative scheduling.
2 //!
3 //! A single call to [`poll`] on a top-level task may potentially do a lot of work before it
4 //! returns `Poll::Pending`. If a task runs for a long period of time without yielding back to the
5 //! executor, it can starve other tasks waiting on that executor to execute them, or drive
6 //! underlying resources. Since Rust does not have a runtime, it is difficult to forcibly preempt a
7 //! long-running task. Instead, this module provides an opt-in mechanism for futures to collaborate
8 //! with the executor to avoid starvation.
9 //!
10 //! Consider a future like this one:
11 //!
12 //! ```
13 //! # use tokio::stream::{Stream, StreamExt};
14 //! async fn drop_all<I: Stream + Unpin>(mut input: I) {
15 //! while let Some(_) = input.next().await {}
16 //! }
17 //! ```
18 //!
19 //! It may look harmless, but consider what happens under heavy load if the input stream is
20 //! _always_ ready. If we spawn `drop_all`, the task will never yield, and will starve other tasks
21 //! and resources on the same executor. With opt-in yield points, this problem is alleviated:
22 //!
23 //! ```ignore
24 //! # use tokio::stream::{Stream, StreamExt};
25 //! async fn drop_all<I: Stream + Unpin>(mut input: I) {
26 //! while let Some(_) = input.next().await {
27 //! tokio::coop::proceed().await;
28 //! }
29 //! }
30 //! ```
31 //!
32 //! The `proceed` future will coordinate with the executor to make sure that every so often control
33 //! is yielded back to the executor so it can run other tasks.
34 //!
35 //! # Placing yield points
36 //!
//! Voluntary yield points should be placed _after_ at least some work has been done. If they are
//! not, a future sufficiently deep in the task hierarchy may end up _never_ getting to run because
//! of the number of yield points that inevitably appear before it is reached. In general, you will
//! want yield points to only appear in "leaf" futures -- those that do not themselves poll other
//! futures. By doing this, you avoid double-counting each iteration of the outer future against
//! the cooperative budget.
43 //!
44 //! [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll
45
46 // NOTE: The doctests in this module are ignored since the whole module is (currently) private.
47
48 use std::cell::Cell;
49 use std::future::Future;
50 use std::pin::Pin;
51 use std::task::{Context, Poll};
52
/// Amount of "work" (successful calls to `poll_proceed`) a task may perform inside one
/// `budget` scope before it is asked to yield back to the executor.
///
/// The exact number is a judgement call. It has to be large enough to amortize the cost of
/// waking and rescheduling a task, small enough that other tasks are not starved for long, and
/// large enough that deeply nested futures still get to make some progress.
///
/// As more yield points appear across the ecosystem, this number will probably need to grow.
const BUDGET: usize = 128;

/// Sentinel value stored in `HITS` meaning "no budget is being enforced".
const UNCONSTRAINED: usize = usize::max_value();

thread_local! {
    /// Budget remaining for whatever is running on this thread, or `UNCONSTRAINED` when
    /// budgeting is switched off.
    static HITS: Cell<usize> = Cell::new(UNCONSTRAINED);
}
70
71 /// Run the given closure with a cooperative task budget.
72 ///
73 /// Enabling budgeting when it is already enabled is a no-op.
74 #[inline(always)]
budget<F, R>(f: F) -> R where F: FnOnce() -> R,75 pub(crate) fn budget<F, R>(f: F) -> R
76 where
77 F: FnOnce() -> R,
78 {
79 HITS.with(move |hits| {
80 if hits.get() != UNCONSTRAINED {
81 // We are already being budgeted.
82 //
83 // Arguably this should be an error, but it can happen "correctly"
84 // such as with block_on + LocalSet, so we make it a no-op.
85 return f();
86 }
87
88 struct Guard<'a>(&'a Cell<usize>);
89 impl<'a> Drop for Guard<'a> {
90 fn drop(&mut self) {
91 self.0.set(UNCONSTRAINED);
92 }
93 }
94
95 hits.set(BUDGET);
96 let _guard = Guard(hits);
97 f()
98 })
99 }
100
101 cfg_rt_threaded! {
102 #[inline(always)]
103 pub(crate) fn has_budget_remaining() -> bool {
104 HITS.with(|hits| hits.get() > 0)
105 }
106 }
107
108 cfg_blocking_impl! {
109 /// Forcibly remove the budgeting constraints early.
110 pub(crate) fn stop() {
111 HITS.with(|hits| {
112 hits.set(UNCONSTRAINED);
113 });
114 }
115 }
116
117 /// Invoke `f` with a subset of the remaining budget.
118 ///
119 /// This is useful if you have sub-futures that you need to poll, but that you want to restrict
120 /// from using up your entire budget. For example, imagine the following future:
121 ///
122 /// ```rust
123 /// # use std::{future::Future, pin::Pin, task::{Context, Poll}};
124 /// use futures::stream::FuturesUnordered;
125 /// struct MyFuture<F1, F2> {
126 /// big: FuturesUnordered<F1>,
127 /// small: F2,
128 /// }
129 ///
130 /// use tokio::stream::Stream;
131 /// impl<F1, F2> Future for MyFuture<F1, F2>
132 /// where F1: Future, F2: Future
133 /// # , F1: Unpin, F2: Unpin
134 /// {
135 /// type Output = F2::Output;
136 ///
137 /// // fn poll(...)
138 /// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F2::Output> {
139 /// # let this = &mut *self;
140 /// let mut big = // something to pin self.big
141 /// # Pin::new(&mut this.big);
142 /// let small = // something to pin self.small
143 /// # Pin::new(&mut this.small);
144 ///
145 /// // see if any of the big futures have finished
146 /// while let Some(e) = futures::ready!(big.as_mut().poll_next(cx)) {
147 /// // do something with e
148 /// # let _ = e;
149 /// }
150 ///
151 /// // see if the small future has finished
152 /// small.poll(cx)
153 /// }
154 /// # }
155 /// ```
156 ///
157 /// It could be that every time `poll` gets called, `big` ends up spending the entire budget, and
158 /// `small` never gets polled. That would be sad. If you want to stick up for the little future,
159 /// that's what `limit` is for. It lets you portion out a smaller part of the yield budget to a
160 /// particular segment of your code. In the code above, you would write
161 ///
162 /// ```rust,ignore
163 /// # use std::{future::Future, pin::Pin, task::{Context, Poll}};
164 /// # use futures::stream::FuturesUnordered;
165 /// # struct MyFuture<F1, F2> {
166 /// # big: FuturesUnordered<F1>,
167 /// # small: F2,
168 /// # }
169 /// #
170 /// # use tokio::stream::Stream;
171 /// # impl<F1, F2> Future for MyFuture<F1, F2>
172 /// # where F1: Future, F2: Future
173 /// # , F1: Unpin, F2: Unpin
174 /// # {
175 /// # type Output = F2::Output;
176 /// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F2::Output> {
177 /// # let this = &mut *self;
178 /// # let mut big = Pin::new(&mut this.big);
179 /// # let small = Pin::new(&mut this.small);
180 /// #
181 /// // see if any of the big futures have finished
182 /// while let Some(e) = futures::ready!(tokio::coop::limit(64, || big.as_mut().poll_next(cx))) {
183 /// # // do something with e
184 /// # let _ = e;
185 /// # }
186 /// # small.poll(cx)
187 /// # }
188 /// # }
189 /// ```
190 ///
191 /// Now, even if `big` spends its entire budget, `small` will likely be left with some budget left
192 /// to also do useful work. In particular, if the remaining budget was `N` at the start of `poll`,
193 /// `small` will have at least a budget of `N - 64`. It may be more if `big` did not spend its
194 /// entire budget.
195 ///
196 /// Note that you cannot _increase_ your budget by calling `limit`. The budget provided to the code
197 /// inside the buget is the _minimum_ of the _current_ budget and the bound.
198 ///
199 #[allow(unreachable_pub, dead_code)]
limit<R>(bound: usize, f: impl FnOnce() -> R) -> R200 pub fn limit<R>(bound: usize, f: impl FnOnce() -> R) -> R {
201 HITS.with(|hits| {
202 let budget = hits.get();
203 // with_bound cannot _increase_ the remaining budget
204 let bound = std::cmp::min(budget, bound);
205 // When f() exits, how much should we add to what is left?
206 let floor = budget.saturating_sub(bound);
207 // Make sure we restore the remaining budget even on panic
208 struct RestoreBudget<'a>(&'a Cell<usize>, usize);
209 impl<'a> Drop for RestoreBudget<'a> {
210 fn drop(&mut self) {
211 let left = self.0.get();
212 self.0.set(self.1 + left);
213 }
214 }
215 // Time to restrict!
216 hits.set(bound);
217 let _restore = RestoreBudget(&hits, floor);
218 f()
219 })
220 }
221
222 /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
223 #[allow(unreachable_pub, dead_code)]
224 #[inline]
poll_proceed(cx: &mut Context<'_>) -> Poll<()>225 pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
226 HITS.with(|hits| {
227 let n = hits.get();
228 if n == UNCONSTRAINED {
229 // opted out of budgeting
230 Poll::Ready(())
231 } else if n == 0 {
232 cx.waker().wake_by_ref();
233 Poll::Pending
234 } else {
235 hits.set(n.saturating_sub(1));
236 Poll::Ready(())
237 }
238 })
239 }
240
241 /// Resolves immediately unless the current task has already exceeded its budget.
242 ///
243 /// This should be placed after at least some work has been done. Otherwise a future sufficiently
244 /// deep in the task hierarchy may end up never getting to run because of the number of yield
245 /// points that inevitably appear before it is even reached. For example:
246 ///
247 /// ```ignore
248 /// # use tokio::stream::{Stream, StreamExt};
249 /// async fn drop_all<I: Stream + Unpin>(mut input: I) {
250 /// while let Some(_) = input.next().await {
251 /// tokio::coop::proceed().await;
252 /// }
253 /// }
254 /// ```
255 #[allow(unreachable_pub, dead_code)]
256 #[inline]
proceed()257 pub async fn proceed() {
258 use crate::future::poll_fn;
259 poll_fn(|cx| poll_proceed(cx)).await;
260 }
261
pin_project_lite::pin_project! {
    /// A future that cooperatively yields to the task scheduler when polling,
    /// if the task's budget is exhausted.
    ///
    /// Internally, this is simply a future combinator which calls
    /// [`poll_proceed`] in its `poll` implementation before polling the wrapped
    /// future. If `poll_proceed` returns `Pending`, the wrapped future is not
    /// polled at all on that call.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio::coop::CoopFutureExt;
    ///
    /// async { /* ... */ }
    ///     .cooperate()
    ///     .await;
    /// # }
    /// ```
    ///
    /// [`poll_proceed`]: fn@poll_proceed
    #[derive(Debug)]
    #[allow(unreachable_pub, dead_code)]
    pub struct CoopFuture<F> {
        // The wrapped future. Marked `#[pin]` so `project()` yields a
        // `Pin<&mut F>` that can be polled.
        #[pin]
        future: F,
    }
}
291
292 impl<F: Future> Future for CoopFuture<F> {
293 type Output = F::Output;
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>294 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
295 ready!(poll_proceed(cx));
296 self.project().future.poll(cx)
297 }
298 }
299
300 impl<F: Future> CoopFuture<F> {
301 /// Returns a new `CoopFuture` wrapping the given future.
302 ///
303 #[allow(unreachable_pub, dead_code)]
new(future: F) -> Self304 pub fn new(future: F) -> Self {
305 Self { future }
306 }
307 }
308
309 // Currently only used by `tokio::sync`; and if we make this combinator public,
310 // it should probably be on the `FutureExt` trait instead.
311 cfg_sync! {
312 /// Extension trait providing `Future::cooperate` extension method.
313 ///
314 /// Note: if/when the co-op API becomes public, this method should probably be
315 /// provided by `FutureExt`, instead.
316 pub(crate) trait CoopFutureExt: Future {
317 /// Wrap `self` to cooperatively yield to the scheduler when polling, if the
318 /// task's budget is exhausted.
319 fn cooperate(self) -> CoopFuture<Self>
320 where
321 Self: Sized,
322 {
323 CoopFuture::new(self)
324 }
325 }
326
327 impl<F> CoopFutureExt for F where F: Future {}
328 }
329
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    /// Current value of this thread's budget counter.
    fn get() -> usize {
        HITS.with(|hits| hits.get())
    }

    // Exercises `budget`, nested `limit` scopes and `poll_proceed`/`proceed`
    // against the thread-local counter.
    #[test]
    fn budgeting() {
        use tokio_test::*;

        assert_eq!(get(), UNCONSTRAINED);
        assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
        assert_eq!(get(), UNCONSTRAINED);
        budget(|| {
            assert_eq!(get(), BUDGET);
            assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get(), BUDGET - 1);
            assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get(), BUDGET - 2);
        });
        assert_eq!(get(), UNCONSTRAINED);

        budget(|| {
            limit(3, || {
                assert_eq!(get(), 3);
                assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                assert_eq!(get(), 2);
                limit(4, || {
                    assert_eq!(get(), 2);
                    assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                    assert_eq!(get(), 1);
                });
                assert_eq!(get(), 1);
                assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                assert_eq!(get(), 0);
                assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                assert_eq!(get(), 0);
                assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                assert_eq!(get(), 0);
            });
            assert_eq!(get(), BUDGET - 3);
            assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get(), BUDGET - 4);
            assert_ready!(task::spawn(proceed()).poll());
            assert_eq!(get(), BUDGET - 5);
        });
    }
}
380