use super::*;
use crate::codec::{RecvError, UserError};
use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
use crate::{frame, proto};
use std::task::Context;

use http::{HeaderMap, Request, Response};

use std::io;
use std::task::{Poll, Waker};
use std::time::{Duration, Instant};

#[derive(Debug)]
pub(super) struct Recv {
    /// Initial window size of remotely initiated streams
    init_window_sz: WindowSize,

    /// Connection level flow control governing received data
    flow: FlowControl,

    /// Amount of connection window capacity currently used by outstanding streams.
    in_flight_data: WindowSize,

    /// The lowest stream ID that is still idle
    next_stream_id: Result<StreamId, StreamIdOverflow>,

    /// The stream ID of the last processed stream
    last_processed_id: StreamId,

    /// Any streams with a higher ID are ignored.
    ///
    /// This starts as MAX, but is lowered when a GOAWAY is received.
    ///
    /// > After sending a GOAWAY frame, the sender can discard frames for
    /// > streams initiated by the receiver with identifiers higher than
    /// > the identified last stream.
    max_stream_id: StreamId,

    /// Streams that have pending window updates
    pending_window_updates: store::Queue<stream::NextWindowUpdate>,

    /// New streams to be accepted
    pending_accept: store::Queue<stream::NextAccept>,

    /// Locally reset streams that should be reaped when they expire
    pending_reset_expired: store::Queue<stream::NextResetExpire>,

    /// How long locally reset streams should ignore received frames
    reset_duration: Duration,

    /// Holds frames that are waiting to be read
    buffer: Buffer<Event>,

    /// The refused stream ID; represents an RST_STREAM (REFUSED_STREAM) frame
    /// that must be sent out.
    refused: Option<StreamId>,

    /// Whether push promises are allowed to be received.
    is_push_enabled: bool,
}

#[derive(Debug)]
pub(super) enum Event {
    Headers(peer::PollMessage),
    Data(Bytes),
    Trailers(HeaderMap),
}

#[derive(Debug)]
pub(super) enum RecvHeaderBlockError<T> {
    Oversize(T),
    State(RecvError),
}

#[derive(Debug)]
pub(crate) enum Open {
    PushPromise,
    Headers,
}

#[derive(Debug, Clone, Copy)]
struct Indices {
    head: store::Key,
    tail: store::Key,
}

impl Recv {
    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
        let next_stream_id = if peer.is_server() { 1 } else { 2 };

        let mut flow = FlowControl::new();

        // connections always have the default window size, regardless of
        // settings
        flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
            .expect("invalid initial remote window size");
        flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE);

        Recv {
            init_window_sz: config.local_init_window_sz,
            flow,
            in_flight_data: 0 as WindowSize,
            next_stream_id: Ok(next_stream_id.into()),
            pending_window_updates: store::Queue::new(),
            last_processed_id: StreamId::ZERO,
            max_stream_id: StreamId::MAX,
            pending_accept: store::Queue::new(),
            pending_reset_expired: store::Queue::new(),
            reset_duration: config.local_reset_duration,
            buffer: Buffer::new(),
            refused: None,
            is_push_enabled: config.local_push_enabled,
        }
    }

    /// Returns the initial receive window size
    pub fn init_window_sz(&self) -> WindowSize {
        self.init_window_sz
    }

    /// Returns the ID of the last processed stream
    pub fn last_processed_id(&self) -> StreamId {
        self.last_processed_id
    }

    /// Update state reflecting a new, remotely opened stream
    ///
    /// Returns the stream ID if successful. `None` if refused
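    ///
    /// # Example (illustrative sketch, not a doctest)
    ///
    /// ```ignore
    /// // Driven from frame dispatch when the remote peer initiates a stream;
    /// // `id` and `counts` come from the connection driver.
    /// match recv.open(id, Open::Headers, &mut counts)? {
    ///     Some(id) => {
    ///         // Accepted: allocate state for the new stream `id`.
    ///     }
    ///     None => {
    ///         // Refused: an RST_STREAM(REFUSED_STREAM) is recorded and will
    ///         // be sent by `send_pending_refusal`.
    ///     }
    /// }
    /// ```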
    pub fn open(
        &mut self,
        id: StreamId,
        mode: Open,
        counts: &mut Counts,
    ) -> Result<Option<StreamId>, RecvError> {
        assert!(self.refused.is_none());

        counts.peer().ensure_can_open(id, mode)?;

        let next_id = self.next_stream_id()?;
        if id < next_id {
            proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
            return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
        }

        self.next_stream_id = id.next_id();

        if !counts.can_inc_num_recv_streams() {
            self.refused = Some(id);
            return Ok(None);
        }

        Ok(Some(id))
    }

    /// Transition the stream state based on receiving headers
    ///
    /// The caller ensures that the frame represents headers and not trailers.
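    ///
    /// # Example (illustrative sketch, not a doctest)
    ///
    /// ```ignore
    /// match recv.recv_headers(frame, &mut stream, &mut counts) {
    ///     Ok(()) => {}
    ///     // The decoded header block exceeded SETTINGS_MAX_HEADER_LIST_SIZE;
    ///     // for a server peer, a 431 response frame may be provided to send.
    ///     Err(RecvHeaderBlockError::Oversize(maybe_431)) => { /* ... */ }
    ///     // A state or protocol error.
    ///     Err(RecvHeaderBlockError::State(err)) => { /* ... */ }
    /// }
    /// ```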
    pub fn recv_headers(
        &mut self,
        frame: frame::Headers,
        stream: &mut store::Ptr,
        counts: &mut Counts,
    ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
        log::trace!("opening stream; init_window={}", self.init_window_sz);
        let is_initial = stream.state.recv_open(frame.is_end_stream())?;

        if is_initial {
            // TODO: be smarter about this logic
            if frame.stream_id() > self.last_processed_id {
                self.last_processed_id = frame.stream_id();
            }

            // Increment the number of concurrent streams
            counts.inc_num_recv_streams(stream);
        }

        if !stream.content_length.is_head() {
            use super::stream::ContentLength;
            use http::header;

            if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
                let content_length = match frame::parse_u64(content_length.as_bytes()) {
                    Ok(v) => v,
                    Err(()) => {
                        proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
                        return Err(RecvError::Stream {
                            id: stream.id,
                            reason: Reason::PROTOCOL_ERROR,
                        }
                        .into());
                    }
                };

                stream.content_length = ContentLength::Remaining(content_length);
            }
        }

        if frame.is_over_size() {
            // A frame is over size if the decoded header block was bigger than
            // SETTINGS_MAX_HEADER_LIST_SIZE.
            //
            // > A server that receives a larger header block than it is willing
            // > to handle can send an HTTP 431 (Request Header Fields Too
            // > Large) status code [RFC6585]. A client can discard responses
            // > that it cannot process.
            //
            // So, if peer is a server, we'll send a 431. In either case,
            // an error is recorded, which will send a REFUSED_STREAM,
            // since we don't want any of the data frames either.
            log::debug!(
                "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
                 recv_headers: frame is over size; stream={:?}",
                stream.id
            );
            return if counts.peer().is_server() && is_initial {
                let mut res = frame::Headers::new(
                    stream.id,
                    frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
                    HeaderMap::new(),
                );
                res.set_end_stream();
                Err(RecvHeaderBlockError::Oversize(Some(res)))
            } else {
                Err(RecvHeaderBlockError::Oversize(None))
            };
        }

        let stream_id = frame.stream_id();
        let (pseudo, fields) = frame.into_parts();
        let message = counts
            .peer()
            .convert_poll_message(pseudo, fields, stream_id)?;

        // Push the frame onto the stream's recv buffer
        stream
            .pending_recv
            .push_back(&mut self.buffer, Event::Headers(message));
        stream.notify_recv();

        // Only servers can receive a headers frame that initiates the stream.
        // This is verified in `Streams` before calling this function.
        if counts.peer().is_server() {
            self.pending_accept.push(stream);
        }

        Ok(())
    }

    /// Called by the server to get the request
    ///
    /// TODO: Should this fn return `Result`?
    pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
        use super::peer::PollMessage::*;

        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Headers(Server(request))) => request,
            _ => panic!(),
        }
    }

    /// Called by the client to get pushed response
    pub fn poll_pushed(
        &mut self,
        cx: &Context,
        stream: &mut store::Ptr,
    ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
        use super::peer::PollMessage::*;

        let mut ppp = stream.pending_push_promises.take();
        let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
            match pushed.pending_recv.pop_front(&mut self.buffer) {
                Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
                // When frames are pushed into the queue, it is verified that
                // the first frame is a HEADERS frame.
                _ => panic!("Headers not set on pushed stream"),
            }
        });
        stream.pending_push_promises = ppp;
        if let Some(p) = pushed {
            Poll::Ready(Some(Ok(p)))
        } else {
            let is_open = stream.state.ensure_recv_open()?;

            if is_open {
                stream.recv_task = Some(cx.waker().clone());
                Poll::Pending
            } else {
                Poll::Ready(None)
            }
        }
    }

    /// Called by the client to get the response
    pub fn poll_response(
        &mut self,
        cx: &Context,
        stream: &mut store::Ptr,
    ) -> Poll<Result<Response<()>, proto::Error>> {
        use super::peer::PollMessage::*;

        // If the buffer is not empty, then the first frame must be a HEADERS
        // frame or the user violated the contract.
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
            Some(_) => panic!("poll_response called after response returned"),
            None => {
                stream.state.ensure_recv_open()?;

                stream.recv_task = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Transition the stream based on receiving trailers
    pub fn recv_trailers(
        &mut self,
        frame: frame::Headers,
        stream: &mut store::Ptr,
    ) -> Result<(), RecvError> {
        // Transition the state
        stream.state.recv_close()?;

        if stream.ensure_content_length_zero().is_err() {
            proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id);
            return Err(RecvError::Stream {
                id: stream.id,
                reason: Reason::PROTOCOL_ERROR,
            });
        }

        let trailers = frame.into_fields();

        // Push the frame onto the stream's recv buffer
        stream
            .pending_recv
            .push_back(&mut self.buffer, Event::Trailers(trailers));
        stream.notify_recv();

        Ok(())
    }

    /// Releases capacity of the connection
    pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
        log::trace!(
            "release_connection_capacity; size={}, connection in_flight_data={}",
            capacity,
            self.in_flight_data,
        );

        // Decrement in-flight data
        self.in_flight_data -= capacity;

        // Assign capacity to connection
        self.flow.assign_capacity(capacity);

        if self.flow.unclaimed_capacity().is_some() {
            if let Some(task) = task.take() {
                task.wake();
            }
        }
    }

    /// Releases capacity back to the connection & stream
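    ///
    /// # Example (illustrative sketch, not a doctest; `n` and `conn_task`
    /// are placeholders)
    ///
    /// ```ignore
    /// // After the application consumes `n` bytes of received data, hand the
    /// // capacity back; once enough is unclaimed, WINDOW_UPDATE frames are
    /// // scheduled for both the stream and the connection.
    /// recv.release_capacity(n, &mut stream, &mut conn_task)?;
    /// ```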
    pub fn release_capacity(
        &mut self,
        capacity: WindowSize,
        stream: &mut store::Ptr,
        task: &mut Option<Waker>,
    ) -> Result<(), UserError> {
        log::trace!("release_capacity; size={}", capacity);

        if capacity > stream.in_flight_recv_data {
            return Err(UserError::ReleaseCapacityTooBig);
        }

        self.release_connection_capacity(capacity, task);

        // Decrement in-flight data
        stream.in_flight_recv_data -= capacity;

        // Assign capacity to stream
        stream.recv_flow.assign_capacity(capacity);

        if stream.recv_flow.unclaimed_capacity().is_some() {
            // Queue the stream for sending the WINDOW_UPDATE frame.
            self.pending_window_updates.push(stream);

            if let Some(task) = task.take() {
                task.wake();
            }
        }

        Ok(())
    }

    /// Release any unclaimed capacity for a closed stream.
    pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
        debug_assert_eq!(stream.ref_count, 0);

        if stream.in_flight_recv_data == 0 {
            return;
        }

        log::trace!(
            "auto-release closed stream ({:?}) capacity: {:?}",
            stream.id,
            stream.in_flight_recv_data,
        );

        self.release_connection_capacity(stream.in_flight_recv_data, task);
        stream.in_flight_recv_data = 0;

        self.clear_recv_buffer(stream);
    }

    /// Set the "target" connection window size.
    ///
    /// By default, all new connections start with the protocol default of
    /// 65,535 bytes of window size. As streams use and release capacity, we
    /// will send WINDOW_UPDATEs for the connection to bring it back up to
    /// the initial "target".
    ///
    /// Setting a target means that we will send WINDOW_UPDATE frames so the
    /// peer knows it has about `target` window capacity to use for the whole
    /// connection.
    ///
    /// The `task` is an optional parked task for the `Connection` that might
    /// be blocked on needing more window capacity.
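    ///
    /// # Example (illustrative sketch, not a doctest)
    ///
    /// ```ignore
    /// // Grow the connection-level receive window target to 1 MiB. The
    /// // gained capacity is announced to the peer with a connection
    /// // WINDOW_UPDATE the next time `poll_complete` runs.
    /// recv.set_target_connection_window(1024 * 1024, &mut conn_task);
    /// ```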
    pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) {
        log::trace!(
            "set_target_connection_window; target={}; available={}, reserved={}",
            target,
            self.flow.available(),
            self.in_flight_data,
        );

        // The current target connection window is our `available` plus any
        // in-flight data reserved by streams.
        //
        // Update the flow controller with the difference between the new
        // target and the current target.
        let current = (self.flow.available() + self.in_flight_data).checked_size();
        if target > current {
            self.flow.assign_capacity(target - current);
        } else {
            self.flow.claim_capacity(current - target);
        }

        // If changing the target capacity means we gained a bunch of capacity,
        // enough that we went over the update threshold, then schedule sending
        // a connection WINDOW_UPDATE.
        if self.flow.unclaimed_capacity().is_some() {
            if let Some(task) = task.take() {
                task.wake();
            }
        }
    }

    pub(crate) fn apply_local_settings(
        &mut self,
        settings: &frame::Settings,
        store: &mut Store,
    ) -> Result<(), RecvError> {
        let target = if let Some(val) = settings.initial_window_size() {
            val
        } else {
            return Ok(());
        };

        let old_sz = self.init_window_sz;
        self.init_window_sz = target;

        log::trace!("update_initial_window_size; new={}; old={}", target, old_sz);

        // Per RFC 7540 §6.9.2:
        //
        // In addition to changing the flow-control window for streams that are
        // not yet active, a SETTINGS frame can alter the initial flow-control
        // window size for streams with active flow-control windows (that is,
        // streams in the "open" or "half-closed (remote)" state). When the
        // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
        // the size of all stream flow-control windows that it maintains by the
        // difference between the new value and the old value.
        //
        // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
        // space in a flow-control window to become negative. A sender MUST
        // track the negative flow-control window and MUST NOT send new
        // flow-controlled frames until it receives WINDOW_UPDATE frames that
        // cause the flow-control window to become positive.
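        //
        // Worked example (illustrative): if our advertised initial window
        // drops from the 65,535-byte default to 16,384, each open stream's
        // receive window is decremented by 49,151 below, and may go negative
        // until the application releases enough capacity.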

        if target < old_sz {
            // We must decrease the (local) window on every open stream.
            let dec = old_sz - target;
            log::trace!("decrementing all windows; dec={}", dec);

            store.for_each(|mut stream| {
                stream.recv_flow.dec_recv_window(dec);
                Ok(())
            })
        } else if target > old_sz {
            // We must increase the (local) window on every open stream.
            let inc = target - old_sz;
            log::trace!("incrementing all windows; inc={}", inc);
            store.for_each(|mut stream| {
                // XXX: Shouldn't the peer have already noticed our
                // overflow and sent us a GOAWAY?
                stream
                    .recv_flow
                    .inc_window(inc)
                    .map_err(RecvError::Connection)?;
                stream.recv_flow.assign_capacity(inc);
                Ok(())
            })
        } else {
            // size is the same... so do nothing
            Ok(())
        }
    }

    pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
        if !stream.state.is_recv_closed() {
            return false;
        }

        stream.pending_recv.is_empty()
    }

    pub fn recv_data(
        &mut self,
        frame: frame::Data,
        stream: &mut store::Ptr,
    ) -> Result<(), RecvError> {
        let sz = frame.payload().len();

        // This should have been enforced at the codec::FramedRead layer, so
        // this is just a sanity check.
        assert!(sz <= MAX_WINDOW_SIZE as usize);

        let sz = sz as WindowSize;

        let is_ignoring_frame = stream.state.is_local_reset();

        if !is_ignoring_frame && !stream.state.is_recv_streaming() {
            // TODO: There are cases where this can be a stream error of
            // STREAM_CLOSED instead...

            // Receiving a DATA frame when not expecting one is a protocol
            // error.
            proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
            return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
        }

        log::trace!(
            "recv_data; size={}; connection={}; stream={}",
            sz,
            self.flow.window_size(),
            stream.recv_flow.window_size()
        );

        if is_ignoring_frame {
            log::trace!(
                "recv_data; frame ignored on locally reset {:?} for some time",
                stream.id,
            );
            return self.ignore_data(sz);
        }

        // Ensure that there is enough capacity on the connection before acting
        // on the stream.
        self.consume_connection_window(sz)?;

        if stream.recv_flow.window_size() < sz {
            // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE
            // > A receiver MAY respond with a stream error (Section 5.4.2) or
            // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if
            // > it is unable to accept a frame.
            //
            // So, for violating the **stream** window, we can send either a
            // stream or connection error. We've opted to send a stream
            // error.
            return Err(RecvError::Stream {
                id: stream.id,
                reason: Reason::FLOW_CONTROL_ERROR,
            });
        }

        if stream.dec_content_length(frame.payload().len()).is_err() {
            proto_err!(stream:
                "recv_data: content-length overflow; stream={:?}; len={:?}",
                stream.id,
                frame.payload().len(),
            );
            return Err(RecvError::Stream {
                id: stream.id,
                reason: Reason::PROTOCOL_ERROR,
            });
        }

        if frame.is_end_stream() {
            if stream.ensure_content_length_zero().is_err() {
                proto_err!(stream:
                    "recv_data: content-length underflow; stream={:?}; len={:?}",
                    stream.id,
                    frame.payload().len(),
                );
                return Err(RecvError::Stream {
                    id: stream.id,
                    reason: Reason::PROTOCOL_ERROR,
                });
            }

            if stream.state.recv_close().is_err() {
                proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
                return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
            }
        }

        // Update stream level flow control
        stream.recv_flow.send_data(sz);

        // Track the data as in-flight
        stream.in_flight_recv_data += sz;

        let event = Event::Data(frame.into_payload());

        // Push the frame onto the recv buffer
        stream.pending_recv.push_back(&mut self.buffer, event);
        stream.notify_recv();

        Ok(())
    }

    pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), RecvError> {
        // Ensure that there is enough capacity on the connection...
        self.consume_connection_window(sz)?;

        // Since we are ignoring this frame, we aren't returning the frame to
        // the user. That means they have no way to release the capacity back
        // to the connection. So we have to release it automatically.
        //
        // This call doesn't send a WINDOW_UPDATE immediately, just marks
        // the capacity as available to be reclaimed. When the available
        // capacity meets a threshold, a WINDOW_UPDATE is then sent.
        self.release_connection_capacity(sz, &mut None);
        Ok(())
    }

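    /// Consume `sz` bytes of the connection-level receive window, tracking
    /// them as in-flight until released.
    ///
    /// # Example (illustrative sketch, not a doctest)
    ///
    /// ```ignore
    /// // With the full 65,535-byte default window available, accepting a
    /// // 16,384-byte DATA frame leaves a 49,151-byte window; the 16,384
    /// // bytes stay counted as in-flight until released.
    /// recv.consume_connection_window(16_384)?;
    /// ```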
    pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> {
        if self.flow.window_size() < sz {
            log::debug!(
                "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
                self.flow.window_size(),
                sz,
            );
            return Err(RecvError::Connection(Reason::FLOW_CONTROL_ERROR));
        }

        // Update connection level flow control
        self.flow.send_data(sz);

        // Track the data as in-flight
        self.in_flight_data += sz;
        Ok(())
    }

    pub fn recv_push_promise(
        &mut self,
        frame: frame::PushPromise,
        stream: &mut store::Ptr,
    ) -> Result<(), RecvError> {
        stream.state.reserve_remote()?;
        if frame.is_over_size() {
            // A frame is over size if the decoded header block was bigger than
            // SETTINGS_MAX_HEADER_LIST_SIZE.
            //
            // > A server that receives a larger header block than it is willing
            // > to handle can send an HTTP 431 (Request Header Fields Too
            // > Large) status code [RFC6585]. A client can discard responses
            // > that it cannot process.
            //
            // So, if peer is a server, we'll send a 431. In either case,
            // an error is recorded, which will send a REFUSED_STREAM,
            // since we don't want any of the data frames either.
            log::debug!(
                "stream error REFUSED_STREAM -- recv_push_promise: \
                 headers frame is over size; promised_id={:?};",
                frame.promised_id(),
            );
            return Err(RecvError::Stream {
                id: frame.promised_id(),
                reason: Reason::REFUSED_STREAM,
            });
        }

        let promised_id = frame.promised_id();
        let (pseudo, fields) = frame.into_parts();
        let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;

        if let Err(e) = frame::PushPromise::validate_request(&req) {
            use PushPromiseHeaderError::*;
            match e {
                NotSafeAndCacheable => proto_err!(
                    stream:
                    "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
                    req.method(),
                    promised_id,
                ),
                InvalidContentLength(e) => proto_err!(
                    stream:
                    "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
                    e,
                    promised_id,
                ),
            }
            return Err(RecvError::Stream {
                id: promised_id,
                reason: Reason::PROTOCOL_ERROR,
            });
        }

        use super::peer::PollMessage::*;
        stream
            .pending_recv
            .push_back(&mut self.buffer, Event::Headers(Server(req)));
        stream.notify_recv();
        Ok(())
    }

    /// Ensures that `id` is not in the `Idle` state.
    pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
        if let Ok(next) = self.next_stream_id {
            if id >= next {
                log::debug!(
                    "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
                    id
                );
                return Err(Reason::PROTOCOL_ERROR);
            }
        }
        // If next_stream_id has overflowed, that's ok: every possible ID has
        // already been used, so none can be idle.

        Ok(())
    }

    /// Handle remote sending an explicit RST_STREAM.
    pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) {
        // Notify the stream
        stream
            .state
            .recv_reset(frame.reason(), stream.is_pending_send);

        stream.notify_send();
        stream.notify_recv();
    }

    /// Handle a received error
    pub fn recv_err(&mut self, err: &proto::Error, stream: &mut Stream) {
        // Receive an error
        stream.state.recv_err(err);

        // If a receiver is waiting, notify it
        stream.notify_send();
        stream.notify_recv();
    }

    pub fn go_away(&mut self, last_processed_id: StreamId) {
        assert!(self.max_stream_id >= last_processed_id);
        self.max_stream_id = last_processed_id;
    }

    pub fn recv_eof(&mut self, stream: &mut Stream) {
        stream.state.recv_eof();
        stream.notify_send();
        stream.notify_recv();
    }

    pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
        while let Some(_) = stream.pending_recv.pop_front(&mut self.buffer) {
            // drop it
        }
    }

    /// Get the max ID of streams we can receive.
    ///
    /// This gets lowered if we send a GOAWAY frame.
    pub fn max_stream_id(&self) -> StreamId {
        self.max_stream_id
    }

    pub fn next_stream_id(&self) -> Result<StreamId, RecvError> {
        if let Ok(id) = self.next_stream_id {
            Ok(id)
        } else {
            Err(RecvError::Connection(Reason::PROTOCOL_ERROR))
        }
    }

    pub fn may_have_created_stream(&self, id: StreamId) -> bool {
        if let Ok(next_id) = self.next_stream_id {
            // Peer::is_local_init should have been called beforehand
            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
            id < next_id
        } else {
            true
        }
    }

    /// Ensures that the remote peer is allowed to reserve (push) streams.
    pub fn ensure_can_reserve(&self) -> Result<(), RecvError> {
        if !self.is_push_enabled {
            proto_err!(conn: "recv_push_promise: push is disabled");
            return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
        }

        Ok(())
    }

    /// Add a locally reset stream to the queue, to be eventually reaped.
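    ///
    /// # Example (illustrative sketch, not a doctest)
    ///
    /// ```ignore
    /// // After locally resetting a stream, queue it so frames that race the
    /// // RST_STREAM are tolerated (and ignored) until `reset_duration`
    /// // elapses.
    /// recv.enqueue_reset_expiration(&mut stream, &mut counts);
    /// ```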
    pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        if !stream.state.is_local_reset() || stream.is_pending_reset_expiration() {
            return;
        }

        log::trace!("enqueue_reset_expiration; {:?}", stream.id);

        if !counts.can_inc_num_reset_streams() {
            // Try to evict one expired stream, if possible. If the maximum
            // number of reset streams allowed is zero, nothing can be
            // evicted, and we simply bail out below.
            if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) {
                counts.transition_after(evicted, true);
            }
        }

        if counts.can_inc_num_reset_streams() {
            counts.inc_num_reset_streams();
            self.pending_reset_expired.push(stream);
        }
    }

    /// Send any pending refusals.
    pub fn send_pending_refusal<T, B>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        if let Some(stream_id) = self.refused {
            ready!(dst.poll_ready(cx))?;

            // Create the RST_STREAM frame
            let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);

            // Buffer the frame
            dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
        }

        self.refused = None;

        Poll::Ready(Ok(()))
    }

    pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
        let now = Instant::now();
        let reset_duration = self.reset_duration;
        while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
            let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
            now - reset_at > reset_duration
        }) {
            counts.transition_after(stream, true);
        }
    }

    pub fn clear_queues(
        &mut self,
        clear_pending_accept: bool,
        store: &mut Store,
        counts: &mut Counts,
    ) {
        self.clear_stream_window_update_queue(store, counts);
        self.clear_all_reset_streams(store, counts);

        if clear_pending_accept {
            self.clear_all_pending_accept(store, counts);
        }
    }

    fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_window_updates.pop(store) {
            counts.transition(stream, |_, stream| {
                log::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
            })
        }
    }

    /// Called on EOF
    fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_reset_expired.pop(store) {
            counts.transition_after(stream, true);
        }
    }

    fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_accept.pop(store) {
            counts.transition_after(stream, false);
        }
    }

    pub fn poll_complete<T, B>(
        &mut self,
        cx: &mut Context,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        // Send any pending connection level window updates
        ready!(self.send_connection_window_update(cx, dst))?;

        // Send any pending stream level window updates
        ready!(self.send_stream_window_updates(cx, store, counts, dst))?;

        Poll::Ready(Ok(()))
    }

    /// Send connection level window update
    fn send_connection_window_update<T, B>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        if let Some(incr) = self.flow.unclaimed_capacity() {
            let frame = frame::WindowUpdate::new(StreamId::zero(), incr);

            // Ensure the codec has capacity
            ready!(dst.poll_ready(cx))?;

            // Buffer the WINDOW_UPDATE frame
            dst.buffer(frame.into())
                .expect("invalid WINDOW_UPDATE frame");

            // Update flow control
            self.flow
                .inc_window(incr)
                .expect("unexpected flow control state");
        }

        Poll::Ready(Ok(()))
    }

    /// Send stream level window update
    pub fn send_stream_window_updates<T, B>(
        &mut self,
        cx: &mut Context,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        loop {
            // Ensure the codec has capacity
            ready!(dst.poll_ready(cx))?;

            // Get the next stream
            let stream = match self.pending_window_updates.pop(store) {
                Some(stream) => stream,
                None => return Poll::Ready(Ok(())),
            };

            counts.transition(stream, |_, stream| {
                log::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
                debug_assert!(!stream.is_pending_window_update);

                if !stream.state.is_recv_streaming() {
                    // No need to send window updates on the stream if the
                    // stream is no longer receiving data.
                    //
                    // TODO: is this correct? We could possibly send a window
                    // update on a ReservedRemote stream if we already know
                    // we want to stream the data faster...
                    return;
                }

                // TODO: de-dup
                if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
                    // Create the WINDOW_UPDATE frame
                    let frame = frame::WindowUpdate::new(stream.id, incr);

                    // Buffer it
                    dst.buffer(frame.into())
                        .expect("invalid WINDOW_UPDATE frame");

                    // Update flow control
                    stream
                        .recv_flow
                        .inc_window(incr)
                        .expect("unexpected flow control state");
                }
            })
        }
    }

    pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
        self.pending_accept.pop(store).map(|ptr| ptr.key())
    }

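    /// Poll for the next DATA frame buffered on the stream.
    ///
    /// # Example (illustrative sketch, not a doctest; `consume` is a
    /// placeholder)
    ///
    /// ```ignore
    /// match recv.poll_data(cx, &mut stream) {
    ///     Poll::Ready(Some(Ok(bytes))) => consume(bytes),
    ///     Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
    ///     // `None`: no more DATA frames; trailers may still follow.
    ///     Poll::Ready(None) => {}
    ///     Poll::Pending => return Poll::Pending,
    /// }
    /// ```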
    pub fn poll_data(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<Bytes, proto::Error>>> {
        // TODO: Return error when the stream is reset
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
            Some(event) => {
                // Frame is trailers
                stream.pending_recv.push_front(&mut self.buffer, event);

                // Notify the recv task. This is done just in case
                // `poll_trailers` was called.
                //
                // It is very likely that `notify_recv` will just be a no-op (as
                // the task will be None), so this isn't really much of a
                // performance concern. It also means we don't have to track
                // state to see if `poll_trailers` was called before `poll_data`
                // returned `None`.
                stream.notify_recv();

                // No more data frames
                Poll::Ready(None)
            }
            None => self.schedule_recv(cx, stream),
        }
    }

    pub fn poll_trailers(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
            Some(event) => {
                // Frame is not trailers... not ready to poll trailers yet.
                stream.pending_recv.push_front(&mut self.buffer, event);

                Poll::Pending
            }
            None => self.schedule_recv(cx, stream),
        }
    }

    fn schedule_recv<T>(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<T, proto::Error>>> {
        if stream.state.ensure_recv_open()? {
            // Request to get notified once more frames arrive
            stream.recv_task = Some(cx.waker().clone());
            Poll::Pending
        } else {
            // No more frames will be received
            Poll::Ready(None)
        }
    }
}

// ===== impl Open =====

impl Open {
    pub fn is_push_promise(&self) -> bool {
        use self::Open::*;

        match *self {
            PushPromise => true,
            _ => false,
        }
    }
}

// ===== impl RecvHeaderBlockError =====

impl<T> From<RecvError> for RecvHeaderBlockError<T> {
    fn from(err: RecvError) -> Self {
        RecvHeaderBlockError::State(err)
    }
}