1 //
2 // Copyright 2021 Signal Messenger, LLC.
3 // SPDX-License-Identifier: AGPL-3.0-only
4 //
5 
6 use neon::prelude::*;
7 use std::future::Future;
8 use std::pin::Pin;
9 use std::sync::{Arc, Mutex};
10 use std::task::{Poll, Wake};
11 
12 /// Adds support for executing futures on a Neon [Channel][].
13 ///
14 /// [Channel]: https://docs.rs/neon/0.9.0/neon/event/struct.Channel.html
15 pub trait ChannelEx {
16     /// Schedules the future to run on the JavaScript main thread until complete.
send_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send)17     fn send_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send);
18     /// Polls the future synchronously, then schedules it to run on the JavaScript main thread from
19     /// then on.
start_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send)20     fn start_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send);
21 }
22 
23 impl ChannelEx for Channel {
send_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send)24     fn send_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send) {
25         let self_for_task = self.clone();
26         self.send(move |_| {
27             let task = Arc::new(FutureTask {
28                 channel: self_for_task,
29                 future: Mutex::new(Some(Box::pin(future))),
30             });
31             task.poll();
32             Ok(())
33         })
34     }
35 
start_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send)36     fn start_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send) {
37         let task = Arc::new(FutureTask {
38             channel: self,
39             future: Mutex::new(Some(Box::pin(future))),
40         });
41         task.poll();
42     }
43 }
44 
45 /// Used to "send" a task from a thread to itself through a multi-threaded interface.
46 pub(crate) struct AssertSendSafe<T>(T);
47 unsafe impl<T> Send for AssertSendSafe<T> {}
48 impl<T> AssertSendSafe<T> {
wrap(value: T) -> Self49     pub unsafe fn wrap(value: T) -> Self {
50         Self(value)
51     }
52 }
53 
54 impl<T: Future> Future for AssertSendSafe<T> {
55     type Output = T::Output;
poll(self: Pin<&mut Self>, context: &mut std::task::Context) -> Poll<T::Output>56     fn poll(self: Pin<&mut Self>, context: &mut std::task::Context) -> Poll<T::Output> {
57         // See https://doc.rust-lang.org/std/pin/index.html#projections-and-structural-pinning
58         let future = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
59         future.poll(context)
60     }
61 }
62 
63 /// Implements waking for futures scheduled on the JavaScript microtask queue.
64 ///
65 /// When the task is awoken, it reschedules itself on the channel to re-poll the top-level Future.
66 struct FutureTask<F>
67 where
68     F: Future<Output = ()> + 'static + Send,
69 {
70     channel: Arc<Channel>,
71     future: Mutex<Option<Pin<Box<F>>>>,
72 }
73 
74 impl<F> FutureTask<F>
75 where
76     F: Future<Output = ()> + 'static + Send,
77 {
78     /// Polls the top-level future, while setting `self` up as the waker once more.
79     ///
80     /// When the future completes, it is replaced by `None` to avoid accidentally polling twice.
poll(self: &Arc<Self>)81     fn poll(self: &Arc<Self>) {
82         let future = &mut *self.future.lock().expect("Lock can be taken");
83         if let Some(active_future) = future {
84             match active_future
85                 .as_mut()
86                 .poll(&mut std::task::Context::from_waker(&self.clone().into()))
87             {
88                 Poll::Ready(_) => *future = None,
89                 Poll::Pending => {}
90             }
91         }
92     }
93 }
94 
95 impl<F> Wake for FutureTask<F>
96 where
97     F: Future<Output = ()> + 'static + Send,
98 {
wake(self: Arc<Self>)99     fn wake(self: Arc<Self>) {
100         let channel = self.channel.clone();
101         channel.send(move |_cx| {
102             self.poll();
103             Ok(())
104         })
105     }
106 }
107