use super::*;
use crate::codec::{RecvError, UserError};
use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
use crate::{frame, proto};

use http::{HeaderMap, Request, Response};

use std::io;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

#[derive(Debug)]
pub(super) struct Recv {
    /// Initial window size of remote-initiated streams
    init_window_sz: WindowSize,

    /// Connection level flow control governing received data
    flow: FlowControl,

    /// Amount of connection window capacity currently used by outstanding streams.
    in_flight_data: WindowSize,

    /// The lowest stream ID that is still idle
    next_stream_id: Result<StreamId, StreamIdOverflow>,

    /// The stream ID of the last processed stream
    last_processed_id: StreamId,

    /// Any streams with a higher ID are ignored.
    ///
    /// This starts as MAX, but is lowered when a GOAWAY is received.
    ///
    /// > After sending a GOAWAY frame, the sender can discard frames for
    /// > streams initiated by the receiver with identifiers higher than
    /// > the identified last stream.
    max_stream_id: StreamId,

    /// Streams that have pending window updates
    pending_window_updates: store::Queue<stream::NextWindowUpdate>,

    /// New streams to be accepted
    pending_accept: store::Queue<stream::NextAccept>,

    /// Locally reset streams that should be reaped when they expire
    pending_reset_expired: store::Queue<stream::NextResetExpire>,

    /// How long locally reset streams should ignore received frames
    reset_duration: Duration,

    /// Holds frames that are waiting to be read
    buffer: Buffer<Event>,

    /// Refused StreamId; this represents a RST_STREAM frame that must be sent out.
    refused: Option<StreamId>,

    /// Whether push promises are allowed to be received.
    is_push_enabled: bool,
}

#[derive(Debug)]
pub(super) enum Event {
    /// A complete HEADERS frame, converted into a request or response message
    Headers(peer::PollMessage),
    /// The payload of a DATA frame
    Data(Bytes),
    /// Trailing header fields received after the data
    Trailers(HeaderMap),
}

#[derive(Debug)]
pub(super) enum RecvHeaderBlockError<T> {
    Oversize(T),
    State(RecvError),
}

#[derive(Debug)]
pub(crate) enum Open {
    PushPromise,
    Headers,
}

#[derive(Debug, Clone, Copy)]
struct Indices {
    head: store::Key,
    tail: store::Key,
}

impl Recv {
    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
        let next_stream_id = if peer.is_server() { 1 } else { 2 };

        let mut flow = FlowControl::new();

        // connections always have the default window size, regardless of
        // settings
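        //
        // The window is made fully available up front; as the peer sends data
        // and the user releases capacity, `unclaimed_capacity` grows and a
        // WINDOW_UPDATE is eventually sent to restore the window. Per-stream
        // windows, in contrast, start at `config.local_init_window_sz`.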
        flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
            .expect("invalid initial remote window size");
        flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE);

        Recv {
            init_window_sz: config.local_init_window_sz,
            flow,
            in_flight_data: 0 as WindowSize,
            next_stream_id: Ok(next_stream_id.into()),
            pending_window_updates: store::Queue::new(),
            last_processed_id: StreamId::ZERO,
            max_stream_id: StreamId::MAX,
            pending_accept: store::Queue::new(),
            pending_reset_expired: store::Queue::new(),
            reset_duration: config.local_reset_duration,
            buffer: Buffer::new(),
            refused: None,
            is_push_enabled: config.local_push_enabled,
        }
    }

    /// Returns the initial receive window size
    pub fn init_window_sz(&self) -> WindowSize {
        self.init_window_sz
    }

    /// Returns the ID of the last processed stream
    pub fn last_processed_id(&self) -> StreamId {
        self.last_processed_id
    }

    /// Update state reflecting a new, remotely opened stream
    ///
    /// Returns the stream ID if successful. `None` if refused
    pub fn open(
        &mut self,
        id: StreamId,
        mode: Open,
        counts: &mut Counts,
    ) -> Result<Option<StreamId>, RecvError> {
        assert!(self.refused.is_none());

        counts.peer().ensure_can_open(id, mode)?;

        let next_id = self.next_stream_id()?;
        if id < next_id {
            proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
            return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
        }

        self.next_stream_id = id.next_id();
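        // Note: advancing `next_stream_id` past any skipped IDs means those
        // IDs are now implicitly closed (RFC 7540, Section 5.1.1);
        // `ensure_not_idle` and `may_have_created_stream` rely on this.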

        if !counts.can_inc_num_recv_streams() {
            self.refused = Some(id);
            return Ok(None);
        }

        Ok(Some(id))
    }

    /// Transition the stream state based on receiving headers
    ///
    /// The caller ensures that the frame represents headers and not trailers.
    pub fn recv_headers(
        &mut self,
        frame: frame::Headers,
        stream: &mut store::Ptr,
        counts: &mut Counts,
    ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
        tracing::trace!("opening stream; init_window={}", self.init_window_sz);
        let is_initial = stream.state.recv_open(&frame)?;

        if is_initial {
            // TODO: be smarter about this logic
            if frame.stream_id() > self.last_processed_id {
                self.last_processed_id = frame.stream_id();
            }

            // Increment the number of concurrent streams
            counts.inc_num_recv_streams(stream);
        }

        if !stream.content_length.is_head() {
            use super::stream::ContentLength;
            use http::header;

            if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
                let content_length = match frame::parse_u64(content_length.as_bytes()) {
                    Ok(v) => v,
                    Err(()) => {
                        proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
                        return Err(RecvError::Stream {
                            id: stream.id,
                            reason: Reason::PROTOCOL_ERROR,
                        }
                        .into());
                    }
                };

                stream.content_length = ContentLength::Remaining(content_length);
            }
        }

        if frame.is_over_size() {
            // A frame is over size if the decoded header block was bigger than
            // SETTINGS_MAX_HEADER_LIST_SIZE.
            //
            // > A server that receives a larger header block than it is willing
            // > to handle can send an HTTP 431 (Request Header Fields Too
            // > Large) status code [RFC6585]. A client can discard responses
            // > that it cannot process.
            //
            // So, if peer is a server, we'll send a 431. In either case,
            // an error is recorded, which will send a REFUSED_STREAM,
            // since we don't want any of the data frames either.
            tracing::debug!(
                "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
                 recv_headers: frame is over size; stream={:?}",
                stream.id
            );
            return if counts.peer().is_server() && is_initial {
                let mut res = frame::Headers::new(
                    stream.id,
                    frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
                    HeaderMap::new(),
                );
                res.set_end_stream();
                Err(RecvHeaderBlockError::Oversize(Some(res)))
            } else {
                Err(RecvHeaderBlockError::Oversize(None))
            };
        }

        let stream_id = frame.stream_id();
        let (pseudo, fields) = frame.into_parts();
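        // Informational (1xx) responses carry no message for the user; only
        // the final request or response headers are queued below.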
        if !pseudo.is_informational() {
            let message = counts
                .peer()
                .convert_poll_message(pseudo, fields, stream_id)?;

            // Push the frame onto the stream's recv buffer
            stream
                .pending_recv
                .push_back(&mut self.buffer, Event::Headers(message));
            stream.notify_recv();
        }

        // Only servers can receive a headers frame that initiates the stream.
        // This is verified in `Streams` before calling this function.
        if counts.peer().is_server() {
            self.pending_accept.push(stream);
        }

        Ok(())
    }

    /// Called by the server to get the request
    ///
    /// TODO: Should this fn return `Result`?
    pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
        use super::peer::PollMessage::*;

        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Headers(Server(request))) => request,
            _ => panic!("take_request called before request headers were received"),
        }
    }

    /// Called by the client to get pushed response
    pub fn poll_pushed(
        &mut self,
        cx: &Context,
        stream: &mut store::Ptr,
    ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
        use super::peer::PollMessage::*;

        let mut ppp = stream.pending_push_promises.take();
        let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
            match pushed.pending_recv.pop_front(&mut self.buffer) {
                Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
                // When frames are pushed into the queue, it is verified that
                // the first frame is a HEADERS frame.
                _ => panic!("Headers not set on pushed stream"),
            }
        });
        stream.pending_push_promises = ppp;
        if let Some(p) = pushed {
            Poll::Ready(Some(Ok(p)))
        } else {
            let is_open = stream.state.ensure_recv_open()?;

            if is_open {
                stream.recv_task = Some(cx.waker().clone());
                Poll::Pending
            } else {
                Poll::Ready(None)
            }
        }
    }

    /// Called by the client to get the response
    pub fn poll_response(
        &mut self,
        cx: &Context,
        stream: &mut store::Ptr,
    ) -> Poll<Result<Response<()>, proto::Error>> {
        use super::peer::PollMessage::*;

        // If the buffer is not empty, then the first frame must be a HEADERS
        // frame or the user violated the contract.
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
            Some(_) => panic!("poll_response called after response returned"),
            None => {
                stream.state.ensure_recv_open()?;

                stream.recv_task = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Transition the stream based on receiving trailers
    pub fn recv_trailers(
        &mut self,
        frame: frame::Headers,
        stream: &mut store::Ptr,
    ) -> Result<(), RecvError> {
        // Transition the state
        stream.state.recv_close()?;

        if stream.ensure_content_length_zero().is_err() {
            proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id);
            return Err(RecvError::Stream {
                id: stream.id,
                reason: Reason::PROTOCOL_ERROR,
            });
        }

        let trailers = frame.into_fields();

        // Push the frame onto the stream's recv buffer
        stream
            .pending_recv
            .push_back(&mut self.buffer, Event::Trailers(trailers));
        stream.notify_recv();

        Ok(())
    }

    /// Releases capacity of the connection
    pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
        tracing::trace!(
            "release_connection_capacity; size={}, connection in_flight_data={}",
            capacity,
            self.in_flight_data,
        );

        // Decrement in-flight data
        self.in_flight_data -= capacity;

        // Assign capacity to connection
        self.flow.assign_capacity(capacity);

        if self.flow.unclaimed_capacity().is_some() {
            if let Some(task) = task.take() {
                task.wake();
            }
        }
    }

    /// Releases capacity back to the connection & stream
    pub fn release_capacity(
        &mut self,
        capacity: WindowSize,
        stream: &mut store::Ptr,
        task: &mut Option<Waker>,
    ) -> Result<(), UserError> {
        tracing::trace!("release_capacity; size={}", capacity);

        if capacity > stream.in_flight_recv_data {
            return Err(UserError::ReleaseCapacityTooBig);
        }

        self.release_connection_capacity(capacity, task);

        // Decrement in-flight data
        stream.in_flight_recv_data -= capacity;

        // Assign capacity to stream
        stream.recv_flow.assign_capacity(capacity);

        if stream.recv_flow.unclaimed_capacity().is_some() {
            // Queue the stream for sending the WINDOW_UPDATE frame.
            self.pending_window_updates.push(stream);

            if let Some(task) = task.take() {
                task.wake();
            }
        }

        Ok(())
    }

    /// Release any unclaimed capacity for a closed stream.
    pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
        debug_assert_eq!(stream.ref_count, 0);

        if stream.in_flight_recv_data == 0 {
            return;
        }

        tracing::trace!(
            "auto-release closed stream ({:?}) capacity: {:?}",
            stream.id,
            stream.in_flight_recv_data,
        );

        self.release_connection_capacity(stream.in_flight_recv_data, task);
        stream.in_flight_recv_data = 0;

        self.clear_recv_buffer(stream);
    }

    /// Set the "target" connection window size.
    ///
    /// By default, all new connections start with 65,535 bytes of window size.
    /// As streams use and release capacity, we will send WINDOW_UPDATEs for the
    /// connection to bring it back up to the initial "target".
    ///
    /// Setting a target means that we will try to tell the peer about
    /// WINDOW_UPDATEs so the peer knows it has about `target` window to use
    /// for the whole connection.
    ///
    /// The `task` is an optional parked task for the `Connection` that might
    /// be blocked on needing more window capacity.
    pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) {
        tracing::trace!(
            "set_target_connection_window; target={}; available={}, reserved={}",
            target,
            self.flow.available(),
            self.in_flight_data,
        );

        // The current target connection window is our `available` plus any
        // in-flight data reserved by streams.
        //
        // Update the flow controller with the difference between the new
        // target and the current target.
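        //
        // For example, with available=16_384 and in_flight_data=32_768, the
        // current target is 49_152; raising the target to 1_048_576 assigns
        // the 999_424-byte difference as new capacity.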
        let current = (self.flow.available() + self.in_flight_data).checked_size();
        if target > current {
            self.flow.assign_capacity(target - current);
        } else {
            self.flow.claim_capacity(current - target);
        }

        // If changing the target capacity means we gained a bunch of capacity,
        // enough that we went over the update threshold, then schedule sending
        // a connection WINDOW_UPDATE.
        if self.flow.unclaimed_capacity().is_some() {
            if let Some(task) = task.take() {
                task.wake();
            }
        }
    }

    pub(crate) fn apply_local_settings(
        &mut self,
        settings: &frame::Settings,
        store: &mut Store,
    ) -> Result<(), RecvError> {
        let target = if let Some(val) = settings.initial_window_size() {
            val
        } else {
            return Ok(());
        };

        let old_sz = self.init_window_sz;
        self.init_window_sz = target;

        tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz);

        // Per RFC 7540 §6.9.2:
        //
        // In addition to changing the flow-control window for streams that are
        // not yet active, a SETTINGS frame can alter the initial flow-control
        // window size for streams with active flow-control windows (that is,
        // streams in the "open" or "half-closed (remote)" state). When the
        // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
        // the size of all stream flow-control windows that it maintains by the
        // difference between the new value and the old value.
        //
        // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
        // space in a flow-control window to become negative. A sender MUST
        // track the negative flow-control window and MUST NOT send new
        // flow-controlled frames until it receives WINDOW_UPDATE frames that
        // cause the flow-control window to become positive.

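        // For example, dropping the initial window size from the 65_535-byte
        // default to 16_384 decrements every open stream's window by 49_151,
        // possibly leaving some windows negative until data is consumed.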
        if target < old_sz {
            // We must decrease the (local) window on every open stream.
            let dec = old_sz - target;
            tracing::trace!("decrementing all windows; dec={}", dec);

            store.for_each(|mut stream| {
                stream.recv_flow.dec_recv_window(dec);
                Ok(())
            })
        } else if target > old_sz {
            // We must increase the (local) window on every open stream.
            let inc = target - old_sz;
            tracing::trace!("incrementing all windows; inc={}", inc);
            store.for_each(|mut stream| {
                // XXX: Shouldn't the peer have already noticed our
                // overflow and sent us a GOAWAY?
                stream
                    .recv_flow
                    .inc_window(inc)
                    .map_err(RecvError::Connection)?;
                stream.recv_flow.assign_capacity(inc);
                Ok(())
            })
        } else {
            // size is the same... so do nothing
            Ok(())
        }
    }

    pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
        if !stream.state.is_recv_closed() {
            return false;
        }

        stream.pending_recv.is_empty()
    }

    pub fn recv_data(
        &mut self,
        frame: frame::Data,
        stream: &mut store::Ptr,
    ) -> Result<(), RecvError> {
        let sz = frame.payload().len();

        // This should have been enforced at the codec::FramedRead layer, so
        // this is just a sanity check.
        assert!(sz <= MAX_WINDOW_SIZE as usize);

        let sz = sz as WindowSize;

        let is_ignoring_frame = stream.state.is_local_reset();

        if !is_ignoring_frame && !stream.state.is_recv_streaming() {
            // TODO: There are cases where this can be a stream error of
            // STREAM_CLOSED instead...

            // Receiving a DATA frame when not expecting one is a protocol
            // error.
            proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
            return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
        }

        tracing::trace!(
            "recv_data; size={}; connection={}; stream={}",
            sz,
            self.flow.window_size(),
            stream.recv_flow.window_size()
        );

        if is_ignoring_frame {
            tracing::trace!(
                "recv_data; frame ignored on locally reset {:?} for some time",
                stream.id,
            );
            return self.ignore_data(sz);
        }

        // Ensure that there is enough capacity on the connection before acting
        // on the stream.
        self.consume_connection_window(sz)?;

        if stream.recv_flow.window_size() < sz {
            // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE
            // > A receiver MAY respond with a stream error (Section 5.4.2) or
            // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if
            // > it is unable to accept a frame.
            //
            // So, for violating the **stream** window, we can send either a
            // stream or connection error. We've opted to send a stream
            // error.
            return Err(RecvError::Stream {
                id: stream.id,
                reason: Reason::FLOW_CONTROL_ERROR,
            });
        }

        if stream.dec_content_length(frame.payload().len()).is_err() {
            proto_err!(stream:
                "recv_data: content-length overflow; stream={:?}; len={:?}",
                stream.id,
                frame.payload().len(),
            );
            return Err(RecvError::Stream {
                id: stream.id,
                reason: Reason::PROTOCOL_ERROR,
            });
        }

        if frame.is_end_stream() {
            if stream.ensure_content_length_zero().is_err() {
                proto_err!(stream:
                    "recv_data: content-length underflow; stream={:?}; len={:?}",
                    stream.id,
                    frame.payload().len(),
                );
                return Err(RecvError::Stream {
                    id: stream.id,
                    reason: Reason::PROTOCOL_ERROR,
                });
            }

            if stream.state.recv_close().is_err() {
                proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
                return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
            }
        }

        // Update stream level flow control
        stream.recv_flow.send_data(sz);

        // Track the data as in-flight
        stream.in_flight_recv_data += sz;
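        // This capacity stays reserved until the user consumes the data and
        // releases it via `release_capacity`, which schedules the
        // WINDOW_UPDATE frames that replenish both windows.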

        let event = Event::Data(frame.into_payload());

        // Push the frame onto the recv buffer
        stream.pending_recv.push_back(&mut self.buffer, event);
        stream.notify_recv();

        Ok(())
    }

    pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), RecvError> {
        // Ensure that there is enough capacity on the connection...
        self.consume_connection_window(sz)?;

        // Since we are ignoring this frame,
        // we aren't returning the frame to the user. That means they
        // have no way to release the capacity back to the connection. So
        // we have to release it automatically.
        //
        // This call doesn't send a WINDOW_UPDATE immediately, just marks
        // the capacity as available to be reclaimed. When the available
        // capacity meets a threshold, a WINDOW_UPDATE is then sent.
        self.release_connection_capacity(sz, &mut None);
        Ok(())
    }

    pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> {
        if self.flow.window_size() < sz {
            tracing::debug!(
                "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
                self.flow.window_size(),
                sz,
            );
            return Err(RecvError::Connection(Reason::FLOW_CONTROL_ERROR));
        }

        // Update connection level flow control
        self.flow.send_data(sz);

        // Track the data as in-flight
        self.in_flight_data += sz;
        Ok(())
    }

    pub fn recv_push_promise(
        &mut self,
        frame: frame::PushPromise,
        stream: &mut store::Ptr,
    ) -> Result<(), RecvError> {
        stream.state.reserve_remote()?;
        if frame.is_over_size() {
            // A frame is over size if the decoded header block was bigger than
            // SETTINGS_MAX_HEADER_LIST_SIZE.
            //
            // > A server that receives a larger header block than it is willing
            // > to handle can send an HTTP 431 (Request Header Fields Too
            // > Large) status code [RFC6585]. A client can discard responses
            // > that it cannot process.
            //
            // So, if peer is a server, we'll send a 431. In either case,
            // an error is recorded, which will send a REFUSED_STREAM,
            // since we don't want any of the data frames either.
            tracing::debug!(
                "stream error REFUSED_STREAM -- recv_push_promise: \
                 headers frame is over size; promised_id={:?};",
                frame.promised_id(),
            );
            return Err(RecvError::Stream {
                id: frame.promised_id(),
                reason: Reason::REFUSED_STREAM,
            });
        }

        let promised_id = frame.promised_id();
        let (pseudo, fields) = frame.into_parts();
        let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;

        if let Err(e) = frame::PushPromise::validate_request(&req) {
            use PushPromiseHeaderError::*;
            match e {
                NotSafeAndCacheable => proto_err!(
                    stream:
                    "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
                    req.method(),
                    promised_id,
                ),
                InvalidContentLength(e) => proto_err!(
                    stream:
                    "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
                    e,
                    promised_id,
                ),
            }
            return Err(RecvError::Stream {
                id: promised_id,
                reason: Reason::PROTOCOL_ERROR,
            });
        }

        use super::peer::PollMessage::*;
        stream
            .pending_recv
            .push_back(&mut self.buffer, Event::Headers(Server(req)));
        stream.notify_recv();
        Ok(())
    }

    /// Ensures that `id` is not in the `Idle` state.
    pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
        if let Ok(next) = self.next_stream_id {
            if id >= next {
                tracing::debug!(
                    "stream ID is still idle, PROTOCOL_ERROR; stream={:?}",
                    id
                );
                return Err(Reason::PROTOCOL_ERROR);
            }
        }
        // if next_stream_id is overflowed, that's ok.

        Ok(())
    }

    /// Handle remote sending an explicit RST_STREAM.
    pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) {
        // Notify the stream
        stream
            .state
            .recv_reset(frame.reason(), stream.is_pending_send);

        stream.notify_send();
        stream.notify_recv();
    }

    /// Handle a received error
    pub fn recv_err(&mut self, err: &proto::Error, stream: &mut Stream) {
        // Receive an error
        stream.state.recv_err(err);

        // If any tasks are waiting on this stream, notify them
        stream.notify_send();
        stream.notify_recv();
    }

    pub fn go_away(&mut self, last_processed_id: StreamId) {
        assert!(self.max_stream_id >= last_processed_id);
        self.max_stream_id = last_processed_id;
    }

    pub fn recv_eof(&mut self, stream: &mut Stream) {
        stream.state.recv_eof();
        stream.notify_send();
        stream.notify_recv();
    }

    pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
        // Drop any frames that are still queued for the user
        while stream.pending_recv.pop_front(&mut self.buffer).is_some() {}
    }

    /// Get the max ID of streams we can receive.
    ///
    /// This gets lowered if we send a GOAWAY frame.
    pub fn max_stream_id(&self) -> StreamId {
        self.max_stream_id
    }

    pub fn next_stream_id(&self) -> Result<StreamId, RecvError> {
        if let Ok(id) = self.next_stream_id {
            Ok(id)
        } else {
            Err(RecvError::Connection(Reason::PROTOCOL_ERROR))
        }
    }

    pub fn may_have_created_stream(&self, id: StreamId) -> bool {
        if let Ok(next_id) = self.next_stream_id {
            // Peer::is_local_init should have been called beforehand
            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
            id < next_id
        } else {
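            // The stream ID counter overflowed, so every ID of this parity
            // has been handed out at some point; conservatively report that
            // the stream may have been created.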
            true
        }
    }

    /// Ensures that the remote peer is permitted to reserve a stream (i.e.,
    /// that push promises are enabled).
    pub fn ensure_can_reserve(&self) -> Result<(), RecvError> {
        if !self.is_push_enabled {
            proto_err!(conn: "recv_push_promise: push is disabled");
            return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
        }

        Ok(())
    }

    /// Add a locally reset stream to queue to be eventually reaped.
    pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        if !stream.state.is_local_reset() || stream.is_pending_reset_expiration() {
            return;
        }

        tracing::trace!("enqueue_reset_expiration; {:?}", stream.id);

        if !counts.can_inc_num_reset_streams() {
            // try to evict 1 stream if possible
            // if max allow is 0, this won't be able to evict,
            // and then we'll just bail after
            if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) {
                counts.transition_after(evicted, true);
            }
        }

        if counts.can_inc_num_reset_streams() {
            counts.inc_num_reset_streams();
            self.pending_reset_expired.push(stream);
        }
    }

    /// Send any pending refusals.
    pub fn send_pending_refusal<T, B>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        if let Some(stream_id) = self.refused {
            ready!(dst.poll_ready(cx))?;

            // Create the RST_STREAM frame
            let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);

            // Buffer the frame
            dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
        }

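        // `refused` is only cleared once the frame has been buffered; if
        // `poll_ready` returned `Pending` above, the `ready!` macro bailed
        // out early and the refusal is retried on the next poll.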
        self.refused = None;

        Poll::Ready(Ok(()))
    }

    pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
        let now = Instant::now();
        let reset_duration = self.reset_duration;
        while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
            let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
            now - reset_at > reset_duration
        }) {
            counts.transition_after(stream, true);
        }
    }

    pub fn clear_queues(
        &mut self,
        clear_pending_accept: bool,
        store: &mut Store,
        counts: &mut Counts,
    ) {
        self.clear_stream_window_update_queue(store, counts);
        self.clear_all_reset_streams(store, counts);

        if clear_pending_accept {
            self.clear_all_pending_accept(store, counts);
        }
    }

    fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_window_updates.pop(store) {
            counts.transition(stream, |_, stream| {
                tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
            })
        }
    }

    /// Called on EOF
    fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_reset_expired.pop(store) {
            counts.transition_after(stream, true);
        }
    }

    fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_accept.pop(store) {
            counts.transition_after(stream, false);
        }
    }

    pub fn poll_complete<T, B>(
        &mut self,
        cx: &mut Context,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        // Send any pending connection level window updates
        ready!(self.send_connection_window_update(cx, dst))?;

        // Send any pending stream level window updates
        ready!(self.send_stream_window_updates(cx, store, counts, dst))?;

        Poll::Ready(Ok(()))
    }

    /// Send connection level window update
    fn send_connection_window_update<T, B>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        if let Some(incr) = self.flow.unclaimed_capacity() {
            let frame = frame::WindowUpdate::new(StreamId::zero(), incr);

            // Ensure the codec has capacity
            ready!(dst.poll_ready(cx))?;

            // Buffer the WINDOW_UPDATE frame
            dst.buffer(frame.into())
                .expect("invalid WINDOW_UPDATE frame");

            // Update flow control
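            //
            // Growing the local window as soon as the frame is buffered
            // brings it back in line with the assigned capacity, so
            // `unclaimed_capacity` returns `None` and the same update is
            // never sent twice.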
            self.flow
                .inc_window(incr)
                .expect("unexpected flow control state");
        }

        Poll::Ready(Ok(()))
    }

    /// Send stream level window update
    pub fn send_stream_window_updates<T, B>(
        &mut self,
        cx: &mut Context,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        loop {
            // Ensure the codec has capacity
            ready!(dst.poll_ready(cx))?;

            // Get the next stream
            let stream = match self.pending_window_updates.pop(store) {
                Some(stream) => stream,
                None => return Poll::Ready(Ok(())),
            };

            counts.transition(stream, |_, stream| {
                tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
                debug_assert!(!stream.is_pending_window_update);

                if !stream.state.is_recv_streaming() {
                    // No need to send window updates on the stream if the stream is
                    // no longer receiving data.
                    //
                    // TODO: is this correct? We could possibly send a window
                    // update on a ReservedRemote stream if we already know
                    // we want to stream the data faster...
                    return;
                }

                // TODO: de-dup
                if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
                    // Create the WINDOW_UPDATE frame
                    let frame = frame::WindowUpdate::new(stream.id, incr);

                    // Buffer it
                    dst.buffer(frame.into())
                        .expect("invalid WINDOW_UPDATE frame");

                    // Update flow control
                    stream
                        .recv_flow
                        .inc_window(incr)
                        .expect("unexpected flow control state");
                }
            })
        }
    }

    pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
        self.pending_accept.pop(store).map(|ptr| ptr.key())
    }

    pub fn poll_data(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<Bytes, proto::Error>>> {
        // TODO: Return error when the stream is reset
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
            Some(event) => {
                // The frame is trailers, so put it back and signal that the
                // data stream is finished.
                stream.pending_recv.push_front(&mut self.buffer, event);

                // Notify the recv task. This is done just in case
                // `poll_trailers` was called.
                //
                // It is very likely that `notify_recv` will just be a no-op (as
                // the task will be None), so this isn't really much of a
                // performance concern. It also means we don't have to track
                // state to see if `poll_trailers` was called before `poll_data`
                // returned `None`.
                stream.notify_recv();

                // No more data frames
                Poll::Ready(None)
            }
            None => self.schedule_recv(cx, stream),
        }
    }

    pub fn poll_trailers(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
            Some(event) => {
                // The frame is not trailers, so we're not ready to poll
                // trailers yet. Put the event back.
                stream.pending_recv.push_front(&mut self.buffer, event);

                Poll::Pending
            }
            None => self.schedule_recv(cx, stream),
        }
    }

    fn schedule_recv<T>(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<T, proto::Error>>> {
        if stream.state.ensure_recv_open()? {
            // Request to get notified once more frames arrive
            stream.recv_task = Some(cx.waker().clone());
            Poll::Pending
        } else {
            // No more frames will be received
            Poll::Ready(None)
        }
    }
}

// ===== impl Open =====

impl Open {
    pub fn is_push_promise(&self) -> bool {
        use self::Open::*;

        match *self {
            PushPromise => true,
            _ => false,
        }
    }
}

// ===== impl RecvHeaderBlockError =====

impl<T> From<RecvError> for RecvHeaderBlockError<T> {
    fn from(err: RecvError) -> Self {
        RecvHeaderBlockError::State(err)
    }
}