1 #![cfg_attr(not(feature = "full"), allow(dead_code))]
2 
3 //! Yield points for improved cooperative scheduling.
4 //!
5 //! Documentation for this can be found in the [`tokio::task`] module.
6 //!
//! [`tokio::task`]: crate::task
8 
// Example: a loop that reaches a voluntary yield point on every iteration.
//
// ```ignore
10 // # use tokio_stream::{Stream, StreamExt};
11 // async fn drop_all<I: Stream + Unpin>(mut input: I) {
12 //     while let Some(_) = input.next().await {
13 //         tokio::coop::proceed().await;
14 //     }
15 // }
16 // ```
17 //
18 // The `proceed` future will coordinate with the executor to make sure that
19 // every so often control is yielded back to the executor so it can run other
20 // tasks.
21 //
22 // # Placing yield points
23 //
24 // Voluntary yield points should be placed _after_ at least some work has been
25 // done. If they are not, a future sufficiently deep in the task hierarchy may
26 // end up _never_ getting to run because of the number of yield points that
27 // inevitably appear before it is reached. In general, you will want yield
28 // points to only appear in "leaf" futures -- those that do not themselves poll
29 // other futures. By doing this, you avoid double-counting each iteration of
30 // the outer future against the cooperating budget.
31 
32 use std::cell::Cell;
33 
// Per-thread storage for the budget that the functions in this module read
// and update. It starts out unconstrained, so no limit applies until a
// constrained budget is installed (see `budget`).
thread_local! {
    static CURRENT: Cell<Budget> = Cell::new(Budget::unconstrained());
}
37 
/// Opaque counter describing how much more "work" a task is allowed to perform
/// before it should hand control back to the scheduler.
///
/// The wrapped value is `Some(remaining)` for a constrained budget and `None`
/// for an unconstrained one.
#[derive(Debug, Copy, Clone)]
pub(crate) struct Budget(Option<u8>);

impl Budget {
    /// The budget handed to a task on each poll.
    ///
    /// The number is a somewhat arbitrary trade-off. It must be large enough
    /// to amortize wakeup and scheduling costs, yet small enough that other
    /// tasks are not starved for too long. It must also be large enough that
    /// particularly deep tasks still get to do at least some useful work.
    ///
    /// As more yield points are added across the ecosystem, this value will
    /// probably need to be raised as well.
    const fn initial() -> Self {
        Self(Some(128))
    }

    /// Produces a budget without any limit; operations are never throttled.
    const fn unconstrained() -> Self {
        Self(None)
    }
}
63 
64 cfg_rt_multi_thread! {
65     impl Budget {
66         fn has_remaining(self) -> bool {
67             self.0.map(|budget| budget > 0).unwrap_or(true)
68         }
69     }
70 }
71 
72 /// Runs the given closure with a cooperative task budget. When the function
73 /// returns, the budget is reset to the value prior to calling the function.
74 #[inline(always)]
budget<R>(f: impl FnOnce() -> R) -> R75 pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
76     with_budget(Budget::initial(), f)
77 }
78 
79 /// Runs the given closure with an unconstrained task budget. When the function returns, the budget
80 /// is reset to the value prior to calling the function.
81 #[inline(always)]
with_unconstrained<R>(f: impl FnOnce() -> R) -> R82 pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
83     with_budget(Budget::unconstrained(), f)
84 }
85 
86 #[inline(always)]
with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R87 fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
88     struct ResetGuard<'a> {
89         cell: &'a Cell<Budget>,
90         prev: Budget,
91     }
92 
93     impl<'a> Drop for ResetGuard<'a> {
94         fn drop(&mut self) {
95             self.cell.set(self.prev);
96         }
97     }
98 
99     CURRENT.with(move |cell| {
100         let prev = cell.get();
101 
102         cell.set(budget);
103 
104         let _guard = ResetGuard { cell, prev };
105 
106         f()
107     })
108 }
109 
110 cfg_rt_multi_thread! {
111     /// Sets the current task's budget.
112     pub(crate) fn set(budget: Budget) {
113         CURRENT.with(|cell| cell.set(budget))
114     }
115 
116     #[inline(always)]
117     pub(crate) fn has_budget_remaining() -> bool {
118         CURRENT.with(|cell| cell.get().has_remaining())
119     }
120 }
121 
122 cfg_rt! {
123     /// Forcibly removes the budgeting constraints early.
124     ///
125     /// Returns the remaining budget
126     pub(crate) fn stop() -> Budget {
127         CURRENT.with(|cell| {
128             let prev = cell.get();
129             cell.set(Budget::unconstrained());
130             prev
131         })
132     }
133 }
134 
135 cfg_coop! {
136     use std::task::{Context, Poll};
137 
138     #[must_use]
139     pub(crate) struct RestoreOnPending(Cell<Budget>);
140 
141     impl RestoreOnPending {
142         pub(crate) fn made_progress(&self) {
143             self.0.set(Budget::unconstrained());
144         }
145     }
146 
147     impl Drop for RestoreOnPending {
148         fn drop(&mut self) {
149             // Don't reset if budget was unconstrained or if we made progress.
150             // They are both represented as the remembered budget being unconstrained.
151             let budget = self.0.get();
152             if !budget.is_unconstrained() {
153                 CURRENT.with(|cell| {
154                     cell.set(budget);
155                 });
156             }
157         }
158     }
159 
160     /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
161     ///
162     /// When you call this method, the current budget is decremented. However, to ensure that
163     /// progress is made every time a task is polled, the budget is automatically restored to its
164     /// former value if the returned `RestoreOnPending` is dropped. It is the caller's
165     /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure
166     /// that the budget empties appropriately.
167     ///
168     /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**.
169     /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and
170     /// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
171     /// that progress was made.
172     #[inline]
173     pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
174         CURRENT.with(|cell| {
175             let mut budget = cell.get();
176 
177             if budget.decrement() {
178                 let restore = RestoreOnPending(Cell::new(cell.get()));
179                 cell.set(budget);
180                 Poll::Ready(restore)
181             } else {
182                 cx.waker().wake_by_ref();
183                 Poll::Pending
184             }
185         })
186     }
187 
188     impl Budget {
189         /// Decrements the budget. Returns `true` if successful. Decrementing fails
190         /// when there is not enough remaining budget.
191         fn decrement(&mut self) -> bool {
192             if let Some(num) = &mut self.0 {
193                 if *num > 0 {
194                     *num -= 1;
195                     true
196                 } else {
197                     false
198                 }
199             } else {
200                 true
201             }
202         }
203 
204         fn is_unconstrained(self) -> bool {
205             self.0.is_none()
206         }
207     }
208 }
209 
// Unit tests for the budget bookkeeping in this module.
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    /// Reads the budget currently stored in this thread's `CURRENT` cell.
    fn get() -> Budget {
        CURRENT.with(|cell| cell.get())
    }

    /// Walks through unconstrained, constrained, nested and exhausted budgets
    /// and checks the stored value after every step.
    #[test]
    fn bugeting() {
        use futures::future::poll_fn;
        use tokio_test::*;

        // Outside of `budget`, the thread starts out unconstrained.
        assert!(get().0.is_none());

        let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));

        // An unconstrained budget is unaffected by `poll_proceed` and by
        // dropping the guard.
        assert!(get().0.is_none());
        drop(coop);
        assert!(get().0.is_none());

        budget(|| {
            // Entering `budget` installs the initial budget.
            assert_eq!(get().0, Budget::initial().0);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            drop(coop);
            // we didn't make progress
            assert_eq!(get().0, Budget::initial().0);

            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            coop.made_progress();
            drop(coop);
            // we _did_ make progress
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);

            // A second unit of progress is charged on top of the first.
            let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
            coop.made_progress();
            drop(coop);
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);

            // A nested `budget` call starts again from the initial budget...
            budget(|| {
                assert_eq!(get().0, Budget::initial().0);

                let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
                coop.made_progress();
                drop(coop);
                assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
            });

            // ...and the outer budget is back in place once it returns.
            assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
        });

        // Leaving `budget` restores the unconstrained state.
        assert!(get().0.is_none());

        budget(|| {
            let n = get().0.unwrap();

            // Spend the whole budget, reporting progress each time.
            for _ in 0..n {
                let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
                coop.made_progress();
            }

            let mut task = task::spawn(poll_fn(|cx| {
                let coop = ready!(poll_proceed(cx));
                coop.made_progress();
                Poll::Ready(())
            }));

            // With nothing left, `poll_proceed` must ask the task to yield.
            assert_pending!(task.poll());
        });
    }
}
286