1 // Take a look at the license at the top of the repository in the LICENSE file. 2 3 use crate::error::to_std_io_result; 4 use crate::prelude::*; 5 use crate::Cancellable; 6 use crate::InputStream; 7 use crate::Seekable; 8 use futures_core::task::{Context, Poll}; 9 use futures_io::{AsyncBufRead, AsyncRead}; 10 use glib::object::IsA; 11 use glib::translate::*; 12 use glib::Priority; 13 use std::future::Future; 14 use std::io; 15 use std::mem; 16 use std::pin::Pin; 17 use std::ptr; 18 19 pub trait InputStreamExtManual: Sized { 20 #[doc(alias = "g_input_stream_read")] read<B: AsMut<[u8]>, C: IsA<Cancellable>>( &self, buffer: B, cancellable: Option<&C>, ) -> Result<usize, glib::Error>21 fn read<B: AsMut<[u8]>, C: IsA<Cancellable>>( 22 &self, 23 buffer: B, 24 cancellable: Option<&C>, 25 ) -> Result<usize, glib::Error>; 26 27 #[doc(alias = "g_input_stream_read_all")] read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>( &self, buffer: B, cancellable: Option<&C>, ) -> Result<(usize, Option<glib::Error>), glib::Error>28 fn read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>( 29 &self, 30 buffer: B, 31 cancellable: Option<&C>, 32 ) -> Result<(usize, Option<glib::Error>), glib::Error>; 33 34 #[doc(alias = "g_input_stream_read_all_async")] read_all_async< B: AsMut<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )35 fn read_all_async< 36 B: AsMut<[u8]> + Send + 'static, 37 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static, 38 C: IsA<Cancellable>, 39 >( 40 &self, 41 buffer: B, 42 io_priority: Priority, 43 cancellable: Option<&C>, 44 callback: Q, 45 ); 46 47 #[doc(alias = "g_input_stream_read_async")] read_async< B: AsMut<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )48 fn read_async< 49 B: AsMut<[u8]> + Send + 'static, 50 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static, 51 C: IsA<Cancellable>, 52 >( 53 &self, 54 buffer: B, 55 io_priority: Priority, 56 cancellable: Option<&C>, 57 callback: Q, 58 ); 59 read_all_async_future<B: AsMut<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin< Box< dyn std::future::Future< Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>, > + 'static, >, >60 fn read_all_async_future<B: AsMut<[u8]> + Send + 'static>( 61 &self, 62 buffer: B, 63 io_priority: Priority, 64 ) -> Pin< 65 Box< 66 dyn std::future::Future< 67 Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>, 68 > + 'static, 69 >, 70 >; 71 read_async_future<B: AsMut<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>72 fn read_async_future<B: AsMut<[u8]> + Send + 'static>( 73 &self, 74 buffer: B, 75 io_priority: Priority, 76 ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>; 77 into_read(self) -> InputStreamRead<Self> where Self: IsA<InputStream>,78 fn into_read(self) -> InputStreamRead<Self> 79 where 80 Self: IsA<InputStream>, 81 { 82 InputStreamRead(self) 83 } 84 into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self> where Self: IsA<InputStream>,85 fn into_async_buf_read(self, buffer_size: usize) -> InputStreamAsyncBufRead<Self> 86 where 87 Self: IsA<InputStream>, 88 { 89 InputStreamAsyncBufRead::new(self, buffer_size) 90 } 91 } 92 93 impl<O: IsA<InputStream>> InputStreamExtManual for O { read<B: AsMut<[u8]>, C: IsA<Cancellable>>( &self, mut buffer: B, cancellable: Option<&C>, ) -> Result<usize, glib::Error>94 fn read<B: AsMut<[u8]>, C: IsA<Cancellable>>( 95 &self, 96 mut buffer: B, 97 cancellable: Option<&C>, 98 ) -> Result<usize, glib::Error> { 99 let cancellable = cancellable.map(|c| c.as_ref()); 100 let gcancellable = cancellable.to_glib_none(); 101 let buffer = buffer.as_mut(); 102 let buffer_ptr = buffer.as_mut_ptr(); 103 let count = buffer.len(); 104 unsafe { 105 let mut error = ptr::null_mut(); 106 let ret = ffi::g_input_stream_read( 107 self.as_ref().to_glib_none().0, 108 buffer_ptr, 109 count, 110 gcancellable.0, 111 &mut error, 112 ); 113 if error.is_null() { 114 Ok(ret as usize) 115 } else { 116 Err(from_glib_full(error)) 117 } 118 } 119 } 120 read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>( &self, mut buffer: B, cancellable: Option<&C>, ) -> Result<(usize, Option<glib::Error>), glib::Error>121 fn read_all<B: AsMut<[u8]>, C: IsA<Cancellable>>( 122 &self, 123 mut buffer: B, 124 cancellable: Option<&C>, 125 ) -> Result<(usize, Option<glib::Error>), glib::Error> { 126 let cancellable = cancellable.map(|c| c.as_ref()); 127 let gcancellable = cancellable.to_glib_none(); 128 let buffer = buffer.as_mut(); 129 let buffer_ptr = buffer.as_mut_ptr(); 130 let count = buffer.len(); 131 unsafe { 132 let mut bytes_read = mem::MaybeUninit::uninit(); 133 let mut error = ptr::null_mut(); 134 let _ = ffi::g_input_stream_read_all( 135 self.as_ref().to_glib_none().0, 136 buffer_ptr, 137 count, 138 bytes_read.as_mut_ptr(), 139 gcancellable.0, 140 &mut error, 141 ); 142 143 let bytes_read = bytes_read.assume_init(); 144 if error.is_null() { 145 Ok((bytes_read, None)) 146 } else if bytes_read != 0 { 147 Ok((bytes_read, Some(from_glib_full(error)))) 148 } else { 149 Err(from_glib_full(error)) 150 } 151 } 152 } 153 read_all_async< B: AsMut<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )154 fn read_all_async< 155 B: AsMut<[u8]> + Send + 'static, 156 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static, 157 C: IsA<Cancellable>, 158 >( 159 &self, 160 buffer: B, 161 io_priority: Priority, 162 cancellable: Option<&C>, 163 callback: Q, 164 ) { 165 let cancellable = cancellable.map(|c| c.as_ref()); 166 let gcancellable = cancellable.to_glib_none(); 167 let mut user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer))); 168 // Need to do this after boxing as the contents pointer might change by moving into the box 169 let (count, buffer_ptr) = { 170 let buffer = &mut (*user_data).as_mut().unwrap().1; 171 let slice = (*buffer).as_mut(); 172 (slice.len(), slice.as_mut_ptr()) 173 }; 174 unsafe extern "C" fn read_all_async_trampoline< 175 B: AsMut<[u8]> + Send + 'static, 176 Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + Send + 'static, 177 >( 178 _source_object: *mut glib::gobject_ffi::GObject, 179 res: *mut ffi::GAsyncResult, 180 user_data: glib::ffi::gpointer, 181 ) { 182 let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _); 183 let (callback, buffer) = user_data.take().unwrap(); 184 185 let mut error = ptr::null_mut(); 186 let mut bytes_read = mem::MaybeUninit::uninit(); 187 let _ = ffi::g_input_stream_read_all_finish( 188 _source_object as *mut _, 189 res, 190 bytes_read.as_mut_ptr(), 191 &mut error, 192 ); 193 194 let bytes_read = bytes_read.assume_init(); 195 let result = if error.is_null() { 196 Ok((buffer, bytes_read, None)) 197 } else if bytes_read != 0 { 198 Ok((buffer, bytes_read, Some(from_glib_full(error)))) 199 } else { 200 Err((buffer, from_glib_full(error))) 201 }; 202 203 callback(result); 204 } 205 let callback = read_all_async_trampoline::<B, Q>; 206 unsafe { 207 ffi::g_input_stream_read_all_async( 208 self.as_ref().to_glib_none().0, 209 buffer_ptr, 210 count, 211 io_priority.into_glib(), 212 gcancellable.0, 213 Some(callback), 214 Box::into_raw(user_data) as *mut _, 215 ); 216 } 217 } 218 read_async< B: AsMut<[u8]> + Send + 'static, Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static, C: IsA<Cancellable>, >( &self, buffer: B, io_priority: Priority, cancellable: Option<&C>, callback: Q, )219 fn read_async< 220 B: AsMut<[u8]> + Send + 'static, 221 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static, 222 C: IsA<Cancellable>, 223 >( 224 &self, 225 buffer: B, 226 io_priority: Priority, 227 cancellable: Option<&C>, 228 callback: Q, 229 ) { 230 let cancellable = cancellable.map(|c| c.as_ref()); 231 let gcancellable = cancellable.to_glib_none(); 232 let mut user_data: Box<Option<(Q, B)>> = Box::new(Some((callback, buffer))); 233 // Need to do this after boxing as the contents pointer might change by moving into the box 234 let (count, buffer_ptr) = { 235 let buffer = &mut (*user_data).as_mut().unwrap().1; 236 let slice = (*buffer).as_mut(); 237 (slice.len(), slice.as_mut_ptr()) 238 }; 239 unsafe extern "C" fn read_async_trampoline< 240 B: AsMut<[u8]> + Send + 'static, 241 Q: FnOnce(Result<(B, usize), (B, glib::Error)>) + Send + 'static, 242 >( 243 _source_object: *mut glib::gobject_ffi::GObject, 244 res: *mut ffi::GAsyncResult, 245 user_data: glib::ffi::gpointer, 246 ) { 247 let mut user_data: Box<Option<(Q, B)>> = Box::from_raw(user_data as *mut _); 248 let (callback, buffer) = user_data.take().unwrap(); 249 250 let mut error = ptr::null_mut(); 251 let ret = ffi::g_input_stream_read_finish(_source_object as *mut _, res, &mut error); 252 253 let result = if error.is_null() { 254 Ok((buffer, ret as usize)) 255 } else { 256 Err((buffer, from_glib_full(error))) 257 }; 258 259 callback(result); 260 } 261 let callback = read_async_trampoline::<B, Q>; 262 unsafe { 263 ffi::g_input_stream_read_async( 264 self.as_ref().to_glib_none().0, 265 buffer_ptr, 266 count, 267 io_priority.into_glib(), 268 gcancellable.0, 269 Some(callback), 270 Box::into_raw(user_data) as *mut _, 271 ); 272 } 273 } 274 read_all_async_future<'a, B: AsMut<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin< Box< dyn std::future::Future< Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>, > + 'static, >, >275 fn read_all_async_future<'a, B: AsMut<[u8]> + Send + 'static>( 276 &self, 277 buffer: B, 278 io_priority: Priority, 279 ) -> Pin< 280 Box< 281 dyn std::future::Future< 282 Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>, 283 > + 'static, 284 >, 285 > { 286 Box::pin(crate::GioFuture::new( 287 self, 288 move |obj, cancellable, send| { 289 obj.read_all_async(buffer, io_priority, Some(cancellable), move |res| { 290 send.resolve(res); 291 }); 292 }, 293 )) 294 } 295 read_async_future<'a, B: AsMut<[u8]> + Send + 'static>( &self, buffer: B, io_priority: Priority, ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>>296 fn read_async_future<'a, B: AsMut<[u8]> + Send + 'static>( 297 &self, 298 buffer: B, 299 io_priority: Priority, 300 ) -> Pin<Box<dyn std::future::Future<Output = Result<(B, usize), (B, glib::Error)>> + 'static>> 301 { 302 Box::pin(crate::GioFuture::new( 303 self, 304 move |obj, cancellable, send| { 305 obj.read_async(buffer, io_priority, Some(cancellable), move |res| { 306 send.resolve(res); 307 }); 308 }, 309 )) 310 } 311 } 312 313 #[derive(Debug)] 314 pub struct InputStreamRead<T: IsA<InputStream>>(T); 315 316 impl<T: IsA<InputStream>> InputStreamRead<T> { into_input_stream(self) -> T317 pub fn into_input_stream(self) -> T { 318 self.0 319 } 320 input_stream(&self) -> &T321 pub fn input_stream(&self) -> &T { 322 &self.0 323 } 324 } 325 326 impl<T: IsA<InputStream>> io::Read for InputStreamRead<T> { read(&mut self, buf: &mut [u8]) -> io::Result<usize>327 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 328 let gio_result = self.0.as_ref().read(buf, crate::NONE_CANCELLABLE); 329 to_std_io_result(gio_result) 330 } 331 } 332 333 impl<T: IsA<InputStream> + IsA<Seekable>> io::Seek for InputStreamRead<T> { seek(&mut self, pos: io::SeekFrom) -> io::Result<u64>334 fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> { 335 let (pos, type_) = match pos { 336 io::SeekFrom::Start(pos) => (pos as i64, glib::SeekType::Set), 337 io::SeekFrom::End(pos) => (pos, glib::SeekType::End), 338 io::SeekFrom::Current(pos) => (pos, glib::SeekType::Cur), 339 }; 340 let seekable: &Seekable = self.0.as_ref(); 341 let gio_result = seekable 342 .seek(pos, type_, crate::NONE_CANCELLABLE) 343 .map(|_| seekable.tell() as u64); 344 to_std_io_result(gio_result) 345 } 346 } 347 348 enum State { 349 Waiting { 350 buffer: Vec<u8>, 351 }, 352 Transitioning, 353 Reading { 354 pending: Pin< 355 Box< 356 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>> 357 + 'static, 358 >, 359 >, 360 }, 361 HasData { 362 buffer: Vec<u8>, 363 valid: (usize, usize), // first index is inclusive, second is exclusive 364 }, 365 Failed(crate::IOErrorEnum), 366 } 367 368 impl State { into_buffer(self) -> Vec<u8>369 fn into_buffer(self) -> Vec<u8> { 370 match self { 371 State::Waiting { buffer } => buffer, 372 _ => panic!("Invalid state"), 373 } 374 } 375 376 #[doc(alias = "get_pending")] pending( &mut self, ) -> &mut Pin< Box< dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>> + 'static, >, >377 fn pending( 378 &mut self, 379 ) -> &mut Pin< 380 Box< 381 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>> 382 + 'static, 383 >, 384 > { 385 match self { 386 State::Reading { ref mut pending } => pending, 387 _ => panic!("Invalid state"), 388 } 389 } 390 } 391 pub struct InputStreamAsyncBufRead<T: IsA<InputStream>> { 392 stream: T, 393 state: State, 394 } 395 396 impl<T: IsA<InputStream>> InputStreamAsyncBufRead<T> { into_input_stream(self) -> T397 pub fn into_input_stream(self) -> T { 398 self.stream 399 } 400 input_stream(&self) -> &T401 pub fn input_stream(&self) -> &T { 402 &self.stream 403 } 404 new(stream: T, buffer_size: usize) -> Self405 fn new(stream: T, buffer_size: usize) -> Self { 406 let buffer = vec![0; buffer_size]; 407 408 Self { 409 stream, 410 state: State::Waiting { buffer }, 411 } 412 } set_reading( &mut self, ) -> &mut Pin< Box< dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>> + 'static, >, >413 fn set_reading( 414 &mut self, 415 ) -> &mut Pin< 416 Box< 417 dyn std::future::Future<Output = Result<(Vec<u8>, usize), (Vec<u8>, glib::Error)>> 418 + 'static, 419 >, 420 > { 421 match self.state { 422 State::Waiting { .. } => { 423 let waiting = mem::replace(&mut self.state, State::Transitioning); 424 let buffer = waiting.into_buffer(); 425 let pending = self 426 .input_stream() 427 .read_async_future(buffer, Priority::default()); 428 self.state = State::Reading { pending }; 429 } 430 State::Reading { .. } => {} 431 _ => panic!("Invalid state"), 432 }; 433 434 self.state.pending() 435 } 436 437 #[doc(alias = "get_data")] data(&self) -> Poll<io::Result<&[u8]>>438 fn data(&self) -> Poll<io::Result<&[u8]>> { 439 if let State::HasData { 440 ref buffer, 441 valid: (i, j), 442 } = self.state 443 { 444 return Poll::Ready(Ok(&buffer[i..j])); 445 } 446 panic!("Invalid state") 447 } 448 set_waiting(&mut self, buffer: Vec<u8>)449 fn set_waiting(&mut self, buffer: Vec<u8>) { 450 match self.state { 451 State::Reading { .. } | State::Transitioning => self.state = State::Waiting { buffer }, 452 _ => panic!("Invalid state"), 453 } 454 } 455 set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize))456 fn set_has_data(&mut self, buffer: Vec<u8>, valid: (usize, usize)) { 457 match self.state { 458 State::Reading { .. } | State::Transitioning { .. } => { 459 self.state = State::HasData { buffer, valid } 460 } 461 _ => panic!("Invalid state"), 462 } 463 } 464 poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures_io::Error>>465 fn poll_fill_buf(&mut self, cx: &mut Context) -> Poll<Result<&[u8], futures_io::Error>> { 466 match self.state { 467 State::Failed(kind) => Poll::Ready(Err(io::Error::new( 468 io::ErrorKind::from(kind), 469 BufReadError::Failed, 470 ))), 471 State::HasData { .. } => self.data(), 472 State::Transitioning => panic!("Invalid state"), 473 State::Waiting { .. } | State::Reading { .. } => { 474 let pending = self.set_reading(); 475 match Pin::new(pending).poll(cx) { 476 Poll::Ready(Ok((buffer, res))) => { 477 if res == 0 { 478 self.set_waiting(buffer); 479 Poll::Ready(Ok(&[])) 480 } else { 481 self.set_has_data(buffer, (0, res)); 482 self.data() 483 } 484 } 485 Poll::Ready(Err((_, err))) => { 486 let kind = err.kind::<crate::IOErrorEnum>().unwrap(); 487 self.state = State::Failed(kind); 488 Poll::Ready(Err(io::Error::new(io::ErrorKind::from(kind), err))) 489 } 490 Poll::Pending => Poll::Pending, 491 } 492 } 493 } 494 } 495 consume(&mut self, amt: usize)496 fn consume(&mut self, amt: usize) { 497 if amt == 0 { 498 return; 499 } 500 501 if let State::HasData { .. } = self.state { 502 let has_data = mem::replace(&mut self.state, State::Transitioning); 503 if let State::HasData { 504 buffer, 505 valid: (i, j), 506 } = has_data 507 { 508 let available = j - i; 509 if amt > available { 510 panic!( 511 "Cannot consume {} bytes as only {} are available", 512 amt, available 513 ) 514 } 515 let remaining = available - amt; 516 if remaining == 0 { 517 return self.set_waiting(buffer); 518 } else { 519 return self.set_has_data(buffer, (i + amt, j)); 520 } 521 } 522 } 523 524 panic!("Invalid state") 525 } 526 } 527 528 #[derive(thiserror::Error, Debug)] 529 enum BufReadError { 530 #[error("Previous read operation failed")] 531 Failed, 532 } 533 534 impl<T: IsA<InputStream>> AsyncRead for InputStreamAsyncBufRead<T> { poll_read( self: Pin<&mut Self>, cx: &mut Context, out_buf: &mut [u8], ) -> Poll<io::Result<usize>>535 fn poll_read( 536 self: Pin<&mut Self>, 537 cx: &mut Context, 538 out_buf: &mut [u8], 539 ) -> Poll<io::Result<usize>> { 540 let reader = self.get_mut(); 541 let poll = reader.poll_fill_buf(cx); 542 543 let poll = poll.map_ok(|buffer| { 544 let copied = buffer.len().min(out_buf.len()); 545 out_buf[..copied].copy_from_slice(&buffer[..copied]); 546 copied 547 }); 548 549 if let Poll::Ready(Ok(consumed)) = poll { 550 reader.consume(consumed); 551 } 552 poll 553 } 554 } 555 556 impl<T: IsA<InputStream>> AsyncBufRead for InputStreamAsyncBufRead<T> { poll_fill_buf( self: Pin<&mut Self>, cx: &mut Context, ) -> Poll<Result<&[u8], futures_io::Error>>557 fn poll_fill_buf( 558 self: Pin<&mut Self>, 559 cx: &mut Context, 560 ) -> Poll<Result<&[u8], futures_io::Error>> { 561 self.get_mut().poll_fill_buf(cx) 562 } 563 consume(self: Pin<&mut Self>, amt: usize)564 fn consume(self: Pin<&mut Self>, amt: usize) { 565 self.get_mut().consume(amt); 566 } 567 } 568 569 impl<T: IsA<InputStream>> Unpin for InputStreamAsyncBufRead<T> {} 570 571 #[cfg(test)] 572 mod tests { 573 use crate::prelude::*; 574 use crate::test_util::run_async; 575 use crate::MemoryInputStream; 576 use glib::Bytes; 577 use std::io::Read; 578 579 #[test] read_all_async()580 fn read_all_async() { 581 let ret = run_async(|tx, l| { 582 let b = Bytes::from_owned(vec![1, 2, 3]); 583 let strm = MemoryInputStream::from_bytes(&b); 584 585 let buf = vec![0; 10]; 586 strm.read_all_async( 587 buf, 588 glib::PRIORITY_DEFAULT_IDLE, 589 crate::NONE_CANCELLABLE, 590 move |ret| { 591 tx.send(ret).unwrap(); 592 l.quit(); 593 }, 594 ); 595 }); 596 597 let (buf, count, err) = ret.unwrap(); 598 assert_eq!(count, 3); 599 assert!(err.is_none()); 600 assert_eq!(buf[0], 1); 601 assert_eq!(buf[1], 2); 602 assert_eq!(buf[2], 3); 603 } 604 605 #[test] read_all()606 fn read_all() { 607 let b = Bytes::from_owned(vec![1, 2, 3]); 608 let strm = MemoryInputStream::from_bytes(&b); 609 let mut buf = vec![0; 10]; 610 611 let ret = strm.read_all(&mut buf, crate::NONE_CANCELLABLE).unwrap(); 612 613 assert_eq!(ret.0, 3); 614 assert!(ret.1.is_none()); 615 assert_eq!(buf[0], 1); 616 assert_eq!(buf[1], 2); 617 assert_eq!(buf[2], 3); 618 } 619 620 #[test] read()621 fn read() { 622 let b = Bytes::from_owned(vec![1, 2, 3]); 623 let strm = MemoryInputStream::from_bytes(&b); 624 let mut buf = vec![0; 10]; 625 626 let ret = strm.read(&mut buf, crate::NONE_CANCELLABLE); 627 628 assert_eq!(ret.unwrap(), 3); 629 assert_eq!(buf[0], 1); 630 assert_eq!(buf[1], 2); 631 assert_eq!(buf[2], 3); 632 } 633 634 #[test] read_async()635 fn read_async() { 636 let ret = run_async(|tx, l| { 637 let b = Bytes::from_owned(vec![1, 2, 3]); 638 let strm = MemoryInputStream::from_bytes(&b); 639 640 let buf = vec![0; 10]; 641 strm.read_async( 642 buf, 643 glib::PRIORITY_DEFAULT_IDLE, 644 crate::NONE_CANCELLABLE, 645 move |ret| { 646 tx.send(ret).unwrap(); 647 l.quit(); 648 }, 649 ); 650 }); 651 652 let (buf, count) = ret.unwrap(); 653 assert_eq!(count, 3); 654 assert_eq!(buf[0], 1); 655 assert_eq!(buf[1], 2); 656 assert_eq!(buf[2], 3); 657 } 658 659 #[test] read_bytes_async()660 fn read_bytes_async() { 661 let ret = run_async(|tx, l| { 662 let b = Bytes::from_owned(vec![1, 2, 3]); 663 let strm = MemoryInputStream::from_bytes(&b); 664 665 strm.read_bytes_async( 666 10, 667 glib::PRIORITY_DEFAULT_IDLE, 668 crate::NONE_CANCELLABLE, 669 move |ret| { 670 tx.send(ret).unwrap(); 671 l.quit(); 672 }, 673 ); 674 }); 675 676 let bytes = ret.unwrap(); 677 assert_eq!(bytes, vec![1, 2, 3]); 678 } 679 680 #[test] skip_async()681 fn skip_async() { 682 let ret = run_async(|tx, l| { 683 let b = Bytes::from_owned(vec![1, 2, 3]); 684 let strm = MemoryInputStream::from_bytes(&b); 685 686 strm.skip_async( 687 10, 688 glib::PRIORITY_DEFAULT_IDLE, 689 crate::NONE_CANCELLABLE, 690 move |ret| { 691 tx.send(ret).unwrap(); 692 l.quit(); 693 }, 694 ); 695 }); 696 697 let skipped = ret.unwrap(); 698 assert_eq!(skipped, 3); 699 } 700 701 #[test] std_io_read()702 fn std_io_read() { 703 let b = Bytes::from_owned(vec![1, 2, 3]); 704 let mut read = MemoryInputStream::from_bytes(&b).into_read(); 705 let mut buf = [0u8; 10]; 706 707 let ret = read.read(&mut buf); 708 709 assert_eq!(ret.unwrap(), 3); 710 assert_eq!(buf[0], 1); 711 assert_eq!(buf[1], 2); 712 assert_eq!(buf[2], 3); 713 } 714 715 #[test] into_input_stream()716 fn into_input_stream() { 717 let b = Bytes::from_owned(vec![1, 2, 3]); 718 let stream = MemoryInputStream::from_bytes(&b); 719 let stream_clone = stream.clone(); 720 let stream = stream.into_read().into_input_stream(); 721 722 assert_eq!(stream, stream_clone); 723 } 724 } 725