1 // Take a look at the license at the top of the repository in the LICENSE file.
2 
3 use crate::prelude::*;
4 use crate::subclass::prelude::*;
5 use crate::InputStream;
6 
7 use std::any::Any;
8 use std::io::{Read, Seek};
9 
10 mod imp {
11     use super::*;
12     use std::cell::RefCell;
13 
14     pub(super) enum Reader {
15         Read(AnyReader),
16         ReadSeek(AnyReader),
17     }
18 
19     #[derive(Default)]
20     pub struct ReadInputStream {
21         pub(super) read: RefCell<Option<Reader>>,
22     }
23 
24     #[glib::object_subclass]
25     impl ObjectSubclass for ReadInputStream {
26         const NAME: &'static str = "ReadInputStream";
27         type Type = super::ReadInputStream;
28         type ParentType = InputStream;
29         type Interfaces = (crate::Seekable,);
30     }
31 
32     impl ObjectImpl for ReadInputStream {}
33 
34     impl InputStreamImpl for ReadInputStream {
read( &self, _stream: &Self::Type, buffer: &mut [u8], _cancellable: Option<&crate::Cancellable>, ) -> Result<usize, glib::Error>35         fn read(
36             &self,
37             _stream: &Self::Type,
38             buffer: &mut [u8],
39             _cancellable: Option<&crate::Cancellable>,
40         ) -> Result<usize, glib::Error> {
41             let mut read = self.read.borrow_mut();
42             let read = match *read {
43                 None => {
44                     return Err(glib::Error::new(
45                         crate::IOErrorEnum::Closed,
46                         "Already closed",
47                     ));
48                 }
49                 Some(Reader::Read(ref mut read)) => read,
50                 Some(Reader::ReadSeek(ref mut read)) => read,
51             };
52 
53             loop {
54                 match std_error_to_gio_error(read.read(buffer)) {
55                     None => continue,
56                     Some(res) => return res,
57                 }
58             }
59         }
60 
close( &self, _stream: &Self::Type, _cancellable: Option<&crate::Cancellable>, ) -> Result<(), glib::Error>61         fn close(
62             &self,
63             _stream: &Self::Type,
64             _cancellable: Option<&crate::Cancellable>,
65         ) -> Result<(), glib::Error> {
66             let _ = self.read.borrow_mut().take();
67             Ok(())
68         }
69     }
70 
71     impl SeekableImpl for ReadInputStream {
tell(&self, _seekable: &Self::Type) -> i6472         fn tell(&self, _seekable: &Self::Type) -> i64 {
73             // XXX: stream_position is not stable yet
74             // let mut read = self.read.borrow_mut();
75             // match *read {
76             //     Some(Reader::ReadSeek(ref mut read)) => {
77             //         read.stream_position().map(|pos| pos as i64).unwrap_or(-1)
78             //     },
79             //     _ => -1,
80             // };
81             -1
82         }
83 
can_seek(&self, _seekable: &Self::Type) -> bool84         fn can_seek(&self, _seekable: &Self::Type) -> bool {
85             let read = self.read.borrow();
86             matches!(*read, Some(Reader::ReadSeek(_)))
87         }
88 
seek( &self, _seekable: &Self::Type, offset: i64, type_: glib::SeekType, _cancellable: Option<&crate::Cancellable>, ) -> Result<(), glib::Error>89         fn seek(
90             &self,
91             _seekable: &Self::Type,
92             offset: i64,
93             type_: glib::SeekType,
94             _cancellable: Option<&crate::Cancellable>,
95         ) -> Result<(), glib::Error> {
96             use std::io::SeekFrom;
97 
98             let mut read = self.read.borrow_mut();
99             match *read {
100                 Some(Reader::ReadSeek(ref mut read)) => {
101                     let pos = match type_ {
102                         glib::SeekType::Cur => SeekFrom::Current(offset),
103                         glib::SeekType::Set => {
104                             if offset < 0 {
105                                 return Err(glib::Error::new(
106                                     crate::IOErrorEnum::InvalidArgument,
107                                     "Invalid Argument",
108                                 ));
109                             } else {
110                                 SeekFrom::Start(offset as u64)
111                             }
112                         }
113                         glib::SeekType::End => SeekFrom::End(offset),
114                         _ => unimplemented!(),
115                     };
116 
117                     loop {
118                         match std_error_to_gio_error(read.seek(pos)) {
119                             None => continue,
120                             Some(res) => return res.map(|_| ()),
121                         }
122                     }
123                 }
124                 _ => Err(glib::Error::new(
125                     crate::IOErrorEnum::NotSupported,
126                     "Truncating not supported",
127                 )),
128             }
129         }
130 
can_truncate(&self, _seekable: &Self::Type) -> bool131         fn can_truncate(&self, _seekable: &Self::Type) -> bool {
132             false
133         }
134 
truncate( &self, _seekable: &Self::Type, _offset: i64, _cancellable: Option<&crate::Cancellable>, ) -> Result<(), glib::Error>135         fn truncate(
136             &self,
137             _seekable: &Self::Type,
138             _offset: i64,
139             _cancellable: Option<&crate::Cancellable>,
140         ) -> Result<(), glib::Error> {
141             Err(glib::Error::new(
142                 crate::IOErrorEnum::NotSupported,
143                 "Truncating not supported",
144             ))
145         }
146     }
147 }
148 
149 glib::wrapper! {
150     pub struct ReadInputStream(ObjectSubclass<imp::ReadInputStream>) @extends crate::InputStream, @implements crate::Seekable;
151 }
152 
153 impl ReadInputStream {
new<R: Read + Send + 'static>(read: R) -> ReadInputStream154     pub fn new<R: Read + Send + 'static>(read: R) -> ReadInputStream {
155         let obj = glib::Object::new(&[]).expect("Failed to create read input stream");
156 
157         let imp = imp::ReadInputStream::from_instance(&obj);
158         *imp.read.borrow_mut() = Some(imp::Reader::Read(AnyReader::new(read)));
159 
160         obj
161     }
162 
new_seekable<R: Read + Seek + Send + 'static>(read: R) -> ReadInputStream163     pub fn new_seekable<R: Read + Seek + Send + 'static>(read: R) -> ReadInputStream {
164         let obj = glib::Object::new(&[]).expect("Failed to create read input stream");
165 
166         let imp = imp::ReadInputStream::from_instance(&obj);
167         *imp.read.borrow_mut() = Some(imp::Reader::ReadSeek(AnyReader::new_seekable(read)));
168 
169         obj
170     }
171 
close_and_take(&self) -> Box<dyn Any + Send + 'static>172     pub fn close_and_take(&self) -> Box<dyn Any + Send + 'static> {
173         let imp = imp::ReadInputStream::from_instance(self);
174         let inner = imp.read.borrow_mut().take();
175 
176         let ret = match inner {
177             None => {
178                 panic!("Stream already closed or inner taken");
179             }
180             Some(imp::Reader::Read(read)) => read.reader,
181             Some(imp::Reader::ReadSeek(read)) => read.reader,
182         };
183 
184         let _ = self.close(crate::NONE_CANCELLABLE);
185 
186         match ret {
187             AnyOrPanic::Any(r) => r,
188             AnyOrPanic::Panic(p) => std::panic::resume_unwind(p),
189         }
190     }
191 }
192 
193 enum AnyOrPanic {
194     Any(Box<dyn Any + Send + 'static>),
195     Panic(Box<dyn Any + Send + 'static>),
196 }
197 
198 // Helper struct for dynamically dispatching to any kind of Reader and
199 // catching panics along the way
200 struct AnyReader {
201     reader: AnyOrPanic,
202     read_fn: fn(s: &mut AnyReader, buffer: &mut [u8]) -> std::io::Result<usize>,
203     seek_fn: Option<fn(s: &mut AnyReader, pos: std::io::SeekFrom) -> std::io::Result<u64>>,
204 }
205 
206 impl AnyReader {
new<R: Read + Any + Send + 'static>(r: R) -> Self207     fn new<R: Read + Any + Send + 'static>(r: R) -> Self {
208         Self {
209             reader: AnyOrPanic::Any(Box::new(r)),
210             read_fn: Self::read_fn::<R>,
211             seek_fn: None,
212         }
213     }
214 
new_seekable<R: Read + Seek + Any + Send + 'static>(r: R) -> Self215     fn new_seekable<R: Read + Seek + Any + Send + 'static>(r: R) -> Self {
216         Self {
217             reader: AnyOrPanic::Any(Box::new(r)),
218             read_fn: Self::read_fn::<R>,
219             seek_fn: Some(Self::seek_fn::<R>),
220         }
221     }
222 
read_fn<R: Read + 'static>(s: &mut AnyReader, buffer: &mut [u8]) -> std::io::Result<usize>223     fn read_fn<R: Read + 'static>(s: &mut AnyReader, buffer: &mut [u8]) -> std::io::Result<usize> {
224         s.with_inner(|r: &mut R| r.read(buffer))
225     }
226 
seek_fn<R: Seek + 'static>( s: &mut AnyReader, pos: std::io::SeekFrom, ) -> std::io::Result<u64>227     fn seek_fn<R: Seek + 'static>(
228         s: &mut AnyReader,
229         pos: std::io::SeekFrom,
230     ) -> std::io::Result<u64> {
231         s.with_inner(|r: &mut R| r.seek(pos))
232     }
233 
with_inner<R: 'static, T, F: FnOnce(&mut R) -> std::io::Result<T>>( &mut self, func: F, ) -> std::io::Result<T>234     fn with_inner<R: 'static, T, F: FnOnce(&mut R) -> std::io::Result<T>>(
235         &mut self,
236         func: F,
237     ) -> std::io::Result<T> {
238         match self.reader {
239             AnyOrPanic::Any(ref mut reader) => {
240                 let r = reader.downcast_mut::<R>().unwrap();
241                 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| func(r))) {
242                     Ok(res) => res,
243                     Err(panic) => {
244                         self.reader = AnyOrPanic::Panic(panic);
245                         Err(std::io::Error::new(std::io::ErrorKind::Other, "Panicked"))
246                     }
247                 }
248             }
249             AnyOrPanic::Panic(_) => Err(std::io::Error::new(
250                 std::io::ErrorKind::Other,
251                 "Panicked before",
252             )),
253         }
254     }
255 
read(&mut self, buffer: &mut [u8]) -> std::io::Result<usize>256     fn read(&mut self, buffer: &mut [u8]) -> std::io::Result<usize> {
257         (self.read_fn)(self, buffer)
258     }
259 
seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64>260     fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
261         if let Some(ref seek_fn) = self.seek_fn {
262             seek_fn(self, pos)
263         } else {
264             unreachable!()
265         }
266     }
267 }
268 
std_error_to_gio_error<T>( res: Result<T, std::io::Error>, ) -> Option<Result<T, glib::Error>>269 pub(crate) fn std_error_to_gio_error<T>(
270     res: Result<T, std::io::Error>,
271 ) -> Option<Result<T, glib::Error>> {
272     match res {
273         Ok(res) => Some(Ok(res)),
274         Err(err) => {
275             use std::io::ErrorKind;
276 
277             #[allow(clippy::wildcard_in_or_patterns)]
278             match err.kind() {
279                 ErrorKind::NotFound => Some(Err(glib::Error::new(
280                     crate::IOErrorEnum::NotFound,
281                     "Not Found",
282                 ))),
283                 ErrorKind::PermissionDenied => Some(Err(glib::Error::new(
284                     crate::IOErrorEnum::PermissionDenied,
285                     "Permission Denied",
286                 ))),
287                 ErrorKind::ConnectionRefused => Some(Err(glib::Error::new(
288                     crate::IOErrorEnum::ConnectionRefused,
289                     "Connection Refused",
290                 ))),
291                 ErrorKind::ConnectionReset
292                 | ErrorKind::ConnectionAborted
293                 | ErrorKind::NotConnected => Some(Err(glib::Error::new(
294                     crate::IOErrorEnum::NotConnected,
295                     "Connection Reset",
296                 ))),
297                 ErrorKind::AddrInUse | ErrorKind::AddrNotAvailable => Some(Err(glib::Error::new(
298                     crate::IOErrorEnum::AddressInUse,
299                     "Address In Use",
300                 ))),
301                 ErrorKind::BrokenPipe => Some(Err(glib::Error::new(
302                     crate::IOErrorEnum::BrokenPipe,
303                     "Broken Pipe",
304                 ))),
305                 ErrorKind::AlreadyExists => Some(Err(glib::Error::new(
306                     crate::IOErrorEnum::Exists,
307                     "Already Exists",
308                 ))),
309                 ErrorKind::WouldBlock => Some(Err(glib::Error::new(
310                     crate::IOErrorEnum::WouldBlock,
311                     "Would Block",
312                 ))),
313                 ErrorKind::InvalidInput | ErrorKind::InvalidData => Some(Err(glib::Error::new(
314                     crate::IOErrorEnum::InvalidData,
315                     "Invalid Input",
316                 ))),
317                 ErrorKind::TimedOut => Some(Err(glib::Error::new(
318                     crate::IOErrorEnum::TimedOut,
319                     "Timed Out",
320                 ))),
321                 ErrorKind::Interrupted => None,
322                 ErrorKind::UnexpectedEof => Some(Err(glib::Error::new(
323                     crate::IOErrorEnum::Closed,
324                     "Unexpected Eof",
325                 ))),
326                 ErrorKind::WriteZero | _ => Some(Err(glib::Error::new(
327                     crate::IOErrorEnum::Failed,
328                     format!("Unknown error: {:?}", err).as_str(),
329                 ))),
330             }
331         }
332     }
333 }
334 
335 #[cfg(test)]
336 mod tests {
337     use super::*;
338     use std::io::Cursor;
339 
340     #[test]
test_read()341     fn test_read() {
342         let cursor = Cursor::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
343         let stream = ReadInputStream::new(cursor);
344 
345         let mut buf = [0u8; 1024];
346         assert_eq!(stream.read(&mut buf[..], crate::NONE_CANCELLABLE), Ok(10));
347         assert_eq!(&buf[..10], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10][..]);
348 
349         assert_eq!(stream.read(&mut buf[..], crate::NONE_CANCELLABLE), Ok(0));
350 
351         let inner = stream.close_and_take();
352         assert!(inner.is::<Cursor<Vec<u8>>>());
353         let inner = inner.downcast_ref::<Cursor<Vec<u8>>>().unwrap();
354         assert_eq!(inner.get_ref(), &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
355     }
356 
357     #[test]
test_read_seek()358     fn test_read_seek() {
359         let cursor = Cursor::new(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
360         let stream = ReadInputStream::new_seekable(cursor);
361 
362         let mut buf = [0u8; 1024];
363         assert_eq!(stream.read(&mut buf[..], crate::NONE_CANCELLABLE), Ok(10));
364         assert_eq!(&buf[..10], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10][..]);
365 
366         assert_eq!(stream.read(&mut buf[..], crate::NONE_CANCELLABLE), Ok(0));
367 
368         assert!(stream.can_seek());
369         assert_eq!(
370             stream.seek(0, glib::SeekType::Set, crate::NONE_CANCELLABLE),
371             Ok(())
372         );
373         assert_eq!(stream.read(&mut buf[..], crate::NONE_CANCELLABLE), Ok(10));
374         assert_eq!(&buf[..10], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10][..]);
375 
376         let inner = stream.close_and_take();
377         assert!(inner.is::<Cursor<Vec<u8>>>());
378         let inner = inner.downcast_ref::<Cursor<Vec<u8>>>().unwrap();
379         assert_eq!(inner.get_ref(), &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
380     }
381 }
382