1# Refactor I/O driver
2
3Describes changes to the I/O driver for the Tokio 0.3 release.
4
5## Goals
6
7* Support `async fn` on I/O types with `&self`.
8* Refine the `Registration` API.
9
10### Non-goals
11
12* Implement `AsyncRead` / `AsyncWrite` for `&TcpStream` or other reference type.
13
14## Overview
15
16Currently, I/O types require `&mut self` for `async` functions. The reason for
17this is the task's waker is stored in the I/O resource's internal state
18(`ScheduledIo`) instead of in the future returned by the `async` function.
19Because of this limitation, I/O types limit the number of wakers to one per
20direction (a direction is either read-related events or write-related events).
21
22Moving the waker from the internal I/O resource's state to the operation's
23future enables multiple wakers to be registered per operation. The "intrusive
24wake list" strategy used by `Notify` applies to this case, though there are some
25concerns unique to the I/O driver.
26
27## Reworking the `Registration` type
28
29While `Registration` is made private (per #2728), it remains in Tokio as an
30implementation detail backing I/O resources such as `TcpStream`. The API of
31`Registration` is updated to support waiting for an arbitrary interest set with
32`&self`. This supports concurrent waiters with a different readiness interest.
33
34```rust
35struct Registration { ... }
36
37// TODO: naming
38struct ReadyEvent {
39    tick: u32,
40    ready: mio::Ready,
41}
42
43impl Registration {
44    /// `interest` must be a super set of **all** interest sets specified in
45    /// the other methods. This is the interest set passed to `mio`.
46    pub fn new<T>(io: &T, interest: mio::Ready) -> io::Result<Registration>
47        where T: mio::Evented;
48
49    /// Awaits for any readiness event included in `interest`. Returns a
50    /// `ReadyEvent` representing the received readiness event.
51    async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent>;
52
53    /// Clears resource level readiness represented by the specified `ReadyEvent`
54    async fn clear_readiness(&self, ready_event: ReadyEvent);
55```
56
57A new registration is created for a `T: mio::Evented` and a `interest`. This
58creates a `ScheduledIo` entry with the I/O driver and registers the resource
59with `mio`.
60
61Because Tokio uses **edge-triggered** notifications, the I/O driver only
62receives readiness from the OS once the ready state **changes**. The I/O driver
63must track each resource's known readiness state. This helps prevent syscalls
64when the process knows the syscall should return with `EWOULDBLOCK`.
65
66A call to `readiness()` checks if the currently known resource readiness
67overlaps with `interest`. If it does, then the `readiness()` immediately
68returns. If it does not, then the task waits until the I/O driver receives a
69readiness event.
70
71The pseudocode to perform a TCP read is as follows.
72
73```rust
74async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
75    loop {
76        // Await readiness
77        let event = self.readiness(interest).await?;
78
79        match self.mio_socket.read(buf) {
80            Ok(v) => return Ok(v),
81            Err(ref e) if e.kind() == WouldBlock => {
82                self.clear_readiness(event);
83            }
84            Err(e) => return Err(e),
85        }
86    }
87}
88```
89
90## Reworking the `ScheduledIo` type
91
92The `ScheduledIo` type is switched to use an intrusive waker linked list. Each
93entry in the linked list includes the `interest` set passed to `readiness()`.
94
95```rust
96#[derive(Debug)]
97pub(crate) struct ScheduledIo {
98    /// Resource's known state packed with other state that must be
99    /// atomically updated.
100    readiness: AtomicUsize,
101
102    /// Tracks tasks waiting on the resource
103    waiters: Mutex<Waiters>,
104}
105
106#[derive(Debug)]
107struct Waiters {
108    // List of intrusive waiters.
109    list: LinkedList<Waiter>,
110
111    /// Waiter used by `AsyncRead` implementations.
112    reader: Option<Waker>,
113
114    /// Waiter used by `AsyncWrite` implementations.
115    writer: Option<Waker>,
116}
117
118// This struct is contained by the **future** returned by `readiness()`.
119#[derive(Debug)]
120struct Waiter {
121    /// Intrusive linked-list pointers
122    pointers: linked_list::Pointers<Waiter>,
123
124    /// Waker for task waiting on I/O resource
125    waiter: Option<Waker>,
126
127    /// Readiness events being waited on. This is
128    /// the value passed to `readiness()`
129    interest: mio::Ready,
130
131    /// Should not be `Unpin`.
132    _p: PhantomPinned,
133}
134```
135
136When an I/O event is received from `mio`, the associated resources' readiness is
137updated and the waiter list is iterated. All waiters with `interest` that
138overlap the received readiness event are notified. Any waiter with an `interest`
139that does not overlap the readiness event remains in the list.
140
141## Cancel interest on drop
142
143The future returned by `readiness()` uses an intrusive linked list to store the
144waker with `ScheduledIo`. Because `readiness()` can be called concurrently, many
145wakers may be stored simultaneously in the list. If the `readiness()` future is
146dropped early, it is essential that the waker is removed from the list. This
147prevents leaking memory.
148
149## Race condition
150
151Consider how many tasks may concurrently attempt I/O operations. This, combined
152with how Tokio uses edge-triggered events, can result in a race condition. Let's
153revisit the TCP read function:
154
155```rust
156async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
157    loop {
158        // Await readiness
159        let event = self.readiness(interest).await?;
160
161        match self.mio_socket.read(buf) {
162            Ok(v) => return Ok(v),
163            Err(ref e) if e.kind() == WouldBlock => {
164                self.clear_readiness(event);
165            }
166            Err(e) => return Err(e),
167        }
168    }
169}
170```
171
172If care is not taken, if between `mio_socket.read(buf)` returning and
173`clear_readiness(event)` is called, a readiness event arrives, the `read()`
174function could deadlock. This happens because the readiness event is received,
175`clear_readiness()` unsets the readiness event, and on the next iteration,
176`readiness().await` will block forever as a new readiness event is not received.
177
178The current I/O driver handles this condition by always registering the task's
179waker before performing the operation. This is not ideal as it will result in
180unnecessary task notification.
181
182Instead, we will use a strategy to prevent clearing readiness if an "unseen"
183readiness event has been received. The I/O driver will maintain a "tick" value.
184Every time the `mio` `poll()` function is called, the tick is incremented. Each
185readiness event has an associated tick. When the I/O driver sets the resource's
186readiness, the driver's tick is packed into the atomic `usize`.
187
188The `ScheduledIo` readiness `AtomicUsize` is structured as:
189
190```
191| reserved | generation |  driver tick | readinesss |
192|----------+------------+--------------+------------|
193|   1 bit  |   7 bits   +    8 bits    +   16 bits  |
194```
195
196The `reserved` and `generation` components exist today.
197
198The `readiness()` function returns a `ReadyEvent` value. This value includes the
199`tick` component read with the resource's readiness value. When
200`clear_readiness()` is called, the `ReadyEvent` is provided. Readiness is only
201cleared if the current `tick` matches the `tick` included in the `ReadyEvent`.
202If the tick values do not match, the call to `readiness()` on the next iteration
203will not block and the new `tick` is included in the new `ReadyToken.`
204
205TODO
206
207## Implementing `AsyncRead` / `AsyncWrite`
208
209The `AsyncRead` and `AsyncWrite` traits use a "poll" based API. This means that
210it is not possible to use an intrusive linked list to track the waker.
211Additionally, there is no future associated with the operation which means it is
212not possible to cancel interest in the readiness events.
213
214To implement `AsyncRead` and `AsyncWrite`, `ScheduledIo` includes dedicated
215waker values for the read direction and the write direction. These values are
216used to store the waker. Specific `interest` is not tracked for `AsyncRead` and
217`AsyncWrite` implementations. It is assumed that only events of interest are:
218
219* Read ready
220* Read closed
221* Write ready
222* Write closed
223
224Note that "read closed" and "write closed" are only available with Mio 0.7. With
225Mio 0.6, things were a bit messy.
226
227It is only possible to implement `AsyncRead` and `AsyncWrite` for resource types
228themselves and not for `&Resource`. Implementing the traits for `&Resource`
229would permit concurrent operations to the resource. Because only a single waker
230is stored per direction, any concurrent usage would result in deadlocks. An
231alternate implementation would call for a `Vec<Waker>` but this would result in
232memory leaks.
233
234## Enabling reads and writes for `&TcpStream`
235
236Instead of implementing `AsyncRead` and `AsyncWrite` for `&TcpStream`, a new
237function is added to `TcpStream`.
238
239```rust
240impl TcpStream {
241    /// Naming TBD
242    fn by_ref(&self) -> TcpStreamRef<'_>;
243}
244
245struct TcpStreamRef<'a> {
246    stream: &'a TcpStream,
247
248    // `Waiter` is the node in the intrusive waiter linked-list
249    read_waiter: Waiter,
250    write_waiter: Waiter,
251}
252```
253
254Now, `AsyncRead` and `AsyncWrite` can be implemented on `TcpStreamRef<'a>`. When
255the `TcpStreamRef` is dropped, all associated waker resources are cleaned up.
256
257### Removing all the `split()` functions
258
259With `TcpStream::by_ref()`, `TcpStream::split()` is no longer needed. Instead,
260it is possible to do something as follows.
261
262```rust
263let rd = my_stream.by_ref();
264let wr = my_stream.by_ref();
265
266select! {
267    // use `rd` and `wr` in separate branches.
268}
269```
270
271It is also possible to store a `TcpStream` in an `Arc`.
272
273```rust
274let arc_stream = Arc::new(my_tcp_stream);
275let n = arc_stream.by_ref().read(buf).await?;
276```
277