1 // Take a look at the license at the top of the repository in the LICENSE file.
2
3 use futures_channel::oneshot;
4 use futures_core::task::{Context, Poll};
5 use std::future::Future;
6 use std::pin::{self, Pin};
7
8 use crate::prelude::*;
9 use crate::Cancellable;
10
11 pub struct GioFuture<F, O, T, E> {
12 obj: O,
13 schedule_operation: Option<F>,
14 cancellable: Option<Cancellable>,
15 receiver: Option<oneshot::Receiver<Result<T, E>>>,
16 }
17
18 pub struct GioFutureResult<T, E> {
19 sender: ThreadGuard<oneshot::Sender<Result<T, E>>>,
20 }
21
22 unsafe impl<T, E> Send for GioFutureResult<T, E> {}
23
24 impl<T, E> GioFutureResult<T, E> {
resolve(self, res: Result<T, E>)25 pub fn resolve(self, res: Result<T, E>) {
26 let _ = self.sender.into_inner().send(res);
27 }
28 }
29
30 impl<F, O, T: 'static, E: 'static> GioFuture<F, O, T, E>
31 where
32 O: Clone + 'static,
33 F: FnOnce(&O, &Cancellable, GioFutureResult<T, E>) + 'static,
34 {
new(obj: &O, schedule_operation: F) -> GioFuture<F, O, T, E>35 pub fn new(obj: &O, schedule_operation: F) -> GioFuture<F, O, T, E> {
36 GioFuture {
37 obj: obj.clone(),
38 schedule_operation: Some(schedule_operation),
39 cancellable: Some(Cancellable::new()),
40 receiver: None,
41 }
42 }
43 }
44
45 impl<F, O, T, E> Future for GioFuture<F, O, T, E>
46 where
47 O: Clone + 'static,
48 F: FnOnce(&O, &Cancellable, GioFutureResult<T, E>) + 'static,
49 {
50 type Output = Result<T, E>;
51
poll(mut self: pin::Pin<&mut Self>, ctx: &mut Context) -> Poll<Result<T, E>>52 fn poll(mut self: pin::Pin<&mut Self>, ctx: &mut Context) -> Poll<Result<T, E>> {
53 let GioFuture {
54 ref obj,
55 ref mut schedule_operation,
56 ref mut cancellable,
57 ref mut receiver,
58 ..
59 } = *self;
60
61 if let Some(schedule_operation) = schedule_operation.take() {
62 let main_context = glib::MainContext::ref_thread_default();
63 assert!(
64 main_context.is_owner(),
65 "Spawning futures only allowed if the thread is owning the MainContext"
66 );
67
68 // Channel for sending back the GIO async operation
69 // result to our future here.
70 //
71 // In theory, we could directly continue polling the
72 // corresponding task from the GIO async operation
73 // callback, however this would break at the very
74 // least the g_main_current_source() API.
75 let (send, recv) = oneshot::channel();
76
77 schedule_operation(
78 obj,
79 cancellable.as_ref().unwrap(),
80 GioFutureResult {
81 sender: ThreadGuard::new(send),
82 },
83 );
84
85 *receiver = Some(recv);
86 }
87
88 // At this point we must have a receiver
89 let res = {
90 let receiver = receiver.as_mut().unwrap();
91 Pin::new(receiver).poll(ctx)
92 };
93
94 match res {
95 Poll::Pending => Poll::Pending,
96 Poll::Ready(Err(_)) => panic!("Async operation sender was unexpectedly closed"),
97 Poll::Ready(Ok(v)) => {
98 // Get rid of the reference to the cancellable and receiver
99 let _ = cancellable.take();
100 let _ = receiver.take();
101 Poll::Ready(v)
102 }
103 }
104 }
105 }
106
107 impl<F, O, T, E> Drop for GioFuture<F, O, T, E> {
drop(&mut self)108 fn drop(&mut self) {
109 if let Some(cancellable) = self.cancellable.take() {
110 cancellable.cancel();
111 }
112 let _ = self.receiver.take();
113 }
114 }
115
116 impl<F, O, T, E> Unpin for GioFuture<F, O, T, E> {}
117
118 // Actual thread IDs can be reused by the OS once the old thread finished.
119 // This works around it by using our own counter for threads.
120 //
121 // Taken from the fragile crate
122 use std::sync::atomic::{AtomicUsize, Ordering};
next_thread_id() -> usize123 fn next_thread_id() -> usize {
124 static mut COUNTER: AtomicUsize = AtomicUsize::new(0);
125 unsafe { COUNTER.fetch_add(1, Ordering::SeqCst) }
126 }
127
128 #[doc(alias = "get_thread_id")]
thread_id() -> usize129 fn thread_id() -> usize {
130 thread_local!(static THREAD_ID: usize = next_thread_id());
131 THREAD_ID.with(|&x| x)
132 }
133
134 // Taken from glib-rs, but we don't want this to be public API
135 struct ThreadGuard<T> {
136 thread_id: usize,
137 value: Option<T>,
138 }
139
140 impl<T> ThreadGuard<T> {
new(value: T) -> Self141 fn new(value: T) -> Self {
142 Self {
143 thread_id: thread_id(),
144 value: Some(value),
145 }
146 }
147
into_inner(mut self) -> T148 fn into_inner(mut self) -> T {
149 if self.thread_id != thread_id() {
150 panic!("Value accessed from different thread than where it was created");
151 }
152
153 self.value.take().expect("into_inner() called twice")
154 }
155 }
156
157 impl<T> Drop for ThreadGuard<T> {
drop(&mut self)158 fn drop(&mut self) {
159 if self.thread_id != thread_id() {
160 panic!("Value dropped on a different thread than where it was created");
161 }
162 }
163 }
164
165 unsafe impl<T> Send for ThreadGuard<T> {}
166