1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 use std::convert::From;
19 use std::io;
20 use std::io::{ErrorKind, Read, Write};
21 use std::net::{Shutdown, TcpStream, ToSocketAddrs};
22 
23 use super::{ReadHalf, TIoChannel, WriteHalf};
24 use crate::{new_transport_error, TransportErrorKind};
25 
26 /// Bidirectional TCP/IP channel.
27 ///
28 /// # Examples
29 ///
30 /// Create a `TTcpChannel`.
31 ///
32 /// ```no_run
33 /// use std::io::{Read, Write};
34 /// use thrift::transport::TTcpChannel;
35 ///
36 /// let mut c = TTcpChannel::new();
37 /// c.open("localhost:9090").unwrap();
38 ///
39 /// let mut buf = vec![0u8; 4];
40 /// c.read(&mut buf).unwrap();
41 /// c.write(&vec![0, 1, 2]).unwrap();
42 /// ```
43 ///
44 /// Create a `TTcpChannel` by wrapping an existing `TcpStream`.
45 ///
46 /// ```no_run
47 /// use std::io::{Read, Write};
48 /// use std::net::TcpStream;
49 /// use thrift::transport::TTcpChannel;
50 ///
51 /// let stream = TcpStream::connect("127.0.0.1:9189").unwrap();
52 ///
53 /// // no need to call c.open() since we've already connected above
54 /// let mut c = TTcpChannel::with_stream(stream);
55 ///
56 /// let mut buf = vec![0u8; 4];
57 /// c.read(&mut buf).unwrap();
58 /// c.write(&vec![0, 1, 2]).unwrap();
59 /// ```
60 #[derive(Debug, Default)]
61 pub struct TTcpChannel {
62     stream: Option<TcpStream>,
63 }
64 
65 impl TTcpChannel {
66     /// Create an uninitialized `TTcpChannel`.
67     ///
68     /// The returned instance must be opened using `TTcpChannel::open(...)`
69     /// before it can be used.
new() -> TTcpChannel70     pub fn new() -> TTcpChannel {
71         TTcpChannel { stream: None }
72     }
73 
74     /// Create a `TTcpChannel` that wraps an existing `TcpStream`.
75     ///
76     /// The passed-in stream is assumed to have been opened before being wrapped
77     /// by the created `TTcpChannel` instance.
with_stream(stream: TcpStream) -> TTcpChannel78     pub fn with_stream(stream: TcpStream) -> TTcpChannel {
79         TTcpChannel {
80             stream: Some(stream),
81         }
82     }
83 
84     /// Connect to `remote_address`, which should implement `ToSocketAddrs` trait.
open<A: ToSocketAddrs>(&mut self, remote_address: A) -> crate::Result<()>85     pub fn open<A: ToSocketAddrs>(&mut self, remote_address: A) -> crate::Result<()> {
86         if self.stream.is_some() {
87             Err(new_transport_error(
88                 TransportErrorKind::AlreadyOpen,
89                 "tcp connection previously opened",
90             ))
91         } else {
92             match TcpStream::connect(&remote_address) {
93                 Ok(s) => {
94                     self.stream = Some(s);
95                     Ok(())
96                 }
97                 Err(e) => Err(From::from(e)),
98             }
99         }
100     }
101 
102     /// Shut down this channel.
103     ///
104     /// Both send and receive halves are closed, and this instance can no
105     /// longer be used to communicate with another endpoint.
close(&mut self) -> crate::Result<()>106     pub fn close(&mut self) -> crate::Result<()> {
107         self.if_set(|s| s.shutdown(Shutdown::Both))
108             .map_err(From::from)
109     }
110 
if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T> where F: FnMut(&mut TcpStream) -> io::Result<T>,111     fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T>
112     where
113         F: FnMut(&mut TcpStream) -> io::Result<T>,
114     {
115         if let Some(ref mut s) = self.stream {
116             stream_operation(s)
117         } else {
118             Err(io::Error::new(
119                 ErrorKind::NotConnected,
120                 "tcp endpoint not connected",
121             ))
122         }
123     }
124 }
125 
126 impl TIoChannel for TTcpChannel {
split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)> where Self: Sized,127     fn split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)>
128     where
129         Self: Sized,
130     {
131         let mut s = self;
132 
133         s.stream
134             .as_mut()
135             .and_then(|s| s.try_clone().ok())
136             .map(|cloned| {
137                 let read_half = ReadHalf::new(TTcpChannel {
138                     stream: s.stream.take(),
139                 });
140                 let write_half = WriteHalf::new(TTcpChannel {
141                     stream: Some(cloned),
142                 });
143                 (read_half, write_half)
144             })
145             .ok_or_else(|| {
146                 new_transport_error(
147                     TransportErrorKind::Unknown,
148                     "cannot clone underlying tcp stream",
149                 )
150             })
151     }
152 }
153 
154 impl Read for TTcpChannel {
read(&mut self, b: &mut [u8]) -> io::Result<usize>155     fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
156         self.if_set(|s| s.read(b))
157     }
158 }
159 
160 impl Write for TTcpChannel {
write(&mut self, b: &[u8]) -> io::Result<usize>161     fn write(&mut self, b: &[u8]) -> io::Result<usize> {
162         self.if_set(|s| s.write(b))
163     }
164 
flush(&mut self) -> io::Result<()>165     fn flush(&mut self) -> io::Result<()> {
166         self.if_set(|s| s.flush())
167     }
168 }
169