1 //! Network related functionality for `KafkaClient`. 2 //! 3 //! This module is crate private and not exposed to the public except 4 //! through re-exports of individual items from within 5 //! `kafka::client`. 6 7 use std::collections::HashMap; 8 use std::fmt; 9 use std::io::{Read, Write}; 10 use std::mem; 11 use std::net::{Shutdown, TcpStream}; 12 use std::time::{Duration, Instant}; 13 14 #[cfg(feature = "security")] 15 use openssl::ssl::SslConnector; 16 17 use error::Result; 18 19 // -------------------------------------------------------------------- 20 21 /// Security relevant configuration options for `KafkaClient`. 22 // This will be expanded in the future. See #51. 23 #[cfg(feature = "security")] 24 pub struct SecurityConfig { 25 connector: SslConnector, 26 verify_hostname: bool, 27 } 28 29 #[cfg(feature = "security")] 30 impl SecurityConfig { 31 /// In the future this will also support a kerbos via #51. 32 pub fn new(connector: SslConnector) -> Self { 33 SecurityConfig { 34 connector, 35 verify_hostname: true, 36 } 37 } 38 39 /// Initiates a client-side TLS session with/without performing hostname verification. 40 pub fn with_hostname_verification(self, verify_hostname: bool) -> SecurityConfig { 41 SecurityConfig { 42 verify_hostname, 43 ..self 44 } 45 } 46 } 47 48 #[cfg(feature = "security")] 49 impl fmt::Debug for SecurityConfig { 50 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 51 write!( 52 f, 53 "SecurityConfig {{ verify_hostname: {} }}", 54 self.verify_hostname 55 ) 56 } 57 } 58 59 // -------------------------------------------------------------------- 60 61 struct Pooled<T> { 62 last_checkout: Instant, 63 item: T, 64 } 65 66 impl<T> Pooled<T> { 67 fn new(last_checkout: Instant, item: T) -> Self { 68 Pooled { 69 last_checkout, 70 item, 71 } 72 } 73 } 74 75 impl<T: fmt::Debug> fmt::Debug for Pooled<T> { 76 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 77 write!( 78 f, 79 "Pooled {{ last_checkout: {:?}, item: {:?} }}", 80 self.last_checkout, self.item 81 ) 82 } 83 } 84 85 #[derive(Debug)] 86 pub struct Config { 87 rw_timeout: Option<Duration>, 88 idle_timeout: Duration, 89 #[cfg(feature = "security")] 90 security_config: Option<SecurityConfig>, 91 } 92 93 impl Config { 94 #[cfg(not(feature = "security"))] 95 fn new_conn(&self, id: u32, host: &str) -> Result<KafkaConnection> { 96 KafkaConnection::new(id, host, self.rw_timeout).map(|c| { 97 debug!("Established: {:?}", c); 98 c 99 }) 100 } 101 102 #[cfg(feature = "security")] 103 fn new_conn(&self, id: u32, host: &str) -> Result<KafkaConnection> { 104 KafkaConnection::new( 105 id, 106 host, 107 self.rw_timeout, 108 self.security_config 109 .as_ref() 110 .map(|c| (c.connector.clone(), c.verify_hostname)), 111 ) 112 .map(|c| { 113 debug!("Established: {:?}", c); 114 c 115 }) 116 } 117 } 118 119 #[derive(Debug)] 120 struct State { 121 num_conns: u32, 122 } 123 124 impl State { 125 fn new() -> State { 126 State { num_conns: 0 } 127 } 128 129 fn next_conn_id(&mut self) -> u32 { 130 let c = self.num_conns; 131 self.num_conns = self.num_conns.wrapping_add(1); 132 c 133 } 134 } 135 136 #[derive(Debug)] 137 pub struct Connections { 138 conns: HashMap<String, Pooled<KafkaConnection>>, 139 state: State, 140 config: Config, 141 } 142 143 impl Connections { 144 #[cfg(not(feature = "security"))] 145 pub fn new(rw_timeout: Option<Duration>, idle_timeout: Duration) -> Connections { 146 Connections { 147 conns: HashMap::new(), 148 state: State::new(), 149 config: Config { 150 rw_timeout, 151 idle_timeout, 152 }, 153 } 154 } 155 156 #[cfg(feature = "security")] 157 pub fn new(rw_timeout: Option<Duration>, idle_timeout: Duration) -> Connections { 158 Self::new_with_security(rw_timeout, idle_timeout, None) 159 } 160 161 #[cfg(feature = "security")] 162 pub fn new_with_security( 163 rw_timeout: Option<Duration>, 164 idle_timeout: Duration, 165 security: Option<SecurityConfig>, 166 ) -> Connections { 167 Connections { 168 conns: HashMap::new(), 169 state: State::new(), 170 config: Config { 171 rw_timeout, 172 idle_timeout, 173 security_config: security, 174 }, 175 } 176 } 177 178 pub fn set_idle_timeout(&mut self, idle_timeout: Duration) { 179 self.config.idle_timeout = idle_timeout; 180 } 181 182 pub fn idle_timeout(&self) -> Duration { 183 self.config.idle_timeout 184 } 185 186 pub fn get_conn<'a>(&'a mut self, host: &str, now: Instant) -> Result<&'a mut KafkaConnection> { 187 if let Some(conn) = self.conns.get_mut(host) { 188 if now.duration_since(conn.last_checkout) >= self.config.idle_timeout { 189 debug!("Idle timeout reached: {:?}", conn.item); 190 let new_conn = try!(self.config.new_conn(self.state.next_conn_id(), host)); 191 let _ = conn.item.shutdown(); 192 conn.item = new_conn; 193 } 194 conn.last_checkout = now; 195 let kconn: &mut KafkaConnection = &mut conn.item; 196 // ~ decouple the lifetimes to make the borrowck happy; 197 // this is safe since we're immediatelly returning the 198 // reference and the rest of the code in this method is 199 // not affected 200 return Ok(unsafe { mem::transmute(kconn) }); 201 } 202 let cid = self.state.next_conn_id(); 203 self.conns.insert( 204 host.to_owned(), 205 Pooled::new(now, try!(self.config.new_conn(cid, host))), 206 ); 207 Ok(&mut self.conns.get_mut(host).unwrap().item) 208 } 209 210 pub fn get_conn_any(&mut self, now: Instant) -> Option<&mut KafkaConnection> { 211 for (host, conn) in &mut self.conns { 212 if now.duration_since(conn.last_checkout) >= self.config.idle_timeout { 213 debug!("Idle timeout reached: {:?}", conn.item); 214 let new_conn_id = self.state.next_conn_id(); 215 let new_conn = match self.config.new_conn(new_conn_id, host.as_str()) { 216 Ok(new_conn) => { 217 let _ = conn.item.shutdown(); 218 new_conn 219 } 220 Err(e) => { 221 warn!("Failed to establish connection to {}: {:?}", host, e); 222 continue; 223 } 224 }; 225 conn.item = new_conn; 226 } 227 conn.last_checkout = now; 228 let kconn: &mut KafkaConnection = &mut conn.item; 229 return Some(kconn); 230 } 231 None 232 } 233 } 234 235 // -------------------------------------------------------------------- 236 237 trait IsSecured { 238 fn is_secured(&self) -> bool; 239 } 240 241 #[cfg(not(feature = "security"))] 242 type KafkaStream = TcpStream; 243 244 #[cfg(not(feature = "security"))] 245 impl IsSecured for KafkaStream { 246 fn is_secured(&self) -> bool { 247 false 248 } 249 } 250 251 #[cfg(feature = "security")] 252 use self::openssled::KafkaStream; 253 254 #[cfg(feature = "security")] 255 mod openssled { 256 use std::io::{self, Read, Write}; 257 use std::net::{Shutdown, TcpStream}; 258 use std::time::Duration; 259 260 use openssl::ssl::SslStream; 261 262 use super::IsSecured; 263 264 pub enum KafkaStream { 265 Plain(TcpStream), 266 Ssl(SslStream<TcpStream>), 267 } 268 269 impl IsSecured for KafkaStream { 270 fn is_secured(&self) -> bool { 271 match *self { 272 KafkaStream::Ssl(_) => true, 273 _ => false, 274 } 275 } 276 } 277 278 impl KafkaStream { 279 fn get_ref(&self) -> &TcpStream { 280 match *self { 281 KafkaStream::Plain(ref s) => s, 282 KafkaStream::Ssl(ref s) => s.get_ref(), 283 } 284 } 285 286 pub fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> { 287 self.get_ref().set_read_timeout(dur) 288 } 289 290 pub fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> { 291 self.get_ref().set_write_timeout(dur) 292 } 293 294 pub fn shutdown(&mut self, how: Shutdown) -> io::Result<()> { 295 self.get_ref().shutdown(how) 296 } 297 } 298 299 impl Read for KafkaStream { 300 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 301 match *self { 302 KafkaStream::Plain(ref mut s) => s.read(buf), 303 KafkaStream::Ssl(ref mut s) => s.read(buf), 304 } 305 } 306 } 307 308 impl Write for KafkaStream { 309 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 310 match *self { 311 KafkaStream::Plain(ref mut s) => s.write(buf), 312 KafkaStream::Ssl(ref mut s) => s.write(buf), 313 } 314 } 315 fn flush(&mut self) -> io::Result<()> { 316 match *self { 317 KafkaStream::Plain(ref mut s) => s.flush(), 318 KafkaStream::Ssl(ref mut s) => s.flush(), 319 } 320 } 321 } 322 } 323 324 /// A TCP stream to a remote Kafka broker. 325 pub struct KafkaConnection { 326 // a surrogate identifier to distinguish between 327 // connections to the same host in debug messages 328 id: u32, 329 // "host:port" 330 host: String, 331 // the (wrapped) tcp stream 332 stream: KafkaStream, 333 } 334 335 impl fmt::Debug for KafkaConnection { 336 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 337 write!( 338 f, 339 "KafkaConnection {{ id: {}, secured: {}, host: \"{}\" }}", 340 self.id, 341 self.stream.is_secured(), 342 self.host 343 ) 344 } 345 } 346 347 impl KafkaConnection { 348 pub fn send(&mut self, msg: &[u8]) -> Result<usize> { 349 let r = self.stream.write(&msg[..]).map_err(From::from); 350 trace!("Sent {} bytes to: {:?} => {:?}", msg.len(), self, r); 351 r 352 } 353 354 pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> { 355 let r = (&mut self.stream).read_exact(buf).map_err(From::from); 356 trace!("Read {} bytes from: {:?} => {:?}", buf.len(), self, r); 357 r 358 } 359 360 pub fn read_exact_alloc(&mut self, size: u64) -> Result<Vec<u8>> { 361 let size: usize = size as usize; 362 let mut buffer: Vec<u8> = Vec::with_capacity(size); 363 // this is safe actually: we are setting the len to the 364 // buffers capacity and either fully populate it in the 365 // following call to `read_exact` or discard the vector (in 366 // the error case) 367 unsafe { buffer.set_len(size) }; 368 try!(self.read_exact(buffer.as_mut_slice())); 369 Ok(buffer) 370 } 371 372 fn shutdown(&mut self) -> Result<()> { 373 let r = self.stream.shutdown(Shutdown::Both); 374 debug!("Shut down: {:?} => {:?}", self, r); 375 r.map_err(From::from) 376 } 377 378 fn from_stream( 379 stream: KafkaStream, 380 id: u32, 381 host: &str, 382 rw_timeout: Option<Duration>, 383 ) -> Result<KafkaConnection> { 384 try!(stream.set_read_timeout(rw_timeout)); 385 try!(stream.set_write_timeout(rw_timeout)); 386 Ok(KafkaConnection { 387 id, 388 host: host.to_owned(), 389 stream, 390 }) 391 } 392 393 #[cfg(not(feature = "security"))] 394 fn new(id: u32, host: &str, rw_timeout: Option<Duration>) -> Result<KafkaConnection> { 395 KafkaConnection::from_stream(try!(TcpStream::connect(host)), id, host, rw_timeout) 396 } 397 398 #[cfg(feature = "security")] 399 fn new( 400 id: u32, 401 host: &str, 402 rw_timeout: Option<Duration>, 403 security: Option<(SslConnector, bool)>, 404 ) -> Result<KafkaConnection> { 405 let stream = try!(TcpStream::connect(host)); 406 let stream = match security { 407 Some((connector, verify_hostname)) => { 408 if !verify_hostname { 409 connector 410 .configure() 411 .map_err(|err| { 412 let err: crate::error::Error = 413 crate::error::ErrorKind::Ssl(From::from(err)).into(); 414 err 415 })? 416 .set_verify_hostname(false); 417 } 418 let domain = match host.rfind(':') { 419 None => host, 420 Some(i) => &host[..i], 421 }; 422 let connection = try!(connector.connect(domain, stream)); 423 KafkaStream::Ssl(connection) 424 } 425 None => KafkaStream::Plain(stream), 426 }; 427 KafkaConnection::from_stream(stream, id, host, rw_timeout) 428 } 429 } 430