1 // Take a look at the license at the top of the repository in the LICENSE file.
2 
3 use crate::prelude::*;
4 use crate::Cancellable;
5 use crate::PollableInputStream;
6 use futures_core::task::{Context, Poll};
7 use futures_io::AsyncRead;
8 use glib::object::{Cast, IsA};
9 use glib::translate::*;
10 use std::cell::RefCell;
11 use std::io;
12 use std::mem::transmute;
13 use std::ptr;
14 
15 use futures_core::stream::Stream;
16 use std::pin::Pin;
17 
18 pub trait PollableInputStreamExtManual: Sized {
19     #[doc(alias = "g_pollable_input_stream_create_source")]
create_source<F, C>( &self, cancellable: Option<&C>, name: Option<&str>, priority: glib::Priority, func: F, ) -> glib::Source where F: FnMut(&Self) -> glib::Continue + 'static, C: IsA<Cancellable>20     fn create_source<F, C>(
21         &self,
22         cancellable: Option<&C>,
23         name: Option<&str>,
24         priority: glib::Priority,
25         func: F,
26     ) -> glib::Source
27     where
28         F: FnMut(&Self) -> glib::Continue + 'static,
29         C: IsA<Cancellable>;
30 
create_source_future<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>31     fn create_source_future<C: IsA<Cancellable>>(
32         &self,
33         cancellable: Option<&C>,
34         priority: glib::Priority,
35     ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>;
36 
create_source_stream<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn Stream<Item = ()> + 'static>>37     fn create_source_stream<C: IsA<Cancellable>>(
38         &self,
39         cancellable: Option<&C>,
40         priority: glib::Priority,
41     ) -> Pin<Box<dyn Stream<Item = ()> + 'static>>;
42 
43     #[doc(alias = "g_pollable_input_stream_read_nonblocking")]
read_nonblocking<C: IsA<Cancellable>>( &self, buffer: &mut [u8], cancellable: Option<&C>, ) -> Result<isize, glib::Error>44     fn read_nonblocking<C: IsA<Cancellable>>(
45         &self,
46         buffer: &mut [u8],
47         cancellable: Option<&C>,
48     ) -> Result<isize, glib::Error>;
49 
into_async_read(self) -> Result<InputStreamAsyncRead<Self>, Self> where Self: IsA<PollableInputStream>,50     fn into_async_read(self) -> Result<InputStreamAsyncRead<Self>, Self>
51     where
52         Self: IsA<PollableInputStream>,
53     {
54         if self.can_poll() {
55             Ok(InputStreamAsyncRead(self))
56         } else {
57             Err(self)
58         }
59     }
60 }
61 
62 impl<O: IsA<PollableInputStream>> PollableInputStreamExtManual for O {
create_source<F, C>( &self, cancellable: Option<&C>, name: Option<&str>, priority: glib::Priority, func: F, ) -> glib::Source where F: FnMut(&Self) -> glib::Continue + 'static, C: IsA<Cancellable>,63     fn create_source<F, C>(
64         &self,
65         cancellable: Option<&C>,
66         name: Option<&str>,
67         priority: glib::Priority,
68         func: F,
69     ) -> glib::Source
70     where
71         F: FnMut(&Self) -> glib::Continue + 'static,
72         C: IsA<Cancellable>,
73     {
74         unsafe extern "C" fn trampoline<
75             O: IsA<PollableInputStream>,
76             F: FnMut(&O) -> glib::Continue + 'static,
77         >(
78             stream: *mut ffi::GPollableInputStream,
79             func: glib::ffi::gpointer,
80         ) -> glib::ffi::gboolean {
81             let func: &RefCell<F> = &*(func as *const RefCell<F>);
82             let mut func = func.borrow_mut();
83             (&mut *func)(PollableInputStream::from_glib_borrow(stream).unsafe_cast_ref())
84                 .into_glib()
85         }
86         unsafe extern "C" fn destroy_closure<O, F>(ptr: glib::ffi::gpointer) {
87             Box::<RefCell<F>>::from_raw(ptr as *mut _);
88         }
89         let cancellable = cancellable.map(|c| c.as_ref());
90         let gcancellable = cancellable.to_glib_none();
91         unsafe {
92             let source = ffi::g_pollable_input_stream_create_source(
93                 self.as_ref().to_glib_none().0,
94                 gcancellable.0,
95             );
96 
97             let trampoline = trampoline::<Self, F> as glib::ffi::gpointer;
98             glib::ffi::g_source_set_callback(
99                 source,
100                 Some(transmute::<
101                     _,
102                     unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean,
103                 >(trampoline)),
104                 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer,
105                 Some(destroy_closure::<Self, F>),
106             );
107             glib::ffi::g_source_set_priority(source, priority.into_glib());
108 
109             if let Some(name) = name {
110                 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
111             }
112 
113             from_glib_full(source)
114         }
115     }
116 
read_nonblocking<C: IsA<Cancellable>>( &self, buffer: &mut [u8], cancellable: Option<&C>, ) -> Result<isize, glib::Error>117     fn read_nonblocking<C: IsA<Cancellable>>(
118         &self,
119         buffer: &mut [u8],
120         cancellable: Option<&C>,
121     ) -> Result<isize, glib::Error> {
122         let cancellable = cancellable.map(|c| c.as_ref());
123         let gcancellable = cancellable.to_glib_none();
124         let count = buffer.len() as usize;
125         unsafe {
126             let mut error = ptr::null_mut();
127             let ret = ffi::g_pollable_input_stream_read_nonblocking(
128                 self.as_ref().to_glib_none().0,
129                 buffer.to_glib_none().0,
130                 count,
131                 gcancellable.0,
132                 &mut error,
133             );
134             if error.is_null() {
135                 Ok(ret)
136             } else {
137                 Err(from_glib_full(error))
138             }
139         }
140     }
141 
create_source_future<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>142     fn create_source_future<C: IsA<Cancellable>>(
143         &self,
144         cancellable: Option<&C>,
145         priority: glib::Priority,
146     ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> {
147         let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
148 
149         let obj = self.clone();
150         Box::pin(glib::SourceFuture::new(move |send| {
151             let mut send = Some(send);
152             obj.create_source(cancellable.as_ref(), None, priority, move |_| {
153                 let _ = send.take().unwrap().send(());
154                 glib::Continue(false)
155             })
156         }))
157     }
158 
create_source_stream<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn Stream<Item = ()> + 'static>>159     fn create_source_stream<C: IsA<Cancellable>>(
160         &self,
161         cancellable: Option<&C>,
162         priority: glib::Priority,
163     ) -> Pin<Box<dyn Stream<Item = ()> + 'static>> {
164         let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned();
165 
166         let obj = self.clone();
167         Box::pin(glib::SourceStream::new(move |send| {
168             obj.create_source(cancellable.as_ref(), None, priority, move |_| {
169                 if send.unbounded_send(()).is_err() {
170                     glib::Continue(false)
171                 } else {
172                     glib::Continue(true)
173                 }
174             })
175         }))
176     }
177 }
178 
179 #[derive(Debug)]
180 pub struct InputStreamAsyncRead<T: IsA<PollableInputStream>>(T);
181 
182 impl<T: IsA<PollableInputStream>> InputStreamAsyncRead<T> {
into_input_stream(self) -> T183     pub fn into_input_stream(self) -> T {
184         self.0
185     }
186 
input_stream(&self) -> &T187     pub fn input_stream(&self) -> &T {
188         &self.0
189     }
190 }
191 
192 impl<T: IsA<PollableInputStream>> AsyncRead for InputStreamAsyncRead<T> {
poll_read( self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8], ) -> Poll<io::Result<usize>>193     fn poll_read(
194         self: Pin<&mut Self>,
195         cx: &mut Context,
196         buf: &mut [u8],
197     ) -> Poll<io::Result<usize>> {
198         let stream = Pin::get_ref(self.as_ref());
199         let gio_result = stream
200             .0
201             .as_ref()
202             .read_nonblocking(buf, crate::NONE_CANCELLABLE);
203 
204         match gio_result {
205             Ok(size) => Poll::Ready(Ok(size as usize)),
206             Err(err) => {
207                 let kind = err.kind::<crate::IOErrorEnum>().unwrap();
208                 if kind == crate::IOErrorEnum::WouldBlock {
209                     let mut waker = Some(cx.waker().clone());
210                     let source = stream.0.as_ref().create_source(
211                         crate::NONE_CANCELLABLE,
212                         None,
213                         glib::PRIORITY_DEFAULT,
214                         move |_| {
215                             if let Some(waker) = waker.take() {
216                                 waker.wake();
217                             }
218                             glib::Continue(false)
219                         },
220                     );
221                     let main_context = glib::MainContext::ref_thread_default();
222                     source.attach(Some(&main_context));
223 
224                     Poll::Pending
225                 } else {
226                     Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
227                 }
228             }
229         }
230     }
231 }
232