1 // Take a look at the license at the top of the repository in the LICENSE file. 2 3 use crate::error::to_std_io_result; 4 use crate::prelude::*; 5 use crate::Cancellable; 6 use crate::OutputStream; 7 use crate::Seekable; 8 use glib::object::IsA; 9 use glib::translate::*; 10 use glib::Priority; 11 use std::io; 12 use std::mem; 13 use std::pin::Pin; 14 use std::ptr; 15 16 pub trait OutputStreamExtManual: Sized + OutputStreamExt { 17 #[doc(alias = "g_output_stream_write_async")] write_async< B: AsRef<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )18 fn write_async< 19 B: AsRef<[u8]> + Send + 'static, 20 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static, 21 C: IsA<Cancellable>, 22 >( 23 &self, 24 buffer: B, 25 io_priority: Priority, 26 cancellable: Option<&C>, 27 callback: Q, 28 ); 29 30 #[doc(alias = "g_output_stream_write_all")] write_all<C: IsA<Cancellable>>( &self, buffer: &[u8], cancellable: Option<&C>, ) -> Result<(usize, Option<glib::Error>), glib::Error>31 fn write_all<C: IsA<Cancellable>>( 32 &self, 33 buffer: &[u8], 34 cancellable: Option<&C>, 35 ) -> Result<(usize, Option<glib::Error>), glib::Error>; 36 37 #[doc(alias = "g_output_stream_write_all_async")] write_all_async< B: AsRef<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )38 fn write_all_async< 39 B: AsRef<[u8]> + Send + 'static, 40 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static, 41 C: IsA<Cancellable>, 42 >( 43 &self, 44 buffer: B, 45 io_priority: Priority, 46 cancellable: Option<&C>, 47 callback: Q, 48 ); 49 write_async_future<B: AsRef<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>50 fn write_async_future<B: AsRef<[u8]> + Send + 'static>( 51 &self, 52 buffer: B, 53 io_priority: Priority, 54 ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>; 55 write_all_async_future<B: AsRef<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin< Box< dyn std::future::Future< Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>, > + 'static, >, >56 fn write_all_async_future<B: AsRef<[u8]> + Send + 'static>( 57 &self, 58 buffer: B, 59 io_priority: Priority, 60 ) -> Pin< 61 Box< 62 dyn std::future::Future< 63 Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>, 64 > + 'static, 65 >, 66 >; 67 into_write(self) -> OutputStreamWrite<Self> where Self: IsA<OutputStream>,68 fn into_write(self) -> OutputStreamWrite<Self> 69 where 70 Self: IsA<OutputStream>, 71 { 72 OutputStreamWrite(self) 73 } 74 } 75 76 impl<O: IsA<OutputStream>> OutputStreamExtManual for O { write_async< B: AsRef<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )77 fn write_async< 78 B: AsRef<[u8]> + Send + 'static, 79 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static, 80 C: IsA<Cancellable>, 81 >( 82 &self, 83 buffer: B, 84 io_priority: Priority, 85 cancellable: Option<&C>, 86 callback: Q, 87 ) { 88 let cancellable = cancellable.map(|c| c.as_ref()); 89 let gcancellable = cancellable.to_glib_none(); 90 let user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer))); 91 // Need to do this after boxing as the contents pointer might change by moving into the box 92 let (count, buffer_ptr) = { 93 let buffer = &(*user_data).as_ref().unwrap().1; 94 let slice = buffer.as_ref(); 95 (slice.len(), slice.as_ptr()) 96 }; 97 unsafe extern "C" fn write_async_trampoline< 98 B: AsRef<[u8]> + Send + 'static, 99 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static, 100 >( 101 _source_object: *mut glib::gobject_ffi::GObject, 102 res: *mut ffi::GAsyncResult, 103 user_data: glib::ffi::gpointer, 104 ) { 105 let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _); 106 let (callback, buffer) = user_data.take().unwrap(); 107 108 let mut error = ptr::null_mut(); 109 let ret = ffi::g_output_stream_write_finish(_source_object as *mut _, res, &mut error); 110 let result = if error.is_null() { 111 Ok((buffer, ret as usize)) 112 } else { 113 Err((buffer, from_glib_full(error))) 114 }; 115 callback(result); 116 } 117 let callback = write_async_trampoline::<B, Q>; 118 unsafe { 119 ffi::g_output_stream_write_async( 120 self.as_ref().to_glib_none().0, 121 mut_override(buffer_ptr), 122 count, 123 io_priority.into_glib(), 124 gcancellable.0, 125 Some(callback), 126 Box::into_raw(user_data) as *mut _, 127 ); 128 } 129 } 130 write_all<C: IsA<Cancellable>>( &self, buffer: &[u8], cancellable: Option<&C>, ) -> Result<(usize, Option<glib::Error>), glib::Error>131 fn write_all<C: IsA<Cancellable>>( 132 &self, 133 buffer: &[u8], 134 cancellable: Option<&C>, 135 ) -> Result<(usize, Option<glib::Error>), glib::Error> { 136 let cancellable = cancellable.map(|c| c.as_ref()); 137 let gcancellable = cancellable.to_glib_none(); 138 let count = buffer.len() as usize; 139 unsafe { 140 let mut bytes_written = mem::MaybeUninit::uninit(); 141 let mut error = ptr::null_mut(); 142 let _ = ffi::g_output_stream_write_all( 143 self.as_ref().to_glib_none().0, 144 buffer.to_glib_none().0, 145 count, 146 bytes_written.as_mut_ptr(), 147 gcancellable.0, 148 &mut error, 149 ); 150 151 let bytes_written = bytes_written.assume_init(); 152 if error.is_null() { 153 Ok((bytes_written, None)) 154 } else if bytes_written != 0 { 155 Ok((bytes_written, Some(from_glib_full(error)))) 156 } else { 157 Err(from_glib_full(error)) 158 } 159 } 160 } 161 write_all_async< B: AsRef<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )162 fn write_all_async< 163 B: AsRef<[u8]> + Send + 'static, 164 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static, 165 C: IsA<Cancellable>, 166 >( 167 &self, 168 buffer: B, 169 io_priority: Priority, 170 cancellable: Option<&C>, 171 callback: Q, 172 ) { 173 let cancellable = cancellable.map(|c| c.as_ref()); 174 let gcancellable = cancellable.to_glib_none(); 175 let user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer))); 176 // Need to do this after boxing as the contents pointer might change by moving into the box 177 let (count, buffer_ptr) = { 178 let buffer = &(*user_data).as_ref().unwrap().1; 179 let slice = buffer.as_ref(); 180 (slice.len(), slice.as_ptr()) 181 }; 182 unsafe extern "C" fn write_all_async_trampoline< 183 B: AsRef<[u8]> + Send + 'static, 184 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static, 185 >( 186 _source_object: *mut glib::gobject_ffi::GObject, 187 res: *mut ffi::GAsyncResult, 188 user_data: glib::ffi::gpointer, 189 ) { 190 let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _); 191 let (callback, buffer) = user_data.take().unwrap(); 192 193 let mut error = ptr::null_mut(); 194 let mut bytes_written = mem::MaybeUninit::uninit(); 195 let _ = ffi::g_output_stream_write_all_finish( 196 _source_object as *mut _, 197 res, 198 bytes_written.as_mut_ptr(), 199 &mut error, 200 ); 201 let bytes_written = bytes_written.assume_init(); 202 let result = if error.is_null() { 203 Ok((buffer, bytes_written, None)) 204 } else if bytes_written != 0 { 205 Ok((buffer, bytes_written, from_glib_full(error))) 206 } else { 207 Err((buffer, from_glib_full(error))) 208 }; 209 callback(result); 210 } 211 let callback = write_all_async_trampoline::<B, Q>; 212 unsafe { 213 ffi::g_output_stream_write_all_async( 214 self.as_ref().to_glib_none().0, 215 mut_override(buffer_ptr), 216 count, 217 io_priority.into_glib(), 218 gcancellable.0, 219 Some(callback), 220 Box::into_raw(user_data) as *mut _, 221 ); 222 } 223 } 224 write_async_future<'a, B: AsRef<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>225 fn write_async_future<'a, B: AsRef<[u8]> + Send + 'static>( 226 &self, 227 buffer: B, 228 io_priority: Priority, 229 ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>> 230 { 231 Box::pin(crate::GioFuture::new( 232 self, 233 move |obj, cancellable, send| { 234 obj.write_async(buffer, io_priority, Some(cancellable), move |res| { 235 send.resolve(res); 236 }); 237 }, 238 )) 239 } 240 write_all_async_future<'a, B: AsRef<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin< Box< dyn std::future::Future< Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>, > + 'static, >, >241 fn write_all_async_future<'a, B: AsRef<[u8]> + Send + 'static>( 242 &self, 243 buffer: B, 244 io_priority: Priority, 245 ) -> Pin< 246 Box< 247 dyn std::future::Future< 248 Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>, 249 > + 'static, 250 >, 251 > { 252 Box::pin(crate::GioFuture::new( 253 self, 254 move |obj, cancellable, send| { 255 obj.write_all_async(buffer, io_priority, Some(cancellable), move |res| { 256 send.resolve(res); 257 }); 258 }, 259 )) 260 } 261 } 262 263 #[derive(Debug)] 264 pub struct OutputStreamWrite<T: IsA<OutputStream>>(T); 265 266 impl<T: IsA<OutputStream>> OutputStreamWrite<T> { into_output_stream(self) -> T267 pub fn into_output_stream(self) -> T { 268 self.0 269 } 270 output_stream(&self) -> &T271 pub fn output_stream(&self) -> &T { 272 &self.0 273 } 274 } 275 276 impl<T: IsA<OutputStream>> io::Write for OutputStreamWrite<T> { write(&mut self, buf: &[u8]) -> io::Result<usize>277 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 278 let result = self 279 .0 280 .as_ref() 281 .write(buf, crate::NONE_CANCELLABLE) 282 .map(|size| size as usize); 283 to_std_io_result(result) 284 } 285 flush(&mut self) -> io::Result<()>286 fn flush(&mut self) -> io::Result<()> { 287 let gio_result = self.0.as_ref().flush(crate::NONE_CANCELLABLE); 288 to_std_io_result(gio_result) 289 } 290 } 291 292 impl<T: IsA<OutputStream> + IsA<Seekable>> io::Seek for OutputStreamWrite<T> { seek(&mut self, pos: io::SeekFrom) -> io::Result<u64>293 fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> { 294 let (pos, type_) = match pos { 295 io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set), 296 io::SeekFrom::End(pos) => (pos, glib::SeekType::End), 297 io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur), 298 }; 299 let seekable: &Seekable = self.0.as_ref(); 300 let gio_result = seekable 301 .seek(pos, type_, crate::NONE_CANCELLABLE) 302 .map(|_| seekable.tell() as u64); 303 to_std_io_result(gio_result) 304 } 305 } 306 307 #[cfg(test)] 308 mod tests { 309 use crate::prelude::*; 310 use crate::test_util::run_async; 311 use crate::MemoryInputStream; 312 use crate::MemoryOutputStream; 313 use glib::Bytes; 314 use std::io::Write; 315 316 #[test] splice_async()317 fn splice_async() { 318 let ret = run_async(|tx, l| { 319 let input = MemoryInputStream::new(); 320 let b = Bytes::from_owned(vec![1, 2, 3]); 321 input.add_bytes(&b); 322 323 let strm = MemoryOutputStream::new_resizable(); 324 strm.splice_async( 325 &input, 326 crate::OutputStreamSpliceFlags::CLOSE_SOURCE, 327 glib::PRIORITY_DEFAULT_IDLE, 328 crate::NONE_CANCELLABLE, 329 move |ret| { 330 tx.send(ret).unwrap(); 331 l.quit(); 332 }, 333 ); 334 }); 335 336 assert_eq!(ret.unwrap(), 3); 337 } 338 339 #[test] write_async()340 fn write_async() { 341 let ret = run_async(|tx, l| { 342 let strm = MemoryOutputStream::new_resizable(); 343 344 let buf = vec![1, 2, 3]; 345 strm.write_async( 346 buf, 347 glib::PRIORITY_DEFAULT_IDLE, 348 crate::NONE_CANCELLABLE, 349 move |ret| { 350 tx.send(ret).unwrap(); 351 l.quit(); 352 }, 353 ); 354 }); 355 356 let (buf, size) = ret.unwrap(); 357 assert_eq!(buf, vec![1, 2, 3]); 358 assert_eq!(size, 3); 359 } 360 361 #[test] write_all_async()362 fn write_all_async() { 363 let ret = run_async(|tx, l| { 364 let strm = MemoryOutputStream::new_resizable(); 365 366 let buf = vec![1, 2, 3]; 367 strm.write_all_async( 368 buf, 369 glib::PRIORITY_DEFAULT_IDLE, 370 crate::NONE_CANCELLABLE, 371 move |ret| { 372 tx.send(ret).unwrap(); 373 l.quit(); 374 }, 375 ); 376 }); 377 378 let (buf, size, err) = ret.unwrap(); 379 assert_eq!(buf, vec![1, 2, 3]); 380 assert_eq!(size, 3); 381 assert!(err.is_none()); 382 } 383 384 #[test] write_bytes_async()385 fn write_bytes_async() { 386 let ret = run_async(|tx, l| { 387 let strm = MemoryOutputStream::new_resizable(); 388 389 let b = Bytes::from_owned(vec![1, 2, 3]); 390 strm.write_bytes_async( 391 &b, 392 glib::PRIORITY_DEFAULT_IDLE, 393 crate::NONE_CANCELLABLE, 394 move |ret| { 395 tx.send(ret).unwrap(); 396 l.quit(); 397 }, 398 ); 399 }); 400 401 assert_eq!(ret.unwrap(), 3); 402 } 403 404 #[test] std_io_write()405 fn std_io_write() { 406 let b = Bytes::from_owned(vec![1, 2, 3]); 407 let mut write = MemoryOutputStream::new_resizable().into_write(); 408 409 let ret = write.write(&b); 410 411 let stream = write.into_output_stream(); 412 stream.close(crate::NONE_CANCELLABLE).unwrap(); 413 assert_eq!(ret.unwrap(), 3); 414 assert_eq!(stream.steal_as_bytes(), [1, 2, 3].as_ref()); 415 } 416 417 #[test] into_output_stream()418 fn into_output_stream() { 419 let stream = MemoryOutputStream::new_resizable(); 420 let stream_clone = stream.clone(); 421 let stream = stream.into_write().into_output_stream(); 422 423 assert_eq!(stream, stream_clone); 424 } 425 } 426