1 use crate::session::Session; 2 use std::io::{IoSlice, Read, Result, Write}; 3 4 /// This type implements `io::Read` and `io::Write`, encapsulating 5 /// a Session `S` and an underlying transport `T`, such as a socket. 6 /// 7 /// This allows you to use a rustls Session like a normal stream. 8 pub struct Stream<'a, S: 'a + Session + ?Sized, T: 'a + Read + Write + ?Sized> { 9 /// Our session 10 pub sess: &'a mut S, 11 12 /// The underlying transport, like a socket 13 pub sock: &'a mut T, 14 } 15 16 impl<'a, S, T> Stream<'a, S, T> 17 where 18 S: 'a + Session, 19 T: 'a + Read + Write, 20 { 21 /// Make a new Stream using the Session `sess` and socket-like object 22 /// `sock`. This does not fail and does no IO. new(sess: &'a mut S, sock: &'a mut T) -> Stream<'a, S, T>23 pub fn new(sess: &'a mut S, sock: &'a mut T) -> Stream<'a, S, T> { 24 Stream { sess, sock } 25 } 26 27 /// If we're handshaking, complete all the IO for that. 28 /// If we have data to write, write it all. complete_prior_io(&mut self) -> Result<()>29 fn complete_prior_io(&mut self) -> Result<()> { 30 if self.sess.is_handshaking() { 31 self.sess.complete_io(self.sock)?; 32 } 33 34 if self.sess.wants_write() { 35 self.sess.complete_io(self.sock)?; 36 } 37 38 Ok(()) 39 } 40 } 41 42 impl<'a, S, T> Read for Stream<'a, S, T> 43 where 44 S: 'a + Session, 45 T: 'a + Read + Write, 46 { read(&mut self, buf: &mut [u8]) -> Result<usize>47 fn read(&mut self, buf: &mut [u8]) -> Result<usize> { 48 self.complete_prior_io()?; 49 50 // We call complete_io() in a loop since a single call may read only 51 // a partial packet from the underlying transport. A full packet is 52 // needed to get more plaintext, which we must do if EOF has not been 53 // hit. Otherwise, we will prematurely signal EOF by returning 0. We 54 // determine if EOF has actually been hit by checking if 0 bytes were 55 // read from the underlying transport. 56 while self.sess.wants_read() && self.sess.complete_io(self.sock)?.0 != 0 {} 57 58 self.sess.read(buf) 59 } 60 } 61 62 impl<'a, S, T> Write for Stream<'a, S, T> 63 where 64 S: 'a + Session, 65 T: 'a + Read + Write, 66 { write(&mut self, buf: &[u8]) -> Result<usize>67 fn write(&mut self, buf: &[u8]) -> Result<usize> { 68 self.complete_prior_io()?; 69 70 let len = self.sess.write(buf)?; 71 72 // Try to write the underlying transport here, but don't let 73 // any errors mask the fact we've consumed `len` bytes. 74 // Callers will learn of permanent errors on the next call. 75 let _ = self.sess.complete_io(self.sock); 76 77 Ok(len) 78 } 79 write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize>80 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> { 81 self.complete_prior_io()?; 82 83 let len = self.sess.write_vectored(bufs)?; 84 85 // Try to write the underlying transport here, but don't let 86 // any errors mask the fact we've consumed `len` bytes. 87 // Callers will learn of permanent errors on the next call. 88 let _ = self.sess.complete_io(self.sock); 89 90 Ok(len) 91 } 92 flush(&mut self) -> Result<()>93 fn flush(&mut self) -> Result<()> { 94 self.complete_prior_io()?; 95 96 self.sess.flush()?; 97 if self.sess.wants_write() { 98 self.sess.complete_io(self.sock)?; 99 } 100 Ok(()) 101 } 102 } 103 104 /// This type implements `io::Read` and `io::Write`, encapsulating 105 /// and owning a Session `S` and an underlying blocking transport 106 /// `T`, such as a socket. 107 /// 108 /// This allows you to use a rustls Session like a normal stream. 109 pub struct StreamOwned<S: Session + Sized, T: Read + Write + Sized> { 110 /// Our session 111 pub sess: S, 112 113 /// The underlying transport, like a socket 114 pub sock: T, 115 } 116 117 impl<S, T> StreamOwned<S, T> 118 where 119 S: Session, 120 T: Read + Write, 121 { 122 /// Make a new StreamOwned taking the Session `sess` and socket-like 123 /// object `sock`. This does not fail and does no IO. 124 /// 125 /// This is the same as `Stream::new` except `sess` and `sock` are 126 /// moved into the StreamOwned. new(sess: S, sock: T) -> StreamOwned<S, T>127 pub fn new(sess: S, sock: T) -> StreamOwned<S, T> { 128 StreamOwned { sess, sock } 129 } 130 131 /// Get a reference to the underlying socket get_ref(&self) -> &T132 pub fn get_ref(&self) -> &T { 133 &self.sock 134 } 135 136 /// Get a mutable reference to the underlying socket get_mut(&mut self) -> &mut T137 pub fn get_mut(&mut self) -> &mut T { 138 &mut self.sock 139 } 140 } 141 142 impl<'a, S, T> StreamOwned<S, T> 143 where 144 S: Session, 145 T: Read + Write, 146 { as_stream(&'a mut self) -> Stream<'a, S, T>147 fn as_stream(&'a mut self) -> Stream<'a, S, T> { 148 Stream { 149 sess: &mut self.sess, 150 sock: &mut self.sock, 151 } 152 } 153 } 154 155 impl<S, T> Read for StreamOwned<S, T> 156 where 157 S: Session, 158 T: Read + Write, 159 { read(&mut self, buf: &mut [u8]) -> Result<usize>160 fn read(&mut self, buf: &mut [u8]) -> Result<usize> { 161 self.as_stream().read(buf) 162 } 163 } 164 165 impl<S, T> Write for StreamOwned<S, T> 166 where 167 S: Session, 168 T: Read + Write, 169 { write(&mut self, buf: &[u8]) -> Result<usize>170 fn write(&mut self, buf: &[u8]) -> Result<usize> { 171 self.as_stream().write(buf) 172 } 173 flush(&mut self) -> Result<()>174 fn flush(&mut self) -> Result<()> { 175 self.as_stream().flush() 176 } 177 } 178 179 #[cfg(test)] 180 mod tests { 181 use super::{Stream, StreamOwned}; 182 use crate::client::ClientSession; 183 use crate::server::ServerSession; 184 use crate::session::Session; 185 use std::net::TcpStream; 186 187 #[test] stream_can_be_created_for_session_and_tcpstream()188 fn stream_can_be_created_for_session_and_tcpstream() { 189 type _Test<'a> = Stream<'a, dyn Session, TcpStream>; 190 } 191 192 #[test] streamowned_can_be_created_for_client_and_tcpstream()193 fn streamowned_can_be_created_for_client_and_tcpstream() { 194 type _Test = StreamOwned<ClientSession, TcpStream>; 195 } 196 197 #[test] streamowned_can_be_created_for_server_and_tcpstream()198 fn streamowned_can_be_created_for_server_and_tcpstream() { 199 type _Test = StreamOwned<ServerSession, TcpStream>; 200 } 201 } 202