1 use super::{Interest, Ready, ReadyEvent, Tick};
2 use crate::loom::sync::atomic::AtomicUsize;
3 use crate::loom::sync::Mutex;
4 use crate::util::bit;
5 use crate::util::slab::Entry;
6 use crate::util::WakeList;
7 
8 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
9 use std::task::{Context, Poll, Waker};
10 
11 use super::Direction;
12 
13 cfg_io_readiness! {
14     use crate::util::linked_list::{self, LinkedList};
15 
16     use std::cell::UnsafeCell;
17     use std::future::Future;
18     use std::marker::PhantomPinned;
19     use std::pin::Pin;
20     use std::ptr::NonNull;
21 }
22 
/// Stored in the I/O driver resource slab.
///
/// Holds the packed readiness word for one I/O resource together with the
/// wakers of the tasks that are waiting on that resource.
#[derive(Debug)]
pub(crate) struct ScheduledIo {
    /// Packs the resource's readiness with the resource's generation.
    ///
    /// The driver tick is stored in the same word; see the layout table next
    /// to the `READINESS`, `TICK` and `GENERATION` constants.
    readiness: AtomicUsize,

    /// Wakers of the tasks waiting for readiness, plus the shutdown flag.
    waiters: Mutex<Waiters>,
}
31 
cfg_io_readiness! {
    /// List of the `Waiter` nodes that pending `Readiness` futures have
    /// linked in.
    type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
}
35 
/// Everything that is protected by the `ScheduledIo::waiters` mutex.
#[derive(Debug, Default)]
struct Waiters {
    #[cfg(feature = "net")]
    /// List of all current waiters.
    ///
    /// Entries are pushed by `Readiness::poll` and removed either by
    /// `ScheduledIo::wake0` or when the `Readiness` future is dropped.
    list: WaitList,

    /// Waker used for AsyncRead.
    ///
    /// Stored by `poll_readiness` for `Direction::Read`.
    reader: Option<Waker>,

    /// Waker used for AsyncWrite.
    ///
    /// Stored by `poll_readiness` for `Direction::Write`.
    writer: Option<Waker>,

    /// True if this ScheduledIo has been killed due to IO driver shutdown.
    ///
    /// Once set it is never cleared again by the code in this file.
    is_shutdown: bool,
}
51 
cfg_io_readiness! {
    /// A single entry of the waiter list, embedded in a `Readiness` future.
    #[derive(Debug)]
    struct Waiter {
        /// Link pointers used by the waiter `LinkedList`.
        pointers: linked_list::Pointers<Waiter>,

        /// The waker for this task.
        waker: Option<Waker>,

        /// The interest this waiter is waiting on.
        interest: Interest,

        /// Set to `true` by `ScheduledIo::wake0` when it takes this waiter's
        /// waker in order to notify the task.
        is_ready: bool,

        /// Should never be `Unpin`: the waiter list stores a raw pointer to
        /// this value, so it must not move while it is linked.
        _p: PhantomPinned,
    }

    /// Future returned by `readiness()`.
    struct Readiness<'a> {
        /// The resource whose readiness is being awaited.
        scheduled_io: &'a ScheduledIo,

        /// Progress of this future; see `State`.
        state: State,

        /// Entry in the waiter `LinkedList`.
        waiter: UnsafeCell<Waiter>,
    }

    /// Progress of a `Readiness` future.
    enum State {
        /// Not yet registered: the next poll checks the readiness and, if the
        /// resource is not ready, links the waiter into the list.
        Init,
        /// The waiter has been linked into the list and the future is waiting
        /// to be notified.
        Waiting,
        /// Readiness was observed; polling returns `Poll::Ready`.
        Done,
    }
}
85 
// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | reserved | generation |  driver tick | readiness |
// |----------+------------+--------------+-----------|
// |   1 bit  |   7 bits   +    8 bits    +   16 bits |

/// Readiness flags: the 16 least significant bits of the packed word.
const READINESS: bit::Pack = bit::Pack::least_significant(16);

/// Driver tick: the 8 bits that follow the readiness flags.
const TICK: bit::Pack = READINESS.then(8);

/// Resource generation: the 7 bits that follow the driver tick.
const GENERATION: bit::Pack = TICK.then(7);
97 
/// The generation field defined by the parent module and the one defined in
/// this file must be the same, since both are used on the same packed values.
#[test]
fn test_generations_assert_same() {
    let parent_generation = super::GENERATION;

    assert_eq!(parent_generation, GENERATION);
}
102 
103 // ===== impl ScheduledIo =====
104 
105 impl Entry for ScheduledIo {
reset(&self)106     fn reset(&self) {
107         let state = self.readiness.load(Acquire);
108 
109         let generation = GENERATION.unpack(state);
110         let next = GENERATION.pack_lossy(generation + 1, 0);
111 
112         self.readiness.store(next, Release);
113     }
114 }
115 
116 impl Default for ScheduledIo {
default() -> ScheduledIo117     fn default() -> ScheduledIo {
118         ScheduledIo {
119             readiness: AtomicUsize::new(0),
120             waiters: Mutex::new(Default::default()),
121         }
122     }
123 }
124 
125 impl ScheduledIo {
generation(&self) -> usize126     pub(crate) fn generation(&self) -> usize {
127         GENERATION.unpack(self.readiness.load(Acquire))
128     }
129 
130     /// Invoked when the IO driver is shut down; forces this ScheduledIo into a
131     /// permanently ready state.
shutdown(&self)132     pub(super) fn shutdown(&self) {
133         self.wake0(Ready::ALL, true)
134     }
135 
136     /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
137     /// the current value, returning the previous readiness value.
138     ///
139     /// # Arguments
140     /// - `token`: the token for this `ScheduledIo`.
141     /// - `tick`: whether setting the tick or trying to clear readiness for a
142     ///    specific tick.
143     /// - `f`: a closure returning a new readiness value given the previous
144     ///   readiness.
145     ///
146     /// # Returns
147     ///
148     /// If the given token's generation no longer matches the `ScheduledIo`'s
149     /// generation, then the corresponding IO resource has been removed and
150     /// replaced with a new resource. In that case, this method returns `Err`.
151     /// Otherwise, this returns the previous readiness.
set_readiness( &self, token: Option<usize>, tick: Tick, f: impl Fn(Ready) -> Ready, ) -> Result<(), ()>152     pub(super) fn set_readiness(
153         &self,
154         token: Option<usize>,
155         tick: Tick,
156         f: impl Fn(Ready) -> Ready,
157     ) -> Result<(), ()> {
158         let mut current = self.readiness.load(Acquire);
159 
160         loop {
161             let current_generation = GENERATION.unpack(current);
162 
163             if let Some(token) = token {
164                 // Check that the generation for this access is still the
165                 // current one.
166                 if GENERATION.unpack(token) != current_generation {
167                     return Err(());
168                 }
169             }
170 
171             // Mask out the tick/generation bits so that the modifying
172             // function doesn't see them.
173             let current_readiness = Ready::from_usize(current);
174             let new = f(current_readiness);
175 
176             let packed = match tick {
177                 Tick::Set(t) => TICK.pack(t as usize, new.as_usize()),
178                 Tick::Clear(t) => {
179                     if TICK.unpack(current) as u8 != t {
180                         // Trying to clear readiness with an old event!
181                         return Err(());
182                     }
183 
184                     TICK.pack(t as usize, new.as_usize())
185                 }
186             };
187 
188             let next = GENERATION.pack(current_generation, packed);
189 
190             match self
191                 .readiness
192                 .compare_exchange(current, next, AcqRel, Acquire)
193             {
194                 Ok(_) => return Ok(()),
195                 // we lost the race, retry!
196                 Err(actual) => current = actual,
197             }
198         }
199     }
200 
201     /// Notifies all pending waiters that have registered interest in `ready`.
202     ///
203     /// There may be many waiters to notify. Waking the pending task **must** be
204     /// done from outside of the lock otherwise there is a potential for a
205     /// deadlock.
206     ///
207     /// A stack array of wakers is created and filled with wakers to notify, the
208     /// lock is released, and the wakers are notified. Because there may be more
209     /// than 32 wakers to notify, if the stack array fills up, the lock is
210     /// released, the array is cleared, and the iteration continues.
wake(&self, ready: Ready)211     pub(super) fn wake(&self, ready: Ready) {
212         self.wake0(ready, false);
213     }
214 
    /// Shared implementation of `wake` and `shutdown`.
    ///
    /// Collects the wakers of all waiters satisfied by `ready` and invokes
    /// them after the waiters lock has been released. When `shutdown` is
    /// `true`, the `is_shutdown` flag is additionally set; the flag is never
    /// cleared here.
    fn wake0(&self, ready: Ready, shutdown: bool) {
        let mut wakers = WakeList::new();

        let mut waiters = self.waiters.lock();

        // Sticky: a `false` argument never resets a previous shutdown.
        waiters.is_shutdown |= shutdown;

        // check for AsyncRead slot
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(waker);
            }
        }

        // check for AsyncWrite slot
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(waker);
            }
        }

        #[cfg(feature = "net")]
        'outer: loop {
            // Removes from the list every waiter whose interest is satisfied
            // by `ready`, as the iterator is advanced.
            let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

            while wakers.can_push() {
                match iter.next() {
                    Some(waiter) => {
                        // The waiter's fields are only touched while the
                        // waiters lock is held, which is the case here.
                        let waiter = unsafe { &mut *waiter.as_ptr() };

                        if let Some(waker) = waiter.waker.take() {
                            // Tells `Readiness::poll` that it was notified.
                            waiter.is_ready = true;
                            wakers.push(waker);
                        }
                    }
                    None => {
                        // No matching waiter left; the remaining wakers are
                        // invoked after the loop.
                        break 'outer;
                    }
                }
            }

            // The wake list is full: release the lock before invoking the
            // wakers, then continue with the remaining waiters.
            drop(waiters);

            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }
269 
ready_event(&self, interest: Interest) -> ReadyEvent270     pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
271         let curr = self.readiness.load(Acquire);
272 
273         ReadyEvent {
274             tick: TICK.unpack(curr) as u8,
275             ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
276         }
277     }
278 
279     /// Polls for readiness events in a given direction.
280     ///
281     /// These are to support `AsyncRead` and `AsyncWrite` polling methods,
282     /// which cannot use the `async fn` version. This uses reserved reader
283     /// and writer slots.
poll_readiness( &self, cx: &mut Context<'_>, direction: Direction, ) -> Poll<ReadyEvent>284     pub(super) fn poll_readiness(
285         &self,
286         cx: &mut Context<'_>,
287         direction: Direction,
288     ) -> Poll<ReadyEvent> {
289         let curr = self.readiness.load(Acquire);
290 
291         let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
292 
293         if ready.is_empty() {
294             // Update the task info
295             let mut waiters = self.waiters.lock();
296             let slot = match direction {
297                 Direction::Read => &mut waiters.reader,
298                 Direction::Write => &mut waiters.writer,
299             };
300 
301             // Avoid cloning the waker if one is already stored that matches the
302             // current task.
303             match slot {
304                 Some(existing) => {
305                     if !existing.will_wake(cx.waker()) {
306                         *existing = cx.waker().clone();
307                     }
308                 }
309                 None => {
310                     *slot = Some(cx.waker().clone());
311                 }
312             }
313 
314             // Try again, in case the readiness was changed while we were
315             // taking the waiters lock
316             let curr = self.readiness.load(Acquire);
317             let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
318             if waiters.is_shutdown {
319                 Poll::Ready(ReadyEvent {
320                     tick: TICK.unpack(curr) as u8,
321                     ready: direction.mask(),
322                 })
323             } else if ready.is_empty() {
324                 Poll::Pending
325             } else {
326                 Poll::Ready(ReadyEvent {
327                     tick: TICK.unpack(curr) as u8,
328                     ready,
329                 })
330             }
331         } else {
332             Poll::Ready(ReadyEvent {
333                 tick: TICK.unpack(curr) as u8,
334                 ready,
335             })
336         }
337     }
338 
clear_readiness(&self, event: ReadyEvent)339     pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
340         // This consumes the current readiness state **except** for closed
341         // states. Closed states are excluded because they are final states.
342         let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
343 
344         // result isn't important
345         let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed);
346     }
347 
clear_wakers(&self)348     pub(crate) fn clear_wakers(&self) {
349         let mut waiters = self.waiters.lock();
350         waiters.reader.take();
351         waiters.writer.take();
352     }
353 }
354 
355 impl Drop for ScheduledIo {
drop(&mut self)356     fn drop(&mut self) {
357         self.wake(Ready::ALL);
358     }
359 }
360 
// NOTE(review): presumably needed because the waiter list holds raw
// `NonNull<Waiter>` pointers, which are not `Send`/`Sync` by themselves. In
// this file the list and the linked waiters' fields are only accessed while
// the `waiters` mutex is held — confirm that this holds for every user.
unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}
363 
364 cfg_io_readiness! {
365     impl ScheduledIo {
366         /// An async version of `poll_readiness` which uses a linked list of wakers.
367         pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
368             self.readiness_fut(interest).await
369         }
370 
371         // This is in a separate function so that the borrow checker doesn't think
372         // we are borrowing the `UnsafeCell` possibly over await boundaries.
373         //
374         // Go figure.
375         fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
376             Readiness {
377                 scheduled_io: self,
378                 state: State::Init,
379                 waiter: UnsafeCell::new(Waiter {
380                     pointers: linked_list::Pointers::new(),
381                     waker: None,
382                     is_ready: false,
383                     interest,
384                     _p: PhantomPinned,
385                 }),
386             }
387         }
388     }
389 
    // Lets `Waiter` be stored in the waiter `LinkedList`. The handle is a
    // plain `NonNull<Waiter>`: the list does not own the waiters, which live
    // inside the `Readiness` futures.
    unsafe impl linked_list::Link for Waiter {
        type Handle = NonNull<Waiter>;
        type Target = Waiter;

        /// The handle already is the raw pointer; it is copied out.
        fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
            *handle
        }

        /// The raw pointer is returned unchanged as the handle.
        unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
            ptr
        }

        /// Returns a pointer to the `pointers` field of `target`.
        ///
        /// A mutable reference to the waiter is created temporarily, so
        /// `target` must point to a live `Waiter` that is not being accessed
        /// through another reference at the same time.
        unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
            NonNull::from(&mut target.as_mut().pointers)
        }
    }
406 
407     // ===== impl Readiness =====
408 
    impl Future for Readiness<'_> {
        type Output = ReadyEvent;

        /// Drives the `Init` -> `Waiting` -> `Done` state machine.
        ///
        /// - `Init`: returns immediately if the resource is already ready for
        ///   the interest (or the driver has shut down); otherwise stores the
        ///   waker and links the waiter into the list.
        /// - `Waiting`: returns `Poll::Pending` until `wake0` has marked the
        ///   waiter as ready, refreshing the stored waker if the task changed.
        /// - `Done`: returns `Poll::Ready`.
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            use std::sync::atomic::Ordering::SeqCst;

            // The future is never moved out of the pinned reference; only
            // references to its fields are taken.
            let (scheduled_io, state, waiter) = unsafe {
                let me = self.get_unchecked_mut();
                (&me.scheduled_io, &mut me.state, &me.waiter)
            };

            loop {
                match *state {
                    State::Init => {
                        // Optimistically check existing readiness
                        let curr = scheduled_io.readiness.load(SeqCst);
                        let ready = Ready::from_usize(READINESS.unpack(curr));

                        // Safety: `waiter.interest` never changes
                        let interest = unsafe { (*waiter.get()).interest };
                        let ready = ready.intersection(interest);

                        if !ready.is_empty() {
                            // Currently ready!
                            let tick = TICK.unpack(curr) as u8;
                            *state = State::Done;
                            return Poll::Ready(ReadyEvent { tick, ready });
                        }

                        // Wasn't ready, take the lock (and check again while locked).
                        let mut waiters = scheduled_io.waiters.lock();

                        let curr = scheduled_io.readiness.load(SeqCst);
                        let mut ready = Ready::from_usize(READINESS.unpack(curr));

                        // After shutdown every interest is reported as ready.
                        if waiters.is_shutdown {
                            ready = Ready::ALL;
                        }

                        let ready = ready.intersection(interest);

                        if !ready.is_empty() {
                            // Currently ready!
                            let tick = TICK.unpack(curr) as u8;
                            *state = State::Done;
                            return Poll::Ready(ReadyEvent { tick, ready });
                        }

                        // Not ready even after locked, insert into list...

                        // Safety: called while locked
                        unsafe {
                            (*waiter.get()).waker = Some(cx.waker().clone());
                        }

                        // Insert the waiter into the linked list
                        //
                        // safety: pointers from `UnsafeCell` are never null.
                        waiters
                            .list
                            .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
                        *state = State::Waiting;

                        // The loop continues with the `Waiting` arm, which
                        // returns `Poll::Pending` unless the waiter has been
                        // notified in the meantime.
                    }
                    State::Waiting => {
                        // Currently in the "Waiting" state, implying the caller has
                        // a waiter stored in the waiter list (guarded by
                        // `scheduled_io.waiters`). In order to access the waker
                        // fields, we must hold the lock.

                        let waiters = scheduled_io.waiters.lock();

                        // Safety: called while locked
                        let w = unsafe { &mut *waiter.get() };

                        if w.is_ready {
                            // Our waker has been notified.
                            *state = State::Done;
                        } else {
                            // Update the waker, if necessary.
                            //
                            // The waker is only taken by `wake0` together
                            // with setting `is_ready`, so it is present here.
                            if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
                                w.waker = Some(cx.waker().clone());
                            }

                            return Poll::Pending;
                        }

                        // Explicit drop of the lock to indicate the scope that the
                        // lock is held. Because holding the lock is required to
                        // ensure safe access to fields not held within the lock, it
                        // is helpful to visualize the scope of the critical
                        // section.
                        drop(waiters);
                    }
                    State::Done => {
                        let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8;

                        // Safety: State::Done means it is no longer shared
                        let w = unsafe { &mut *waiter.get() };

                        // NOTE(review): the reported readiness is derived from
                        // the waiter's interest, not from the readiness bits
                        // currently stored — confirm that this is intended.
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready: Ready::from_interest(w.interest),
                        });
                    }
                }
            }
        }
    }
517 
518     impl Drop for Readiness<'_> {
519         fn drop(&mut self) {
520             let mut waiters = self.scheduled_io.waiters.lock();
521 
522             // Safety: `waiter` is only ever stored in `waiters`
523             unsafe {
524                 waiters
525                     .list
526                     .remove(NonNull::new_unchecked(self.waiter.get()))
527             };
528         }
529     }
530 
    // NOTE(review): `Readiness` holds an `UnsafeCell<Waiter>`, which is not
    // `Sync` on its own. In this file the waiter is accessed either while
    // the `waiters` mutex is held or when it is not linked into the list —
    // confirm that this covers every access.
    unsafe impl Send for Readiness<'_> {}
    unsafe impl Sync for Readiness<'_> {}
533 }
534