1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 //! Types used to send and receive bytes over an I/O channel.
19 //!
20 //! The core types are the `TReadTransport`, `TWriteTransport` and the
21 //! `TIoChannel` traits, through which `TInputProtocol` or
22 //! `TOutputProtocol` can receive and send primitives over the wire. While
23 //! `TInputProtocol` and `TOutputProtocol` instances deal with language primitives
24 //! the types in this module understand only bytes.
25 
26 use std::io;
27 use std::io::{Read, Write};
28 use std::ops::{Deref, DerefMut};
29 
30 #[cfg(test)]
31 macro_rules! assert_eq_transport_num_written_bytes {
32     ($transport:ident, $num_written_bytes:expr) => {{
33         assert_eq!($transport.channel.write_bytes().len(), $num_written_bytes);
34     }};
35 }
36 
37 #[cfg(test)]
38 macro_rules! assert_eq_transport_written_bytes {
39     ($transport:ident, $expected_bytes:ident) => {{
40         assert_eq!($transport.channel.write_bytes(), &$expected_bytes);
41     }};
42 }
43 
44 mod buffered;
45 mod framed;
46 mod mem;
47 mod socket;
48 
49 pub use self::buffered::{
50     TBufferedReadTransport, TBufferedReadTransportFactory, TBufferedWriteTransport,
51     TBufferedWriteTransportFactory,
52 };
53 pub use self::framed::{
54     TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport,
55     TFramedWriteTransportFactory,
56 };
57 pub use self::mem::TBufferChannel;
58 pub use self::socket::TTcpChannel;
59 
60 /// Identifies a transport used by a `TInputProtocol` to receive bytes.
61 pub trait TReadTransport: Read {}
62 
63 /// Helper type used by a server to create `TReadTransport` instances for
64 /// accepted client connections.
65 pub trait TReadTransportFactory {
66     /// Create a `TTransport` that wraps a channel over which bytes are to be read.
create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>67     fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>;
68 }
69 
70 /// Identifies a transport used by `TOutputProtocol` to send bytes.
71 pub trait TWriteTransport: Write {}
72 
73 /// Helper type used by a server to create `TWriteTransport` instances for
74 /// accepted client connections.
75 pub trait TWriteTransportFactory {
76     /// Create a `TTransport` that wraps a channel over which bytes are to be sent.
create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>77     fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>;
78 }
79 
80 impl<T> TReadTransport for T where T: Read {}
81 
82 impl<T> TWriteTransport for T where T: Write {}
83 
84 // FIXME: implement the Debug trait for boxed transports
85 
86 impl<T> TReadTransportFactory for Box<T>
87 where
88     T: TReadTransportFactory + ?Sized,
89 {
create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>90     fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send> {
91         (**self).create(channel)
92     }
93 }
94 
95 impl<T> TWriteTransportFactory for Box<T>
96 where
97     T: TWriteTransportFactory + ?Sized,
98 {
create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>99     fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send> {
100         (**self).create(channel)
101     }
102 }
103 
104 /// Identifies a splittable bidirectional I/O channel used to send and receive bytes.
105 pub trait TIoChannel: Read + Write {
106     /// Split the channel into a readable half and a writable half, where the
107     /// readable half implements `io::Read` and the writable half implements
108     /// `io::Write`. Returns `None` if the channel was not initialized, or if it
109     /// cannot be split safely.
110     ///
111     /// Returned halves may share the underlying OS channel or buffer resources.
112     /// Implementations **should ensure** that these two halves can be safely
113     /// used independently by concurrent threads.
split(self) -> crate::Result<(crate::transport::ReadHalf<Self>, crate::transport::WriteHalf<Self>)> where Self: Sized114     fn split(self) -> crate::Result<(crate::transport::ReadHalf<Self>, crate::transport::WriteHalf<Self>)>
115     where
116         Self: Sized;
117 }
118 
119 /// The readable half of an object returned from `TIoChannel::split`.
120 #[derive(Debug)]
121 pub struct ReadHalf<C>
122 where
123     C: Read,
124 {
125     handle: C,
126 }
127 
128 /// The writable half of an object returned from `TIoChannel::split`.
129 #[derive(Debug)]
130 pub struct WriteHalf<C>
131 where
132     C: Write,
133 {
134     handle: C,
135 }
136 
137 impl<C> ReadHalf<C>
138 where
139     C: Read,
140 {
141     /// Create a `ReadHalf` associated with readable `handle`
new(handle: C) -> ReadHalf<C>142     pub fn new(handle: C) -> ReadHalf<C> {
143         ReadHalf { handle }
144     }
145 }
146 
147 impl<C> WriteHalf<C>
148 where
149     C: Write,
150 {
151     /// Create a `WriteHalf` associated with writable `handle`
new(handle: C) -> WriteHalf<C>152     pub fn new(handle: C) -> WriteHalf<C> {
153         WriteHalf { handle }
154     }
155 }
156 
157 impl<C> Read for ReadHalf<C>
158 where
159     C: Read,
160 {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>161     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
162         self.handle.read(buf)
163     }
164 }
165 
166 impl<C> Write for WriteHalf<C>
167 where
168     C: Write,
169 {
write(&mut self, buf: &[u8]) -> io::Result<usize>170     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
171         self.handle.write(buf)
172     }
173 
flush(&mut self) -> io::Result<()>174     fn flush(&mut self) -> io::Result<()> {
175         self.handle.flush()
176     }
177 }
178 
179 impl<C> Deref for ReadHalf<C>
180 where
181     C: Read,
182 {
183     type Target = C;
184 
deref(&self) -> &Self::Target185     fn deref(&self) -> &Self::Target {
186         &self.handle
187     }
188 }
189 
190 impl<C> DerefMut for ReadHalf<C>
191 where
192     C: Read,
193 {
deref_mut(&mut self) -> &mut C194     fn deref_mut(&mut self) -> &mut C {
195         &mut self.handle
196     }
197 }
198 
199 impl<C> Deref for WriteHalf<C>
200 where
201     C: Write,
202 {
203     type Target = C;
204 
deref(&self) -> &Self::Target205     fn deref(&self) -> &Self::Target {
206         &self.handle
207     }
208 }
209 
210 impl<C> DerefMut for WriteHalf<C>
211 where
212     C: Write,
213 {
deref_mut(&mut self) -> &mut C214     fn deref_mut(&mut self) -> &mut C {
215         &mut self.handle
216     }
217 }
218 
219 #[cfg(test)]
220 mod tests {
221 
222     use std::io::Cursor;
223 
224     use super::*;
225 
226     #[test]
must_create_usable_read_channel_from_concrete_read_type()227     fn must_create_usable_read_channel_from_concrete_read_type() {
228         let r = Cursor::new([0, 1, 2]);
229         let _ = TBufferedReadTransport::new(r);
230     }
231 
232     #[test]
must_create_usable_read_channel_from_boxed_read()233     fn must_create_usable_read_channel_from_boxed_read() {
234         let r: Box<dyn Read> = Box::new(Cursor::new([0, 1, 2]));
235         let _ = TBufferedReadTransport::new(r);
236     }
237 
238     #[test]
must_create_usable_write_channel_from_concrete_write_type()239     fn must_create_usable_write_channel_from_concrete_write_type() {
240         let w = vec![0u8; 10];
241         let _ = TBufferedWriteTransport::new(w);
242     }
243 
244     #[test]
must_create_usable_write_channel_from_boxed_write()245     fn must_create_usable_write_channel_from_boxed_write() {
246         let w: Box<dyn Write> = Box::new(vec![0u8; 10]);
247         let _ = TBufferedWriteTransport::new(w);
248     }
249 
250     #[test]
must_create_usable_read_transport_from_concrete_read_transport()251     fn must_create_usable_read_transport_from_concrete_read_transport() {
252         let r = Cursor::new([0, 1, 2]);
253         let mut t = TBufferedReadTransport::new(r);
254         takes_read_transport(&mut t)
255     }
256 
257     #[test]
must_create_usable_read_transport_from_boxed_read()258     fn must_create_usable_read_transport_from_boxed_read() {
259         let r = Cursor::new([0, 1, 2]);
260         let mut t: Box<dyn TReadTransport> = Box::new(TBufferedReadTransport::new(r));
261         takes_read_transport(&mut t)
262     }
263 
264     #[test]
must_create_usable_write_transport_from_concrete_write_transport()265     fn must_create_usable_write_transport_from_concrete_write_transport() {
266         let w = vec![0u8; 10];
267         let mut t = TBufferedWriteTransport::new(w);
268         takes_write_transport(&mut t)
269     }
270 
271     #[test]
must_create_usable_write_transport_from_boxed_write()272     fn must_create_usable_write_transport_from_boxed_write() {
273         let w = vec![0u8; 10];
274         let mut t: Box<dyn TWriteTransport> = Box::new(TBufferedWriteTransport::new(w));
275         takes_write_transport(&mut t)
276     }
277 
takes_read_transport<R>(t: &mut R) where R: TReadTransport,278     fn takes_read_transport<R>(t: &mut R)
279     where
280         R: TReadTransport,
281     {
282         t.bytes();
283     }
284 
takes_write_transport<W>(t: &mut W) where W: TWriteTransport,285     fn takes_write_transport<W>(t: &mut W)
286     where
287         W: TWriteTransport,
288     {
289         t.flush().unwrap();
290     }
291 }
292