1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 4 // option. This file may not be copied, modified, or distributed 5 // except according to those terms. 6 7 use crate::connection::Http3Transaction; 8 use crate::hframe::{HFrame, HFrameReader}; 9 use crate::server_connection_events::Http3ServerConnEvents; 10 use crate::Header; 11 use crate::{Error, Res}; 12 use neqo_common::{matches, qdebug, qinfo, qtrace, Encoder}; 13 use neqo_qpack::decoder::QPackDecoder; 14 use neqo_qpack::encoder::QPackEncoder; 15 use neqo_transport::Connection; 16 use std::mem; 17 18 #[derive(PartialEq, Debug)] 19 enum TransactionRecvState { 20 WaitingForHeaders, 21 DecodingHeaders { header_block: Vec<u8>, fin: bool }, 22 WaitingForData, 23 ReadingData { remaining_data_len: usize }, 24 Closed, 25 } 26 27 #[derive(PartialEq, Debug)] 28 enum TransactionSendState { 29 Initial, 30 SendingResponse { buf: Vec<u8> }, 31 Closed, 32 } 33 34 #[derive(Debug)] 35 pub struct TransactionServer { 36 recv_state: TransactionRecvState, 37 send_state: TransactionSendState, 38 stream_id: u64, 39 frame_reader: HFrameReader, 40 conn_events: Http3ServerConnEvents, 41 } 42 43 impl TransactionServer { new(stream_id: u64, conn_events: Http3ServerConnEvents) -> Self44 pub fn new(stream_id: u64, conn_events: Http3ServerConnEvents) -> Self { 45 qinfo!("Create a request stream_id={}", stream_id); 46 Self { 47 recv_state: TransactionRecvState::WaitingForHeaders, 48 send_state: TransactionSendState::Initial, 49 stream_id, 50 frame_reader: HFrameReader::new(), 51 conn_events, 52 } 53 } 54 set_response(&mut self, headers: &[Header], data: Vec<u8>, encoder: &mut QPackEncoder)55 pub fn set_response(&mut self, headers: &[Header], data: Vec<u8>, encoder: &mut QPackEncoder) { 56 qdebug!([self], "Encoding headers"); 57 let header_block = encoder.encode_header_block(&headers, self.stream_id); 58 let hframe = HFrame::Headers { 59 header_block: header_block.to_vec(), 60 }; 61 let mut d = Encoder::default(); 62 hframe.encode(&mut d); 63 if !data.is_empty() { 64 qdebug!([self], "Encoding data"); 65 let d_frame = HFrame::Data { 66 len: data.len() as u64, 67 }; 68 d_frame.encode(&mut d); 69 d.encode(&data); 70 } 71 72 self.send_state = TransactionSendState::SendingResponse { buf: d.into() }; 73 } 74 recv_frame(&mut self, conn: &mut Connection) -> Res<(Option<HFrame>, bool)>75 fn recv_frame(&mut self, conn: &mut Connection) -> Res<(Option<HFrame>, bool)> { 76 qtrace!([self], "receiving a frame"); 77 let fin = self.frame_reader.receive(conn, self.stream_id)?; 78 if !self.frame_reader.done() { 79 Ok((None, fin)) 80 } else { 81 qinfo!([self], "A new frame has been received."); 82 Ok((Some(self.frame_reader.get_frame()?), fin)) 83 } 84 } 85 handle_data_frame(&mut self, len: u64, fin: bool) -> Res<()>86 fn handle_data_frame(&mut self, len: u64, fin: bool) -> Res<()> { 87 qinfo!([self], "A new data frame len={} fin={}", len, fin); 88 if len > 0 { 89 if fin { 90 return Err(Error::HttpFrameError); 91 } 92 self.recv_state = TransactionRecvState::ReadingData { 93 remaining_data_len: len as usize, 94 }; 95 } else if fin { 96 self.conn_events.data(self.stream_id, Vec::new(), true); 97 self.recv_state = TransactionRecvState::Closed; 98 } 99 Ok(()) 100 } 101 } 102 103 impl ::std::fmt::Display for TransactionServer { fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result104 fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { 105 write!(f, "TransactionServer {}", self.stream_id) 106 } 107 } 108 109 impl Http3Transaction for TransactionServer { send(&mut self, conn: &mut Connection, _encoder: &mut QPackEncoder) -> Res<()>110 fn send(&mut self, conn: &mut Connection, _encoder: &mut QPackEncoder) -> Res<()> { 111 qtrace!([self], "Sending response."); 112 let label = if ::log::log_enabled!(::log::Level::Debug) { 113 format!("{}", self) 114 } else { 115 String::new() 116 }; 117 if let TransactionSendState::SendingResponse { ref mut buf } = self.send_state { 118 let sent = conn.stream_send(self.stream_id, &buf[..])?; 119 qinfo!([label], "{} bytes sent", sent); 120 if sent == buf.len() { 121 conn.stream_close_send(self.stream_id)?; 122 self.send_state = TransactionSendState::Closed; 123 qinfo!([label], "done sending request"); 124 } else { 125 let mut b = buf.split_off(sent); 126 mem::swap(buf, &mut b); 127 } 128 } 129 130 Ok(()) 131 } 132 receive(&mut self, conn: &mut Connection, decoder: &mut QPackDecoder) -> Res<()>133 fn receive(&mut self, conn: &mut Connection, decoder: &mut QPackDecoder) -> Res<()> { 134 let label = if ::log::log_enabled!(::log::Level::Debug) { 135 format!("{}", self) 136 } else { 137 String::new() 138 }; 139 140 loop { 141 qtrace!( 142 [label], 143 "[recv_state={:?}] receiving data.", 144 self.recv_state 145 ); 146 match self.recv_state { 147 TransactionRecvState::WaitingForHeaders => { 148 match self.recv_frame(conn)? { 149 (None, true) => { 150 // Stream has been closed without any data, just ignore it. 151 self.recv_state = TransactionRecvState::Closed; 152 return Ok(()); 153 } 154 (None, false) => { 155 // We do not have a complete frame. 156 return Ok(()); 157 } 158 (Some(HFrame::Headers { header_block }), fin) => { 159 if !header_block.is_empty() { 160 // Next step decoding headers. 161 self.recv_state = 162 TransactionRecvState::DecodingHeaders { header_block, fin }; 163 } else { 164 self.conn_events.headers(self.stream_id, Vec::new(), fin); 165 if fin { 166 self.recv_state = TransactionRecvState::Closed; 167 return Ok(()); 168 } else { 169 self.recv_state = TransactionRecvState::WaitingForData; 170 } 171 } 172 } 173 // server can only receive a Header frame at this point. 174 _ => { 175 return Err(Error::HttpFrameUnexpected); 176 } 177 } 178 } 179 TransactionRecvState::DecodingHeaders { 180 ref mut header_block, 181 fin, 182 } => match decoder.decode_header_block(header_block, self.stream_id)? { 183 Some(headers) => { 184 self.conn_events.headers(self.stream_id, headers, fin); 185 if fin { 186 self.recv_state = TransactionRecvState::Closed; 187 return Ok(()); 188 } else { 189 self.recv_state = TransactionRecvState::WaitingForData; 190 } 191 } 192 None => { 193 qinfo!([self], "decoding header is blocked."); 194 return Ok(()); 195 } 196 }, 197 TransactionRecvState::WaitingForData => { 198 match self.recv_frame(conn)? { 199 (None, true) => { 200 // Inform the app tthat tthe stream is done. 201 self.conn_events.data(self.stream_id, Vec::new(), true); 202 self.recv_state = TransactionRecvState::Closed; 203 return Ok(()); 204 } 205 (None, false) => { 206 // Still reading a frame. 207 return Ok(()); 208 } 209 (Some(HFrame::Data { len }), fin) => { 210 self.handle_data_frame(len, fin)?; 211 if fin { 212 return Ok(()); 213 } 214 } 215 _ => { 216 return Err(Error::HttpFrameUnexpected); 217 } 218 }; 219 } 220 TransactionRecvState::ReadingData { 221 ref mut remaining_data_len, 222 } => { 223 // TODO add available(stream_id) to neqo_transport. 224 assert!(*remaining_data_len > 0); 225 while *remaining_data_len != 0 { 226 let to_read = if *remaining_data_len > 1024 { 227 1024 228 } else { 229 *remaining_data_len 230 }; 231 232 let mut data = vec![0x0; to_read]; 233 let (amount, fin) = conn.stream_recv(self.stream_id, &mut data[..])?; 234 assert!(amount <= to_read); 235 if amount > 0 { 236 data.truncate(amount); 237 self.conn_events.data(self.stream_id, data, fin); 238 *remaining_data_len -= amount; 239 } 240 if fin { 241 if *remaining_data_len > 0 { 242 return Err(Error::HttpFrameError); 243 } 244 self.recv_state = TransactionRecvState::Closed; 245 return Ok(()); 246 } else if *remaining_data_len == 0 { 247 self.recv_state = TransactionRecvState::WaitingForData; 248 break; 249 } 250 if amount == 0 { 251 return Ok(()); 252 } 253 } 254 } 255 TransactionRecvState::Closed => { 256 panic!("Stream readable after being closed!"); 257 } 258 }; 259 } 260 } 261 has_data_to_send(&self) -> bool262 fn has_data_to_send(&self) -> bool { 263 matches!(self.send_state, TransactionSendState::SendingResponse { .. }) 264 } 265 reset_receiving_side(&mut self)266 fn reset_receiving_side(&mut self) { 267 self.recv_state = TransactionRecvState::Closed; 268 } 269 stop_sending(&mut self)270 fn stop_sending(&mut self) {} 271 done(&self) -> bool272 fn done(&self) -> bool { 273 self.send_state == TransactionSendState::Closed 274 && self.recv_state == TransactionRecvState::Closed 275 } 276 close_send(&mut self, _conn: &mut Connection) -> Res<()>277 fn close_send(&mut self, _conn: &mut Connection) -> Res<()> { 278 Ok(()) 279 } 280 } 281