1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 use super::WebTransportSession;
8 use crate::{
9     CloseType, Http3StreamInfo, Http3StreamType, ReceiveOutput, RecvStream, RecvStreamEvents, Res,
10     SendStream, SendStreamEvents, Stream,
11 };
12 use neqo_common::Encoder;
13 use neqo_transport::{Connection, StreamId};
14 use std::cell::RefCell;
15 use std::rc::Rc;
16 
17 pub const WEBTRANSPORT_UNI_STREAM: u64 = 0x54;
18 pub const WEBTRANSPORT_STREAM: u64 = 0x41;
19 
20 #[derive(Debug)]
21 pub struct WebTransportRecvStream {
22     stream_id: StreamId,
23     events: Box<dyn RecvStreamEvents>,
24     session: Rc<RefCell<WebTransportSession>>,
25     session_id: StreamId,
26     fin: bool,
27 }
28 
29 impl WebTransportRecvStream {
new( stream_id: StreamId, session_id: StreamId, events: Box<dyn RecvStreamEvents>, session: Rc<RefCell<WebTransportSession>>, ) -> Self30     pub fn new(
31         stream_id: StreamId,
32         session_id: StreamId,
33         events: Box<dyn RecvStreamEvents>,
34         session: Rc<RefCell<WebTransportSession>>,
35     ) -> Self {
36         Self {
37             stream_id,
38             events,
39             session_id,
40             session,
41             fin: false,
42         }
43     }
44 
get_info(&self) -> Http3StreamInfo45     fn get_info(&self) -> Http3StreamInfo {
46         Http3StreamInfo::new(self.stream_id, self.stream_type())
47     }
48 }
49 
50 impl Stream for WebTransportRecvStream {
stream_type(&self) -> Http3StreamType51     fn stream_type(&self) -> Http3StreamType {
52         Http3StreamType::WebTransport(self.session_id)
53     }
54 }
55 
56 impl RecvStream for WebTransportRecvStream {
receive(&mut self, _conn: &mut Connection) -> Res<(ReceiveOutput, bool)>57     fn receive(&mut self, _conn: &mut Connection) -> Res<(ReceiveOutput, bool)> {
58         self.events.data_readable(self.get_info());
59         Ok((ReceiveOutput::NoOutput, false))
60     }
61 
reset(&mut self, close_type: CloseType) -> Res<()>62     fn reset(&mut self, close_type: CloseType) -> Res<()> {
63         if !matches!(close_type, CloseType::ResetApp(_)) {
64             self.events.recv_closed(self.get_info(), close_type);
65         }
66         self.session.borrow_mut().remove_recv_stream(self.stream_id);
67         Ok(())
68     }
69 
read_data(&mut self, conn: &mut Connection, buf: &mut [u8]) -> Res<(usize, bool)>70     fn read_data(&mut self, conn: &mut Connection, buf: &mut [u8]) -> Res<(usize, bool)> {
71         let (amount, fin) = conn.stream_recv(self.stream_id, buf)?;
72         self.fin = fin;
73         if fin {
74             self.session.borrow_mut().remove_recv_stream(self.stream_id);
75         }
76         Ok((amount, fin))
77     }
78 }
79 
80 #[derive(Debug, PartialEq)]
81 enum WebTransportSenderStreamState {
82     SendingInit { buf: Vec<u8>, fin: bool },
83     SendingData,
84     Done,
85 }
86 
87 #[derive(Debug)]
88 pub struct WebTransportSendStream {
89     stream_id: StreamId,
90     state: WebTransportSenderStreamState,
91     events: Box<dyn SendStreamEvents>,
92     session: Rc<RefCell<WebTransportSession>>,
93     session_id: StreamId,
94 }
95 
96 impl WebTransportSendStream {
new( stream_id: StreamId, session_id: StreamId, events: Box<dyn SendStreamEvents>, session: Rc<RefCell<WebTransportSession>>, local: bool, ) -> Self97     pub fn new(
98         stream_id: StreamId,
99         session_id: StreamId,
100         events: Box<dyn SendStreamEvents>,
101         session: Rc<RefCell<WebTransportSession>>,
102         local: bool,
103     ) -> Self {
104         Self {
105             stream_id,
106             state: if local {
107                 let mut d = Encoder::default();
108                 if stream_id.is_uni() {
109                     d.encode_varint(WEBTRANSPORT_UNI_STREAM);
110                 } else {
111                     d.encode_varint(WEBTRANSPORT_STREAM);
112                 }
113                 d.encode_varint(session_id.as_u64());
114                 WebTransportSenderStreamState::SendingInit {
115                     buf: d.into(),
116                     fin: false,
117                 }
118             } else {
119                 WebTransportSenderStreamState::SendingData
120             },
121             events,
122             session_id,
123             session,
124         }
125     }
126 
set_done(&mut self, close_type: CloseType)127     fn set_done(&mut self, close_type: CloseType) {
128         self.state = WebTransportSenderStreamState::Done;
129         self.events.send_closed(self.get_info(), close_type);
130         self.session.borrow_mut().remove_send_stream(self.stream_id);
131     }
132 
get_info(&self) -> Http3StreamInfo133     fn get_info(&self) -> Http3StreamInfo {
134         Http3StreamInfo::new(self.stream_id, self.stream_type())
135     }
136 }
137 
138 impl Stream for WebTransportSendStream {
stream_type(&self) -> Http3StreamType139     fn stream_type(&self) -> Http3StreamType {
140         Http3StreamType::WebTransport(self.session_id)
141     }
142 }
143 
144 impl SendStream for WebTransportSendStream {
send(&mut self, conn: &mut Connection) -> Res<()>145     fn send(&mut self, conn: &mut Connection) -> Res<()> {
146         if let WebTransportSenderStreamState::SendingInit { ref mut buf, fin } = self.state {
147             let sent = conn.stream_send(self.stream_id, &buf[..])?;
148             if sent == buf.len() {
149                 if fin {
150                     conn.stream_close_send(self.stream_id)?;
151                     self.set_done(CloseType::Done);
152                 } else {
153                     self.state = WebTransportSenderStreamState::SendingData;
154                 }
155             } else {
156                 let b = buf.split_off(sent);
157                 *buf = b;
158             }
159         }
160         Ok(())
161     }
162 
has_data_to_send(&self) -> bool163     fn has_data_to_send(&self) -> bool {
164         matches!(
165             self.state,
166             WebTransportSenderStreamState::SendingInit { .. }
167         )
168     }
169 
stream_writable(&self)170     fn stream_writable(&self) {
171         self.events.data_writable(self.get_info());
172     }
173 
done(&self) -> bool174     fn done(&self) -> bool {
175         self.state == WebTransportSenderStreamState::Done
176     }
177 
send_data(&mut self, conn: &mut Connection, buf: &[u8]) -> Res<usize>178     fn send_data(&mut self, conn: &mut Connection, buf: &[u8]) -> Res<usize> {
179         self.send(conn)?;
180         if self.state == WebTransportSenderStreamState::SendingData {
181             let sent = conn.stream_send(self.stream_id, buf)?;
182             Ok(sent)
183         } else {
184             Ok(0)
185         }
186     }
187 
handle_stop_sending(&mut self, close_type: CloseType)188     fn handle_stop_sending(&mut self, close_type: CloseType) {
189         self.set_done(close_type);
190     }
191 
close(&mut self, conn: &mut Connection) -> Res<()>192     fn close(&mut self, conn: &mut Connection) -> Res<()> {
193         if let WebTransportSenderStreamState::SendingInit { ref mut fin, .. } = self.state {
194             *fin = true;
195         } else {
196             self.state = WebTransportSenderStreamState::Done;
197             conn.stream_close_send(self.stream_id)?;
198             self.set_done(CloseType::Done);
199         }
200         Ok(())
201     }
202 }
203