1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 4 // option. This file may not be copied, modified, or distributed 5 // except according to those terms. 6 7 use crate::connection::Http3State; 8 use crate::{ 9 features::extended_connect::{ExtendedConnectEvents, ExtendedConnectType, SessionCloseReason}, 10 CloseType, Http3StreamInfo, HttpRecvStreamEvents, Priority, RecvStreamEvents, SendStreamEvents, 11 }; 12 use neqo_common::Header; 13 use neqo_transport::AppError; 14 use neqo_transport::StreamId; 15 use std::cell::RefCell; 16 use std::collections::VecDeque; 17 use std::rc::Rc; 18 19 #[derive(Debug, PartialEq, Eq, Clone)] 20 pub(crate) enum Http3ServerConnEvent { 21 /// Headers are ready. 22 Headers { 23 stream_info: Http3StreamInfo, 24 headers: Vec<Header>, 25 fin: bool, 26 }, 27 PriorityUpdate { 28 stream_id: StreamId, 29 priority: Priority, 30 }, 31 /// Request data is ready. 32 DataReadable { 33 stream_info: Http3StreamInfo, 34 }, 35 DataWritable { 36 stream_info: Http3StreamInfo, 37 }, 38 StreamReset { 39 stream_info: Http3StreamInfo, 40 error: AppError, 41 }, 42 StreamStopSending { 43 stream_info: Http3StreamInfo, 44 error: AppError, 45 }, 46 /// Connection state change. 47 StateChange(Http3State), 48 ExtendedConnect { 49 stream_id: StreamId, 50 headers: Vec<Header>, 51 }, 52 ExtendedConnectClosed { 53 connect_type: ExtendedConnectType, 54 stream_id: StreamId, 55 reason: SessionCloseReason, 56 }, 57 ExtendedConnectNewStream(Http3StreamInfo), 58 } 59 60 #[derive(Debug, Default, Clone)] 61 pub(crate) struct Http3ServerConnEvents { 62 events: Rc<RefCell<VecDeque<Http3ServerConnEvent>>>, 63 } 64 65 impl SendStreamEvents for Http3ServerConnEvents { send_closed(&self, stream_info: Http3StreamInfo, close_type: CloseType)66 fn send_closed(&self, stream_info: Http3StreamInfo, close_type: CloseType) { 67 if close_type != CloseType::Done { 68 self.insert(Http3ServerConnEvent::StreamStopSending { 69 stream_info, 70 error: close_type.error().unwrap(), 71 }); 72 } 73 } 74 data_writable(&self, stream_info: Http3StreamInfo)75 fn data_writable(&self, stream_info: Http3StreamInfo) { 76 self.insert(Http3ServerConnEvent::DataWritable { stream_info }); 77 } 78 } 79 80 impl RecvStreamEvents for Http3ServerConnEvents { 81 /// Add a new `DataReadable` event data_readable(&self, stream_info: Http3StreamInfo)82 fn data_readable(&self, stream_info: Http3StreamInfo) { 83 self.insert(Http3ServerConnEvent::DataReadable { stream_info }); 84 } 85 recv_closed(&self, stream_info: Http3StreamInfo, close_type: CloseType)86 fn recv_closed(&self, stream_info: Http3StreamInfo, close_type: CloseType) { 87 if close_type != CloseType::Done { 88 self.remove_events_for_stream_id(stream_info); 89 self.insert(Http3ServerConnEvent::StreamReset { 90 stream_info, 91 error: close_type.error().unwrap(), 92 }); 93 } 94 } 95 } 96 97 impl HttpRecvStreamEvents for Http3ServerConnEvents { 98 /// Add a new `HeaderReady` event. header_ready( &self, stream_info: Http3StreamInfo, headers: Vec<Header>, _interim: bool, fin: bool, )99 fn header_ready( 100 &self, 101 stream_info: Http3StreamInfo, 102 headers: Vec<Header>, 103 _interim: bool, 104 fin: bool, 105 ) { 106 self.insert(Http3ServerConnEvent::Headers { 107 stream_info, 108 headers, 109 fin, 110 }); 111 } 112 extended_connect_new_session(&self, stream_id: StreamId, headers: Vec<Header>)113 fn extended_connect_new_session(&self, stream_id: StreamId, headers: Vec<Header>) { 114 self.insert(Http3ServerConnEvent::ExtendedConnect { stream_id, headers }); 115 } 116 } 117 118 impl ExtendedConnectEvents for Http3ServerConnEvents { session_start( &self, _connect_type: ExtendedConnectType, _stream_id: StreamId, _status: u16, )119 fn session_start( 120 &self, 121 _connect_type: ExtendedConnectType, 122 _stream_id: StreamId, 123 _status: u16, 124 ) { 125 } 126 session_end( &self, connect_type: ExtendedConnectType, stream_id: StreamId, reason: SessionCloseReason, )127 fn session_end( 128 &self, 129 connect_type: ExtendedConnectType, 130 stream_id: StreamId, 131 reason: SessionCloseReason, 132 ) { 133 self.insert(Http3ServerConnEvent::ExtendedConnectClosed { 134 connect_type, 135 stream_id, 136 reason, 137 }); 138 } 139 extended_connect_new_stream(&self, stream_info: Http3StreamInfo)140 fn extended_connect_new_stream(&self, stream_info: Http3StreamInfo) { 141 self.insert(Http3ServerConnEvent::ExtendedConnectNewStream(stream_info)); 142 } 143 } 144 145 impl Http3ServerConnEvents { insert(&self, event: Http3ServerConnEvent)146 fn insert(&self, event: Http3ServerConnEvent) { 147 self.events.borrow_mut().push_back(event); 148 } 149 remove<F>(&self, f: F) where F: Fn(&Http3ServerConnEvent) -> bool,150 fn remove<F>(&self, f: F) 151 where 152 F: Fn(&Http3ServerConnEvent) -> bool, 153 { 154 self.events.borrow_mut().retain(|evt| !f(evt)); 155 } 156 has_events(&self) -> bool157 pub fn has_events(&self) -> bool { 158 !self.events.borrow().is_empty() 159 } 160 next_event(&self) -> Option<Http3ServerConnEvent>161 pub fn next_event(&self) -> Option<Http3ServerConnEvent> { 162 self.events.borrow_mut().pop_front() 163 } 164 connection_state_change(&self, state: Http3State)165 pub fn connection_state_change(&self, state: Http3State) { 166 self.insert(Http3ServerConnEvent::StateChange(state)); 167 } 168 priority_update(&self, stream_id: StreamId, priority: Priority)169 pub fn priority_update(&self, stream_id: StreamId, priority: Priority) { 170 self.insert(Http3ServerConnEvent::PriorityUpdate { 171 stream_id, 172 priority, 173 }); 174 } 175 remove_events_for_stream_id(&self, stream_info: Http3StreamInfo)176 fn remove_events_for_stream_id(&self, stream_info: Http3StreamInfo) { 177 self.remove(|evt| { 178 matches!(evt, 179 Http3ServerConnEvent::Headers { stream_info: x, .. } | Http3ServerConnEvent::DataReadable { stream_info: x, .. } if *x == stream_info) 180 }); 181 } 182 } 183