1 // Take a look at the license at the top of the repository in the LICENSE file.
2 
3 use crate::error::to_std_io_result;
4 use crate::prelude::*;
5 use crate::Cancellable;
6 use crate::InputStream;
7 use crate::Seekable;
8 use futures_core::task::{Context, Poll};
9 use futures_io::{AsyncBufRead, AsyncRead};
10 use glib::object::IsA;
11 use glib::translate::*;
12 use glib::Priority;
13 use std::future::Future;
14 use std::io;
15 use std::mem;
16 use std::pin::Pin;
17 use std::ptr;
18 
/// Hand-written extension methods for every type that is an [`InputStream`].
///
/// The methods take a caller-supplied buffer (anything that is `AsMut<[u8]>`) and read
/// into it; the asynchronous variants take ownership of the buffer and hand it back
/// through the callback or future output.
pub trait InputStreamExtManual: Sized {
    /// Reads from the stream into `buffer` with a single synchronous call.
    ///
    /// Returns the number of bytes read on success, or the error reported by GIO.
    #[doc(alias = "g_input_stream_read")]
    fn read<B: AsMut<[u8]>, C: IsA<Cancellable>>(
        &self,
        buffer: B,
        cancellable: Option<&C>,
    ) -> Result<usize, glib::Error>;

    /// Synchronously reads into `buffer` using `g_input_stream_read_all`.
    ///
    /// The result distinguishes three outcomes:
    /// * `Ok((n, None))` – no error was reported; `n` bytes were read.
    /// * `Ok((n, Some(err)))` – an error was reported after `n > 0` bytes were read.
    /// * `Err(err)` – an error was reported and no bytes were read.
    #[doc(alias = "g_input_stream_read_all")]
    fn read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>(
        &self,
        buffer: B,
        cancellable: Option<&C>,
    ) -> Result<(usize, Option<glib::Error>), glib::Error>;

    /// Asynchronous counterpart of [`read_all`](Self::read_all).
    ///
    /// Ownership of `buffer` is taken for the duration of the operation and it is
    /// returned to `callback` in both the success and the error case. The success
    /// value carries the byte count and, if an error occurred after some bytes had
    /// already been read, that error.
    #[doc(alias = "g_input_stream_read_all_async")]
    fn read_all_async<
        B: AsMut<[u8]> + Send + 'static,
        Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static,
        C: IsA<Cancellable>,
    >(
        &self,
        buffer: B,
        io_priority: Priority,
        cancellable: Option<&C>,
        callback: Q,
    );

    /// Asynchronous counterpart of [`read`](Self::read).
    ///
    /// Ownership of `buffer` is taken for the duration of the operation and it is
    /// returned to `callback` together with either the byte count or the error.
    #[doc(alias = "g_input_stream_read_async")]
    fn read_async<
        B: AsMut<[u8]> + Send + 'static,
        Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static,
        C: IsA<Cancellable>,
    >(
        &self,
        buffer: B,
        io_priority: Priority,
        cancellable: Option<&C>,
        callback: Q,
    );

    /// Future-returning variant of [`read_all_async`](Self::read_all_async).
    ///
    /// The future's output has the same shape as the value passed to the callback of
    /// `read_all_async`.
    fn read_all_async_future<B: AsMut<[u8]> + Send + 'static>(
        &self,
        buffer: B,
        io_priority: Priority,
    ) -> Pin<
        Box<
            dyn std::future::Future<
                    Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
                > + 'static,
        >,
    >;

    /// Future-returning variant of [`read_async`](Self::read_async).
    ///
    /// The future's output has the same shape as the value passed to the callback of
    /// `read_async`.
    fn read_async_future<B: AsMut<[u8]> + Send + 'static>(
        &self,
        buffer: B,
        io_priority: Priority,
    ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>;

    /// Consumes the stream and wraps it in an [`InputStreamRead`] adapter, which
    /// implements `std::io::Read`.
    fn into_read(self) -> InputStreamRead<Self>
    where
        Self: IsA<InputStream>,
    {
        InputStreamRead(self)
    }

    /// Consumes the stream and wraps it in an [`InputStreamAsyncBufRead`] adapter
    /// whose internal buffer holds `buffer_size` bytes.
    fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self>
    where
        Self: IsA<InputStream>,
    {
        InputStreamAsyncBufRead::new(self, buffer_size)
    }
}
92 
93 impl<O: IsA<InputStream>> InputStreamExtManual for O {
read<B: AsMut<[u8]>, C: IsA<Cancellable>>( &self, mut buffer: B, cancellable: Option<&C>, ) -> Result<usize, glib::Error>94     fn read<B: AsMut<[u8]>, C: IsA<Cancellable>>(
95         &self,
96         mut buffer: B,
97         cancellable: Option<&C>,
98     ) -> Result<usize, glib::Error> {
99         let cancellable = cancellable.map(|c| c.as_ref());
100         let gcancellable = cancellable.to_glib_none();
101         let buffer = buffer.as_mut();
102         let buffer_ptr = buffer.as_mut_ptr();
103         let count = buffer.len();
104         unsafe {
105             let mut error = ptr::null_mut();
106             let ret = ffi::g_input_stream_read(
107                 self.as_ref().to_glib_none().0,
108                 buffer_ptr,
109                 count,
110                 gcancellable.0,
111                 &mut error,
112             );
113             if error.is_null() {
114                 Ok(ret as usize)
115             } else {
116                 Err(from_glib_full(error))
117             }
118         }
119     }
120 
read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>( &self, mut buffer: B, cancellable: Option<&C>, ) -> Result<(usize, Option<glib::Error>), glib::Error>121     fn read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>(
122         &self,
123         mut buffer: B,
124         cancellable: Option<&C>,
125     ) -> Result<(usize, Option<glib::Error>), glib::Error> {
126         let cancellable = cancellable.map(|c| c.as_ref());
127         let gcancellable = cancellable.to_glib_none();
128         let buffer = buffer.as_mut();
129         let buffer_ptr = buffer.as_mut_ptr();
130         let count = buffer.len();
131         unsafe {
132             let mut bytes_read = mem::MaybeUninit::uninit();
133             let mut error = ptr::null_mut();
134             let _ = ffi::g_input_stream_read_all(
135                 self.as_ref().to_glib_none().0,
136                 buffer_ptr,
137                 count,
138                 bytes_read.as_mut_ptr(),
139                 gcancellable.0,
140                 &mut error,
141             );
142 
143             let bytes_read = bytes_read.assume_init();
144             if error.is_null() {
145                 Ok((bytes_read, None))
146             } else if bytes_read != 0 {
147                 Ok((bytes_read, Some(from_glib_full(error))))
148             } else {
149                 Err(from_glib_full(error))
150             }
151         }
152     }
153 
read_all_async< B: AsMut<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )154     fn read_all_async<
155         B: AsMut<[u8]> + Send + 'static,
156         Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static,
157         C: IsA<Cancellable>,
158     >(
159         &self,
160         buffer: B,
161         io_priority: Priority,
162         cancellable: Option<&C>,
163         callback: Q,
164     ) {
165         let cancellable = cancellable.map(|c| c.as_ref());
166         let gcancellable = cancellable.to_glib_none();
167         let mut user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer)));
168         // Need to do this after boxing as the contents pointer might change by moving into the box
169         let (count, buffer_ptr) = {
170             let buffer = &mut (*user_data).as_mut().unwrap().1;
171             let slice = (*buffer).as_mut();
172             (slice.len(), slice.as_mut_ptr())
173         };
174         unsafe extern "C" fn read_all_async_trampoline<
175             B: AsMut<[u8]> + Send + 'static,
176             Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static,
177         >(
178             _source_object: *mut glib::gobject_ffi::GObject,
179             res: *mut ffi::GAsyncResult,
180             user_data: glib::ffi::gpointer,
181         ) {
182             let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _);
183             let (callback, buffer) = user_data.take().unwrap();
184 
185             let mut error = ptr::null_mut();
186             let mut bytes_read = mem::MaybeUninit::uninit();
187             let _ = ffi::g_input_stream_read_all_finish(
188                 _source_object as *mut _,
189                 res,
190                 bytes_read.as_mut_ptr(),
191                 &mut error,
192             );
193 
194             let bytes_read = bytes_read.assume_init();
195             let result = if error.is_null() {
196                 Ok((buffer, bytes_read, None))
197             } else if bytes_read != 0 {
198                 Ok((buffer, bytes_read, Some(from_glib_full(error))))
199             } else {
200                 Err((buffer, from_glib_full(error)))
201             };
202 
203             callback(result);
204         }
205         let callback = read_all_async_trampoline::<B, Q>;
206         unsafe {
207             ffi::g_input_stream_read_all_async(
208                 self.as_ref().to_glib_none().0,
209                 buffer_ptr,
210                 count,
211                 io_priority.into_glib(),
212                 gcancellable.0,
213                 Some(callback),
214                 Box::into_raw(user_data) as *mut _,
215             );
216         }
217     }
218 
read_async< B: AsMut<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )219     fn read_async<
220         B: AsMut<[u8]> + Send + 'static,
221         Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static,
222         C: IsA<Cancellable>,
223     >(
224         &self,
225         buffer: B,
226         io_priority: Priority,
227         cancellable: Option<&C>,
228         callback: Q,
229     ) {
230         let cancellable = cancellable.map(|c| c.as_ref());
231         let gcancellable = cancellable.to_glib_none();
232         let mut user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer)));
233         // Need to do this after boxing as the contents pointer might change by moving into the box
234         let (count, buffer_ptr) = {
235             let buffer = &mut (*user_data).as_mut().unwrap().1;
236             let slice = (*buffer).as_mut();
237             (slice.len(), slice.as_mut_ptr())
238         };
239         unsafe extern "C" fn read_async_trampoline<
240             B: AsMut<[u8]> + Send + 'static,
241             Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static,
242         >(
243             _source_object: *mut glib::gobject_ffi::GObject,
244             res: *mut ffi::GAsyncResult,
245             user_data: glib::ffi::gpointer,
246         ) {
247             let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _);
248             let (callback, buffer) = user_data.take().unwrap();
249 
250             let mut error = ptr::null_mut();
251             let ret = ffi::g_input_stream_read_finish(_source_object as *mut _, res, &mut error);
252 
253             let result = if error.is_null() {
254                 Ok((buffer, ret as usize))
255             } else {
256                 Err((buffer, from_glib_full(error)))
257             };
258 
259             callback(result);
260         }
261         let callback = read_async_trampoline::<B, Q>;
262         unsafe {
263             ffi::g_input_stream_read_async(
264                 self.as_ref().to_glib_none().0,
265                 buffer_ptr,
266                 count,
267                 io_priority.into_glib(),
268                 gcancellable.0,
269                 Some(callback),
270                 Box::into_raw(user_data) as *mut _,
271             );
272         }
273     }
274 
read_all_async_future<'a, B: AsMut<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin< Box< dyn std::future::Future< Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>, > + 'static, >, >275     fn read_all_async_future<'a, B: AsMut<[u8]> + Send + 'static>(
276         &self,
277         buffer: B,
278         io_priority: Priority,
279     ) -> Pin<
280         Box<
281             dyn std::future::Future<
282                     Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
283                 > + 'static,
284         >,
285     > {
286         Box::pin(crate::GioFuture::new(
287             self,
288             move |obj, cancellable, send| {
289                 obj.read_all_async(buffer, io_priority, Some(cancellable), move |res| {
290                     send.resolve(res);
291                 });
292             },
293         ))
294     }
295 
read_async_future<'a, B: AsMut<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>296     fn read_async_future<'a, B: AsMut<[u8]> + Send + 'static>(
297         &self,
298         buffer: B,
299         io_priority: Priority,
300     ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>
301     {
302         Box::pin(crate::GioFuture::new(
303             self,
304             move |obj, cancellable, send| {
305                 obj.read_async(buffer, io_priority, Some(cancellable), move |res| {
306                     send.resolve(res);
307                 });
308             },
309         ))
310     }
311 }
312 
313 #[derive(Debug)]
314 pub struct InputStreamRead<T: IsA<InputStream>>(T);
315 
316 impl<T: IsA<InputStream>> InputStreamRead<T> {
into_input_stream(self) -> T317     pub fn into_input_stream(self) -> T {
318         self.0
319     }
320 
input_stream(&self) -> &T321     pub fn input_stream(&self) -> &T {
322         &self.0
323     }
324 }
325 
326 impl<T: IsA<InputStream>> io::Read for InputStreamRead<T> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>327     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
328         let gio_result = self.0.as_ref().read(buf, crate::NONE_CANCELLABLE);
329         to_std_io_result(gio_result)
330     }
331 }
332 
333 impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> {
seek(&mut self, pos: io::SeekFrom) -> io::Result<u64>334     fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
335         let (pos, type_) = match pos {
336             io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set),
337             io::SeekFrom::End(pos) => (pos, glib::SeekType::End),
338             io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur),
339         };
340         let seekable: &Seekable = self.0.as_ref();
341         let gio_result = seekable
342             .seek(pos, type_, crate::NONE_CANCELLABLE)
343             .map(|_| seekable.tell() as u64);
344         to_std_io_result(gio_result)
345     }
346 }
347 
348 enum State {
349     Waiting {
350         buffer: Vec<u8>,
351     },
352     Transitioning,
353     Reading {
354         pending: Pin<
355             Box<
356                 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
357                     + 'static,
358             >,
359         >,
360     },
361     HasData {
362         buffer: Vec<u8>,
363         valid: (usize, usize), // first index is inclusive, second is exclusive
364     },
365     Failed(crate::IOErrorEnum),
366 }
367 
368 impl State {
into_buffer(self) -> Vec<u8>369     fn into_buffer(self) -> Vec<u8> {
370         match self {
371             State::Waiting { buffer } => buffer,
372             _ => panic!("Invalid state"),
373         }
374     }
375 
376     #[doc(alias = "get_pending")]
pending( &mut self, ) -> &mut Pin< Box< dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>> + 'static, >, >377     fn pending(
378         &mut self,
379     ) -> &mut Pin<
380         Box<
381             dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
382                 + 'static,
383         >,
384     > {
385         match self {
386             State::Reading { ref mut pending } => pending,
387             _ => panic!("Invalid state"),
388         }
389     }
390 }
391 pub struct InputStreamAsyncBufRead<T: IsA<InputStream>> {
392     stream: T,
393     state: State,
394 }
395 
396 impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> {
into_input_stream(self) -> T397     pub fn into_input_stream(self) -> T {
398         self.stream
399     }
400 
input_stream(&self) -> &T401     pub fn input_stream(&self) -> &T {
402         &self.stream
403     }
404 
new(stream: T, buffer_size: usize) -> Self405     fn new(stream: T, buffer_size: usize) -> Self {
406         let buffer = vec![0; buffer_size];
407 
408         Self {
409             stream,
410             state: State::Waiting { buffer },
411         }
412     }
set_reading( &mut self, ) -> &mut Pin< Box< dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>> + 'static, >, >413     fn set_reading(
414         &mut self,
415     ) -> &mut Pin<
416         Box<
417             dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>>
418                 + 'static,
419         >,
420     > {
421         match self.state {
422             State::Waiting { .. } => {
423                 let waiting = mem::replace(&mut self.state, State::Transitioning);
424                 let buffer = waiting.into_buffer();
425                 let pending = self
426                     .input_stream()
427                     .read_async_future(buffer, Priority::default());
428                 self.state = State::Reading { pending };
429             }
430             State::Reading { .. } => {}
431             _ => panic!("Invalid state"),
432         };
433 
434         self.state.pending()
435     }
436 
437     #[doc(alias = "get_data")]
data(&self) -> Poll<io::Result<&[u8]>>438     fn data(&self) -> Poll<io::Result<&[u8]>> {
439         if let State::HasData {
440             ref buffer,
441             valid: (i, j),
442         } = self.state
443         {
444             return Poll::Ready(Ok(&buffer[i..j]));
445         }
446         panic!("Invalid state")
447     }
448 
set_waiting(&mut self, buffer: Vec<u8>)449     fn set_waiting(&mut self, buffer: Vec<u8>) {
450         match self.state {
451             State::Reading { .. } | State::Transitioning => self.state = State::Waiting { buffer },
452             _ => panic!("Invalid state"),
453         }
454     }
455 
set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize))456     fn set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize)) {
457         match self.state {
458             State::Reading { .. } | State::Transitioning { .. } => {
459                 self.state = State::HasData { buffer, valid }
460             }
461             _ => panic!("Invalid state"),
462         }
463     }
464 
poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures_io::Error>>465     fn poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures_io::Error>> {
466         match self.state {
467             State::Failed(kind) => Poll::Ready(Err(io::Error::new(
468                 io::ErrorKind::from(kind),
469                 BufReadError::Failed,
470             ))),
471             State::HasData { .. } => self.data(),
472             State::Transitioning => panic!("Invalid state"),
473             State::Waiting { .. } | State::Reading { .. } => {
474                 let pending = self.set_reading();
475                 match Pin::new(pending).poll(cx) {
476                     Poll::Ready(Ok((buffer, res))) => {
477                         if res == 0 {
478                             self.set_waiting(buffer);
479                             Poll::Ready(Ok(&[]))
480                         } else {
481                             self.set_has_data(buffer, (0, res));
482                             self.data()
483                         }
484                     }
485                     Poll::Ready(Err((_, err))) => {
486                         let kind = err.kind::<crate::IOErrorEnum>().unwrap();
487                         self.state = State::Failed(kind);
488                         Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err)))
489                     }
490                     Poll::Pending => Poll::Pending,
491                 }
492             }
493         }
494     }
495 
consume(&mut self, amt: usize)496     fn consume(&mut self, amt: usize) {
497         if amt == 0 {
498             return;
499         }
500 
501         if let State::HasData { .. } = self.state {
502             let has_data = mem::replace(&mut self.state, State::Transitioning);
503             if let State::HasData {
504                 buffer,
505                 valid: (i, j),
506             } = has_data
507             {
508                 let available = j - i;
509                 if amt > available {
510                     panic!(
511                         "Cannot consume {} bytes as only {} are available",
512                         amt, available
513                     )
514                 }
515                 let remaining = available - amt;
516                 if remaining == 0 {
517                     return self.set_waiting(buffer);
518                 } else {
519                     return self.set_has_data(buffer, (i + amt, j));
520                 }
521             }
522         }
523 
524         panic!("Invalid state")
525     }
526 }
527 
/// Error payload attached to the `io::Error` that `poll_fill_buf` returns once the
/// reader is in the `Failed` state.
#[derive(thiserror::Error, Debug)]
enum BufReadError {
    /// An earlier read on the same reader already failed.
    #[error("Previous read operation failed")]
    Failed,
}
533 
534 impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> {
poll_read( self: Pin<&mut Self>, cx: &mut Context, out_buf: &mut [u8], ) -> Poll<io::Result<usize>>535     fn poll_read(
536         self: Pin<&mut Self>,
537         cx: &mut Context,
538         out_buf: &mut [u8],
539     ) -> Poll<io::Result<usize>> {
540         let reader = self.get_mut();
541         let poll = reader.poll_fill_buf(cx);
542 
543         let poll = poll.map_ok(|buffer| {
544             let copied = buffer.len().min(out_buf.len());
545             out_buf[..copied].copy_from_slice(&buffer[..copied]);
546             copied
547         });
548 
549         if let Poll::Ready(Ok(consumed)) = poll {
550             reader.consume(consumed);
551         }
552         poll
553     }
554 }
555 
556 impl<T: IsA<InputStream>> AsyncBufRead for InputStreamAsyncBufRead<T> {
poll_fill_buf( self: Pin<&mut Self>, cx: &mut Context, ) -> Poll<Result<&[u8], futures_io::Error>>557     fn poll_fill_buf(
558         self: Pin<&mut Self>,
559         cx: &mut Context,
560     ) -> Poll<Result<&[u8], futures_io::Error>> {
561         self.get_mut().poll_fill_buf(cx)
562     }
563 
consume(self: Pin<&mut Self>, amt: usize)564     fn consume(self: Pin<&mut Self>, amt: usize) {
565         self.get_mut().consume(amt);
566     }
567 }
568 
// The `AsyncRead` and `AsyncBufRead` impls call `Pin::get_mut`, which is only available
// when the pinned type is `Unpin`, so the reader is declared `Unpin` for every `T`.
impl<T: IsA<InputStream>> Unpin for InputStreamAsyncBufRead<T> {}
570 
#[cfg(test)]
mod tests {
    use crate::prelude::*;
    use crate::test_util::run_async;
    use crate::MemoryInputStream;
    use glib::Bytes;
    use std::io::Read;

    /// Builds an in-memory stream whose content is the three bytes `1, 2, 3`.
    fn three_byte_stream() -> MemoryInputStream {
        let content = Bytes::from_owned(vec![1, 2, 3]);
        MemoryInputStream::from_bytes(&content)
    }

    // Asynchronous read-all into a larger buffer reports 3 bytes and no error.
    #[test]
    fn read_all_async() {
        let outcome = run_async(|tx, l| {
            let strm = three_byte_stream();

            strm.read_all_async(
                vec![0; 10],
                glib::PRIORITY_DEFAULT_IDLE,
                crate::NONE_CANCELLABLE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let (buf, count, err) = outcome.unwrap();
        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_eq!(&buf[..3], &[1u8, 2, 3]);
    }

    // Synchronous read-all into a larger buffer reports 3 bytes and no error.
    #[test]
    fn read_all() {
        let strm = three_byte_stream();
        let mut buf = vec![0; 10];

        let (count, err) = strm.read_all(&mut buf, crate::NONE_CANCELLABLE).unwrap();

        assert_eq!(count, 3);
        assert!(err.is_none());
        assert_eq!(&buf[..3], &[1u8, 2, 3]);
    }

    // A single synchronous read returns all three bytes.
    #[test]
    fn read() {
        let strm = three_byte_stream();
        let mut buf = vec![0; 10];

        let count = strm.read(&mut buf, crate::NONE_CANCELLABLE).unwrap();

        assert_eq!(count, 3);
        assert_eq!(&buf[..3], &[1u8, 2, 3]);
    }

    // A single asynchronous read returns the buffer and all three bytes.
    #[test]
    fn read_async() {
        let outcome = run_async(|tx, l| {
            let strm = three_byte_stream();

            strm.read_async(
                vec![0; 10],
                glib::PRIORITY_DEFAULT_IDLE,
                crate::NONE_CANCELLABLE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let (buf, count) = outcome.unwrap();
        assert_eq!(count, 3);
        assert_eq!(&buf[..3], &[1u8, 2, 3]);
    }

    // Requesting up to 10 bytes yields exactly the stream's content.
    #[test]
    fn read_bytes_async() {
        let outcome = run_async(|tx, l| {
            let strm = three_byte_stream();

            strm.read_bytes_async(
                10,
                glib::PRIORITY_DEFAULT_IDLE,
                crate::NONE_CANCELLABLE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let bytes = outcome.unwrap();
        assert_eq!(bytes, vec![1, 2, 3]);
    }

    // Skipping up to 10 bytes skips the 3 that are available.
    #[test]
    fn skip_async() {
        let outcome = run_async(|tx, l| {
            let strm = three_byte_stream();

            strm.skip_async(
                10,
                glib::PRIORITY_DEFAULT_IDLE,
                crate::NONE_CANCELLABLE,
                move |ret| {
                    tx.send(ret).unwrap();
                    l.quit();
                },
            );
        });

        let skipped = outcome.unwrap();
        assert_eq!(skipped, 3);
    }

    // The `std::io::Read` adapter returns the same bytes as a direct read.
    #[test]
    fn std_io_read() {
        let mut read = three_byte_stream().into_read();
        let mut buf = [0u8; 10];

        let count = read.read(&mut buf).unwrap();

        assert_eq!(count, 3);
        assert_eq!(&buf[..3], &[1u8, 2, 3]);
    }

    // Wrapping and unwrapping the adapter gives back the very same stream object.
    #[test]
    fn into_input_stream() {
        let original = three_byte_stream();
        let stream_clone = original.clone();
        let stream = original.into_read().into_input_stream();

        assert_eq!(stream, stream_clone);
    }
}
725