1 // Take a look at the license at the top of the repository in the LICENSE file.
2 
3 use crate::prelude::*;
4 use crate::IOStream;
5 use crate::InputStreamAsyncRead;
6 use crate::OutputStreamAsyncWrite;
7 use crate::PollableInputStream;
8 use crate::PollableOutputStream;
9 use futures_core::task::{Context, Poll};
10 use futures_io::{AsyncRead, AsyncWrite};
11 use glib::object::{Cast, IsA};
12 use std::io;
13 use std::pin::Pin;
14 
15 pub trait IOStreamExtManual: Sized + IsA<IOStream> {
into_async_read_write(self) -> Result<IOStreamAsyncReadWrite<Self>, Self>16     fn into_async_read_write(self) -> Result<IOStreamAsyncReadWrite<Self>, Self> {
17         let write = self
18             .output_stream()
19             .dynamic_cast::<PollableOutputStream>()
20             .ok()
21             .and_then(|s| s.into_async_write().ok());
22 
23         let read = self
24             .input_stream()
25             .dynamic_cast::<PollableInputStream>()
26             .ok()
27             .and_then(|s| s.into_async_read().ok());
28 
29         let (read, write) = match (read, write) {
30             (Some(read), Some(write)) => (read, write),
31             _ => return Err(self),
32         };
33 
34         Ok(IOStreamAsyncReadWrite {
35             io_stream: self,
36             read,
37             write,
38         })
39     }
40 }
41 
42 impl<O: IsA<IOStream>> IOStreamExtManual for O {}
43 
44 #[derive(Debug)]
45 pub struct IOStreamAsyncReadWrite<T> {
46     io_stream: T,
47     read: InputStreamAsyncRead<PollableInputStream>,
48     write: OutputStreamAsyncWrite<PollableOutputStream>,
49 }
50 
51 impl<T: IsA<IOStream>> IOStreamAsyncReadWrite<T> {
input_stream(&self) -> &PollableInputStream52     pub fn input_stream(&self) -> &PollableInputStream {
53         self.read.input_stream()
54     }
55 
output_stream(&self) -> &PollableOutputStream56     pub fn output_stream(&self) -> &PollableOutputStream {
57         self.write.output_stream()
58     }
59 
into_io_stream(self) -> T60     pub fn into_io_stream(self) -> T {
61         self.io_stream
62     }
63 
io_stream(&self) -> &T64     pub fn io_stream(&self) -> &T {
65         &self.io_stream
66     }
67 }
68 
69 impl<T: IsA<IOStream> + std::marker::Unpin> AsyncRead for IOStreamAsyncReadWrite<T> {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize, io::Error>>70     fn poll_read(
71         self: Pin<&mut Self>,
72         cx: &mut Context<'_>,
73         buf: &mut [u8],
74     ) -> Poll<Result<usize, io::Error>> {
75         Pin::new(&mut Pin::get_mut(self).read).poll_read(cx, buf)
76     }
77 }
78 
79 impl<T: IsA<IOStream> + std::marker::Unpin> AsyncWrite for IOStreamAsyncReadWrite<T> {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize, io::Error>>80     fn poll_write(
81         self: Pin<&mut Self>,
82         cx: &mut Context<'_>,
83         buf: &[u8],
84     ) -> Poll<Result<usize, io::Error>> {
85         Pin::new(&mut Pin::get_mut(self).write).poll_write(cx, buf)
86     }
87 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>88     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
89         Pin::new(&mut Pin::get_mut(self).write).poll_flush(cx)
90     }
91 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>92     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
93         Pin::new(&mut Pin::get_mut(self).write).poll_close(cx)
94     }
95 }
96