1 // Take a look at the license at the top of the repository in the LICENSE file.
2
3 use futures_channel::{mpsc, oneshot};
4 use futures_core::future::Future;
5 use futures_core::stream::Stream;
6 use futures_core::task;
7 use futures_core::task::Poll;
8 use std::marker::Unpin;
9 use std::pin;
10 use std::pin::Pin;
11 use std::time::Duration;
12
13 use crate::Continue;
14 use crate::MainContext;
15 use crate::Priority;
16 use crate::Source;
17
18 /// Represents a `Future` around a `glib::Source`. The future will
19 /// be resolved once the source has provided a value
20 pub struct SourceFuture<F, T> {
21 create_source: Option<F>,
22 source: Option<(Source, oneshot::Receiver<T>)>,
23 }
24
25 impl<F, T: 'static> SourceFuture<F, T>
26 where
27 F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
28 {
29 /// Create a new `SourceFuture`
30 ///
31 /// The provided closure should return a newly created `glib::Source` when called
32 /// and pass the value provided by the source to the oneshot sender that is passed
33 /// to the closure.
new(create_source: F) -> SourceFuture<F, T>34 pub fn new(create_source: F) -> SourceFuture<F, T> {
35 SourceFuture {
36 create_source: Some(create_source),
37 source: None,
38 }
39 }
40 }
41
42 impl<F, T> Unpin for SourceFuture<F, T> {}
43
44 impl<F, T> Future for SourceFuture<F, T>
45 where
46 F: FnOnce(oneshot::Sender<T>) -> Source + 'static,
47 {
48 type Output = T;
49
poll(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<T>50 fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<T> {
51 let SourceFuture {
52 ref mut create_source,
53 ref mut source,
54 ..
55 } = *self;
56
57 if let Some(create_source) = create_source.take() {
58 let main_context = MainContext::ref_thread_default();
59 assert!(
60 main_context.is_owner(),
61 "Spawning futures only allowed if the thread is owning the MainContext"
62 );
63
64 // Channel for sending back the Source result to our future here.
65 //
66 // In theory, we could directly continue polling the
67 // corresponding task from the Source callback,
68 // however this would break at the very least
69 // the g_main_current_source() API.
70 let (send, recv) = oneshot::channel();
71
72 let s = create_source(send);
73
74 s.attach(Some(&main_context));
75 *source = Some((s, recv));
76 }
77
78 // At this point we must have a receiver
79 let res = {
80 let &mut (_, ref mut receiver) = source.as_mut().unwrap();
81 Pin::new(receiver).poll(ctx)
82 };
83 #[allow(clippy::match_wild_err_arm)]
84 match res {
85 Poll::Ready(Err(_)) => panic!("Source sender was unexpectedly closed"),
86 Poll::Ready(Ok(v)) => {
87 // Get rid of the reference to the source, it triggered
88 let _ = source.take();
89 Poll::Ready(v)
90 }
91 Poll::Pending => Poll::Pending,
92 }
93 }
94 }
95
96 impl<T, F> Drop for SourceFuture<T, F> {
drop(&mut self)97 fn drop(&mut self) {
98 // Get rid of the source, we don't care anymore if it still triggers
99 if let Some((source, _)) = self.source.take() {
100 source.destroy();
101 }
102 }
103 }
104
105 /// Create a `Future` that will resolve after the given number of milliseconds.
106 ///
107 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
timeout_future(value: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>108 pub fn timeout_future(value: Duration) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
109 timeout_future_with_priority(crate::PRIORITY_DEFAULT, value)
110 }
111
112 /// Create a `Future` that will resolve after the given number of milliseconds.
113 ///
114 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
timeout_future_with_priority( priority: Priority, value: Duration, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>115 pub fn timeout_future_with_priority(
116 priority: Priority,
117 value: Duration,
118 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
119 Box::pin(SourceFuture::new(move |send| {
120 let mut send = Some(send);
121 crate::timeout_source_new(value, None, priority, move || {
122 let _ = send.take().unwrap().send(());
123 Continue(false)
124 })
125 }))
126 }
127
128 /// Create a `Future` that will resolve after the given number of seconds.
129 ///
130 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
timeout_future_seconds(value: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>131 pub fn timeout_future_seconds(value: u32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
132 timeout_future_seconds_with_priority(crate::PRIORITY_DEFAULT, value)
133 }
134
135 /// Create a `Future` that will resolve after the given number of seconds.
136 ///
137 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
timeout_future_seconds_with_priority( priority: Priority, value: u32, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>138 pub fn timeout_future_seconds_with_priority(
139 priority: Priority,
140 value: u32,
141 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
142 Box::pin(SourceFuture::new(move |send| {
143 let mut send = Some(send);
144 crate::timeout_source_new_seconds(value, None, priority, move || {
145 let _ = send.take().unwrap().send(());
146 Continue(false)
147 })
148 }))
149 }
150
151 /// Create a `Future` that will resolve once the child process with the given pid exits
152 ///
153 /// The `Future` will resolve to the pid of the child process and the exit code.
154 ///
155 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
child_watch_future( pid: crate::Pid, ) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>>156 pub fn child_watch_future(
157 pid: crate::Pid,
158 ) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
159 child_watch_future_with_priority(crate::PRIORITY_DEFAULT, pid)
160 }
161
162 /// Create a `Future` that will resolve once the child process with the given pid exits
163 ///
164 /// The `Future` will resolve to the pid of the child process and the exit code.
165 ///
166 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
child_watch_future_with_priority( priority: Priority, pid: crate::Pid, ) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>>167 pub fn child_watch_future_with_priority(
168 priority: Priority,
169 pid: crate::Pid,
170 ) -> Pin<Box<dyn Future<Output = (crate::Pid, i32)> + Send + 'static>> {
171 Box::pin(SourceFuture::new(move |send| {
172 let mut send = Some(send);
173 crate::child_watch_source_new(pid, None, priority, move |pid, code| {
174 let _ = send.take().unwrap().send((pid, code));
175 })
176 }))
177 }
178
179 #[cfg(any(unix, feature = "dox"))]
180 #[cfg_attr(feature = "dox", doc(cfg(unix)))]
181 /// Create a `Future` that will resolve once the given UNIX signal is raised
182 ///
183 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
unix_signal_future(signum: i32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>184 pub fn unix_signal_future(signum: i32) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
185 unix_signal_future_with_priority(crate::PRIORITY_DEFAULT, signum)
186 }
187
188 #[cfg(any(unix, feature = "dox"))]
189 #[cfg_attr(feature = "dox", doc(cfg(unix)))]
190 /// Create a `Future` that will resolve once the given UNIX signal is raised
191 ///
192 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
unix_signal_future_with_priority( priority: Priority, signum: i32, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>193 pub fn unix_signal_future_with_priority(
194 priority: Priority,
195 signum: i32,
196 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
197 Box::pin(SourceFuture::new(move |send| {
198 let mut send = Some(send);
199 crate::unix_signal_source_new(signum, None, priority, move || {
200 let _ = send.take().unwrap().send(());
201 Continue(false)
202 })
203 }))
204 }
205
206 /// Represents a `Stream` around a `glib::Source`. The stream will
207 /// be provide all values that are provided by the source
208 pub struct SourceStream<F, T> {
209 create_source: Option<F>,
210 source: Option<(Source, mpsc::UnboundedReceiver<T>)>,
211 }
212
213 impl<F, T> Unpin for SourceStream<F, T> {}
214
215 impl<F, T: 'static> SourceStream<F, T>
216 where
217 F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
218 {
219 /// Create a new `SourceStream`
220 ///
221 /// The provided closure should return a newly created `glib::Source` when called
222 /// and pass the values provided by the source to the sender that is passed
223 /// to the closure.
new(create_source: F) -> SourceStream<F, T>224 pub fn new(create_source: F) -> SourceStream<F, T> {
225 SourceStream {
226 create_source: Some(create_source),
227 source: None,
228 }
229 }
230 }
231
232 impl<F, T> Stream for SourceStream<F, T>
233 where
234 F: FnOnce(mpsc::UnboundedSender<T>) -> Source + 'static,
235 {
236 type Item = T;
237
poll_next(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Option<T>>238 fn poll_next(mut self: pin::Pin<&mut Self>, ctx: &mut task::Context) -> Poll<Option<T>> {
239 let SourceStream {
240 ref mut create_source,
241 ref mut source,
242 ..
243 } = *self;
244
245 if let Some(create_source) = create_source.take() {
246 let main_context = MainContext::ref_thread_default();
247 assert!(
248 main_context.is_owner(),
249 "Spawning futures only allowed if the thread is owning the MainContext"
250 );
251
252 // Channel for sending back the Source result to our future here.
253 //
254 // In theory we could directly continue polling the
255 // corresponding task from the Source callback,
256 // however this would break at the very least
257 // the g_main_current_source() API.
258 let (send, recv) = mpsc::unbounded();
259
260 let s = create_source(send);
261
262 s.attach(Some(&main_context));
263 *source = Some((s, recv));
264 }
265
266 // At this point we must have a receiver
267 let res = {
268 let &mut (_, ref mut receiver) = source.as_mut().unwrap();
269 Pin::new(receiver).poll_next(ctx)
270 };
271 #[allow(clippy::match_wild_err_arm)]
272 match res {
273 Poll::Ready(v) => {
274 if v.is_none() {
275 // Get rid of the reference to the source, it triggered
276 let _ = source.take();
277 }
278 Poll::Ready(v)
279 }
280 Poll::Pending => Poll::Pending,
281 }
282 }
283 }
284
285 impl<T, F> Drop for SourceStream<T, F> {
drop(&mut self)286 fn drop(&mut self) {
287 // Get rid of the source, we don't care anymore if it still triggers
288 if let Some((source, _)) = self.source.take() {
289 source.destroy();
290 }
291 }
292 }
293
294 /// Create a `Stream` that will provide a value every given number of milliseconds.
295 ///
296 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
interval_stream(value: Duration) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>>297 pub fn interval_stream(value: Duration) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
298 interval_stream_with_priority(crate::PRIORITY_DEFAULT, value)
299 }
300
301 /// Create a `Stream` that will provide a value every given number of milliseconds.
302 ///
303 /// The `Future` must be spawned on an `Executor` backed by a `glib::MainContext`.
interval_stream_with_priority( priority: Priority, value: Duration, ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>>304 pub fn interval_stream_with_priority(
305 priority: Priority,
306 value: Duration,
307 ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
308 Box::pin(SourceStream::new(move |send| {
309 crate::timeout_source_new(value, None, priority, move || {
310 if send.unbounded_send(()).is_err() {
311 Continue(false)
312 } else {
313 Continue(true)
314 }
315 })
316 }))
317 }
318
319 /// Create a `Stream` that will provide a value every given number of seconds.
320 ///
321 /// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
interval_stream_seconds(value: u32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>>322 pub fn interval_stream_seconds(value: u32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
323 interval_stream_seconds_with_priority(crate::PRIORITY_DEFAULT, value)
324 }
325
326 /// Create a `Stream` that will provide a value every given number of seconds.
327 ///
328 /// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
interval_stream_seconds_with_priority( priority: Priority, value: u32, ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>>329 pub fn interval_stream_seconds_with_priority(
330 priority: Priority,
331 value: u32,
332 ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
333 Box::pin(SourceStream::new(move |send| {
334 crate::timeout_source_new_seconds(value, None, priority, move || {
335 if send.unbounded_send(()).is_err() {
336 Continue(false)
337 } else {
338 Continue(true)
339 }
340 })
341 }))
342 }
343
344 #[cfg(any(unix, feature = "dox"))]
345 #[cfg_attr(feature = "dox", doc(cfg(unix)))]
346 /// Create a `Stream` that will provide a value whenever the given UNIX signal is raised
347 ///
348 /// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
unix_signal_stream(signum: i32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>>349 pub fn unix_signal_stream(signum: i32) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
350 unix_signal_stream_with_priority(crate::PRIORITY_DEFAULT, signum)
351 }
352
353 #[cfg(any(unix, feature = "dox"))]
354 #[cfg_attr(feature = "dox", doc(cfg(unix)))]
355 /// Create a `Stream` that will provide a value whenever the given UNIX signal is raised
356 ///
357 /// The `Stream` must be spawned on an `Executor` backed by a `glib::MainContext`.
unix_signal_stream_with_priority( priority: Priority, signum: i32, ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>>358 pub fn unix_signal_stream_with_priority(
359 priority: Priority,
360 signum: i32,
361 ) -> Pin<Box<dyn Stream<Item = ()> + Send + 'static>> {
362 Box::pin(SourceStream::new(move |send| {
363 crate::unix_signal_source_new(signum, None, priority, move || {
364 if send.unbounded_send(()).is_err() {
365 Continue(false)
366 } else {
367 Continue(true)
368 }
369 })
370 }))
371 }
372
373 #[cfg(test)]
374 mod tests {
375 use super::*;
376 use futures_util::future::FutureExt;
377 use futures_util::stream::StreamExt;
378 use std::thread;
379 use std::time::Duration;
380
381 #[test]
test_timeout()382 fn test_timeout() {
383 let c = MainContext::new();
384
385 c.block_on(timeout_future(Duration::from_millis(20)));
386 }
387
388 #[test]
test_timeout_send()389 fn test_timeout_send() {
390 let c = MainContext::new();
391 let l = crate::MainLoop::new(Some(&c), false);
392
393 let l_clone = l.clone();
394 c.spawn(timeout_future(Duration::from_millis(20)).then(move |()| {
395 l_clone.quit();
396 futures_util::future::ready(())
397 }));
398
399 l.run();
400 }
401
402 #[test]
test_interval()403 fn test_interval() {
404 let c = MainContext::new();
405
406 let mut count = 0;
407
408 {
409 let count = &mut count;
410 c.block_on(
411 interval_stream(Duration::from_millis(20))
412 .take(2)
413 .for_each(|()| {
414 *count += 1;
415
416 futures_util::future::ready(())
417 })
418 .map(|_| ()),
419 );
420 }
421
422 assert_eq!(count, 2);
423 }
424
425 #[test]
test_timeout_and_channel()426 fn test_timeout_and_channel() {
427 let c = MainContext::default();
428
429 let res = c.block_on(timeout_future(Duration::from_millis(20)).then(|()| {
430 let (sender, receiver) = oneshot::channel();
431
432 thread::spawn(move || {
433 sender.send(1).unwrap();
434 });
435
436 receiver.then(|i| futures_util::future::ready(i.unwrap()))
437 }));
438
439 assert_eq!(res, 1);
440 }
441 }
442