1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 //! Types used to send and receive bytes over an I/O channel. 19 //! 20 //! The core types are the `TReadTransport`, `TWriteTransport` and the 21 //! `TIoChannel` traits, through which `TInputProtocol` or 22 //! `TOutputProtocol` can receive and send primitives over the wire. While 23 //! `TInputProtocol` and `TOutputProtocol` instances deal with language primitives 24 //! the types in this module understand only bytes. 25 26 use std::io; 27 use std::io::{Read, Write}; 28 use std::ops::{Deref, DerefMut}; 29 30 #[cfg(test)] 31 macro_rules! assert_eq_transport_num_written_bytes { 32 ($transport:ident, $num_written_bytes:expr) => {{ 33 assert_eq!($transport.channel.write_bytes().len(), $num_written_bytes); 34 }}; 35 } 36 37 #[cfg(test)] 38 macro_rules! 
assert_eq_transport_written_bytes { 39 ($transport:ident, $expected_bytes:ident) => {{ 40 assert_eq!($transport.channel.write_bytes(), &$expected_bytes); 41 }}; 42 } 43 44 mod buffered; 45 mod framed; 46 mod mem; 47 mod socket; 48 49 pub use self::buffered::{ 50 TBufferedReadTransport, TBufferedReadTransportFactory, TBufferedWriteTransport, 51 TBufferedWriteTransportFactory, 52 }; 53 pub use self::framed::{ 54 TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport, 55 TFramedWriteTransportFactory, 56 }; 57 pub use self::mem::TBufferChannel; 58 pub use self::socket::TTcpChannel; 59 60 /// Identifies a transport used by a `TInputProtocol` to receive bytes. 61 pub trait TReadTransport: Read {} 62 63 /// Helper type used by a server to create `TReadTransport` instances for 64 /// accepted client connections. 65 pub trait TReadTransportFactory { 66 /// Create a `TTransport` that wraps a channel over which bytes are to be read. create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>67 fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>; 68 } 69 70 /// Identifies a transport used by `TOutputProtocol` to send bytes. 71 pub trait TWriteTransport: Write {} 72 73 /// Helper type used by a server to create `TWriteTransport` instances for 74 /// accepted client connections. 75 pub trait TWriteTransportFactory { 76 /// Create a `TTransport` that wraps a channel over which bytes are to be sent. 
create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>77 fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>; 78 } 79 80 impl<T> TReadTransport for T where T: Read {} 81 82 impl<T> TWriteTransport for T where T: Write {} 83 84 // FIXME: implement the Debug trait for boxed transports 85 86 impl<T> TReadTransportFactory for Box<T> 87 where 88 T: TReadTransportFactory + ?Sized, 89 { create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>90 fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send> { 91 (**self).create(channel) 92 } 93 } 94 95 impl<T> TWriteTransportFactory for Box<T> 96 where 97 T: TWriteTransportFactory + ?Sized, 98 { create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>99 fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send> { 100 (**self).create(channel) 101 } 102 } 103 104 /// Identifies a splittable bidirectional I/O channel used to send and receive bytes. 105 pub trait TIoChannel: Read + Write { 106 /// Split the channel into a readable half and a writable half, where the 107 /// readable half implements `io::Read` and the writable half implements 108 /// `io::Write`. Returns `None` if the channel was not initialized, or if it 109 /// cannot be split safely. 110 /// 111 /// Returned halves may share the underlying OS channel or buffer resources. 112 /// Implementations **should ensure** that these two halves can be safely 113 /// used independently by concurrent threads. split(self) -> crate::Result<(crate::transport::ReadHalf<Self>, crate::transport::WriteHalf<Self>)> where Self: Sized114 fn split(self) -> crate::Result<(crate::transport::ReadHalf<Self>, crate::transport::WriteHalf<Self>)> 115 where 116 Self: Sized; 117 } 118 119 /// The readable half of an object returned from `TIoChannel::split`. 
#[derive(Debug)]
pub struct ReadHalf<C>
where
    C: Read,
{
    // Wrapped readable handle; every read is forwarded to it.
    handle: C,
}

/// The writable half of an object returned from `TIoChannel::split`.
#[derive(Debug)]
pub struct WriteHalf<C>
where
    C: Write,
{
    // Wrapped writable handle; every write and flush is forwarded to it.
    handle: C,
}

impl<C> ReadHalf<C>
where
    C: Read,
{
    /// Create a `ReadHalf` associated with readable `handle`.
    pub fn new(handle: C) -> Self {
        Self { handle }
    }
}

impl<C> WriteHalf<C>
where
    C: Write,
{
    /// Create a `WriteHalf` associated with writable `handle`.
    pub fn new(handle: C) -> Self {
        Self { handle }
    }
}

impl<C> Read for ReadHalf<C>
where
    C: Read,
{
    /// Reads into `buf` by delegating to the wrapped handle.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.handle.read(buf)
    }
}

impl<C> Write for WriteHalf<C>
where
    C: Write,
{
    /// Writes `buf` by delegating to the wrapped handle.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.handle.write(buf)
    }

    /// Flushes the wrapped handle.
    fn flush(&mut self) -> io::Result<()> {
        self.handle.flush()
    }
}

impl<C> Deref for ReadHalf<C>
where
    C: Read,
{
    type Target = C;

    /// Gives shared access to the wrapped handle.
    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}

impl<C> DerefMut for ReadHalf<C>
where
    C: Read,
{
    /// Gives exclusive access to the wrapped handle.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.handle
    }
}

impl<C> Deref for WriteHalf<C>
where
    C: Write,
{
    type Target = C;

    /// Gives shared access to the wrapped handle.
    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}

impl<C> DerefMut for WriteHalf<C>
where
    C: Write,
{
    /// Gives exclusive access to the wrapped handle.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.handle
    }
}

#[cfg(test)]
mod tests {

    use std::io::Cursor;

    use super::*;

    // The tests below mainly check that the transport types accept both
    // concrete and boxed readers/writers; most of them assert nothing at
    // runtime beyond successful construction.

    #[test]
    fn must_create_usable_read_channel_from_concrete_read_type() {
        let r = Cursor::new([0, 1, 2]);
        let _ = TBufferedReadTransport::new(r);
    }

    #[test]
    fn must_create_usable_read_channel_from_boxed_read() {
        let r: Box<dyn Read> = Box::new(Cursor::new([0, 1, 2]));
        let _ = TBufferedReadTransport::new(r);
    }

    #[test]
    fn must_create_usable_write_channel_from_concrete_write_type() {
        let w = vec![0u8; 10];
        let _ = TBufferedWriteTransport::new(w);
    }

    #[test]
    fn must_create_usable_write_channel_from_boxed_write() {
        let w: Box<dyn Write> = Box::new(vec![0u8; 10]);
        let _ = TBufferedWriteTransport::new(w);
    }

    #[test]
    fn must_create_usable_read_transport_from_concrete_read_transport() {
        let r = Cursor::new([0, 1, 2]);
        let mut t = TBufferedReadTransport::new(r);
        takes_read_transport(&mut t)
    }

    #[test]
    fn must_create_usable_read_transport_from_boxed_read() {
        let r = Cursor::new([0, 1, 2]);
        let mut t: Box<dyn TReadTransport> = Box::new(TBufferedReadTransport::new(r));
        takes_read_transport(&mut t)
    }

    #[test]
    fn must_create_usable_write_transport_from_concrete_write_transport() {
        let w = vec![0u8; 10];
        let mut t = TBufferedWriteTransport::new(w);
        takes_write_transport(&mut t)
    }

    #[test]
    fn must_create_usable_write_transport_from_boxed_write() {
        let w = vec![0u8; 10];
        let mut t: Box<dyn TWriteTransport> = Box::new(TBufferedWriteTransport::new(w));
        takes_write_transport(&mut t)
    }

    // Accepts any `TReadTransport` and calls a `Read` method on it.
    fn takes_read_transport<R>(t: &mut R)
    where
        R: TReadTransport,
    {
        t.bytes();
    }

    // Accepts any `TWriteTransport` and calls a `Write` method on it.
    fn takes_write_transport<W>(t: &mut W)
    where
        W: TWriteTransport,
    {
        t.flush().unwrap();
    }
}