1 // Take a look at the license at the top of the repository in the LICENSE file.
2 
3 use futures_channel::oneshot;
4 use futures_core::task::{Context, Poll};
5 use std::future::Future;
6 use std::pin::{self, Pin};
7 
8 use crate::prelude::*;
9 use crate::Cancellable;
10 
/// Future adapter around a cancellable, callback-based asynchronous operation.
///
/// The operation is not started at construction time: the stored closure is
/// invoked on the first `poll`, and its result is delivered back through a
/// oneshot channel. Dropping the future while it still holds its
/// `Cancellable` cancels it.
pub struct GioFuture<F, O, T, E> {
    // Owned clone of the object the operation is performed on.
    obj: O,
    // Closure that starts the operation; taken (left as `None`) on first poll.
    schedule_operation: Option<F>,
    // Cancellation handle lent to the closure; cleared once a result arrived.
    cancellable: Option<Cancellable>,
    // Receiving end of the result channel; `None` before the first poll and
    // again after the result has been handed out.
    receiver: Option<oneshot::Receiver<Result<T, E>>>,
}
17 
/// Handle given to the scheduled operation for reporting its outcome.
///
/// Wraps the sending half of the oneshot channel whose receiving half is
/// polled by `GioFuture`.
pub struct GioFutureResult<T, E> {
    // Sender wrapped in a guard that panics when it is consumed or dropped on
    // a thread other than the one that created it.
    sender: ThreadGuard<oneshot::Sender<Result<T, E>>>,
}
21 
// SAFETY: the only field is a `ThreadGuard`, which panics when its content is
// taken out, or when it is dropped, on a thread other than the creating one.
// Cross-thread use is thereby turned into a panic instead of silent access.
unsafe impl<T, E> Send for GioFutureResult<T, E> {}
23 
24 impl<T, E> GioFutureResult<T, E> {
resolve(self, res: Result<T, E>)25     pub fn resolve(self, res: Result<T, E>) {
26         let _ = self.sender.into_inner().send(res);
27     }
28 }
29 
30 impl<F, O, T: 'static, E: 'static> GioFuture<F, O, T, E>
31 where
32     O: Clone + 'static,
33     F: FnOnce(&O, &Cancellable, GioFutureResult<T, E>) + 'static,
34 {
new(obj: &O, schedule_operation: F) -> GioFuture<F, O, T, E>35     pub fn new(obj: &O, schedule_operation: F) -> GioFuture<F, O, T, E> {
36         GioFuture {
37             obj: obj.clone(),
38             schedule_operation: Some(schedule_operation),
39             cancellable: Some(Cancellable::new()),
40             receiver: None,
41         }
42     }
43 }
44 
45 impl<F, O, T, E> Future for GioFuture<F, O, T, E>
46 where
47     O: Clone + 'static,
48     F: FnOnce(&O, &Cancellable, GioFutureResult<T, E>) + 'static,
49 {
50     type Output = Result<T, E>;
51 
poll(mut self: pin::Pin<&mut Self>, ctx: &mut Context) -> Poll<Result<T, E>>52     fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut Context) -> Poll<Result<T, E>> {
53         let GioFuture {
54             ref obj,
55             ref mut schedule_operation,
56             ref mut cancellable,
57             ref mut receiver,
58             ..
59         } = *self;
60 
61         if let Some(schedule_operation) = schedule_operation.take() {
62             let main_context = glib::MainContext::ref_thread_default();
63             assert!(
64                 main_context.is_owner(),
65                 "Spawning futures only allowed if the thread is owning the MainContext"
66             );
67 
68             // Channel for sending back the GIO async operation
69             // result to our future here.
70             //
71             // In theory, we could directly continue polling the
72             // corresponding task from the GIO async operation
73             // callback, however this would break at the very
74             // least the g_main_current_source() API.
75             let (send, recv) = oneshot::channel();
76 
77             schedule_operation(
78                 obj,
79                 cancellable.as_ref().unwrap(),
80                 GioFutureResult {
81                     sender: ThreadGuard::new(send),
82                 },
83             );
84 
85             *receiver = Some(recv);
86         }
87 
88         // At this point we must have a receiver
89         let res = {
90             let receiver = receiver.as_mut().unwrap();
91             Pin::new(receiver).poll(ctx)
92         };
93 
94         match res {
95             Poll::Pending => Poll::Pending,
96             Poll::Ready(Err(_)) => panic!("Async operation sender was unexpectedly closed"),
97             Poll::Ready(Ok(v)) => {
98                 // Get rid of the reference to the cancellable and receiver
99                 let _ = cancellable.take();
100                 let _ = receiver.take();
101                 Poll::Ready(v)
102             }
103         }
104     }
105 }
106 
107 impl<F, O, T, E> Drop for GioFuture<F, O, T, E> {
drop(&mut self)108     fn drop(&mut self) {
109         if let Some(cancellable) = self.cancellable.take() {
110             cancellable.cancel();
111         }
112         let _ = self.receiver.take();
113     }
114 }
115 
// Marks `GioFuture` as `Unpin` for every choice of type parameters; `poll`
// depends on this to reach the fields mutably through `Pin<&mut Self>`.
impl<F, O, T, E> Unpin for GioFuture<F, O, T, E> {}
117 
118 // Actual thread IDs can be reused by the OS once the old thread finished.
119 // This works around it by using our own counter for threads.
120 //
121 // Taken from the fragile crate
122 use std::sync::atomic::{AtomicUsize, Ordering};
/// Returns the next value of a process-wide counter, starting at 0.
///
/// Every call yields the current counter value and increments it by one, so
/// no two calls observe the same number (until `usize` wraps around).
fn next_thread_id() -> usize {
    // An atomic already provides interior mutability, so a plain `static` is
    // enough; neither `static mut` nor `unsafe` is needed to update it.
    static COUNTER: AtomicUsize = AtomicUsize::new(0);
    COUNTER.fetch_add(1, Ordering::SeqCst)
}
127 
128 #[doc(alias = "get_thread_id")]
thread_id() -> usize129 fn thread_id() -> usize {
130     thread_local!(static THREAD_ID: usize = next_thread_id());
131     THREAD_ID.with(|&x| x)
132 }
133 
// Taken from glib-rs, but we don't want this to be public API
//
// Wrapper that remembers the thread it was created on and panics if the
// wrapped value is taken out, or the wrapper is dropped, on another thread.
struct ThreadGuard<T> {
    // Identifier (from `thread_id()`) of the thread that created the guard.
    thread_id: usize,
    // The guarded value; becomes `None` once `into_inner` has moved it out.
    value: Option<T>,
}
139 
140 impl<T> ThreadGuard<T> {
new(value: T) -> Self141     fn new(value: T) -> Self {
142         Self {
143             thread_id: thread_id(),
144             value: Some(value),
145         }
146     }
147 
into_inner(mut self) -> T148     fn into_inner(mut self) -> T {
149         if self.thread_id != thread_id() {
150             panic!("Value accessed from different thread than where it was created");
151         }
152 
153         self.value.take().expect("into_inner() called twice")
154     }
155 }
156 
157 impl<T> Drop for ThreadGuard<T> {
drop(&mut self)158     fn drop(&mut self) {
159         if self.thread_id != thread_id() {
160             panic!("Value dropped on a different thread than where it was created");
161         }
162     }
163 }
164 
// SAFETY: the guard may be moved between threads for any `T` because the only
// ways to reach the value, `into_inner` and `Drop`, both compare the current
// thread against the creating one and panic on a mismatch.
unsafe impl<T> Send for ThreadGuard<T> {}
166