1 use std::ffi::c_void;
2 use std::future::Future;
3 use std::pin::Pin;
4 use std::ptr;
5 use std::sync::{
6     atomic::{AtomicBool, Ordering},
7     Arc, Mutex, Weak,
8 };
9 use std::task::{Context, Poll};
10 
11 use futures_util::stream::{FuturesUnordered, Stream};
12 use libc::c_int;
13 
14 use super::error::hyper_code;
15 use super::UserDataPointer;
16 
/// A pinned, heap-allocated, type-erased future that is `Send` and resolves
/// to `T`.
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
/// A boxed, type-erased task output that can report which
/// `hyper_task_return_type` describes it (via `AsTaskType`).
type BoxAny = Box<dyn AsTaskType + Send + Sync>;
19 
/// Return in a poll function to indicate it was ready.
pub const HYPER_POLL_READY: c_int = 0;
/// Return in a poll function to indicate it is still pending.
///
/// The passed in `hyper_waker` should be registered to wake up the task at
/// some later point.
pub const HYPER_POLL_PENDING: c_int = 1;
/// Return in a poll function to indicate an error.
pub const HYPER_POLL_ERROR: c_int = 3;
29 
/// A task executor for `hyper_task`s.
pub struct hyper_executor {
    /// The executor of all task futures.
    ///
    /// There should never be contention on the mutex, as it is only locked
    /// to drive the futures. However, we cannot guarantee proper usage from
    /// `hyper_executor_poll()`, which in C could potentially be called inside
    /// one of the stored futures. The mutex isn't re-entrant, so doing so
    /// would result in a deadlock, but that's better than data corruption.
    driver: Mutex<FuturesUnordered<TaskFuture>>,

    /// The queue of futures that need to be pushed into the `driver`.
    ///
    /// This has a separate mutex since `spawn` could be called from inside
    /// a future, which would mean the driver's mutex is already locked.
    spawn_queue: Mutex<Vec<TaskFuture>>,

    /// This is used to track when a future calls `wake` while we are within
    /// `hyper_executor::poll_next`.
    is_woken: Arc<ExecWaker>,
}
51 
/// A cloneable handle holding a `Weak` reference to a `hyper_executor`, so
/// holding it does not keep the executor alive.
#[derive(Clone)]
pub(crate) struct WeakExec(Weak<hyper_executor>);
54 
/// The executor's waker state: a flag that is set to `true` whenever the
/// waker is woken, and swapped back to `false` by `hyper_executor::poll_next`.
struct ExecWaker(AtomicBool);
56 
/// An async task.
pub struct hyper_task {
    /// The type-erased future this task drives to completion.
    future: BoxFuture<BoxAny>,
    /// The value produced by `future`; `None` until the future has completed,
    /// and `None` again once the value has been taken with `hyper_task_value`.
    output: Option<BoxAny>,
    /// Opaque user pointer, set with `hyper_task_set_userdata` (null by default).
    userdata: UserDataPointer,
}
63 
/// Adapter that lets a boxed `hyper_task` be polled as a `Future` which
/// resolves to the task itself once the task's inner future has completed.
struct TaskFuture {
    /// The wrapped task; taken (left as `None`) when the future resolves.
    task: Option<Box<hyper_task>>,
}
67 
/// An async context for a task that contains the related waker.
///
/// This is a single-field wrapper around `std::task::Context`; references to
/// it are produced from `&mut Context` by `hyper_context::wrap`.
pub struct hyper_context<'a>(Context<'a>);
70 
/// A waker that is saved and used to waken a pending task.
pub struct hyper_waker {
    /// The wrapped standard-library waker, cloned out of a task context.
    waker: std::task::Waker,
}
75 
/// A descriptor for what type a `hyper_task` value is.
///
/// Returned by `hyper_task_type` so that the `void *` obtained from
/// `hyper_task_value` can be interpreted.
#[repr(C)]
pub enum hyper_task_return_type {
    /// The value of this task is null (does not imply an error).
    ///
    /// This is also what is reported for a task that has no output stored.
    HYPER_TASK_EMPTY,
    /// The value of this task is `hyper_error *`.
    HYPER_TASK_ERROR,
    /// The value of this task is `hyper_clientconn *`.
    HYPER_TASK_CLIENTCONN,
    /// The value of this task is `hyper_response *`.
    HYPER_TASK_RESPONSE,
    /// The value of this task is `hyper_buf *`.
    HYPER_TASK_BUF,
}
90 
/// Implemented by every type that can be stored as a task's output.
///
/// NOTE(review): the trait is `unsafe`, presumably because
/// `hyper_task_value` hands the boxed value out as a `void *` and the caller
/// interprets it according to the reported type, so a wrong answer here would
/// lead to a bad cast on the C side — confirm the intended contract.
pub(crate) unsafe trait AsTaskType {
    /// Returns the `hyper_task_return_type` that describes `self`.
    fn as_task_type(&self) -> hyper_task_return_type;
}
94 
/// Conversion of a future's output into the type-erased `BoxAny` that a
/// `hyper_task` stores.
pub(crate) trait IntoDynTaskType {
    /// Consumes `self` and returns it as a boxed, type-erased task output.
    fn into_dyn_task_type(self) -> BoxAny;
}
98 
99 // ===== impl hyper_executor =====
100 
101 impl hyper_executor {
new() -> Arc<hyper_executor>102     fn new() -> Arc<hyper_executor> {
103         Arc::new(hyper_executor {
104             driver: Mutex::new(FuturesUnordered::new()),
105             spawn_queue: Mutex::new(Vec::new()),
106             is_woken: Arc::new(ExecWaker(AtomicBool::new(false))),
107         })
108     }
109 
downgrade(exec: &Arc<hyper_executor>) -> WeakExec110     pub(crate) fn downgrade(exec: &Arc<hyper_executor>) -> WeakExec {
111         WeakExec(Arc::downgrade(exec))
112     }
113 
spawn(&self, task: Box<hyper_task>)114     fn spawn(&self, task: Box<hyper_task>) {
115         self.spawn_queue
116             .lock()
117             .unwrap()
118             .push(TaskFuture { task: Some(task) });
119     }
120 
poll_next(&self) -> Option<Box<hyper_task>>121     fn poll_next(&self) -> Option<Box<hyper_task>> {
122         // Drain the queue first.
123         self.drain_queue();
124 
125         let waker = futures_util::task::waker_ref(&self.is_woken);
126         let mut cx = Context::from_waker(&waker);
127 
128         loop {
129             match Pin::new(&mut *self.driver.lock().unwrap()).poll_next(&mut cx) {
130                 Poll::Ready(val) => return val,
131                 Poll::Pending => {
132                     // Check if any of the pending tasks tried to spawn
133                     // some new tasks. If so, drain into the driver and loop.
134                     if self.drain_queue() {
135                         continue;
136                     }
137 
138                     // If the driver called `wake` while we were polling,
139                     // we should poll again immediately!
140                     if self.is_woken.0.swap(false, Ordering::SeqCst) {
141                         continue;
142                     }
143 
144                     return None;
145                 }
146             }
147         }
148     }
149 
drain_queue(&self) -> bool150     fn drain_queue(&self) -> bool {
151         let mut queue = self.spawn_queue.lock().unwrap();
152         if queue.is_empty() {
153             return false;
154         }
155 
156         let driver = self.driver.lock().unwrap();
157 
158         for task in queue.drain(..) {
159             driver.push(task);
160         }
161 
162         true
163     }
164 }
165 
166 impl futures_util::task::ArcWake for ExecWaker {
wake_by_ref(me: &Arc<ExecWaker>)167     fn wake_by_ref(me: &Arc<ExecWaker>) {
168         me.0.store(true, Ordering::SeqCst);
169     }
170 }
171 
172 // ===== impl WeakExec =====
173 
174 impl WeakExec {
new() -> Self175     pub(crate) fn new() -> Self {
176         WeakExec(Weak::new())
177     }
178 }
179 
180 impl crate::rt::Executor<BoxFuture<()>> for WeakExec {
execute(&self, fut: BoxFuture<()>)181     fn execute(&self, fut: BoxFuture<()>) {
182         if let Some(exec) = self.0.upgrade() {
183             exec.spawn(hyper_task::boxed(fut));
184         }
185     }
186 }
187 
188 ffi_fn! {
189     /// Creates a new task executor.
190     fn hyper_executor_new() -> *const hyper_executor {
191         Arc::into_raw(hyper_executor::new())
192     } ?= ptr::null()
193 }
194 
195 ffi_fn! {
196     /// Frees an executor and any incomplete tasks still part of it.
197     fn hyper_executor_free(exec: *const hyper_executor) {
198         drop(non_null!(Arc::from_raw(exec) ?= ()));
199     }
200 }
201 
202 ffi_fn! {
203     /// Push a task onto the executor.
204     ///
205     /// The executor takes ownership of the task, it should not be accessed
206     /// again unless returned back to the user with `hyper_executor_poll`.
207     fn hyper_executor_push(exec: *const hyper_executor, task: *mut hyper_task) -> hyper_code {
208         let exec = non_null!(&*exec ?= hyper_code::HYPERE_INVALID_ARG);
209         let task = non_null!(Box::from_raw(task) ?= hyper_code::HYPERE_INVALID_ARG);
210         exec.spawn(task);
211         hyper_code::HYPERE_OK
212     }
213 }
214 
215 ffi_fn! {
216     /// Polls the executor, trying to make progress on any tasks that have notified
217     /// that they are ready again.
218     ///
219     /// If ready, returns a task from the executor that has completed.
220     ///
221     /// If there are no ready tasks, this returns `NULL`.
222     fn hyper_executor_poll(exec: *const hyper_executor) -> *mut hyper_task {
223         let exec = non_null!(&*exec ?= ptr::null_mut());
224         match exec.poll_next() {
225             Some(task) => Box::into_raw(task),
226             None => ptr::null_mut(),
227         }
228     } ?= ptr::null_mut()
229 }
230 
231 // ===== impl hyper_task =====
232 
233 impl hyper_task {
boxed<F>(fut: F) -> Box<hyper_task> where F: Future + Send + 'static, F::Output: IntoDynTaskType + Send + Sync + 'static,234     pub(crate) fn boxed<F>(fut: F) -> Box<hyper_task>
235     where
236         F: Future + Send + 'static,
237         F::Output: IntoDynTaskType + Send + Sync + 'static,
238     {
239         Box::new(hyper_task {
240             future: Box::pin(async move { fut.await.into_dyn_task_type() }),
241             output: None,
242             userdata: UserDataPointer(ptr::null_mut()),
243         })
244     }
245 
output_type(&self) -> hyper_task_return_type246     fn output_type(&self) -> hyper_task_return_type {
247         match self.output {
248             None => hyper_task_return_type::HYPER_TASK_EMPTY,
249             Some(ref val) => val.as_task_type(),
250         }
251     }
252 }
253 
254 impl Future for TaskFuture {
255     type Output = Box<hyper_task>;
256 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>257     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
258         match Pin::new(&mut self.task.as_mut().unwrap().future).poll(cx) {
259             Poll::Ready(val) => {
260                 let mut task = self.task.take().unwrap();
261                 task.output = Some(val);
262                 Poll::Ready(task)
263             }
264             Poll::Pending => Poll::Pending,
265         }
266     }
267 }
268 
269 ffi_fn! {
270     /// Free a task.
271     fn hyper_task_free(task: *mut hyper_task) {
272         drop(non_null!(Box::from_raw(task) ?= ()));
273     }
274 }
275 
276 ffi_fn! {
277     /// Takes the output value of this task.
278     ///
279     /// This must only be called once polling the task on an executor has finished
280     /// this task.
281     ///
282     /// Use `hyper_task_type` to determine the type of the `void *` return value.
283     fn hyper_task_value(task: *mut hyper_task) -> *mut c_void {
284         let task = non_null!(&mut *task ?= ptr::null_mut());
285 
286         if let Some(val) = task.output.take() {
287             let p = Box::into_raw(val) as *mut c_void;
288             // protect from returning fake pointers to empty types
289             if p == std::ptr::NonNull::<c_void>::dangling().as_ptr() {
290                 ptr::null_mut()
291             } else {
292                 p
293             }
294         } else {
295             ptr::null_mut()
296         }
297     } ?= ptr::null_mut()
298 }
299 
300 ffi_fn! {
301     /// Query the return type of this task.
302     fn hyper_task_type(task: *mut hyper_task) -> hyper_task_return_type {
303         // instead of blowing up spectacularly, just say this null task
304         // doesn't have a value to retrieve.
305         non_null!(&*task ?= hyper_task_return_type::HYPER_TASK_EMPTY).output_type()
306     }
307 }
308 
309 ffi_fn! {
310     /// Set a user data pointer to be associated with this task.
311     ///
312     /// This value will be passed to task callbacks, and can be checked later
313     /// with `hyper_task_userdata`.
314     fn hyper_task_set_userdata(task: *mut hyper_task, userdata: *mut c_void) {
315         if task.is_null() {
316             return;
317         }
318 
319         unsafe { (*task).userdata = UserDataPointer(userdata) };
320     }
321 }
322 
323 ffi_fn! {
324     /// Retrieve the userdata that has been set via `hyper_task_set_userdata`.
325     fn hyper_task_userdata(task: *mut hyper_task) -> *mut c_void {
326         non_null!(&*task ?= ptr::null_mut()).userdata.0
327     } ?= ptr::null_mut()
328 }
329 
330 // ===== impl AsTaskType =====
331 
// The unit type carries no value, so it is reported as an empty task result.
unsafe impl AsTaskType for () {
    /// Always `HYPER_TASK_EMPTY`.
    fn as_task_type(&self) -> hyper_task_return_type {
        hyper_task_return_type::HYPER_TASK_EMPTY
    }
}
337 
// An error output is reported as `HYPER_TASK_ERROR`.
// NOTE(review): `HYPER_TASK_ERROR` is documented as `hyper_error *`;
// presumably `hyper_error` is this same `crate::Error` type — confirm.
unsafe impl AsTaskType for crate::Error {
    /// Always `HYPER_TASK_ERROR`.
    fn as_task_type(&self) -> hyper_task_return_type {
        hyper_task_return_type::HYPER_TASK_ERROR
    }
}
343 
344 impl<T> IntoDynTaskType for T
345 where
346     T: AsTaskType + Send + Sync + 'static,
347 {
into_dyn_task_type(self) -> BoxAny348     fn into_dyn_task_type(self) -> BoxAny {
349         Box::new(self)
350     }
351 }
352 
353 impl<T> IntoDynTaskType for crate::Result<T>
354 where
355     T: IntoDynTaskType + Send + Sync + 'static,
356 {
into_dyn_task_type(self) -> BoxAny357     fn into_dyn_task_type(self) -> BoxAny {
358         match self {
359             Ok(val) => val.into_dyn_task_type(),
360             Err(err) => Box::new(err),
361         }
362     }
363 }
364 
365 impl<T> IntoDynTaskType for Option<T>
366 where
367     T: IntoDynTaskType + Send + Sync + 'static,
368 {
into_dyn_task_type(self) -> BoxAny369     fn into_dyn_task_type(self) -> BoxAny {
370         match self {
371             Some(val) => val.into_dyn_task_type(),
372             None => ().into_dyn_task_type(),
373         }
374     }
375 }
376 
377 // ===== impl hyper_context =====
378 
impl hyper_context<'_> {
    /// Reinterprets a mutable reference to a `std::task::Context` as a
    /// mutable reference to the `hyper_context` wrapper, without copying.
    pub(crate) fn wrap<'a, 'b>(cx: &'a mut Context<'b>) -> &'a mut hyper_context<'b> {
        // A struct with only one field has the same layout as that field.
        // NOTE(review): this relies on `hyper_context` being laid out exactly
        // like its single `Context` field; the struct definition in this file
        // carries no `#[repr(transparent)]` to guarantee that — confirm.
        unsafe { std::mem::transmute::<&mut Context<'_>, &mut hyper_context<'_>>(cx) }
    }
}
385 
386 ffi_fn! {
387     /// Copies a waker out of the task context.
388     fn hyper_context_waker(cx: *mut hyper_context<'_>) -> *mut hyper_waker {
389         let waker = non_null!(&mut *cx ?= ptr::null_mut()).0.waker().clone();
390         Box::into_raw(Box::new(hyper_waker { waker }))
391     } ?= ptr::null_mut()
392 }
393 
394 // ===== impl hyper_waker =====
395 
396 ffi_fn! {
397     /// Free a waker that hasn't been woken.
398     fn hyper_waker_free(waker: *mut hyper_waker) {
399         drop(non_null!(Box::from_raw(waker) ?= ()));
400     }
401 }
402 
403 ffi_fn! {
404     /// Wake up the task associated with a waker.
405     ///
406     /// NOTE: This consumes the waker. You should not use or free the waker afterwards.
407     fn hyper_waker_wake(waker: *mut hyper_waker) {
408         let waker = non_null!(Box::from_raw(waker) ?= ());
409         waker.waker.wake();
410     }
411 }
412