1 use std::ffi::c_void; 2 use std::future::Future; 3 use std::pin::Pin; 4 use std::ptr; 5 use std::sync::{ 6 atomic::{AtomicBool, Ordering}, 7 Arc, Mutex, Weak, 8 }; 9 use std::task::{Context, Poll}; 10 11 use futures_util::stream::{FuturesUnordered, Stream}; 12 use libc::c_int; 13 14 use super::error::hyper_code; 15 use super::UserDataPointer; 16 17 type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>; 18 type BoxAny = Box<dyn AsTaskType + Send + Sync>; 19 20 /// Return in a poll function to indicate it was ready. 21 pub const HYPER_POLL_READY: c_int = 0; 22 /// Return in a poll function to indicate it is still pending. 23 /// 24 /// The passed in `hyper_waker` should be registered to wake up the task at 25 /// some later point. 26 pub const HYPER_POLL_PENDING: c_int = 1; 27 /// Return in a poll function indicate an error. 28 pub const HYPER_POLL_ERROR: c_int = 3; 29 30 /// A task executor for `hyper_task`s. 31 pub struct hyper_executor { 32 /// The executor of all task futures. 33 /// 34 /// There should never be contention on the mutex, as it is only locked 35 /// to drive the futures. However, we cannot gaurantee proper usage from 36 /// `hyper_executor_poll()`, which in C could potentially be called inside 37 /// one of the stored futures. The mutex isn't re-entrant, so doing so 38 /// would result in a deadlock, but that's better than data corruption. 39 driver: Mutex<FuturesUnordered<TaskFuture>>, 40 41 /// The queue of futures that need to be pushed into the `driver`. 42 /// 43 /// This is has a separate mutex since `spawn` could be called from inside 44 /// a future, which would mean the driver's mutex is already locked. 45 spawn_queue: Mutex<Vec<TaskFuture>>, 46 47 /// This is used to track when a future calls `wake` while we are within 48 /// `hyper_executor::poll_next`. 49 is_woken: Arc<ExecWaker>, 50 } 51 52 #[derive(Clone)] 53 pub(crate) struct WeakExec(Weak<hyper_executor>); 54 55 struct ExecWaker(AtomicBool); 56 57 /// An async task. 58 pub struct hyper_task { 59 future: BoxFuture<BoxAny>, 60 output: Option<BoxAny>, 61 userdata: UserDataPointer, 62 } 63 64 struct TaskFuture { 65 task: Option<Box<hyper_task>>, 66 } 67 68 /// An async context for a task that contains the related waker. 69 pub struct hyper_context<'a>(Context<'a>); 70 71 /// A waker that is saved and used to waken a pending task. 72 pub struct hyper_waker { 73 waker: std::task::Waker, 74 } 75 76 /// A descriptor for what type a `hyper_task` value is. 77 #[repr(C)] 78 pub enum hyper_task_return_type { 79 /// The value of this task is null (does not imply an error). 80 HYPER_TASK_EMPTY, 81 /// The value of this task is `hyper_error *`. 82 HYPER_TASK_ERROR, 83 /// The value of this task is `hyper_clientconn *`. 84 HYPER_TASK_CLIENTCONN, 85 /// The value of this task is `hyper_response *`. 86 HYPER_TASK_RESPONSE, 87 /// The value of this task is `hyper_buf *`. 88 HYPER_TASK_BUF, 89 } 90 91 pub(crate) unsafe trait AsTaskType { as_task_type(&self) -> hyper_task_return_type92 fn as_task_type(&self) -> hyper_task_return_type; 93 } 94 95 pub(crate) trait IntoDynTaskType { into_dyn_task_type(self) -> BoxAny96 fn into_dyn_task_type(self) -> BoxAny; 97 } 98 99 // ===== impl hyper_executor ===== 100 101 impl hyper_executor { new() -> Arc<hyper_executor>102 fn new() -> Arc<hyper_executor> { 103 Arc::new(hyper_executor { 104 driver: Mutex::new(FuturesUnordered::new()), 105 spawn_queue: Mutex::new(Vec::new()), 106 is_woken: Arc::new(ExecWaker(AtomicBool::new(false))), 107 }) 108 } 109 downgrade(exec: &Arc<hyper_executor>) -> WeakExec110 pub(crate) fn downgrade(exec: &Arc<hyper_executor>) -> WeakExec { 111 WeakExec(Arc::downgrade(exec)) 112 } 113 spawn(&self, task: Box<hyper_task>)114 fn spawn(&self, task: Box<hyper_task>) { 115 self.spawn_queue 116 .lock() 117 .unwrap() 118 .push(TaskFuture { task: Some(task) }); 119 } 120 poll_next(&self) -> Option<Box<hyper_task>>121 fn poll_next(&self) -> Option<Box<hyper_task>> { 122 // Drain the queue first. 123 self.drain_queue(); 124 125 let waker = futures_util::task::waker_ref(&self.is_woken); 126 let mut cx = Context::from_waker(&waker); 127 128 loop { 129 match Pin::new(&mut *self.driver.lock().unwrap()).poll_next(&mut cx) { 130 Poll::Ready(val) => return val, 131 Poll::Pending => { 132 // Check if any of the pending tasks tried to spawn 133 // some new tasks. If so, drain into the driver and loop. 134 if self.drain_queue() { 135 continue; 136 } 137 138 // If the driver called `wake` while we were polling, 139 // we should poll again immediately! 140 if self.is_woken.0.swap(false, Ordering::SeqCst) { 141 continue; 142 } 143 144 return None; 145 } 146 } 147 } 148 } 149 drain_queue(&self) -> bool150 fn drain_queue(&self) -> bool { 151 let mut queue = self.spawn_queue.lock().unwrap(); 152 if queue.is_empty() { 153 return false; 154 } 155 156 let driver = self.driver.lock().unwrap(); 157 158 for task in queue.drain(..) { 159 driver.push(task); 160 } 161 162 true 163 } 164 } 165 166 impl futures_util::task::ArcWake for ExecWaker { wake_by_ref(me: &Arc<ExecWaker>)167 fn wake_by_ref(me: &Arc<ExecWaker>) { 168 me.0.store(true, Ordering::SeqCst); 169 } 170 } 171 172 // ===== impl WeakExec ===== 173 174 impl WeakExec { new() -> Self175 pub(crate) fn new() -> Self { 176 WeakExec(Weak::new()) 177 } 178 } 179 180 impl crate::rt::Executor<BoxFuture<()>> for WeakExec { execute(&self, fut: BoxFuture<()>)181 fn execute(&self, fut: BoxFuture<()>) { 182 if let Some(exec) = self.0.upgrade() { 183 exec.spawn(hyper_task::boxed(fut)); 184 } 185 } 186 } 187 188 ffi_fn! { 189 /// Creates a new task executor. 190 fn hyper_executor_new() -> *const hyper_executor { 191 Arc::into_raw(hyper_executor::new()) 192 } ?= ptr::null() 193 } 194 195 ffi_fn! { 196 /// Frees an executor and any incomplete tasks still part of it. 197 fn hyper_executor_free(exec: *const hyper_executor) { 198 drop(non_null!(Arc::from_raw(exec) ?= ())); 199 } 200 } 201 202 ffi_fn! { 203 /// Push a task onto the executor. 204 /// 205 /// The executor takes ownership of the task, it should not be accessed 206 /// again unless returned back to the user with `hyper_executor_poll`. 207 fn hyper_executor_push(exec: *const hyper_executor, task: *mut hyper_task) -> hyper_code { 208 let exec = non_null!(&*exec ?= hyper_code::HYPERE_INVALID_ARG); 209 let task = non_null!(Box::from_raw(task) ?= hyper_code::HYPERE_INVALID_ARG); 210 exec.spawn(task); 211 hyper_code::HYPERE_OK 212 } 213 } 214 215 ffi_fn! { 216 /// Polls the executor, trying to make progress on any tasks that have notified 217 /// that they are ready again. 218 /// 219 /// If ready, returns a task from the executor that has completed. 220 /// 221 /// If there are no ready tasks, this returns `NULL`. 222 fn hyper_executor_poll(exec: *const hyper_executor) -> *mut hyper_task { 223 let exec = non_null!(&*exec ?= ptr::null_mut()); 224 match exec.poll_next() { 225 Some(task) => Box::into_raw(task), 226 None => ptr::null_mut(), 227 } 228 } ?= ptr::null_mut() 229 } 230 231 // ===== impl hyper_task ===== 232 233 impl hyper_task { boxed<F>(fut: F) -> Box<hyper_task> where F: Future + Send + 'static, F::Output: IntoDynTaskType + Send + Sync + 'static,234 pub(crate) fn boxed<F>(fut: F) -> Box<hyper_task> 235 where 236 F: Future + Send + 'static, 237 F::Output: IntoDynTaskType + Send + Sync + 'static, 238 { 239 Box::new(hyper_task { 240 future: Box::pin(async move { fut.await.into_dyn_task_type() }), 241 output: None, 242 userdata: UserDataPointer(ptr::null_mut()), 243 }) 244 } 245 output_type(&self) -> hyper_task_return_type246 fn output_type(&self) -> hyper_task_return_type { 247 match self.output { 248 None => hyper_task_return_type::HYPER_TASK_EMPTY, 249 Some(ref val) => val.as_task_type(), 250 } 251 } 252 } 253 254 impl Future for TaskFuture { 255 type Output = Box<hyper_task>; 256 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>257 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 258 match Pin::new(&mut self.task.as_mut().unwrap().future).poll(cx) { 259 Poll::Ready(val) => { 260 let mut task = self.task.take().unwrap(); 261 task.output = Some(val); 262 Poll::Ready(task) 263 } 264 Poll::Pending => Poll::Pending, 265 } 266 } 267 } 268 269 ffi_fn! { 270 /// Free a task. 271 fn hyper_task_free(task: *mut hyper_task) { 272 drop(non_null!(Box::from_raw(task) ?= ())); 273 } 274 } 275 276 ffi_fn! { 277 /// Takes the output value of this task. 278 /// 279 /// This must only be called once polling the task on an executor has finished 280 /// this task. 281 /// 282 /// Use `hyper_task_type` to determine the type of the `void *` return value. 283 fn hyper_task_value(task: *mut hyper_task) -> *mut c_void { 284 let task = non_null!(&mut *task ?= ptr::null_mut()); 285 286 if let Some(val) = task.output.take() { 287 let p = Box::into_raw(val) as *mut c_void; 288 // protect from returning fake pointers to empty types 289 if p == std::ptr::NonNull::<c_void>::dangling().as_ptr() { 290 ptr::null_mut() 291 } else { 292 p 293 } 294 } else { 295 ptr::null_mut() 296 } 297 } ?= ptr::null_mut() 298 } 299 300 ffi_fn! { 301 /// Query the return type of this task. 302 fn hyper_task_type(task: *mut hyper_task) -> hyper_task_return_type { 303 // instead of blowing up spectacularly, just say this null task 304 // doesn't have a value to retrieve. 305 non_null!(&*task ?= hyper_task_return_type::HYPER_TASK_EMPTY).output_type() 306 } 307 } 308 309 ffi_fn! { 310 /// Set a user data pointer to be associated with this task. 311 /// 312 /// This value will be passed to task callbacks, and can be checked later 313 /// with `hyper_task_userdata`. 314 fn hyper_task_set_userdata(task: *mut hyper_task, userdata: *mut c_void) { 315 if task.is_null() { 316 return; 317 } 318 319 unsafe { (*task).userdata = UserDataPointer(userdata) }; 320 } 321 } 322 323 ffi_fn! { 324 /// Retrieve the userdata that has been set via `hyper_task_set_userdata`. 325 fn hyper_task_userdata(task: *mut hyper_task) -> *mut c_void { 326 non_null!(&*task ?= ptr::null_mut()).userdata.0 327 } ?= ptr::null_mut() 328 } 329 330 // ===== impl AsTaskType ===== 331 332 unsafe impl AsTaskType for () { as_task_type(&self) -> hyper_task_return_type333 fn as_task_type(&self) -> hyper_task_return_type { 334 hyper_task_return_type::HYPER_TASK_EMPTY 335 } 336 } 337 338 unsafe impl AsTaskType for crate::Error { as_task_type(&self) -> hyper_task_return_type339 fn as_task_type(&self) -> hyper_task_return_type { 340 hyper_task_return_type::HYPER_TASK_ERROR 341 } 342 } 343 344 impl<T> IntoDynTaskType for T 345 where 346 T: AsTaskType + Send + Sync + 'static, 347 { into_dyn_task_type(self) -> BoxAny348 fn into_dyn_task_type(self) -> BoxAny { 349 Box::new(self) 350 } 351 } 352 353 impl<T> IntoDynTaskType for crate::Result<T> 354 where 355 T: IntoDynTaskType + Send + Sync + 'static, 356 { into_dyn_task_type(self) -> BoxAny357 fn into_dyn_task_type(self) -> BoxAny { 358 match self { 359 Ok(val) => val.into_dyn_task_type(), 360 Err(err) => Box::new(err), 361 } 362 } 363 } 364 365 impl<T> IntoDynTaskType for Option<T> 366 where 367 T: IntoDynTaskType + Send + Sync + 'static, 368 { into_dyn_task_type(self) -> BoxAny369 fn into_dyn_task_type(self) -> BoxAny { 370 match self { 371 Some(val) => val.into_dyn_task_type(), 372 None => ().into_dyn_task_type(), 373 } 374 } 375 } 376 377 // ===== impl hyper_context ===== 378 379 impl hyper_context<'_> { wrap<'a, 'b>(cx: &'a mut Context<'b>) -> &'a mut hyper_context<'b>380 pub(crate) fn wrap<'a, 'b>(cx: &'a mut Context<'b>) -> &'a mut hyper_context<'b> { 381 // A struct with only one field has the same layout as that field. 382 unsafe { std::mem::transmute::<&mut Context<'_>, &mut hyper_context<'_>>(cx) } 383 } 384 } 385 386 ffi_fn! { 387 /// Copies a waker out of the task context. 388 fn hyper_context_waker(cx: *mut hyper_context<'_>) -> *mut hyper_waker { 389 let waker = non_null!(&mut *cx ?= ptr::null_mut()).0.waker().clone(); 390 Box::into_raw(Box::new(hyper_waker { waker })) 391 } ?= ptr::null_mut() 392 } 393 394 // ===== impl hyper_waker ===== 395 396 ffi_fn! { 397 /// Free a waker that hasn't been woken. 398 fn hyper_waker_free(waker: *mut hyper_waker) { 399 drop(non_null!(Box::from_raw(waker) ?= ())); 400 } 401 } 402 403 ffi_fn! { 404 /// Wake up the task associated with a waker. 405 /// 406 /// NOTE: This consumes the waker. You should not use or free the waker afterwards. 407 fn hyper_waker_wake(waker: *mut hyper_waker) { 408 let waker = non_null!(Box::from_raw(waker) ?= ()); 409 waker.waker.wake(); 410 } 411 } 412