1 use std::io;
2 
3 use crate::codec::UserError::*;
4 use crate::codec::{RecvError, UserError};
5 use crate::frame::Reason;
6 use crate::proto::{self, PollReset};
7 
8 use self::Inner::*;
9 use self::Peer::*;
10 
11 /// Represents the state of an H2 stream
12 ///
13 /// ```not_rust
14 ///                              +--------+
15 ///                      send PP |        | recv PP
16 ///                     ,--------|  idle  |--------.
17 ///                    /         |        |         \
18 ///                   v          +--------+          v
19 ///            +----------+          |           +----------+
20 ///            |          |          | send H /  |          |
21 ///     ,------| reserved |          | recv H    | reserved |------.
22 ///     |      | (local)  |          |           | (remote) |      |
23 ///     |      +----------+          v           +----------+      |
24 ///     |          |             +--------+             |          |
25 ///     |          |     recv ES |        | send ES     |          |
26 ///     |   send H |     ,-------|  open  |-------.     | recv H   |
27 ///     |          |    /        |        |        \    |          |
28 ///     |          v   v         +--------+         v   v          |
29 ///     |      +----------+          |           +----------+      |
30 ///     |      |   half   |          |           |   half   |      |
31 ///     |      |  closed  |          | send R /  |  closed  |      |
32 ///     |      | (remote) |          | recv R    | (local)  |      |
33 ///     |      +----------+          |           +----------+      |
34 ///     |           |                |                 |           |
35 ///     |           | send ES /      |       recv ES / |           |
36 ///     |           | send R /       v        send R / |           |
37 ///     |           | recv R     +--------+   recv R   |           |
38 ///     | send R /  `----------->|        |<-----------'  send R / |
39 ///     | recv R                 | closed |               recv R   |
40 ///     `----------------------->|        |<----------------------'
41 ///                              +--------+
42 ///
43 ///        send:   endpoint sends this frame
44 ///        recv:   endpoint receives this frame
45 ///
46 ///        H:  HEADERS frame (with implied CONTINUATIONs)
47 ///        PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
48 ///        ES: END_STREAM flag
49 ///        R:  RST_STREAM frame
50 /// ```
51 #[derive(Debug, Clone)]
52 pub struct State {
53     inner: Inner,
54 }
55 
56 #[derive(Debug, Clone, Copy)]
57 enum Inner {
58     Idle,
59     // TODO: these states shouldn't count against concurrency limits:
60     ReservedLocal,
61     ReservedRemote,
62     Open { local: Peer, remote: Peer },
63     HalfClosedLocal(Peer), // TODO: explicitly name this value
64     HalfClosedRemote(Peer),
65     Closed(Cause),
66 }
67 
68 #[derive(Debug, Copy, Clone)]
69 enum Peer {
70     AwaitingHeaders,
71     Streaming,
72 }
73 
74 #[derive(Debug, Copy, Clone)]
75 enum Cause {
76     EndStream,
77     Proto(Reason),
78     LocallyReset(Reason),
79     Io,
80 
81     /// This indicates to the connection that a reset frame must be sent out
82     /// once the send queue has been flushed.
83     ///
84     /// Examples of when this could happen:
85     /// - User drops all references to a stream, so we want to CANCEL the it.
86     /// - Header block size was too large, so we want to REFUSE, possibly
87     ///   after sending a 431 response frame.
88     Scheduled(Reason),
89 }
90 
91 impl State {
92     /// Opens the send-half of a stream if it is not already open.
send_open(&mut self, eos: bool) -> Result<(), UserError>93     pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> {
94         let local = Streaming;
95 
96         self.inner = match self.inner {
97             Idle => {
98                 if eos {
99                     HalfClosedLocal(AwaitingHeaders)
100                 } else {
101                     Open {
102                         local,
103                         remote: AwaitingHeaders,
104                     }
105                 }
106             }
107             Open {
108                 local: AwaitingHeaders,
109                 remote,
110             } => {
111                 if eos {
112                     HalfClosedLocal(remote)
113                 } else {
114                     Open { local, remote }
115                 }
116             }
117             HalfClosedRemote(AwaitingHeaders) | ReservedLocal => {
118                 if eos {
119                     Closed(Cause::EndStream)
120                 } else {
121                     HalfClosedRemote(local)
122                 }
123             }
124             _ => {
125                 // All other transitions result in a protocol error
126                 return Err(UnexpectedFrameType);
127             }
128         };
129 
130         Ok(())
131     }
132 
133     /// Opens the receive-half of the stream when a HEADERS frame is received.
134     ///
135     /// Returns true if this transitions the state to Open.
recv_open(&mut self, eos: bool) -> Result<bool, RecvError>136     pub fn recv_open(&mut self, eos: bool) -> Result<bool, RecvError> {
137         let remote = Streaming;
138         let mut initial = false;
139 
140         self.inner = match self.inner {
141             Idle => {
142                 initial = true;
143 
144                 if eos {
145                     HalfClosedRemote(AwaitingHeaders)
146                 } else {
147                     Open {
148                         local: AwaitingHeaders,
149                         remote,
150                     }
151                 }
152             }
153             ReservedRemote => {
154                 initial = true;
155 
156                 if eos {
157                     Closed(Cause::EndStream)
158                 } else {
159                     HalfClosedLocal(Streaming)
160                 }
161             }
162             Open {
163                 local,
164                 remote: AwaitingHeaders,
165             } => {
166                 if eos {
167                     HalfClosedRemote(local)
168                 } else {
169                     Open { local, remote }
170                 }
171             }
172             HalfClosedLocal(AwaitingHeaders) => {
173                 if eos {
174                     Closed(Cause::EndStream)
175                 } else {
176                     HalfClosedLocal(remote)
177                 }
178             }
179             state => {
180                 // All other transitions result in a protocol error
181                 proto_err!(conn: "recv_open: in unexpected state {:?}", state);
182                 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
183             }
184         };
185 
186         Ok(initial)
187     }
188 
189     /// Transition from Idle -> ReservedRemote
reserve_remote(&mut self) -> Result<(), RecvError>190     pub fn reserve_remote(&mut self) -> Result<(), RecvError> {
191         match self.inner {
192             Idle => {
193                 self.inner = ReservedRemote;
194                 Ok(())
195             }
196             state => {
197                 proto_err!(conn: "reserve_remote: in unexpected state {:?}", state);
198                 Err(RecvError::Connection(Reason::PROTOCOL_ERROR))
199             }
200         }
201     }
202 
203     /// Transition from Idle -> ReservedLocal
reserve_local(&mut self) -> Result<(), UserError>204     pub fn reserve_local(&mut self) -> Result<(), UserError> {
205         match self.inner {
206             Idle => {
207                 self.inner = ReservedLocal;
208                 Ok(())
209             }
210             _ => Err(UserError::UnexpectedFrameType),
211         }
212     }
213 
214     /// Indicates that the remote side will not send more data to the local.
recv_close(&mut self) -> Result<(), RecvError>215     pub fn recv_close(&mut self) -> Result<(), RecvError> {
216         match self.inner {
217             Open { local, .. } => {
218                 // The remote side will continue to receive data.
219                 log::trace!("recv_close: Open => HalfClosedRemote({:?})", local);
220                 self.inner = HalfClosedRemote(local);
221                 Ok(())
222             }
223             HalfClosedLocal(..) => {
224                 log::trace!("recv_close: HalfClosedLocal => Closed");
225                 self.inner = Closed(Cause::EndStream);
226                 Ok(())
227             }
228             state => {
229                 proto_err!(conn: "recv_close: in unexpected state {:?}", state);
230                 Err(RecvError::Connection(Reason::PROTOCOL_ERROR))
231             }
232         }
233     }
234 
235     /// The remote explicitly sent a RST_STREAM.
236     ///
237     /// # Arguments
238     /// - `reason`: the reason field of the received RST_STREAM frame.
239     /// - `queued`: true if this stream has frames in the pending send queue.
recv_reset(&mut self, reason: Reason, queued: bool)240     pub fn recv_reset(&mut self, reason: Reason, queued: bool) {
241         match self.inner {
242             // If the stream is already in a `Closed` state, do nothing,
243             // provided that there are no frames still in the send queue.
244             Closed(..) if !queued => {}
245             // A notionally `Closed` stream may still have queued frames in
246             // the following cases:
247             //
248             // - if the cause is `Cause::Scheduled(..)` (i.e. we have not
249             //   actually closed the stream yet).
250             // - if the cause is `Cause::EndStream`: we transition to this
251             //   state when an EOS frame is *enqueued* (so that it's invalid
252             //   to enqueue more frames), not when the EOS frame is *sent*;
253             //   therefore, there may still be frames ahead of the EOS frame
254             //   in the send queue.
255             //
256             // In either of these cases, we want to overwrite the stream's
257             // previous state with the received RST_STREAM, so that the queue
258             // will be cleared by `Prioritize::pop_frame`.
259             state => {
260                 log::trace!(
261                     "recv_reset; reason={:?}; state={:?}; queued={:?}",
262                     reason,
263                     state,
264                     queued
265                 );
266                 self.inner = Closed(Cause::Proto(reason));
267             }
268         }
269     }
270 
271     /// We noticed a protocol error.
recv_err(&mut self, err: &proto::Error)272     pub fn recv_err(&mut self, err: &proto::Error) {
273         use crate::proto::Error::*;
274 
275         match self.inner {
276             Closed(..) => {}
277             _ => {
278                 log::trace!("recv_err; err={:?}", err);
279                 self.inner = Closed(match *err {
280                     Proto(reason) => Cause::LocallyReset(reason),
281                     Io(..) => Cause::Io,
282                 });
283             }
284         }
285     }
286 
recv_eof(&mut self)287     pub fn recv_eof(&mut self) {
288         match self.inner {
289             Closed(..) => {}
290             s => {
291                 log::trace!("recv_eof; state={:?}", s);
292                 self.inner = Closed(Cause::Io);
293             }
294         }
295     }
296 
297     /// Indicates that the local side will not send more data to the local.
send_close(&mut self)298     pub fn send_close(&mut self) {
299         match self.inner {
300             Open { remote, .. } => {
301                 // The remote side will continue to receive data.
302                 log::trace!("send_close: Open => HalfClosedLocal({:?})", remote);
303                 self.inner = HalfClosedLocal(remote);
304             }
305             HalfClosedRemote(..) => {
306                 log::trace!("send_close: HalfClosedRemote => Closed");
307                 self.inner = Closed(Cause::EndStream);
308             }
309             state => panic!("send_close: unexpected state {:?}", state),
310         }
311     }
312 
313     /// Set the stream state to reset locally.
set_reset(&mut self, reason: Reason)314     pub fn set_reset(&mut self, reason: Reason) {
315         self.inner = Closed(Cause::LocallyReset(reason));
316     }
317 
318     /// Set the stream state to a scheduled reset.
set_scheduled_reset(&mut self, reason: Reason)319     pub fn set_scheduled_reset(&mut self, reason: Reason) {
320         debug_assert!(!self.is_closed());
321         self.inner = Closed(Cause::Scheduled(reason));
322     }
323 
get_scheduled_reset(&self) -> Option<Reason>324     pub fn get_scheduled_reset(&self) -> Option<Reason> {
325         match self.inner {
326             Closed(Cause::Scheduled(reason)) => Some(reason),
327             _ => None,
328         }
329     }
330 
is_scheduled_reset(&self) -> bool331     pub fn is_scheduled_reset(&self) -> bool {
332         match self.inner {
333             Closed(Cause::Scheduled(..)) => true,
334             _ => false,
335         }
336     }
337 
is_local_reset(&self) -> bool338     pub fn is_local_reset(&self) -> bool {
339         match self.inner {
340             Closed(Cause::LocallyReset(_)) => true,
341             Closed(Cause::Scheduled(..)) => true,
342             _ => false,
343         }
344     }
345 
346     /// Returns true if the stream is already reset.
is_reset(&self) -> bool347     pub fn is_reset(&self) -> bool {
348         match self.inner {
349             Closed(Cause::EndStream) => false,
350             Closed(_) => true,
351             _ => false,
352         }
353     }
354 
is_send_streaming(&self) -> bool355     pub fn is_send_streaming(&self) -> bool {
356         match self.inner {
357             Open {
358                 local: Streaming, ..
359             } => true,
360             HalfClosedRemote(Streaming) => true,
361             _ => false,
362         }
363     }
364 
365     /// Returns true when the stream is in a state to receive headers
is_recv_headers(&self) -> bool366     pub fn is_recv_headers(&self) -> bool {
367         match self.inner {
368             Idle => true,
369             Open {
370                 remote: AwaitingHeaders,
371                 ..
372             } => true,
373             HalfClosedLocal(AwaitingHeaders) => true,
374             ReservedRemote => true,
375             _ => false,
376         }
377     }
378 
is_recv_streaming(&self) -> bool379     pub fn is_recv_streaming(&self) -> bool {
380         match self.inner {
381             Open {
382                 remote: Streaming, ..
383             } => true,
384             HalfClosedLocal(Streaming) => true,
385             _ => false,
386         }
387     }
388 
is_closed(&self) -> bool389     pub fn is_closed(&self) -> bool {
390         match self.inner {
391             Closed(_) => true,
392             _ => false,
393         }
394     }
395 
is_recv_closed(&self) -> bool396     pub fn is_recv_closed(&self) -> bool {
397         match self.inner {
398             Closed(..) | HalfClosedRemote(..) | ReservedLocal => true,
399             _ => false,
400         }
401     }
402 
is_send_closed(&self) -> bool403     pub fn is_send_closed(&self) -> bool {
404         match self.inner {
405             Closed(..) | HalfClosedLocal(..) | ReservedRemote => true,
406             _ => false,
407         }
408     }
409 
is_idle(&self) -> bool410     pub fn is_idle(&self) -> bool {
411         match self.inner {
412             Idle => true,
413             _ => false,
414         }
415     }
416 
ensure_recv_open(&self) -> Result<bool, proto::Error>417     pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
418         // TODO: Is this correct?
419         match self.inner {
420             Closed(Cause::Proto(reason))
421             | Closed(Cause::LocallyReset(reason))
422             | Closed(Cause::Scheduled(reason)) => Err(proto::Error::Proto(reason)),
423             Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())),
424             Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false),
425             _ => Ok(true),
426         }
427     }
428 
429     /// Returns a reason if the stream has been reset.
ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error>430     pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> {
431         match self.inner {
432             Closed(Cause::Proto(reason))
433             | Closed(Cause::LocallyReset(reason))
434             | Closed(Cause::Scheduled(reason)) => Ok(Some(reason)),
435             Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into()).into()),
436             Open {
437                 local: Streaming, ..
438             }
439             | HalfClosedRemote(Streaming) => match mode {
440                 PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()),
441                 PollReset::Streaming => Ok(None),
442             },
443             _ => Ok(None),
444         }
445     }
446 }
447 
448 impl Default for State {
default() -> State449     fn default() -> State {
450         State { inner: Inner::Idle }
451     }
452 }
453 
454 impl Default for Peer {
default() -> Self455     fn default() -> Self {
456         AwaitingHeaders
457     }
458 }
459