1 // Take a look at the license at the top of the repository in the LICENSE file.
2 
3 use crate::error::to_std_io_result;
4 use crate::prelude::*;
5 use crate::Cancellable;
6 use crate::PollableOutputStream;
7 use futures_channel::oneshot;
8 use futures_core::task::{Context, Poll};
9 use futures_core::Future;
10 use futures_io::AsyncWrite;
11 use glib::object::{Cast, IsA};
12 use glib::translate::*;
13 use std::cell::RefCell;
14 use std::io;
15 use std::mem::transmute;
16 use std::pin::Pin;
17 
18 use futures_core::stream::Stream;
19 
20 pub trait PollableOutputStreamExtManual {
21     #[doc(alias = "g_pollable_output_stream_create_source")]
create_source<F, C>( &self, cancellable: Option<&C>, name: Option<&str>, priority: glib::Priority, func: F, ) -> glib::Source where F: FnMut(&Self) -> glib::Continue + 'static, C: IsA<Cancellable>22     fn create_source<F, C>(
23         &self,
24         cancellable: Option<&C>,
25         name: Option<&str>,
26         priority: glib::Priority,
27         func: F,
28     ) -> glib::Source
29     where
30         F: FnMut(&Self) -> glib::Continue + 'static,
31         C: IsA<Cancellable>;
32 
create_source_future<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>33     fn create_source_future<C: IsA<Cancellable>>(
34         &self,
35         cancellable: Option<&C>,
36         priority: glib::Priority,
37     ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>;
38 
create_source_stream<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn Stream<Item = ()> + 'static>>39     fn create_source_stream<C: IsA<Cancellable>>(
40         &self,
41         cancellable: Option<&C>,
42         priority: glib::Priority,
43     ) -> Pin<Box<dyn Stream<Item = ()> + 'static>>;
44 
into_async_write(self) -> Result<OutputStreamAsyncWrite<Self>, Self> where Self: IsA<PollableOutputStream>,45     fn into_async_write(self) -> Result<OutputStreamAsyncWrite<Self>, Self>
46     where
47         Self: IsA<PollableOutputStream>,
48     {
49         if self.can_poll() {
50             Ok(OutputStreamAsyncWrite(self, None))
51         } else {
52             Err(self)
53         }
54     }
55 }
56 
57 impl<O: IsA<PollableOutputStream>> PollableOutputStreamExtManual for O {
create_source<F, C>( &self, cancellable: Option<&C>, name: Option<&str>, priority: glib::Priority, func: F, ) -> glib::Source where F: FnMut(&Self) -> glib::Continue + 'static, C: IsA<Cancellable>,58     fn create_source<F, C>(
59         &self,
60         cancellable: Option<&C>,
61         name: Option<&str>,
62         priority: glib::Priority,
63         func: F,
64     ) -> glib::Source
65     where
66         F: FnMut(&Self) -> glib::Continue + 'static,
67         C: IsA<Cancellable>,
68     {
69         unsafe extern "C" fn trampoline<
70             O: IsA<PollableOutputStream>,
71             F: FnMut(&O) -> glib::Continue + 'static,
72         >(
73             stream: *mut ffi::GPollableOutputStream,
74             func: glib::ffi::gpointer,
75         ) -> glib::ffi::gboolean {
76             let func: &RefCell<F> = &*(func as *const RefCell<F>);
77             let mut func = func.borrow_mut();
78             (&mut *func)(PollableOutputStream::from_glib_borrow(stream).unsafe_cast_ref())
79                 .into_glib()
80         }
81         unsafe extern "C" fn destroy_closure<O, F>(ptr: glib::ffi::gpointer) {
82             Box::<RefCell<F>>::from_raw(ptr as *mut _);
83         }
84         let cancellable = cancellable.map(|c| c.as_ref());
85         let gcancellable = cancellable.to_glib_none();
86         unsafe {
87             let source = ffi::g_pollable_output_stream_create_source(
88                 self.as_ref().to_glib_none().0,
89                 gcancellable.0,
90             );
91 
92             let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
93             glib::ffi::g_source_set_callback(
94                 source,
95                 Some(transmute::<
96                     _,
97                     unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
98                 >(trampoline)),
99                 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
100                 Some(destroy_closure::<Self, F>),
101             );
102             glib::ffi::g_source_set_priority(source, priority.into_glib());
103 
104             if let Some(name) = name {
105                 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
106             }
107 
108             from_glib_full(source)
109         }
110     }
111 
create_source_future<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>112     fn create_source_future<C: IsA<Cancellable>>(
113         &self,
114         cancellable: Option<&C>,
115         priority: glib::Priority,
116     ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> {
117         let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
118 
119         let obj = self.clone();
120         Box::pin(glib::SourceFuture::new(move |send| {
121             let mut send = Some(send);
122             obj.create_source(cancellable.as_ref(), None, priority, move |_| {
123                 let _ = send.take().unwrap().send(());
124                 glib::Continue(false)
125             })
126         }))
127     }
128 
create_source_stream<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn Stream<Item = ()> + 'static>>129     fn create_source_stream<C: IsA<Cancellable>>(
130         &self,
131         cancellable: Option<&C>,
132         priority: glib::Priority,
133     ) -> Pin<Box<dyn Stream<Item = ()> + 'static>> {
134         let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
135 
136         let obj = self.clone();
137         Box::pin(glib::SourceStream::new(move |send| {
138             let send = Some(send);
139             obj.create_source(cancellable.as_ref(), None, priority, move |_| {
140                 if send.as_ref().unwrap().unbounded_send(()).is_err() {
141                     glib::Continue(false)
142                 } else {
143                     glib::Continue(true)
144                 }
145             })
146         }))
147     }
148 }
149 
150 #[derive(Debug)]
151 pub struct OutputStreamAsyncWrite<T: IsA<PollableOutputStream>>(
152     T,
153     Option<oneshot::Receiver<Result<(), glib::Error>>>,
154 );
155 
156 impl<T: IsA<PollableOutputStream>> OutputStreamAsyncWrite<T> {
into_output_stream(self) -> T157     pub fn into_output_stream(self) -> T {
158         self.0
159     }
160 
output_stream(&self) -> &T161     pub fn output_stream(&self) -> &T {
162         &self.0
163     }
164 }
165 
166 impl<T: IsA<PollableOutputStream>> AsyncWrite for OutputStreamAsyncWrite<T> {
poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>>167     fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
168         let stream = Pin::get_ref(self.as_ref());
169         let gio_result = stream
170             .0
171             .as_ref()
172             .write_nonblocking(buf, crate::NONE_CANCELLABLE);
173 
174         match gio_result {
175             Ok(size) => Poll::Ready(Ok(size as usize)),
176             Err(err) => {
177                 let kind = err.kind::<crate::IOErrorEnum>().unwrap();
178                 if kind == crate::IOErrorEnum::WouldBlock {
179                     let mut waker = Some(cx.waker().clone());
180                     let source = stream.0.as_ref().create_source(
181                         crate::NONE_CANCELLABLE,
182                         None,
183                         glib::PRIORITY_DEFAULT,
184                         move |_| {
185                             if let Some(waker) = waker.take() {
186                                 waker.wake();
187                             }
188                             glib::Continue(false)
189                         },
190                     );
191                     let main_context = glib::MainContext::ref_thread_default();
192                     source.attach(Some(&main_context));
193 
194                     Poll::Pending
195                 } else {
196                     Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
197                 }
198             }
199         }
200     }
201 
poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>202     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
203         let stream = unsafe { Pin::get_unchecked_mut(self) };
204 
205         let rx = if let Some(ref mut rx) = stream.1 {
206             rx
207         } else {
208             let (tx, rx) = oneshot::channel();
209             stream.0.as_ref().flush_async(
210                 glib::PRIORITY_DEFAULT,
211                 crate::NONE_CANCELLABLE,
212                 move |res| {
213                     let _ = tx.send(res);
214                 },
215             );
216 
217             stream.1 = Some(rx);
218             stream.1.as_mut().unwrap()
219         };
220 
221         match Pin::new(rx).poll(cx) {
222             Poll::Ready(Ok(res)) => {
223                 let _ = stream.1.take();
224                 Poll::Ready(to_std_io_result(res))
225             }
226             Poll::Ready(Err(_)) => {
227                 let _ = stream.1.take();
228                 Poll::Ready(Ok(()))
229             }
230             Poll::Pending => Poll::Pending,
231         }
232     }
233 
poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>234     fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
235         let stream = unsafe { Pin::get_unchecked_mut(self) };
236 
237         let rx = if let Some(ref mut rx) = stream.1 {
238             rx
239         } else {
240             let (tx, rx) = oneshot::channel();
241             stream.0.as_ref().close_async(
242                 glib::PRIORITY_DEFAULT,
243                 crate::NONE_CANCELLABLE,
244                 move |res| {
245                     let _ = tx.send(res);
246                 },
247             );
248 
249             stream.1 = Some(rx);
250             stream.1.as_mut().unwrap()
251         };
252 
253         match Pin::new(rx).poll(cx) {
254             Poll::Ready(Ok(res)) => {
255                 let _ = stream.1.take();
256                 Poll::Ready(to_std_io_result(res))
257             }
258             Poll::Ready(Err(_)) => {
259                 let _ = stream.1.take();
260                 Poll::Ready(Ok(()))
261             }
262             Poll::Pending => Poll::Pending,
263         }
264     }
265 }
266