1 use std::io; 2 3 use crate::codec::UserError::*; 4 use crate::codec::{RecvError, UserError}; 5 use crate::frame::Reason; 6 use crate::proto::{self, PollReset}; 7 8 use self::Inner::*; 9 use self::Peer::*; 10 11 /// Represents the state of an H2 stream 12 /// 13 /// ```not_rust 14 /// +--------+ 15 /// send PP | | recv PP 16 /// ,--------| idle |--------. 17 /// / | | \ 18 /// v +--------+ v 19 /// +----------+ | +----------+ 20 /// | | | send H / | | 21 /// ,------| reserved | | recv H | reserved |------. 22 /// | | (local) | | | (remote) | | 23 /// | +----------+ v +----------+ | 24 /// | | +--------+ | | 25 /// | | recv ES | | send ES | | 26 /// | send H | ,-------| open |-------. | recv H | 27 /// | | / | | \ | | 28 /// | v v +--------+ v v | 29 /// | +----------+ | +----------+ | 30 /// | | half | | | half | | 31 /// | | closed | | send R / | closed | | 32 /// | | (remote) | | recv R | (local) | | 33 /// | +----------+ | +----------+ | 34 /// | | | | | 35 /// | | send ES / | recv ES / | | 36 /// | | send R / v send R / | | 37 /// | | recv R +--------+ recv R | | 38 /// | send R / `----------->| |<-----------' send R / | 39 /// | recv R | closed | recv R | 40 /// `----------------------->| |<----------------------' 41 /// +--------+ 42 /// 43 /// send: endpoint sends this frame 44 /// recv: endpoint receives this frame 45 /// 46 /// H: HEADERS frame (with implied CONTINUATIONs) 47 /// PP: PUSH_PROMISE frame (with implied CONTINUATIONs) 48 /// ES: END_STREAM flag 49 /// R: RST_STREAM frame 50 /// ``` 51 #[derive(Debug, Clone)] 52 pub struct State { 53 inner: Inner, 54 } 55 56 #[derive(Debug, Clone, Copy)] 57 enum Inner { 58 Idle, 59 // TODO: these states shouldn't count against concurrency limits: 60 ReservedLocal, 61 ReservedRemote, 62 Open { local: Peer, remote: Peer }, 63 HalfClosedLocal(Peer), // TODO: explicitly name this value 64 HalfClosedRemote(Peer), 65 Closed(Cause), 66 } 67 68 #[derive(Debug, Copy, Clone)] 69 enum Peer { 70 AwaitingHeaders, 71 Streaming, 72 } 73 74 #[derive(Debug, Copy, Clone)] 75 enum Cause { 76 EndStream, 77 Proto(Reason), 78 LocallyReset(Reason), 79 Io, 80 81 /// This indicates to the connection that a reset frame must be sent out 82 /// once the send queue has been flushed. 83 /// 84 /// Examples of when this could happen: 85 /// - User drops all references to a stream, so we want to CANCEL the it. 86 /// - Header block size was too large, so we want to REFUSE, possibly 87 /// after sending a 431 response frame. 88 Scheduled(Reason), 89 } 90 91 impl State { 92 /// Opens the send-half of a stream if it is not already open. send_open(&mut self, eos: bool) -> Result<(), UserError>93 pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> { 94 let local = Streaming; 95 96 self.inner = match self.inner { 97 Idle => { 98 if eos { 99 HalfClosedLocal(AwaitingHeaders) 100 } else { 101 Open { 102 local, 103 remote: AwaitingHeaders, 104 } 105 } 106 } 107 Open { 108 local: AwaitingHeaders, 109 remote, 110 } => { 111 if eos { 112 HalfClosedLocal(remote) 113 } else { 114 Open { local, remote } 115 } 116 } 117 HalfClosedRemote(AwaitingHeaders) | ReservedLocal => { 118 if eos { 119 Closed(Cause::EndStream) 120 } else { 121 HalfClosedRemote(local) 122 } 123 } 124 _ => { 125 // All other transitions result in a protocol error 126 return Err(UnexpectedFrameType); 127 } 128 }; 129 130 Ok(()) 131 } 132 133 /// Opens the receive-half of the stream when a HEADERS frame is received. 134 /// 135 /// Returns true if this transitions the state to Open. recv_open(&mut self, eos: bool) -> Result<bool, RecvError>136 pub fn recv_open(&mut self, eos: bool) -> Result<bool, RecvError> { 137 let remote = Streaming; 138 let mut initial = false; 139 140 self.inner = match self.inner { 141 Idle => { 142 initial = true; 143 144 if eos { 145 HalfClosedRemote(AwaitingHeaders) 146 } else { 147 Open { 148 local: AwaitingHeaders, 149 remote, 150 } 151 } 152 } 153 ReservedRemote => { 154 initial = true; 155 156 if eos { 157 Closed(Cause::EndStream) 158 } else { 159 HalfClosedLocal(Streaming) 160 } 161 } 162 Open { 163 local, 164 remote: AwaitingHeaders, 165 } => { 166 if eos { 167 HalfClosedRemote(local) 168 } else { 169 Open { local, remote } 170 } 171 } 172 HalfClosedLocal(AwaitingHeaders) => { 173 if eos { 174 Closed(Cause::EndStream) 175 } else { 176 HalfClosedLocal(remote) 177 } 178 } 179 state => { 180 // All other transitions result in a protocol error 181 proto_err!(conn: "recv_open: in unexpected state {:?}", state); 182 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); 183 } 184 }; 185 186 Ok(initial) 187 } 188 189 /// Transition from Idle -> ReservedRemote reserve_remote(&mut self) -> Result<(), RecvError>190 pub fn reserve_remote(&mut self) -> Result<(), RecvError> { 191 match self.inner { 192 Idle => { 193 self.inner = ReservedRemote; 194 Ok(()) 195 } 196 state => { 197 proto_err!(conn: "reserve_remote: in unexpected state {:?}", state); 198 Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) 199 } 200 } 201 } 202 203 /// Transition from Idle -> ReservedLocal reserve_local(&mut self) -> Result<(), UserError>204 pub fn reserve_local(&mut self) -> Result<(), UserError> { 205 match self.inner { 206 Idle => { 207 self.inner = ReservedLocal; 208 Ok(()) 209 } 210 _ => Err(UserError::UnexpectedFrameType), 211 } 212 } 213 214 /// Indicates that the remote side will not send more data to the local. recv_close(&mut self) -> Result<(), RecvError>215 pub fn recv_close(&mut self) -> Result<(), RecvError> { 216 match self.inner { 217 Open { local, .. } => { 218 // The remote side will continue to receive data. 219 tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local); 220 self.inner = HalfClosedRemote(local); 221 Ok(()) 222 } 223 HalfClosedLocal(..) => { 224 tracing::trace!("recv_close: HalfClosedLocal => Closed"); 225 self.inner = Closed(Cause::EndStream); 226 Ok(()) 227 } 228 state => { 229 proto_err!(conn: "recv_close: in unexpected state {:?}", state); 230 Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) 231 } 232 } 233 } 234 235 /// The remote explicitly sent a RST_STREAM. 236 /// 237 /// # Arguments 238 /// - `reason`: the reason field of the received RST_STREAM frame. 239 /// - `queued`: true if this stream has frames in the pending send queue. recv_reset(&mut self, reason: Reason, queued: bool)240 pub fn recv_reset(&mut self, reason: Reason, queued: bool) { 241 match self.inner { 242 // If the stream is already in a `Closed` state, do nothing, 243 // provided that there are no frames still in the send queue. 244 Closed(..) if !queued => {} 245 // A notionally `Closed` stream may still have queued frames in 246 // the following cases: 247 // 248 // - if the cause is `Cause::Scheduled(..)` (i.e. we have not 249 // actually closed the stream yet). 250 // - if the cause is `Cause::EndStream`: we transition to this 251 // state when an EOS frame is *enqueued* (so that it's invalid 252 // to enqueue more frames), not when the EOS frame is *sent*; 253 // therefore, there may still be frames ahead of the EOS frame 254 // in the send queue. 255 // 256 // In either of these cases, we want to overwrite the stream's 257 // previous state with the received RST_STREAM, so that the queue 258 // will be cleared by `Prioritize::pop_frame`. 259 state => { 260 tracing::trace!( 261 "recv_reset; reason={:?}; state={:?}; queued={:?}", 262 reason, 263 state, 264 queued 265 ); 266 self.inner = Closed(Cause::Proto(reason)); 267 } 268 } 269 } 270 271 /// We noticed a protocol error. recv_err(&mut self, err: &proto::Error)272 pub fn recv_err(&mut self, err: &proto::Error) { 273 use crate::proto::Error::*; 274 275 match self.inner { 276 Closed(..) => {} 277 _ => { 278 tracing::trace!("recv_err; err={:?}", err); 279 self.inner = Closed(match *err { 280 Proto(reason) => Cause::LocallyReset(reason), 281 Io(..) => Cause::Io, 282 }); 283 } 284 } 285 } 286 recv_eof(&mut self)287 pub fn recv_eof(&mut self) { 288 match self.inner { 289 Closed(..) => {} 290 s => { 291 tracing::trace!("recv_eof; state={:?}", s); 292 self.inner = Closed(Cause::Io); 293 } 294 } 295 } 296 297 /// Indicates that the local side will not send more data to the local. send_close(&mut self)298 pub fn send_close(&mut self) { 299 match self.inner { 300 Open { remote, .. } => { 301 // The remote side will continue to receive data. 302 tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote); 303 self.inner = HalfClosedLocal(remote); 304 } 305 HalfClosedRemote(..) => { 306 tracing::trace!("send_close: HalfClosedRemote => Closed"); 307 self.inner = Closed(Cause::EndStream); 308 } 309 state => panic!("send_close: unexpected state {:?}", state), 310 } 311 } 312 313 /// Set the stream state to reset locally. set_reset(&mut self, reason: Reason)314 pub fn set_reset(&mut self, reason: Reason) { 315 self.inner = Closed(Cause::LocallyReset(reason)); 316 } 317 318 /// Set the stream state to a scheduled reset. set_scheduled_reset(&mut self, reason: Reason)319 pub fn set_scheduled_reset(&mut self, reason: Reason) { 320 debug_assert!(!self.is_closed()); 321 self.inner = Closed(Cause::Scheduled(reason)); 322 } 323 get_scheduled_reset(&self) -> Option<Reason>324 pub fn get_scheduled_reset(&self) -> Option<Reason> { 325 match self.inner { 326 Closed(Cause::Scheduled(reason)) => Some(reason), 327 _ => None, 328 } 329 } 330 is_scheduled_reset(&self) -> bool331 pub fn is_scheduled_reset(&self) -> bool { 332 match self.inner { 333 Closed(Cause::Scheduled(..)) => true, 334 _ => false, 335 } 336 } 337 is_local_reset(&self) -> bool338 pub fn is_local_reset(&self) -> bool { 339 match self.inner { 340 Closed(Cause::LocallyReset(_)) => true, 341 Closed(Cause::Scheduled(..)) => true, 342 _ => false, 343 } 344 } 345 346 /// Returns true if the stream is already reset. is_reset(&self) -> bool347 pub fn is_reset(&self) -> bool { 348 match self.inner { 349 Closed(Cause::EndStream) => false, 350 Closed(_) => true, 351 _ => false, 352 } 353 } 354 is_send_streaming(&self) -> bool355 pub fn is_send_streaming(&self) -> bool { 356 match self.inner { 357 Open { 358 local: Streaming, .. 359 } => true, 360 HalfClosedRemote(Streaming) => true, 361 _ => false, 362 } 363 } 364 365 /// Returns true when the stream is in a state to receive headers is_recv_headers(&self) -> bool366 pub fn is_recv_headers(&self) -> bool { 367 match self.inner { 368 Idle => true, 369 Open { 370 remote: AwaitingHeaders, 371 .. 372 } => true, 373 HalfClosedLocal(AwaitingHeaders) => true, 374 ReservedRemote => true, 375 _ => false, 376 } 377 } 378 is_recv_streaming(&self) -> bool379 pub fn is_recv_streaming(&self) -> bool { 380 match self.inner { 381 Open { 382 remote: Streaming, .. 383 } => true, 384 HalfClosedLocal(Streaming) => true, 385 _ => false, 386 } 387 } 388 is_closed(&self) -> bool389 pub fn is_closed(&self) -> bool { 390 match self.inner { 391 Closed(_) => true, 392 _ => false, 393 } 394 } 395 is_recv_closed(&self) -> bool396 pub fn is_recv_closed(&self) -> bool { 397 match self.inner { 398 Closed(..) | HalfClosedRemote(..) | ReservedLocal => true, 399 _ => false, 400 } 401 } 402 is_send_closed(&self) -> bool403 pub fn is_send_closed(&self) -> bool { 404 match self.inner { 405 Closed(..) | HalfClosedLocal(..) | ReservedRemote => true, 406 _ => false, 407 } 408 } 409 is_idle(&self) -> bool410 pub fn is_idle(&self) -> bool { 411 match self.inner { 412 Idle => true, 413 _ => false, 414 } 415 } 416 ensure_recv_open(&self) -> Result<bool, proto::Error>417 pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> { 418 // TODO: Is this correct? 419 match self.inner { 420 Closed(Cause::Proto(reason)) 421 | Closed(Cause::LocallyReset(reason)) 422 | Closed(Cause::Scheduled(reason)) => Err(proto::Error::Proto(reason)), 423 Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())), 424 Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false), 425 _ => Ok(true), 426 } 427 } 428 429 /// Returns a reason if the stream has been reset. ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error>430 pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> { 431 match self.inner { 432 Closed(Cause::Proto(reason)) 433 | Closed(Cause::LocallyReset(reason)) 434 | Closed(Cause::Scheduled(reason)) => Ok(Some(reason)), 435 Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into()).into()), 436 Open { 437 local: Streaming, .. 438 } 439 | HalfClosedRemote(Streaming) => match mode { 440 PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()), 441 PollReset::Streaming => Ok(None), 442 }, 443 _ => Ok(None), 444 } 445 } 446 } 447 448 impl Default for State { default() -> State449 fn default() -> State { 450 State { inner: Inner::Idle } 451 } 452 } 453 454 impl Default for Peer { default() -> Self455 fn default() -> Self { 456 AwaitingHeaders 457 } 458 } 459