1 #![cfg_attr(not(feature = "full"), allow(dead_code))]
2
3 //! Yield points for improved cooperative scheduling.
4 //!
5 //! Documentation for this can be found in the [`tokio::task`] module.
6 //!
7 //! [`tokio::task`]: crate::task.
8
9 // ```ignore
10 // # use tokio_stream::{Stream, StreamExt};
11 // async fn drop_all<I: Stream + Unpin>(mut input: I) {
12 // while let Some(_) = input.next().await {
13 // tokio::coop::proceed().await;
14 // }
15 // }
16 // ```
17 //
18 // The `proceed` future will coordinate with the executor to make sure that
19 // every so often control is yielded back to the executor so it can run other
20 // tasks.
21 //
22 // # Placing yield points
23 //
24 // Voluntary yield points should be placed _after_ at least some work has been
25 // done. If they are not, a future sufficiently deep in the task hierarchy may
26 // end up _never_ getting to run because of the number of yield points that
27 // inevitably appear before it is reached. In general, you will want yield
28 // points to only appear in "leaf" futures -- those that do not themselves poll
29 // other futures. By doing this, you avoid double-counting each iteration of
30 // the outer future against the cooperating budget.
31
32 use std::cell::Cell;
33
34 thread_local! {
35 static CURRENT: Cell<Budget> = Cell::new(Budget::unconstrained());
36 }
37
38 /// Opaque type tracking the amount of "work" a task may still do before
39 /// yielding back to the scheduler.
40 #[derive(Debug, Copy, Clone)]
41 pub(crate) struct Budget(Option<u8>);
42
43 impl Budget {
44 /// Budget assigned to a task on each poll.
45 ///
46 /// The value itself is chosen somewhat arbitrarily. It needs to be high
47 /// enough to amortize wakeup and scheduling costs, but low enough that we
48 /// do not starve other tasks for too long. The value also needs to be high
49 /// enough that particularly deep tasks are able to do at least some useful
50 /// work at all.
51 ///
52 /// Note that as more yield points are added in the ecosystem, this value
53 /// will probably also have to be raised.
initial() -> Budget54 const fn initial() -> Budget {
55 Budget(Some(128))
56 }
57
58 /// Returns an unconstrained budget. Operations will not be limited.
unconstrained() -> Budget59 const fn unconstrained() -> Budget {
60 Budget(None)
61 }
62 }
63
64 cfg_rt_multi_thread! {
65 impl Budget {
66 fn has_remaining(self) -> bool {
67 self.0.map(|budget| budget > 0).unwrap_or(true)
68 }
69 }
70 }
71
72 /// Runs the given closure with a cooperative task budget. When the function
73 /// returns, the budget is reset to the value prior to calling the function.
74 #[inline(always)]
budget<R>(f: impl FnOnce() -> R) -> R75 pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
76 with_budget(Budget::initial(), f)
77 }
78
79 /// Runs the given closure with an unconstrained task budget. When the function returns, the budget
80 /// is reset to the value prior to calling the function.
81 #[inline(always)]
with_unconstrained<R>(f: impl FnOnce() -> R) -> R82 pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
83 with_budget(Budget::unconstrained(), f)
84 }
85
86 #[inline(always)]
with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R87 fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
88 struct ResetGuard<'a> {
89 cell: &'a Cell<Budget>,
90 prev: Budget,
91 }
92
93 impl<'a> Drop for ResetGuard<'a> {
94 fn drop(&mut self) {
95 self.cell.set(self.prev);
96 }
97 }
98
99 CURRENT.with(move |cell| {
100 let prev = cell.get();
101
102 cell.set(budget);
103
104 let _guard = ResetGuard { cell, prev };
105
106 f()
107 })
108 }
109
110 cfg_rt_multi_thread! {
111 /// Sets the current task's budget.
112 pub(crate) fn set(budget: Budget) {
113 CURRENT.with(|cell| cell.set(budget))
114 }
115
116 #[inline(always)]
117 pub(crate) fn has_budget_remaining() -> bool {
118 CURRENT.with(|cell| cell.get().has_remaining())
119 }
120 }
121
122 cfg_rt! {
123 /// Forcibly removes the budgeting constraints early.
124 ///
125 /// Returns the remaining budget
126 pub(crate) fn stop() -> Budget {
127 CURRENT.with(|cell| {
128 let prev = cell.get();
129 cell.set(Budget::unconstrained());
130 prev
131 })
132 }
133 }
134
135 cfg_coop! {
136 use std::task::{Context, Poll};
137
138 #[must_use]
139 pub(crate) struct RestoreOnPending(Cell<Budget>);
140
141 impl RestoreOnPending {
142 pub(crate) fn made_progress(&self) {
143 self.0.set(Budget::unconstrained());
144 }
145 }
146
147 impl Drop for RestoreOnPending {
148 fn drop(&mut self) {
149 // Don't reset if budget was unconstrained or if we made progress.
150 // They are both represented as the remembered budget being unconstrained.
151 let budget = self.0.get();
152 if !budget.is_unconstrained() {
153 CURRENT.with(|cell| {
154 cell.set(budget);
155 });
156 }
157 }
158 }
159
160 /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
161 ///
162 /// When you call this method, the current budget is decremented. However, to ensure that
163 /// progress is made every time a task is polled, the budget is automatically restored to its
164 /// former value if the returned `RestoreOnPending` is dropped. It is the caller's
165 /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure
166 /// that the budget empties appropriately.
167 ///
168 /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**.
169 /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and
170 /// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
171 /// that progress was made.
172 #[inline]
173 pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
174 CURRENT.with(|cell| {
175 let mut budget = cell.get();
176
177 if budget.decrement() {
178 let restore = RestoreOnPending(Cell::new(cell.get()));
179 cell.set(budget);
180 Poll::Ready(restore)
181 } else {
182 cx.waker().wake_by_ref();
183 Poll::Pending
184 }
185 })
186 }
187
188 impl Budget {
189 /// Decrements the budget. Returns `true` if successful. Decrementing fails
190 /// when there is not enough remaining budget.
191 fn decrement(&mut self) -> bool {
192 if let Some(num) = &mut self.0 {
193 if *num > 0 {
194 *num -= 1;
195 true
196 } else {
197 false
198 }
199 } else {
200 true
201 }
202 }
203
204 fn is_unconstrained(self) -> bool {
205 self.0.is_none()
206 }
207 }
208 }
209
210 #[cfg(all(test, not(loom)))]
211 mod test {
212 use super::*;
213
get() -> Budget214 fn get() -> Budget {
215 CURRENT.with(|cell| cell.get())
216 }
217
218 #[test]
bugeting()219 fn bugeting() {
220 use futures::future::poll_fn;
221 use tokio_test::*;
222
223 assert!(get().0.is_none());
224
225 let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
226
227 assert!(get().0.is_none());
228 drop(coop);
229 assert!(get().0.is_none());
230
231 budget(|| {
232 assert_eq!(get().0, Budget::initial().0);
233
234 let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
235 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
236 drop(coop);
237 // we didn't make progress
238 assert_eq!(get().0, Budget::initial().0);
239
240 let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
241 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
242 coop.made_progress();
243 drop(coop);
244 // we _did_ make progress
245 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
246
247 let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
248 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
249 coop.made_progress();
250 drop(coop);
251 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
252
253 budget(|| {
254 assert_eq!(get().0, Budget::initial().0);
255
256 let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
257 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
258 coop.made_progress();
259 drop(coop);
260 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1);
261 });
262
263 assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2);
264 });
265
266 assert!(get().0.is_none());
267
268 budget(|| {
269 let n = get().0.unwrap();
270
271 for _ in 0..n {
272 let coop = assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));
273 coop.made_progress();
274 }
275
276 let mut task = task::spawn(poll_fn(|cx| {
277 let coop = ready!(poll_proceed(cx));
278 coop.made_progress();
279 Poll::Ready(())
280 }));
281
282 assert_pending!(task.poll());
283 });
284 }
285 }
286