1 use helix_view::Editor;
2 
3 use crate::compositor::Compositor;
4 
5 use futures_util::future::{self, BoxFuture, Future, FutureExt};
6 use futures_util::stream::{FuturesUnordered, StreamExt};
7 
8 pub type Callback = Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>;
9 pub type JobFuture = BoxFuture<'static, anyhow::Result<Option<Callback>>>;
10 
11 pub struct Job {
12     pub future: BoxFuture<'static, anyhow::Result<Option<Callback>>>,
13     /// Do we need to wait for this job to finish before exiting?
14     pub wait: bool,
15 }
16 
17 #[derive(Default)]
18 pub struct Jobs {
19     pub futures: FuturesUnordered<JobFuture>,
20     /// These are the ones that need to complete before we exit.
21     pub wait_futures: FuturesUnordered<JobFuture>,
22 }
23 
24 impl Job {
new<F: Future<Output = anyhow::Result<()>> + Send + 'static>(f: F) -> Job25     pub fn new<F: Future<Output = anyhow::Result<()>> + Send + 'static>(f: F) -> Job {
26         Job {
27             future: f.map(|r| r.map(|()| None)).boxed(),
28             wait: false,
29         }
30     }
31 
with_callback<F: Future<Output = anyhow::Result<Callback>> + Send + 'static>( f: F, ) -> Job32     pub fn with_callback<F: Future<Output = anyhow::Result<Callback>> + Send + 'static>(
33         f: F,
34     ) -> Job {
35         Job {
36             future: f.map(|r| r.map(Some)).boxed(),
37             wait: false,
38         }
39     }
40 
wait_before_exiting(mut self) -> Job41     pub fn wait_before_exiting(mut self) -> Job {
42         self.wait = true;
43         self
44     }
45 }
46 
47 impl Jobs {
new() -> Jobs48     pub fn new() -> Jobs {
49         Jobs::default()
50     }
51 
spawn<F: Future<Output = anyhow::Result<()>> + Send + 'static>(&mut self, f: F)52     pub fn spawn<F: Future<Output = anyhow::Result<()>> + Send + 'static>(&mut self, f: F) {
53         self.add(Job::new(f));
54     }
55 
callback<F: Future<Output = anyhow::Result<Callback>> + Send + 'static>( &mut self, f: F, )56     pub fn callback<F: Future<Output = anyhow::Result<Callback>> + Send + 'static>(
57         &mut self,
58         f: F,
59     ) {
60         self.add(Job::with_callback(f));
61     }
62 
handle_callback( &self, editor: &mut Editor, compositor: &mut Compositor, call: anyhow::Result<Option<Callback>>, )63     pub fn handle_callback(
64         &self,
65         editor: &mut Editor,
66         compositor: &mut Compositor,
67         call: anyhow::Result<Option<Callback>>,
68     ) {
69         match call {
70             Ok(None) => {}
71             Ok(Some(call)) => {
72                 call(editor, compositor);
73             }
74             Err(e) => {
75                 editor.set_error(format!("Async job failed: {}", e));
76             }
77         }
78     }
79 
next_job(&mut self) -> Option<anyhow::Result<Option<Callback>>>80     pub async fn next_job(&mut self) -> Option<anyhow::Result<Option<Callback>>> {
81         tokio::select! {
82             event = self.futures.next() => {  event }
83             event = self.wait_futures.next() => { event }
84         }
85     }
86 
add(&self, j: Job)87     pub fn add(&self, j: Job) {
88         if j.wait {
89             self.wait_futures.push(j.future);
90         } else {
91             self.futures.push(j.future);
92         }
93     }
94 
95     /// Blocks until all the jobs that need to be waited on are done.
finish(&mut self)96     pub fn finish(&mut self) {
97         let wait_futures = std::mem::take(&mut self.wait_futures);
98         helix_lsp::block_on(wait_futures.for_each(|_| future::ready(())));
99     }
100 }
101