1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 4 // option. This file may not be copied, modified, or distributed 5 // except according to those terms. 6 7 use crate::hframe::HFrame; 8 use crate::qlog; 9 use crate::Header; 10 use crate::{Error, Res}; 11 12 use neqo_common::{qdebug, qinfo, qtrace, Encoder}; 13 use neqo_qpack::encoder::QPackEncoder; 14 use neqo_transport::{AppError, Connection}; 15 use std::cmp::min; 16 use std::fmt::Debug; 17 18 const MAX_DATA_HEADER_SIZE_2: usize = (1 << 6) - 1; // Maximal amount of data with DATA frame header size 2 19 const MAX_DATA_HEADER_SIZE_2_LIMIT: usize = MAX_DATA_HEADER_SIZE_2 + 3; // 63 + 3 (size of the next buffer data frame header) 20 const MAX_DATA_HEADER_SIZE_3: usize = (1 << 14) - 1; // Maximal amount of data with DATA frame header size 3 21 const MAX_DATA_HEADER_SIZE_3_LIMIT: usize = MAX_DATA_HEADER_SIZE_3 + 5; // 16383 + 5 (size of the next buffer data frame header) 22 const MAX_DATA_HEADER_SIZE_5: usize = (1 << 30) - 1; // Maximal amount of data with DATA frame header size 3 23 const MAX_DATA_HEADER_SIZE_5_LIMIT: usize = MAX_DATA_HEADER_SIZE_5 + 9; // 1073741823 + 9 (size of the next buffer data frame header) 24 25 pub(crate) trait SendMessageEvents: Debug { data_writable(&self, stream_id: u64)26 fn data_writable(&self, stream_id: u64); remove_send_side_event(&self, stream_id: u64)27 fn remove_send_side_event(&self, stream_id: u64); stop_sending(&self, stream_id: u64, app_err: AppError)28 fn stop_sending(&self, stream_id: u64, app_err: AppError); 29 } 30 31 /* 32 * SendMessage states: 33 * Uninitialized 34 * Initialized : Headers are present but still not encoded. A message body may be present as well. 35 * The client side sends a message body using the send_body() function that directly 36 * writes into a transport stream. The server side sets headers and body when 37 * initializing a send message (TODO: make server use send_body as well) 38 * SendingInitialMessage : sending headers and maybe message body. From here we may switch to 39 * SendingData or Closed (if the app does not want to send data and 40 * has already closed the send stream). 41 * SendingData : We are sending request data until the app closes the stream. 42 * Closed 43 */ 44 45 #[derive(PartialEq, Debug)] 46 enum SendMessageState { 47 Uninitialized, 48 Initialized { 49 headers: Vec<Header>, 50 data: Option<Vec<u8>>, 51 fin: bool, 52 }, 53 SendingInitialMessage { 54 buf: Vec<u8>, 55 fin: bool, 56 }, 57 SendingData, 58 Closed, 59 } 60 61 impl SendMessageState { is_sending_closed(&self) -> bool62 pub fn is_sending_closed(&self) -> bool { 63 match self { 64 Self::Initialized { fin, .. } | Self::SendingInitialMessage { fin, .. } => *fin, 65 Self::SendingData => false, 66 _ => true, 67 } 68 } 69 done(&self) -> bool70 pub fn done(&self) -> bool { 71 matches!(self, Self::Closed) 72 } 73 is_state_sending_data(&self) -> bool74 pub fn is_state_sending_data(&self) -> bool { 75 matches!(self, Self::SendingData) 76 } 77 } 78 79 #[derive(Debug)] 80 pub(crate) struct SendMessage { 81 state: SendMessageState, 82 stream_id: u64, 83 conn_events: Box<dyn SendMessageEvents>, 84 } 85 86 impl SendMessage { new(stream_id: u64, conn_events: Box<dyn SendMessageEvents>) -> Self87 pub fn new(stream_id: u64, conn_events: Box<dyn SendMessageEvents>) -> Self { 88 qinfo!("Create a request stream_id={}", stream_id); 89 Self { 90 state: SendMessageState::Uninitialized, 91 stream_id, 92 conn_events, 93 } 94 } 95 new_with_headers( stream_id: u64, headers: Vec<Header>, conn_events: Box<dyn SendMessageEvents>, ) -> Self96 pub fn new_with_headers( 97 stream_id: u64, 98 headers: Vec<Header>, 99 conn_events: Box<dyn SendMessageEvents>, 100 ) -> Self { 101 qinfo!("Create a request stream_id={}", stream_id); 102 Self { 103 state: SendMessageState::Initialized { 104 headers, 105 data: None, 106 fin: false, 107 }, 108 stream_id, 109 conn_events, 110 } 111 } 112 set_message(&mut self, headers: &[Header], data: Option<&[u8]>) -> Res<()>113 pub fn set_message(&mut self, headers: &[Header], data: Option<&[u8]>) -> Res<()> { 114 if !matches!(self.state, SendMessageState::Uninitialized) { 115 return Err(Error::AlreadyInitialized); 116 } 117 118 self.state = SendMessageState::Initialized { 119 headers: headers.to_vec(), 120 data: data.map(|d| d.to_vec()), 121 fin: true, 122 }; 123 Ok(()) 124 } 125 send_body(&mut self, conn: &mut Connection, buf: &[u8]) -> Res<usize>126 pub fn send_body(&mut self, conn: &mut Connection, buf: &[u8]) -> Res<usize> { 127 qtrace!( 128 [self], 129 "send_body: state={:?} len={}", 130 self.state, 131 buf.len() 132 ); 133 match self.state { 134 SendMessageState::Uninitialized 135 | SendMessageState::Initialized { .. } 136 | SendMessageState::SendingInitialMessage { .. } => Ok(0), 137 SendMessageState::SendingData => { 138 let available = conn 139 .stream_avail_send_space(self.stream_id) 140 .map_err(|e| Error::map_stream_send_errors(&e))?; 141 if available <= 2 { 142 return Ok(0); 143 } 144 let to_send; 145 if available <= MAX_DATA_HEADER_SIZE_2_LIMIT { 146 // 63 + 3 147 to_send = min(min(buf.len(), available - 2), MAX_DATA_HEADER_SIZE_2); 148 } else if available <= MAX_DATA_HEADER_SIZE_3_LIMIT { 149 // 16383 + 5 150 to_send = min(min(buf.len(), available - 3), MAX_DATA_HEADER_SIZE_3); 151 } else if available <= MAX_DATA_HEADER_SIZE_5 { 152 // 1073741823 + 9 153 to_send = min(min(buf.len(), available - 5), MAX_DATA_HEADER_SIZE_5_LIMIT); 154 } else { 155 to_send = min(buf.len(), available - 9); 156 } 157 158 qinfo!( 159 [self], 160 "send_request_body: available={} to_send={}.", 161 available, 162 to_send 163 ); 164 165 let data_frame = HFrame::Data { 166 len: to_send as u64, 167 }; 168 let mut enc = Encoder::default(); 169 data_frame.encode(&mut enc); 170 let sent_fh = conn 171 .stream_send(self.stream_id, &enc) 172 .map_err(|e| Error::map_stream_send_errors(&e))?; 173 debug_assert_eq!(sent_fh, enc.len()); 174 175 let sent = conn 176 .stream_send(self.stream_id, &buf[..to_send]) 177 .map_err(|e| Error::map_stream_send_errors(&e))?; 178 qlog::h3_data_moved_down(&mut conn.qlog_mut(), self.stream_id, to_send); 179 Ok(sent) 180 } 181 SendMessageState::Closed => Err(Error::AlreadyClosed), 182 } 183 } 184 done(&self) -> bool185 pub fn done(&self) -> bool { 186 self.state.done() 187 } 188 stream_writable(&self)189 pub fn stream_writable(&self) { 190 if self.state.is_state_sending_data() { 191 self.conn_events.data_writable(self.stream_id); 192 } 193 } 194 195 /// # Errors 196 /// `ClosedCriticalStream` if the encoder stream is closed. 197 /// `InternalError` if an unexpected error occurred. ensure_encoded(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()>198 fn ensure_encoded(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()> { 199 if let SendMessageState::Initialized { headers, data, fin } = &self.state { 200 qdebug!([self], "Encoding headers"); 201 let header_block = encoder.encode_header_block(conn, &headers, self.stream_id)?; 202 let hframe = HFrame::Headers { 203 header_block: header_block.to_vec(), 204 }; 205 let mut d = Encoder::default(); 206 hframe.encode(&mut d); 207 if let Some(buf) = data { 208 qdebug!([self], "Encoding data"); 209 let d_frame = HFrame::Data { 210 len: buf.len() as u64, 211 }; 212 d_frame.encode(&mut d); 213 d.encode(&buf); 214 } 215 216 self.state = SendMessageState::SendingInitialMessage { 217 buf: d.into(), 218 fin: *fin, 219 }; 220 } 221 Ok(()) 222 } 223 224 /// # Errors 225 /// `ClosedCriticalStream` if the encoder stream is closed. 226 /// `InternalError` if an unexpected error occurred. 227 /// `InvalidStreamId` if the stream does not exist, 228 /// `AlreadyClosed` if the stream has already been closed. 229 /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if `process_output` 230 /// has not been called when needed, and HTTP3 layer has not picked up the info that the stream has been closed.) send(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()>231 pub fn send(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()> { 232 self.ensure_encoded(conn, encoder)?; 233 234 let label = if ::log::log_enabled!(::log::Level::Debug) { 235 format!("{}", self) 236 } else { 237 String::new() 238 }; 239 240 if let SendMessageState::SendingInitialMessage { ref mut buf, fin } = self.state { 241 let sent = Error::map_error( 242 conn.stream_send(self.stream_id, &buf), 243 Error::HttpInternal(5), 244 )?; 245 qlog::h3_data_moved_down(&mut conn.qlog_mut(), self.stream_id, sent); 246 247 qtrace!([label], "{} bytes sent", sent); 248 249 if sent == buf.len() { 250 if fin { 251 Error::map_error( 252 conn.stream_close_send(self.stream_id), 253 Error::HttpInternal(6), 254 )?; 255 self.state = SendMessageState::Closed; 256 qtrace!([label], "done sending request"); 257 } else { 258 self.state = SendMessageState::SendingData; 259 self.conn_events.data_writable(self.stream_id); 260 qtrace!([label], "change to state SendingData"); 261 } 262 } else { 263 let b = buf.split_off(sent); 264 *buf = b; 265 } 266 } 267 Ok(()) 268 } 269 270 // SendMessage owns headers and sends them. It may also own data for the server side. 271 // This method returns if they're still being sent. Request body (if any) is sent by 272 // http client afterwards using `send_request_body` after receiving DataWritable event. has_data_to_send(&self) -> bool273 pub fn has_data_to_send(&self) -> bool { 274 matches!( 275 self.state, 276 SendMessageState::Initialized { .. } | SendMessageState::SendingInitialMessage { .. } 277 ) 278 } 279 close(&mut self, conn: &mut Connection) -> Res<()>280 pub fn close(&mut self, conn: &mut Connection) -> Res<()> { 281 match self.state { 282 SendMessageState::SendingInitialMessage { ref mut fin, .. } 283 | SendMessageState::Initialized { ref mut fin, .. } => { 284 *fin = true; 285 } 286 _ => { 287 self.state = SendMessageState::Closed; 288 conn.stream_close_send(self.stream_id)?; 289 } 290 } 291 292 self.conn_events.remove_send_side_event(self.stream_id); 293 Ok(()) 294 } 295 stop_sending(&mut self, app_err: AppError)296 pub fn stop_sending(&mut self, app_err: AppError) { 297 if !self.state.is_sending_closed() { 298 self.conn_events.remove_send_side_event(self.stream_id); 299 self.conn_events.stop_sending(self.stream_id, app_err); 300 } 301 } 302 } 303 304 impl ::std::fmt::Display for SendMessage { fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result305 fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { 306 write!(f, "SendMesage {}", self.stream_id) 307 } 308 } 309