1 // Take a look at the license at the top of the repository in the LICENSE file. 2 3 use crate::error::to_std_io_result; 4 use crate::prelude::*; 5 use crate::Cancellable; 6 use crate::PollableOutputStream; 7 use futures_channel::oneshot; 8 use futures_core::task::{Context, Poll}; 9 use futures_core::Future; 10 use futures_io::AsyncWrite; 11 use glib::object::{Cast, IsA}; 12 use glib::translate::*; 13 use std::cell::RefCell; 14 use std::io; 15 use std::mem::transmute; 16 use std::pin::Pin; 17 18 use futures_core::stream::Stream; 19 20 pub trait PollableOutputStreamExtManual { 21 #[doc(alias = "g_pollable_output_stream_create_source")] create_source<F, C>( &self, cancellable: Option<&C>, name: Option<&str>, priority: glib::Priority, func: F, ) -> glib::Source where F: FnMut(&Self) -> glib::Continue + 'static, C: IsA<Cancellable>22 fn create_source<F, C>( 23 &self, 24 cancellable: Option<&C>, 25 name: Option<&str>, 26 priority: glib::Priority, 27 func: F, 28 ) -> glib::Source 29 where 30 F: FnMut(&Self) -> glib::Continue + 'static, 31 C: IsA<Cancellable>; 32 create_source_future<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>33 fn create_source_future<C: IsA<Cancellable>>( 34 &self, 35 cancellable: Option<&C>, 36 priority: glib::Priority, 37 ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>; 38 create_source_stream<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn Stream<Item = ()> + 'static>>39 fn create_source_stream<C: IsA<Cancellable>>( 40 &self, 41 cancellable: Option<&C>, 42 priority: glib::Priority, 43 ) -> Pin<Box<dyn Stream<Item = ()> + 'static>>; 44 into_async_write(self) -> Result<OutputStreamAsyncWrite<Self>, Self> where Self: IsA<PollableOutputStream>,45 fn into_async_write(self) -> Result<OutputStreamAsyncWrite<Self>, Self> 46 where 47 Self: IsA<PollableOutputStream>, 48 { 49 if self.can_poll() { 50 Ok(OutputStreamAsyncWrite(self, None)) 51 } else { 52 Err(self) 53 } 54 } 55 } 56 57 impl<O: IsA<PollableOutputStream>> PollableOutputStreamExtManual for O { create_source<F, C>( &self, cancellable: Option<&C>, name: Option<&str>, priority: glib::Priority, func: F, ) -> glib::Source where F: FnMut(&Self) -> glib::Continue + 'static, C: IsA<Cancellable>,58 fn create_source<F, C>( 59 &self, 60 cancellable: Option<&C>, 61 name: Option<&str>, 62 priority: glib::Priority, 63 func: F, 64 ) -> glib::Source 65 where 66 F: FnMut(&Self) -> glib::Continue + 'static, 67 C: IsA<Cancellable>, 68 { 69 unsafe extern "C" fn trampoline< 70 O: IsA<PollableOutputStream>, 71 F: FnMut(&O) -> glib::Continue + 'static, 72 >( 73 stream: *mut ffi::GPollableOutputStream, 74 func: glib::ffi::gpointer, 75 ) -> glib::ffi::gboolean { 76 let func: &RefCell<F> = &*(func as *const RefCell<F>); 77 let mut func = func.borrow_mut(); 78 (&mut *func)(PollableOutputStream::from_glib_borrow(stream).unsafe_cast_ref()) 79 .into_glib() 80 } 81 unsafe extern "C" fn destroy_closure<O, F>(ptr: glib::ffi::gpointer) { 82 Box::<RefCell<F>>::from_raw(ptr as *mut _); 83 } 84 let cancellable = cancellable.map(|c| c.as_ref()); 85 let gcancellable = cancellable.to_glib_none(); 86 unsafe { 87 let source = ffi::g_pollable_output_stream_create_source( 88 self.as_ref().to_glib_none().0, 89 gcancellable.0, 90 ); 91 92 let trampoline = trampoline::<Self, F> as glib::ffi::gpointer; 93 glib::ffi::g_source_set_callback( 94 source, 95 Some(transmute::< 96 _, 97 unsafe extern "C" fn(glib::ffi::gpointer) -> glib::ffi::gboolean, 98 >(trampoline)), 99 Box::into_raw(Box::new(RefCell::new(func))) as glib::ffi::gpointer, 100 Some(destroy_closure::<Self, F>), 101 ); 102 glib::ffi::g_source_set_priority(source, priority.into_glib()); 103 104 if let Some(name) = name { 105 glib::ffi::g_source_set_name(source, name.to_glib_none().0); 106 } 107 108 from_glib_full(source) 109 } 110 } 111 create_source_future<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>>112 fn create_source_future<C: IsA<Cancellable>>( 113 &self, 114 cancellable: Option<&C>, 115 priority: glib::Priority, 116 ) -> Pin<Box<dyn std::future::Future<Output = ()> + 'static>> { 117 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned(); 118 119 let obj = self.clone(); 120 Box::pin(glib::SourceFuture::new(move |send| { 121 let mut send = Some(send); 122 obj.create_source(cancellable.as_ref(), None, priority, move |_| { 123 let _ = send.take().unwrap().send(()); 124 glib::Continue(false) 125 }) 126 })) 127 } 128 create_source_stream<C: IsA<Cancellable>>( &self, cancellable: Option<&C>, priority: glib::Priority, ) -> Pin<Box<dyn Stream<Item = ()> + 'static>>129 fn create_source_stream<C: IsA<Cancellable>>( 130 &self, 131 cancellable: Option<&C>, 132 priority: glib::Priority, 133 ) -> Pin<Box<dyn Stream<Item = ()> + 'static>> { 134 let cancellable: Option<Cancellable> = cancellable.map(|c| c.as_ref()).cloned(); 135 136 let obj = self.clone(); 137 Box::pin(glib::SourceStream::new(move |send| { 138 let send = Some(send); 139 obj.create_source(cancellable.as_ref(), None, priority, move |_| { 140 if send.as_ref().unwrap().unbounded_send(()).is_err() { 141 glib::Continue(false) 142 } else { 143 glib::Continue(true) 144 } 145 }) 146 })) 147 } 148 } 149 150 #[derive(Debug)] 151 pub struct OutputStreamAsyncWrite<T: IsA<PollableOutputStream>>( 152 T, 153 Option<oneshot::Receiver<Result<(), glib::Error>>>, 154 ); 155 156 impl<T: IsA<PollableOutputStream>> OutputStreamAsyncWrite<T> { into_output_stream(self) -> T157 pub fn into_output_stream(self) -> T { 158 self.0 159 } 160 output_stream(&self) -> &T161 pub fn output_stream(&self) -> &T { 162 &self.0 163 } 164 } 165 166 impl<T: IsA<PollableOutputStream>> AsyncWrite for OutputStreamAsyncWrite<T> { poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>>167 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> { 168 let stream = Pin::get_ref(self.as_ref()); 169 let gio_result = stream 170 .0 171 .as_ref() 172 .write_nonblocking(buf, crate::NONE_CANCELLABLE); 173 174 match gio_result { 175 Ok(size) => Poll::Ready(Ok(size as usize)), 176 Err(err) => { 177 let kind = err.kind::<crate::IOErrorEnum>().unwrap(); 178 if kind == crate::IOErrorEnum::WouldBlock { 179 let mut waker = Some(cx.waker().clone()); 180 let source = stream.0.as_ref().create_source( 181 crate::NONE_CANCELLABLE, 182 None, 183 glib::PRIORITY_DEFAULT, 184 move |_| { 185 if let Some(waker) = waker.take() { 186 waker.wake(); 187 } 188 glib::Continue(false) 189 }, 190 ); 191 let main_context = glib::MainContext::ref_thread_default(); 192 source.attach(Some(&main_context)); 193 194 Poll::Pending 195 } else { 196 Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err))) 197 } 198 } 199 } 200 } 201 poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>202 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { 203 let stream = unsafe { Pin::get_unchecked_mut(self) }; 204 205 let rx = if let Some(ref mut rx) = stream.1 { 206 rx 207 } else { 208 let (tx, rx) = oneshot::channel(); 209 stream.0.as_ref().flush_async( 210 glib::PRIORITY_DEFAULT, 211 crate::NONE_CANCELLABLE, 212 move |res| { 213 let _ = tx.send(res); 214 }, 215 ); 216 217 stream.1 = Some(rx); 218 stream.1.as_mut().unwrap() 219 }; 220 221 match Pin::new(rx).poll(cx) { 222 Poll::Ready(Ok(res)) => { 223 let _ = stream.1.take(); 224 Poll::Ready(to_std_io_result(res)) 225 } 226 Poll::Ready(Err(_)) => { 227 let _ = stream.1.take(); 228 Poll::Ready(Ok(())) 229 } 230 Poll::Pending => Poll::Pending, 231 } 232 } 233 poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>234 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { 235 let stream = unsafe { Pin::get_unchecked_mut(self) }; 236 237 let rx = if let Some(ref mut rx) = stream.1 { 238 rx 239 } else { 240 let (tx, rx) = oneshot::channel(); 241 stream.0.as_ref().close_async( 242 glib::PRIORITY_DEFAULT, 243 crate::NONE_CANCELLABLE, 244 move |res| { 245 let _ = tx.send(res); 246 }, 247 ); 248 249 stream.1 = Some(rx); 250 stream.1.as_mut().unwrap() 251 }; 252 253 match Pin::new(rx).poll(cx) { 254 Poll::Ready(Ok(res)) => { 255 let _ = stream.1.take(); 256 Poll::Ready(to_std_io_result(res)) 257 } 258 Poll::Ready(Err(_)) => { 259 let _ = stream.1.take(); 260 Poll::Ready(Ok(())) 261 } 262 Poll::Pending => Poll::Pending, 263 } 264 } 265 } 266