1 use crate::session::Session;
2 use std::io::{IoSlice, Read, Result, Write};
3 
4 /// This type implements `io::Read` and `io::Write`, encapsulating
5 /// a Session `S` and an underlying transport `T`, such as a socket.
6 ///
7 /// This allows you to use a rustls Session like a normal stream.
8 pub struct Stream<'a, S: 'a + Session + ?Sized, T: 'a + Read + Write + ?Sized> {
9     /// Our session
10     pub sess: &'a mut S,
11 
12     /// The underlying transport, like a socket
13     pub sock: &'a mut T,
14 }
15 
16 impl<'a, S, T> Stream<'a, S, T>
17 where
18     S: 'a + Session,
19     T: 'a + Read + Write,
20 {
21     /// Make a new Stream using the Session `sess` and socket-like object
22     /// `sock`.  This does not fail and does no IO.
new(sess: &'a mut S, sock: &'a mut T) -> Stream<'a, S, T>23     pub fn new(sess: &'a mut S, sock: &'a mut T) -> Stream<'a, S, T> {
24         Stream { sess, sock }
25     }
26 
27     /// If we're handshaking, complete all the IO for that.
28     /// If we have data to write, write it all.
complete_prior_io(&mut self) -> Result<()>29     fn complete_prior_io(&mut self) -> Result<()> {
30         if self.sess.is_handshaking() {
31             self.sess.complete_io(self.sock)?;
32         }
33 
34         if self.sess.wants_write() {
35             self.sess.complete_io(self.sock)?;
36         }
37 
38         Ok(())
39     }
40 }
41 
42 impl<'a, S, T> Read for Stream<'a, S, T>
43 where
44     S: 'a + Session,
45     T: 'a + Read + Write,
46 {
read(&mut self, buf: &mut [u8]) -> Result<usize>47     fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
48         self.complete_prior_io()?;
49 
50         // We call complete_io() in a loop since a single call may read only
51         // a partial packet from the underlying transport. A full packet is
52         // needed to get more plaintext, which we must do if EOF has not been
53         // hit. Otherwise, we will prematurely signal EOF by returning 0. We
54         // determine if EOF has actually been hit by checking if 0 bytes were
55         // read from the underlying transport.
56         while self.sess.wants_read() && self.sess.complete_io(self.sock)?.0 != 0 {}
57 
58         self.sess.read(buf)
59     }
60 }
61 
62 impl<'a, S, T> Write for Stream<'a, S, T>
63 where
64     S: 'a + Session,
65     T: 'a + Read + Write,
66 {
write(&mut self, buf: &[u8]) -> Result<usize>67     fn write(&mut self, buf: &[u8]) -> Result<usize> {
68         self.complete_prior_io()?;
69 
70         let len = self.sess.write(buf)?;
71 
72         // Try to write the underlying transport here, but don't let
73         // any errors mask the fact we've consumed `len` bytes.
74         // Callers will learn of permanent errors on the next call.
75         let _ = self.sess.complete_io(self.sock);
76 
77         Ok(len)
78     }
79 
write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize>80     fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
81         self.complete_prior_io()?;
82 
83         let len = self.sess.write_vectored(bufs)?;
84 
85         // Try to write the underlying transport here, but don't let
86         // any errors mask the fact we've consumed `len` bytes.
87         // Callers will learn of permanent errors on the next call.
88         let _ = self.sess.complete_io(self.sock);
89 
90         Ok(len)
91     }
92 
flush(&mut self) -> Result<()>93     fn flush(&mut self) -> Result<()> {
94         self.complete_prior_io()?;
95 
96         self.sess.flush()?;
97         if self.sess.wants_write() {
98             self.sess.complete_io(self.sock)?;
99         }
100         Ok(())
101     }
102 }
103 
104 /// This type implements `io::Read` and `io::Write`, encapsulating
105 /// and owning a Session `S` and an underlying blocking transport
106 /// `T`, such as a socket.
107 ///
108 /// This allows you to use a rustls Session like a normal stream.
109 pub struct StreamOwned<S: Session + Sized, T: Read + Write + Sized> {
110     /// Our session
111     pub sess: S,
112 
113     /// The underlying transport, like a socket
114     pub sock: T,
115 }
116 
117 impl<S, T> StreamOwned<S, T>
118 where
119     S: Session,
120     T: Read + Write,
121 {
122     /// Make a new StreamOwned taking the Session `sess` and socket-like
123     /// object `sock`.  This does not fail and does no IO.
124     ///
125     /// This is the same as `Stream::new` except `sess` and `sock` are
126     /// moved into the StreamOwned.
new(sess: S, sock: T) -> StreamOwned<S, T>127     pub fn new(sess: S, sock: T) -> StreamOwned<S, T> {
128         StreamOwned { sess, sock }
129     }
130 
131     /// Get a reference to the underlying socket
get_ref(&self) -> &T132     pub fn get_ref(&self) -> &T {
133         &self.sock
134     }
135 
136     /// Get a mutable reference to the underlying socket
get_mut(&mut self) -> &mut T137     pub fn get_mut(&mut self) -> &mut T {
138         &mut self.sock
139     }
140 }
141 
142 impl<'a, S, T> StreamOwned<S, T>
143 where
144     S: Session,
145     T: Read + Write,
146 {
as_stream(&'a mut self) -> Stream<'a, S, T>147     fn as_stream(&'a mut self) -> Stream<'a, S, T> {
148         Stream {
149             sess: &mut self.sess,
150             sock: &mut self.sock,
151         }
152     }
153 }
154 
155 impl<S, T> Read for StreamOwned<S, T>
156 where
157     S: Session,
158     T: Read + Write,
159 {
read(&mut self, buf: &mut [u8]) -> Result<usize>160     fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
161         self.as_stream().read(buf)
162     }
163 }
164 
165 impl<S, T> Write for StreamOwned<S, T>
166 where
167     S: Session,
168     T: Read + Write,
169 {
write(&mut self, buf: &[u8]) -> Result<usize>170     fn write(&mut self, buf: &[u8]) -> Result<usize> {
171         self.as_stream().write(buf)
172     }
173 
flush(&mut self) -> Result<()>174     fn flush(&mut self) -> Result<()> {
175         self.as_stream().flush()
176     }
177 }
178 
#[cfg(test)]
mod tests {
    use std::net::TcpStream;

    use super::{Stream, StreamOwned};
    use crate::client::ClientSession;
    use crate::server::ServerSession;
    use crate::session::Session;

    // The tests below make no runtime assertions: each one only names
    // a type alias, so the check is that the named instantiation is
    // accepted by the compiler.

    #[test]
    fn stream_can_be_created_for_session_and_tcpstream() {
        type _BorrowedOverDynSession<'a> = Stream<'a, dyn Session, TcpStream>;
    }

    #[test]
    fn streamowned_can_be_created_for_client_and_tcpstream() {
        type _OwnedClient = StreamOwned<ClientSession, TcpStream>;
    }

    #[test]
    fn streamowned_can_be_created_for_server_and_tcpstream() {
        type _OwnedServer = StreamOwned<ServerSession, TcpStream>;
    }
}
202