1 //! Network related functionality for `KafkaClient`.
2 //!
3 //! This module is crate private and not exposed to the public except
4 //! through re-exports of individual items from within
5 //! `kafka::client`.
6 
7 use std::collections::HashMap;
8 use std::fmt;
9 use std::io::{Read, Write};
10 use std::mem;
11 use std::net::{Shutdown, TcpStream};
12 use std::time::{Duration, Instant};
13 
14 #[cfg(feature = "security")]
15 use openssl::ssl::SslConnector;
16 
17 use error::Result;
18 
19 // --------------------------------------------------------------------
20 
/// Security relevant configuration options for `KafkaClient`.
///
/// Bundles the OpenSSL connector used to open TLS sessions with the
/// flag controlling hostname verification.
// This will be expanded in the future. See #51.
#[cfg(feature = "security")]
pub struct SecurityConfig {
    // the connector handed to new connections for wrapping their
    // TCP stream into a TLS session
    connector: SslConnector,
    // whether hostname verification is requested for TLS sessions;
    // initialised to `true` by `SecurityConfig::new`
    verify_hostname: bool,
}
28 
#[cfg(feature = "security")]
impl SecurityConfig {
    /// Creates a security configuration around the given connector
    /// with hostname verification switched on.
    ///
    /// In the future this will also support Kerberos via #51.
    pub fn new(connector: SslConnector) -> Self {
        let verify_hostname = true;
        SecurityConfig {
            connector,
            verify_hostname,
        }
    }

    /// Returns this configuration with the hostname verification
    /// flag replaced by `verify_hostname`; the connector is carried
    /// over untouched.
    pub fn with_hostname_verification(mut self, verify_hostname: bool) -> SecurityConfig {
        self.verify_hostname = verify_hostname;
        self
    }
}
47 
#[cfg(feature = "security")]
impl fmt::Debug for SecurityConfig {
    /// Renders only the `verify_hostname` flag; the connector is
    /// left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let verify_hostname = self.verify_hostname;
        f.write_fmt(format_args!(
            "SecurityConfig {{ verify_hostname: {} }}",
            verify_hostname
        ))
    }
}
58 
59 // --------------------------------------------------------------------
60 
/// An item kept in a pool together with the instant it was last
/// handed out.
struct Pooled<T> {
    // when the item was most recently checked out
    last_checkout: Instant,
    // the pooled item itself
    item: T,
}

impl<T> Pooled<T> {
    /// Wraps `item`, recording `last_checkout` as its most recent
    /// checkout time.
    fn new(last_checkout: Instant, item: T) -> Self {
        Self {
            item,
            last_checkout,
        }
    }
}

impl<T: fmt::Debug> fmt::Debug for Pooled<T> {
    /// Renders both the checkout instant and the wrapped item.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Pooled {
            ref last_checkout,
            ref item,
        } = *self;
        f.write_fmt(format_args!(
            "Pooled {{ last_checkout: {:?}, item: {:?} }}",
            last_checkout, item
        ))
    }
}
84 
/// Settings applied to every connection opened through `new_conn`.
#[derive(Debug)]
pub struct Config {
    // read/write timeout handed to each newly created connection
    rw_timeout: Option<Duration>,
    // how long a pooled connection may go without being checked out
    // before it is re-established on its next checkout
    idle_timeout: Duration,
    // optional TLS settings; when absent, connections are plain TCP
    #[cfg(feature = "security")]
    security_config: Option<SecurityConfig>,
}
92 
93 impl Config {
94     #[cfg(not(feature = "security"))]
95     fn new_conn(&self, id: u32, host: &str) -> Result<KafkaConnection> {
96         KafkaConnection::new(id, host, self.rw_timeout).map(|c| {
97             debug!("Established: {:?}", c);
98             c
99         })
100     }
101 
102     #[cfg(feature = "security")]
103     fn new_conn(&self, id: u32, host: &str) -> Result<KafkaConnection> {
104         KafkaConnection::new(
105             id,
106             host,
107             self.rw_timeout,
108             self.security_config
109                 .as_ref()
110                 .map(|c| (c.connector.clone(), c.verify_hostname)),
111         )
112         .map(|c| {
113             debug!("Established: {:?}", c);
114             c
115         })
116     }
117 }
118 
/// Mutable bookkeeping used to hand out connection identifiers.
#[derive(Debug)]
struct State {
    // running counter; its current value is the next identifier
    num_conns: u32,
}

impl State {
    /// Starts counting connection identifiers at zero.
    fn new() -> State {
        let num_conns = 0;
        State { num_conns }
    }

    /// Hands out the current counter value and advances the counter,
    /// wrapping around on overflow.
    fn next_conn_id(&mut self) -> u32 {
        let id = self.num_conns;
        self.num_conns = id.wrapping_add(1);
        id
    }
}
135 
/// A pool of broker connections, keyed by host.
#[derive(Debug)]
pub struct Connections {
    // pooled connections, keyed by the host string they were opened for
    conns: HashMap<String, Pooled<KafkaConnection>>,
    // source of the surrogate connection identifiers
    state: State,
    // settings applied when opening connections
    config: Config,
}
142 
143 impl Connections {
144     #[cfg(not(feature = "security"))]
145     pub fn new(rw_timeout: Option<Duration>, idle_timeout: Duration) -> Connections {
146         Connections {
147             conns: HashMap::new(),
148             state: State::new(),
149             config: Config {
150                 rw_timeout,
151                 idle_timeout,
152             },
153         }
154     }
155 
156     #[cfg(feature = "security")]
157     pub fn new(rw_timeout: Option<Duration>, idle_timeout: Duration) -> Connections {
158         Self::new_with_security(rw_timeout, idle_timeout, None)
159     }
160 
161     #[cfg(feature = "security")]
162     pub fn new_with_security(
163         rw_timeout: Option<Duration>,
164         idle_timeout: Duration,
165         security: Option<SecurityConfig>,
166     ) -> Connections {
167         Connections {
168             conns: HashMap::new(),
169             state: State::new(),
170             config: Config {
171                 rw_timeout,
172                 idle_timeout,
173                 security_config: security,
174             },
175         }
176     }
177 
178     pub fn set_idle_timeout(&mut self, idle_timeout: Duration) {
179         self.config.idle_timeout = idle_timeout;
180     }
181 
182     pub fn idle_timeout(&self) -> Duration {
183         self.config.idle_timeout
184     }
185 
186     pub fn get_conn<'a>(&'a mut self, host: &str, now: Instant) -> Result<&'a mut KafkaConnection> {
187         if let Some(conn) = self.conns.get_mut(host) {
188             if now.duration_since(conn.last_checkout) >= self.config.idle_timeout {
189                 debug!("Idle timeout reached: {:?}", conn.item);
190                 let new_conn = try!(self.config.new_conn(self.state.next_conn_id(), host));
191                 let _ = conn.item.shutdown();
192                 conn.item = new_conn;
193             }
194             conn.last_checkout = now;
195             let kconn: &mut KafkaConnection = &mut conn.item;
196             // ~ decouple the lifetimes to make the borrowck happy;
197             // this is safe since we're immediatelly returning the
198             // reference and the rest of the code in this method is
199             // not affected
200             return Ok(unsafe { mem::transmute(kconn) });
201         }
202         let cid = self.state.next_conn_id();
203         self.conns.insert(
204             host.to_owned(),
205             Pooled::new(now, try!(self.config.new_conn(cid, host))),
206         );
207         Ok(&mut self.conns.get_mut(host).unwrap().item)
208     }
209 
210     pub fn get_conn_any(&mut self, now: Instant) -> Option<&mut KafkaConnection> {
211         for (host, conn) in &mut self.conns {
212             if now.duration_since(conn.last_checkout) >= self.config.idle_timeout {
213                 debug!("Idle timeout reached: {:?}", conn.item);
214                 let new_conn_id = self.state.next_conn_id();
215                 let new_conn = match self.config.new_conn(new_conn_id, host.as_str()) {
216                     Ok(new_conn) => {
217                         let _ = conn.item.shutdown();
218                         new_conn
219                     }
220                     Err(e) => {
221                         warn!("Failed to establish connection to {}: {:?}", host, e);
222                         continue;
223                     }
224                 };
225                 conn.item = new_conn;
226             }
227             conn.last_checkout = now;
228             let kconn: &mut KafkaConnection = &mut conn.item;
229             return Some(kconn);
230         }
231         None
232     }
233 }
234 
235 // --------------------------------------------------------------------
236 
/// Tells whether a stream is a secured one.
trait IsSecured {
    /// Returns `true` for SSL streams and `false` for plain ones.
    fn is_secured(&self) -> bool;
}
240 
// Without the "security" feature a Kafka stream is just a plain TCP
// stream.
#[cfg(not(feature = "security"))]
type KafkaStream = TcpStream;

#[cfg(not(feature = "security"))]
impl IsSecured for KafkaStream {
    // a plain TCP stream is never reported as secured
    fn is_secured(&self) -> bool {
        false
    }
}
250 
251 #[cfg(feature = "security")]
252 use self::openssled::KafkaStream;
253 
#[cfg(feature = "security")]
mod openssled {
    use std::io::{self, Read, Write};
    use std::net::{Shutdown, TcpStream};
    use std::time::Duration;

    use openssl::ssl::SslStream;

    use super::IsSecured;

    /// A stream to a broker: either a bare TCP stream or an SSL
    /// stream layered on top of one.
    pub enum KafkaStream {
        Plain(TcpStream),
        Ssl(SslStream<TcpStream>),
    }

    impl IsSecured for KafkaStream {
        /// Only the `Ssl` variant counts as secured.
        fn is_secured(&self) -> bool {
            if let KafkaStream::Ssl(_) = *self {
                true
            } else {
                false
            }
        }
    }

    impl KafkaStream {
        /// Gives access to the underlying TCP stream of either variant.
        fn get_ref(&self) -> &TcpStream {
            match self {
                KafkaStream::Plain(tcp) => tcp,
                KafkaStream::Ssl(ssl) => ssl.get_ref(),
            }
        }

        /// Sets the read timeout on the underlying TCP stream.
        pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
            let tcp = self.get_ref();
            tcp.set_read_timeout(dur)
        }

        /// Sets the write timeout on the underlying TCP stream.
        pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
            let tcp = self.get_ref();
            tcp.set_write_timeout(dur)
        }

        /// Shuts down the underlying TCP stream as requested by `how`.
        pub fn shutdown(&mut self, how: Shutdown) -> io::Result<()> {
            let tcp = self.get_ref();
            tcp.shutdown(how)
        }
    }

    impl Read for KafkaStream {
        /// Delegates to the wrapped stream of the active variant.
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            match self {
                KafkaStream::Plain(tcp) => tcp.read(buf),
                KafkaStream::Ssl(ssl) => ssl.read(buf),
            }
        }
    }

    impl Write for KafkaStream {
        /// Delegates to the wrapped stream of the active variant.
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            match self {
                KafkaStream::Plain(tcp) => tcp.write(buf),
                KafkaStream::Ssl(ssl) => ssl.write(buf),
            }
        }

        /// Delegates to the wrapped stream of the active variant.
        fn flush(&mut self) -> io::Result<()> {
            match self {
                KafkaStream::Plain(tcp) => tcp.flush(),
                KafkaStream::Ssl(ssl) => ssl.flush(),
            }
        }
    }
}
323 
/// A TCP stream to a remote Kafka broker.
///
/// Carries an identifier and the host it was opened for, both of
/// which show up in the connection's debug output.
pub struct KafkaConnection {
    // a surrogate identifier to distinguish between
    // connections to the same host in debug messages
    id: u32,
    // "host:port"
    host: String,
    // the (wrapped) tcp stream
    stream: KafkaStream,
}
334 
335 impl fmt::Debug for KafkaConnection {
336     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
337         write!(
338             f,
339             "KafkaConnection {{ id: {}, secured: {}, host: \"{}\" }}",
340             self.id,
341             self.stream.is_secured(),
342             self.host
343         )
344     }
345 }
346 
347 impl KafkaConnection {
348     pub fn send(&mut self, msg: &[u8]) -> Result<usize> {
349         let r = self.stream.write(&msg[..]).map_err(From::from);
350         trace!("Sent {} bytes to: {:?} => {:?}", msg.len(), self, r);
351         r
352     }
353 
354     pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
355         let r = (&mut self.stream).read_exact(buf).map_err(From::from);
356         trace!("Read {} bytes from: {:?} => {:?}", buf.len(), self, r);
357         r
358     }
359 
360     pub fn read_exact_alloc(&mut self, size: u64) -> Result<Vec<u8>> {
361         let size: usize = size as usize;
362         let mut buffer: Vec<u8> = Vec::with_capacity(size);
363         // this is safe actually: we are setting the len to the
364         // buffers capacity and either fully populate it in the
365         // following call to `read_exact` or discard the vector (in
366         // the error case)
367         unsafe { buffer.set_len(size) };
368         try!(self.read_exact(buffer.as_mut_slice()));
369         Ok(buffer)
370     }
371 
372     fn shutdown(&mut self) -> Result<()> {
373         let r = self.stream.shutdown(Shutdown::Both);
374         debug!("Shut down: {:?} => {:?}", self, r);
375         r.map_err(From::from)
376     }
377 
378     fn from_stream(
379         stream: KafkaStream,
380         id: u32,
381         host: &str,
382         rw_timeout: Option<Duration>,
383     ) -> Result<KafkaConnection> {
384         try!(stream.set_read_timeout(rw_timeout));
385         try!(stream.set_write_timeout(rw_timeout));
386         Ok(KafkaConnection {
387             id,
388             host: host.to_owned(),
389             stream,
390         })
391     }
392 
393     #[cfg(not(feature = "security"))]
394     fn new(id: u32, host: &str, rw_timeout: Option<Duration>) -> Result<KafkaConnection> {
395         KafkaConnection::from_stream(try!(TcpStream::connect(host)), id, host, rw_timeout)
396     }
397 
398     #[cfg(feature = "security")]
399     fn new(
400         id: u32,
401         host: &str,
402         rw_timeout: Option<Duration>,
403         security: Option<(SslConnector, bool)>,
404     ) -> Result<KafkaConnection> {
405         let stream = try!(TcpStream::connect(host));
406         let stream = match security {
407             Some((connector, verify_hostname)) => {
408                 if !verify_hostname {
409                     connector
410                         .configure()
411                         .map_err(|err| {
412                             let err: crate::error::Error =
413                                 crate::error::ErrorKind::Ssl(From::from(err)).into();
414                             err
415                         })?
416                         .set_verify_hostname(false);
417                 }
418                 let domain = match host.rfind(':') {
419                     None => host,
420                     Some(i) => &host[..i],
421                 };
422                 let connection = try!(connector.connect(domain, stream));
423                 KafkaStream::Ssl(connection)
424             }
425             None => KafkaStream::Plain(stream),
426         };
427         KafkaConnection::from_stream(stream, id, host, rw_timeout)
428     }
429 }
430