1 // Take a look at the license at the top of the repository in the LICENSE file.
2
3 use crate::prelude::*;
4 use crate::subclass::prelude::*;
5 use crate::InputStream;
6
7 use std::any::Any;
8 use std::io::{Read, Seek};
9
10 mod imp {
11 use super::*;
12 use std::cell::RefCell;
13
14 pub(super) enum Reader {
15 Read(AnyReader),
16 ReadSeek(AnyReader),
17 }
18
19 #[derive(Default)]
20 pub struct ReadInputStream {
21 pub(super) read: RefCell<Option<Reader>>,
22 }
23
24 #[glib::object_subclass]
25 impl ObjectSubclass for ReadInputStream {
26 const NAME: &'static str = "ReadInputStream";
27 type Type = super::ReadInputStream;
28 type ParentType = InputStream;
29 type Interfaces = (crate::Seekable,);
30 }
31
32 impl ObjectImpl for ReadInputStream {}
33
34 impl InputStreamImpl for ReadInputStream {
read( &self, _stream: &Self::Type, buffer: &mut [u8], _cancellable: Option<&crate::Cancellable>, ) -> Result<usize, glib::Error>35 fn read(
36 &self,
37 _stream: &Self::Type,
38 buffer: &mut [u8],
39 _cancellable: Option<&crate::Cancellable>,
40 ) -> Result<usize, glib::Error> {
41 let mut read = self.read.borrow_mut();
42 let read = match *read {
43 None => {
44 return Err(glib::Error::new(
45 crate::IOErrorEnum::Closed,
46 "Already closed",
47 ));
48 }
49 Some(Reader::Read(ref mut read)) => read,
50 Some(Reader::ReadSeek(ref mut read)) => read,
51 };
52
53 loop {
54 match std_error_to_gio_error(read.read(buffer)) {
55 None => continue,
56 Some(res) => return res,
57 }
58 }
59 }
60
close( &self, _stream: &Self::Type, _cancellable: Option<&crate::Cancellable>, ) -> Result<(), glib::Error>61 fn close(
62 &self,
63 _stream: &Self::Type,
64 _cancellable: Option<&crate::Cancellable>,
65 ) -> Result<(), glib::Error> {
66 let _ = self.read.borrow_mut().take();
67 Ok(())
68 }
69 }
70
71 impl SeekableImpl for ReadInputStream {
tell(&self, _seekable: &Self::Type) -> i6472 fn tell(&self, _seekable: &Self::Type) -> i64 {
73 // XXX: stream_position is not stable yet
74 // let mut read = self.read.borrow_mut();
75 // match *read {
76 // Some(Reader::ReadSeek(ref mut read)) => {
77 // read.stream_position().map(|pos| pos as i64).unwrap_or(-1)
78 // },
79 // _ => -1,
80 // };
81 -1
82 }
83
can_seek(&self, _seekable: &Self::Type) -> bool84 fn can_seek(&self, _seekable: &Self::Type) -> bool {
85 let read = self.read.borrow();
86 matches!(*read, Some(Reader::ReadSeek(_)))
87 }
88
seek( &self, _seekable: &Self::Type, offset: i64, type_: glib::SeekType, _cancellable: Option<&crate::Cancellable>, ) -> Result<(), glib::Error>89 fn seek(
90 &self,
91 _seekable: &Self::Type,
92 offset: i64,
93 type_: glib::SeekType,
94 _cancellable: Option<&crate::Cancellable>,
95 ) -> Result<(), glib::Error> {
96 use std::io::SeekFrom;
97
98 let mut read = self.read.borrow_mut();
99 match *read {
100 Some(Reader::ReadSeek(ref mut read)) => {
101 let pos = match type_ {
102 glib::SeekType::Cur => SeekFrom::Current(offset),
103 glib::SeekType::Set => {
104 if offset < 0 {
105 return Err(glib::Error::new(
106 crate::IOErrorEnum::InvalidArgument,
107 "Invalid Argument",
108 ));
109 } else {
110 SeekFrom::Start(offset as u64)
111 }
112 }
113 glib::SeekType::End => SeekFrom::End(offset),
114 _ => unimplemented!(),
115 };
116
117 loop {
118 match std_error_to_gio_error(read.seek(pos)) {
119 None => continue,
120 Some(res) => return res.map(|_| ()),
121 }
122 }
123 }
124 _ => Err(glib::Error::new(
125 crate::IOErrorEnum::NotSupported,
126 "Truncating not supported",
127 )),
128 }
129 }
130
can_truncate(&self, _seekable: &Self::Type) -> bool131 fn can_truncate(&self, _seekable: &Self::Type) -> bool {
132 false
133 }
134
truncate( &self, _seekable: &Self::Type, _offset: i64, _cancellable: Option<&crate::Cancellable>, ) -> Result<(), glib::Error>135 fn truncate(
136 &self,
137 _seekable: &Self::Type,
138 _offset: i64,
139 _cancellable: Option<&crate::Cancellable>,
140 ) -> Result<(), glib::Error> {
141 Err(glib::Error::new(
142 crate::IOErrorEnum::NotSupported,
143 "Truncating not supported",
144 ))
145 }
146 }
147 }
148
glib::wrapper! {
    /// Input stream backed by a Rust reader.
    ///
    /// Wrapper type for the `imp::ReadInputStream` subclass; it extends
    /// `InputStream` and implements `Seekable`.
    pub struct ReadInputStream(ObjectSubclass<imp::ReadInputStream>) @extends crate::InputStream, @implements crate::Seekable;
}
152
153 impl ReadInputStream {
new<R: Read + Send + 'static>(read: R) -> ReadInputStream154 pub fn new<R: Read + Send + 'static>(read: R) -> ReadInputStream {
155 let obj = glib::Object::new(&[]).expect("Failed to create read input stream");
156
157 let imp = imp::ReadInputStream::from_instance(&obj);
158 *imp.read.borrow_mut() = Some(imp::Reader::Read(AnyReader::new(read)));
159
160 obj
161 }
162
new_seekable<R: Read + Seek + Send + 'static>(read: R) -> ReadInputStream163 pub fn new_seekable<R: Read + Seek + Send + 'static>(read: R) -> ReadInputStream {
164 let obj = glib::Object::new(&[]).expect("Failed to create read input stream");
165
166 let imp = imp::ReadInputStream::from_instance(&obj);
167 *imp.read.borrow_mut() = Some(imp::Reader::ReadSeek(AnyReader::new_seekable(read)));
168
169 obj
170 }
171
close_and_take(&self) -> Box<dyn Any + Send + 'static>172 pub fn close_and_take(&self) -> Box<dyn Any + Send + 'static> {
173 let imp = imp::ReadInputStream::from_instance(self);
174 let inner = imp.read.borrow_mut().take();
175
176 let ret = match inner {
177 None => {
178 panic!("Stream already closed or inner taken");
179 }
180 Some(imp::Reader::Read(read)) => read.reader,
181 Some(imp::Reader::ReadSeek(read)) => read.reader,
182 };
183
184 let _ = self.close(crate::NONE_CANCELLABLE);
185
186 match ret {
187 AnyOrPanic::Any(r) => r,
188 AnyOrPanic::Panic(p) => std::panic::resume_unwind(p),
189 }
190 }
191 }
192
// Either the type-erased reader itself or the payload of a panic that
// happened while it was being used.
enum AnyOrPanic {
    Any(Box<dyn Any + Send + 'static>),
    Panic(Box<dyn Any + Send + 'static>),
}

// Type-erased reader: operations are dispatched through function pointers
// instantiated for the concrete reader type, and panics raised by the
// reader are caught and stored instead of unwinding into the caller.
struct AnyReader {
    reader: AnyOrPanic,
    read_fn: fn(s: &mut AnyReader, buffer: &mut [u8]) -> std::io::Result<usize>,
    seek_fn: Option<fn(s: &mut AnyReader, pos: std::io::SeekFrom) -> std::io::Result<u64>>,
}

impl AnyReader {
    // Wraps a reader that can only read; no seek function is installed.
    fn new<R: Read + Any + Send + 'static>(r: R) -> Self {
        let erased: Box<dyn Any + Send + 'static> = Box::new(r);
        AnyReader {
            reader: AnyOrPanic::Any(erased),
            read_fn: Self::read_fn::<R>,
            seek_fn: None,
        }
    }

    // Wraps a reader that can read and seek.
    fn new_seekable<R: Read + Seek + Any + Send + 'static>(r: R) -> Self {
        let erased: Box<dyn Any + Send + 'static> = Box::new(r);
        AnyReader {
            reader: AnyOrPanic::Any(erased),
            read_fn: Self::read_fn::<R>,
            seek_fn: Some(Self::seek_fn::<R>),
        }
    }

    // `read` for the concrete reader type `R`.
    fn read_fn<R: Read + 'static>(s: &mut AnyReader, buffer: &mut [u8]) -> std::io::Result<usize> {
        s.with_inner::<R, _, _>(|inner| inner.read(buffer))
    }

    // `seek` for the concrete reader type `R`.
    fn seek_fn<R: Seek + 'static>(
        s: &mut AnyReader,
        pos: std::io::SeekFrom,
    ) -> std::io::Result<u64> {
        s.with_inner::<R, _, _>(|inner| inner.seek(pos))
    }

    // Runs `func` on the reader downcast to `R`.
    //
    // A panic inside `func` is caught, its payload replaces the reader, and
    // an `ErrorKind::Other` error is returned; every later call then fails
    // with an `ErrorKind::Other` error without touching the reader.
    fn with_inner<R: 'static, T, F: FnOnce(&mut R) -> std::io::Result<T>>(
        &mut self,
        func: F,
    ) -> std::io::Result<T> {
        let inner = match self.reader {
            AnyOrPanic::Panic(_) => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "Panicked before",
                ));
            }
            AnyOrPanic::Any(ref mut erased) => erased.downcast_mut::<R>().unwrap(),
        };

        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| func(inner)));

        match outcome {
            Ok(result) => result,
            Err(payload) => {
                // Keep the payload so it can be re-raised later.
                self.reader = AnyOrPanic::Panic(payload);
                Err(std::io::Error::new(std::io::ErrorKind::Other, "Panicked"))
            }
        }
    }

    // Reads through the stored function pointer.
    fn read(&mut self, buffer: &mut [u8]) -> std::io::Result<usize> {
        let dispatch = self.read_fn;
        dispatch(self, buffer)
    }

    // Seeks through the stored function pointer; must only be called on
    // values created with `new_seekable`.
    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
        let dispatch = match self.seek_fn {
            Some(dispatch) => dispatch,
            None => unreachable!(),
        };
        dispatch(self, pos)
    }
}
268
std_error_to_gio_error<T>( res: Result<T, std::io::Error>, ) -> Option<Result<T, glib::Error>>269 pub(crate) fn std_error_to_gio_error<T>(
270 res: Result<T, std::io::Error>,
271 ) -> Option<Result<T, glib::Error>> {
272 match res {
273 Ok(res) => Some(Ok(res)),
274 Err(err) => {
275 use std::io::ErrorKind;
276
277 #[allow(clippy::wildcard_in_or_patterns)]
278 match err.kind() {
279 ErrorKind::NotFound => Some(Err(glib::Error::new(
280 crate::IOErrorEnum::NotFound,
281 "Not Found",
282 ))),
283 ErrorKind::PermissionDenied => Some(Err(glib::Error::new(
284 crate::IOErrorEnum::PermissionDenied,
285 "Permission Denied",
286 ))),
287 ErrorKind::ConnectionRefused => Some(Err(glib::Error::new(
288 crate::IOErrorEnum::ConnectionRefused,
289 "Connection Refused",
290 ))),
291 ErrorKind::ConnectionReset
292 | ErrorKind::ConnectionAborted
293 | ErrorKind::NotConnected => Some(Err(glib::Error::new(
294 crate::IOErrorEnum::NotConnected,
295 "Connection Reset",
296 ))),
297 ErrorKind::AddrInUse | ErrorKind::AddrNotAvailable => Some(Err(glib::Error::new(
298 crate::IOErrorEnum::AddressInUse,
299 "Address In Use",
300 ))),
301 ErrorKind::BrokenPipe => Some(Err(glib::Error::new(
302 crate::IOErrorEnum::BrokenPipe,
303 "Broken Pipe",
304 ))),
305 ErrorKind::AlreadyExists => Some(Err(glib::Error::new(
306 crate::IOErrorEnum::Exists,
307 "Already Exists",
308 ))),
309 ErrorKind::WouldBlock => Some(Err(glib::Error::new(
310 crate::IOErrorEnum::WouldBlock,
311 "Would Block",
312 ))),
313 ErrorKind::InvalidInput | ErrorKind::InvalidData => Some(Err(glib::Error::new(
314 crate::IOErrorEnum::InvalidData,
315 "Invalid Input",
316 ))),
317 ErrorKind::TimedOut => Some(Err(glib::Error::new(
318 crate::IOErrorEnum::TimedOut,
319 "Timed Out",
320 ))),
321 ErrorKind::Interrupted => None,
322 ErrorKind::UnexpectedEof => Some(Err(glib::Error::new(
323 crate::IOErrorEnum::Closed,
324 "Unexpected Eof",
325 ))),
326 ErrorKind::WriteZero | _ => Some(Err(glib::Error::new(
327 crate::IOErrorEnum::Failed,
328 format!("Unknown error: {:?}", err).as_str(),
329 ))),
330 }
331 }
332 }
333 }
334
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // Payload served by the cursor in every test.
    const DATA: [u8; 10] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    // Expects one read to return the whole payload.
    fn assert_reads_payload(stream: &ReadInputStream) {
        let mut buf = [0u8; 1024];
        assert_eq!(
            stream.read(&mut buf[..], crate::NONE_CANCELLABLE),
            Ok(DATA.len())
        );
        assert_eq!(&buf[..DATA.len()], &DATA[..]);
    }

    // Expects the next read to report end of stream.
    fn assert_at_eof(stream: &ReadInputStream) {
        let mut buf = [0u8; 1024];
        assert_eq!(stream.read(&mut buf[..], crate::NONE_CANCELLABLE), Ok(0));
    }

    // Closes the stream and checks that the original cursor comes back.
    fn assert_returns_cursor(stream: &ReadInputStream) {
        let taken = stream.close_and_take();
        assert!(taken.is::<Cursor<Vec<u8>>>());
        let cursor = taken.downcast_ref::<Cursor<Vec<u8>>>().unwrap();
        assert_eq!(&cursor.get_ref()[..], &DATA[..]);
    }

    #[test]
    fn test_read() {
        let stream = ReadInputStream::new(Cursor::new(DATA.to_vec()));

        assert_reads_payload(&stream);
        assert_at_eof(&stream);

        assert_returns_cursor(&stream);
    }

    #[test]
    fn test_read_seek() {
        let stream = ReadInputStream::new_seekable(Cursor::new(DATA.to_vec()));

        assert_reads_payload(&stream);
        assert_at_eof(&stream);

        // Rewinding makes the whole payload readable again.
        assert!(stream.can_seek());
        assert_eq!(
            stream.seek(0, glib::SeekType::Set, crate::NONE_CANCELLABLE),
            Ok(())
        );
        assert_reads_payload(&stream);

        assert_returns_cursor(&stream);
    }
}
382