1 use super::recv::RecvHeaderBlockError;
2 use super::store::{self, Entry, Resolve, Store};
3 use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId};
4 use crate::codec::{Codec, SendError, UserError};
5 use crate::ext::Protocol;
6 use crate::frame::{self, Frame, Reason};
7 use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize};
8 use crate::{client, proto, server};
9 
10 use bytes::{Buf, Bytes};
11 use http::{HeaderMap, Request, Response};
12 use std::task::{Context, Poll, Waker};
13 use tokio::io::AsyncWrite;
14 
15 use crate::PollExt;
16 use std::sync::{Arc, Mutex};
17 use std::{fmt, io};
18 
/// Handle to the shared per-connection stream state, parameterized by the
/// outbound buffer type `B` and the peer role `P` (client or server).
#[derive(Debug)]
pub(crate) struct Streams<B, P>
where
    P: Peer,
{
    /// Holds most of the connection and stream related state for processing
    /// HTTP/2 frames associated with streams.
    inner: Arc<Mutex<Inner>>,

    /// This is the queue of frames to be written to the wire. This is split out
    /// to avoid requiring a `B` generic on all public API types even if `B` is
    /// not technically required.
    ///
    /// Currently, splitting this out requires a second `Arc` + `Mutex`.
    /// However, it should be possible to avoid this duplication with a little
    /// bit of unsafe code. This optimization has been postponed until it has
    /// been shown to be necessary.
    send_buffer: Arc<SendBuffer<B>>,

    /// Marker binding the `P: Peer` type parameter; no peer data is stored.
    _p: ::std::marker::PhantomData<P>,
}
40 
/// Like `Streams` but with a `peer::Dyn` field instead of a static `P: Peer` type parameter.
/// Ensures that the methods only get one instantiation, instead of two (client and server)
#[derive(Debug)]
pub(crate) struct DynStreams<'a, B> {
    /// Borrowed shared connection/stream state (same state as `Streams::inner`).
    inner: &'a Mutex<Inner>,

    /// Borrowed queue of frames awaiting transmission.
    send_buffer: &'a SendBuffer<B>,

    /// Runtime (rather than compile-time) client/server role.
    peer: peer::Dyn,
}
51 
/// Reference to the stream state
#[derive(Debug)]
pub(crate) struct StreamRef<B> {
    /// Buffer-type-erased reference to the stream's shared state.
    opaque: OpaqueStreamRef,
    /// Handle to the outbound frame queue, needed for send operations.
    send_buffer: Arc<SendBuffer<B>>,
}
58 
/// Reference to the stream state that hides the send data chunk generic
pub(crate) struct OpaqueStreamRef {
    /// Shared connection state holding the stream's `Store` entry.
    inner: Arc<Mutex<Inner>>,
    /// Key identifying this stream within the `Store`.
    key: store::Key,
}
64 
/// Fields needed to manage state related to managing the set of streams. This
/// is mostly split out to make ownership happy.
///
/// TODO: better name
#[derive(Debug)]
struct Inner {
    /// Tracks send & recv stream concurrency.
    counts: Counts,

    /// Connection level state and performs actions on streams
    actions: Actions,

    /// Stores stream state
    store: Store,

    /// The number of stream refs to this shared state.
    /// Starts at 1 (see `Inner::new`) and is incremented whenever a new
    /// stream ref is handed out while the lock is held.
    refs: usize,
}
83 
/// Connection-level actions applied to streams, grouped so they can be
/// borrowed independently of `Counts` and `Store`.
#[derive(Debug)]
struct Actions {
    /// Manages state transitions initiated by receiving frames
    recv: Recv,

    /// Manages state transitions initiated by sending frames
    send: Send,

    /// Task that calls `poll_complete`.
    task: Option<Waker>,

    /// If the connection errors, a copy is kept for any StreamRefs.
    conn_error: Option<proto::Error>,
}
98 
/// Contains the buffer of frames to be written to the wire.
#[derive(Debug)]
struct SendBuffer<B> {
    /// The queued frames, guarded separately from `Inner` (see the note on
    /// `Streams::send_buffer`).
    inner: Mutex<Buffer<Frame<B>>>,
}
104 
105 // ===== impl Streams =====
106 
impl<B, P> Streams<B, P>
where
    B: Buf,
    P: Peer,
{
    /// Creates the shared stream state and an empty outbound frame buffer
    /// from the connection `Config`.
    pub fn new(config: Config) -> Self {
        let peer = P::r#dyn();

        Streams {
            inner: Inner::new(peer, config),
            send_buffer: Arc::new(SendBuffer::new()),
            _p: ::std::marker::PhantomData,
        }
    }

    /// Updates the target connection-level flow control window, forwarding
    /// the `poll_complete` waker so the change can be acted upon.
    pub fn set_target_connection_window_size(&mut self, size: WindowSize) {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        me.actions
            .recv
            .set_target_connection_window(size, &mut me.actions.task)
    }

    /// Returns the next remotely-initiated stream waiting to be accepted,
    /// if any, wrapped in a `StreamRef` (bumping the shared ref count).
    pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;
        me.actions.recv.next_incoming(&mut me.store).map(|key| {
            let stream = &mut me.store.resolve(key);
            tracing::trace!(
                "next_incoming; id={:?}, state={:?}",
                stream.id,
                stream.state
            );
            // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
            // the lock, so it can't.
            me.refs += 1;
            StreamRef {
                opaque: OpaqueStreamRef::new(self.inner.clone(), stream),
                send_buffer: self.send_buffer.clone(),
            }
        })
    }

    /// Attempts to flush any pending stream-refusal frames to the codec.
    pub fn send_pending_refusal<T>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
    {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;
        me.actions.recv.send_pending_refusal(cx, dst)
    }

    /// Drops state for reset streams whose linger period has expired.
    pub fn clear_expired_reset_streams(&mut self) {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;
        me.actions
            .recv
            .clear_expired_reset_streams(&mut me.store, &mut me.counts);
    }

    /// Drives pending stream work, writing frames to `dst`.
    pub fn poll_complete<T>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
    {
        let mut me = self.inner.lock().unwrap();
        me.poll_complete(&self.send_buffer, cx, dst)
    }

    /// Applies a SETTINGS frame received from the peer to connection-wide
    /// counts and to the send side of every stream.
    pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        // Lock order: `inner` first, then the frame buffer.
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        me.counts.apply_remote_settings(frame);

        me.actions.send.apply_remote_settings(
            frame,
            send_buffer,
            &mut me.store,
            &mut me.counts,
            &mut me.actions.task,
        )
    }

    /// Applies a locally-sent SETTINGS frame to receive-side state.
    pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        me.actions.recv.apply_local_settings(frame, &mut me.store)
    }

    /// Opens a new stream and sends the request headers on it.
    ///
    /// Fails if the connection has errored, stream IDs are exhausted, the
    /// `pending` stream (if any) is still pending-open, or this peer is a
    /// server (servers cannot open streams).
    pub fn send_request(
        &mut self,
        mut request: Request<()>,
        end_of_stream: bool,
        pending: Option<&OpaqueStreamRef>,
    ) -> Result<StreamRef<B>, SendError> {
        use super::stream::ContentLength;
        use http::Method;

        let protocol = request.extensions_mut().remove::<Protocol>();

        // Clear before taking lock, incase extensions contain a StreamRef.
        request.extensions_mut().clear();

        // TODO: There is a hazard with assigning a stream ID before the
        // prioritize layer. If prioritization reorders new streams, this
        // implicitly closes the earlier stream IDs.
        //
        // See: hyperium/h2#11
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        me.actions.ensure_no_conn_error()?;
        me.actions.send.ensure_next_stream_id()?;

        // The `pending` argument is provided by the `Client`, and holds
        // a store `Key` of a `Stream` that may have been not been opened
        // yet.
        //
        // If that stream is still pending, the Client isn't allowed to
        // queue up another pending stream. They should use `poll_ready`.
        if let Some(stream) = pending {
            if me.store.resolve(stream.key).is_pending_open {
                return Err(UserError::Rejected.into());
            }
        }

        if me.counts.peer().is_server() {
            // Servers cannot open streams. PushPromise must first be reserved.
            return Err(UserError::UnexpectedFrameType.into());
        }

        let stream_id = me.actions.send.open()?;

        let mut stream = Stream::new(
            stream_id,
            me.actions.send.init_window_sz(),
            me.actions.recv.init_window_sz(),
        );

        if *request.method() == Method::HEAD {
            stream.content_length = ContentLength::Head;
        }

        // Convert the message
        let headers =
            client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?;

        let mut stream = me.store.insert(stream.id, stream);

        let sent = me.actions.send.send_headers(
            headers,
            send_buffer,
            &mut stream,
            &mut me.counts,
            &mut me.actions.task,
        );

        // send_headers can return a UserError, if it does,
        // we should forget about this stream.
        if let Err(err) = sent {
            stream.unlink();
            stream.remove();
            return Err(err.into());
        }

        // Given that the stream has been initialized, it should not be in the
        // closed state.
        debug_assert!(!stream.state.is_closed());

        // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
        // the lock, so it can't.
        me.refs += 1;

        Ok(StreamRef {
            opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
            send_buffer: self.send_buffer.clone(),
        })
    }

    /// Returns whether the peer has enabled the extended CONNECT protocol
    /// setting.
    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
        self.inner
            .lock()
            .unwrap()
            .actions
            .send
            .is_extended_connect_protocol_enabled()
    }
}
311 
312 impl<B> DynStreams<'_, B> {
recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error>313     pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> {
314         let mut me = self.inner.lock().unwrap();
315 
316         me.recv_headers(self.peer, &self.send_buffer, frame)
317     }
318 
recv_data(&mut self, frame: frame::Data) -> Result<(), Error>319     pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), Error> {
320         let mut me = self.inner.lock().unwrap();
321         me.recv_data(self.peer, &self.send_buffer, frame)
322     }
323 
recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error>324     pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error> {
325         let mut me = self.inner.lock().unwrap();
326 
327         me.recv_reset(&self.send_buffer, frame)
328     }
329 
330     /// Notify all streams that a connection-level error happened.
handle_error(&mut self, err: proto::Error) -> StreamId331     pub fn handle_error(&mut self, err: proto::Error) -> StreamId {
332         let mut me = self.inner.lock().unwrap();
333         me.handle_error(&self.send_buffer, err)
334     }
335 
recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error>336     pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error> {
337         let mut me = self.inner.lock().unwrap();
338         me.recv_go_away(&self.send_buffer, frame)
339     }
340 
last_processed_id(&self) -> StreamId341     pub fn last_processed_id(&self) -> StreamId {
342         self.inner.lock().unwrap().actions.recv.last_processed_id()
343     }
344 
recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error>345     pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error> {
346         let mut me = self.inner.lock().unwrap();
347         me.recv_window_update(&self.send_buffer, frame)
348     }
349 
recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error>350     pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error> {
351         let mut me = self.inner.lock().unwrap();
352         me.recv_push_promise(&self.send_buffer, frame)
353     }
354 
recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()>355     pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
356         let mut me = self.inner.lock().map_err(|_| ())?;
357         me.recv_eof(&self.send_buffer, clear_pending_accept)
358     }
359 
send_reset(&mut self, id: StreamId, reason: Reason)360     pub fn send_reset(&mut self, id: StreamId, reason: Reason) {
361         let mut me = self.inner.lock().unwrap();
362         me.send_reset(&self.send_buffer, id, reason)
363     }
364 
send_go_away(&mut self, last_processed_id: StreamId)365     pub fn send_go_away(&mut self, last_processed_id: StreamId) {
366         let mut me = self.inner.lock().unwrap();
367         me.actions.recv.go_away(last_processed_id);
368     }
369 }
370 
371 impl Inner {
new(peer: peer::Dyn, config: Config) -> Arc<Mutex<Self>>372     fn new(peer: peer::Dyn, config: Config) -> Arc<Mutex<Self>> {
373         Arc::new(Mutex::new(Inner {
374             counts: Counts::new(peer, &config),
375             actions: Actions {
376                 recv: Recv::new(peer, &config),
377                 send: Send::new(&config),
378                 task: None,
379                 conn_error: None,
380             },
381             store: Store::new(),
382             refs: 1,
383         }))
384     }
385 
recv_headers<B>( &mut self, peer: peer::Dyn, send_buffer: &SendBuffer<B>, frame: frame::Headers, ) -> Result<(), Error>386     fn recv_headers<B>(
387         &mut self,
388         peer: peer::Dyn,
389         send_buffer: &SendBuffer<B>,
390         frame: frame::Headers,
391     ) -> Result<(), Error> {
392         let id = frame.stream_id();
393 
394         // The GOAWAY process has begun. All streams with a greater ID than
395         // specified as part of GOAWAY should be ignored.
396         if id > self.actions.recv.max_stream_id() {
397             tracing::trace!(
398                 "id ({:?}) > max_stream_id ({:?}), ignoring HEADERS",
399                 id,
400                 self.actions.recv.max_stream_id()
401             );
402             return Ok(());
403         }
404 
405         let key = match self.store.find_entry(id) {
406             Entry::Occupied(e) => e.key(),
407             Entry::Vacant(e) => {
408                 // Client: it's possible to send a request, and then send
409                 // a RST_STREAM while the response HEADERS were in transit.
410                 //
411                 // Server: we can't reset a stream before having received
412                 // the request headers, so don't allow.
413                 if !peer.is_server() {
414                     // This may be response headers for a stream we've already
415                     // forgotten about...
416                     if self.actions.may_have_forgotten_stream(peer, id) {
417                         tracing::debug!(
418                             "recv_headers for old stream={:?}, sending STREAM_CLOSED",
419                             id,
420                         );
421                         return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
422                     }
423                 }
424 
425                 match self
426                     .actions
427                     .recv
428                     .open(id, Open::Headers, &mut self.counts)?
429                 {
430                     Some(stream_id) => {
431                         let stream = Stream::new(
432                             stream_id,
433                             self.actions.send.init_window_sz(),
434                             self.actions.recv.init_window_sz(),
435                         );
436 
437                         e.insert(stream)
438                     }
439                     None => return Ok(()),
440                 }
441             }
442         };
443 
444         let stream = self.store.resolve(key);
445 
446         if stream.state.is_local_reset() {
447             // Locally reset streams must ignore frames "for some time".
448             // This is because the remote may have sent trailers before
449             // receiving the RST_STREAM frame.
450             tracing::trace!("recv_headers; ignoring trailers on {:?}", stream.id);
451             return Ok(());
452         }
453 
454         let actions = &mut self.actions;
455         let mut send_buffer = send_buffer.inner.lock().unwrap();
456         let send_buffer = &mut *send_buffer;
457 
458         self.counts.transition(stream, |counts, stream| {
459             tracing::trace!(
460                 "recv_headers; stream={:?}; state={:?}",
461                 stream.id,
462                 stream.state
463             );
464 
465             let res = if stream.state.is_recv_headers() {
466                 match actions.recv.recv_headers(frame, stream, counts) {
467                     Ok(()) => Ok(()),
468                     Err(RecvHeaderBlockError::Oversize(resp)) => {
469                         if let Some(resp) = resp {
470                             let sent = actions.send.send_headers(
471                                 resp, send_buffer, stream, counts, &mut actions.task);
472                             debug_assert!(sent.is_ok(), "oversize response should not fail");
473 
474                             actions.send.schedule_implicit_reset(
475                                 stream,
476                                 Reason::REFUSED_STREAM,
477                                 counts,
478                                 &mut actions.task);
479 
480                             actions.recv.enqueue_reset_expiration(stream, counts);
481 
482                             Ok(())
483                         } else {
484                             Err(Error::library_reset(stream.id, Reason::REFUSED_STREAM))
485                         }
486                     },
487                     Err(RecvHeaderBlockError::State(err)) => Err(err),
488                 }
489             } else {
490                 if !frame.is_end_stream() {
491                     // Receiving trailers that don't set EOS is a "malformed"
492                     // message. Malformed messages are a stream error.
493                     proto_err!(stream: "recv_headers: trailers frame was not EOS; stream={:?}", stream.id);
494                     return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
495                 }
496 
497                 actions.recv.recv_trailers(frame, stream)
498             };
499 
500             actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
501         })
502     }
503 
recv_data<B>( &mut self, peer: peer::Dyn, send_buffer: &SendBuffer<B>, frame: frame::Data, ) -> Result<(), Error>504     fn recv_data<B>(
505         &mut self,
506         peer: peer::Dyn,
507         send_buffer: &SendBuffer<B>,
508         frame: frame::Data,
509     ) -> Result<(), Error> {
510         let id = frame.stream_id();
511 
512         let stream = match self.store.find_mut(&id) {
513             Some(stream) => stream,
514             None => {
515                 // The GOAWAY process has begun. All streams with a greater ID
516                 // than specified as part of GOAWAY should be ignored.
517                 if id > self.actions.recv.max_stream_id() {
518                     tracing::trace!(
519                         "id ({:?}) > max_stream_id ({:?}), ignoring DATA",
520                         id,
521                         self.actions.recv.max_stream_id()
522                     );
523                     return Ok(());
524                 }
525 
526                 if self.actions.may_have_forgotten_stream(peer, id) {
527                     tracing::debug!("recv_data for old stream={:?}, sending STREAM_CLOSED", id,);
528 
529                     let sz = frame.payload().len();
530                     // This should have been enforced at the codec::FramedRead layer, so
531                     // this is just a sanity check.
532                     assert!(sz <= super::MAX_WINDOW_SIZE as usize);
533                     let sz = sz as WindowSize;
534 
535                     self.actions.recv.ignore_data(sz)?;
536                     return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
537                 }
538 
539                 proto_err!(conn: "recv_data: stream not found; id={:?}", id);
540                 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
541             }
542         };
543 
544         let actions = &mut self.actions;
545         let mut send_buffer = send_buffer.inner.lock().unwrap();
546         let send_buffer = &mut *send_buffer;
547 
548         self.counts.transition(stream, |counts, stream| {
549             let sz = frame.payload().len();
550             let res = actions.recv.recv_data(frame, stream);
551 
552             // Any stream error after receiving a DATA frame means
553             // we won't give the data to the user, and so they can't
554             // release the capacity. We do it automatically.
555             if let Err(Error::Reset(..)) = res {
556                 actions
557                     .recv
558                     .release_connection_capacity(sz as WindowSize, &mut None);
559             }
560             actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
561         })
562     }
563 
recv_reset<B>( &mut self, send_buffer: &SendBuffer<B>, frame: frame::Reset, ) -> Result<(), Error>564     fn recv_reset<B>(
565         &mut self,
566         send_buffer: &SendBuffer<B>,
567         frame: frame::Reset,
568     ) -> Result<(), Error> {
569         let id = frame.stream_id();
570 
571         if id.is_zero() {
572             proto_err!(conn: "recv_reset: invalid stream ID 0");
573             return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
574         }
575 
576         // The GOAWAY process has begun. All streams with a greater ID than
577         // specified as part of GOAWAY should be ignored.
578         if id > self.actions.recv.max_stream_id() {
579             tracing::trace!(
580                 "id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM",
581                 id,
582                 self.actions.recv.max_stream_id()
583             );
584             return Ok(());
585         }
586 
587         let stream = match self.store.find_mut(&id) {
588             Some(stream) => stream,
589             None => {
590                 // TODO: Are there other error cases?
591                 self.actions
592                     .ensure_not_idle(self.counts.peer(), id)
593                     .map_err(Error::library_go_away)?;
594 
595                 return Ok(());
596             }
597         };
598 
599         let mut send_buffer = send_buffer.inner.lock().unwrap();
600         let send_buffer = &mut *send_buffer;
601 
602         let actions = &mut self.actions;
603 
604         self.counts.transition(stream, |counts, stream| {
605             actions.recv.recv_reset(frame, stream);
606             actions.send.handle_error(send_buffer, stream, counts);
607             assert!(stream.state.is_closed());
608             Ok(())
609         })
610     }
611 
recv_window_update<B>( &mut self, send_buffer: &SendBuffer<B>, frame: frame::WindowUpdate, ) -> Result<(), Error>612     fn recv_window_update<B>(
613         &mut self,
614         send_buffer: &SendBuffer<B>,
615         frame: frame::WindowUpdate,
616     ) -> Result<(), Error> {
617         let id = frame.stream_id();
618 
619         let mut send_buffer = send_buffer.inner.lock().unwrap();
620         let send_buffer = &mut *send_buffer;
621 
622         if id.is_zero() {
623             self.actions
624                 .send
625                 .recv_connection_window_update(frame, &mut self.store, &mut self.counts)
626                 .map_err(Error::library_go_away)?;
627         } else {
628             // The remote may send window updates for streams that the local now
629             // considers closed. It's ok...
630             if let Some(mut stream) = self.store.find_mut(&id) {
631                 // This result is ignored as there is nothing to do when there
632                 // is an error. The stream is reset by the function on error and
633                 // the error is informational.
634                 let _ = self.actions.send.recv_stream_window_update(
635                     frame.size_increment(),
636                     send_buffer,
637                     &mut stream,
638                     &mut self.counts,
639                     &mut self.actions.task,
640                 );
641             } else {
642                 self.actions
643                     .ensure_not_idle(self.counts.peer(), id)
644                     .map_err(Error::library_go_away)?;
645             }
646         }
647 
648         Ok(())
649     }
650 
handle_error<B>(&mut self, send_buffer: &SendBuffer<B>, err: proto::Error) -> StreamId651     fn handle_error<B>(&mut self, send_buffer: &SendBuffer<B>, err: proto::Error) -> StreamId {
652         let actions = &mut self.actions;
653         let counts = &mut self.counts;
654         let mut send_buffer = send_buffer.inner.lock().unwrap();
655         let send_buffer = &mut *send_buffer;
656 
657         let last_processed_id = actions.recv.last_processed_id();
658 
659         self.store.for_each(|stream| {
660             counts.transition(stream, |counts, stream| {
661                 actions.recv.handle_error(&err, &mut *stream);
662                 actions.send.handle_error(send_buffer, stream, counts);
663             })
664         });
665 
666         actions.conn_error = Some(err);
667 
668         last_processed_id
669     }
670 
recv_go_away<B>( &mut self, send_buffer: &SendBuffer<B>, frame: &frame::GoAway, ) -> Result<(), Error>671     fn recv_go_away<B>(
672         &mut self,
673         send_buffer: &SendBuffer<B>,
674         frame: &frame::GoAway,
675     ) -> Result<(), Error> {
676         let actions = &mut self.actions;
677         let counts = &mut self.counts;
678         let mut send_buffer = send_buffer.inner.lock().unwrap();
679         let send_buffer = &mut *send_buffer;
680 
681         let last_stream_id = frame.last_stream_id();
682 
683         actions.send.recv_go_away(last_stream_id)?;
684 
685         let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason());
686 
687         self.store.for_each(|stream| {
688             if stream.id > last_stream_id {
689                 counts.transition(stream, |counts, stream| {
690                     actions.recv.handle_error(&err, &mut *stream);
691                     actions.send.handle_error(send_buffer, stream, counts);
692                 })
693             }
694         });
695 
696         actions.conn_error = Some(err);
697 
698         Ok(())
699     }
700 
recv_push_promise<B>( &mut self, send_buffer: &SendBuffer<B>, frame: frame::PushPromise, ) -> Result<(), Error>701     fn recv_push_promise<B>(
702         &mut self,
703         send_buffer: &SendBuffer<B>,
704         frame: frame::PushPromise,
705     ) -> Result<(), Error> {
706         let id = frame.stream_id();
707         let promised_id = frame.promised_id();
708 
709         // First, ensure that the initiating stream is still in a valid state.
710         let parent_key = match self.store.find_mut(&id) {
711             Some(stream) => {
712                 // The GOAWAY process has begun. All streams with a greater ID
713                 // than specified as part of GOAWAY should be ignored.
714                 if id > self.actions.recv.max_stream_id() {
715                     tracing::trace!(
716                         "id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE",
717                         id,
718                         self.actions.recv.max_stream_id()
719                     );
720                     return Ok(());
721                 }
722 
723                 // The stream must be receive open
724                 stream.state.ensure_recv_open()?;
725                 stream.key()
726             }
727             None => {
728                 proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state");
729                 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into());
730             }
731         };
732 
733         // TODO: Streams in the reserved states do not count towards the concurrency
734         // limit. However, it seems like there should be a cap otherwise this
735         // could grow in memory indefinitely.
736 
737         // Ensure that we can reserve streams
738         self.actions.recv.ensure_can_reserve()?;
739 
740         // Next, open the stream.
741         //
742         // If `None` is returned, then the stream is being refused. There is no
743         // further work to be done.
744         if self
745             .actions
746             .recv
747             .open(promised_id, Open::PushPromise, &mut self.counts)?
748             .is_none()
749         {
750             return Ok(());
751         }
752 
753         // Try to handle the frame and create a corresponding key for the pushed stream
754         // this requires a bit of indirection to make the borrow checker happy.
755         let child_key: Option<store::Key> = {
756             // Create state for the stream
757             let stream = self.store.insert(promised_id, {
758                 Stream::new(
759                     promised_id,
760                     self.actions.send.init_window_sz(),
761                     self.actions.recv.init_window_sz(),
762                 )
763             });
764 
765             let actions = &mut self.actions;
766 
767             self.counts.transition(stream, |counts, stream| {
768                 let stream_valid = actions.recv.recv_push_promise(frame, stream);
769 
770                 match stream_valid {
771                     Ok(()) => Ok(Some(stream.key())),
772                     _ => {
773                         let mut send_buffer = send_buffer.inner.lock().unwrap();
774                         actions
775                             .reset_on_recv_stream_err(
776                                 &mut *send_buffer,
777                                 stream,
778                                 counts,
779                                 stream_valid,
780                             )
781                             .map(|()| None)
782                     }
783                 }
784             })?
785         };
786         // If we're successful, push the headers and stream...
787         if let Some(child) = child_key {
788             let mut ppp = self.store[parent_key].pending_push_promises.take();
789             ppp.push(&mut self.store.resolve(child));
790 
791             let parent = &mut self.store.resolve(parent_key);
792             parent.pending_push_promises = ppp;
793             parent.notify_recv();
794         };
795 
796         Ok(())
797     }
798 
recv_eof<B>( &mut self, send_buffer: &SendBuffer<B>, clear_pending_accept: bool, ) -> Result<(), ()>799     fn recv_eof<B>(
800         &mut self,
801         send_buffer: &SendBuffer<B>,
802         clear_pending_accept: bool,
803     ) -> Result<(), ()> {
804         let actions = &mut self.actions;
805         let counts = &mut self.counts;
806         let mut send_buffer = send_buffer.inner.lock().unwrap();
807         let send_buffer = &mut *send_buffer;
808 
809         if actions.conn_error.is_none() {
810             actions.conn_error = Some(io::Error::from(io::ErrorKind::BrokenPipe).into());
811         }
812 
813         tracing::trace!("Streams::recv_eof");
814 
815         self.store.for_each(|stream| {
816             counts.transition(stream, |counts, stream| {
817                 actions.recv.recv_eof(stream);
818 
819                 // This handles resetting send state associated with the
820                 // stream
821                 actions.send.handle_error(send_buffer, stream, counts);
822             })
823         });
824 
825         actions.clear_queues(clear_pending_accept, &mut self.store, counts);
826         Ok(())
827     }
828 
poll_complete<T, B>( &mut self, send_buffer: &SendBuffer<B>, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,829     fn poll_complete<T, B>(
830         &mut self,
831         send_buffer: &SendBuffer<B>,
832         cx: &mut Context,
833         dst: &mut Codec<T, Prioritized<B>>,
834     ) -> Poll<io::Result<()>>
835     where
836         T: AsyncWrite + Unpin,
837         B: Buf,
838     {
839         let mut send_buffer = send_buffer.inner.lock().unwrap();
840         let send_buffer = &mut *send_buffer;
841 
842         // Send WINDOW_UPDATE frames first
843         //
844         // TODO: It would probably be better to interleave updates w/ data
845         // frames.
846         ready!(self
847             .actions
848             .recv
849             .poll_complete(cx, &mut self.store, &mut self.counts, dst))?;
850 
851         // Send any other pending frames
852         ready!(self.actions.send.poll_complete(
853             cx,
854             send_buffer,
855             &mut self.store,
856             &mut self.counts,
857             dst
858         ))?;
859 
860         // Nothing else to do, track the task
861         self.actions.task = Some(cx.waker().clone());
862 
863         Poll::Ready(Ok(()))
864     }
865 
send_reset<B>(&mut self, send_buffer: &SendBuffer<B>, id: StreamId, reason: Reason)866     fn send_reset<B>(&mut self, send_buffer: &SendBuffer<B>, id: StreamId, reason: Reason) {
867         let key = match self.store.find_entry(id) {
868             Entry::Occupied(e) => e.key(),
869             Entry::Vacant(e) => {
870                 // Resetting a stream we don't know about? That could be OK...
871                 //
872                 // 1. As a server, we just received a request, but that request
873                 //    was bad, so we're resetting before even accepting it.
874                 //    This is totally fine.
875                 //
876                 // 2. The remote may have sent us a frame on new stream that
877                 //    it's *not* supposed to have done, and thus, we don't know
878                 //    the stream. In that case, sending a reset will "open" the
879                 //    stream in our store. Maybe that should be a connection
880                 //    error instead? At least for now, we need to update what
881                 //    our vision of the next stream is.
882                 if self.counts.peer().is_local_init(id) {
883                     // We normally would open this stream, so update our
884                     // next-send-id record.
885                     self.actions.send.maybe_reset_next_stream_id(id);
886                 }
887 
888                 let stream = Stream::new(id, 0, 0);
889 
890                 e.insert(stream)
891             }
892         };
893 
894         let stream = self.store.resolve(key);
895         let mut send_buffer = send_buffer.inner.lock().unwrap();
896         let send_buffer = &mut *send_buffer;
897         self.actions.send_reset(
898             stream,
899             reason,
900             Initiator::Library,
901             &mut self.counts,
902             send_buffer,
903         );
904     }
905 }
906 
907 impl<B> Streams<B, client::Peer>
908 where
909     B: Buf,
910 {
poll_pending_open( &mut self, cx: &Context, pending: Option<&OpaqueStreamRef>, ) -> Poll<Result<(), crate::Error>>911     pub fn poll_pending_open(
912         &mut self,
913         cx: &Context,
914         pending: Option<&OpaqueStreamRef>,
915     ) -> Poll<Result<(), crate::Error>> {
916         let mut me = self.inner.lock().unwrap();
917         let me = &mut *me;
918 
919         me.actions.ensure_no_conn_error()?;
920         me.actions.send.ensure_next_stream_id()?;
921 
922         if let Some(pending) = pending {
923             let mut stream = me.store.resolve(pending.key);
924             tracing::trace!("poll_pending_open; stream = {:?}", stream.is_pending_open);
925             if stream.is_pending_open {
926                 stream.wait_send(cx);
927                 return Poll::Pending;
928             }
929         }
930         Poll::Ready(Ok(()))
931     }
932 }
933 
934 impl<B, P> Streams<B, P>
935 where
936     P: Peer,
937 {
as_dyn(&self) -> DynStreams<B>938     pub fn as_dyn(&self) -> DynStreams<B> {
939         let Self {
940             inner,
941             send_buffer,
942             _p,
943         } = self;
944         DynStreams {
945             inner,
946             send_buffer,
947             peer: P::r#dyn(),
948         }
949     }
950 
951     /// This function is safe to call multiple times.
952     ///
953     /// A `Result` is returned to avoid panicking if the mutex is poisoned.
recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()>954     pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
955         self.as_dyn().recv_eof(clear_pending_accept)
956     }
957 
max_send_streams(&self) -> usize958     pub(crate) fn max_send_streams(&self) -> usize {
959         self.inner.lock().unwrap().counts.max_send_streams()
960     }
961 
max_recv_streams(&self) -> usize962     pub(crate) fn max_recv_streams(&self) -> usize {
963         self.inner.lock().unwrap().counts.max_recv_streams()
964     }
965 
966     #[cfg(feature = "unstable")]
num_active_streams(&self) -> usize967     pub fn num_active_streams(&self) -> usize {
968         let me = self.inner.lock().unwrap();
969         me.store.num_active_streams()
970     }
971 
has_streams(&self) -> bool972     pub fn has_streams(&self) -> bool {
973         let me = self.inner.lock().unwrap();
974         me.counts.has_streams()
975     }
976 
has_streams_or_other_references(&self) -> bool977     pub fn has_streams_or_other_references(&self) -> bool {
978         let me = self.inner.lock().unwrap();
979         me.counts.has_streams() || me.refs > 1
980     }
981 
982     #[cfg(feature = "unstable")]
num_wired_streams(&self) -> usize983     pub fn num_wired_streams(&self) -> usize {
984         let me = self.inner.lock().unwrap();
985         me.store.num_wired_streams()
986     }
987 }
988 
989 // no derive because we don't need B and P to be Clone.
990 impl<B, P> Clone for Streams<B, P>
991 where
992     P: Peer,
993 {
clone(&self) -> Self994     fn clone(&self) -> Self {
995         self.inner.lock().unwrap().refs += 1;
996         Streams {
997             inner: self.inner.clone(),
998             send_buffer: self.send_buffer.clone(),
999             _p: ::std::marker::PhantomData,
1000         }
1001     }
1002 }
1003 
1004 impl<B, P> Drop for Streams<B, P>
1005 where
1006     P: Peer,
1007 {
drop(&mut self)1008     fn drop(&mut self) {
1009         if let Ok(mut inner) = self.inner.lock() {
1010             inner.refs -= 1;
1011             if inner.refs == 1 {
1012                 if let Some(task) = inner.actions.task.take() {
1013                     task.wake();
1014                 }
1015             }
1016         }
1017     }
1018 }
1019 
1020 // ===== impl StreamRef =====
1021 
impl<B> StreamRef<B> {
    /// Sends a DATA frame on this stream, optionally setting END_STREAM.
    pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError>
    where
        B: Buf,
    {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let stream = me.store.resolve(self.opaque.key);
        let actions = &mut me.actions;
        // The state lock (`opaque.inner`) is taken before the frame-buffer
        // lock, consistent with the other methods in this impl.
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        me.counts.transition(stream, |counts, stream| {
            // Create the data frame
            let mut frame = frame::Data::new(stream.id, data);
            frame.set_end_stream(end_stream);

            // Send the data frame
            actions
                .send
                .send_data(frame, send_buffer, stream, counts, &mut actions.task)
        })
    }

    /// Sends the trailers frame for this stream.
    pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), UserError> {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let stream = me.store.resolve(self.opaque.key);
        let actions = &mut me.actions;
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        me.counts.transition(stream, |counts, stream| {
            // Create the trailers frame
            let frame = frame::Headers::trailers(stream.id, trailers);

            // Send the trailers frame
            actions
                .send
                .send_trailers(frame, send_buffer, stream, counts, &mut actions.task)
        })
    }

    /// Sends a user-initiated RST_STREAM with the given reason.
    pub fn send_reset(&mut self, reason: Reason) {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let stream = me.store.resolve(self.opaque.key);
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        me.actions
            .send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer);
    }

    /// Sends the response HEADERS for this stream (server side).
    pub fn send_response(
        &mut self,
        mut response: Response<()>,
        end_of_stream: bool,
    ) -> Result<(), UserError> {
        // Clear before taking lock, in case extensions contain a StreamRef:
        // dropping one takes the same lock (see `drop_stream_ref`).
        response.extensions_mut().clear();
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let stream = me.store.resolve(self.opaque.key);
        let actions = &mut me.actions;
        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        me.counts.transition(stream, |counts, stream| {
            let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream);

            actions
                .send
                .send_headers(frame, send_buffer, stream, counts, &mut actions.task)
        })
    }

    /// Reserves a local (promised) stream and sends a PUSH_PROMISE frame
    /// carrying `request`. On success, returns a `StreamRef` for the
    /// promised stream.
    pub fn send_push_promise(
        &mut self,
        mut request: Request<()>,
    ) -> Result<StreamRef<B>, UserError> {
        // Clear before taking lock, in case extensions contain a StreamRef:
        // dropping one takes the same lock (see `drop_stream_ref`).
        request.extensions_mut().clear();
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
        let send_buffer = &mut *send_buffer;

        let actions = &mut me.actions;
        let promised_id = actions.send.reserve_local()?;

        // Create state for the promised stream up front so a send failure
        // below can clean it up by key.
        // NOTE(review): the early `?` returns below (reserve_local on the
        // child state, convert_push_message) leave the inserted child entry
        // in the store — confirm it is reclaimed elsewhere.
        let child_key = {
            let mut child_stream = me.store.insert(
                promised_id,
                Stream::new(
                    promised_id,
                    actions.send.init_window_sz(),
                    actions.recv.init_window_sz(),
                ),
            );
            child_stream.state.reserve_local()?;
            child_stream.is_pending_push = true;
            child_stream.key()
        };

        let pushed = {
            let mut stream = me.store.resolve(self.opaque.key);

            let frame = crate::server::Peer::convert_push_message(stream.id, promised_id, request)?;

            actions
                .send
                .send_push_promise(frame, send_buffer, &mut stream, &mut actions.task)
        };

        // Sending failed: tear the reserved child stream back down.
        if let Err(err) = pushed {
            let mut child_stream = me.store.resolve(child_key);
            child_stream.unlink();
            child_stream.remove();
            return Err(err.into());
        }

        // The returned handle counts as one more reference on `Inner`.
        me.refs += 1;
        let opaque =
            OpaqueStreamRef::new(self.opaque.inner.clone(), &mut me.store.resolve(child_key));

        Ok(StreamRef {
            opaque,
            send_buffer: self.send_buffer.clone(),
        })
    }

    /// Called by the server after the stream is accepted. Given that clients
    /// initialize streams by sending HEADERS, the request will always be
    /// available.
    ///
    /// # Panics
    ///
    /// This function panics if the request isn't present.
    pub fn take_request(&self) -> Request<()> {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.opaque.key);
        me.actions.recv.take_request(&mut stream)
    }

    /// Called by a client to see if the current stream is pending open
    pub fn is_pending_open(&self) -> bool {
        let mut me = self.opaque.inner.lock().unwrap();
        me.store.resolve(self.opaque.key).is_pending_open
    }

    /// Request capacity to send data
    pub fn reserve_capacity(&mut self, capacity: WindowSize) {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.opaque.key);

        me.actions
            .send
            .reserve_capacity(capacity, &mut stream, &mut me.counts)
    }

    /// Returns the stream's current send capacity.
    pub fn capacity(&self) -> WindowSize {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.opaque.key);

        me.actions.send.capacity(&mut stream)
    }

    /// Request to be notified when the stream's capacity increases
    pub fn poll_capacity(&mut self, cx: &Context) -> Poll<Option<Result<WindowSize, UserError>>> {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.opaque.key);

        me.actions.send.poll_capacity(cx, &mut stream)
    }

    /// Request to be notified for if a `RST_STREAM` is received for this stream.
    pub(crate) fn poll_reset(
        &mut self,
        cx: &Context,
        mode: proto::PollReset,
    ) -> Poll<Result<Reason, crate::Error>> {
        let mut me = self.opaque.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.opaque.key);

        me.actions
            .send
            .poll_reset(cx, &mut stream, mode)
            .map_err(From::from)
    }

    /// Returns a type-erased clone of this handle, dropping the `B`
    /// buffer type parameter.
    pub fn clone_to_opaque(&self) -> OpaqueStreamRef
    where
        B: 'static,
    {
        self.opaque.clone()
    }

    /// Returns this stream's identifier.
    pub fn stream_id(&self) -> StreamId {
        self.opaque.stream_id()
    }
}
1240 
1241 impl<B> Clone for StreamRef<B> {
clone(&self) -> Self1242     fn clone(&self) -> Self {
1243         StreamRef {
1244             opaque: self.opaque.clone(),
1245             send_buffer: self.send_buffer.clone(),
1246         }
1247     }
1248 }
1249 
1250 // ===== impl OpaqueStreamRef =====
1251 
impl OpaqueStreamRef {
    /// Creates a new handle to `stream`, incrementing the stream's own
    /// ref count. Callers are responsible for also bumping `Inner::refs`
    /// for the new handle.
    fn new(inner: Arc<Mutex<Inner>>, stream: &mut store::Ptr) -> OpaqueStreamRef {
        stream.ref_inc();
        OpaqueStreamRef {
            inner,
            key: stream.key(),
        }
    }
    /// Called by a client to check for a received response.
    pub fn poll_response(&mut self, cx: &Context) -> Poll<Result<Response<()>, proto::Error>> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.key);

        me.actions.recv.poll_response(cx, &mut stream)
    }
    /// Called by a client to check for a pushed request.
    pub fn poll_pushed(
        &mut self,
        cx: &Context,
    ) -> Poll<Option<Result<(Request<()>, OpaqueStreamRef), proto::Error>>> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.key);
        me.actions
            .recv
            .poll_pushed(cx, &mut stream)
            .map_ok_(|(h, key)| {
                // The handle returned for the pushed stream counts as one
                // more reference on the shared state.
                me.refs += 1;
                let opaque_ref =
                    OpaqueStreamRef::new(self.inner.clone(), &mut me.store.resolve(key));
                (h, opaque_ref)
            })
    }

    /// Returns `true` when the receive half of this stream has reached
    /// end-of-stream.
    pub fn is_end_stream(&self) -> bool {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let stream = me.store.resolve(self.key);

        me.actions.recv.is_end_stream(&stream)
    }

    /// Polls for the next chunk of DATA received on this stream.
    pub fn poll_data(&mut self, cx: &Context) -> Poll<Option<Result<Bytes, proto::Error>>> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.key);

        me.actions.recv.poll_data(cx, &mut stream)
    }

    /// Polls for trailers received on this stream, if any.
    pub fn poll_trailers(&mut self, cx: &Context) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.key);

        me.actions.recv.poll_trailers(cx, &mut stream)
    }

    /// Currently-available receive window for this stream, as reported by
    /// its flow controller.
    pub(crate) fn available_recv_capacity(&self) -> isize {
        let me = self.inner.lock().unwrap();
        let me = &*me;

        let stream = &me.store[self.key];
        stream.recv_flow.available().into()
    }

    /// Amount of received data currently tracked as in flight
    /// (`in_flight_recv_data`) for this stream.
    pub(crate) fn used_recv_capacity(&self) -> WindowSize {
        let me = self.inner.lock().unwrap();
        let me = &*me;

        let stream = &me.store[self.key];
        stream.in_flight_recv_data
    }

    /// Releases recv capacity back to the peer. This may result in sending
    /// WINDOW_UPDATE frames on both the stream and connection.
    pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.key);

        me.actions
            .recv
            .release_capacity(capacity, &mut stream, &mut me.actions.task)
    }

    /// Clears this stream's receive buffer.
    pub(crate) fn clear_recv_buffer(&mut self) {
        let mut me = self.inner.lock().unwrap();
        let me = &mut *me;

        let mut stream = me.store.resolve(self.key);

        me.actions.recv.clear_recv_buffer(&mut stream);
    }

    /// Returns this stream's identifier.
    pub fn stream_id(&self) -> StreamId {
        self.inner.lock().unwrap().store[self.key].id
    }
}
1358 
1359 impl fmt::Debug for OpaqueStreamRef {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1360     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1361         use std::sync::TryLockError::*;
1362 
1363         match self.inner.try_lock() {
1364             Ok(me) => {
1365                 let stream = &me.store[self.key];
1366                 fmt.debug_struct("OpaqueStreamRef")
1367                     .field("stream_id", &stream.id)
1368                     .field("ref_count", &stream.ref_count)
1369                     .finish()
1370             }
1371             Err(Poisoned(_)) => fmt
1372                 .debug_struct("OpaqueStreamRef")
1373                 .field("inner", &"<Poisoned>")
1374                 .finish(),
1375             Err(WouldBlock) => fmt
1376                 .debug_struct("OpaqueStreamRef")
1377                 .field("inner", &"<Locked>")
1378                 .finish(),
1379         }
1380     }
1381 }
1382 
1383 impl Clone for OpaqueStreamRef {
clone(&self) -> Self1384     fn clone(&self) -> Self {
1385         // Increment the ref count
1386         let mut inner = self.inner.lock().unwrap();
1387         inner.store.resolve(self.key).ref_inc();
1388         inner.refs += 1;
1389 
1390         OpaqueStreamRef {
1391             inner: self.inner.clone(),
1392             key: self.key.clone(),
1393         }
1394     }
1395 }
1396 
impl Drop for OpaqueStreamRef {
    fn drop(&mut self) {
        // All drop bookkeeping lives in `drop_stream_ref` so it can handle
        // a poisoned lock without panicking during unwind.
        drop_stream_ref(&self.inner, self.key);
    }
}
1402 
1403 // TODO: Move back in fn above
drop_stream_ref(inner: &Mutex<Inner>, key: store::Key)1404 fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
1405     let mut me = match inner.lock() {
1406         Ok(inner) => inner,
1407         Err(_) => {
1408             if ::std::thread::panicking() {
1409                 tracing::trace!("StreamRef::drop; mutex poisoned");
1410                 return;
1411             } else {
1412                 panic!("StreamRef::drop; mutex poisoned");
1413             }
1414         }
1415     };
1416 
1417     let me = &mut *me;
1418     me.refs -= 1;
1419     let mut stream = me.store.resolve(key);
1420 
1421     tracing::trace!("drop_stream_ref; stream={:?}", stream);
1422 
1423     // decrement the stream's ref count by 1.
1424     stream.ref_dec();
1425 
1426     let actions = &mut me.actions;
1427 
1428     // If the stream is not referenced and it is already
1429     // closed (does not have to go through logic below
1430     // of canceling the stream), we should notify the task
1431     // (connection) so that it can close properly
1432     if stream.ref_count == 0 && stream.is_closed() {
1433         if let Some(task) = actions.task.take() {
1434             task.wake();
1435         }
1436     }
1437 
1438     me.counts.transition(stream, |counts, stream| {
1439         maybe_cancel(stream, actions, counts);
1440 
1441         if stream.ref_count == 0 {
1442             // Release any recv window back to connection, no one can access
1443             // it anymore.
1444             actions
1445                 .recv
1446                 .release_closed_capacity(stream, &mut actions.task);
1447 
1448             // We won't be able to reach our push promises anymore
1449             let mut ppp = stream.pending_push_promises.take();
1450             while let Some(promise) = ppp.pop(stream.store_mut()) {
1451                 counts.transition(promise, |counts, stream| {
1452                     maybe_cancel(stream, actions, counts);
1453                 });
1454             }
1455         }
1456     });
1457 }
1458 
maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Counts)1459 fn maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Counts) {
1460     if stream.is_canceled_interest() {
1461         actions
1462             .send
1463             .schedule_implicit_reset(stream, Reason::CANCEL, counts, &mut actions.task);
1464         actions.recv.enqueue_reset_expiration(stream, counts);
1465     }
1466 }
1467 
1468 // ===== impl SendBuffer =====
1469 
1470 impl<B> SendBuffer<B> {
new() -> Self1471     fn new() -> Self {
1472         let inner = Mutex::new(Buffer::new());
1473         SendBuffer { inner }
1474     }
1475 }
1476 
1477 // ===== impl Actions =====
1478 
impl Actions {
    /// Sends a RST_STREAM on `stream` (running it through a state
    /// transition) and schedules the local reset-state expiration.
    fn send_reset<B>(
        &mut self,
        stream: store::Ptr,
        reason: Reason,
        initiator: Initiator,
        counts: &mut Counts,
        send_buffer: &mut Buffer<Frame<B>>,
    ) {
        counts.transition(stream, |counts, stream| {
            self.send.send_reset(
                reason,
                initiator,
                send_buffer,
                stream,
                counts,
                &mut self.task,
            );
            self.recv.enqueue_reset_expiration(stream, counts);
            // if a RecvStream is parked, ensure it's notified
            stream.notify_recv();
        });
    }

    /// Converts a stream-level `Error::Reset` produced while processing a
    /// received frame into an outgoing RST_STREAM (returning `Ok`); any
    /// other error, or success, is passed through unchanged.
    fn reset_on_recv_stream_err<B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        counts: &mut Counts,
        res: Result<(), Error>,
    ) -> Result<(), Error> {
        if let Err(Error::Reset(stream_id, reason, initiator)) = res {
            // A stream-level reset must reference the stream it was
            // produced for.
            debug_assert_eq!(stream_id, stream.id);
            // Reset the stream.
            self.send
                .send_reset(reason, initiator, buffer, stream, counts, &mut self.task);
            Ok(())
        } else {
            res
        }
    }

    /// Checks that `id` does not refer to a stream still in the idle
    /// state, consulting the send or recv side depending on which peer
    /// initiated the id.
    fn ensure_not_idle(&mut self, peer: peer::Dyn, id: StreamId) -> Result<(), Reason> {
        if peer.is_local_init(id) {
            self.send.ensure_not_idle(id)
        } else {
            self.recv.ensure_not_idle(id)
        }
    }

    /// Returns a clone of the recorded connection error, if any.
    fn ensure_no_conn_error(&self) -> Result<(), proto::Error> {
        if let Some(ref err) = self.conn_error {
            Err(err.clone())
        } else {
            Ok(())
        }
    }

    /// Check if we possibly could have processed and since forgotten this stream.
    ///
    /// If we send a RST_STREAM for a stream, we will eventually "forget" about
    /// the stream to free up memory. It's possible that the remote peer had
    /// frames in-flight, and by the time we receive them, our own state is
    /// gone. We *could* tear everything down by sending a GOAWAY, but it
    /// is more likely to be latency/memory constraints that caused this,
    /// and not a bad actor. So be less catastrophic, the spec allows
    /// us to send another RST_STREAM of STREAM_CLOSED.
    fn may_have_forgotten_stream(&self, peer: peer::Dyn, id: StreamId) -> bool {
        // Stream id zero identifies the connection itself; it can never
        // be a forgotten stream.
        if id.is_zero() {
            return false;
        }
        if peer.is_local_init(id) {
            self.send.may_have_created_stream(id)
        } else {
            self.recv.may_have_created_stream(id)
        }
    }

    /// Clears pending stream queues on both the recv and send halves.
    fn clear_queues(&mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts) {
        self.recv.clear_queues(clear_pending_accept, store, counts);
        self.send.clear_queues(store, counts);
    }
}
1562