1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 use crate::hframe::HFrame;
8 use crate::qlog;
9 use crate::Header;
10 use crate::{Error, Res};
11 
12 use neqo_common::{qdebug, qinfo, qtrace, Encoder};
13 use neqo_qpack::encoder::QPackEncoder;
14 use neqo_transport::{AppError, Connection};
15 use std::cmp::min;
16 use std::fmt::Debug;
17 
18 const MAX_DATA_HEADER_SIZE_2: usize = (1 << 6) - 1; // Maximal amount of data with DATA frame header size 2
19 const MAX_DATA_HEADER_SIZE_2_LIMIT: usize = MAX_DATA_HEADER_SIZE_2 + 3; // 63 + 3 (size of the next buffer data frame header)
20 const MAX_DATA_HEADER_SIZE_3: usize = (1 << 14) - 1; // Maximal amount of data with DATA frame header size 3
21 const MAX_DATA_HEADER_SIZE_3_LIMIT: usize = MAX_DATA_HEADER_SIZE_3 + 5; // 16383 + 5 (size of the next buffer data frame header)
22 const MAX_DATA_HEADER_SIZE_5: usize = (1 << 30) - 1; // Maximal amount of data with DATA frame header size 3
23 const MAX_DATA_HEADER_SIZE_5_LIMIT: usize = MAX_DATA_HEADER_SIZE_5 + 9; // 1073741823 + 9 (size of the next buffer data frame header)
24 
25 pub(crate) trait SendMessageEvents: Debug {
data_writable(&self, stream_id: u64)26     fn data_writable(&self, stream_id: u64);
remove_send_side_event(&self, stream_id: u64)27     fn remove_send_side_event(&self, stream_id: u64);
stop_sending(&self, stream_id: u64, app_err: AppError)28     fn stop_sending(&self, stream_id: u64, app_err: AppError);
29 }
30 
31 /*
32  *  SendMessage states:
33  *    Uninitialized
34  *    Initialized : Headers are present but still not encoded. A message body may be present as well.
35  *                  The client side sends a message body using the send_body() function that directly
36  *                  writes into a transport stream. The server side sets headers and body when
37  *                  initializing a send message (TODO: make server use send_body as well)
38  *    SendingInitialMessage : sending headers and maybe message body. From here we may switch to
39  *                     SendingData or Closed (if the app does not want to send data and
40  *                     has already closed the send stream).
41  *    SendingData : We are sending request data until the app closes the stream.
42  *    Closed
43  */
44 
45 #[derive(PartialEq, Debug)]
46 enum SendMessageState {
47     Uninitialized,
48     Initialized {
49         headers: Vec<Header>,
50         data: Option<Vec<u8>>,
51         fin: bool,
52     },
53     SendingInitialMessage {
54         buf: Vec<u8>,
55         fin: bool,
56     },
57     SendingData,
58     Closed,
59 }
60 
61 impl SendMessageState {
is_sending_closed(&self) -> bool62     pub fn is_sending_closed(&self) -> bool {
63         match self {
64             Self::Initialized { fin, .. } | Self::SendingInitialMessage { fin, .. } => *fin,
65             Self::SendingData => false,
66             _ => true,
67         }
68     }
69 
done(&self) -> bool70     pub fn done(&self) -> bool {
71         matches!(self, Self::Closed)
72     }
73 
is_state_sending_data(&self) -> bool74     pub fn is_state_sending_data(&self) -> bool {
75         matches!(self, Self::SendingData)
76     }
77 }
78 
79 #[derive(Debug)]
80 pub(crate) struct SendMessage {
81     state: SendMessageState,
82     stream_id: u64,
83     conn_events: Box<dyn SendMessageEvents>,
84 }
85 
86 impl SendMessage {
new(stream_id: u64, conn_events: Box<dyn SendMessageEvents>) -> Self87     pub fn new(stream_id: u64, conn_events: Box<dyn SendMessageEvents>) -> Self {
88         qinfo!("Create a request stream_id={}", stream_id);
89         Self {
90             state: SendMessageState::Uninitialized,
91             stream_id,
92             conn_events,
93         }
94     }
95 
new_with_headers( stream_id: u64, headers: Vec<Header>, conn_events: Box<dyn SendMessageEvents>, ) -> Self96     pub fn new_with_headers(
97         stream_id: u64,
98         headers: Vec<Header>,
99         conn_events: Box<dyn SendMessageEvents>,
100     ) -> Self {
101         qinfo!("Create a request stream_id={}", stream_id);
102         Self {
103             state: SendMessageState::Initialized {
104                 headers,
105                 data: None,
106                 fin: false,
107             },
108             stream_id,
109             conn_events,
110         }
111     }
112 
set_message(&mut self, headers: &[Header], data: Option<&[u8]>) -> Res<()>113     pub fn set_message(&mut self, headers: &[Header], data: Option<&[u8]>) -> Res<()> {
114         if !matches!(self.state, SendMessageState::Uninitialized) {
115             return Err(Error::AlreadyInitialized);
116         }
117 
118         self.state = SendMessageState::Initialized {
119             headers: headers.to_vec(),
120             data: data.map(|d| d.to_vec()),
121             fin: true,
122         };
123         Ok(())
124     }
125 
send_body(&mut self, conn: &mut Connection, buf: &[u8]) -> Res<usize>126     pub fn send_body(&mut self, conn: &mut Connection, buf: &[u8]) -> Res<usize> {
127         qtrace!(
128             [self],
129             "send_body: state={:?} len={}",
130             self.state,
131             buf.len()
132         );
133         match self.state {
134             SendMessageState::Uninitialized
135             | SendMessageState::Initialized { .. }
136             | SendMessageState::SendingInitialMessage { .. } => Ok(0),
137             SendMessageState::SendingData => {
138                 let available = conn
139                     .stream_avail_send_space(self.stream_id)
140                     .map_err(|e| Error::map_stream_send_errors(&e))?;
141                 if available <= 2 {
142                     return Ok(0);
143                 }
144                 let to_send;
145                 if available <= MAX_DATA_HEADER_SIZE_2_LIMIT {
146                     // 63 + 3
147                     to_send = min(min(buf.len(), available - 2), MAX_DATA_HEADER_SIZE_2);
148                 } else if available <= MAX_DATA_HEADER_SIZE_3_LIMIT {
149                     // 16383 + 5
150                     to_send = min(min(buf.len(), available - 3), MAX_DATA_HEADER_SIZE_3);
151                 } else if available <= MAX_DATA_HEADER_SIZE_5 {
152                     // 1073741823 + 9
153                     to_send = min(min(buf.len(), available - 5), MAX_DATA_HEADER_SIZE_5_LIMIT);
154                 } else {
155                     to_send = min(buf.len(), available - 9);
156                 }
157 
158                 qinfo!(
159                     [self],
160                     "send_request_body: available={} to_send={}.",
161                     available,
162                     to_send
163                 );
164 
165                 let data_frame = HFrame::Data {
166                     len: to_send as u64,
167                 };
168                 let mut enc = Encoder::default();
169                 data_frame.encode(&mut enc);
170                 let sent_fh = conn
171                     .stream_send(self.stream_id, &enc)
172                     .map_err(|e| Error::map_stream_send_errors(&e))?;
173                 debug_assert_eq!(sent_fh, enc.len());
174 
175                 let sent = conn
176                     .stream_send(self.stream_id, &buf[..to_send])
177                     .map_err(|e| Error::map_stream_send_errors(&e))?;
178                 qlog::h3_data_moved_down(&mut conn.qlog_mut(), self.stream_id, to_send);
179                 Ok(sent)
180             }
181             SendMessageState::Closed => Err(Error::AlreadyClosed),
182         }
183     }
184 
done(&self) -> bool185     pub fn done(&self) -> bool {
186         self.state.done()
187     }
188 
stream_writable(&self)189     pub fn stream_writable(&self) {
190         if self.state.is_state_sending_data() {
191             self.conn_events.data_writable(self.stream_id);
192         }
193     }
194 
195     /// # Errors
196     /// `ClosedCriticalStream` if the encoder stream is closed.
197     /// `InternalError` if an unexpected error occurred.
ensure_encoded(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()>198     fn ensure_encoded(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()> {
199         if let SendMessageState::Initialized { headers, data, fin } = &self.state {
200             qdebug!([self], "Encoding headers");
201             let header_block = encoder.encode_header_block(conn, &headers, self.stream_id)?;
202             let hframe = HFrame::Headers {
203                 header_block: header_block.to_vec(),
204             };
205             let mut d = Encoder::default();
206             hframe.encode(&mut d);
207             if let Some(buf) = data {
208                 qdebug!([self], "Encoding data");
209                 let d_frame = HFrame::Data {
210                     len: buf.len() as u64,
211                 };
212                 d_frame.encode(&mut d);
213                 d.encode(&buf);
214             }
215 
216             self.state = SendMessageState::SendingInitialMessage {
217                 buf: d.into(),
218                 fin: *fin,
219             };
220         }
221         Ok(())
222     }
223 
224     /// # Errors
225     /// `ClosedCriticalStream` if the encoder stream is closed.
226     /// `InternalError` if an unexpected error occurred.
227     /// `InvalidStreamId` if the stream does not exist,
228     /// `AlreadyClosed` if the stream has already been closed.
229     /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if `process_output`
230     /// has not been called when needed, and HTTP3 layer has not picked up the info that the stream has been closed.)
send(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()>231     pub fn send(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()> {
232         self.ensure_encoded(conn, encoder)?;
233 
234         let label = if ::log::log_enabled!(::log::Level::Debug) {
235             format!("{}", self)
236         } else {
237             String::new()
238         };
239 
240         if let SendMessageState::SendingInitialMessage { ref mut buf, fin } = self.state {
241             let sent = Error::map_error(
242                 conn.stream_send(self.stream_id, &buf),
243                 Error::HttpInternal(5),
244             )?;
245             qlog::h3_data_moved_down(&mut conn.qlog_mut(), self.stream_id, sent);
246 
247             qtrace!([label], "{} bytes sent", sent);
248 
249             if sent == buf.len() {
250                 if fin {
251                     Error::map_error(
252                         conn.stream_close_send(self.stream_id),
253                         Error::HttpInternal(6),
254                     )?;
255                     self.state = SendMessageState::Closed;
256                     qtrace!([label], "done sending request");
257                 } else {
258                     self.state = SendMessageState::SendingData;
259                     self.conn_events.data_writable(self.stream_id);
260                     qtrace!([label], "change to state SendingData");
261                 }
262             } else {
263                 let b = buf.split_off(sent);
264                 *buf = b;
265             }
266         }
267         Ok(())
268     }
269 
270     // SendMessage owns headers and sends them. It may also own data for the server side.
271     // This method returns if they're still being sent. Request body (if any) is sent by
272     // http client afterwards using `send_request_body` after receiving DataWritable event.
has_data_to_send(&self) -> bool273     pub fn has_data_to_send(&self) -> bool {
274         matches!(
275             self.state,
276             SendMessageState::Initialized { .. } | SendMessageState::SendingInitialMessage { .. }
277         )
278     }
279 
close(&mut self, conn: &mut Connection) -> Res<()>280     pub fn close(&mut self, conn: &mut Connection) -> Res<()> {
281         match self.state {
282             SendMessageState::SendingInitialMessage { ref mut fin, .. }
283             | SendMessageState::Initialized { ref mut fin, .. } => {
284                 *fin = true;
285             }
286             _ => {
287                 self.state = SendMessageState::Closed;
288                 conn.stream_close_send(self.stream_id)?;
289             }
290         }
291 
292         self.conn_events.remove_send_side_event(self.stream_id);
293         Ok(())
294     }
295 
stop_sending(&mut self, app_err: AppError)296     pub fn stop_sending(&mut self, app_err: AppError) {
297         if !self.state.is_sending_closed() {
298             self.conn_events.remove_send_side_event(self.stream_id);
299             self.conn_events.stop_sending(self.stream_id, app_err);
300         }
301     }
302 }
303 
304 impl ::std::fmt::Display for SendMessage {
fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result305     fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
306         write!(f, "SendMesage {}", self.stream_id)
307     }
308 }
309