1 #![cfg_attr(not(feature = "full"), allow(dead_code))]
2
3 //! Yield points for improved cooperative scheduling.
4 //!
5 //! Documentation for this can be found in the [`tokio::task`] module.
6 //!
7 //! [`tokio::task`]: crate::task.
8
9 // ```ignore
10 // # use tokio_stream::{Stream, StreamExt};
11 // async fn drop_all<I: Stream + Unpin>(mut input: I) {
12 // while let Some(_) = input.next().await {
13 // tokio::coop::proceed().await;
14 // }
15 // }
16 // ```
17 //
18 // The `proceed` future will coordinate with the executor to make sure that
19 // every so often control is yielded back to the executor so it can run other
20 // tasks.
21 //
22 // # Placing yield points
23 //
24 // Voluntary yield points should be placed _after_ at least some work has been
25 // done. If they are not, a future sufficiently deep in the task hierarchy may
26 // end up _never_ getting to run because of the number of yield points that
27 // inevitably appear before it is reached. In general, you will want yield
28 // points to only appear in "leaf" futures -- those that do not themselves poll
29 // other futures. By doing this, you avoid double-counting each iteration of
30 // the outer future against the cooperating budget.
31
32 use std::cell::Cell;
33
34 thread_local! {
35 static CURRENT: Cell<Budget> = Cell::new(Budget::unconstrained());
36 }
37
38 /// Opaque type tracking the amount of "work" a task may still do before
39 /// yielding back to the scheduler.
40 #[derive(Debug, Copy, Clone)]
41 pub(crate) struct Budget(Option<u8>);
42
43 impl Budget {
44 /// Budget assigned to a task on each poll.
45 ///
46 /// The value itself is chosen somewhat arbitrarily. It needs to be high
47 /// enough to amortize wakeup and scheduling costs, but low enough that we
48 /// do not starve other tasks for too long. The value also needs to be high
49 /// enough that particularly deep tasks are able to do at least some useful
50 /// work at all.
51 ///
52 /// Note that as more yield points are added in the ecosystem, this value
53 /// will probably also have to be raised.
initial() -> Budget54 const fn initial() -> Budget {
55 Budget(Some(128))
56 }
57
58 /// Returns an unconstrained budget. Operations will not be limited.
unconstrained() -> Budget59 const fn unconstrained() -> Budget {
60 Budget(None)
61 }
62
has_remaining(self) -> bool63 fn has_remaining(self) -> bool {
64 self.0.map(|budget| budget > 0).unwrap_or(true)
65 }
66 }
67
68 /// Runs the given closure with a cooperative task budget. When the function
69 /// returns, the budget is reset to the value prior to calling the function.
70 #[inline(always)]
budget<R>(f: impl FnOnce() -> R) -> R71 pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
72 with_budget(Budget::initial(), f)
73 }
74
75 /// Runs the given closure with an unconstrained task budget. When the function returns, the budget
76 /// is reset to the value prior to calling the function.
77 #[inline(always)]
with_unconstrained<R>(f: impl FnOnce() -> R) -> R78 pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
79 with_budget(Budget::unconstrained(), f)
80 }
81
82 #[inline(always)]
with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R83 fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
84 struct ResetGuard<'a> {
85 cell: &'a Cell<Budget>,
86 prev: Budget,
87 }
88
89 impl<'a> Drop for ResetGuard<'a> {
90 fn drop(&mut self) {
91 self.cell.set(self.prev);
92 }
93 }
94
95 CURRENT.with(move |cell| {
96 let prev = cell.get();
97
98 cell.set(budget);
99
100 let _guard = ResetGuard { cell, prev };
101
102 f()
103 })
104 }
105
106 #[inline(always)]
has_budget_remaining() -> bool107 pub(crate) fn has_budget_remaining() -> bool {
108 CURRENT.with(|cell| cell.get().has_remaining())
109 }
110
111 cfg_rt_multi_thread! {
112 /// Sets the current task's budget.
113 pub(crate) fn set(budget: Budget) {
114 CURRENT.with(|cell| cell.set(budget))
115 }
116 }
117
118 cfg_rt! {
119 /// Forcibly removes the budgeting constraints early.
120 ///
121 /// Returns the remaining budget
122 pub(crate) fn stop() -> Budget {
123 CURRENT.with(|cell| {
124 let prev = cell.get();
125 cell.set(Budget::unconstrained());
126 prev
127 })
128 }
129 }
130
131 cfg_coop! {
132 use std::task::{Context, Poll};
133
134 #[must_use]
135 pub(crate) struct RestoreOnPending(Cell<Budget>);
136
137 impl RestoreOnPending {
138 pub(crate) fn made_progress(&self) {
139 self.0.set(Budget::unconstrained());
140 }
141 }
142
143 impl Drop for RestoreOnPending {
144 fn drop(&mut self) {
145 // Don't reset if budget was unconstrained or if we made progress.
146 // They are both represented as the remembered budget being unconstrained.
147 let budget = self.0.get();
148 if !budget.is_unconstrained() {
149 CURRENT.with(|cell| {
150 cell.set(budget);
151 });
152 }
153 }
154 }
155
156 /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
157 ///
158 /// When you call this method, the current budget is decremented. However, to ensure that
159 /// progress is made every time a task is polled, the budget is automatically restored to its
160 /// former value if the returned `RestoreOnPending` is dropped. It is the caller's
161 /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure
162 /// that the budget empties appropriately.
163 ///
164 /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**.
165 /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and
166 /// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
167 /// that progress was made.
168 #[inline]
169 pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
170 CURRENT.with(|cell| {
171 let mut budget = cell.get();
172
173 if budget.decrement() {
174 let restore = RestoreOnPending(Cell::new(cell.get()));
175 cell.set(budget);
176 Poll::Ready(restore)
177 } else {
178 cx.waker().wake_by_ref();
179 Poll::Pending
180 }
181 })
182 }
183
184 impl Budget {
185 /// Decrements the budget. Returns `true` if successful. Decrementing fails
186 /// when there is not enough remaining budget.
187 fn decrement(&mut self) -> bool {
188 if let Some(num) = &mut self.0 {
189 if *num > 0 {
190 *num -= 1;
191 true
192 } else {
193 false
194 }
195 } else {
196 true
197 }
198 }
199
200 fn is_unconstrained(self) -> bool {
201 self.0.is_none()
202 }
203 }
204 }
205
206 #[cfg(all(test, not(loom)))]
207 mod test {
208 use super::*;
209
210 #[cfg(target_arch = "wasm32")]
211 use wasm_bindgen_test::wasm_bindgen_test as test;
212
get() -> Budget213 fn get() -> Budget {
214 CURRENT.with(|cell| cell.get())
215 }
216
217 #[test]
bugeting()218 fn bugeting() {
219 use futures::future::poll_fn;
220 use tokio_test::*;
221
222 assert!(get().0.is_none());
223
224 let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
225
226 assert!(get().0.is_none());
227 drop(coop);
228 assert!(get().0.is_none());
229
230 budget(|| {
231 assert_eq!(get().0, Budget::initial().0);
232
233 let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
234 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
235 drop(coop);
236 // we didn't make progress
237 assert_eq!(get().0, Budget::initial().0);
238
239 let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
240 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
241 coop.made_progress();
242 drop(coop);
243 // we _did_ make progress
244 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
245
246 let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
247 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
248 coop.made_progress();
249 drop(coop);
250 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
251
252 budget(|| {
253 assert_eq!(get().0, Budget::initial().0);
254
255 let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
256 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
257 coop.made_progress();
258 drop(coop);
259 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
260 });
261
262 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
263 });
264
265 assert!(get().0.is_none());
266
267 budget(|| {
268 let n = get().0.unwrap();
269
270 for _ in 0..n {
271 let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
272 coop.made_progress();
273 }
274
275 let mut task = task::spawn(poll_fn(|cx| {
276 let coop = ready!(poll_proceed(cx));
277 coop.made_progress();
278 Poll::Ready(())
279 }));
280
281 assert_pending!(task.poll());
282 });
283 }
284 }
285