1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 use crate::connection::Http3State;
8 use crate::{
9     features::extended_connect::{ExtendedConnectEvents, ExtendedConnectType, SessionCloseReason},
10     CloseType, Http3StreamInfo, HttpRecvStreamEvents, Priority, RecvStreamEvents, SendStreamEvents,
11 };
12 use neqo_common::Header;
13 use neqo_transport::AppError;
14 use neqo_transport::StreamId;
15 use std::cell::RefCell;
16 use std::collections::VecDeque;
17 use std::rc::Rc;
18 
19 #[derive(Debug, PartialEq, Eq, Clone)]
20 pub(crate) enum Http3ServerConnEvent {
21     /// Headers are ready.
22     Headers {
23         stream_info: Http3StreamInfo,
24         headers: Vec<Header>,
25         fin: bool,
26     },
27     PriorityUpdate {
28         stream_id: StreamId,
29         priority: Priority,
30     },
31     /// Request data is ready.
32     DataReadable {
33         stream_info: Http3StreamInfo,
34     },
35     DataWritable {
36         stream_info: Http3StreamInfo,
37     },
38     StreamReset {
39         stream_info: Http3StreamInfo,
40         error: AppError,
41     },
42     StreamStopSending {
43         stream_info: Http3StreamInfo,
44         error: AppError,
45     },
46     /// Connection state change.
47     StateChange(Http3State),
48     ExtendedConnect {
49         stream_id: StreamId,
50         headers: Vec<Header>,
51     },
52     ExtendedConnectClosed {
53         connect_type: ExtendedConnectType,
54         stream_id: StreamId,
55         reason: SessionCloseReason,
56     },
57     ExtendedConnectNewStream(Http3StreamInfo),
58 }
59 
60 #[derive(Debug, Default, Clone)]
61 pub(crate) struct Http3ServerConnEvents {
62     events: Rc<RefCell<VecDeque<Http3ServerConnEvent>>>,
63 }
64 
65 impl SendStreamEvents for Http3ServerConnEvents {
send_closed(&self, stream_info: Http3StreamInfo, close_type: CloseType)66     fn send_closed(&self, stream_info: Http3StreamInfo, close_type: CloseType) {
67         if close_type != CloseType::Done {
68             self.insert(Http3ServerConnEvent::StreamStopSending {
69                 stream_info,
70                 error: close_type.error().unwrap(),
71             });
72         }
73     }
74 
data_writable(&self, stream_info: Http3StreamInfo)75     fn data_writable(&self, stream_info: Http3StreamInfo) {
76         self.insert(Http3ServerConnEvent::DataWritable { stream_info });
77     }
78 }
79 
80 impl RecvStreamEvents for Http3ServerConnEvents {
81     /// Add a new `DataReadable` event
data_readable(&self, stream_info: Http3StreamInfo)82     fn data_readable(&self, stream_info: Http3StreamInfo) {
83         self.insert(Http3ServerConnEvent::DataReadable { stream_info });
84     }
85 
recv_closed(&self, stream_info: Http3StreamInfo, close_type: CloseType)86     fn recv_closed(&self, stream_info: Http3StreamInfo, close_type: CloseType) {
87         if close_type != CloseType::Done {
88             self.remove_events_for_stream_id(stream_info);
89             self.insert(Http3ServerConnEvent::StreamReset {
90                 stream_info,
91                 error: close_type.error().unwrap(),
92             });
93         }
94     }
95 }
96 
97 impl HttpRecvStreamEvents for Http3ServerConnEvents {
98     /// Add a new `HeaderReady` event.
header_ready( &self, stream_info: Http3StreamInfo, headers: Vec<Header>, _interim: bool, fin: bool, )99     fn header_ready(
100         &self,
101         stream_info: Http3StreamInfo,
102         headers: Vec<Header>,
103         _interim: bool,
104         fin: bool,
105     ) {
106         self.insert(Http3ServerConnEvent::Headers {
107             stream_info,
108             headers,
109             fin,
110         });
111     }
112 
extended_connect_new_session(&self, stream_id: StreamId, headers: Vec<Header>)113     fn extended_connect_new_session(&self, stream_id: StreamId, headers: Vec<Header>) {
114         self.insert(Http3ServerConnEvent::ExtendedConnect { stream_id, headers });
115     }
116 }
117 
118 impl ExtendedConnectEvents for Http3ServerConnEvents {
session_start( &self, _connect_type: ExtendedConnectType, _stream_id: StreamId, _status: u16, )119     fn session_start(
120         &self,
121         _connect_type: ExtendedConnectType,
122         _stream_id: StreamId,
123         _status: u16,
124     ) {
125     }
126 
session_end( &self, connect_type: ExtendedConnectType, stream_id: StreamId, reason: SessionCloseReason, )127     fn session_end(
128         &self,
129         connect_type: ExtendedConnectType,
130         stream_id: StreamId,
131         reason: SessionCloseReason,
132     ) {
133         self.insert(Http3ServerConnEvent::ExtendedConnectClosed {
134             connect_type,
135             stream_id,
136             reason,
137         });
138     }
139 
extended_connect_new_stream(&self, stream_info: Http3StreamInfo)140     fn extended_connect_new_stream(&self, stream_info: Http3StreamInfo) {
141         self.insert(Http3ServerConnEvent::ExtendedConnectNewStream(stream_info));
142     }
143 }
144 
145 impl Http3ServerConnEvents {
insert(&self, event: Http3ServerConnEvent)146     fn insert(&self, event: Http3ServerConnEvent) {
147         self.events.borrow_mut().push_back(event);
148     }
149 
remove<F>(&self, f: F) where F: Fn(&Http3ServerConnEvent) -> bool,150     fn remove<F>(&self, f: F)
151     where
152         F: Fn(&Http3ServerConnEvent) -> bool,
153     {
154         self.events.borrow_mut().retain(|evt| !f(evt));
155     }
156 
has_events(&self) -> bool157     pub fn has_events(&self) -> bool {
158         !self.events.borrow().is_empty()
159     }
160 
next_event(&self) -> Option<Http3ServerConnEvent>161     pub fn next_event(&self) -> Option<Http3ServerConnEvent> {
162         self.events.borrow_mut().pop_front()
163     }
164 
connection_state_change(&self, state: Http3State)165     pub fn connection_state_change(&self, state: Http3State) {
166         self.insert(Http3ServerConnEvent::StateChange(state));
167     }
168 
priority_update(&self, stream_id: StreamId, priority: Priority)169     pub fn priority_update(&self, stream_id: StreamId, priority: Priority) {
170         self.insert(Http3ServerConnEvent::PriorityUpdate {
171             stream_id,
172             priority,
173         });
174     }
175 
remove_events_for_stream_id(&self, stream_info: Http3StreamInfo)176     fn remove_events_for_stream_id(&self, stream_info: Http3StreamInfo) {
177         self.remove(|evt| {
178             matches!(evt,
179                 Http3ServerConnEvent::Headers { stream_info: x, .. } | Http3ServerConnEvent::DataReadable { stream_info: x, .. } if *x == stream_info)
180         });
181     }
182 }
183