1 // Take a look at the license at the top of the repository in the LICENSE file.
2 
3 use crate::error::to_std_io_result;
4 use crate::prelude::*;
5 use crate::Cancellable;
6 use crate::OutputStream;
7 use crate::Seekable;
8 use glib::object::IsA;
9 use glib::translate::*;
10 use glib::Priority;
11 use std::io;
12 use std::mem;
13 use std::pin::Pin;
14 use std::ptr;
15 
16 pub trait OutputStreamExtManual: Sized + OutputStreamExt {
17     #[doc(alias = "g_output_stream_write_async")]
write_async< B: AsRef<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )18     fn write_async<
19         B: AsRef<[u8]> + Send + 'static,
20         Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static,
21         C: IsA<Cancellable>,
22     >(
23         &self,
24         buffer: B,
25         io_priority: Priority,
26         cancellable: Option<&C>,
27         callback: Q,
28     );
29 
30     #[doc(alias = "g_output_stream_write_all")]
write_all<C: IsA<Cancellable>>( &self, buffer: &[u8], cancellable: Option<&C>, ) -> Result<(usize, Option<glib::Error>), glib::Error>31     fn write_all<C: IsA<Cancellable>>(
32         &self,
33         buffer: &[u8],
34         cancellable: Option<&C>,
35     ) -> Result<(usize, Option<glib::Error>), glib::Error>;
36 
37     #[doc(alias = "g_output_stream_write_all_async")]
write_all_async< B: AsRef<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )38     fn write_all_async<
39         B: AsRef<[u8]> + Send + 'static,
40         Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static,
41         C: IsA<Cancellable>,
42     >(
43         &self,
44         buffer: B,
45         io_priority: Priority,
46         cancellable: Option<&C>,
47         callback: Q,
48     );
49 
write_async_future<B: AsRef<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>50     fn write_async_future<B: AsRef<[u8]> + Send + 'static>(
51         &self,
52         buffer: B,
53         io_priority: Priority,
54     ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>;
55 
write_all_async_future<B: AsRef<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin< Box< dyn std::future::Future< Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>, > + 'static, >, >56     fn write_all_async_future<B: AsRef<[u8]> + Send + 'static>(
57         &self,
58         buffer: B,
59         io_priority: Priority,
60     ) -> Pin<
61         Box<
62             dyn std::future::Future<
63                     Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
64                 > + 'static,
65         >,
66     >;
67 
into_write(self) -> OutputStreamWrite<Self> where Self: IsA<OutputStream>,68     fn into_write(self) -> OutputStreamWrite<Self>
69     where
70         Self: IsA<OutputStream>,
71     {
72         OutputStreamWrite(self)
73     }
74 }
75 
76 impl<O: IsA<OutputStream>> OutputStreamExtManual for O {
write_async< B: AsRef<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )77     fn write_async<
78         B: AsRef<[u8]> + Send + 'static,
79         Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static,
80         C: IsA<Cancellable>,
81     >(
82         &self,
83         buffer: B,
84         io_priority: Priority,
85         cancellable: Option<&C>,
86         callback: Q,
87     ) {
88         let cancellable = cancellable.map(|c| c.as_ref());
89         let gcancellable = cancellable.to_glib_none();
90         let user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer)));
91         // Need to do this after boxing as the contents pointer might change by moving into the box
92         let (count, buffer_ptr) = {
93             let buffer = &(*user_data).as_ref().unwrap().1;
94             let slice = buffer.as_ref();
95             (slice.len(), slice.as_ptr())
96         };
97         unsafe extern "C" fn write_async_trampoline<
98             B: AsRef<[u8]> + Send + 'static,
99             Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static,
100         >(
101             _source_object: *mut glib::gobject_ffi::GObject,
102             res: *mut ffi::GAsyncResult,
103             user_data: glib::ffi::gpointer,
104         ) {
105             let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _);
106             let (callback, buffer) = user_data.take().unwrap();
107 
108             let mut error = ptr::null_mut();
109             let ret = ffi::g_output_stream_write_finish(_source_object as *mut _, res, &mut error);
110             let result = if error.is_null() {
111                 Ok((buffer, ret as usize))
112             } else {
113                 Err((buffer, from_glib_full(error)))
114             };
115             callback(result);
116         }
117         let callback = write_async_trampoline::<B, Q>;
118         unsafe {
119             ffi::g_output_stream_write_async(
120                 self.as_ref().to_glib_none().0,
121                 mut_override(buffer_ptr),
122                 count,
123                 io_priority.into_glib(),
124                 gcancellable.0,
125                 Some(callback),
126                 Box::into_raw(user_data) as *mut _,
127             );
128         }
129     }
130 
write_all<C: IsA<Cancellable>>( &self, buffer: &[u8], cancellable: Option<&C>, ) -> Result<(usize, Option<glib::Error>), glib::Error>131     fn write_all<C: IsA<Cancellable>>(
132         &self,
133         buffer: &[u8],
134         cancellable: Option<&C>,
135     ) -> Result<(usize, Option<glib::Error>), glib::Error> {
136         let cancellable = cancellable.map(|c| c.as_ref());
137         let gcancellable = cancellable.to_glib_none();
138         let count = buffer.len() as usize;
139         unsafe {
140             let mut bytes_written = mem::MaybeUninit::uninit();
141             let mut error = ptr::null_mut();
142             let _ = ffi::g_output_stream_write_all(
143                 self.as_ref().to_glib_none().0,
144                 buffer.to_glib_none().0,
145                 count,
146                 bytes_written.as_mut_ptr(),
147                 gcancellable.0,
148                 &mut error,
149             );
150 
151             let bytes_written = bytes_written.assume_init();
152             if error.is_null() {
153                 Ok((bytes_written, None))
154             } else if bytes_written != 0 {
155                 Ok((bytes_written, Some(from_glib_full(error))))
156             } else {
157                 Err(from_glib_full(error))
158             }
159         }
160     }
161 
write_all_async< B: AsRef<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )162     fn write_all_async<
163         B: AsRef<[u8]> + Send + 'static,
164         Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static,
165         C: IsA<Cancellable>,
166     >(
167         &self,
168         buffer: B,
169         io_priority: Priority,
170         cancellable: Option<&C>,
171         callback: Q,
172     ) {
173         let cancellable = cancellable.map(|c| c.as_ref());
174         let gcancellable = cancellable.to_glib_none();
175         let user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer)));
176         // Need to do this after boxing as the contents pointer might change by moving into the box
177         let (count, buffer_ptr) = {
178             let buffer = &(*user_data).as_ref().unwrap().1;
179             let slice = buffer.as_ref();
180             (slice.len(), slice.as_ptr())
181         };
182         unsafe extern "C" fn write_all_async_trampoline<
183             B: AsRef<[u8]> + Send + 'static,
184             Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static,
185         >(
186             _source_object: *mut glib::gobject_ffi::GObject,
187             res: *mut ffi::GAsyncResult,
188             user_data: glib::ffi::gpointer,
189         ) {
190             let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _);
191             let (callback, buffer) = user_data.take().unwrap();
192 
193             let mut error = ptr::null_mut();
194             let mut bytes_written = mem::MaybeUninit::uninit();
195             let _ = ffi::g_output_stream_write_all_finish(
196                 _source_object as *mut _,
197                 res,
198                 bytes_written.as_mut_ptr(),
199                 &mut error,
200             );
201             let bytes_written = bytes_written.assume_init();
202             let result = if error.is_null() {
203                 Ok((buffer, bytes_written, None))
204             } else if bytes_written != 0 {
205                 Ok((buffer, bytes_written, from_glib_full(error)))
206             } else {
207                 Err((buffer, from_glib_full(error)))
208             };
209             callback(result);
210         }
211         let callback = write_all_async_trampoline::<B, Q>;
212         unsafe {
213             ffi::g_output_stream_write_all_async(
214                 self.as_ref().to_glib_none().0,
215                 mut_override(buffer_ptr),
216                 count,
217                 io_priority.into_glib(),
218                 gcancellable.0,
219                 Some(callback),
220                 Box::into_raw(user_data) as *mut _,
221             );
222         }
223     }
224 
write_async_future<'a, B: AsRef<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>225     fn write_async_future<'a, B: AsRef<[u8]> + Send + 'static>(
226         &self,
227         buffer: B,
228         io_priority: Priority,
229     ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
230     {
231         Box::pin(crate::GioFuture::new(
232             self,
233             move |obj, cancellable, send| {
234                 obj.write_async(buffer, io_priority, Some(cancellable), move |res| {
235                     send.resolve(res);
236                 });
237             },
238         ))
239     }
240 
write_all_async_future<'a, B: AsRef<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin< Box< dyn std::future::Future< Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>, > + 'static, >, >241     fn write_all_async_future<'a, B: AsRef<[u8]> + Send + 'static>(
242         &self,
243         buffer: B,
244         io_priority: Priority,
245     ) -> Pin<
246         Box<
247             dyn std::future::Future<
248                     Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
249                 > + 'static,
250         >,
251     > {
252         Box::pin(crate::GioFuture::new(
253             self,
254             move |obj, cancellable, send| {
255                 obj.write_all_async(buffer, io_priority, Some(cancellable), move |res| {
256                     send.resolve(res);
257                 });
258             },
259         ))
260     }
261 }
262 
263 #[derive(Debug)]
264 pub struct OutputStreamWrite<T: IsA<OutputStream>>(T);
265 
266 impl<T: IsA<OutputStream>> OutputStreamWrite<T> {
into_output_stream(self) -> T267     pub fn into_output_stream(self) -> T {
268         self.0
269     }
270 
output_stream(&self) -> &T271     pub fn output_stream(&self) -> &T {
272         &self.0
273     }
274 }
275 
276 impl<T: IsA<OutputStream>> io::Write for OutputStreamWrite<T> {
write(&mut self, buf: &[u8]) -> io::Result<usize>277     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
278         let result = self
279             .0
280             .as_ref()
281             .write(buf, crate::NONE_CANCELLABLE)
282             .map(|size| size as usize);
283         to_std_io_result(result)
284     }
285 
flush(&mut self) -> io::Result<()>286     fn flush(&mut self) -> io::Result<()> {
287         let gio_result = self.0.as_ref().flush(crate::NONE_CANCELLABLE);
288         to_std_io_result(gio_result)
289     }
290 }
291 
292 impl<T: IsA<OutputStream> + IsA<Seekable>> io::Seek for OutputStreamWrite<T> {
seek(&mut self, pos: io::SeekFrom) -> io::Result<u64>293     fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
294         let (pos, type_) = match pos {
295             io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
296             io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
297             io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
298         };
299         let seekable: &Seekable = self.0.as_ref();
300         let gio_result = seekable
301             .seek(pos, type_, crate::NONE_CANCELLABLE)
302             .map(|_| seekable.tell() as u64);
303         to_std_io_result(gio_result)
304     }
305 }
306 
307 #[cfg(test)]
308 mod tests {
309     use crate::prelude::*;
310     use crate::test_util::run_async;
311     use crate::MemoryInputStream;
312     use crate::MemoryOutputStream;
313     use glib::Bytes;
314     use std::io::Write;
315 
316     #[test]
splice_async()317     fn splice_async() {
318         let ret = run_async(|tx, l| {
319             let input = MemoryInputStream::new();
320             let b = Bytes::from_owned(vec![1, 2, 3]);
321             input.add_bytes(&b);
322 
323             let strm = MemoryOutputStream::new_resizable();
324             strm.splice_async(
325                 &input,
326                 crate::OutputStreamSpliceFlags::CLOSE_SOURCE,
327                 glib::PRIORITY_DEFAULT_IDLE,
328                 crate::NONE_CANCELLABLE,
329                 move |ret| {
330                     tx.send(ret).unwrap();
331                     l.quit();
332                 },
333             );
334         });
335 
336         assert_eq!(ret.unwrap(), 3);
337     }
338 
339     #[test]
write_async()340     fn write_async() {
341         let ret = run_async(|tx, l| {
342             let strm = MemoryOutputStream::new_resizable();
343 
344             let buf = vec![1, 2, 3];
345             strm.write_async(
346                 buf,
347                 glib::PRIORITY_DEFAULT_IDLE,
348                 crate::NONE_CANCELLABLE,
349                 move |ret| {
350                     tx.send(ret).unwrap();
351                     l.quit();
352                 },
353             );
354         });
355 
356         let (buf, size) = ret.unwrap();
357         assert_eq!(buf, vec![1, 2, 3]);
358         assert_eq!(size, 3);
359     }
360 
361     #[test]
write_all_async()362     fn write_all_async() {
363         let ret = run_async(|tx, l| {
364             let strm = MemoryOutputStream::new_resizable();
365 
366             let buf = vec![1, 2, 3];
367             strm.write_all_async(
368                 buf,
369                 glib::PRIORITY_DEFAULT_IDLE,
370                 crate::NONE_CANCELLABLE,
371                 move |ret| {
372                     tx.send(ret).unwrap();
373                     l.quit();
374                 },
375             );
376         });
377 
378         let (buf, size, err) = ret.unwrap();
379         assert_eq!(buf, vec![1, 2, 3]);
380         assert_eq!(size, 3);
381         assert!(err.is_none());
382     }
383 
384     #[test]
write_bytes_async()385     fn write_bytes_async() {
386         let ret = run_async(|tx, l| {
387             let strm = MemoryOutputStream::new_resizable();
388 
389             let b = Bytes::from_owned(vec![1, 2, 3]);
390             strm.write_bytes_async(
391                 &b,
392                 glib::PRIORITY_DEFAULT_IDLE,
393                 crate::NONE_CANCELLABLE,
394                 move |ret| {
395                     tx.send(ret).unwrap();
396                     l.quit();
397                 },
398             );
399         });
400 
401         assert_eq!(ret.unwrap(), 3);
402     }
403 
404     #[test]
std_io_write()405     fn std_io_write() {
406         let b = Bytes::from_owned(vec![1, 2, 3]);
407         let mut write = MemoryOutputStream::new_resizable().into_write();
408 
409         let ret = write.write(&b);
410 
411         let stream = write.into_output_stream();
412         stream.close(crate::NONE_CANCELLABLE).unwrap();
413         assert_eq!(ret.unwrap(), 3);
414         assert_eq!(stream.steal_as_bytes(), [1, 2, 3].as_ref());
415     }
416 
417     #[test]
into_output_stream()418     fn into_output_stream() {
419         let stream = MemoryOutputStream::new_resizable();
420         let stream_clone = stream.clone();
421         let stream = stream.into_write().into_output_stream();
422 
423         assert_eq!(stream, stream_clone);
424     }
425 }
426