1 use super::{Interest, Ready, ReadyEvent, Tick};
2 use crate::loom::sync::atomic::AtomicUsize;
3 use crate::loom::sync::Mutex;
4 use crate::util::bit;
5 use crate::util::slab::Entry;
6 use crate::util::WakeList;
7
8 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
9 use std::task::{Context, Poll, Waker};
10
11 use super::Direction;
12
13 cfg_io_readiness! {
14 use crate::util::linked_list::{self, LinkedList};
15
16 use std::cell::UnsafeCell;
17 use std::future::Future;
18 use std::marker::PhantomPinned;
19 use std::pin::Pin;
20 use std::ptr::NonNull;
21 }
22
/// Stored in the I/O driver resource slab.
#[derive(Debug)]
pub(crate) struct ScheduledIo {
    /// Packs the resource's readiness with the resource's generation.
    /// From the least-significant bit up: 16 bits of readiness, 8 bits of
    /// driver tick, 7 bits of slab generation (see the `READINESS`, `TICK`,
    /// and `GENERATION` packs defined below).
    readiness: AtomicUsize,

    /// Wakers registered by tasks waiting on this resource, plus the
    /// driver-shutdown flag. All waker access goes through this mutex.
    waiters: Mutex<Waiters>,
}
31
cfg_io_readiness! {
    /// Intrusive list of `Waiter` nodes owned by pending `Readiness` futures.
    type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
}
35
/// Waker storage for tasks blocked on this resource; guarded by the
/// `ScheduledIo::waiters` mutex.
#[derive(Debug, Default)]
struct Waiters {
    #[cfg(feature = "net")]
    /// List of all current waiters.
    list: WaitList,

    /// Waker used for AsyncRead.
    reader: Option<Waker>,

    /// Waker used for AsyncWrite.
    writer: Option<Waker>,

    /// True if this ScheduledIo has been killed due to IO driver shutdown.
    /// Once set it is never cleared (see `wake0`).
    is_shutdown: bool,
}
51
cfg_io_readiness! {
    /// A single waiter registered in a `ScheduledIo`'s wait list.
    #[derive(Debug)]
    struct Waiter {
        /// Intrusive linked-list pointers; only meaningful while this node
        /// is linked into a `WaitList`.
        pointers: linked_list::Pointers<Waiter>,

        /// The waker for this task.
        waker: Option<Waker>,

        /// The interest this waiter is waiting on.
        interest: Interest,

        /// Set to `true` by the driver (under the waiters lock) when this
        /// waiter's waker is taken and notified; read by `Readiness::poll`.
        is_ready: bool,

        /// Should never be `Unpin`: this node's address is stored in an
        /// intrusive list, so it must stay pinned while registered.
        _p: PhantomPinned,
    }

    /// Future returned by `readiness()`.
    struct Readiness<'a> {
        scheduled_io: &'a ScheduledIo,

        /// Current phase of the wait (see `State`).
        state: State,

        /// Entry in the waiter `LinkedList`.
        waiter: UnsafeCell<Waiter>,
    }

    /// State machine for the `Readiness` future.
    enum State {
        Init,
        Waiting,
        Done,
    }
}
85
// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | reserved | generation | driver tick | readiness |
// |----------+------------+-------------+-----------|
// |  1 bit   |   7 bits   |   8 bits    |  16 bits  |

/// Low 16 bits: the resource's `Ready` bits.
const READINESS: bit::Pack = bit::Pack::least_significant(16);

/// Next 8 bits: the driver tick at which readiness was last set; used to
/// detect stale clear requests.
const TICK: bit::Pack = READINESS.then(8);

/// Next 7 bits: the slab slot's generation; used to detect tokens that
/// refer to a resource that has since been replaced.
const GENERATION: bit::Pack = TICK.then(7);
97
// The parent module defines its own `GENERATION` pack for tokens; both
// definitions must describe the same bit field or generation checks break.
#[test]
fn test_generations_assert_same() {
    assert_eq!(super::GENERATION, GENERATION);
}
102
103 // ===== impl ScheduledIo =====
104
105 impl Entry for ScheduledIo {
reset(&self)106 fn reset(&self) {
107 let state = self.readiness.load(Acquire);
108
109 let generation = GENERATION.unpack(state);
110 let next = GENERATION.pack_lossy(generation + 1, 0);
111
112 self.readiness.store(next, Release);
113 }
114 }
115
116 impl Default for ScheduledIo {
default() -> ScheduledIo117 fn default() -> ScheduledIo {
118 ScheduledIo {
119 readiness: AtomicUsize::new(0),
120 waiters: Mutex::new(Default::default()),
121 }
122 }
123 }
124
impl ScheduledIo {
    /// Returns the generation currently packed into the readiness word.
    pub(crate) fn generation(&self) -> usize {
        GENERATION.unpack(self.readiness.load(Acquire))
    }

    /// Invoked when the IO driver is shut down; forces this ScheduledIo into a
    /// permanently ready state.
    pub(super) fn shutdown(&self) {
        self.wake0(Ready::ALL, true)
    }

    /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
    /// the current value.
    ///
    /// # Arguments
    /// - `token`: the token for this `ScheduledIo`.
    /// - `tick`: whether setting the tick or trying to clear readiness for a
    ///   specific tick.
    /// - `f`: a closure returning a new readiness value given the previous
    ///   readiness.
    ///
    /// # Returns
    ///
    /// If the given token's generation no longer matches the `ScheduledIo`'s
    /// generation, then the corresponding IO resource has been removed and
    /// replaced with a new resource. In that case, this method returns `Err`.
    /// `Err` is also returned when `tick` is `Tick::Clear` and the stored tick
    /// no longer matches (the event being cleared is stale). Otherwise,
    /// `Ok(())` is returned.
    pub(super) fn set_readiness(
        &self,
        token: Option<usize>,
        tick: Tick,
        f: impl Fn(Ready) -> Ready,
    ) -> Result<(), ()> {
        let mut current = self.readiness.load(Acquire);

        // CAS loop: recompute the packed word from `current` until the
        // compare-exchange succeeds or a staleness check fails.
        loop {
            let current_generation = GENERATION.unpack(current);

            if let Some(token) = token {
                // Check that the generation for this access is still the
                // current one.
                if GENERATION.unpack(token) != current_generation {
                    return Err(());
                }
            }

            // Mask out the tick/generation bits so that the modifying
            // function doesn't see them.
            let current_readiness = Ready::from_usize(current);
            let new = f(current_readiness);

            let packed = match tick {
                Tick::Set(t) => TICK.pack(t as usize, new.as_usize()),
                Tick::Clear(t) => {
                    if TICK.unpack(current) as u8 != t {
                        // Trying to clear readiness with an old event!
                        return Err(());
                    }

                    TICK.pack(t as usize, new.as_usize())
                }
            };

            let next = GENERATION.pack(current_generation, packed);

            match self
                .readiness
                .compare_exchange(current, next, AcqRel, Acquire)
            {
                Ok(_) => return Ok(()),
                // we lost the race, retry!
                Err(actual) => current = actual,
            }
        }
    }

    /// Notifies all pending waiters that have registered interest in `ready`.
    ///
    /// There may be many waiters to notify. Waking the pending task **must** be
    /// done from outside of the lock otherwise there is a potential for a
    /// deadlock.
    ///
    /// A stack array of wakers is created and filled with wakers to notify, the
    /// lock is released, and the wakers are notified. Because there may be more
    /// than 32 wakers to notify, if the stack array fills up, the lock is
    /// released, the array is cleared, and the iteration continues.
    pub(super) fn wake(&self, ready: Ready) {
        self.wake0(ready, false);
    }

    /// Shared implementation of `wake` and `shutdown`. When `shutdown` is
    /// true, additionally marks this `ScheduledIo` as permanently shut down.
    fn wake0(&self, ready: Ready, shutdown: bool) {
        let mut wakers = WakeList::new();

        let mut waiters = self.waiters.lock();

        // Once set, `is_shutdown` is never cleared.
        waiters.is_shutdown |= shutdown;

        // check for AsyncRead slot
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(waker);
            }
        }

        // check for AsyncWrite slot
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(waker);
            }
        }

        #[cfg(feature = "net")]
        'outer: loop {
            let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

            while wakers.can_push() {
                match iter.next() {
                    Some(waiter) => {
                        let waiter = unsafe { &mut *waiter.as_ptr() };

                        if let Some(waker) = waiter.waker.take() {
                            // Mark the waiter ready (under the lock) before
                            // waking so the woken task's next poll observes it.
                            waiter.is_ready = true;
                            wakers.push(waker);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            // The wake list is full: release the lock, wake this batch, then
            // re-acquire and continue draining.
            drop(waiters);

            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }

    /// Returns the current readiness restricted to `interest`, together with
    /// the driver tick at which it was recorded.
    pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
        let curr = self.readiness.load(Acquire);

        ReadyEvent {
            tick: TICK.unpack(curr) as u8,
            ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
        }
    }

    /// Polls for readiness events in a given direction.
    ///
    /// These are to support `AsyncRead` and `AsyncWrite` polling methods,
    /// which cannot use the `async fn` version. This uses reserved reader
    /// and writer slots.
    pub(super) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<ReadyEvent> {
        let curr = self.readiness.load(Acquire);

        let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));

        if ready.is_empty() {
            // Update the task info
            let mut waiters = self.waiters.lock();
            let slot = match direction {
                Direction::Read => &mut waiters.reader,
                Direction::Write => &mut waiters.writer,
            };

            // Avoid cloning the waker if one is already stored that matches the
            // current task.
            match slot {
                Some(existing) => {
                    if !existing.will_wake(cx.waker()) {
                        *existing = cx.waker().clone();
                    }
                }
                None => {
                    *slot = Some(cx.waker().clone());
                }
            }

            // Try again, in case the readiness was changed while we were
            // taking the waiters lock
            let curr = self.readiness.load(Acquire);
            let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
            if waiters.is_shutdown {
                // Driver is gone: report the full direction mask so the
                // caller's subsequent I/O attempt runs and surfaces an error
                // instead of parking forever.
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready: direction.mask(),
                })
            } else if ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready,
                })
            }
        } else {
            Poll::Ready(ReadyEvent {
                tick: TICK.unpack(curr) as u8,
                ready,
            })
        }
    }

    /// Clears the readiness bits named by `event`, except the "closed" bits,
    /// which are final states and must never be un-set.
    pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
        // This consumes the current readiness state **except** for closed
        // states. Closed states are excluded because they are final states.
        let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;

        // result isn't important: `Err` means the event was stale or the
        // resource was replaced, in which case there is nothing to clear.
        let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed);
    }

    /// Drops any stored reader/writer wakers without waking them.
    pub(crate) fn clear_wakers(&self) {
        let mut waiters = self.waiters.lock();
        waiters.reader.take();
        waiters.writer.take();
    }
}
354
impl Drop for ScheduledIo {
    /// Wakes all remaining waiters with full readiness so no task stays
    /// parked on a resource that is being destroyed.
    fn drop(&mut self) {
        self.wake(Ready::ALL);
    }
}
360
// SAFETY: `ScheduledIo` is mutated only through `&self` via its atomic and
// its mutex-guarded `Waiters`, both of which serialize cross-thread access.
// NOTE(review): these manual impls are presumably needed because of the raw
// waiter pointers stored under `cfg_io_readiness` — confirm.
unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}
363
cfg_io_readiness! {
    impl ScheduledIo {
        /// An async version of `poll_readiness` which uses a linked list of wakers.
        pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
            self.readiness_fut(interest).await
        }

        // This is in a separate function so that the borrow checker doesn't think
        // we are borrowing the `UnsafeCell` possibly over await boundaries.
        //
        // Go figure.
        fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
            Readiness {
                scheduled_io: self,
                state: State::Init,
                waiter: UnsafeCell::new(Waiter {
                    pointers: linked_list::Pointers::new(),
                    waker: None,
                    is_ready: false,
                    interest,
                    _p: PhantomPinned,
                }),
            }
        }
    }

    // SAFETY: `Waiter` nodes live inside a pinned `Readiness` future, so the
    // raw pointers handed to the list stay valid until `Readiness::drop`
    // unlinks the node.
    unsafe impl linked_list::Link for Waiter {
        type Handle = NonNull<Waiter>;
        type Target = Waiter;

        fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
            *handle
        }

        unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
            ptr
        }

        unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
            NonNull::from(&mut target.as_mut().pointers)
        }
    }

    // ===== impl Readiness =====

    impl Future for Readiness<'_> {
        type Output = ReadyEvent;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            use std::sync::atomic::Ordering::SeqCst;

            // Safety: the future is pinned; none of these fields are moved
            // out, and `waiter` is only handed to the list by address.
            let (scheduled_io, state, waiter) = unsafe {
                let me = self.get_unchecked_mut();
                (&me.scheduled_io, &mut me.state, &me.waiter)
            };

            loop {
                match *state {
                    State::Init => {
                        // Optimistically check existing readiness
                        let curr = scheduled_io.readiness.load(SeqCst);
                        let ready = Ready::from_usize(READINESS.unpack(curr));

                        // Safety: `waiter.interest` never changes
                        let interest = unsafe { (*waiter.get()).interest };
                        let ready = ready.intersection(interest);

                        if !ready.is_empty() {
                            // Currently ready!
                            let tick = TICK.unpack(curr) as u8;
                            *state = State::Done;
                            return Poll::Ready(ReadyEvent { tick, ready });
                        }

                        // Wasn't ready, take the lock (and check again while locked).
                        let mut waiters = scheduled_io.waiters.lock();

                        let curr = scheduled_io.readiness.load(SeqCst);
                        let mut ready = Ready::from_usize(READINESS.unpack(curr));

                        // Driver shutdown counts as fully ready so callers
                        // make progress and observe the error themselves.
                        if waiters.is_shutdown {
                            ready = Ready::ALL;
                        }

                        let ready = ready.intersection(interest);

                        if !ready.is_empty() {
                            // Currently ready!
                            let tick = TICK.unpack(curr) as u8;
                            *state = State::Done;
                            return Poll::Ready(ReadyEvent { tick, ready });
                        }

                        // Not ready even after locked, insert into list...

                        // Safety: called while locked
                        unsafe {
                            (*waiter.get()).waker = Some(cx.waker().clone());
                        }

                        // Insert the waiter into the linked list
                        //
                        // safety: pointers from `UnsafeCell` are never null.
                        waiters
                            .list
                            .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
                        *state = State::Waiting;
                    }
                    State::Waiting => {
                        // Currently in the "Waiting" state, implying the caller has
                        // a waiter stored in the waiter list (guarded by
                        // `scheduled_io.waiters`). In order to access the waker
                        // fields, we must hold the lock.

                        let waiters = scheduled_io.waiters.lock();

                        // Safety: called while locked
                        let w = unsafe { &mut *waiter.get() };

                        if w.is_ready {
                            // Our waker has been notified.
                            *state = State::Done;
                        } else {
                            // Update the waker, if necessary.
                            if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
                                w.waker = Some(cx.waker().clone());
                            }

                            return Poll::Pending;
                        }

                        // Explicit drop of the lock to indicate the scope that the
                        // lock is held. Because holding the lock is required to
                        // ensure safe access to fields not held within the lock, it
                        // is helpful to visualize the scope of the critical
                        // section.
                        drop(waiters);
                    }
                    State::Done => {
                        let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8;

                        // Safety: State::Done means it is no longer shared
                        let w = unsafe { &mut *waiter.get() };

                        // Note: reports the waiter's full interest as ready,
                        // not the readiness observed at wake time.
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready: Ready::from_interest(w.interest),
                        });
                    }
                }
            }
        }
    }

    impl Drop for Readiness<'_> {
        fn drop(&mut self) {
            // Unlink the waiter node before its storage is freed; the lock
            // serializes against the driver draining the list.
            let mut waiters = self.scheduled_io.waiters.lock();

            // Safety: `waiter` is only ever stored in `waiters`.
            // NOTE(review): presumably `remove` tolerates a node that was
            // already drained by `wake0` — confirm against `LinkedList::remove`.
            unsafe {
                waiters
                    .list
                    .remove(NonNull::new_unchecked(self.waiter.get()))
            };
        }
    }

    // SAFETY: the `UnsafeCell<Waiter>` contents are only accessed while
    // holding the `ScheduledIo::waiters` lock (or once `State::Done`, when
    // the node is no longer shared), so cross-thread use is serialized.
    unsafe impl Send for Readiness<'_> {}
    unsafe impl Sync for Readiness<'_> {}
}
534