1 // Take a look at the license at the top of the repository in the LICENSE file. 2 3 use crate::prelude::*; 4 use crate::Cancellable; 5 use crate::PollableInputStream; 6 use futures_core::task::{Context, Poll}; 7 use futures_io::AsyncRead; 8 use glib::object::{Cast, IsA}; 9 use glib::translate::*; 10 use std::cell::RefCell; 11 use std::io; 12 use std::mem::transmute; 13 use std::ptr; 14 15 use futures_core::stream::Stream; 16 use std::pin::Pin; 17 18 pub trait PollableInputStreamExtManual: Sized { 19 #[doc(alias = "g_pollable_input_stream_create_source")] create_source<F, C>( &self, cancellable: Option<&C>, name: Option<&str>, priority: glib::Priority, func: F, ) -> glib::Source where F: FnMut(&Self) -> glib::Continue + 'static, C: IsA<Cancellable>20 fn create_source<F, C>( 21 &self, 22 cancellable: Option<&C>, 23 name: Option<&str>, 24 priority: glib::Priority, 25 func: F, 26 ) -> glib::Source 27 where 28 F: FnMut(&Self) -> glib::Continue + 'static, 29 C: IsA<Cancellable>; 30 create_source_future<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>31 fn create_source_future<C: IsA<Cancellable>>( 32 &self, 33 cancellable: Option<&C>, 34 priority: glib::Priority, 35 ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>; 36 create_source_stream<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn Stream<Item = ()> + 'static>>37 fn create_source_stream<C: IsA<Cancellable>>( 38 &self, 39 cancellable: Option<&C>, 40 priority: glib::Priority, 41 ) -> Pin<Box<dyn Stream<Item = ()> + 'static>>; 42 43 #[doc(alias = "g_pollable_input_stream_read_nonblocking")] read_nonblocking<C: IsA<Cancellable>>( &self, buffer: &mut [u8], cancellable: Option<&C>, ) -> Result<isize, glib::Error>44 fn read_nonblocking<C: IsA<Cancellable>>( 45 &self, 46 buffer: &mut [u8], 47 cancellable: Option<&C>, 48 ) -> Result<isize, glib::Error>; 49 into_async_read(self) -> Result<InputStreamAsyncRead<Self>, Self> where Self: IsA<PollableInputStream>,50 fn into_async_read(self) -> Result<InputStreamAsyncRead<Self>, Self> 51 where 52 Self: IsA<PollableInputStream>, 53 { 54 if self.can_poll() { 55 Ok(InputStreamAsyncRead(self)) 56 } else { 57 Err(self) 58 } 59 } 60 } 61 62 impl<O: IsA<PollableInputStream>> PollableInputStreamExtManual for O { create_source<F, C>( &self, cancellable: Option<&C>, name: Option<&str>, priority: glib::Priority, func: F, ) -> glib::Source where F: FnMut(&Self) -> glib::Continue + 'static, C: IsA<Cancellable>,63 fn create_source<F, C>( 64 &self, 65 cancellable: Option<&C>, 66 name: Option<&str>, 67 priority: glib::Priority, 68 func: F, 69 ) -> glib::Source 70 where 71 F: FnMut(&Self) -> glib::Continue + 'static, 72 C: IsA<Cancellable>, 73 { 74 unsafe extern "C" fn trampoline< 75 O: IsA<PollableInputStream>, 76 F: FnMut(&O) -> glib::Continue + 'static, 77 >( 78 stream: *mut ffi::GPollableInputStream, 79 func: glib::ffi::gpointer, 80 ) -> glib::ffi::gboolean { 81 let func: &RefCell<F> = &*(func as *const RefCell<F>); 82 let mut func = func.borrow_mut(); 83 (&mut *func)(PollableInputStream::from_glib_borrow(stream).unsafe_cast_ref()) 84 .into_glib() 85 } 86 unsafe extern "C" fn destroy_closure<O, F>(ptr: glib::ffi::gpointer) { 87 Box::<RefCell<F>>::from_raw(ptr as *mut _); 88 } 89 let cancellable = cancellable.map(|c| c.as_ref()); 90 let gcancellable = cancellable.to_glib_none(); 91 unsafe { 92 let source = ffi::g_pollable_input_stream_create_source( 93 self.as_ref().to_glib_none().0, 94 gcancellable.0, 95 ); 96 97 let trampoline = trampoline::<Self, F> as glib::ffi::gpointer; 98 glib::ffi::g_source_set_callback( 99 source, 100 Some(transmute::< 101 _, 102 unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean, 103 >(trampoline)), 104 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer, 105 Some(destroy_closure::<Self, F>), 106 ); 107 glib::ffi::g_source_set_priority(source, priority.into_glib()); 108 109 if let Some(name) = name { 110 glib::ffi::g_source_set_name(source, name.to_glib_none().0); 111 } 112 113 from_glib_full(source) 114 } 115 } 116 read_nonblocking<C: IsA<Cancellable>>( &self, buffer: &mut [u8], cancellable: Option<&C>, ) -> Result<isize, glib::Error>117 fn read_nonblocking<C: IsA<Cancellable>>( 118 &self, 119 buffer: &mut [u8], 120 cancellable: Option<&C>, 121 ) -> Result<isize, glib::Error> { 122 let cancellable = cancellable.map(|c| c.as_ref()); 123 let gcancellable = cancellable.to_glib_none(); 124 let count = buffer.len() as usize; 125 unsafe { 126 let mut error = ptr::null_mut(); 127 let ret = ffi::g_pollable_input_stream_read_nonblocking( 128 self.as_ref().to_glib_none().0, 129 buffer.to_glib_none().0, 130 count, 131 gcancellable.0, 132 &mut error, 133 ); 134 if error.is_null() { 135 Ok(ret) 136 } else { 137 Err(from_glib_full(error)) 138 } 139 } 140 } 141 create_source_future<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>142 fn create_source_future<C: IsA<Cancellable>>( 143 &self, 144 cancellable: Option<&C>, 145 priority: glib::Priority, 146 ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> { 147 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned(); 148 149 let obj = self.clone(); 150 Box::pin(glib::SourceFuture::new(move |send| { 151 let mut send = Some(send); 152 obj.create_source(cancellable.as_ref(), None, priority, move |_| { 153 let _ = send.take().unwrap().send(()); 154 glib::Continue(false) 155 }) 156 })) 157 } 158 create_source_stream<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn Stream<Item = ()> + 'static>>159 fn create_source_stream<C: IsA<Cancellable>>( 160 &self, 161 cancellable: Option<&C>, 162 priority: glib::Priority, 163 ) -> Pin<Box<dyn Stream<Item = ()> + 'static>> { 164 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned(); 165 166 let obj = self.clone(); 167 Box::pin(glib::SourceStream::new(move |send| { 168 obj.create_source(cancellable.as_ref(), None, priority, move |_| { 169 if send.unbounded_send(()).is_err() { 170 glib::Continue(false) 171 } else { 172 glib::Continue(true) 173 } 174 }) 175 })) 176 } 177 } 178 179 #[derive(Debug)] 180 pub struct InputStreamAsyncRead<T: IsA<PollableInputStream>>(T); 181 182 impl<T: IsA<PollableInputStream>> InputStreamAsyncRead<T> { into_input_stream(self) -> T183 pub fn into_input_stream(self) -> T { 184 self.0 185 } 186 input_stream(&self) -> &T187 pub fn input_stream(&self) -> &T { 188 &self.0 189 } 190 } 191 192 impl<T: IsA<PollableInputStream>> AsyncRead for InputStreamAsyncRead<T> { poll_read( self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8], ) -> Poll<io::Result<usize>>193 fn poll_read( 194 self: Pin<&mut Self>, 195 cx: &mut Context, 196 buf: &mut [u8], 197 ) -> Poll<io::Result<usize>> { 198 let stream = Pin::get_ref(self.as_ref()); 199 let gio_result = stream 200 .0 201 .as_ref() 202 .read_nonblocking(buf, crate::NONE_CANCELLABLE); 203 204 match gio_result { 205 Ok(size) => Poll::Ready(Ok(size as usize)), 206 Err(err) => { 207 let kind = err.kind::<crate::IOErrorEnum>().unwrap(); 208 if kind == crate::IOErrorEnum::WouldBlock { 209 let mut waker = Some(cx.waker().clone()); 210 let source = stream.0.as_ref().create_source( 211 crate::NONE_CANCELLABLE, 212 None, 213 glib::PRIORITY_DEFAULT, 214 move |_| { 215 if let Some(waker) = waker.take() { 216 waker.wake(); 217 } 218 glib::Continue(false) 219 }, 220 ); 221 let main_context = glib::MainContext::ref_thread_default(); 222 source.attach(Some(&main_context)); 223 224 Poll::Pending 225 } else { 226 Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err))) 227 } 228 } 229 } 230 } 231 } 232