1 #![cfg_attr(not(feature = "full"), allow(dead_code))]
2 
3 //! Yield points for improved cooperative scheduling.
4 //!
5 //! Documentation for this can be found in the [`tokio::task`] module.
6 //!
//! [`tokio::task`]: crate::task
8 
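// A voluntary yield point is added by awaiting `proceed` after each unit of
// work, as in this sketch:
//
// ```ignore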
10 // # use tokio_stream::{Stream, StreamExt};
11 // async fn drop_all<I: Stream + Unpin>(mut input: I) {
12 //     while let Some(_) = input.next().await {
13 //         tokio::coop::proceed().await;
14 //     }
15 // }
16 // ```
17 //
18 // The `proceed` future will coordinate with the executor to make sure that
19 // every so often control is yielded back to the executor so it can run other
20 // tasks.
21 //
22 // # Placing yield points
23 //
24 // Voluntary yield points should be placed _after_ at least some work has been
25 // done. If they are not, a future sufficiently deep in the task hierarchy may
26 // end up _never_ getting to run because of the number of yield points that
27 // inevitably appear before it is reached. In general, you will want yield
28 // points to only appear in "leaf" futures -- those that do not themselves poll
29 // other futures. By doing this, you avoid double-counting each iteration of
30 // the outer future against the cooperating budget.
31 
32 use std::cell::Cell;
33 
thread_local! {
    // Per-thread budget slot. It starts out unconstrained; the functions in
    // this module read it and overwrite it.
    static CURRENT: Cell<Budget> = Cell::new(Budget::unconstrained());
}
37 
/// Opaque counter of how much "work" a task may still perform before it is
/// expected to yield back to the scheduler.
///
/// `Some(n)` means `n` units are left; `None` means no limit is applied.
#[derive(Debug, Copy, Clone)]
pub(crate) struct Budget(Option<u8>);

impl Budget {
    /// Budget handed to a task each time it is polled.
    ///
    /// The number is a somewhat arbitrary compromise: large enough to
    /// amortize wakeup and scheduling costs and to let deeply nested tasks
    /// get at least some useful work done, yet small enough that other tasks
    /// are not starved for too long.
    ///
    /// As more yield points appear across the ecosystem, this value will
    /// probably have to grow as well.
    const fn initial() -> Self {
        Self(Some(128))
    }

    /// An unconstrained budget: operations are never limited.
    const fn unconstrained() -> Self {
        Self(None)
    }

    /// Reports whether any budget is left. An unconstrained budget always
    /// reports `true`.
    fn has_remaining(self) -> bool {
        match self.0 {
            Some(left) => left != 0,
            None => true,
        }
    }
}
67 
68 /// Runs the given closure with a cooperative task budget. When the function
69 /// returns, the budget is reset to the value prior to calling the function.
70 #[inline(always)]
budget<R>(f: impl FnOnce() -> R) -> R71 pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
72     with_budget(Budget::initial(), f)
73 }
74 
75 /// Runs the given closure with an unconstrained task budget. When the function returns, the budget
76 /// is reset to the value prior to calling the function.
77 #[inline(always)]
with_unconstrained<R>(f: impl FnOnce() -> R) -> R78 pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
79     with_budget(Budget::unconstrained(), f)
80 }
81 
82 #[inline(always)]
with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R83 fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
84     struct ResetGuard<'a> {
85         cell: &'a Cell<Budget>,
86         prev: Budget,
87     }
88 
89     impl<'a> Drop for ResetGuard<'a> {
90         fn drop(&mut self) {
91             self.cell.set(self.prev);
92         }
93     }
94 
95     CURRENT.with(move |cell| {
96         let prev = cell.get();
97 
98         cell.set(budget);
99 
100         let _guard = ResetGuard { cell, prev };
101 
102         f()
103     })
104 }
105 
106 #[inline(always)]
has_budget_remaining() -> bool107 pub(crate) fn has_budget_remaining() -> bool {
108     CURRENT.with(|cell| cell.get().has_remaining())
109 }
110 
111 cfg_rt_multi_thread! {
112     /// Sets the current task's budget.
113     pub(crate) fn set(budget: Budget) {
114         CURRENT.with(|cell| cell.set(budget))
115     }
116 }
117 
118 cfg_rt! {
119     /// Forcibly removes the budgeting constraints early.
120     ///
121     /// Returns the remaining budget
122     pub(crate) fn stop() -> Budget {
123         CURRENT.with(|cell| {
124             let prev = cell.get();
125             cell.set(Budget::unconstrained());
126             prev
127         })
128     }
129 }
130 
131 cfg_coop! {
132     use std::task::{Context, Poll};
133 
134     #[must_use]
135     pub(crate) struct RestoreOnPending(Cell<Budget>);
136 
137     impl RestoreOnPending {
138         pub(crate) fn made_progress(&self) {
139             self.0.set(Budget::unconstrained());
140         }
141     }
142 
143     impl Drop for RestoreOnPending {
144         fn drop(&mut self) {
145             // Don't reset if budget was unconstrained or if we made progress.
146             // They are both represented as the remembered budget being unconstrained.
147             let budget = self.0.get();
148             if !budget.is_unconstrained() {
149                 CURRENT.with(|cell| {
150                     cell.set(budget);
151                 });
152             }
153         }
154     }
155 
156     /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
157     ///
158     /// When you call this method, the current budget is decremented. However, to ensure that
159     /// progress is made every time a task is polled, the budget is automatically restored to its
160     /// former value if the returned `RestoreOnPending` is dropped. It is the caller's
161     /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure
162     /// that the budget empties appropriately.
163     ///
164     /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**.
165     /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and
166     /// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
167     /// that progress was made.
168     #[inline]
169     pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
170         CURRENT.with(|cell| {
171             let mut budget = cell.get();
172 
173             if budget.decrement() {
174                 let restore = RestoreOnPending(Cell::new(cell.get()));
175                 cell.set(budget);
176                 Poll::Ready(restore)
177             } else {
178                 cx.waker().wake_by_ref();
179                 Poll::Pending
180             }
181         })
182     }
183 
184     impl Budget {
185         /// Decrements the budget. Returns `true` if successful. Decrementing fails
186         /// when there is not enough remaining budget.
187         fn decrement(&mut self) -> bool {
188             if let Some(num) = &mut self.0 {
189                 if *num > 0 {
190                     *num -= 1;
191                     true
192                 } else {
193                     false
194                 }
195             } else {
196                 true
197             }
198         }
199 
200         fn is_unconstrained(self) -> bool {
201             self.0.is_none()
202         }
203     }
204 }
205 
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    #[cfg(target_arch = "wasm32")]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Snapshot of the budget currently stored for this thread.
    fn get() -> Budget {
        CURRENT.with(|current| current.get())
    }

    #[test]
    fn bugeting() {
        use futures::future::poll_fn;
        use tokio_test::*;

        // Counter of a freshly assigned budget, as an `Option` and unwrapped.
        let full = Budget::initial().0;
        let full_count = full.unwrap();

        // Takes one unit of budget, asserting that the budget is not exhausted.
        let proceed = || assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));

        // Outside of `budget`, the thread's budget is unconstrained and
        // proceeding leaves it that way.
        assert!(get().0.is_none());

        let coop = proceed();

        assert!(get().0.is_none());
        drop(coop);
        assert!(get().0.is_none());

        budget(|| {
            assert_eq!(get().0, full);

            let coop = proceed();
            assert_eq!(get().0.unwrap(), full_count - 1);
            drop(coop);
            // No progress was reported, so the unit is handed back.
            assert_eq!(get().0, full);

            let coop = proceed();
            assert_eq!(get().0.unwrap(), full_count - 1);
            coop.made_progress();
            drop(coop);
            // Progress _was_ reported, so the unit stays spent.
            assert_eq!(get().0.unwrap(), full_count - 1);

            let coop = proceed();
            assert_eq!(get().0.unwrap(), full_count - 2);
            coop.made_progress();
            drop(coop);
            assert_eq!(get().0.unwrap(), full_count - 2);

            // A nested `budget` call starts from a fresh budget.
            budget(|| {
                assert_eq!(get().0, full);

                let coop = proceed();
                assert_eq!(get().0.unwrap(), full_count - 1);
                coop.made_progress();
                drop(coop);
                assert_eq!(get().0.unwrap(), full_count - 1);
            });

            // Back in the outer scope, the outer budget is in place again.
            assert_eq!(get().0.unwrap(), full_count - 2);
        });

        assert!(get().0.is_none());

        budget(|| {
            let n = get().0.unwrap();

            // Spend the whole budget, reporting progress every time.
            for _ in 0..n {
                proceed().made_progress();
            }

            let mut task = task::spawn(poll_fn(|cx| {
                let coop = ready!(poll_proceed(cx));
                coop.made_progress();
                Poll::Ready(())
            }));

            // With nothing left, the next attempt to proceed must yield.
            assert_pending!(task.poll());
        });
    }
}
285