1 // Take a look at the license at the top of the repository in the LICENSE file. 2 3 use crate::prelude::*; 4 use crate::IOStream; 5 use crate::InputStreamAsyncRead; 6 use crate::OutputStreamAsyncWrite; 7 use crate::PollableInputStream; 8 use crate::PollableOutputStream; 9 use futures_core::task::{Context, Poll}; 10 use futures_io::{AsyncRead, AsyncWrite}; 11 use glib::object::{Cast, IsA}; 12 use std::io; 13 use std::pin::Pin; 14 15 pub trait IOStreamExtManual: Sized + IsA<IOStream> { into_async_read_write(self) -> Result<IOStreamAsyncReadWrite<Self>, Self>16 fn into_async_read_write(self) -> Result<IOStreamAsyncReadWrite<Self>, Self> { 17 let write = self 18 .output_stream() 19 .dynamic_cast::<PollableOutputStream>() 20 .ok() 21 .and_then(|s| s.into_async_write().ok()); 22 23 let read = self 24 .input_stream() 25 .dynamic_cast::<PollableInputStream>() 26 .ok() 27 .and_then(|s| s.into_async_read().ok()); 28 29 let (read, write) = match (read, write) { 30 (Some(read), Some(write)) => (read, write), 31 _ => return Err(self), 32 }; 33 34 Ok(IOStreamAsyncReadWrite { 35 io_stream: self, 36 read, 37 write, 38 }) 39 } 40 } 41 42 impl<O: IsA<IOStream>> IOStreamExtManual for O {} 43 44 #[derive(Debug)] 45 pub struct IOStreamAsyncReadWrite<T> { 46 io_stream: T, 47 read: InputStreamAsyncRead<PollableInputStream>, 48 write: OutputStreamAsyncWrite<PollableOutputStream>, 49 } 50 51 impl<T: IsA<IOStream>> IOStreamAsyncReadWrite<T> { input_stream(&self) -> &PollableInputStream52 pub fn input_stream(&self) -> &PollableInputStream { 53 self.read.input_stream() 54 } 55 output_stream(&self) -> &PollableOutputStream56 pub fn output_stream(&self) -> &PollableOutputStream { 57 self.write.output_stream() 58 } 59 into_io_stream(self) -> T60 pub fn into_io_stream(self) -> T { 61 self.io_stream 62 } 63 io_stream(&self) -> &T64 pub fn io_stream(&self) -> &T { 65 &self.io_stream 66 } 67 } 68 69 impl<T: IsA<IOStream> + std::marker::Unpin> AsyncRead for IOStreamAsyncReadWrite<T> { poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, io::Error>>70 fn poll_read( 71 self: Pin<&mut Self>, 72 cx: &mut Context<'_>, 73 buf: &mut [u8], 74 ) -> Poll<Result<usize, io::Error>> { 75 Pin::new(&mut Pin::get_mut(self).read).poll_read(cx, buf) 76 } 77 } 78 79 impl<T: IsA<IOStream> + std::marker::Unpin> AsyncWrite for IOStreamAsyncReadWrite<T> { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize, io::Error>>80 fn poll_write( 81 self: Pin<&mut Self>, 82 cx: &mut Context<'_>, 83 buf: &[u8], 84 ) -> Poll<Result<usize, io::Error>> { 85 Pin::new(&mut Pin::get_mut(self).write).poll_write(cx, buf) 86 } 87 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>88 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { 89 Pin::new(&mut Pin::get_mut(self).write).poll_flush(cx) 90 } 91 poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>92 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { 93 Pin::new(&mut Pin::get_mut(self).write).poll_close(cx) 94 } 95 } 96