1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 4 // option. This file may not be copied, modified, or distributed 5 // except according to those terms. 6 7 use super::WebTransportSession; 8 use crate::{ 9 CloseType, Http3StreamInfo, Http3StreamType, ReceiveOutput, RecvStream, RecvStreamEvents, Res, 10 SendStream, SendStreamEvents, Stream, 11 }; 12 use neqo_common::Encoder; 13 use neqo_transport::{Connection, StreamId}; 14 use std::cell::RefCell; 15 use std::rc::Rc; 16 17 pub const WEBTRANSPORT_UNI_STREAM: u64 = 0x54; 18 pub const WEBTRANSPORT_STREAM: u64 = 0x41; 19 20 #[derive(Debug)] 21 pub struct WebTransportRecvStream { 22 stream_id: StreamId, 23 events: Box<dyn RecvStreamEvents>, 24 session: Rc<RefCell<WebTransportSession>>, 25 session_id: StreamId, 26 fin: bool, 27 } 28 29 impl WebTransportRecvStream { new( stream_id: StreamId, session_id: StreamId, events: Box<dyn RecvStreamEvents>, session: Rc<RefCell<WebTransportSession>>, ) -> Self30 pub fn new( 31 stream_id: StreamId, 32 session_id: StreamId, 33 events: Box<dyn RecvStreamEvents>, 34 session: Rc<RefCell<WebTransportSession>>, 35 ) -> Self { 36 Self { 37 stream_id, 38 events, 39 session_id, 40 session, 41 fin: false, 42 } 43 } 44 get_info(&self) -> Http3StreamInfo45 fn get_info(&self) -> Http3StreamInfo { 46 Http3StreamInfo::new(self.stream_id, self.stream_type()) 47 } 48 } 49 50 impl Stream for WebTransportRecvStream { stream_type(&self) -> Http3StreamType51 fn stream_type(&self) -> Http3StreamType { 52 Http3StreamType::WebTransport(self.session_id) 53 } 54 } 55 56 impl RecvStream for WebTransportRecvStream { receive(&mut self, _conn: &mut Connection) -> Res<(ReceiveOutput, bool)>57 fn receive(&mut self, _conn: &mut Connection) -> Res<(ReceiveOutput, bool)> { 58 self.events.data_readable(self.get_info()); 59 Ok((ReceiveOutput::NoOutput, false)) 60 } 61 reset(&mut self, close_type: CloseType) -> Res<()>62 fn reset(&mut self, close_type: CloseType) -> Res<()> { 63 if !matches!(close_type, CloseType::ResetApp(_)) { 64 self.events.recv_closed(self.get_info(), close_type); 65 } 66 self.session.borrow_mut().remove_recv_stream(self.stream_id); 67 Ok(()) 68 } 69 read_data(&mut self, conn: &mut Connection, buf: &mut [u8]) -> Res<(usize, bool)>70 fn read_data(&mut self, conn: &mut Connection, buf: &mut [u8]) -> Res<(usize, bool)> { 71 let (amount, fin) = conn.stream_recv(self.stream_id, buf)?; 72 self.fin = fin; 73 if fin { 74 self.session.borrow_mut().remove_recv_stream(self.stream_id); 75 } 76 Ok((amount, fin)) 77 } 78 } 79 80 #[derive(Debug, PartialEq)] 81 enum WebTransportSenderStreamState { 82 SendingInit { buf: Vec<u8>, fin: bool }, 83 SendingData, 84 Done, 85 } 86 87 #[derive(Debug)] 88 pub struct WebTransportSendStream { 89 stream_id: StreamId, 90 state: WebTransportSenderStreamState, 91 events: Box<dyn SendStreamEvents>, 92 session: Rc<RefCell<WebTransportSession>>, 93 session_id: StreamId, 94 } 95 96 impl WebTransportSendStream { new( stream_id: StreamId, session_id: StreamId, events: Box<dyn SendStreamEvents>, session: Rc<RefCell<WebTransportSession>>, local: bool, ) -> Self97 pub fn new( 98 stream_id: StreamId, 99 session_id: StreamId, 100 events: Box<dyn SendStreamEvents>, 101 session: Rc<RefCell<WebTransportSession>>, 102 local: bool, 103 ) -> Self { 104 Self { 105 stream_id, 106 state: if local { 107 let mut d = Encoder::default(); 108 if stream_id.is_uni() { 109 d.encode_varint(WEBTRANSPORT_UNI_STREAM); 110 } else { 111 d.encode_varint(WEBTRANSPORT_STREAM); 112 } 113 d.encode_varint(session_id.as_u64()); 114 WebTransportSenderStreamState::SendingInit { 115 buf: d.into(), 116 fin: false, 117 } 118 } else { 119 WebTransportSenderStreamState::SendingData 120 }, 121 events, 122 session_id, 123 session, 124 } 125 } 126 set_done(&mut self, close_type: CloseType)127 fn set_done(&mut self, close_type: CloseType) { 128 self.state = WebTransportSenderStreamState::Done; 129 self.events.send_closed(self.get_info(), close_type); 130 self.session.borrow_mut().remove_send_stream(self.stream_id); 131 } 132 get_info(&self) -> Http3StreamInfo133 fn get_info(&self) -> Http3StreamInfo { 134 Http3StreamInfo::new(self.stream_id, self.stream_type()) 135 } 136 } 137 138 impl Stream for WebTransportSendStream { stream_type(&self) -> Http3StreamType139 fn stream_type(&self) -> Http3StreamType { 140 Http3StreamType::WebTransport(self.session_id) 141 } 142 } 143 144 impl SendStream for WebTransportSendStream { send(&mut self, conn: &mut Connection) -> Res<()>145 fn send(&mut self, conn: &mut Connection) -> Res<()> { 146 if let WebTransportSenderStreamState::SendingInit { ref mut buf, fin } = self.state { 147 let sent = conn.stream_send(self.stream_id, &buf[..])?; 148 if sent == buf.len() { 149 if fin { 150 conn.stream_close_send(self.stream_id)?; 151 self.set_done(CloseType::Done); 152 } else { 153 self.state = WebTransportSenderStreamState::SendingData; 154 } 155 } else { 156 let b = buf.split_off(sent); 157 *buf = b; 158 } 159 } 160 Ok(()) 161 } 162 has_data_to_send(&self) -> bool163 fn has_data_to_send(&self) -> bool { 164 matches!( 165 self.state, 166 WebTransportSenderStreamState::SendingInit { .. } 167 ) 168 } 169 stream_writable(&self)170 fn stream_writable(&self) { 171 self.events.data_writable(self.get_info()); 172 } 173 done(&self) -> bool174 fn done(&self) -> bool { 175 self.state == WebTransportSenderStreamState::Done 176 } 177 send_data(&mut self, conn: &mut Connection, buf: &[u8]) -> Res<usize>178 fn send_data(&mut self, conn: &mut Connection, buf: &[u8]) -> Res<usize> { 179 self.send(conn)?; 180 if self.state == WebTransportSenderStreamState::SendingData { 181 let sent = conn.stream_send(self.stream_id, buf)?; 182 Ok(sent) 183 } else { 184 Ok(0) 185 } 186 } 187 handle_stop_sending(&mut self, close_type: CloseType)188 fn handle_stop_sending(&mut self, close_type: CloseType) { 189 self.set_done(close_type); 190 } 191 close(&mut self, conn: &mut Connection) -> Res<()>192 fn close(&mut self, conn: &mut Connection) -> Res<()> { 193 if let WebTransportSenderStreamState::SendingInit { ref mut fin, .. } = self.state { 194 *fin = true; 195 } else { 196 self.state = WebTransportSenderStreamState::Done; 197 conn.stream_close_send(self.stream_id)?; 198 self.set_done(CloseType::Done); 199 } 200 Ok(()) 201 } 202 } 203