1 // Take a look at the license at the top of the repository in the LICENSE file.
2 
3 use futures_channel::{mpsc, oneshot};
4 use futures_core::future::Future;
5 use futures_core::stream::Stream;
6 use futures_core::task;
7 use futures_core::task::Poll;
8 use std::marker::Unpin;
9 use std::pin;
10 use std::pin::Pin;
11 use std::time::Duration;
12 
13 use crate::Continue;
14 use crate::MainContext;
15 use crate::Priority;
16 use crate::Source;
17 
/// Represents a `Future` around a `glib::Source`. The future will
/// be resolved once the source has provided a value
pub struct SourceFuture<F, T> {
    // Closure that builds the source. It is taken out (leaving `None`) the first
    // time the future is polled.
    create_source: Option<F>,
    // The attached source paired with the receiving half of the oneshot channel that
    // delivers its value. `None` before the first poll and again once the value arrived.
    source: Option<(Source, oneshot::Receiver<T>)>,
}
24 
25 impl<F, T: 'static> SourceFuture<F, T>
26 where
27     F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
28 {
29     /// Create a new `SourceFuture`
30     ///
31     /// The provided closure should return a newly created `glib::Source` when called
32     /// and pass the value provided by the source to the oneshot sender that is passed
33     /// to the closure.
new(create_source: F) -> SourceFuture<F, T>34     pub fn new(create_source: F) -> SourceFuture<F, T> {
35         SourceFuture {
36             create_source: Some(create_source),
37             source: None,
38         }
39     }
40 }
41 
// Neither field is structurally pinned: `poll` moves `create_source` out of its
// `Option` and re-pins the receiver with `Pin::new`, so a `SourceFuture` may be
// moved after having been pinned regardless of `F` and `T`.
impl<F, T> Unpin for SourceFuture<F, T> {}
43 
44 impl<F, T> Future for SourceFuture<F, T>
45 where
46     F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
47 {
48     type Output = T;
49 
poll(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<T>50     fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<T> {
51         let SourceFuture {
52             ref mut create_source,
53             ref mut source,
54             ..
55         } = *self;
56 
57         if let Some(create_source) = create_source.take() {
58             let main_context = MainContext::ref_thread_default();
59             assert!(
60                 main_context.is_owner(),
61                 "Spawning futures only allowed if the thread is owning the MainContext"
62             );
63 
64             // Channel for sending back the Source result to our future here.
65             //
66             // In theory, we could directly continue polling the
67             // corresponding task from the Source callback,
68             // however this would break at the very least
69             // the g_main_current_source() API.
70             let (send, recv) = oneshot::channel();
71 
72             let s = create_source(send);
73 
74             s.attach(Some(&main_context));
75             *source = Some((s, recv));
76         }
77 
78         // At this point we must have a receiver
79         let res = {
80             let &mut (_, ref mut receiver) = source.as_mut().unwrap();
81             Pin::new(receiver).poll(ctx)
82         };
83         #[allow(clippy::match_wild_err_arm)]
84         match res {
85             Poll::Ready(Err(_)) => panic!("Source sender was unexpectedly closed"),
86             Poll::Ready(Ok(v)) => {
87                 // Get rid of the reference to the source, it triggered
88                 let _ = source.take();
89                 Poll::Ready(v)
90             }
91             Poll::Pending => Poll::Pending,
92         }
93     }
94 }
95 
96 impl<T, F> Drop for SourceFuture<T, F> {
drop(&mut self)97     fn drop(&mut self) {
98         // Get rid of the source, we don't care anymore if it still triggers
99         if let Some((source, _)) = self.source.take() {
100             source.destroy();
101         }
102     }
103 }
104 
105 /// Create a `Future` that will resolve after the given number of milliseconds.
106 ///
107 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
timeout_future(value: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>108 pub fn timeout_future(value: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
109     timeout_future_with_priority(crate::PRIORITY_DEFAULT, value)
110 }
111 
112 /// Create a `Future` that will resolve after the given number of milliseconds.
113 ///
114 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
timeout_future_with_priority( priority: Priority, value: Duration, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>115 pub fn timeout_future_with_priority(
116     priority: Priority,
117     value: Duration,
118 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
119     Box::pin(SourceFuture::new(move |send| {
120         let mut send = Some(send);
121         crate::timeout_source_new(value, None, priority, move || {
122             let _ = send.take().unwrap().send(());
123             Continue(false)
124         })
125     }))
126 }
127 
128 /// Create a `Future` that will resolve after the given number of seconds.
129 ///
130 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
timeout_future_seconds(value: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>131 pub fn timeout_future_seconds(value: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
132     timeout_future_seconds_with_priority(crate::PRIORITY_DEFAULT, value)
133 }
134 
135 /// Create a `Future` that will resolve after the given number of seconds.
136 ///
137 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
timeout_future_seconds_with_priority( priority: Priority, value: u32, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>138 pub fn timeout_future_seconds_with_priority(
139     priority: Priority,
140     value: u32,
141 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
142     Box::pin(SourceFuture::new(move |send| {
143         let mut send = Some(send);
144         crate::timeout_source_new_seconds(value, None, priority, move || {
145             let _ = send.take().unwrap().send(());
146             Continue(false)
147         })
148     }))
149 }
150 
151 /// Create a `Future` that will resolve once the child process with the given pid exits
152 ///
153 /// The `Future` will resolve to the pid of the child process and the exit code.
154 ///
155 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
child_watch_future( pid: crate::Pid, ) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>>156 pub fn child_watch_future(
157     pid: crate::Pid,
158 ) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
159     child_watch_future_with_priority(crate::PRIORITY_DEFAULT, pid)
160 }
161 
162 /// Create a `Future` that will resolve once the child process with the given pid exits
163 ///
164 /// The `Future` will resolve to the pid of the child process and the exit code.
165 ///
166 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
child_watch_future_with_priority( priority: Priority, pid: crate::Pid, ) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>>167 pub fn child_watch_future_with_priority(
168     priority: Priority,
169     pid: crate::Pid,
170 ) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
171     Box::pin(SourceFuture::new(move |send| {
172         let mut send = Some(send);
173         crate::child_watch_source_new(pid, None, priority, move |pid, code| {
174             let _ = send.take().unwrap().send((pid, code));
175         })
176     }))
177 }
178 
179 #[cfg(any(unix, feature = "dox"))]
180 #[cfg_attr(feature = "dox", doc(cfg(unix)))]
181 /// Create a `Future` that will resolve once the given UNIX signal is raised
182 ///
183 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
unix_signal_future(signum: i32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>184 pub fn unix_signal_future(signum: i32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
185     unix_signal_future_with_priority(crate::PRIORITY_DEFAULT, signum)
186 }
187 
188 #[cfg(any(unix, feature = "dox"))]
189 #[cfg_attr(feature = "dox", doc(cfg(unix)))]
190 /// Create a `Future` that will resolve once the given UNIX signal is raised
191 ///
192 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
unix_signal_future_with_priority( priority: Priority, signum: i32, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>193 pub fn unix_signal_future_with_priority(
194     priority: Priority,
195     signum: i32,
196 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
197     Box::pin(SourceFuture::new(move |send| {
198         let mut send = Some(send);
199         crate::unix_signal_source_new(signum, None, priority, move || {
200             let _ = send.take().unwrap().send(());
201             Continue(false)
202         })
203     }))
204 }
205 
/// Represents a `Stream` around a `glib::Source`. The stream will
/// provide all values that are provided by the source
pub struct SourceStream<F, T> {
    // Closure that builds the source. It is taken out (leaving `None`) the first
    // time the stream is polled.
    create_source: Option<F>,
    // The attached source paired with the receiving half of the unbounded channel that
    // delivers its values. `None` before the first poll and again once the channel ended.
    source: Option<(Source, mpsc::UnboundedReceiver<T>)>,
}
212 
// Neither field is structurally pinned: `poll_next` moves `create_source` out of its
// `Option` and re-pins the receiver with `Pin::new`, so a `SourceStream` may be
// moved after having been pinned regardless of `F` and `T`.
impl<F, T> Unpin for SourceStream<F, T> {}
214 
215 impl<F, T: 'static> SourceStream<F, T>
216 where
217     F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
218 {
219     /// Create a new `SourceStream`
220     ///
221     /// The provided closure should return a newly created `glib::Source` when called
222     /// and pass the values provided by the source to the sender that is passed
223     /// to the closure.
new(create_source: F) -> SourceStream<F, T>224     pub fn new(create_source: F) -> SourceStream<F, T> {
225         SourceStream {
226             create_source: Some(create_source),
227             source: None,
228         }
229     }
230 }
231 
232 impl<F, T> Stream for SourceStream<F, T>
233 where
234     F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
235 {
236     type Item = T;
237 
poll_next(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Option<T>>238     fn poll_next(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Option<T>> {
239         let SourceStream {
240             ref mut create_source,
241             ref mut source,
242             ..
243         } = *self;
244 
245         if let Some(create_source) = create_source.take() {
246             let main_context = MainContext::ref_thread_default();
247             assert!(
248                 main_context.is_owner(),
249                 "Spawning futures only allowed if the thread is owning the MainContext"
250             );
251 
252             // Channel for sending back the Source result to our future here.
253             //
254             // In theory we could directly continue polling the
255             // corresponding task from the Source callback,
256             // however this would break at the very least
257             // the g_main_current_source() API.
258             let (send, recv) = mpsc::unbounded();
259 
260             let s = create_source(send);
261 
262             s.attach(Some(&main_context));
263             *source = Some((s, recv));
264         }
265 
266         // At this point we must have a receiver
267         let res = {
268             let &mut (_, ref mut receiver) = source.as_mut().unwrap();
269             Pin::new(receiver).poll_next(ctx)
270         };
271         #[allow(clippy::match_wild_err_arm)]
272         match res {
273             Poll::Ready(v) => {
274                 if v.is_none() {
275                     // Get rid of the reference to the source, it triggered
276                     let _ = source.take();
277                 }
278                 Poll::Ready(v)
279             }
280             Poll::Pending => Poll::Pending,
281         }
282     }
283 }
284 
285 impl<T, F> Drop for SourceStream<T, F> {
drop(&mut self)286     fn drop(&mut self) {
287         // Get rid of the source, we don't care anymore if it still triggers
288         if let Some((source, _)) = self.source.take() {
289             source.destroy();
290         }
291     }
292 }
293 
294 /// Create a `Stream` that will provide a value every given number of milliseconds.
295 ///
296 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
interval_stream(value: Duration) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>>297 pub fn interval_stream(value: Duration) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
298     interval_stream_with_priority(crate::PRIORITY_DEFAULT, value)
299 }
300 
301 /// Create a `Stream` that will provide a value every given number of milliseconds.
302 ///
303 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
interval_stream_with_priority( priority: Priority, value: Duration, ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>>304 pub fn interval_stream_with_priority(
305     priority: Priority,
306     value: Duration,
307 ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
308     Box::pin(SourceStream::new(move |send| {
309         crate::timeout_source_new(value, None, priority, move || {
310             if send.unbounded_send(()).is_err() {
311                 Continue(false)
312             } else {
313                 Continue(true)
314             }
315         })
316     }))
317 }
318 
319 /// Create a `Stream` that will provide a value every given number of seconds.
320 ///
321 /// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
interval_stream_seconds(value: u32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>>322 pub fn interval_stream_seconds(value: u32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
323     interval_stream_seconds_with_priority(crate::PRIORITY_DEFAULT, value)
324 }
325 
326 /// Create a `Stream` that will provide a value every given number of seconds.
327 ///
328 /// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
interval_stream_seconds_with_priority( priority: Priority, value: u32, ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>>329 pub fn interval_stream_seconds_with_priority(
330     priority: Priority,
331     value: u32,
332 ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
333     Box::pin(SourceStream::new(move |send| {
334         crate::timeout_source_new_seconds(value, None, priority, move || {
335             if send.unbounded_send(()).is_err() {
336                 Continue(false)
337             } else {
338                 Continue(true)
339             }
340         })
341     }))
342 }
343 
344 #[cfg(any(unix, feature = "dox"))]
345 #[cfg_attr(feature = "dox", doc(cfg(unix)))]
346 /// Create a `Stream` that will provide a value whenever the given UNIX signal is raised
347 ///
348 /// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
unix_signal_stream(signum: i32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>>349 pub fn unix_signal_stream(signum: i32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
350     unix_signal_stream_with_priority(crate::PRIORITY_DEFAULT, signum)
351 }
352 
353 #[cfg(any(unix, feature = "dox"))]
354 #[cfg_attr(feature = "dox", doc(cfg(unix)))]
355 /// Create a `Stream` that will provide a value whenever the given UNIX signal is raised
356 ///
357 /// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
unix_signal_stream_with_priority( priority: Priority, signum: i32, ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>>358 pub fn unix_signal_stream_with_priority(
359     priority: Priority,
360     signum: i32,
361 ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
362     Box::pin(SourceStream::new(move |send| {
363         crate::unix_signal_source_new(signum, None, priority, move || {
364             if send.unbounded_send(()).is_err() {
365                 Continue(false)
366             } else {
367                 Continue(true)
368             }
369         })
370     }))
371 }
372 
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::future::{self, FutureExt};
    use futures_util::stream::StreamExt;
    use std::thread;
    use std::time::Duration;

    // A timeout future completes when driven directly with `block_on`.
    #[test]
    fn test_timeout() {
        let context = MainContext::new();
        let delay = Duration::from_millis(20);

        context.block_on(timeout_future(delay));
    }

    // A timeout future can be spawned on a context and quits the main loop once
    // it has fired.
    #[test]
    fn test_timeout_send() {
        let context = MainContext::new();
        let main_loop = crate::MainLoop::new(Some(&context), false);

        let quit_handle = main_loop.clone();
        let task = timeout_future(Duration::from_millis(20)).map(move |()| quit_handle.quit());
        context.spawn(task);

        main_loop.run();
    }

    // Taking two items from an interval stream runs the callback exactly twice.
    #[test]
    fn test_interval() {
        let context = MainContext::new();

        let mut ticks = 0;

        let counting = interval_stream(Duration::from_millis(20))
            .take(2)
            .for_each(|()| {
                ticks += 1;

                future::ready(())
            });
        context.block_on(counting);

        assert_eq!(ticks, 2);
    }

    // A timeout future can be chained with a oneshot channel that is fed from
    // another thread.
    #[test]
    fn test_timeout_and_channel() {
        let context = MainContext::default();

        let chained = timeout_future(Duration::from_millis(20)).then(|()| {
            let (sender, receiver) = oneshot::channel();

            thread::spawn(move || {
                sender.send(1).unwrap();
            });

            receiver.map(|value| value.unwrap())
        });

        assert_eq!(context.block_on(chained), 1);
    }
}
442