1 // 2 // Copyright 2021 Signal Messenger, LLC. 3 // SPDX-License-Identifier: AGPL-3.0-only 4 // 5 6 use neon::prelude::*; 7 use std::future::Future; 8 use std::pin::Pin; 9 use std::sync::{Arc, Mutex}; 10 use std::task::{Poll, Wake}; 11 12 /// Adds support for executing futures on a Neon [Channel][]. 13 /// 14 /// [Channel]: https://docs.rs/neon/0.9.0/neon/event/struct.Channel.html 15 pub trait ChannelEx { 16 /// Schedules the future to run on the JavaScript main thread until complete. send_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send)17 fn send_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send); 18 /// Polls the future synchronously, then schedules it to run on the JavaScript main thread from 19 /// then on. start_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send)20 fn start_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send); 21 } 22 23 impl ChannelEx for Channel { send_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send)24 fn send_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send) { 25 let self_for_task = self.clone(); 26 self.send(move |_| { 27 let task = Arc::new(FutureTask { 28 channel: self_for_task, 29 future: Mutex::new(Some(Box::pin(future))), 30 }); 31 task.poll(); 32 Ok(()) 33 }) 34 } 35 start_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send)36 fn start_future(self: Arc<Self>, future: impl Future<Output = ()> + 'static + Send) { 37 let task = Arc::new(FutureTask { 38 channel: self, 39 future: Mutex::new(Some(Box::pin(future))), 40 }); 41 task.poll(); 42 } 43 } 44 45 /// Used to "send" a task from a thread to itself through a multi-threaded interface. 46 pub(crate) struct AssertSendSafe<T>(T); 47 unsafe impl<T> Send for AssertSendSafe<T> {} 48 impl<T> AssertSendSafe<T> { wrap(value: T) -> Self49 pub unsafe fn wrap(value: T) -> Self { 50 Self(value) 51 } 52 } 53 54 impl<T: Future> Future for AssertSendSafe<T> { 55 type Output = T::Output; poll(self: Pin<&mut Self>, context: &mut std::task::Context) -> Poll<T::Output>56 fn poll(self: Pin<&mut Self>, context: &mut std::task::Context) -> Poll<T::Output> { 57 // See https://doc.rust-lang.org/std/pin/index.html#projections-and-structural-pinning 58 let future = unsafe { self.map_unchecked_mut(|s| &mut s.0) }; 59 future.poll(context) 60 } 61 } 62 63 /// Implements waking for futures scheduled on the JavaScript microtask queue. 64 /// 65 /// When the task is awoken, it reschedules itself on the channel to re-poll the top-level Future. 66 struct FutureTask<F> 67 where 68 F: Future<Output = ()> + 'static + Send, 69 { 70 channel: Arc<Channel>, 71 future: Mutex<Option<Pin<Box<F>>>>, 72 } 73 74 impl<F> FutureTask<F> 75 where 76 F: Future<Output = ()> + 'static + Send, 77 { 78 /// Polls the top-level future, while setting `self` up as the waker once more. 79 /// 80 /// When the future completes, it is replaced by `None` to avoid accidentally polling twice. poll(self: &Arc<Self>)81 fn poll(self: &Arc<Self>) { 82 let future = &mut *self.future.lock().expect("Lock can be taken"); 83 if let Some(active_future) = future { 84 match active_future 85 .as_mut() 86 .poll(&mut std::task::Context::from_waker(&self.clone().into())) 87 { 88 Poll::Ready(_) => *future = None, 89 Poll::Pending => {} 90 } 91 } 92 } 93 } 94 95 impl<F> Wake for FutureTask<F> 96 where 97 F: Future<Output = ()> + 'static + Send, 98 { wake(self: Arc<Self>)99 fn wake(self: Arc<Self>) { 100 let channel = self.channel.clone(); 101 channel.send(move |_cx| { 102 self.poll(); 103 Ok(()) 104 }) 105 } 106 } 107