use super::*;
use crate::codec::UserError;
use crate::frame::{self, PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
use crate::proto::{self, Error};

use http::{HeaderMap, Request, Response};

use std::io;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

#[derive(Debug)]
pub(super) struct Recv {
    /// Initial window size of remote-initiated streams
    init_window_sz: WindowSize,

    /// Connection level flow control governing received data
    flow: FlowControl,

    /// Amount of connection window capacity currently used by outstanding streams.
    in_flight_data: WindowSize,

    /// The lowest stream ID that is still idle
    next_stream_id: Result<StreamId, StreamIdOverflow>,

    /// The stream ID of the last processed stream
    last_processed_id: StreamId,

    /// Any streams with a higher ID are ignored.
    ///
    /// This starts as MAX, but is lowered when a GOAWAY is sent.
    ///
    /// > After sending a GOAWAY frame, the sender can discard frames for
    /// > streams initiated by the receiver with identifiers higher than
    /// > the identified last stream.
    max_stream_id: StreamId,

    /// Streams that have pending window updates
    pending_window_updates: store::Queue<stream::NextWindowUpdate>,

    /// New streams to be accepted
    pending_accept: store::Queue<stream::NextAccept>,

    /// Locally reset streams that should be reaped when they expire
    pending_reset_expired: store::Queue<stream::NextResetExpire>,

    /// How long locally reset streams should ignore received frames
    reset_duration: Duration,

    /// Holds frames that are waiting to be read
    buffer: Buffer<Event>,

    /// Refused StreamId; this represents a frame that must be sent out.
    refused: Option<StreamId>,

    /// Whether push promises are allowed to be received.
    is_push_enabled: bool,
}

#[derive(Debug)]
pub(super) enum Event {
    Headers(peer::PollMessage),
    Data(Bytes),
    Trailers(HeaderMap),
}

#[derive(Debug)]
pub(super) enum RecvHeaderBlockError<T> {
    Oversize(T),
    State(Error),
}

#[derive(Debug)]
pub(crate) enum Open {
    PushPromise,
    Headers,
}

impl Recv {
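    /// Creates a new `Recv` using the connection's local configuration.
    ///
    /// The connection-level receive window always starts at
    /// `DEFAULT_INITIAL_WINDOW_SIZE` (65,535 octets per RFC 7540 §6.5.2) and
    /// is only ever resized with WINDOW_UPDATE frames;
    /// `SETTINGS_INITIAL_WINDOW_SIZE` applies to stream windows alone
    /// (RFC 7540 §6.9.2).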
    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
        let next_stream_id = if peer.is_server() { 1 } else { 2 };
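        // Remotely initiated streams: a client opens odd-numbered streams
        // starting at 1; a server opens even-numbered streams starting at 2
        // (RFC 7540 §5.1.1).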

        let mut flow = FlowControl::new();

        // connections always have the default window size, regardless of
        // settings
        flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
            .expect("invalid initial remote window size");
        flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE);

        Recv {
            init_window_sz: config.local_init_window_sz,
            flow,
            in_flight_data: 0 as WindowSize,
            next_stream_id: Ok(next_stream_id.into()),
            pending_window_updates: store::Queue::new(),
            last_processed_id: StreamId::ZERO,
            max_stream_id: StreamId::MAX,
            pending_accept: store::Queue::new(),
            pending_reset_expired: store::Queue::new(),
            reset_duration: config.local_reset_duration,
            buffer: Buffer::new(),
            refused: None,
            is_push_enabled: config.local_push_enabled,
        }
    }

    /// Returns the initial receive window size
    pub fn init_window_sz(&self) -> WindowSize {
        self.init_window_sz
    }

    /// Returns the ID of the last processed stream
    pub fn last_processed_id(&self) -> StreamId {
        self.last_processed_id
    }

    /// Update state reflecting a new, remotely opened stream
    ///
    /// Returns the stream ID if successful, or `None` if refused
    pub fn open(
        &mut self,
        id: StreamId,
        mode: Open,
        counts: &mut Counts,
    ) -> Result<Option<StreamId>, Error> {
        assert!(self.refused.is_none());

        counts.peer().ensure_can_open(id, mode)?;

        let next_id = self.next_stream_id()?;
        if id < next_id {
            proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
        }

        self.next_stream_id = id.next_id();

        if !counts.can_inc_num_recv_streams() {
            self.refused = Some(id);
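            // The refusal is answered later: `send_pending_refusal` sends a
            // RST_STREAM frame with REFUSED_STREAM for this stream ID.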
            return Ok(None);
        }

        Ok(Some(id))
    }

    /// Transition the stream state based on receiving headers
    ///
    /// The caller ensures that the frame represents headers and not trailers.
    pub fn recv_headers(
        &mut self,
        frame: frame::Headers,
        stream: &mut store::Ptr,
        counts: &mut Counts,
    ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
        tracing::trace!("opening stream; init_window={}", self.init_window_sz);
        let is_initial = stream.state.recv_open(&frame)?;

        if is_initial {
            // TODO: be smarter about this logic
            if frame.stream_id() > self.last_processed_id {
                self.last_processed_id = frame.stream_id();
            }

            // Increment the number of concurrent streams
            counts.inc_num_recv_streams(stream);
        }

        if !stream.content_length.is_head() {
            use super::stream::ContentLength;
            use http::header;

            if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
                let content_length = match frame::parse_u64(content_length.as_bytes()) {
                    Ok(v) => v,
                    Err(()) => {
                        proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
                        return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
                    }
                };

                stream.content_length = ContentLength::Remaining(content_length);
            }
        }
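        // The remaining content length is decremented as DATA frames arrive
        // (see `recv_data`); END_STREAM and trailers then verify it reached
        // exactly zero.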

        if frame.is_over_size() {
            // A frame is over size if the decoded header block was bigger than
            // SETTINGS_MAX_HEADER_LIST_SIZE.
            //
            // > A server that receives a larger header block than it is willing
            // > to handle can send an HTTP 431 (Request Header Fields Too
            // > Large) status code [RFC6585]. A client can discard responses
            // > that it cannot process.
            //
            // So, if the local peer is a server, we'll send a 431. In either
            // case, an error is recorded, which will send a REFUSED_STREAM,
            // since we don't want any of the data frames either.
            tracing::debug!(
                "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
                 recv_headers: frame is over size; stream={:?}",
                stream.id
            );
            return if counts.peer().is_server() && is_initial {
                let mut res = frame::Headers::new(
                    stream.id,
                    frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
                    HeaderMap::new(),
                );
                res.set_end_stream();
                Err(RecvHeaderBlockError::Oversize(Some(res)))
            } else {
                Err(RecvHeaderBlockError::Oversize(None))
            };
        }

        let stream_id = frame.stream_id();
        let (pseudo, fields) = frame.into_parts();
        if !pseudo.is_informational() {
            let message = counts
                .peer()
                .convert_poll_message(pseudo, fields, stream_id)?;

            // Push the frame onto the stream's recv buffer
            stream
                .pending_recv
                .push_back(&mut self.buffer, Event::Headers(message));
            stream.notify_recv();
        }
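        // Informational (1xx) header blocks are not delivered as the stream's
        // message here; only the final, non-informational HEADERS produces a
        // `PollMessage` for the user.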

        // Only servers can receive a headers frame that initiates the stream.
        // This is verified in `Streams` before calling this function.
        if counts.peer().is_server() {
            self.pending_accept.push(stream);
        }

        Ok(())
    }

    /// Called by the server to get the request
    ///
    /// TODO: Should this fn return `Result`?
    pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
        use super::peer::PollMessage::*;

        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Headers(Server(request))) => request,
            _ => panic!(),
        }
    }

    /// Called by the client to get a pushed response
    pub fn poll_pushed(
        &mut self,
        cx: &Context,
        stream: &mut store::Ptr,
    ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
        use super::peer::PollMessage::*;

        let mut ppp = stream.pending_push_promises.take();
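        // The queue is taken out of the stream above so that the store can be
        // borrowed mutably through `stream` while entries are popped below.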
        let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
            match pushed.pending_recv.pop_front(&mut self.buffer) {
                Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
                // When frames are pushed into the queue, it is verified that
                // the first frame is a HEADERS frame.
                _ => panic!("Headers not set on pushed stream"),
            }
        });
        stream.pending_push_promises = ppp;
        if let Some(p) = pushed {
            Poll::Ready(Some(Ok(p)))
        } else {
            let is_open = stream.state.ensure_recv_open()?;

            if is_open {
                stream.recv_task = Some(cx.waker().clone());
                Poll::Pending
            } else {
                Poll::Ready(None)
            }
        }
    }

    /// Called by the client to get the response
    pub fn poll_response(
        &mut self,
        cx: &Context,
        stream: &mut store::Ptr,
    ) -> Poll<Result<Response<()>, proto::Error>> {
        use super::peer::PollMessage::*;

        // If the buffer is not empty, then the first frame must be a HEADERS
        // frame or the user violated the contract.
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
            Some(_) => panic!("poll_response called after response returned"),
            None => {
                stream.state.ensure_recv_open()?;

                stream.recv_task = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Transition the stream based on receiving trailers
    pub fn recv_trailers(
        &mut self,
        frame: frame::Headers,
        stream: &mut store::Ptr,
    ) -> Result<(), Error> {
        // Transition the state
        stream.state.recv_close()?;

        if stream.ensure_content_length_zero().is_err() {
            proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id);
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
        }

        let trailers = frame.into_fields();

        // Push the frame onto the stream's recv buffer
        stream
            .pending_recv
            .push_back(&mut self.buffer, Event::Trailers(trailers));
        stream.notify_recv();

        Ok(())
    }

    /// Releases capacity of the connection
    pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
        tracing::trace!(
            "release_connection_capacity; size={}, connection in_flight_data={}",
            capacity,
            self.in_flight_data,
        );

        // Decrement in-flight data
        self.in_flight_data -= capacity;

        // Assign capacity to connection
        self.flow.assign_capacity(capacity);

        if self.flow.unclaimed_capacity().is_some() {
            if let Some(task) = task.take() {
                task.wake();
            }
        }
    }

    /// Releases capacity back to the connection & stream
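    ///
    /// This is the internal side of the user releasing capacity (e.g. via the
    /// public `h2::FlowControl::release_capacity`): once enough capacity has
    /// been released that `unclaimed_capacity()` reports a value, the stream
    /// is queued so that `poll_complete` sends a WINDOW_UPDATE frame for it.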
    pub fn release_capacity(
        &mut self,
        capacity: WindowSize,
        stream: &mut store::Ptr,
        task: &mut Option<Waker>,
    ) -> Result<(), UserError> {
        tracing::trace!("release_capacity; size={}", capacity);

        if capacity > stream.in_flight_recv_data {
            return Err(UserError::ReleaseCapacityTooBig);
        }

        self.release_connection_capacity(capacity, task);

        // Decrement in-flight data
        stream.in_flight_recv_data -= capacity;

        // Assign capacity to stream
        stream.recv_flow.assign_capacity(capacity);

        if stream.recv_flow.unclaimed_capacity().is_some() {
            // Queue the stream for sending the WINDOW_UPDATE frame.
            self.pending_window_updates.push(stream);

            if let Some(task) = task.take() {
                task.wake();
            }
        }

        Ok(())
    }

    /// Release any unclaimed capacity for a closed stream.
    pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
        debug_assert_eq!(stream.ref_count, 0);

        if stream.in_flight_recv_data == 0 {
            return;
        }

        tracing::trace!(
            "auto-release closed stream ({:?}) capacity: {:?}",
            stream.id,
            stream.in_flight_recv_data,
        );

        self.release_connection_capacity(stream.in_flight_recv_data, task);
        stream.in_flight_recv_data = 0;

        self.clear_recv_buffer(stream);
    }

    /// Set the "target" connection window size.
    ///
    /// By default, all new connections start with 64 KB of window size. As
    /// streams use and release capacity, we will send WINDOW_UPDATEs for the
    /// connection to bring it back up to the initial "target".
    ///
    /// Setting a target means that we will try to tell the peer about
    /// WINDOW_UPDATEs so the peer knows it has about `target` window to use
    /// for the whole connection.
    ///
    /// The `task` is an optional parked task for the `Connection` that might
    /// be blocked on needing more window capacity.
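    ///
    /// For example: with the default 65,535-byte connection window and no
    /// in-flight data, setting a target of 1,048,576 assigns 983,041 bytes of
    /// new capacity, which is then announced to the peer with a connection
    /// WINDOW_UPDATE.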
    pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) {
        tracing::trace!(
            "set_target_connection_window; target={}; available={}, reserved={}",
            target,
            self.flow.available(),
            self.in_flight_data,
        );

        // The current target connection window is our `available` plus any
        // in-flight data reserved by streams.
        //
        // Update the flow controller with the difference between the new
        // target and the current target.
        let current = (self.flow.available() + self.in_flight_data).checked_size();
        if target > current {
            self.flow.assign_capacity(target - current);
        } else {
            self.flow.claim_capacity(current - target);
        }

        // If changing the target capacity means we gained a bunch of capacity,
        // enough that we went over the update threshold, then schedule sending
        // a connection WINDOW_UPDATE.
        if self.flow.unclaimed_capacity().is_some() {
            if let Some(task) = task.take() {
                task.wake();
            }
        }
    }

    pub(crate) fn apply_local_settings(
        &mut self,
        settings: &frame::Settings,
        store: &mut Store,
    ) -> Result<(), proto::Error> {
        let target = if let Some(val) = settings.initial_window_size() {
            val
        } else {
            return Ok(());
        };

        let old_sz = self.init_window_sz;
        self.init_window_sz = target;

        tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz);

        // Per RFC 7540 §6.9.2:
        //
        // In addition to changing the flow-control window for streams that are
        // not yet active, a SETTINGS frame can alter the initial flow-control
        // window size for streams with active flow-control windows (that is,
        // streams in the "open" or "half-closed (remote)" state). When the
        // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
        // the size of all stream flow-control windows that it maintains by the
        // difference between the new value and the old value.
        //
        // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
        // space in a flow-control window to become negative. A sender MUST
        // track the negative flow-control window and MUST NOT send new
        // flow-controlled frames until it receives WINDOW_UPDATE frames that
        // cause the flow-control window to become positive.
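        //
        // For example: if the old initial window size was 65,535 and the new
        // value is 16,384, every open stream's receive window is decremented
        // by 49,151, which may leave some windows negative until the
        // application releases enough capacity.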

        if target < old_sz {
            // We must decrease the (local) window on every open stream.
            let dec = old_sz - target;
            tracing::trace!("decrementing all windows; dec={}", dec);

            store.for_each(|mut stream| {
                stream.recv_flow.dec_recv_window(dec);
                Ok(())
            })
        } else if target > old_sz {
            // We must increase the (local) window on every open stream.
            let inc = target - old_sz;
            tracing::trace!("incrementing all windows; inc={}", inc);
            store.for_each(|mut stream| {
                // XXX: Shouldn't the peer have already noticed our
                // overflow and sent us a GOAWAY?
                stream
                    .recv_flow
                    .inc_window(inc)
                    .map_err(proto::Error::library_go_away)?;
                stream.recv_flow.assign_capacity(inc);
                Ok(())
            })
        } else {
            // size is the same... so do nothing
            Ok(())
        }
    }

    pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
        if !stream.state.is_recv_closed() {
            return false;
        }

        stream.pending_recv.is_empty()
    }

    pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
        let sz = frame.payload().len();

        // This should have been enforced at the codec::FramedRead layer, so
        // this is just a sanity check.
        assert!(sz <= MAX_WINDOW_SIZE as usize);

        let sz = sz as WindowSize;

        let is_ignoring_frame = stream.state.is_local_reset();

        if !is_ignoring_frame && !stream.state.is_recv_streaming() {
            // TODO: There are cases where this can be a stream error of
            // STREAM_CLOSED instead...

            // Receiving a DATA frame when not expecting one is a protocol
            // error.
            proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
        }

        tracing::trace!(
            "recv_data; size={}; connection={}; stream={}",
            sz,
            self.flow.window_size(),
            stream.recv_flow.window_size()
        );

        if is_ignoring_frame {
            tracing::trace!(
                "recv_data; frame ignored on locally reset {:?} for some time",
                stream.id,
            );
            return self.ignore_data(sz);
        }

        // Ensure that there is enough capacity on the connection before acting
        // on the stream.
        self.consume_connection_window(sz)?;

        if stream.recv_flow.window_size() < sz {
            // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE
            // > A receiver MAY respond with a stream error (Section 5.4.2) or
            // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if
            // > it is unable to accept a frame.
            //
            // So, for violating the **stream** window, we can send either a
            // stream or connection error. We've opted to send a stream
            // error.
            return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
        }

        if stream.dec_content_length(frame.payload().len()).is_err() {
            proto_err!(stream:
                "recv_data: content-length overflow; stream={:?}; len={:?}",
                stream.id,
                frame.payload().len(),
            );
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
        }

        if frame.is_end_stream() {
            if stream.ensure_content_length_zero().is_err() {
                proto_err!(stream:
                    "recv_data: content-length underflow; stream={:?}; len={:?}",
                    stream.id,
                    frame.payload().len(),
                );
                return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
            }

            if stream.state.recv_close().is_err() {
                proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
            }
        }

        // Update stream level flow control
        stream.recv_flow.send_data(sz);

        // Track the data as in-flight
        stream.in_flight_recv_data += sz;

        let event = Event::Data(frame.into_payload());

        // Push the frame onto the recv buffer
        stream.pending_recv.push_back(&mut self.buffer, event);
        stream.notify_recv();

        Ok(())
    }

    pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
        // Ensure that there is enough capacity on the connection...
        self.consume_connection_window(sz)?;

        // Since we are ignoring this frame, we aren't returning it to the
        // user. That means they have no way to release the capacity back to
        // the connection, so we have to release it automatically.
        //
        // This call doesn't send a WINDOW_UPDATE immediately, just marks
        // the capacity as available to be reclaimed. When the available
        // capacity meets a threshold, a WINDOW_UPDATE is then sent.
        self.release_connection_capacity(sz, &mut None);
        Ok(())
    }

    pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
        if self.flow.window_size() < sz {
            tracing::debug!(
                "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
                self.flow.window_size(),
                sz,
            );
            return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
        }

        // Update connection level flow control
        self.flow.send_data(sz);

        // Track the data as in-flight
        self.in_flight_data += sz;
        Ok(())
    }

    pub fn recv_push_promise(
        &mut self,
        frame: frame::PushPromise,
        stream: &mut store::Ptr,
    ) -> Result<(), Error> {
        stream.state.reserve_remote()?;
        if frame.is_over_size() {
            // A frame is over size if the decoded header block was bigger than
            // SETTINGS_MAX_HEADER_LIST_SIZE.
            //
            // > A server that receives a larger header block than it is willing
            // > to handle can send an HTTP 431 (Request Header Fields Too
            // > Large) status code [RFC6585]. A client can discard responses
            // > that it cannot process.
            //
            // So, if the local peer is a server, we'll send a 431. In either
            // case, an error is recorded, which will send a REFUSED_STREAM,
            // since we don't want any of the data frames either.
            tracing::debug!(
                "stream error REFUSED_STREAM -- recv_push_promise: \
                 headers frame is over size; promised_id={:?};",
                frame.promised_id(),
            );
            return Err(Error::library_reset(
                frame.promised_id(),
                Reason::REFUSED_STREAM,
            ));
        }

        let promised_id = frame.promised_id();
        let (pseudo, fields) = frame.into_parts();
        let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;

        if let Err(e) = frame::PushPromise::validate_request(&req) {
            use PushPromiseHeaderError::*;
            match e {
                NotSafeAndCacheable => proto_err!(
                    stream:
                    "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
                    req.method(),
                    promised_id,
                ),
                InvalidContentLength(e) => proto_err!(
                    stream:
                    "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
                    e,
                    promised_id,
                ),
            }
            return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
        }

        use super::peer::PollMessage::*;
        stream
            .pending_recv
            .push_back(&mut self.buffer, Event::Headers(Server(req)));
        stream.notify_recv();
        Ok(())
    }

    /// Ensures that `id` is not in the `Idle` state.
    pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
        if let Ok(next) = self.next_stream_id {
            if id >= next {
                tracing::debug!(
                    "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
                    id
                );
                return Err(Reason::PROTOCOL_ERROR);
            }
        }
        // If `next_stream_id` has overflowed, that's ok: every valid stream
        // ID has already been used.

        Ok(())
    }

    /// Handle the remote peer sending an explicit RST_STREAM.
    pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) {
        // Notify the stream
        stream.state.recv_reset(frame, stream.is_pending_send);

        stream.notify_send();
        stream.notify_recv();
    }

    /// Handle a connection-level error
    pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
        // Receive an error
        stream.state.handle_error(err);

        // If a receiver is waiting, notify it
        stream.notify_send();
        stream.notify_recv();
    }

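    /// Lowers the max ID of streams that can be received, in response to us
    /// sending a GOAWAY with `last_processed_id`.
    ///
    /// RFC 7540 §6.8 forbids increasing the last stream ID in subsequent
    /// GOAWAY frames, hence the assertion that the ID never increases.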
    pub fn go_away(&mut self, last_processed_id: StreamId) {
        assert!(self.max_stream_id >= last_processed_id);
        self.max_stream_id = last_processed_id;
    }

    pub fn recv_eof(&mut self, stream: &mut Stream) {
        stream.state.recv_eof();
        stream.notify_send();
        stream.notify_recv();
    }

    pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
        while let Some(_) = stream.pending_recv.pop_front(&mut self.buffer) {
            // drop it
        }
    }

    /// Get the max ID of streams we can receive.
    ///
    /// This gets lowered if we send a GOAWAY frame.
    pub fn max_stream_id(&self) -> StreamId {
        self.max_stream_id
    }

    pub fn next_stream_id(&self) -> Result<StreamId, Error> {
        if let Ok(id) = self.next_stream_id {
            Ok(id)
        } else {
            Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
        }
    }

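    /// Returns true if the remote peer could have created a stream with the
    /// given ID: all IDs below `next_stream_id` with the matching parity (odd
    /// for client-initiated, even for server-initiated; RFC 7540 §5.1.1) have
    /// either been created or implicitly closed.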
    pub fn may_have_created_stream(&self, id: StreamId) -> bool {
        if let Ok(next_id) = self.next_stream_id {
            // Peer::is_local_init should have been called beforehand
            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
            id < next_id
        } else {
            true
        }
    }

    /// Ensures that the remote peer can reserve a stream (i.e. that push is
    /// enabled).
    pub fn ensure_can_reserve(&self) -> Result<(), Error> {
        if !self.is_push_enabled {
            proto_err!(conn: "recv_push_promise: push is disabled");
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
        }

        Ok(())
    }

    /// Add a locally reset stream to the queue to be eventually reaped.
    pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        if !stream.state.is_local_reset() || stream.is_pending_reset_expiration() {
            return;
        }

        tracing::trace!("enqueue_reset_expiration; {:?}", stream.id);

        if !counts.can_inc_num_reset_streams() {
            // Try to evict one stream if possible; if the max allowed is 0,
            // this won't be able to evict, and then we'll just bail after.
            if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) {
                counts.transition_after(evicted, true);
            }
        }

        if counts.can_inc_num_reset_streams() {
            counts.inc_num_reset_streams();
            self.pending_reset_expired.push(stream);
        }
    }

    /// Send any pending refusals.
    pub fn send_pending_refusal<T, B>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        if let Some(stream_id) = self.refused {
            ready!(dst.poll_ready(cx))?;

            // Create the RST_STREAM frame
            let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);

            // Buffer the frame
            dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
        }

        self.refused = None;

        Poll::Ready(Ok(()))
    }

    pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
        if !self.pending_reset_expired.is_empty() {
            let now = Instant::now();
            let reset_duration = self.reset_duration;
            while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
                let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
                now - reset_at > reset_duration
            }) {
                counts.transition_after(stream, true);
            }
        }
    }

    pub fn clear_queues(
        &mut self,
        clear_pending_accept: bool,
        store: &mut Store,
        counts: &mut Counts,
    ) {
        self.clear_stream_window_update_queue(store, counts);
        self.clear_all_reset_streams(store, counts);

        if clear_pending_accept {
            self.clear_all_pending_accept(store, counts);
        }
    }

    fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_window_updates.pop(store) {
            counts.transition(stream, |_, stream| {
                tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
            })
        }
    }

    /// Called on EOF
    fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_reset_expired.pop(store) {
            counts.transition_after(stream, true);
        }
    }

    fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_accept.pop(store) {
            counts.transition_after(stream, false);
        }
    }

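    /// Buffers any pending connection-level, then stream-level, WINDOW_UPDATE
    /// frames, waiting for codec capacity as needed.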
    pub fn poll_complete<T, B>(
        &mut self,
        cx: &mut Context,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        // Send any pending connection level window updates
        ready!(self.send_connection_window_update(cx, dst))?;

        // Send any pending stream level window updates
        ready!(self.send_stream_window_updates(cx, store, counts, dst))?;

        Poll::Ready(Ok(()))
    }

    /// Send connection level window update
    fn send_connection_window_update<T, B>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        if let Some(incr) = self.flow.unclaimed_capacity() {
            let frame = frame::WindowUpdate::new(StreamId::zero(), incr);

            // Ensure the codec has capacity
            ready!(dst.poll_ready(cx))?;

            // Buffer the WINDOW_UPDATE frame
            dst.buffer(frame.into())
                .expect("invalid WINDOW_UPDATE frame");

            // Update flow control
            self.flow
                .inc_window(incr)
                .expect("unexpected flow control state");
        }

        Poll::Ready(Ok(()))
    }

    /// Send stream level window updates
    pub fn send_stream_window_updates<T, B>(
        &mut self,
        cx: &mut Context,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        loop {
            // Ensure the codec has capacity
            ready!(dst.poll_ready(cx))?;

            // Get the next stream
            let stream = match self.pending_window_updates.pop(store) {
                Some(stream) => stream,
                None => return Poll::Ready(Ok(())),
            };

            counts.transition(stream, |_, stream| {
                tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
                debug_assert!(!stream.is_pending_window_update);

                if !stream.state.is_recv_streaming() {
                    // No need to send window updates on the stream if the stream is
                    // no longer receiving data.
                    //
                    // TODO: is this correct? We could possibly send a window
                    // update on a ReservedRemote stream if we already know
                    // we want to stream the data faster...
                    return;
                }

                // TODO: de-dup
                if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
                    // Create the WINDOW_UPDATE frame
                    let frame = frame::WindowUpdate::new(stream.id, incr);

                    // Buffer it
                    dst.buffer(frame.into())
                        .expect("invalid WINDOW_UPDATE frame");

                    // Update flow control
                    stream
                        .recv_flow
                        .inc_window(incr)
                        .expect("unexpected flow control state");
                }
            })
        }
    }

    pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
        self.pending_accept.pop(store).map(|ptr| ptr.key())
    }

    pub fn poll_data(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<Bytes, proto::Error>>> {
        // TODO: Return error when the stream is reset
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
            Some(event) => {
                // The frame is a trailer.
                stream.pending_recv.push_front(&mut self.buffer, event);

                // Notify the recv task. This is done just in case
                // `poll_trailers` was called.
                //
                // It is very likely that `notify_recv` will just be a no-op (as
                // the task will be None), so this isn't really much of a
                // performance concern. It also means we don't have to track
                // state to see if `poll_trailers` was called before `poll_data`
                // returned `None`.
                stream.notify_recv();

                // No more data frames
                Poll::Ready(None)
            }
            None => self.schedule_recv(cx, stream),
        }
    }

    pub fn poll_trailers(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
            Some(event) => {
                // The frame is not trailers, so we aren't ready to poll
                // trailers yet.
                stream.pending_recv.push_front(&mut self.buffer, event);

                Poll::Pending
            }
            None => self.schedule_recv(cx, stream),
        }
    }

    fn schedule_recv<T>(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<T, proto::Error>>> {
        if stream.state.ensure_recv_open()? {
            // Request to get notified once more frames arrive
            stream.recv_task = Some(cx.waker().clone());
            Poll::Pending
        } else {
            // No more frames will be received
            Poll::Ready(None)
        }
    }
}

// ===== impl Open =====

impl Open {
    pub fn is_push_promise(&self) -> bool {
        use self::Open::*;

        match *self {
            PushPromise => true,
            _ => false,
        }
    }
}

// ===== impl RecvHeaderBlockError =====

impl<T> From<Error> for RecvHeaderBlockError<T> {
    fn from(err: Error) -> Self {
        RecvHeaderBlockError::State(err)
    }
}