1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 use crate::connection::Http3Transaction;
8 use crate::hframe::{HFrame, HFrameReader};
9 use crate::server_connection_events::Http3ServerConnEvents;
10 use crate::Header;
11 use crate::{Error, Res};
12 use neqo_common::{matches, qdebug, qinfo, qtrace, Encoder};
13 use neqo_qpack::decoder::QPackDecoder;
14 use neqo_qpack::encoder::QPackEncoder;
15 use neqo_transport::Connection;
16 use std::mem;
17 
18 #[derive(PartialEq, Debug)]
19 enum TransactionRecvState {
20     WaitingForHeaders,
21     DecodingHeaders { header_block: Vec<u8>, fin: bool },
22     WaitingForData,
23     ReadingData { remaining_data_len: usize },
24     Closed,
25 }
26 
27 #[derive(PartialEq, Debug)]
28 enum TransactionSendState {
29     Initial,
30     SendingResponse { buf: Vec<u8> },
31     Closed,
32 }
33 
34 #[derive(Debug)]
35 pub struct TransactionServer {
36     recv_state: TransactionRecvState,
37     send_state: TransactionSendState,
38     stream_id: u64,
39     frame_reader: HFrameReader,
40     conn_events: Http3ServerConnEvents,
41 }
42 
43 impl TransactionServer {
new(stream_id: u64, conn_events: Http3ServerConnEvents) -> Self44     pub fn new(stream_id: u64, conn_events: Http3ServerConnEvents) -> Self {
45         qinfo!("Create a request stream_id={}", stream_id);
46         Self {
47             recv_state: TransactionRecvState::WaitingForHeaders,
48             send_state: TransactionSendState::Initial,
49             stream_id,
50             frame_reader: HFrameReader::new(),
51             conn_events,
52         }
53     }
54 
set_response(&mut self, headers: &[Header], data: Vec<u8>, encoder: &mut QPackEncoder)55     pub fn set_response(&mut self, headers: &[Header], data: Vec<u8>, encoder: &mut QPackEncoder) {
56         qdebug!([self], "Encoding headers");
57         let header_block = encoder.encode_header_block(&headers, self.stream_id);
58         let hframe = HFrame::Headers {
59             header_block: header_block.to_vec(),
60         };
61         let mut d = Encoder::default();
62         hframe.encode(&mut d);
63         if !data.is_empty() {
64             qdebug!([self], "Encoding data");
65             let d_frame = HFrame::Data {
66                 len: data.len() as u64,
67             };
68             d_frame.encode(&mut d);
69             d.encode(&data);
70         }
71 
72         self.send_state = TransactionSendState::SendingResponse { buf: d.into() };
73     }
74 
recv_frame(&mut self, conn: &mut Connection) -> Res<(Option<HFrame>, bool)>75     fn recv_frame(&mut self, conn: &mut Connection) -> Res<(Option<HFrame>, bool)> {
76         qtrace!([self], "receiving a frame");
77         let fin = self.frame_reader.receive(conn, self.stream_id)?;
78         if !self.frame_reader.done() {
79             Ok((None, fin))
80         } else {
81             qinfo!([self], "A new frame has been received.");
82             Ok((Some(self.frame_reader.get_frame()?), fin))
83         }
84     }
85 
handle_data_frame(&mut self, len: u64, fin: bool) -> Res<()>86     fn handle_data_frame(&mut self, len: u64, fin: bool) -> Res<()> {
87         qinfo!([self], "A new data frame len={} fin={}", len, fin);
88         if len > 0 {
89             if fin {
90                 return Err(Error::HttpFrameError);
91             }
92             self.recv_state = TransactionRecvState::ReadingData {
93                 remaining_data_len: len as usize,
94             };
95         } else if fin {
96             self.conn_events.data(self.stream_id, Vec::new(), true);
97             self.recv_state = TransactionRecvState::Closed;
98         }
99         Ok(())
100     }
101 }
102 
103 impl ::std::fmt::Display for TransactionServer {
fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result104     fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
105         write!(f, "TransactionServer {}", self.stream_id)
106     }
107 }
108 
109 impl Http3Transaction for TransactionServer {
send(&mut self, conn: &mut Connection, _encoder: &mut QPackEncoder) -> Res<()>110     fn send(&mut self, conn: &mut Connection, _encoder: &mut QPackEncoder) -> Res<()> {
111         qtrace!([self], "Sending response.");
112         let label = if ::log::log_enabled!(::log::Level::Debug) {
113             format!("{}", self)
114         } else {
115             String::new()
116         };
117         if let TransactionSendState::SendingResponse { ref mut buf } = self.send_state {
118             let sent = conn.stream_send(self.stream_id, &buf[..])?;
119             qinfo!([label], "{} bytes sent", sent);
120             if sent == buf.len() {
121                 conn.stream_close_send(self.stream_id)?;
122                 self.send_state = TransactionSendState::Closed;
123                 qinfo!([label], "done sending request");
124             } else {
125                 let mut b = buf.split_off(sent);
126                 mem::swap(buf, &mut b);
127             }
128         }
129 
130         Ok(())
131     }
132 
receive(&mut self, conn: &mut Connection, decoder: &mut QPackDecoder) -> Res<()>133     fn receive(&mut self, conn: &mut Connection, decoder: &mut QPackDecoder) -> Res<()> {
134         let label = if ::log::log_enabled!(::log::Level::Debug) {
135             format!("{}", self)
136         } else {
137             String::new()
138         };
139 
140         loop {
141             qtrace!(
142                 [label],
143                 "[recv_state={:?}] receiving data.",
144                 self.recv_state
145             );
146             match self.recv_state {
147                 TransactionRecvState::WaitingForHeaders => {
148                     match self.recv_frame(conn)? {
149                         (None, true) => {
150                             // Stream has been closed without any data, just ignore it.
151                             self.recv_state = TransactionRecvState::Closed;
152                             return Ok(());
153                         }
154                         (None, false) => {
155                             // We do not have a complete frame.
156                             return Ok(());
157                         }
158                         (Some(HFrame::Headers { header_block }), fin) => {
159                             if !header_block.is_empty() {
160                                 // Next step decoding headers.
161                                 self.recv_state =
162                                     TransactionRecvState::DecodingHeaders { header_block, fin };
163                             } else {
164                                 self.conn_events.headers(self.stream_id, Vec::new(), fin);
165                                 if fin {
166                                     self.recv_state = TransactionRecvState::Closed;
167                                     return Ok(());
168                                 } else {
169                                     self.recv_state = TransactionRecvState::WaitingForData;
170                                 }
171                             }
172                         }
173                         // server can only receive a Header frame at this point.
174                         _ => {
175                             return Err(Error::HttpFrameUnexpected);
176                         }
177                     }
178                 }
179                 TransactionRecvState::DecodingHeaders {
180                     ref mut header_block,
181                     fin,
182                 } => match decoder.decode_header_block(header_block, self.stream_id)? {
183                     Some(headers) => {
184                         self.conn_events.headers(self.stream_id, headers, fin);
185                         if fin {
186                             self.recv_state = TransactionRecvState::Closed;
187                             return Ok(());
188                         } else {
189                             self.recv_state = TransactionRecvState::WaitingForData;
190                         }
191                     }
192                     None => {
193                         qinfo!([self], "decoding header is blocked.");
194                         return Ok(());
195                     }
196                 },
197                 TransactionRecvState::WaitingForData => {
198                     match self.recv_frame(conn)? {
199                         (None, true) => {
200                             // Inform the app tthat tthe stream is done.
201                             self.conn_events.data(self.stream_id, Vec::new(), true);
202                             self.recv_state = TransactionRecvState::Closed;
203                             return Ok(());
204                         }
205                         (None, false) => {
206                             // Still reading a frame.
207                             return Ok(());
208                         }
209                         (Some(HFrame::Data { len }), fin) => {
210                             self.handle_data_frame(len, fin)?;
211                             if fin {
212                                 return Ok(());
213                             }
214                         }
215                         _ => {
216                             return Err(Error::HttpFrameUnexpected);
217                         }
218                     };
219                 }
220                 TransactionRecvState::ReadingData {
221                     ref mut remaining_data_len,
222                 } => {
223                     // TODO add available(stream_id) to neqo_transport.
224                     assert!(*remaining_data_len > 0);
225                     while *remaining_data_len != 0 {
226                         let to_read = if *remaining_data_len > 1024 {
227                             1024
228                         } else {
229                             *remaining_data_len
230                         };
231 
232                         let mut data = vec![0x0; to_read];
233                         let (amount, fin) = conn.stream_recv(self.stream_id, &mut data[..])?;
234                         assert!(amount <= to_read);
235                         if amount > 0 {
236                             data.truncate(amount);
237                             self.conn_events.data(self.stream_id, data, fin);
238                             *remaining_data_len -= amount;
239                         }
240                         if fin {
241                             if *remaining_data_len > 0 {
242                                 return Err(Error::HttpFrameError);
243                             }
244                             self.recv_state = TransactionRecvState::Closed;
245                             return Ok(());
246                         } else if *remaining_data_len == 0 {
247                             self.recv_state = TransactionRecvState::WaitingForData;
248                             break;
249                         }
250                         if amount == 0 {
251                             return Ok(());
252                         }
253                     }
254                 }
255                 TransactionRecvState::Closed => {
256                     panic!("Stream readable after being closed!");
257                 }
258             };
259         }
260     }
261 
has_data_to_send(&self) -> bool262     fn has_data_to_send(&self) -> bool {
263         matches!(self.send_state, TransactionSendState::SendingResponse { .. })
264     }
265 
reset_receiving_side(&mut self)266     fn reset_receiving_side(&mut self) {
267         self.recv_state = TransactionRecvState::Closed;
268     }
269 
stop_sending(&mut self)270     fn stop_sending(&mut self) {}
271 
done(&self) -> bool272     fn done(&self) -> bool {
273         self.send_state == TransactionSendState::Closed
274             && self.recv_state == TransactionRecvState::Closed
275     }
276 
close_send(&mut self, _conn: &mut Connection) -> Res<()>277     fn close_send(&mut self, _conn: &mut Connection) -> Res<()> {
278         Ok(())
279     }
280 }
281